// Spark reads nested JSON array data from Kafka
// JSON record format (pretty-printed here for readability; each Kafka record arrives as a single line):
// {"terminalId":4501109,"gps":[
//   {"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52},
//   {"move":2,"distance":22.65,"gpsId":0,"direct":236.65,"lon":112.581139,"terminalId":4501109,"speed":82.0,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854137000,"lat":23.031427,"height":12.99},
//   {"move":2,"distance":22.77,"gpsId":0,"direct":236.06,"lon":112.580765,"terminalId":4501109,"speed":82.98,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854139000,"lat":23.031197,"height":12.47},
//   {"move":2,"distance":21.41,"gpsId":0,"direct":236.95,"lon":112.580406,"terminalId":4501109,"speed":79.32,"acceleration":0.0,"satelliteNum":24,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854141000,"lat":23.030968,"height":12.03}
// ]}
package com.empgo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object Demo {
  var zookeeperservers = "emg102:2181,emg103:2181,emg104:2181"

  case class gpslog(distance: Double, gpsId: Int, direct: Double, lon: Double, terminalId: Long,
                    speed: Double, acceleration: Double, satelliteNum: Int,
                    sysTime: Long, time: Long, lat: Double, height: Double)
  case class log(terminalId: Long, gps: List[gpslog])
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ecargps")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "for_gps_stream",
      "auto.offset.reset" -> "latest",
      // Auto-commit is off, but nothing commits offsets in this demo either;
      // see the manual-commit sketch after the listing.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("ecar-photo-gps")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => record.value)
      .map(value => {
        // Implicit default json4s formats for case-class extraction
        implicit val formats: DefaultFormats.type = DefaultFormats
        val json = parse(value)
        // Extract the JSON object into the case classes above; under DefaultFormats,
        // json4s silently ignores fields the case classes lack (move, online, cmdId here)
        json.extract[log]
      }).window(Seconds(5), Seconds(5)) // a 5s window sliding every 5s matches the 5s batch, i.e. a pass-through; window(Seconds(3600), Seconds(60)) would analyze the last hour once a minute
      .foreachRDD( // foreachRDD hands you the underlying RDD of each batch; look up the RDD concept if unfamiliar
        rdd => {
          rdd.foreachPartition(partitionOfRecords => { // iterate over each partition
            // Open one HBase connection per partition here: a partition does not span
            // nodes, so the connection never needs to be serialized (this demo only
            // prints; see the write sketch after the listing)
            partitionOfRecords.foreach(logData => {
              logData.gps.foreach(
                gpslog => {
                  println(gpslog.terminalId + "===" + gpslog.height + "===" + gpslog.sysTime)
                }
              )
            })
          })
        }
      )
    ssc.start()
    ssc.awaitTermination()
  }
}
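The comment inside foreachPartition mentions opening one HBase connection per partition, but the demo body only prints. Below is a minimal sketch of what that per-partition write could look like, reusing the HBase imports already at the top of the listing; the table name "ecar:gps", the column family "d", and the terminalId_time rowkey scheme are illustrative assumptions, not from the original.

// Hedged sketch: a per-partition HBase write to drop into rdd.foreachPartition(...)
// above. Table "ecar:gps", family "d", and the rowkey scheme are hypothetical.
rdd.foreachPartition(partitionOfRecords => {
  val hbaseConf: Configuration = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", zookeeperservers)
  val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
  val table: Table = connection.getTable(TableName.valueOf("ecar:gps"))
  try {
    partitionOfRecords.foreach(logData =>
      logData.gps.foreach { g =>
        val put = new Put(Bytes.toBytes(g.terminalId + "_" + g.time)) // rowkey: terminalId_time
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("lon"), Bytes.toBytes(g.lon))
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("lat"), Bytes.toBytes(g.lat))
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("speed"), Bytes.toBytes(g.speed))
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("height"), Bytes.toBytes(g.height))
        table.put(put)
      }
    )
  } finally {
    table.close()
    connection.close()
  }
})

For throughput, the one-Put-at-a-time calls could be batched with table.put(java.util.List[Put]) or a BufferedMutator; the point of the original comment stands, one connection per partition rather than per record.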
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement
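One more note on the Kafka side: since "enable.auto.commit" is false and the job never commits offsets, a restart falls back to "auto.offset.reset" -> "latest" and skips anything produced while the job was down. A minimal sketch of manual commits, following the pattern from the Spark kafka-0-10 integration guide; note the HasOffsetRanges cast only succeeds on RDDs of the stream returned directly by createDirectStream, before any transformation such as the .map above:

// Hedged sketch: grab the offset ranges first, process the batch, then commit
// asynchronously only after the batch's output has succeeded.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... parse with json4s and write to HBase here ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

This gives at-least-once semantics: records can be reprocessed after a failure, so the HBase writes should be idempotent (the terminalId_time rowkey in the sketch above already is).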