Advertisement
Vitaly_Zverev

Untitled

Jan 9th, 2024
457
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.97 KB | None | 0 0
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  4. import org.apache.kafka.clients.consumer.ConsumerRecord
  5. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  6. import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
  7.  
  8. object SparkKafkaValidation {
  9.  
  10.   def main(args: Array[String]): Unit = {
  11.     // Создаем конфигурацию Spark
  12.     val sparkConf = new SparkConf().setAppName("SparkKafkaValidation").setMaster("local[2]")
  13.     val ssc = new StreamingContext(sparkConf, Seconds(5))
  14.  
  15.     // Настройки для Kafka
  16.     val kafkaParams = Map[String, Object](
  17.       "bootstrap.servers" -> "localhost:9092",
  18.       "key.deserializer" -> classOf[StringDeserializer],
  19.       "value.deserializer" -> classOf[StringDeserializer],
  20.       "group.id" -> "spark-consumer-group",
  21.       "auto.offset.reset" -> "earliest",
  22.       "enable.auto.commit" -> (false: java.lang.Boolean)
  23.     )
  24.  
  25.     // Топики Kafka
  26.     val topics = Array("input-topic")
  27.  
  28.     // Создаем DStream, принимающий сообщения из Kafka
  29.     val stream = KafkaUtils.createDirectStream[String, String](
  30.       ssc,
  31.       LocationStrategies.PreferConsistent,
  32.       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  33.     )
  34.  
  35.     // Функция для валидации сообщения
  36.     def validateMessage(message: String): Boolean = {
  37.       message.length > 5
  38.     }
  39.  
  40.     // Отправка сообщения в топик Kafka
  41.     def sendToKafka(topic: String, message: String): Unit = {
  42.       val producerProperties = new java.util.Properties()
  43.       producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  44.       producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  45.       producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  46.  
  47.       val producer = new KafkaProducer[String, String](producerProperties)
  48.       producer.send(new ProducerRecord[String, String](topic, message))
  49.       producer.close()
  50.     }
  51.  
  52.     // Обрабатываем каждое сообщение из Kafka
  53.     stream.foreachRDD { rdd =>
  54.       rdd.foreach { record =>
  55.         val message = record.value()
  56.         if (validateMessage(message)) {
  57.           // Отправляем валидное сообщение в топик Kafka
  58.           sendToKafka("valid-topic", message)
  59.           println(s"Valid message: $message")
  60.         } else {
  61.           // Отправляем ошибку в топик Kafka
  62.           sendToKafka("error-topic", s"Invalid message: $message")
  63.           println(s"Invalid message: $message")
  64.         }
  65.       }
  66.     }
  67.  
  68.     // Запускаем Spark Streaming
  69.     ssc.start()
  70.     ssc.awaitTermination()
  71.   }
  72. }
  73.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement