Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package sparkml

import java.util.Properties

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, udf}
/**
 * Clusters the `num` column of the Hive table `test` with k-means and writes
 * every row, together with the id of the cluster it was assigned to, into the
 * Hive table `ml_kmeans_table` (columns: `name`, `num`, `julei`).
 */
object Demo {

  /** Hive table that receives the labelled rows; dropped and recreated on every run. */
  private val MlTableName = "ml_kmeans_table"

  /** Number of clusters requested from k-means. */
  private val NumClusters = 3

  /** Maximum number of k-means iterations. */
  private val NumIterations = 20

  /**
   * Entry point: opens a Hive-enabled Spark session, runs the clustering job
   * and always stops the session afterwards, even when the job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Hiveml").enableHiveSupport().getOrCreate()
    try run(spark)
    finally spark.stop() // stop Spark even if the job throws
  }

  /**
   * Trains the model, labels the source rows and stores them in Hive.
   *
   * @param spark active session with Hive support enabled
   */
  private def run(spark: SparkSession): Unit = {
    // Run the SQL query. The casts pin the column types to the target table's
    // schema, so a `num` stored as INT/DECIMAL no longer breaks the job. Rows
    // whose `num` is NULL (or not numeric) cannot be clustered and are skipped.
    val source: DataFrame = spark
      .sql("select name,num from test")
      .select(col("name").cast("string").as("name"), col("num").cast("double").as("num"))
      .na.drop(Seq("num"))
      .cache() // read twice: once for training, once for labelling

    // One-dimensional feature vectors; cached because k-means iterates over them.
    val features = source.select("num").rdd.map(row => Vectors.dense(row.getDouble(0))).cache()
    val model = KMeans.train(features, NumClusters, NumIterations)

    // Debug output: print the cluster id assigned to every point.
    model.predict(features).collect().foreach(println)

    // Label each row in place. This replaces the previous approach of joining
    // two independently generated row numbers, which depended on both sides
    // producing rows in the same order and forced all data into one partition.
    val assignCluster = udf((num: Double) => model.predict(Vectors.dense(num)).toDouble)
    val labelled = source.withColumn("julei", assignCluster(col("num")))

    spark.sql(s"DROP TABLE IF EXISTS $MlTableName")
    spark.sql(s"CREATE TABLE $MlTableName (name STRING, num DOUBLE, julei DOUBLE)")
    // insertInto matches columns by position: name, num, julei.
    labelled.write.insertInto(MlTableName)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement