import findspark
findspark.init()
findspark.find()

import os
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'

import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from datetime import datetime, timedelta
# import the Spark SQL functions module
import pyspark.sql.functions as F
spark = SparkSession \
    .builder \
    .master("yarn") \
    .config("spark.driver.cores", "4") \
    .config("spark.driver.memory", "4g") \
    .appName("Task1UpdateTag") \
    .getOrCreate()
def input_paths(date, depth):
    # Build one HDFS partition path per day: `date` and the depth - 1 days before it.
    dt = datetime.strptime(date, '%Y-%m-%d')
    return [
        f"hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/events/"
        f"date={(dt - timedelta(days=x)).strftime('%Y-%m-%d')}/event_type=message"
        for x in range(depth)
    ]
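# Quick sanity check (illustration only): input_paths('2022-05-31', 2) yields
#   ['hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/events/date=2022-05-31/event_type=message',
#    'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/events/date=2022-05-30/event_type=message']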
paths = input_paths('2022-05-31', 7)
# Read all seven daily partitions into a single DataFrame
# (spark.read.parquet accepts any number of paths).
messages = spark.read.parquet(*paths)
# The resulting DataFrame must have two columns: tag (the tag string)
# and suggested_count (the number of distinct users).
all_tags = messages.where("event.message_channel_to is not null") \
    .selectExpr("event.message_from as user", "explode(event.tags) as tag") \
    .groupBy("tag") \
    .agg(F.expr("count(distinct user) as suggested_count")) \
    .where("suggested_count >= 100")
verified_tags = spark.read.json("hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/master/data/snapshots/tags_verified/actual")
# Keep only tags that are not among the public ones (the tags_verified dataset).
candidates = all_tags.join(verified_tags, "tag", "left_anti")
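# left_anti join semantics, for reference (toy illustration, not project data):
# if all_tags contains tags {a, b, c} and verified_tags contains {b}, the result
# keeps {a, c}, i.e. rows from the left side with no match on the right, with
# only the left side's columns.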
# Write the result in Parquet format to the target path.
candidates.write.parquet('hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/analytics/candidates_d7_pyspark')
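# Optional follow-up (an assumption, not part of the original task): read the
# result back to verify the write, then release the YARN resources.
result = spark.read.parquet('hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net/user/ahuretskyi/data/analytics/candidates_d7_pyspark')
result.orderBy(F.desc("suggested_count")).show(10, truncate=False)
spark.stop()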