Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import random
- from enum import Enum
- class Options(Enum):
- CREATE = 1
- SELECT = 2
- clients = {} # {'id': {'name': "bob", 'writer': <writer>, 'chatroom_id': "1"}}
- chatrooms = {
- 'default_chatroom_id': "test"
- } # {'chatroom_id': 'chatroom_name'}
- header = 1024
- ip, port = 'localhost', 8888
- # Define a simple XOR-based encryption and decryption function
- def simple_xor_encrypt(message, key):
- encrypted_message = bytearray()
- for char in message.encode():
- encrypted_char = char ^ key
- encrypted_message.append(encrypted_char)
- return encrypted_message.decode()
- def simple_xor_decrypt(encrypted_message, key):
- decrypted_message = bytearray()
- for char in encrypted_message.encode():
- decrypted_char = char ^ key
- decrypted_message.append(decrypted_char)
- return decrypted_message.decode()
- async def send_message(writer, message, key):
- encrypted_message = simple_xor_encrypt(message, key)
- writer.write(encrypted_message.encode())
- await writer.drain()
- class Client:
- def __init__(self, writer="", reader=""):
- self.id = "usr" + str(random.random())
- self.writer = writer
- self.reader = reader
- self.name = ""
- self.chatroom_id = ""
- self.client_address = writer.get_extra_info('peername')
- print(f"New connection from {self.client_address}")
- def get_user_profile(self):
- return {'name': self.name, 'chatroom_id': self.chatroom_id, 'writer': self.writer}
- async def broadcast_to_all(self, message, key):
- for user_id, values in clients.items():
- if user_id != self.id:
- await send_message(values['writer'], f"{message}", key)
- async def multicast_to_chat(self, message, key):
- for user_id, values in clients.items():
- if user_id != self.id and values['chatroom_id'] == self.chatroom_id:
- await send_message(values['writer'], f"{message}", key)
- async def send_message(self, message, key):
- encrypted_message = simple_xor_encrypt(message, key)
- self.writer.write(encrypted_message.encode())
- await self.writer.drain()
- async def receive_message(self, key):
- data = await self.reader.read(header)
- decrypted_message = simple_xor_decrypt(data.decode(), key)
- return decrypted_message.strip()
- async def client_req_and_res(self, message, key):
- await self.send_message(message, key)
- return await self.receive_message(key)
- async def remove_client(self, key):
- del clients[self.id]
- await self.multicast_to_chat(f"{self.name} has left the chat!\n", key)
- self.writer.close()
- await self.writer.wait_closed()
- print(f"Connection closed for {self.client_address}")
- async def choose_name(self, key):
- self.name = await self.client_req_and_res("Welcome to the chatroom! Please enter your name: \n", key)
- print(f"{self.name} has joined the server")
- async def choose_chat(self, key):
- option = ""
- while option not in [Options.CREATE, Options.SELECT]:
- chatroom_names = "\n".join(chatrooms.values())
- option = await self.client_req_and_res(f"\n1. Create a Chatroom\n2. Select a Chatroom:\n{chatroom_names}\n", key)
- try:
- option = Options(int(option)) # Convert user input to enum
- except ValueError:
- option = None
- print(f"{option} is chosen")
- while True:
- if option == Options.CREATE:
- chatroom_name = await self.client_req_and_res("Enter the name of a new chatroom: ", key)
- print(f"{chatroom_name} chatroom is created")
- self.chatroom_id = "cht" + str(random.random())
- chatrooms[self.chatroom_id] = chatroom_name
- break
- elif option == Options.SELECT:
- chatroom_name = await self.client_req_and_res("Enter the name of a chatroom: ", key)
- self.chatroom_id = find_id_by_name(chatroom_name, chatrooms)
- if not self.chatroom_id:
- continue
- break
- await self.multicast_to_chat(f"\n{self.name} has joined the chat!\n", key)
- print(f"{self.chatroom_id} chatroom is accessed on the server")
- async def chat_with_others_in_room(self, key):
- while True:
- data = await self.reader.read(header)
- if data == b'\r\n':
- continue # Skip empty messages
- decrypted_message = simple_xor_decrypt(data.decode(), key)
- message = f"{self.name}| {decrypted_message}"
- await self.multicast_to_chat(message, key)
- async def handle_client(reader, writer):
- client = Client(writer, reader)
- await client.choose_name(encryption_key)
- await client.choose_chat(encryption_key)
- clients[client.id] = client.get_user_profile()
- try:
- await client.chat_with_others_in_room(encryption_key)
- except asyncio.CancelledError:
- pass
- except Exception as e:
- print(f"Error handling client {client.name}: {e}")
- finally:
- await client.remove_client(encryption_key)
- del client
- async def main():
- server = await asyncio.start_server(handle_client, ip, port)
- async with server:
- await server.serve_forever()
- if __name__ == "__main__":
- encryption_key = 42 # Replace with your own encryption key
- asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement