Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pyspark.sql import SparkSession
- from pyspark.sql import functions as f
- from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
# Maven coordinates Spark resolves at session start-up, comma-joined as the
# spark.jars.packages setting requires: the Kafka SQL connector build for
# Scala 2.12 / Spark 3.3.0, and the PostgreSQL JDBC driver.
# NOTE(review): the PostgreSQL driver is not used in the code visible here —
# presumably needed by a later sink; confirm or drop it.
spark_jars_packages = ",".join((
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    "org.postgresql:postgresql:42.4.0",
))

# Local (single-machine) session; getOrCreate() reuses an active session if
# one already exists in this process.
spark = (
    SparkSession.builder
    .appName("test connect to kafka")
    .master("local")
    .config("spark.jars.packages", spark_jars_packages)
    .getOrCreate()
)
import os

# Kafka SASL/SCRAM credentials. Environment variables take precedence so the
# secrets can be supplied at runtime instead of living in the script.
# TODO(review): remove the hard-coded fallbacks once every environment that
# runs this script sets KAFKA_USERNAME / KAFKA_PASSWORD — credentials that
# have been committed (or pasted publicly) should be treated as leaked.
kafka_username = os.environ.get("KAFKA_USERNAME", "de-student")
kafka_password = os.environ.get("KAFKA_PASSWORD", "ltcneltyn")
# JAAS entry for the SCRAM login module; the values are wrapped in double
# quotes, so they must not themselves contain a double quote.
kafka_jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f'username="{kafka_username}" password="{kafka_password}";'
)

# SSL truststore settings. These are NOT passed to the reader below: the
# kafka.ssl.truststore.location / kafka.ssl.truststore.password options were
# disabled in the original script, so the JVM's default trust store is used.
# NOTE(review): "/etc/security/ssl" looks like a directory, while
# kafka.ssl.truststore.location expects a truststore *file* — confirm the
# path before wiring these two values into the reader.
truststore_location = "/etc/security/ssl"
truststore_pass = os.environ.get("KAFKA_TRUSTSTORE_PASSWORD", "de_sprint_8")

# Batch (spark.read, not readStream) read of the "cawachi_in" topic over
# SASL_SSL with SCRAM-SHA-512 authentication.
df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .option("subscribe", "cawachi_in")
    .load()
)
# Layout of the JSON payload parsed from each record: five string attributes
# followed by three double-typed datetime fields (all nullable, the
# StructField default).
# NOTE(review): the datetime fields are parsed as doubles — presumably epoch
# seconds; confirm against the producer.
schema = StructType(
    [
        StructField(name, StringType())
        for name in (
            "restaurant_id",
            "adv_campaign_id",
            "adv_campaign_content",
            "adv_campaign_owner",
            "adv_campaign_owner_contact",
        )
    ]
    + [
        StructField(name, DoubleType())
        for name in (
            "adv_campaign_datetime_start",
            "adv_campaign_datetime_end",
            "datetime_created",
        )
    ]
)
# Decode the raw records: the key is cast to a string, the JSON text in
# `value` is parsed with `schema`, and the parsed fields are promoted to
# top-level columns ahead of the remaining columns of `df`. Neither the raw
# `value` column nor the intermediate `event` struct is kept.
df2 = (
    df.withColumn("key", f.col("key").cast(StringType()))
    .withColumn("event", f.from_json(f.col("value").cast(StringType()), schema))
    .drop("value")
    .select("event.*", "*")
    .drop("event")
)

df2.printSchema()
df2.show(truncate=False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement