import java.text.SimpleDateFormat
import java.sql.Timestamp

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types.{DataTypes, TimestampType}
import org.apache.spark.sql.hive.HiveContext

import scala.util.{Failure, Success, Try}

case class LogLine(ip: String, ts: Timestamp, userId: Int, path: String)

object StartWithRDDs {
  def getTimestamp(s: String): Option[Timestamp] = s match {
    case "" => None
    case _ =>
      // Parse the timestamp as it appears in the log.
      // Because the line was split on whitespace, the field still starts
      // with '[', so the pattern has to include it.
      val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss")
      Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => Some(t)
        case Failure(_) => None
      }
  }
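
  // For example, getTimestamp("[15/Mar/2014:10:12:45") (a hypothetical field value)
  // returns Some(timestamp); getTimestamp("") or an unparsable field returns None.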

  def parseLogLine(line: String): LogLine = {
    val a = line.split("\\s+")
    val ts: Timestamp = getTimestamp(a(3)).getOrElse(new Timestamp(0L))
    LogLine(a(0), ts, a(2).toInt, a(6))
  }
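
  // The field positions assume an Apache-style access log line, e.g. the
  // hypothetical record
  //   192.168.1.10 - 128 [15/Mar/2014:10:12:45 -0800] "GET /images/a.jpg HTTP/1.0" 200 1024
  // where a(0) is the client IP, a(2) the user id, a(3) the timestamp field
  // and a(6) the requested path.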

  def ipClass(ip: String) = {
    val a = ip.split("\\.")
    s"${a(0)}.${a(1)}.${a(2)}"
  }
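
  // ipClass keeps only the first three octets, e.g. ipClass("10.1.2.3")
  // returns "10.1.2", grouping clients by their /24 network.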

  def createSparkContext() = {
    // To use Kryo serialization, add
    // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // to the SparkConf below.
    new SparkContext(new SparkConf().setMaster("local[*]").setAppName("PlayDataFrames"))
  }

  def main(args: Array[String]) {
    val sc = createSparkContext()
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val logfiles = sc.textFile("file:/Users/talfranji/course/sparkdev/data/weblogs/2014-03-15.log")
    val logs = logfiles.map(parseLogLine).toDF()
    // registerTempTable registers the DataFrame as a temporary table so that
    // SQL SELECT statements can refer to it by name.
    logs.registerTempTable("logs")

    // This query is the same as the CountJPGs exercise
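    // sql1: for every hit, the LAG window function fetches the user's previous
    // hit time, and dt is the gap in seconds between consecutive hits
    // (0 for a user's first hit, because LAG's default value is the current ts).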
    val sql1 =
      """select userId, ts, unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) as dt
        |from logs""".stripMargin
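
    // sql2: turn the gap into a session-boundary flag, session_end: 1 when more
    // than 20 minutes (60*20 seconds) passed since the user's previous hit, else 0.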
    val sql2 =
      """select userId, ts, case when
        |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
        |then 1
        |else 0 end
        |as session_end
        |from logs""".stripMargin
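
    // sql3: a running sum of the boundary flags per user (ordered by ts)
    // assigns every hit a session number, sessId.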
    val sql3 =
      """select userId, ts, sum(session_end) over (partition by userId order by ts) as sessId
        |from (select userId, ts, case when
        |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
        |then 1
        |else 0 end
        |as session_end
        |from logs) sboundary""".stripMargin
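
    // sql4: group by (userId, sessId) to count how many hits each session contains.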
    val sql4 =
      """select userId, count(*)
        |from (select userId, ts, sum(session_end) over (partition by userId order by ts) as sessId
        |from (select userId, ts, case when
        |unix_timestamp(ts) - unix_timestamp(LAG(ts,1, ts) over (partition by userId order by ts)) > 60*20
        |then 1
        |else 0 end
        |as session_end
        |from logs) sboundary) sid
        |group by userId,sessId""".stripMargin
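
    // Any of the four queries can be run through sqlContext.sql(...); sql2 is
    // used below. As a quick, hypothetical check, sqlContext.sql(sql4).show()
    // would print the per-session hit counts to the console.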
    val out = sqlContext.sql(sql2)
    // TODO: write the output to:
    // 1. JSON
    //out.write.format("json").save("file:/Users/talfranji/course/jpglogs_json")
    // 2. CSV
    out.write.format("com.databricks.spark.csv").option("header", "true").save("file:/Users/talfranji/course/jpglogs")
    // 3. Parquet
    //out.write.format("parquet").save("file:/Users/talfranji/course/jpglogs_p")
  }
}