Advertisement
Vitaly_Zverev

Untitled

Jan 9th, 2024
454
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.94 KB | None | 0 0
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.functions._
  4. import org.apache.spark.sql.streaming.Trigger
  5. import org.apache.spark.sql.types.StringType
  6. import org.apache.spark.sql.streaming.StreamingQuery
  7.  
  8. object SparkSQLKafkaValidation {
  9.  
  10.   def main(args: Array[String]): Unit = {
  11.     val sparkConf = new SparkConf().setAppName("SparkSQLKafkaValidation").setMaster("local[2]")
  12.     val spark = SparkSession.builder.config(sparkConf).getOrCreate()
  13.  
  14.     import spark.implicits._
  15.  
  16.     // Настройки для Kafka
  17.     val kafkaParams = Map[String, String](
  18.       "bootstrap.servers" -> "localhost:9092",
  19.       "subscribe" -> "input-topic",
  20.       "group.id" -> "spark-sql-consumer-group",
  21.       "auto.offset.reset" -> "earliest"
  22.     )
  23.  
  24.     // Читаем данные из Kafka в виде DataFrame
  25.     val kafkaStreamDF = spark.readStream
  26.       .format("kafka")
  27.       .options(kafkaParams)
  28.       .load()
  29.  
  30.     // Преобразуем значения из бинарного формата в строку
  31.     val messagesDF = kafkaStreamDF.selectExpr("CAST(value AS STRING)").as[String]
  32.  
  33.     // Определяем пользовательскую функцию для валидации сообщения
  34.     val validateMessageUDF = udf((message: String) => message.length > 5, StringType)
  35.  
  36.     // Добавляем колонку с результатами валидации
  37.     val validatedDF = messagesDF.withColumn("isValid", validateMessageUDF(col("value")))
  38.  
  39.     // Делим DataFrame на два: валидные и невалидные сообщения
  40.     val validMessagesDF = validatedDF.filter("isValid = true").select("value")
  41.     val invalidMessagesDF = validatedDF.filter("isValid = false").select("value")
  42.  
  43.     // Настраиваем настройки для записи в Kafka
  44.     val kafkaOutputParams = Map[String, String](
  45.       "kafka.bootstrap.servers" -> "localhost:9092",
  46.       "topic" -> "valid-topic"
  47.     )
  48.  
  49.     // Записываем валидные сообщения в топик Kafka
  50.     val validMessagesQuery: StreamingQuery = validMessagesDF.writeStream
  51.       .outputMode("append")
  52.       .format("kafka")
  53.       .options(kafkaOutputParams)
  54.       .trigger(Trigger.ProcessingTime("5 seconds"))
  55.       .start()
  56.  
  57.     // Записываем невалидные сообщения в другой топик Kafka
  58.     val kafkaErrorOutputParams = Map[String, String](
  59.       "kafka.bootstrap.servers" -> "localhost:9092",
  60.       "topic" -> "error-topic"
  61.     )
  62.  
  63.     val invalidMessagesQuery: StreamingQuery = invalidMessagesDF.writeStream
  64.       .outputMode("append")
  65.       .format("kafka")
  66.       .options(kafkaErrorOutputParams)
  67.       .trigger(Trigger.ProcessingTime("5 seconds"))
  68.       .start()
  69.  
  70.     // Запускаем стриминг
  71.     validMessagesQuery.awaitTermination()
  72.     invalidMessagesQuery.awaitTermination()
  73.   }
  74. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement