Advertisement
PeachLemonade

Untitled

Mar 17th, 2024
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.88 KB | None | 0 0
  1. import os
  2. import argparse
  3. import datetime
  4. from pyspark.sql import SparkSession
  5. from pyspark.sql import functions as F
  6. from pyspark.sql.utils import AnalysisException
  7.  
# Point the Spark/YARN client libraries at the cluster configuration.
# These must be set before the SparkSession is created in get_and_save_data.
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME']='/usr'
os.environ['SPARK_HOME'] ='/usr/lib/spark'
# NOTE(review): hard-coded paths assume a specific cluster image — confirm
# they match the deployment environment before reuse.
os.environ['PYTHONPATH'] ='/usr/local/lib/python3.8'
  13.  
  14. def get_input_paths(date, depth, input_path):
  15.     dt = datetime.datetime.strptime(date, '%Y-%m-%d')
  16.     return [
  17.         f"{input_path}/date={(dt - datetime.timedelta(days=x)).strftime('%Y-%m-%d')}/event_type=message"
  18.         for x in range(depth)
  19.     ]
  20.  
def delete_and_create_hdfs_dir(path, spark):
    """Deletes an HDFS directory if it exists, then creates it.

    Uses the JVM Hadoop FileSystem API through the SparkSession's py4j
    gateway, so it works against whatever filesystem the cluster's
    hadoopConfiguration points at.

    Args:
        path: HDFS directory path (string) to recreate empty.
        spark: An active SparkSession (its JVM gateway is used).
    """
    # Resolve the FileSystem from the session's Hadoop configuration.
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    try:
        if fs.exists(hadoop_path):
            # True = recursive delete of the directory contents.
            fs.delete(hadoop_path, True)
        fs.mkdirs(hadoop_path)
    # NOTE(review): JVM FileSystem failures typically surface as
    # py4j.protocol.Py4JJavaError, not pyspark's AnalysisException, so this
    # handler likely never fires — confirm and consider catching the py4j
    # error (or letting failures propagate) instead.
    except AnalysisException:
        print(f"Warning: Path {path} might exist as a file.")
  31.  
  32. def get_and_save_data(date, depth, suggested_count, event_input_path, snapshot_input_path, base_output_path):
  33.    
  34.     spark = SparkSession.builder \
  35.     .appName(f'VerifiedTagsCandidatesJob-{date}-d{depth}-cut{suggested_count}') \
  36.     .getOrCreate()
  37.  
  38.     delete_and_create_hdfs_dir(base_output_path, spark)
  39.    
  40.     messages = spark.read.parquet(*get_input_paths(date, depth, event_input_path))
  41.  
  42.     all_tags = messages.where("event.message_channel_to is not null")\
  43.                        .selectExpr(["event.message_from as user", "explode(event.tags) as tag"])\
  44.                        .groupBy("tag").agg(F.expr("count(distinct user) as suggested_count"))\
  45.                        .where(f"suggested_count >= {suggested_count}")
  46.  
  47.     verified_tags = spark.read.parquet(snapshot_input_path)
  48.  
  49.     candidates = all_tags.join(verified_tags, "tag", "left_anti")
  50.     candidates.write.parquet(base_output_path + '/date=' + date)
  51.  
  52. def main():
  53.  
  54.     parser = argparse.ArgumentParser(description="Verified Tags Candidates Job")
  55.     parser.add_argument("date", help="Date in YYYY-MM-DD format")
  56.     parser.add_argument("depth", type=int, help="Depth of event history")
  57.     parser.add_argument("suggested_count", type=int, help="Minimum suggested count for tags")
  58.     parser.add_argument("event_input_path", help="Input path for event data")
  59.     parser.add_argument("snapshot_input_path", help="Input path for snapshot data")
  60.     parser.add_argument("base_output_path", help="Base output path")
  61.  
  62.     args = parser.parse_args()
  63.  
  64.     get_and_save_data(
  65.         args.date,
  66.         args.depth,
  67.         args.suggested_count,
  68.         args.event_input_path,
  69.         args.snapshot_input_path,
  70.         args.base_output_path
  71.     )
  72.  
  73. if __name__ == "__main__":
  74.     main()
  75.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement