hivefans

DirectKafkaDefaultExample.scala

Mar 30th, 2017
package main.scala

import scala.collection.JavaConverters._

import com.typesafe.config.ConfigFactory
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaDefaultExample {
  private val conf = ConfigFactory.load()
  private val sparkStreamingConf = conf.getStringList("DirectKafkaDefaultExample-List").asScala
  val logger = Logger.getLogger(DirectKafkaDefaultExample.getClass)

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaDefaultExample <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args
    val checkpointDir = "/tmp/checkpointLogs"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    // Extract: create a direct Kafka stream from the given brokers and topics
    val topicsSet = topics.split(",").toSet
    // Recover the context from the checkpoint if one exists, otherwise build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDir, setupSsc(topicsSet, kafkaParams, checkpointDir) _)
    ssc.start() // start the streaming computation
    ssc.awaitTermination()
  }

  def setupSsc(topicsSet: Set[String], kafkaParams: Map[String, String], checkpointDir: String)(): StreamingContext = {
    // Build the SparkConf from the application configuration
    val sparkConf = new SparkConf()
    sparkConf.setAppName(conf.getString("DirectKafkaDefaultExample"))
    // Each configured entry is a "key=value" pair of Spark settings
    sparkStreamingConf.foreach { x => val split = x.split("="); sparkConf.set(split(0), split(1)) }
    val ssc = new StreamingContext(sparkConf, Seconds(conf.getInt("application.sparkbatchinterval")))
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    // Drop the Kafka keys and keep only the message payloads
    val line = messages.map(_._2)
    val lines = line.flatMap(line => line.split("\n"))
    // LogFilter is an application-specific helper defined elsewhere in the project
    val filteredLines = lines.filter { x => LogFilter.filter(x, "1") }
    filteredLines.foreachRDD((rdd: RDD[String], time: Time) => {
      rdd.foreachPartition { partitionOfRecords =>
        if (partitionOfRecords.isEmpty) {
          logger.info("partitionOfRecords FOUND EMPTY, IGNORING THIS PARTITION")
        } else {
          /* write computation logic here */
        }
      } // foreachPartition ends
    }) // foreachRDD ends
    ssc.checkpoint(checkpointDir) // the offset ranges for the stream are stored in the checkpoint
    ssc
  }
}
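
Two pieces live outside this paste: the Typesafe Config file the object loads, and the project's LogFilter helper. The sketches below are only illustrations under stated assumptions; the key names are taken from the code above, but every value is a placeholder, and the LogFilter body is purely hypothetical, just to show the shape the call expects.

    # application.conf sketch (values are placeholder assumptions)
    DirectKafkaDefaultExample = "DirectKafkaDefaultExample"   # used as the Spark application name
    DirectKafkaDefaultExample-List = [
      "spark.streaming.backpressure.enabled=true",            # each entry is a key=value Spark setting
      "spark.serializer=org.apache.spark.serializer.KryoSerializer"
    ]
    application.sparkbatchinterval = 10                       # batch interval in seconds

    // Hypothetical stand-in for the project's LogFilter; the real filtering rules are not shown in the paste
    object LogFilter {
      // Assumption: keep a line when its first comma-separated field matches the requested flag
      def filter(line: String, flag: String): Boolean =
        line.split(",").headOption.contains(flag)
    }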