YaBoiSwayZ

Advanced Data Reconciliation Tool (PySpark)

Jun 2nd, 2024
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast
from pyspark.storagelevel import StorageLevel
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JJavaError
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

spark = SparkSession.builder \
    .appName("EnhancedDataReconciliation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()

# defaultParallelism is only available once the session exists, so the
# shuffle partition count is set after getOrCreate() rather than in the
# builder, where `spark` is not yet defined.
num_partitions = spark.sparkContext.defaultParallelism * 3
spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))

dfA = spark.read.parquet("/path/to/tableA")
dfB = spark.read.parquet("/path/to/tableB")

# Co-partition both sides on the join key and cache, since each DataFrame
# feeds several actions below (joins, counts, writes).
dfA = dfA.repartition(num_partitions, "policy_name").persist(StorageLevel.MEMORY_AND_DISK)
dfB = dfB.repartition(num_partitions, "policy_name").persist(StorageLevel.MEMORY_AND_DISK)

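# Illustrative sanity check (not in the original paste): confirm the
# repartitioning took effect before relying on it.
logger.info("dfA partitions: %d", dfA.rdd.getNumPartitions())
logger.info("dfB partitions: %d", dfB.rdd.getNumPartitions())
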
def safe_broadcast(df, max_size=10 * 1024 * 1024):
    # Broadcast only when a rough size estimate (rows * columns * 8 bytes)
    # stays under max_size; otherwise return the DataFrame unchanged. The
    # count() here triggers a job, so failures are swallowed conservatively.
    try:
        if df.count() * len(df.columns) * 8 < max_size:
            return broadcast(df)
        return df
    except (AnalysisException, Py4JJavaError):
        return df

dfA_broadcasted = safe_broadcast(dfA)
dfB_broadcasted = safe_broadcast(dfB)

exact_matches = dfA_broadcasted.join(
    dfB_broadcasted,
    on=["policy_name", "source_ip", "destination_ip", "port", "protocol"],
    how="inner"
)

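# Optional plan check (an assumption, not part of the original tool): if
# either side was small enough to broadcast, the physical plan should show
# a BroadcastHashJoin rather than a SortMergeJoin.
exact_matches.explain()
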
# Alias both sides so the filter can disambiguate columns after the cross
# join; Spark resolves "a.port" against the alias, not against the Python
# variable name.
filtered_dfA = dfA.filter(col("policy_name").isNotNull()).alias("a")
filtered_dfB = dfB.filter(col("policy_name").isNotNull()).alias("b")
partial_matches = filtered_dfA.crossJoin(filtered_dfB).filter(
    (col("a.policy_name") != col("b.policy_name")) &
    (
        (col("a.source_ip") == col("b.source_ip")) |
        (col("a.destination_ip") == col("b.destination_ip")) |
        (col("a.port") == col("b.port")) |
        (col("a.protocol") == col("b.protocol"))
    )
)

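# The cross join above is quadratic in the row counts. A sketch of a cheaper
# alternative (an assumption, not the paste's approach): one equi-join per
# attribute, unioned and de-duplicated, which lets Spark use hash or
# sort-merge joins instead of a Cartesian product. `partial_match_pairs` is
# illustrative and unused below.
from functools import reduce

attrs = ["source_ip", "destination_ip", "port", "protocol"]
per_attr_pairs = [
    filtered_dfA.join(filtered_dfB, on=col("a." + attr) == col("b." + attr))
                .filter(col("a.policy_name") != col("b.policy_name"))
                .select(col("a.policy_name").alias("policy_a"),
                        col("b.policy_name").alias("policy_b"))
    for attr in attrs
]
partial_match_pairs = reduce(lambda x, y: x.union(y), per_attr_pairs).distinct()
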
# left_anti keeps rows from the left side with no key match on the right;
# safe_broadcast avoids force-broadcasting a side that may be too large.
non_matches_A = dfA.join(safe_broadcast(dfB), on=["policy_name", "source_ip", "destination_ip", "port", "protocol"], how="left_anti")
non_matches_B = dfB.join(safe_broadcast(dfA), on=["policy_name", "source_ip", "destination_ip", "port", "protocol"], how="left_anti")

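# Related alternative, shown for comparison (an assumption, not in the
# original paste; requires both DataFrames to share a schema): exceptAll()
# expresses "rows of A absent from B" directly when the five join keys are
# the only columns. It treats duplicates differently from left_anti:
# exceptAll subtracts multiplicities, left_anti drops every copy of a
# matched key. `non_matches_A_alt` is illustrative and unused below.
non_matches_A_alt = dfA.exceptAll(dfB)
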
# count() is enough here; collecting full result sets to the driver risks
# out-of-memory when only the totals are logged.
exact_matches_count = exact_matches.count()
partial_matches_count = partial_matches.count()
non_matches_A_count = non_matches_A.count()
non_matches_B_count = non_matches_B.count()

# Guarded writes; mode("overwrite") keeps reruns from failing on existing
# output paths.
try:
    exact_matches.write.mode("overwrite").parquet("/path/to/exact_matches")
    partial_matches.write.mode("overwrite").parquet("/path/to/partial_matches")
    non_matches_A.write.mode("overwrite").parquet("/path/to/non_matches_A")
    non_matches_B.write.mode("overwrite").parquet("/path/to/non_matches_B")
except (AnalysisException, Py4JJavaError) as e:
    logger.error("Error saving reconciliation results: %s", str(e))

logger.info("Exact matches count: %d", exact_matches_count)
logger.info("Partial matches count: %d", partial_matches_count)
logger.info("Non-matches A count: %d", non_matches_A_count)
logger.info("Non-matches B count: %d", non_matches_B_count)

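# Optional layout tweak (an assumption; the path and partition column are
# illustrative): partitioning the exact-match output by protocol lets
# downstream readers prune files on a common filter column.
exact_matches.write.mode("overwrite").partitionBy("protocol").parquet(
    "/path/to/exact_matches_by_protocol")
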
def monitor_cache_hit_ratio(spark_session):
    # Relies on the private _jsc gateway: getRDDStorageInfo() returns RDDInfo
    # objects exposing cached vs. total partition counts per persisted RDD.
    for rdd_info in spark_session.sparkContext._jsc.sc().getRDDStorageInfo():
        if rdd_info.numCachedPartitions() > 0:
            logger.info("RDD %d (%s): %d partitions cached out of %d total",
                        rdd_info.id(), rdd_info.name(),
                        rdd_info.numCachedPartitions(), rdd_info.numPartitions())

monitor_cache_hit_ratio(spark)

# Release the cached inputs only after the cache report, so the monitor
# still has entries to show.
dfA.unpersist()
dfB.unpersist()
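
# A minimal follow-up sketch (assumed, not in the original paste; the output
# path is a placeholder): persist the headline counts so downstream jobs can
# audit this reconciliation run.
summary = spark.createDataFrame(
    [("exact", exact_matches_count),
     ("partial", partial_matches_count),
     ("only_in_A", non_matches_A_count),
     ("only_in_B", non_matches_B_count)],
    ["match_type", "row_count"])
summary.write.mode("overwrite").parquet("/path/to/reconciliation_summary")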