Advertisement
AntonHuretskyi

Task1UpdateTag_final

Jan 12th, 2023
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.09 KB | None | 0 0
  1. import findspark
  2. findspark.init()
  3. findspark.find()
  4. import os
  5. os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
  6. os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
  7.  
  8. import pyspark
  9. from pyspark.sql import SparkSession
  10. from pyspark.context import SparkContext
  11. from datetime import datetime, timedelta
  12.  
  13. # импортируем оконную функцию и модуль Spark Functions
  14. import pyspark.sql.functions as F
  15.  
  16. spark = SparkSession \
  17. .builder \
  18. .master("yarn") \
  19. .config("spark.driver.cores", "4") \
  20. .config("spark.driver.memory", "4g") \
  21. .appName("Task1UpdateTag") \
  22. .getOrCreate()
  23.  
  24. def input_paths(date, depth):
  25. dt = datetime.strptime(date, '%Y-%m-%d')
  26. return [f"hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/events/date={(dt-timedelta(days=x)).strftime('%Y-%m-%d')}/event_type=message" for x in range(depth)]
  27.  
  28. paths = list(input_paths('2022-05-31', 7))
  29.  
  30. for pth in paths:
  31. messages = spark.read.parquet(pth)
  32.  
  33. # результирующий датафрейм должен иметь две колонки: tag — строка с тегом и suggested_count — количество уникальных пользователей.
  34. all_tags = messages.where("event.message_channel_to is not null") \
  35. .selectExpr(["event.message_from as user", "explode(event.tags) as tag"]) \
  36. .groupBy("tag").agg(F.expr("count(distinct user) as suggested_count")).where("suggested_count >= 100")
  37.  
  38.  
  39. verified_tags = spark.read.parquet("hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/master/data/snapshots/tags_verified/actual")
  40.  
  41. # не входят в общедоступные (датасет tags_verified)
  42. candidates = all_tags.join(verified_tags, "tag", "left_anti")
  43.  
  44. # записываем результат в формате Parquet по пути
  45. candidates.write.mode("overwrite").parquet("hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/analytics/candidates_d7_pyspark")
Tags: Hadoop
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement