import java.net.InetAddress

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
  * Author: wangxiaogang
  * Date: 2017/7/11
  * Email: Adamyuanyuan@gmail.com
  *
  * Reads delimited records from HDFS and bulk-writes them to Elasticsearch.
  */
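// Example invocation (jar name, HDFS path, index/type names, and numeric
// values below are hypothetical; adjust to your cluster):
//
//   spark-submit --class HdfsToEs hdfs-to-es.jar \
//     hdfs:///user/example/input my_index my_type 64 10000
//
// i.e. HdfsToEs <file> <esIndex> <esType> <partition> <bulkNum>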
object HdfsToEs {

  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition> <bulkNum>")
      System.exit(1)
    }
    val hdfsInputPath: String = args(0)
    println("hdfsInputPath: " + hdfsInputPath)

    val conf = new SparkConf().setAppName("HdfsToEs")
    val sc = new SparkContext(conf)
    // Index, type, and document id for the inserts are supplied as arguments.
    val esIndex: String = args(1)
    val esType: String = args(2)
    val partition: Int = args(3).toInt
    val bulkNum: Int = args(4).toInt

    val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
    val startTime: Long = System.currentTimeMillis
    println("hdfsRDD partitions: " + hdfsRdd.getNumPartitions + ", requested partitions: " + partition)
    hdfsRdd.foreachPartition { eachPa =>
      // Production cluster settings.
      val settings: Settings = Settings.builder
        .put("cluster.name", "production-es")
        .put("client.transport.sniff", true)
        .put("transport.type", "netty3")
        .put("http.type", "netty3")
        .build
      val client: TransportClient = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
      var bulkRequest: BulkRequestBuilder = null // current batch; recreated after each flush
      var flag = true                            // true => a fresh bulk must be created
      var lineNum = 0
      for (eachLine <- eachPa) {
        // Each bulk should be about 10-15 MB; the bulk envelope adds some overhead
        // over the raw data. At roughly 1.5 KB per line, use ~10,000 lines per bulk.
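        // Worked example with bulkNum = 10000 (illustrative numbers):
        // 10,000 lines x ~1.5 KB/line ≈ 15 MB per bulk request.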
        if (flag) {
          bulkRequest = client.prepareBulk
          flag = false
        }
        val strArray: Array[String] = eachLine.split("###")
        if (strArray.length != 25) {
          // Malformed line; skip it so one bad record does not fail the whole job.
          println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
        } else {
          // A LinkedHashMap keeps the fields in insertion order in Elasticsearch.
          val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
          val id: String = strArray(0)
          esDataMap.put("msisdn", id)
          // Merged record format: msisdn###top-3 of w0###top-3 of w1###("null" when empty)...###top-3 of w23, 25 columns in total.
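          // Hypothetical example line (field values invented for illustration):
          //   13800000000###locA:12,locB:5,locC:2###null### ... ###locX:7,locY:3,locZ:1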
          for (i <- 1 to 24) {
            val locTimesListStr = strArray(i)
            val esDataKey = "w" + (i - 1)
            if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
              esDataMap.put(esDataKey, "")
            } else {
              esDataMap.put(esDataKey, locTimesListStr)
            }
          }
          bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
          lineNum += 1
          if (lineNum % bulkNum == 0) {
            val endTime: Long = System.currentTimeMillis
            println("bulk push, current lineNum: " + lineNum + ", elapsed s: " + ((endTime - startTime) / 1000))
            val bbq: BulkResponse = bulkRequest.execute.actionGet()
            flag = true
            if (bbq.hasFailures) {
              println("bbq.hasFailures: " + bbq.toString)
              // Retry the whole bulk once on failure.
              bulkRequest.execute.actionGet
            }
          }
        }
      }
      // Flush any remaining documents. `flag` is false only when the current
      // bulk still holds unflushed requests; checking it avoids re-sending a
      // bulk that was already executed.
      if (!flag && bulkRequest != null) {
        bulkRequest.execute().actionGet()
      }
      client.close()
      val endTime: Long = System.currentTimeMillis
      println("this partition took: " + (endTime - startTime) / 1000 + "s")
    }
    sc.stop()
  }
}