import os
import sys
from datetime import datetime, timedelta

# Hadoop/YARN and Spark environment settings must be in place before
# findspark locates the Spark installation and pyspark is imported.
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME'] = '/usr'
os.environ['SPARK_HOME'] = '/usr/lib/spark'
os.environ['PYTHONPATH'] = '/usr/local/lib/python3.8'

import findspark

findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
def input_paths(date, depth, base_input_path, postfix):
    """Return partition paths for `depth` days ending on `date` (inclusive)."""
    max_date = datetime.strptime(date, "%Y-%m-%d")
    return [
        f"{base_input_path}/date={(max_date - timedelta(days=i)).strftime('%Y-%m-%d')}{postfix}"
        for i in range(int(depth))
    ]
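# For illustration only: with the hypothetical arguments below, input_paths
# produces one partition path per day, counting back from the given date.
#
#   input_paths("2022-05-31", 3, "/user/madaxell/data/events", "/event_type=message")
#   # ['/user/madaxell/data/events/date=2022-05-31/event_type=message',
#   #  '/user/madaxell/data/events/date=2022-05-30/event_type=message',
#   #  '/user/madaxell/data/events/date=2022-05-29/event_type=message']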
def tag_tops(date, depth, spark_session, events_base_path):
    """Top-3 tags per user, counted over channel message events for the last `depth` days."""
    postfix = "/event_type=message"
    path_list = input_paths(date, depth, events_base_path, postfix)
    result = spark_session.read.parquet(*path_list) \
        .where("event.message_channel_to is not null") \
        .select(F.col("event.message_from").alias("user_id"),
                F.explode(F.col("event.tags")).alias("tag")) \
        .groupBy("user_id", "tag") \
        .agg(F.count("*").alias("tag_count")) \
        .withColumn("rank", F.row_number().over(
            Window.partitionBy("user_id").orderBy(F.desc("tag_count"), F.desc("tag")))) \
        .where("rank <= 3") \
        .groupBy("user_id") \
        .pivot("rank", [1, 2, 3]) \
        .agg(F.first("tag")) \
        .withColumnRenamed("1", "tag_top_1") \
        .withColumnRenamed("2", "tag_top_2") \
        .withColumnRenamed("3", "tag_top_3")
    return result
def input_event_paths(date, depth):
    """Return raw event partition paths for `depth` days ending on `date`."""
    max_date = datetime.strptime(date, '%Y-%m-%d')
    return [
        f"/user/madaxell/data/events/date={(max_date - timedelta(days=x)).strftime('%Y-%m-%d')}"
        for x in range(depth)
    ]
def reaction_tag_tops(date, depth, spark, events_base_path):
    """Top-3 tags per user among the messages they liked and disliked."""
    # Reaction events for the requested date range.
    reaction_paths = input_event_paths(date, depth)
    reactions = spark.read \
        .option("basePath", events_base_path) \
        .parquet(*reaction_paths) \
        .where("event_type = 'reaction'")

    # Tags of every channel message, keyed by message_id.
    all_message_tags = spark.read.parquet("/user/madaxell/data/events/events.parquet") \
        .where("event_type = 'message' and event.message_channel_to is not null") \
        .select(F.col("event.message_id").alias("message_id"),
                F.col("event.message_from").alias("user_id"),
                F.explode(F.col("event.tags")).alias("tag"))

    # Attach the tags of the reacted-to message to each reaction.
    reaction_tags = reactions \
        .select(F.col("event.reaction_from").alias("user_id"),
                F.col("event.message_id").alias("message_id"),
                F.col("event.reaction_type").alias("reaction_type")) \
        .join(all_message_tags.select("message_id", "tag"), "message_id")

    # Rank tags per user and reaction type, keep the top 3, and pivot them into columns.
    reaction_tops = reaction_tags \
        .groupBy("user_id", "tag", "reaction_type") \
        .agg(F.count("*").alias("tag_count")) \
        .withColumn("rank", F.row_number().over(
            Window.partitionBy("user_id", "reaction_type").orderBy(F.desc("tag_count"), F.desc("tag")))) \
        .where("rank <= 3") \
        .groupBy("user_id", "reaction_type") \
        .pivot("rank", [1, 2, 3]) \
        .agg(F.first("tag")) \
        .cache()

    like_tops = reaction_tops \
        .where("reaction_type = 'like'") \
        .drop("reaction_type") \
        .withColumnRenamed("1", "like_tag_top_1") \
        .withColumnRenamed("2", "like_tag_top_2") \
        .withColumnRenamed("3", "like_tag_top_3")

    dislike_tops = reaction_tops \
        .where("reaction_type = 'dislike'") \
        .drop("reaction_type") \
        .withColumnRenamed("1", "dislike_tag_top_1") \
        .withColumnRenamed("2", "dislike_tag_top_2") \
        .withColumnRenamed("3", "dislike_tag_top_3")

    return like_tops.join(dislike_tops, "user_id", "full_outer")
def calculate_user_interests(date, days_count, spark, events_base_path):
    """Join message tag tops with like/dislike reaction tag tops per user."""
    tags = tag_tops(date, days_count, spark, events_base_path)
    reactions = reaction_tag_tops(date, days_count, spark, events_base_path)
    return tags.join(reactions, "user_id", "full_outer")
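# The resulting DataFrame carries one row per user_id with the columns
# tag_top_1..3, like_tag_top_1..3 and dislike_tag_top_1..3; a user present in
# only one of the two inputs gets nulls for the other side of the full outer join.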
def main():
    # Expected arguments: run date, window depth in days, events base path, output base path.
    date = sys.argv[1]
    days_count = int(sys.argv[2])
    events_base_path = sys.argv[3]
    output_base_path = sys.argv[4]

    conf = SparkConf().setAppName(f"UserInterestsJob-{date}-d{days_count}")
    sc = SparkContext(conf=conf)
    sql = SQLContext(sc)

    calculate_user_interests(date, days_count, sql, events_base_path) \
        .write.mode("overwrite") \
        .parquet(f"{output_base_path}/user_interests_d{days_count}/date={date}")


if __name__ == "__main__":
    main()
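# A minimal launch sketch, for illustration only. The script name, the run date,
# the 7-day depth and the output base path below are assumptions (the input base
# path is the one already used above); adjust them to the actual environment.
#
#   spark-submit --master yarn user_interests_job.py \
#       2022-05-31 7 /user/madaxell/data/events /user/madaxell/analytics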