from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType
def spark_init(test_name) -> SparkSession:
    # Kafka and PostgreSQL connectors are fetched at session start-up.
    spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.postgresql:postgresql:42.4.0",
        ]
    )

    spark = (
        SparkSession.builder.appName(test_name)
        .config("spark.sql.session.timeZone", "UTC")
        .config("spark.jars.packages", spark_jars_packages)
        .getOrCreate()
    )
    return spark
def read_marketing(spark: SparkSession) -> DataFrame:
    # Static reference table of marketing campaigns, read once over JDBC.
    marketing_df = (
        spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de")
        .option("dbtable", "marketing_companies")
        .option("driver", "org.postgresql.Driver")
        .option("user", "student")
        .option("password", "de-student")
        .load()
    )
    return marketing_df
def read_client_stream(spark: SparkSession) -> DataFrame:
    # Streaming source: client geo-events from a SASL_SSL-protected Kafka topic.
    return (
        spark.readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
        .option('kafka.sasl.jaas.config',
                'org.apache.kafka.common.security.scram.ScramLoginModule required '
                'username="de-student" password="ltcneltyn";')
        # .option('kafka.ssl.truststore.location', truststore_location)
        # .option('kafka.ssl.truststore.password', truststore_pass)
        .option("subscribe", "student.topic.cohort8.madaxell")
        .load()
    )
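# Note: the raw Kafka source exposes binary 'key' and 'value' columns plus
# message metadata (topic, partition, offset, timestamp, timestampType);
# transform() below decodes 'value' into typed event fields.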
def transform(df: DataFrame) -> DataFrame:
    # Expected JSON payload of each Kafka message.
    schema = StructType([
        StructField("client_id", StringType()),
        StructField("timestamp", DoubleType()),
        StructField("lat", DoubleType()),
        StructField("lon", DoubleType()),
    ])
    return (
        df
        # Kafka delivers 'value' as binary; decode it to a JSON string first.
        .withColumn('value', f.col('value').cast(StringType()))
        .withColumn('event', f.from_json(f.col('value'), schema))
        .selectExpr('event.*')
        # Epoch seconds (double) cast directly to an event-time timestamp.
        .withColumn('timestamp', f.col('timestamp').cast(TimestampType()))
        # Lag between processing time and event time, in seconds.
        .withColumn('eventEdge',
                    f.unix_timestamp(f.current_timestamp()) - f.unix_timestamp(f.col('timestamp')))
        # The watermark must precede dropDuplicates so Spark can evict old
        # dedup state; 'timestamp' being one of the dedup keys is what makes
        # that eviction possible.
        .withWatermark('timestamp', '10 minutes')
        .dropDuplicates(["client_id", "timestamp", "lat", "lon"])
    )
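# Example (an assumption about the producer's message format): a Kafka value like
#   {"client_id": "c42", "timestamp": 1661870000.5, "lat": 55.75, "lon": 37.61}
# parses under the schema above into one row with client_id='c42',
# timestamp=2022-08-30 14:33:20.5 (UTC), lat=55.75, lon=37.61.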
def join(user_df, marketing_df) -> DataFrame:
    # Stream-static cross join: every client event is paired with every
    # marketing campaign row.
    return user_df.crossJoin(marketing_df)
if __name__ == "__main__":
    spark = spark_init('join stream')

    marketing_df = read_marketing(spark)
    client_stream = read_client_stream(spark)
    client_stream_transformed = transform(client_stream)

    result = join(client_stream_transformed, marketing_df)

    query = (
        result
        .writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", False)
        .start()
    )

    try:
        query.awaitTermination()
    finally:
        query.stop()
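# Launch sketch (the filename is hypothetical): the connector packages declared
# in spark_init() are fetched automatically, so a plain submit is enough:
#   spark-submit join_stream.py
# or pass them explicitly on the command line:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.postgresql:postgresql:42.4.0 join_stream.py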