Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import argparse
- import datetime
- from pyspark.sql import SparkSession
- from pyspark.sql import functions as F
- from pyspark.sql.utils import AnalysisException
# Export the Hadoop/YARN configuration directories and the Java, Spark and
# Python locations at import time. The values are hard-coded paths
# (presumably matching the target cluster's layout -- confirm before reuse).
os.environ.update({
    'HADOOP_CONF_DIR': '/etc/hadoop/conf',
    'YARN_CONF_DIR': '/etc/hadoop/conf',
    'JAVA_HOME': '/usr',
    'SPARK_HOME': '/usr/lib/spark',
    'PYTHONPATH': '/usr/local/lib/python3.8',
})
def get_input_paths(date, depth, input_path):
    """Build the message-event partition paths for a window of days.

    The window starts at ``date`` and walks backwards one day at a time, so
    the first path is for ``date`` itself.

    Args:
        date: Most recent day of the window, as a ``YYYY-MM-DD`` string.
        depth: Number of consecutive days to include.
        input_path: Prefix under which the
            ``date=<day>/event_type=message`` suffix is appended.

    Returns:
        A list with one path string per day, newest day first.

    Raises:
        ValueError: If ``date`` does not match ``%Y-%m-%d``.
    """
    newest_day = datetime.datetime.strptime(date, '%Y-%m-%d')
    paths = []
    for days_back in range(depth):
        day = newest_day - datetime.timedelta(days=days_back)
        paths.append(f"{input_path}/date={day:%Y-%m-%d}/event_type=message")
    return paths
def delete_and_create_hdfs_dir(path, spark):
    """Delete a directory if it exists, then create it empty.

    Args:
        path: Directory to reset, as a path string understood by Hadoop.
        spark: SparkSession whose JVM gateway and Hadoop configuration are
            used to reach the filesystem.

    Raises:
        OSError: If the filesystem reports that the delete or the mkdirs
            call did not succeed.
    """
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    # Resolve the filesystem from the path itself instead of taking the
    # configured default one, so a fully qualified URI that lives on a
    # different filesystem is handled as well.
    fs = hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())
    try:
        # Both calls return a boolean status; do not carry on silently when
        # the filesystem says the operation did not happen.
        if fs.exists(hadoop_path) and not fs.delete(hadoop_path, True):
            raise OSError(f"Could not delete {path}")
        if not fs.mkdirs(hadoop_path):
            raise OSError(f"Could not create directory {path}")
    except AnalysisException:
        # NOTE(review): Hadoop filesystem failures are expected to surface as
        # py4j Java errors rather than AnalysisException, so this handler
        # probably never fires -- confirm, and widen it if a warning (rather
        # than a failure) is really what is wanted here.
        print(f"Warning: Path {path} might exist as a file.")
def get_and_save_data(date, depth, suggested_count, event_input_path, snapshot_input_path, base_output_path):
    """Compute tag candidates for one date and write them as parquet.

    Steps:
      1. Read the message events for ``depth`` days ending at ``date``.
      2. Keep messages whose ``event.message_channel_to`` is not null and
         count, per tag in ``event.tags``, the distinct ``event.message_from``
         users.
      3. Keep tags with at least ``suggested_count`` distinct users.
      4. Drop tags already present in the snapshot (left anti join on
         ``tag``; assumes the snapshot has a ``tag`` column -- TODO confirm).
      5. Write the result to ``<base_output_path>/date=<date>``.

    Args:
        date: Last day of the event window, ``YYYY-MM-DD``.
        depth: Number of days of event history to read.
        suggested_count: Minimum number of distinct users for a tag to be
            kept.
        event_input_path: Root path of the event data.
        snapshot_input_path: Path of the parquet snapshot of known tags.
        base_output_path: Directory under which the ``date=`` output is
            written.
    """
    spark = (
        SparkSession.builder
        .appName(f'VerifiedTagsCandidatesJob-{date}-d{depth}-cut{suggested_count}')
        .getOrCreate()
    )

    # Open both inputs before touching the output directory: if an input
    # path is missing the job fails here, with the existing output intact.
    messages = spark.read.parquet(*get_input_paths(date, depth, event_input_path))
    verified_tags = spark.read.parquet(snapshot_input_path)

    all_tags = (
        messages.where("event.message_channel_to is not null")
        .selectExpr("event.message_from as user", "explode(event.tags) as tag")
        .groupBy("tag")
        .agg(F.countDistinct("user").alias("suggested_count"))
        # Compare through the Column API rather than formatting the
        # threshold into a SQL string.
        .where(F.col("suggested_count") >= suggested_count)
    )
    candidates = all_tags.join(verified_tags, "tag", "left_anti")

    # NOTE(review): this resets the whole base output directory, which also
    # removes the output of any other date stored under it -- confirm that
    # this is intended rather than replacing only the current date.
    delete_and_create_hdfs_dir(base_output_path, spark)
    candidates.write.parquet(f"{base_output_path}/date={date}")
def main():
    """Parse the command-line arguments and run the candidates job.

    All six arguments are positional and required, in this order: date,
    depth, suggested_count, event_input_path, snapshot_input_path,
    base_output_path. ``depth`` and ``suggested_count`` are parsed as ints.
    """
    cli = argparse.ArgumentParser(description="Verified Tags Candidates Job")

    # (argument name, add_argument options), listed in positional order.
    positional_args = (
        ("date", {"help": "Date in YYYY-MM-DD format"}),
        ("depth", {"type": int, "help": "Depth of event history"}),
        ("suggested_count", {"type": int, "help": "Minimum suggested count for tags"}),
        ("event_input_path", {"help": "Input path for event data"}),
        ("snapshot_input_path", {"help": "Input path for snapshot data"}),
        ("base_output_path", {"help": "Base output path"}),
    )
    for arg_name, options in positional_args:
        cli.add_argument(arg_name, **options)

    parsed = cli.parse_args()
    get_and_save_data(
        date=parsed.date,
        depth=parsed.depth,
        suggested_count=parsed.suggested_count,
        event_input_path=parsed.event_input_path,
        snapshot_input_path=parsed.snapshot_input_path,
        base_output_path=parsed.base_output_path,
    )


# Run the job only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement