Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def reaction_tag_tops(date, depth, spark):
    """Build per-user top-3 tags for 'like' and 'dislike' reactions.

    Event parquet files are located via ``input_event_paths(date, depth)``
    and read with the given Spark session. Only rows whose ``event_type``
    is ``'reaction'`` and whose ``event.message_channel_to`` is not null
    are kept; ``event.tags`` is exploded so each tag becomes its own row.

    For each reaction type the tags are counted per user, ranked by count
    (descending) with ties broken by tag (descending), and the three best
    are spread into columns.

    Args:
        date: Forwarded unchanged to ``input_event_paths``.
        depth: Forwarded unchanged to ``input_event_paths``.
        spark: Spark session used to read the parquet input.

    Returns:
        A DataFrame with ``user_id``, ``like_tag_top_1..3`` and
        ``dislike_tag_top_1..3``. The two halves are outer-joined, so a
        user present on only one side gets nulls for the other side's
        columns.
    """
    event_paths = input_event_paths(date, depth)

    is_channel_reaction = (
        (F.col("event_type") == 'reaction')
        & (F.col("event.message_channel_to").isNotNull())
    )
    events = (
        spark.read
        .option("basePath", "/user/enfinity/data/events")
        .parquet(*event_paths)
    )
    # NOTE(review): user_id is taken from event.message_from -- confirm this
    # field identifies the reacting user for reaction events.
    reactions = events.where(is_channel_reaction).select(
        F.col("event.message_id").alias("message_id"),
        F.col("event.message_from").alias("user_id"),
        F.explode(F.col("event.tags")).alias("tag"),
        F.col("event.reaction_type").alias("reaction_type"),
    )

    # Ranking inside one user: most frequent tag first; the secondary sort
    # on the tag itself keeps row_number() deterministic when counts tie.
    per_user_ranking = Window.partitionBy("user_id").orderBy(
        F.desc("tag_count"), F.desc("tag")
    )

    def _top_three(tagged, prefix):
        """Return one row per user with columns ``<prefix>_tag_top_1..3``."""
        counted = tagged.groupBy("user_id", "tag").agg(
            F.count("*").alias("tag_count")
        )
        best = (
            counted
            .withColumn("rank", F.row_number().over(per_user_ranking))
            .where("rank <= 3")
        )
        # Explicit pivot values guarantee all three columns exist even if
        # no user reaches rank 2 or 3.
        wide = best.groupBy("user_id").pivot("rank", [1, 2, 3]).agg(F.first("tag"))
        for position in ("1", "2", "3"):
            wide = wide.withColumnRenamed(position, prefix + "_tag_top_" + position)
        return wide

    liked = _top_three(reactions.where(F.col("reaction_type") == 'like'), "like")
    disliked = _top_three(
        reactions.where(F.col("reaction_type") == 'dislike'), "dislike"
    )

    # Outer join keeps users that have only likes or only dislikes.
    return liked.join(disliked, on='user_id', how='outer')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement