Mad_Axell

Untitled

Feb 28th, 2023
import sys
import os

from datetime import datetime as dt
import datetime

import findspark
findspark.init()
findspark.find()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Cluster environment for running PySpark on YARN via findspark.
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME'] = '/usr'
os.environ['SPARK_HOME'] = '/usr/lib/spark'
os.environ['PYTHONPATH'] = '/usr/local/lib/python3.8'

def input_paths(date, depth, base_input_path, postfix):
    """Build partition paths for `depth` days counting back from `date`."""
    max_date = dt.strptime(date, "%Y-%m-%d")

    paths = []
    for i in range(int(depth)):
        # Format only the date itself, then assemble the path; running strftime
        # over the whole concatenated string would break if the path contained '%'.
        day = (max_date - datetime.timedelta(days=i)).strftime("%Y-%m-%d")
        paths.append(f"{base_input_path}/date={day}{postfix}")
    return paths


def tag_tops(date, depth, spark_session, events_base_path):
    """Top-3 message tags per user over the last `depth` days."""
    postfix = "/event_type=message"
    paths = input_paths(date, depth, events_base_path, postfix)

    result = (
        spark_session.read.parquet(*paths)
        .where("event.message_channel_to is not null")
        .select(
            F.col("event.message_id").alias("message_id"),
            F.col("event.message_from").alias("user_id"),
            F.explode(F.col("event.tags")).alias("tag"),
        )
        # Count tag usage per user and rank tags within each user.
        .groupBy("user_id", "tag")
        .agg(F.count("*").alias("tag_count"))
        .withColumn(
            "rank",
            F.row_number().over(
                Window.partitionBy("user_id").orderBy(F.desc("tag_count"), F.desc("tag"))
            ),
        )
        .where("rank <= 3")
        # Pivot the three top-ranked tags into separate columns.
        .groupBy("user_id")
        .pivot("rank", [1, 2, 3])
        .agg(F.first("tag"))
        .withColumnRenamed("1", "tag_top_1")
        .withColumnRenamed("2", "tag_top_2")
        .withColumnRenamed("3", "tag_top_3")
    )

    return result

def input_event_paths(date, depth):
    # Note: the events base path is hard-coded here, unlike input_paths().
    max_date = datetime.datetime.strptime(date, '%Y-%m-%d')

    return [
        f"/user/madaxell/data/events/date={(max_date - datetime.timedelta(days=x)).strftime('%Y-%m-%d')}"
        for x in range(depth)
    ]


def reaction_tag_tops(date, depth, spark, events_base_path):
    """Top-3 tags per user among the messages the user liked or disliked."""
    reaction_paths = input_event_paths(date, depth)

    reactions = (
        spark.read
        .option("basePath", events_base_path)
        .parquet(*reaction_paths)
        .where("event_type = 'reaction'")
    )

    # All tagged channel messages, read from the full events dataset.
    all_message_tags = (
        spark.read.parquet("/user/madaxell/data/events/events.parquet")
        .where("event_type = 'message' and event.message_channel_to is not null")
        .select(
            F.col("event.message_id").alias("message_id"),
            F.col("event.message_from").alias("user_id"),
            F.explode(F.col("event.tags")).alias("tag"),
        )
    )

    # Attach message tags to each reaction via the message id.
    reaction_tags = (
        reactions.select(
            F.col("event.reaction_from").alias("user_id"),
            F.col("event.message_id").alias("message_id"),
            F.col("event.reaction_type").alias("reaction_type"),
        )
        .join(all_message_tags.select("message_id", "tag"), "message_id")
    )

    # Rank tags per user and reaction type, keep the top 3, and pivot them
    # into columns named "1", "2", "3".
    reaction_tops = (
        reaction_tags
        .groupBy("user_id", "tag", "reaction_type")
        .agg(F.count("*").alias("tag_count"))
        .withColumn(
            "rank",
            F.row_number().over(
                Window.partitionBy("user_id", "reaction_type")
                .orderBy(F.desc("tag_count"), F.desc("tag"))
            ),
        )
        .where("rank <= 3")
        .groupBy("user_id", "reaction_type")
        .pivot("rank", [1, 2, 3])
        .agg(F.first("tag"))
        .cache()
    )

    like_tops = (
        reaction_tops
        .where("reaction_type = 'like'")
        .drop("reaction_type")
        .withColumnRenamed("1", "like_tag_top_1")
        .withColumnRenamed("2", "like_tag_top_2")
        .withColumnRenamed("3", "like_tag_top_3")
    )

    dislike_tops = (
        reaction_tops
        .where("reaction_type = 'dislike'")
        .drop("reaction_type")
        .withColumnRenamed("1", "dislike_tag_top_1")
        .withColumnRenamed("2", "dislike_tag_top_2")
        .withColumnRenamed("3", "dislike_tag_top_3")
    )

    result = like_tops.join(dislike_tops, "user_id", "full_outer")

    return result


def calculate_user_interests(date, days_count, spark, events_base_path):
    # Combine message-tag tops with like/dislike tag tops per user.
    tags = tag_tops(date, days_count, spark, events_base_path)
    reactions = reaction_tag_tops(date, days_count, spark, events_base_path)
    result = tags.join(reactions, "user_id", "full_outer")

    return result


def main():
    date = sys.argv[1]
    days_count = int(sys.argv[2])
    events_base_path = sys.argv[3]
    output_base_path = sys.argv[4]

    conf = SparkConf().setAppName(f"UserInterestsJob-{date}-d{days_count}")
    sc = SparkContext(conf=conf)
    sql = SQLContext(sc)

    calculate_user_interests(date, days_count, sql, events_base_path).write.mode("overwrite").parquet(
        f"{output_base_path}/user_interests_d{days_count}/date={date}"
    )


if __name__ == "__main__":
    main()
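
# Example invocation (a sketch only: the script name, date value, and output
# base path below are illustrative assumptions, not part of the original paste;
# only /user/madaxell/data/events appears in the code above):
#
#   spark-submit user_interests_job.py 2023-02-28 7 \
#       /user/madaxell/data/events /user/madaxell/data/analytics
#
# Positional arguments: processing date (YYYY-MM-DD), window depth in days,
# events base path, output base path.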