Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import socketserver
- from collections import namedtuple
- from fl_networking_tools import get_binary, send_binary
- from threading import Event
- '''
- Commands:
- COMMANDS
- QUES - question command
- SANS - answer to question
- JOIN - request to join
- '''
- class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
- pass
- NUMBER_OF_PLAYERS = 2
- players = []
- num_answers=0
- ready_to_start = Event()
- wait_for_answers = Event()
- # Named tuples are extensions of the tuple structure, with contents you can refer to by name. In this case, the question will be held in a variable named q and the answer in answer.
- # This is just the set up of the question - it will be sent to the client using the send_binary function when a request is made.
- Question = namedtuple('Question', ['q', 'answer'])
- q1 = Question("Expand the acronym ALU", "Arithmetic Logic Unit")
- # The socketserver module uses 'Handlers' to interact with connections. When a client connects a version of this class is made to handle it.
- class QuizGame(socketserver.BaseRequestHandler):
- # The handle method is what actually handles the connection
- def handle(self):
- #Retrieve Command
- for request in get_binary(self.request):
- # Add this to the top of handle()
- global players # Make sure this is global
- global num_answers
- num_answer=0
- if request[0] == "JOIN":
- # Code to handle join
- team_name = request[1]
- players.append(team_name)
- if len(players) == NUMBER_OF_PLAYERS:
- # If correct number of players
- ready_to_start.set()
- # Trigger the event
- # Send the confirmation response
- send_binary(self.request, [2, "Game on; Good Luck"])
- # Wait for the ready to start event
- ready_to_start.wait()
- elif request[0] == "QUES":
- #Send question
- send_binary(self.request, (1, q1.q))
- #Your server code goes here
- elif request[0] == 'SANS':
- num_answers+=1
- print(q1.answer,request[1])
- if num_answers != len(players):
- # Trigger the event
- wait_for_answers.wait()
- wait_for_answers.set()
- if request[1].replace(' ','').replace('\t','').lower() == q1.answer.replace(' ','').replace('\t','').lower():
- send_binary(self.request, (2,'Correct, Answer is: '+ q1.answer))
- else:
- send_binary(self.request, (3,'Incorrect. Answer is: '+ q1.answer))
- num_answers=0
- # Open the quiz server and bind it to a port - creating a socket
- # This works similarly to the sockets you used before, but you have to give it both an address pair (IP and port) and a handler for the server.
- quiz_server = ThreadedTCPServer(('127.0.0.1', 2065), QuizGame)
- quiz_server.serve_forever()
Add Comment
Please, Sign In to add comment