Advertisement
hyor1

Untitled

Aug 16th, 2023 (edited)
21
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.81 KB | None | 0 0
  1. import sys
  2. from datetime import datetime, timedelta
  3.  
  4. from pyspark import SparkContext, SparkConf
  5. from pyspark.sql import SQLContext
  6. import pyspark.sql.functions as F
  7.  
  8.  
  9. def input_paths(date, depth):
  10. base_path = '/user/kvonleonid/data/events/date='
  11. event_type = '/event_type=message'
  12. date_list = [datetime.strptime(date, '%Y-%m-%d') - timedelta(days=x) for x in range(depth)]
  13. date_str = [x.strftime('%Y-%m-%d') for x in date_list]
  14. dataframe_paths = [base_path + x + event_type for x in date_str]
  15. return dataframe_paths
  16.  
  17. def main():
  18. date = sys.argv[1]
  19. depth = sys.argv[2]
  20. threshold = sys.argv[3]
  21. verified_tags_path = sys.argv[4]
  22. base_output_path = sys.argv[5]
  23.  
  24. conf = SparkConf().setAppName(f"VerifiedTagsCandidatesJob-{date}-d{depth}-cut{threshold}")
  25. sc = SparkContext(conf=conf)
  26. sql = SQLContext(sc)
  27.  
  28. tags_verified = sql.read.parquet(verified_tags_path)
  29. path_list = input_paths(date, depth)
  30. events = sql.read.parquet(*path_list)
  31. tags = events.where("event.message_channel_to is not null").select(F.col('event.message_from').alias("message_from"), F.explode('event.tags').alias('tag'))
  32. tags_count = tags.groupBy(F.col('tag')).agg(F.countDistinct('message_from').alias('suggested_count'))
  33. df = tags_count.join(tags_verified, ['tag'], 'left_anti').filter(F.col('suggested_count') >= threshold)
  34. df.write.mode('overwrite').format('parquet').save(f'{base_output_path}/date={date}')
  35.  
  36. if __name__ == "__main__":
  37. main()
  38.  
# Example invocation (arguments: date, depth, threshold, verified tags path, output path):
# /usr/lib/spark/bin/spark-submit --master yarn --deploy-mode cluster /lessons/verified_tags_candidates.py 2022-05-31 5 300 /user/master/data/snapshots/tags_verified/actual /user/kvonleonid/5.2.4/analytics/verified_tags_candidates_d5
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement