Advertisement
hivefans

sparktoes.scala

Dec 21st, 2019
939
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.59 KB | None | 0 0
  1. import java.net.InetAddress
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
  5. import org.elasticsearch.client.transport.TransportClient
  6. import org.elasticsearch.common.settings.Settings
  7. import org.elasticsearch.common.transport.InetSocketTransportAddress
  8. import org.elasticsearch.transport.client.PreBuiltTransportClient
  9. /**
  10.   * Author: wangxiaogang
  11.   * Date: 2017/7/11
  12.   * Email: Adamyuanyuan@gmail.com
  13.   * hdfs 中的数据根据格式写到ES中
  14.   */
  15. object HdfsToEs {
  16.   def main(args: Array[String]) {
  17.     if (args.length < 5) {
  18.       System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
  19.       System.exit(1)
  20.     }
  21.     val hdfsInputPath: String = args(0)
  22.     println("hdfsInputPath: " + hdfsInputPath)
  23.     val conf = new SparkConf().setAppName("HdfsToEs")
  24.     val sc = new SparkContext(conf)
  25.     //插入相关,索引 类型 id相关  以args方式提供接口。
  26.     val esIndex: String = args(1)
  27.     val esType: String = args(2)
  28.     val partition: Int = args(3).toInt
  29.     val bulkNum: Int = args(4).toInt
  30.     val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
  31.     val startTime: Long = System.currentTimeMillis
  32.     println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
  33.     hdfsRdd.foreachPartition {
  34.       eachPa => {
  35.         //        生产环境
  36.         val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
  37.           .put("transport.type", "netty3").put("http.type", "netty3").build
  38.         val client: TransportClient = new PreBuiltTransportClient(settings)
  39.           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
  40.           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
  41.           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
  42.           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
  43.           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
  44.         var bulkRequest: BulkRequestBuilder = null
  45.         var flag = true
  46.         var lineNum = 0
  47.         for (eachLine <- eachPa) {
  48.           // 每个bulk是10-15M为宜,数据封装为bulk后会较原来的数据略有增大,如果每行数据约为 1.5KB,则每 10000 行为一个bulk
  49.           if (flag) {
  50.             bulkRequest = client.prepareBulk
  51.             flag = false
  52.           }
  53.           val strArray: Array[String] = eachLine.split("###")
  54.           if (strArray.length != 25) {
  55.             // 表示这行数据又问题,为了不影响整体,则跳过
  56.             println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
  57.           } else {
  58.             // LinkedHashMap让ES中的数据变得有序
  59.             val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
  60.             val id: String = strArray(0)
  61.             esDataMap.put("msisdn", id)
  62.             // 数据合并后的格式为: msisdn###w0的前三###w1的前三###如果为空的话就是null...###w23的前三,共25列
  63.             for (i <- 1 to 24) {
  64.               val locTimesListStr = strArray(i)
  65.               val esDataKey = "w" + (i - 1)
  66.               if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
  67.                 esDataMap.put(esDataKey, "")
  68.               } else {
  69.                 esDataMap.put(esDataKey, locTimesListStr)
  70.               }
  71.             }
  72.             bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
  73.             lineNum += 1
  74.             if (lineNum % bulkNum == 0) {
  75.               val endTime: Long = System.currentTimeMillis
  76.               println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
  77.               val bbq: BulkResponse = bulkRequest.execute.actionGet()
  78.               flag = true
  79.               if (bbq.hasFailures) {
  80.                 println("bbq.hasFailures: " + bbq.toString)
  81.                 bulkRequest.execute.actionGet
  82.               }
  83.             }
  84.           }
  85.         }
  86.         if (bulkRequest != null) {
  87.           bulkRequest.execute().actionGet()
  88.         }
  89.         client.close()
  90.         val endTime: Long = System.currentTimeMillis
  91.         println("ths time is: " + (endTime - startTime) / 1000 + "s ")
  92.       }
  93.     }
  94.     sc.stop()
  95.   }
  96. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement