Advertisement
Vitaly_Zverev

Untitled

Jan 9th, 2024
469
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.18 KB | None | 0 0
  1. spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 --conf spark.sql.legacy.allowUntypedScalaUDF=true  --conf spark.sql.streaming.checkpointLocation="file:////home/vzverev/"  
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.SparkSession
  5. import org.apache.spark.sql.functions._
  6. import org.apache.spark.sql.streaming.Trigger
  7. import org.apache.spark.sql.types.StringType
  8. import org.apache.spark.sql.streaming.StreamingQuery
  9.  
  10. object SparkSQLKafkaValidation {
  11.  
  12.   def main(args: Array[String]): Unit = {
  13.     val sparkConf = new SparkConf().setAppName("SparkSQLKafkaValidation").setMaster("local[2]")
  14.     val spark = SparkSession.builder.config(sparkConf).getOrCreate()
  15.  
  16.     import spark.implicits._
  17.  
  18.     // Настройки для Kafka
  19.     val kafkaParams = Map[String, String](
  20.       "bootstrap.servers" -> "localhost:9092",
  21.       "subscribe" -> "input-topic",
  22.       "group.id" -> "spark-sql-consumer-group",
  23.       "auto.offset.reset" -> "earliest",
  24.       "kafka.bootstrap.servers"-> "localhost:9092"
  25.     )
  26.  
  27.     // Читаем данные из Kafka в виде DataFrame
  28.     val kafkaStreamDF = spark.readStream
  29.       .format("kafka")
  30.       .options(kafkaParams)
  31.       .load()
  32.  
  33.     // Преобразуем значения из бинарного формата в строку
  34.     val messagesDF = kafkaStreamDF.selectExpr("CAST(value AS STRING)").as[String]
  35.  
  36.     // Определяем пользовательскую функцию для валидации сообщения
  37.     val validateMessageUDF = udf((message: String) => message.length > 5)
  38.  
  39.     // Добавляем колонку с результатами валидации
  40.     val validatedDF = messagesDF.withColumn("isValid", validateMessageUDF(col("value")))
  41.  
  42.     // Делим DataFrame на два: валидные и невалидные сообщения
  43.     val validMessagesDF = validatedDF.filter("isValid = true").select("value")
  44.     val invalidMessagesDF = validatedDF.filter("isValid = false").select("value")
  45.  
  46.     // Настраиваем настройки для записи в Kafka
  47.     val kafkaOutputParams = Map[String, String](
  48.       "kafka.bootstrap.servers" -> "localhost:9092",
  49.       "topic" -> "valid-topic"
  50.     )
  51.  
  52.     // Записываем валидные сообщения в топик Kafka
  53.     val validMessagesQuery: StreamingQuery = validMessagesDF.writeStream
  54.       .outputMode("append")
  55.       .format("kafka")
  56.       .options(kafkaOutputParams)
  57.       .trigger(Trigger.ProcessingTime("5 seconds"))
  58.       .start()
  59.  
  60.     // Записываем невалидные сообщения в другой топик Kafka
  61.     val kafkaErrorOutputParams = Map[String, String](
  62.       "kafka.bootstrap.servers" -> "localhost:9092",
  63.       "topic" -> "error-topic"
  64.     )
  65.  
  66.     val invalidMessagesQuery: StreamingQuery = invalidMessagesDF.writeStream
  67.       .outputMode("append")
  68.       .format("kafka")
  69.       .options(kafkaErrorOutputParams)
  70.       .trigger(Trigger.ProcessingTime("5 seconds"))
  71.       .start()
  72.  
  73.     // Запускаем стриминг
  74.     validMessagesQuery.awaitTermination()
  75.     invalidMessagesQuery.awaitTermination()
  76.   }
  77. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement