Advertisement
hivefans

sparksqlkmeans.scala

Aug 5th, 2021
2,511
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.70 KB | None | 0 0
  1. package sparkml
  2.  
  3. import org.apache.spark.sql.SparkSession
  4. import java.util.Properties
  5. import org.apache.spark.mllib.linalg.Vectors
  6. import org.apache.spark.rdd.RDD
  7. import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
  8. import org.apache.spark.sql.{DataFrame, Row}
  9.  
  10. object Demo {
  11.    
  12.   def main(args: Array[String]): Unit = {
  13.    
  14.     val spark = SparkSession.builder().appName("Hiveml").enableHiveSupport().getOrCreate()
  15.    //.config("spark.sql.inMemoryColumnarStorage.batchSize", 10)
  16.     val mltableName = "ml_kmeans_table"
  17.  
  18.     //执行sql
  19.     val df= spark.sql("select name,num from test")  
  20.     val numClusters = 3
  21.     val numIterations = 20
  22.     var parsedData = res_data.select("num").rdd.map{case Row(s:Double)=> Vectors.dense(Array(s))}
  23.     val clusters = KMeans.train(p, numClusters, numIterations)
  24.     var tt_data = clusters.predict(parsedData)
  25.     tt_data.collect().toList
  26.  
  27.     tt_data.collect().foreach {println}
  28.      
  29.     var index_data = tt_data.toDF("julei").withColumn("tindex", monotonically_increasing_id).withColumn("index", row_number().over(Window.orderBy("tindex"))).drop("tindex")
  30.  
  31.     var org_data = df.withColumn("tindex", monotonically_increasing_id).withColumn("index", row_number().over(Window.orderBy("tindex"))).drop("tindex")
  32.  
  33.     var res_data = org_data.join(index_data, Seq("index"), "left").drop("index").withColumn("julei", col("julei").cast("double"))
  34.  
  35.     sql(s"DROP TABLE IF EXISTS ${mltableName}")
  36.     sql(s"CREATE TABLE $mltableName (name STRING, num DOUBLE, julei DOUBLE)")
  37.     //res_data.printSchema()
  38.     res_data.write.insertInto(mltableName)
  39.     //res_data.write.mode("overwrite").insertInto(mltableName)
  40.     //停止Spark
  41.     spark.stop()
  42.  
  43.   }
  44. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement