Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 --conf spark.sql.legacy.allowUntypedScalaUDF=true --conf spark.sql.streaming.checkpointLocation="file:////home/vzverev/"
- import org.apache.spark.SparkConf
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.streaming.Trigger
- import org.apache.spark.sql.types.StringType
- import org.apache.spark.sql.streaming.StreamingQuery
- object SparkSQLKafkaValidation {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("SparkSQLKafkaValidation").setMaster("local[2]")
- val spark = SparkSession.builder.config(sparkConf).getOrCreate()
- import spark.implicits._
- // Настройки для Kafka
- val kafkaParams = Map[String, String](
- "bootstrap.servers" -> "localhost:9092",
- "subscribe" -> "input-topic",
- "group.id" -> "spark-sql-consumer-group",
- "auto.offset.reset" -> "earliest",
- "kafka.bootstrap.servers"-> "localhost:9092"
- )
- // Читаем данные из Kafka в виде DataFrame
- val kafkaStreamDF = spark.readStream
- .format("kafka")
- .options(kafkaParams)
- .load()
- // Преобразуем значения из бинарного формата в строку
- val messagesDF = kafkaStreamDF.selectExpr("CAST(value AS STRING)").as[String]
- // Определяем пользовательскую функцию для валидации сообщения
- val validateMessageUDF = udf((message: String) => message.length > 5)
- // Добавляем колонку с результатами валидации
- val validatedDF = messagesDF.withColumn("isValid", validateMessageUDF(col("value")))
- // Делим DataFrame на два: валидные и невалидные сообщения
- val validMessagesDF = validatedDF.filter("isValid = true").select("value")
- val invalidMessagesDF = validatedDF.filter("isValid = false").select("value")
- // Настраиваем настройки для записи в Kafka
- val kafkaOutputParams = Map[String, String](
- "kafka.bootstrap.servers" -> "localhost:9092",
- "topic" -> "valid-topic"
- )
- // Записываем валидные сообщения в топик Kafka
- val validMessagesQuery: StreamingQuery = validMessagesDF.writeStream
- .outputMode("append")
- .format("kafka")
- .options(kafkaOutputParams)
- .trigger(Trigger.ProcessingTime("5 seconds"))
- .start()
- // Записываем невалидные сообщения в другой топик Kafka
- val kafkaErrorOutputParams = Map[String, String](
- "kafka.bootstrap.servers" -> "localhost:9092",
- "topic" -> "error-topic"
- )
- val invalidMessagesQuery: StreamingQuery = invalidMessagesDF.writeStream
- .outputMode("append")
- .format("kafka")
- .options(kafkaErrorOutputParams)
- .trigger(Trigger.ProcessingTime("5 seconds"))
- .start()
- // Запускаем стриминг
- validMessagesQuery.awaitTermination()
- invalidMessagesQuery.awaitTermination()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement