Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Flight-route graph analysis with PySpark + GraphFrames.
#
# Reads airline_2m.csv, builds a graph whose vertices are airports and whose
# edges are routes (one edge per origin/destination pair), then prints:
#   1. the total number of routes,
#   2. the ten longest routes,
#   3. the airport with the highest degree,
#   4. the ten airports with the highest PageRank,
#   5. the ten routes with the lowest cost (CarrierDelay is used as the cost).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc, desc
from pyspark.sql.functions import max as max_
from pyspark.sql.functions import min as min_
from graphframes import GraphFrame


def _with_airport_names(routes, airports, *value_columns):
    """Attach source/destination airport code and city to each route row.

    routes must expose `src` and `dst`; airports must expose `id`,
    `AirportCode` and `CityName`. The columns named in value_columns are
    carried over from routes unchanged. Row order is NOT preserved by the
    joins, so callers must sort the result themselves.
    """
    return (
        routes.alias("r")
        .join(airports.alias("v1"), col("r.src") == col("v1.id"), "left")
        .join(airports.alias("v2"), col("r.dst") == col("v2.id"), "left")
        .select(
            col("v1.AirportCode").alias("Source_Airport"),
            col("v1.CityName").alias("Source_City"),
            col("v2.AirportCode").alias("Dest_Airport"),
            col("v2.CityName").alias("Dest_City"),
            *[col(f"r.{name}") for name in value_columns],
        )
    )


# Initialize Spark session
spark = SparkSession.builder \
    .appName("Flight Data Analysis") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
    .getOrCreate()

try:
    # Load flight data
    df = spark.read.csv("airline_2m.csv", header=True, inferSchema=True)

    # Remove null values from required columns.
    # NOTE(review): rows without a CarrierDelay value are dropped entirely, so
    # a route that never has one recorded will not appear in the graph at all.
    # Confirm this is intended.
    df = df.dropna(subset=["OriginAirportID", "DestAirportID", "Distance", "CarrierDelay"])

    # Select airport details for joining. Destination airports are included
    # too, so an airport that only ever appears as a destination still gets a
    # vertex (and a name in the reports below).
    airport_info = df.select(
        col("OriginAirportID").alias("AirportID"),
        col("Origin").alias("AirportCode"),
        col("OriginCityName").alias("CityName"),
    )
    # Only use the destination-side columns when the file actually has them.
    if {"Dest", "DestCityName"}.issubset(df.columns):
        airport_info = airport_info.union(
            df.select(
                col("DestAirportID").alias("AirportID"),
                col("Dest").alias("AirportCode"),
                col("DestCityName").alias("CityName"),
            )
        )
    # One row per airport id: a duplicated id would multiply rows in every join.
    airport_info = airport_info.dropDuplicates(["AirportID"])

    # Create vertices (airports) with names
    vertices = airport_info.select(
        col("AirportID").alias("id"),
        col("AirportCode"),
        col("CityName"),
    )

    # Create edges (flight routes): exactly one edge per (src, dst) pair.
    # De-duplicating on (src, dst, distance, cost) would keep one edge per
    # distinct delay value and inflate the route count and vertex degrees.
    edges = df.groupBy(
        col("OriginAirportID").alias("src"),
        col("DestAirportID").alias("dst"),
    ).agg(
        max_("Distance").alias("distance"),
        min_("CarrierDelay").alias("cost"),  # lowest observed cost on the route
    )

    # Create a GraphFrame
    g = GraphFrame(vertices, edges)

    # 1. Compute the total number of flight routes
    total_routes = g.edges.count()
    print(f"Total number of flight routes: {total_routes}")

    # 2. Compute and sort the longest flight routes (with airport names)
    longest_routes = g.edges.orderBy(desc("distance")).limit(10)
    # Re-sort after the joins, which do not keep the input order.
    longest_routes_named = _with_airport_names(
        longest_routes, vertices, "distance"
    ).orderBy(desc("distance"))
    print("Top 10 Longest Flight Routes:")
    longest_routes_named.show()

    # 3. Display the airport with the highest degree vertex (most connections)
    degree_df = g.degrees.orderBy(desc("degree")).limit(1)
    degree_df_named = (
        degree_df.alias("d")
        .join(vertices.alias("v"), col("d.id") == col("v.id"), "left")
        .select(
            col("v.AirportCode").alias("Airport"),
            col("v.CityName").alias("City"),
            col("d.degree"),
        )
    )
    print("Airport with Highest Degree Vertex:")
    degree_df_named.show()

    # 4. List the most important airports according to PageRank
    pagerank = g.pageRank(resetProbability=0.15, maxIter=10)
    important_airports_sorted = pagerank.vertices.orderBy(desc("pagerank"))
    # Take the top 10 BEFORE joining: limiting after the join would pick ten
    # arbitrary rows, because a join does not preserve the sort order.
    important_airports_named = (
        important_airports_sorted.limit(10).alias("p")
        .join(vertices.alias("v"), col("p.id") == col("v.id"), "left")
        .select(
            col("v.AirportCode").alias("Airport"),
            col("v.CityName").alias("City"),
            col("p.pagerank"),  # qualified: both sides of the join are aliased
        )
        .orderBy(desc("pagerank"))
    )
    print("Top 10 Most Important Airports (PageRank):")
    important_airports_named.show()

    # 5. List the routes with the lowest flight costs
    cheapest_routes = g.edges.orderBy(asc("cost")).limit(10)
    cheapest_routes_named = _with_airport_names(
        cheapest_routes, vertices, "distance", "cost"
    ).orderBy(asc("cost"))
    print("Top 10 Cheapest Routes:")
    cheapest_routes_named.show()
finally:
    # Stop Spark session even if one of the steps above failed
    spark.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement