Sprint8_lesson8_part1
Mad_Axell, Apr 1st, 2023

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType


def spark_init(test_name: str) -> SparkSession:
    # Connectors pulled in at start-up: the Kafka structured-streaming source
    # and the PostgreSQL JDBC driver.
    spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.postgresql:postgresql:42.4.0",
        ]
    )

    spark = (
        SparkSession.builder.appName(test_name)
        .config("spark.sql.session.timeZone", "UTC")
        .config("spark.jars.packages", spark_jars_packages)
        .getOrCreate()
    )

    return spark

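# Note: the connector artifact has to match the Spark build;
# spark-sql-kafka-0-10_2.12:3.3.0 pairs with Spark 3.3.x on Scala 2.12.
# A quick sanity check (illustrative sketch, not part of the lesson code):
#
#   spark = spark_init('version check')
#   assert spark.version.startswith('3.3'), spark.version
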
def read_marketing(spark: SparkSession) -> DataFrame:
    # Static side of the join: the marketing campaigns table, read once over JDBC.
    marketing_df = (spark.read
                    .format("jdbc")
                    .option("url", "jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de")
                    .option("dbtable", "marketing_companies")
                    .option("driver", "org.postgresql.Driver")
                    .option("user", "student")
                    .option("password", "de-student")
                    .load())

    return marketing_df

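# To eyeball the static side on its own, something like this works in a pyspark
# session (illustrative only):
#
#   df = read_marketing(spark_init('inspect marketing'))
#   df.printSchema()
#   df.show(5, truncate=False)
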
def read_client_stream(spark: SparkSession) -> DataFrame:
    # Streaming side: client location events from a SASL_SSL-secured Kafka topic.
    return (spark.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
            .option('kafka.security.protocol', 'SASL_SSL')
            .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
            .option('kafka.sasl.jaas.config',
                    'org.apache.kafka.common.security.scram.ScramLoginModule required username="de-student" password="ltcneltyn";')
            # .option('kafka.ssl.truststore.location', truststore_location)
            # .option('kafka.ssl.truststore.password', truststore_pass)
            .option('subscribe', 'student.topic.cohort8.madaxell')
            .load()
            )

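# The Kafka source always exposes the fixed columns key, value, topic, partition,
# offset, timestamp and timestampType; key and value arrive as binary, which is
# why transform() casts value to a string before parsing the JSON payload.
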
def transform(df: DataFrame) -> DataFrame:
    # Schema of the JSON payload carried in the Kafka `value` field.
    schema = StructType([
        StructField("client_id", StringType()),
        StructField("timestamp", DoubleType()),
        StructField("lat", DoubleType()),
        StructField("lon", DoubleType()),
    ])

    return (df
            .withColumn('value', f.col('value').cast(StringType()))
            .withColumn('event', f.from_json(f.col('value'), schema))
            .selectExpr('event.*')
            .withColumn('timestamp',
                        f.from_unixtime(f.col('timestamp'), "yyyy-MM-dd HH:mm:ss").cast(TimestampType()))
            # Synthetic event-time column: the age of the event relative to processing time.
            .withColumn('eventEdge',
                        (f.unix_timestamp(f.current_timestamp()) - f.unix_timestamp(f.col('timestamp'))).cast(TimestampType()))
            # The watermark has to be declared before the stateful dropDuplicates,
            # otherwise the deduplication operator ignores it and its state grows unbounded.
            .withWatermark('eventEdge', '10 minutes')
            .dropDuplicates(["client_id", "timestamp", "lat", "lon"])
            )

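# A message on the topic is expected to match the schema above, e.g. (values
# illustrative):
#
#   {"client_id": "1a2b3c", "timestamp": 1680123456.0, "lat": 55.75, "lon": 37.61}
#
# transform() turns it into one row with a proper TimestampType 'timestamp' plus
# the synthetic 'eventEdge' column that carries the watermark.
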
def join(user_df: DataFrame, marketing_df: DataFrame) -> DataFrame:
    # Stream-static cross join: every client event is paired with every campaign.
    return user_df.crossJoin(marketing_df)

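# A stream-static join like this is stateless on the streaming side: the static
# marketing_df is re-joined against every micro-batch, and no watermark is needed
# for the join itself (the watermark above only bounds the dedup state).
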
if __name__ == "__main__":
    spark = spark_init('join stream')
    marketing_df = read_marketing(spark)
    client_stream = read_client_stream(spark)
    client_stream_transformed = transform(client_stream)
    result = join(client_stream_transformed, marketing_df)

    query = (result
             .writeStream
             .outputMode("append")
             .format("console")
             .option("truncate", False)
             .start())

    try:
        query.awaitTermination()
    finally:
        query.stop()
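
# For anything beyond console debugging, give the sink a checkpoint location so
# the query can recover Kafka offsets and dedup state after a restart (path is
# illustrative):
#
#   .option("checkpointLocation", "/tmp/checkpoints/join_stream")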