Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import csv
- import sys
- from util import Node, StackFrontier, QueueFrontier
# In-memory lookup tables, filled in by load_data():
#   names  -> lowercased name mapped to the set of person_ids sharing it
#   people -> person_id mapped to {"name", "birth", "movies": set of movie_ids}
#   movies -> movie_id mapped to {"title", "year", "stars": set of person_ids}
names = dict()
people = dict()
movies = dict()

# Module-level search bookkeeping; neighbors_for_person() consults both of
# these to filter the pairs it returns.
explored = set()  # movie_ids already handled during a search run
frontier = QueueFrontier()  # queue of nodes awaiting expansion (breadth-first search)
def load_data(directory):
    """
    Load data from CSV files into memory.

    Reads people.csv, movies.csv and stars.csv from `directory` and fills
    the module-level `names`, `people` and `movies` tables.

    A stars.csv row that refers to a person or a movie that was not loaded
    is skipped as a whole: neither table is modified for that row, so a
    person's "movies" set only ever contains ids present in `movies`.
    """
    # Load people
    with open(f"{directory}/people.csv", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            people[row["id"]] = {
                "name": row["name"],
                "birth": row["birth"],
                "movies": set(),
            }
            # Several people may share one (case-insensitive) name.
            names.setdefault(row["name"].lower(), set()).add(row["id"])

    # Load movies
    with open(f"{directory}/movies.csv", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            movies[row["id"]] = {
                "title": row["title"],
                "year": row["year"],
                "stars": set(),
            }

    # Load stars
    with open(f"{directory}/stars.csv", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            # Resolve BOTH ends of the link before mutating anything, so a
            # dangling reference can never leave a half-recorded link behind.
            try:
                person = people[row["person_id"]]
                movie = movies[row["movie_id"]]
            except KeyError:
                continue
            person["movies"].add(row["movie_id"])
            movie["stars"].add(row["person_id"])
def main():
    """
    Command-line entry point.

    Loads the data set (directory taken from the optional single argument,
    "large" otherwise), asks for two names, and prints the chain of movies
    that connects them, or "Not connected." when shortest_path returns None.
    """
    argc = len(sys.argv)
    if argc > 2:
        sys.exit("Usage: python degrees.py [directory]")
    directory = "large" if argc != 2 else sys.argv[1]

    # Load data from files into memory
    print("Loading data...")
    load_data(directory)
    print("Data loaded.")

    source = person_id_for_name(input("Name: "))
    if source is None:
        sys.exit("Person not found.")

    target = person_id_for_name(input("Name: "))
    if target is None:
        sys.exit("Person not found.")

    path = shortest_path(source, target)
    if path is None:
        print("Not connected.")
        return

    degrees = len(path)
    print(f"{degrees} degrees of separation.")

    # Prepend the starting person so consecutive entries form one hop each.
    hops = [(None, source)] + path
    for i, (previous, current) in enumerate(zip(hops, hops[1:])):
        person1 = people[previous[1]]["name"]
        person2 = people[current[1]]["name"]
        movie = movies[current[0]]["title"]
        print(f"{i + 1}: {person1} and {person2} starred in {movie}")
def shortest_path(source, target):
    """
    Returns the shortest list of (movie_id, person_id) pairs
    that connect the source to the target.
    If no possible path, returns None.

    When source and target are the same person the result is an empty list.
    All search state is local to the call, so the function can be called
    repeatedly without one search affecting the next.
    """
    # Queue of nodes waiting to be expanded. QueueFrontier is assumed to hand
    # nodes back in insertion order, which is what makes this breadth-first.
    queue = QueueFrontier()
    queue.add(Node(state=source, parent=None, action=None))

    # Every person that has ever been queued. Tracking people (not movies)
    # guarantees each person is expanded at most once, at minimal depth.
    discovered = {source}

    while not queue.empty():
        node = queue.remove()
        if node.state == target:  # only possible when source == target
            return built_path(node)

        for movie_id, person_id in neighbors_for_person(node.state):
            if person_id in discovered:
                continue
            child = Node(state=person_id, parent=node, action=movie_id)
            # Check for the goal on discovery: no need to queue it first.
            if person_id == target:
                return built_path(child)
            discovered.add(person_id)
            queue.add(child)

    # The frontier ran dry without reaching the target: not connected.
    return None
# Follow the parent links from the given node back to the start node, then
# flip the collected steps so they read from start to goal.
def built_path(nd):
    """
    Returns the list of (action, state) pairs leading to `nd`.

    The start node (the one whose parent is None) contributes no pair, so
    calling this on the start node itself yields an empty list.
    """
    steps = []
    current = nd
    while current.parent is not None:
        steps.append((current.action, current.state))
        current = current.parent
    return steps[::-1]
def person_id_for_name(name):
    """
    Returns the IMDB id for a person's name,
    resolving ambiguities as needed.

    The lookup is case-insensitive. Returns None when the name is unknown,
    or when several people share it and the id typed at the prompt is not
    one of the listed candidates.
    """
    matches = list(names.get(name.lower(), set()))
    if len(matches) == 0:
        return None
    if len(matches) == 1:
        return matches[0]

    # Several people share this name: list them and let the user pick one.
    print(f"Which '{name}'?")
    for person_id in matches:
        person = people[person_id]
        name, birth = person["name"], person["birth"]
        print(f"ID: {person_id}, Name: {name}, Birth: {birth}")

    try:
        choice = input("Intended Person ID: ")
    except ValueError:
        return None
    return choice if choice in matches else None
def neighbors_for_person(person_id):
    """
    Returns (movie_id, person_id) pairs for people
    who starred with a given person.

    Pairs are filtered against the module-level search state: movies listed
    in `explored` are skipped entirely, and so are co-stars whose state is
    currently held by `frontier`. The person themself is never included.
    """
    pairs = set()
    for movie_id in people[person_id]["movies"]:
        if movie_id in explored:
            continue
        for co_star in movies[movie_id]["stars"]:
            # Skip the person we started from and anyone already queued;
            # reaching a queued person again would only cost an extra movie.
            if co_star == person_id or frontier.contains_state(co_star):
                continue
            pairs.add((movie_id, co_star))
    return pairs
# Run the command-line program only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement