Advertisement
AliaksandrLet

reaction_tag_tops

Sep 10th, 2023 (edited)
264
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.66 KB | None | 0 0
  def reaction_tag_tops(date, depth, spark):
      """Build each user's three most frequent liked and disliked tags.

      Loads the parquet event paths returned by
      ``input_event_paths(date, depth)``, keeps ``reaction`` events whose
      ``event.message_channel_to`` is not null, explodes ``event.tags`` into
      one row per tag, and ranks the tags per user separately for ``like``
      and ``dislike`` reactions.

      Args:
          date: Forwarded unchanged to ``input_event_paths``.
          depth: Forwarded unchanged to ``input_event_paths``.
          spark: Session-like object whose ``read`` attribute loads the
              parquet data (presumably a ``SparkSession`` -- confirm with
              the caller).

      Returns:
          A DataFrame keyed by ``user_id`` (taken from
          ``event.message_from``) with the columns ``like_tag_top_1..3``
          and ``dislike_tag_top_1..3``, produced by a full outer join of
          the like and dislike tops.
      """
      event_paths = input_event_paths(date, depth)

      # Only reactions that carry a channel are considered.
      is_channel_reaction = (F.col("event_type") == 'reaction') & (
          F.col("event.message_channel_to").isNotNull()
      )

      raw_events = (
          spark.read
          .option("basePath", "/user/enfinity/data/events")
          .parquet(*event_paths)
      )
      reactions = raw_events.where(is_channel_reaction).select(
          F.col("event.message_id").alias("message_id"),
          F.col("event.message_from").alias("user_id"),
          F.explode(F.col("event.tags")).alias("tag"),
          F.col("event.reaction_type").alias("reaction_type"),
      )

      def top_three(kind_df, prefix):
          """Pivot the three highest-ranked tags per user into wide columns."""
          # Ties on the count are broken by tag name, descending, so the
          # ranking is deterministic.
          per_user_ranking = Window.partitionBy("user_id").orderBy(
              F.desc("tag_count"), F.desc("tag")
          )
          tag_counts = kind_df.groupBy("user_id", "tag").agg(
              F.count("*").alias("tag_count")
          )
          best = tag_counts.withColumn(
              "rank", F.row_number().over(per_user_ranking)
          ).where("rank <= 3")

          # Listing the pivot values explicitly keeps all three rank
          # columns present even when some rank never occurs in the data.
          wide = best.groupBy("user_id").pivot("rank", [1, 2, 3]).agg(
              F.first("tag")
          )
          for position in ("1", "2", "3"):
              wide = wide.withColumnRenamed(
                  position, prefix + "_tag_top_" + position
              )
          return wide

      # One wide frame per reaction kind, built with the same ranking logic.
      tops = {
          kind: top_three(
              reactions.where(F.col("reaction_type") == kind), kind
          )
          for kind in ("like", "dislike")
      }

      # Outer join keeps users that have only likes or only dislikes.
      return tops["like"].join(tops["dislike"], on='user_id', how='outer')
Tags: Spark
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement