Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import sys
from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
def input_paths(date, depth):
    """Build the list of HDFS partition paths for message events over a date window.

    Args:
        date: anchor date as 'YYYY-MM-DD'; the window ends on this day.
        depth: number of days to look back, including ``date`` itself.
            Accepts an int or a numeric string (CLI arguments arrive as
            strings from ``sys.argv``).

    Returns:
        List of paths, newest first, each shaped like
        '/user/kvonleonid/data/events/date=YYYY-MM-DD/event_type=message'.
    """
    base_path = '/user/kvonleonid/data/events/date='
    event_type = '/event_type=message'
    anchor = datetime.strptime(date, '%Y-%m-%d')
    # int() guards against the raw string that sys.argv delivers in main().
    return [
        base_path + (anchor - timedelta(days=offset)).strftime('%Y-%m-%d') + event_type
        for offset in range(int(depth))
    ]
def main():
    """Entry point: emit tags popular enough to be verification candidates.

    Reads message events for a lookback window, counts distinct authors per
    tag, drops tags that are already verified, and writes the survivors to
    parquet under ``<base_output_path>/date=<date>``.

    CLI arguments (positional):
        1: date               -- anchor date, 'YYYY-MM-DD'
        2: depth              -- days to look back, including the anchor date
        3: threshold          -- minimum distinct authors suggesting a tag
        4: verified_tags_path -- parquet with the already-verified tags
        5: base_output_path   -- output root directory
    """
    date = sys.argv[1]
    # Bug fix: sys.argv values are strings; depth feeds range() inside
    # input_paths() and threshold is compared numerically, so cast both.
    depth = int(sys.argv[2])
    threshold = int(sys.argv[3])
    verified_tags_path = sys.argv[4]
    base_output_path = sys.argv[5]

    conf = SparkConf().setAppName(f"VerifiedTagsCandidatesJob-{date}-d{depth}-cut{threshold}")
    sc = SparkContext(conf=conf)
    sql = SQLContext(sc)

    tags_verified = sql.read.parquet(verified_tags_path)
    events = sql.read.parquet(*input_paths(date, depth))

    # One row per (author, tag) for messages addressed to a channel.
    tags = (
        events
        .where("event.message_channel_to is not null")
        .select(
            F.col('event.message_from').alias("message_from"),
            F.explode('event.tags').alias('tag'),
        )
    )
    # Number of distinct authors who used each tag.
    tags_count = tags.groupBy(F.col('tag')).agg(
        F.countDistinct('message_from').alias('suggested_count')
    )
    # Keep sufficiently popular tags that are not already verified.
    df = (
        tags_count
        .join(tags_verified, ['tag'], 'left_anti')
        .filter(F.col('suggested_count') >= threshold)
    )
    df.write.mode('overwrite').format('parquet').save(f'{base_output_path}/date={date}')


if __name__ == "__main__":
    main()
# Example submit:
# /usr/lib/spark/bin/spark-submit --master yarn --deploy-mode cluster /lessons/verified_tags_candidates.py 2022-05-31 5 300 /user/master/data/snapshots/tags_verified/actual /user/kvonleonid/5.2.4/analytics/verified_tags_candidates_d5
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement