Advertisement
franji

Spark DF - July 2016

Aug 2nd, 2016
374
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.52 KB | None | 0 0
  1. import java.text.SimpleDateFormat
  2.  
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. import org.apache.spark.SparkContext._
  5. import org.apache.spark.sql.types.{DataTypes, TimestampType}
  6.  
  7. import scala.util.{Failure, Success, Try}
  8. import org.apache.spark.sql.hive.HiveContext
  9. import java.sql.Timestamp
  10.  
  /**
   * One parsed web-log record.
   *
   * @param ip     first whitespace-separated field of the log line
   * @param ts     request time, as produced by the timestamp parser
   * @param userId numeric user identifier taken from the log line
   * @param path   requested path taken from the log line
   */
  case class LogLine(
    ip: String,
    ts: Timestamp,
    userId: Int,
    path: String
  )
  12. object StartWithRDDs {
  /**
   * Parses the timestamp field of a log line.
   *
   * The expected shape is `[dd/MMM/yyyy:HH:mm:ss`. The leading '[' is part of
   * the pattern because the log line is split on whitespace, which leaves the
   * bracket attached to the field. The time is interpreted in the JVM's
   * default time zone.
   *
   * @param s raw timestamp field
   * @return `Some(timestamp)` when `s` parses; `None` when `s` is null, empty
   *         or does not match the pattern
   */
  def getTimestamp(s: String) : Option[Timestamp] =
    if (s == null || s.isEmpty) None
    else {
      // Pin the locale: with the JVM default locale, `MMM` would be matched
      // against localized month names, so "Mar" fails to parse on e.g. a French
      // JVM and every record silently loses its timestamp.
      // NOTE(review): assumes the logs use English month abbreviations - confirm
      // against the data.
      // A new formatter is built per call because SimpleDateFormat is not
      // thread-safe and this method runs inside Spark tasks.
      val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss", java.util.Locale.ENGLISH)
      Try(new Timestamp(format.parse(s).getTime)).toOption
    }
  26.  
  /**
   * Turns one raw log line into a [[LogLine]].
   *
   * The line is split on runs of whitespace; field 0 is the IP, field 2 the
   * user id, field 3 the timestamp and field 6 the path. A timestamp that
   * cannot be parsed is replaced by the epoch (`new Timestamp(0L)`).
   *
   * Throws if the line has too few fields or the user id is not an integer.
   *
   * @param line raw log line
   * @return the parsed record
   */
  def parseLogLine(line: String) : LogLine = {
    val fields = line.split("\\s+")
    val timestamp: Timestamp = getTimestamp(fields(3)) match {
      case Some(parsed) => parsed
      case None         => new Timestamp(0L)
    }
    LogLine(ip = fields(0), ts = timestamp, userId = fields(2).toInt, path = fields(6))
  }
  32.  
  /**
   * Returns the first three dot-separated components of `ip`, joined by dots
   * (e.g. "192.168.1.77" becomes "192.168.1").
   *
   * Throws if `ip` has fewer than three components.
   */
  def ipClass(ip: String) = {
    val octets = ip.split("\\.")
    Seq(octets(0), octets(1), octets(2)).mkString(".")
  }
  37.  
  /**
   * Builds a SparkContext with master `local[*]` and application name
   * "PlayDataFrames".
   */
  def createSparkContext() = {
    // add .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") below.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("PlayDataFrames")
    new SparkContext(conf)
  }
  42.  
  /**
   * Entry point: loads one day of web logs, exposes them to SQL as the
   * temporary table "logs", runs one of the sessionization queries below and
   * writes the result as CSV.
   *
   * The SparkContext is stopped in a `finally` block so it is released even
   * when parsing, the query or the write fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc = createSparkContext()
    try {
      val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
      import sqlContext.implicits._
      val logfiles = sc.textFile("file:/Users/talfranji/course/sparkdev/data/weblogs/2014-03-15.log")
      val logs = logfiles.map(parseLogLine).toDF()
      // registerTempTable puts the DataFrame into the metadata store so
      // SQL SELECT statements can recognize it.
      logs.registerTempTable("logs")

      // The four queries build on each other; only the one passed to
      // sqlContext.sql below is executed.

      // Step 1: seconds elapsed since the same user's previous request
      // (LAG defaults to the row's own ts, so a user's first row yields 0).
      val sql1 =
        """select userId, ts, unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) as dt
          |from logs""".stripMargin

      // Step 2: flag a row with 1 when the gap to the user's previous request
      // exceeds 60*20 seconds (20 minutes), else 0.
      val sql2 =
        """select userId, ts, case when
          |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
          |then 1
          |else 0 end
          |as session_end
          |from logs""".stripMargin

      // Step 3: running sum of the step-2 flag per user, ordered by ts,
      // which numbers that user's sessions.
      val sql3 =
        """select userId, ts, sum(session_end) over (partition by userId order by ts) as sessId
          |from (select userId, ts, case when
          |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
          |then 1
          |else 0 end
          |as session_end
          |from logs) sboundary""".stripMargin

      // Step 4: number of rows per (userId, sessId) pair.
      val sql4 =
        """select userId, count(*)
          |from (select userId, ts, sum(session_end) over (partition by userId order by ts) as sessId
          |from (select userId, ts, case when
          |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
          |then 1
          |else 0 end
          |as session_end
          |from logs) sboundary) sid
          |group by userId,sessId""".stripMargin

      val out = sqlContext.sql(sql2)
      // TODO: write output to:
      //  1. json
      //out.write.format("json").save("file:/Users/talfranji/course/jpglogs_json")

      //  2. CSV
      out.write.format("com.databricks.spark.csv").option("header", "true").save("file:/Users/talfranji/course/jpglogs")
      //  3. parquet
      //out.write.format("parquet").save("file:/Users/talfranji/course/jpglogs_p")
    } finally {
      // Release the context whether the job succeeded or threw.
      sc.stop()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement