from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType


def spark_init() -> SparkSession:
    # The Kafka connector must match the Spark build in use
    # (here Spark 3.3.0 on Scala 2.12).
    spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
        ]
    )

    return (SparkSession.builder
            .master("local")
            .appName('test stream')
            .config("spark.jars.packages", spark_jars_packages)
            .getOrCreate()
            )
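

# Note (sketch): the connector can also be supplied at submit time rather than
# through spark.jars.packages (the script name below is hypothetical):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 stream_job.py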


def load_df(spark: SparkSession) -> DataFrame:
    # Subscribe to the topic as a streaming source, authenticating over
    # SASL_SSL with SCRAM-SHA-512 credentials.
    return (spark.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
            .option('kafka.security.protocol', 'SASL_SSL')
            .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
            .option('kafka.sasl.jaas.config',
                    'org.apache.kafka.common.security.scram.ScramLoginModule required '
                    'username="de-student" password="ltcneltyn";')
            .option("subscribe", "student.topic.cohort21.kotlyarovb")
            .load())
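

# Note (sketch): a streaming read starts from the latest offsets by default;
# to replay messages already in the topic on the first run, add to load_df():
#   .option('startingOffsets', 'earliest')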


def transform(df: DataFrame) -> DataFrame:
    # Expected shape of the JSON payload carried in the Kafka message value.
    schema = StructType([
        StructField("client_id", StringType()),
        StructField("timestamp", DoubleType()),
        StructField("lat", DoubleType()),
        StructField("lon", DoubleType()),
    ])

    return (df
            # Kafka delivers 'value' as binary, so cast it to a JSON string first.
            .withColumn('value', f.col('value').cast(StringType()))
            .withColumn('event', f.from_json(f.col('value'), schema))
            .selectExpr('event.*')
            # Convert Unix epoch seconds into a proper timestamp column.
            .withColumn('timestamp',
                        f.from_unixtime(f.col('timestamp'), "yyyy-MM-dd' 'HH:mm:ss.SSS").cast(TimestampType()))
            )
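

# Sanity-check sketch: transform() can be exercised without Kafka by feeding
# it a static DataFrame with the same string 'value' column; the payload below
# is a hypothetical example matching the schema above.
#   sample = spark.createDataFrame(
#       [('{"client_id": "c1", "timestamp": 1714636800.0, "lat": 55.75, "lon": 37.62}',)],
#       ['value'])
#   transform(sample).show(truncate=False)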


spark = spark_init()

source_df = load_df(spark)
output_df = transform(source_df)
output_df.printSchema()

# Drain whatever is currently in the topic once, printing the parsed rows to
# the console, then let the query finish.
query = (output_df
         .writeStream
         .outputMode("append")
         .format("console")
         .option("truncate", False)
         .trigger(once=True)
         .start())
try:
    query.awaitTermination()
finally:
    query.stop()
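

# Sketch of a durable variant: a file sink would also need a checkpoint
# location so progress survives restarts (paths below are hypothetical):
#   query = (output_df.writeStream
#            .outputMode('append')
#            .format('parquet')
#            .option('path', '/tmp/geo_events')
#            .option('checkpointLocation', '/tmp/geo_events_chk')
#            .trigger(once=True)
#            .start())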