# Pastebin paste "flight" by HawkeyeHS, posted Apr 2nd, 2025.
# Listing metadata as scraped: 402 | 0 | Never | Python 3.75 KB | None | 0 0
  # Flight-route graph analysis with PySpark and GraphFrames.
  #
  # Reads airline_2m.csv, builds an airport/route graph, and prints five
  # reports: edge count, longest routes, highest-degree airport, top PageRank
  # airports, and the routes with the lowest "cost" (cost = CarrierDelay).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc, desc
from graphframes import GraphFrame


def _with_airport_names(routes, airports, *value_cols):
    """Attach source/destination airport code and city to each route row.

    Args:
        routes: DataFrame with ``src`` and ``dst`` columns plus every column
            named in ``value_cols``.
        airports: DataFrame with ``id``, ``AirportCode`` and ``CityName``.
        *value_cols: Names of ``routes`` columns to carry into the result.

    Returns:
        DataFrame with ``Source_Airport``, ``Source_City``, ``Dest_Airport``,
        ``Dest_City`` followed by ``value_cols``. Left joins are used, so an
        endpoint with no match in ``airports`` yields nulls instead of
        dropping the route.
    """
    return (
        routes.alias("r")
        .join(airports.alias("v1"), col("r.src") == col("v1.id"), "left")
        .join(airports.alias("v2"), col("r.dst") == col("v2.id"), "left")
        .select(
            col("v1.AirportCode").alias("Source_Airport"),
            col("v1.CityName").alias("Source_City"),
            col("v2.AirportCode").alias("Dest_Airport"),
            col("v2.CityName").alias("Dest_City"),
            *[col(f"r.{name}") for name in value_cols],
        )
    )


# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("Flight Data Analysis")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
    .getOrCreate()
)

# try/finally so the session is stopped even when a step below fails.
try:
    # Load flight data
    df = spark.read.csv("airline_2m.csv", header=True, inferSchema=True)

    # Remove rows with nulls in the columns the graph is built from
    df = df.dropna(subset=["OriginAirportID", "DestAirportID", "Distance", "CarrierDelay"])

    # Airport details, one row per airport ID. De-duplicating on the ID alone
    # (not on all columns) keeps vertex ids unique even if the same ID shows
    # up with more than one code/city spelling; which spelling is kept is
    # arbitrary.
    # NOTE(review): only origin-side columns are used, so an airport that
    # appears solely as a destination gets no vertex row and shows up with
    # null names in the reports below - confirm whether that can occur.
    airport_info = df.select(
        col("OriginAirportID").alias("AirportID"),
        col("Origin").alias("AirportCode"),
        col("OriginCityName").alias("CityName"),
    ).dropDuplicates(["AirportID"])

    # Vertices (airports) with names. Reused by the graph and by every name
    # lookup below, so mark it for caching.
    vertices = airport_info.select(
        col("AirportID").alias("id"),
        col("AirportCode"),
        col("CityName"),
    ).cache()

    # Edges (flight routes) with exact duplicate rows removed.
    # NOTE(review): uniqueness is per (src, dst, distance, cost), so a single
    # src->dst pair can contribute several edges (one per distinct cost).
    # The edge count and the degree below therefore count such a pair more
    # than once - confirm that this is intended.
    edges = df.select(
        col("OriginAirportID").alias("src"),
        col("DestAirportID").alias("dst"),
        col("Distance").alias("distance"),
        col("CarrierDelay").alias("cost"),
    ).dropDuplicates()

    # Create a GraphFrame
    g = GraphFrame(vertices, edges)

    # 1. Compute the total number of flight routes (edges in the graph)
    total_routes = g.edges.count()
    print(f"Total number of flight routes: {total_routes}")

    # 2. Compute and sort the longest flight routes (with airport names)
    longest_routes = (
        g.edges.dropDuplicates(["src", "dst", "distance"])
        .orderBy(desc("distance"))
        .limit(10)
    )
    # Joins and dropDuplicates do not guarantee row order, so sort again
    # after them to make the printed table actually descending.
    longest_routes_named = (
        _with_airport_names(longest_routes, vertices, "distance")
        .dropDuplicates()
        .orderBy(desc("distance"))
    )

    print("Top 10 Longest Flight Routes:")
    longest_routes_named.show()

    # 3. Display the airport with the highest degree vertex (most connections)
    degree_df = g.degrees.orderBy(desc("degree")).limit(1)
    degree_df_named = (
        degree_df.alias("d")
        .join(vertices.alias("v"), col("d.id") == col("v.id"), "left")
        .select(
            col("v.AirportCode").alias("Airport"),
            col("v.CityName").alias("City"),
            col("d.degree"),
        )
    )

    print("Airport with Highest Degree Vertex:")
    degree_df_named.show()

    # 4. List the most important airports according to PageRank
    pagerank = g.pageRank(resetProbability=0.15, maxIter=10)

    important_airports_sorted = pagerank.vertices.orderBy(desc("pagerank"))

    # Take the top 10 BEFORE joining: a limit applied after the join is not
    # guaranteed to see rows in sorted order and could return an arbitrary 10.
    # Sort once more after the join so the output is in rank order.
    important_airports_named = (
        important_airports_sorted.limit(10).alias("p")
        .join(vertices.alias("v"), col("p.id") == col("v.id"), "left")
        .select(
            col("v.AirportCode").alias("Airport"),
            col("v.CityName").alias("City"),
            col("p.pagerank"),  # qualified: only the PageRank side has it
        )
        .orderBy(desc("pagerank"))
    )

    print("Top 10 Most Important Airports (PageRank):")
    important_airports_named.show()

    # 5. List the routes with the lowest flight costs
    cheapest_routes = g.edges.orderBy(asc("cost")).limit(10)
    # Re-sort after the joins/dropDuplicates for the same reason as in step 2.
    cheapest_routes_named = (
        _with_airport_names(cheapest_routes, vertices, "distance", "cost")
        .dropDuplicates()
        .orderBy(asc("cost"))
    )

    print("Top 10 Cheapest Routes:")
    cheapest_routes_named.show()
finally:
    # Stop Spark session
    spark.stop()
# End of paste (Pastebin page footer: advertisement and comment sign-in prompts).