Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// events.json holds one JSON object per line, e.g.:
//{"utcdate": "2014-11-10 22:23:23", "deviceid": "33028801", "lat": 40.055023, "lon": -74.996469}
//{"utcdate": "2014-11-10 22:23:23", "deviceid": "38241762", "lat": 40.055012, "lon": -74.996447}
//{"utcdate": "2014-11-10 22:23:23", "deviceid": "32740723", "lat": 40.177671, "lon": -74.911994}
val devpos1 = sqlContext.read
  .json("file:///Users/talfranji/Dropbox/InterBit/Hadoop/ATT2014/events.json")
devpos1.registerTempTable("devpos1")

// devpos: the same rows, with the utcdate string converted by from_utc_timestamp into column `t`.
val devpos = sqlContext.sql(
  "select deviceid, lat, lon, from_utc_timestamp(utcdate, 'UTC') as t from devpos1")
devpos.registerTempTable("devpos")

// Number of distinct devices in the table.
val s = "select count(distinct deviceid) from devpos"
sqlContext.sql(s).collect()
// plans.txt maps each device to its plan, one tab-separated pair per line, e.g.:
//33028801	business1
//38241762	kid
//32740723	business1
// Create an RDD of Plan objects and register it as a table.
case class Plan(deviceid: String, plan: String)
// Lines with fewer than two tab-separated fields (e.g. a blank trailing line) are
// skipped; indexing p(1) on them would throw ArrayIndexOutOfBoundsException and fail
// the job. Fields are trimmed so stray surrounding whitespace cannot break the join
// on deviceid.
val plans = sc.textFile("file:///Users/talfranji/Dropbox/InterBit/Hadoop/ATT2014/plans.txt")
  .map(_.split("\t"))
  .collect { case Array(deviceid, plan, _*) => Plan(deviceid.trim, plan.trim) }
  .toDF()
plans.registerTempTable("plans")
// Active devices per plan. The inner join drops devices that have no plan entry
// (and plans with no events).
val s =
  """select
    |count(distinct devpos.deviceid), plans.plan
    |from plans
    |join devpos on (devpos.deviceid = plans.deviceid)
    |group by plans.plan""".stripMargin
sqlContext.sql(s).collect.foreach(row => println(row))

// Time window: distinct devices with t in [2014-11-10, 2014-11-11).
val s = "select count(distinct deviceid) from devpos where t >= '2014-11-10' and t <'2014-11-11'"
sqlContext.sql(s).collect.foreach(row => println(row))
// Quantization: snap positions to a grid of roughly 100 m cells.
// 1 degree of latitude ~= 110 km, so round(lat * 110 * 10) steps every ~100 m.
// 1 degree of longitude shrinks with latitude (~94 km at lat 32); this query
// approximates it as 100 km, so round(lon * 100 * 10) steps every ~100 m there.
// Only devices whose id ends in 3 are sampled; each cell reports max(deviceid)
// and the mean position of the points in it.
val s =
  """select
    |max(deviceid), avg(lat), avg(lon) from devpos
    |where (cast(deviceid as int) % 10) = 3 group by
    |round(lat *110 * 10), round(lon*100 *10)""".stripMargin
// For ~10 m cells, multiply by 100 instead of 10 in both round() calls.
sqlContext.sql(s).take(10).foreach(row => println(row))
//##### Areas/Places where a user spent more than an hour.
//# NOT good: grouping only by grid cell (not by device) merges different devices
//# and separate visits to the same cell, so min(t)..max(t) is not a stay duration:
//select min(t), max(t), max(deviceid), avg(lat), avg(lon)
//  from devpos
//  where (deviceid % 10) = 3
//  group by round(lat *1000), round(lon*1000);
//# Window (analytic) functions such as lag() handle this well - a very powerful tool.
//# The queries below build it up step by step.

//#inner query - just quantize to ~100 m cells. lat and lon must use the same
//# resolution (x10); mixing x10 and x100 produces 10 m x 100 m strips instead of cells.
val s1 = "select t, deviceid, lat, lon, round(lat *110 * 10) as qlat, round(lon*100 * 10) as qlon from devpos"

//##inner 2 - add the device's previous quantized location to each record.
//# s1 is reused as the subquery so both steps share one quantization.
val s2 =
  s"""select t, deviceid, qlat, qlon,
     |lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
     |lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
     |from
     |($s1) qdevpos
     |order by deviceid,t""".stripMargin
//###inner 3 - keep only rows where the device moved into a different grid cell.
//# A change on EITHER axis counts: requiring both ("and") would miss a device moving
//# due north/south (qlon unchanged) or due east/west (qlat unchanged).
//# lag(...,1,0) yields prev = 0 on each device's first row, so its first sighting is
//# reported (quantized coordinates of the sample data are far from 0).
val s3 =
  """select
    |t, deviceid, qlat,qlon, prev_qlat, prev_qlon
    |from (
    |select t, deviceid, qlat, qlon,
    |lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
    |lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
    |from
    |(select t, deviceid, lat, lon, round(lat *110 *10) as qlat, round(lon*100 *10) as qlon from devpos) qdevpos
    |) poschg
    |where qlat <> prev_qlat or qlon <> prev_qlon
    |order by deviceid, t""".stripMargin
sqlContext.sql(s3).take(10).foreach(println)
// User defined functions
// Given a latitude in degrees, return the approximate distance in km spanned by one
// degree of longitude there: 111.321 km at the equator, shrinking with cos(latitude).
// math.cos works in radians, so the latitude is converted with toRadians first;
// cos is even, so southern (negative) latitudes need no abs().
val LonDistAtLat: Double => Double = (lat: Double) =>
  math.cos(math.toRadians(lat)) * 111.321
sqlContext.udf.register("LonDistAtLat", LonDistAtLat)
// Cell-change query where the longitude step uses LonDistAtLat(lat) (km per degree of
// longitude at that row's latitude) instead of a fixed 100, so the ~100 m cell width
// follows the latitude rather than assuming ~100 km per degree everywhere.
// A change on EITHER axis marks a new location; requiring both ("and") would miss
// moves along a single axis.
val s4 =
  """select
    |t, deviceid, qlat,qlon, prev_qlat, prev_qlon
    |from (
    |select t, deviceid, qlat, qlon,
    |lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
    |lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
    |from
    |(select t, deviceid, lat, lon, round(lat *110 *10) as qlat, round(lon*LonDistAtLat(lat) *10) as qlon from devpos) qdevpos
    |) poschg
    |where qlat <> prev_qlat or qlon <> prev_qlon
    |order by deviceid, t""".stripMargin
sqlContext.sql(s4).take(10).foreach(println)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement