Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Filename: server_byte_stuffing_demo.py
# Version: 1.0.0
# Author: Jeoi Reqi

r"""
Description:
    - This script demonstrates a byte-stuffing algorithm and a TCP server function to process byte-stuffed data over TCP.
    - Byte stuffing is a technique used in data transmission to avoid ambiguity caused by control characters.
    - The script showcases how byte stuffing can be applied to data and how a server can handle byte-stuffed data over TCP.

Requirements:
    - Python 3.x

Functions:
    - byte_stuffing(data):
        Performs byte stuffing on the input data.
    - process_data(data):
        Processes the byte-stuffed data received from the client.
    - start_server(host, port, duration, received_data_list):
        Starts a TCP server that listens for incoming connections from clients and processes byte-stuffed data for a specified duration.
    - send_data(host, port, data):
        Sends byte-stuffed data to the server.
    - save_output(output, received_data_list):
        Saves the terminal output and received data to a file.
    - main():
        Main function to automate the process of starting the server, sending data, and handling user interaction for saving output.

Expected Example Output:

    Byte-stuffed data: 017d5e027d5d03
    Server listening on 127.0.0.1:1337...
    Data sent to the server: b'\x01}^\x02}]\x03'
    Connection from ('127.0.0.1', 52261)
    Processed data: b'\x01~\x02}\x03'
    Received data from the server: b'\x01~\x02}\x03'
    Output saved successfully!
    Received: 017e027d03
    Sent: 017d5e027d5d03
    Received: 017e027d03

Additional Notes:
    - This script is for demonstration purposes only and does not represent a full-fledged server implementation.
"""
- import socket
- import threading
- import time
- import logging
- from typing import List, Tuple
def byte_stuffing(data: bytes) -> bytes:
    """
    Escape the reserved bytes 0x7E and 0x7D in a payload.

    Each reserved byte is replaced by the two-byte sequence
    ``0x7D, byte ^ 0x20`` (so 0x7E becomes 7D 5E and 0x7D becomes 7D 5D).
    All other bytes are copied through unchanged.

    Args:
        data (bytes): The raw payload to encode.

    Returns:
        bytes: The byte-stuffed payload.
    """
    flag_byte = 0x7E
    escape_byte = 0x7D
    xor_mask = 0x20

    encoded = bytearray()
    for value in data:
        if value == flag_byte or value == escape_byte:
            # Emit the escape marker followed by the masked original byte.
            encoded += bytes((escape_byte, value ^ xor_mask))
            continue
        encoded.append(value)
    return bytes(encoded)
def process_data(data: bytes) -> bytes:
    """
    Reverse byte stuffing on a received payload.

    Every 0x7D byte is treated as an escape marker: the marker itself is
    discarded and the byte that follows it is XOR-ed with 0x20 and emitted.
    All other bytes are copied through unchanged.

    Note:
        A 0x7D that is the very last byte has nothing to escape and is
        dropped without error.

    Args:
        data (bytes): The byte-stuffed payload received from the client.

    Returns:
        bytes: The decoded payload.
    """
    escape_byte = 0x7D
    xor_mask = 0x20

    decoded = bytearray()
    stream = iter(data)
    for value in stream:
        if value != escape_byte:
            decoded.append(value)
            continue
        # Consume the byte after the escape marker, whatever its value is.
        escaped = next(stream, None)
        if escaped is None:
            break  # Dangling escape marker at end of input: nothing to emit.
        decoded.append(escaped ^ xor_mask)
    return bytes(decoded)
def _serve_client(client_socket, deadline, received_data_list):
    """Un-stuff and echo one client's chunks until it disconnects or the deadline passes."""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return  # Server lifetime is over; let the caller wind down.
        # Accepted sockets start out blocking; bound recv() by the server
        # deadline so an idle client cannot keep the server alive.
        client_socket.settimeout(remaining)

        data = client_socket.recv(1024)
        if not data:
            return  # Client closed the connection.

        if data.startswith(b"GET"):
            logging.info(f"\nReceived HTTP request: {data!r}")
            continue  # HTTP requests are logged but not processed or answered.

        # NOTE: each recv() chunk is decoded on its own, so an escape
        # sequence split across two chunks is not reassembled.
        processed_data = process_data(data)
        logging.info(f"\nProcessed data: {processed_data!r}")
        received_data_list.append((data, processed_data))
        client_socket.sendall(processed_data)


def start_server(
    host: str, port: int, duration: int, received_data_list: List[Tuple[bytes, bytes]]
):
    """
    Run a TCP server that un-stuffs and echoes client data for a limited time.

    Clients are served one at a time. For every chunk received, the chunk is
    passed through ``process_data``, the ``(raw, processed)`` pair is appended
    to ``received_data_list`` and the processed bytes are sent back. Chunks
    starting with ``b"GET"`` are only logged.

    The server stops once ``duration`` seconds have elapsed. The deadline
    applies both while waiting for a connection and while waiting for data
    from a connected client.

    Args:
        host (str): The IP address or hostname to bind to.
        port (int): The port number on which the server listens.
        duration (int): How long (in seconds) the server should run.
        received_data_list (List[Tuple[bytes, bytes]]): Output list that
            receives one ``(raw, processed)`` tuple per processed chunk.
    """
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + duration

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((host, port))
        server_socket.listen(5)  # Backlog of 5 pending connections.
        logging.info(f"\nServer listening on {host}:{port}...")

        while True:
            # Compute the remaining time once and validate it before use:
            # settimeout() rejects negative values, and 0 would silently
            # switch the socket to non-blocking mode.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            server_socket.settimeout(remaining)

            try:
                client_socket, client_address = server_socket.accept()
                logging.info(f"\nConnection from {client_address}")
                with client_socket:
                    _serve_client(client_socket, deadline, received_data_list)
            except socket.timeout:
                break  # Deadline reached while waiting for a client or data.
            except ConnectionResetError:
                logging.warning("\nConnection was reset by the client!")
                continue  # Go back to waiting for the next client.
def send_data(host: str, port: int, data: bytes) -> bytes:
    """
    Send a payload to the server and return its reply.

    Opens a TCP connection, sends ``data`` in full, then performs a single
    ``recv`` of up to 1024 bytes. The socket is closed on exit.

    Args:
        host (str): The IP address or hostname of the server.
        port (int): The port number on which the server is listening.
        data (bytes): The byte-stuffed payload to send.

    Returns:
        bytes: The bytes returned by the server's reply.

    Raises:
        ConnectionResetError: If the connection is reset; a warning is
            logged before the exception is re-raised.
    """
    server_address = (host, port)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
        try:
            connection.connect(server_address)
            connection.sendall(data)
            logging.info(f"\nData sent to the server: {data!r}")

            reply = connection.recv(1024)
            logging.info(f"\nReceived data from the server: {reply!r}")
        except ConnectionResetError:
            logging.warning("\nConnection was reset by the server!")
            raise  # Propagate instead of returning None.
        return reply
def save_output(output: str, received_data_list: List[Tuple[bytes, bytes]]):
    """
    Append filtered terminal output and the data exchange log to output.txt.

    The text in ``output`` is written line by line, skipping any line that
    contains one of the hard-coded exclusion fragments. Afterwards one
    "Sent"/"Received" pair (hex-encoded) is written for every tuple whose
    second element is not None.

    Args:
        output (str): The terminal output to be saved.
        received_data_list (List[Tuple[bytes, bytes]]): Tuples of sent and
            received byte-stuffed data.
    """
    # Fragments that mark a line as one to leave out of the file.
    excluded_fragments = ("027d5d03", "Received: 017e027d03")

    def is_excluded(text: str) -> bool:
        for fragment in excluded_fragments:
            if fragment in text:
                return True
        return False

    with open("output.txt", "a", encoding="utf-8") as out_file:
        for text_line in output.split("\n"):
            if is_excluded(text_line):
                continue
            out_file.write(text_line + "\n")

        # Append the recorded exchanges, skipping entries with no reply.
        for sent, received in received_data_list:
            if received is None:
                continue
            out_file.write(f"Sent: {sent.hex()}\n")
            out_file.write(f"Received: {received.hex()}\n\n")
def main():
    """
    Run the end-to-end demo: start the server, send stuffed data, offer to save.

    Steps:
        1. Byte-stuff a fixed test payload and log it.
        2. Start ``start_server`` in a background thread.
        3. Send the stuffed payload with ``send_data`` and record the reply.
        4. Wait for the server thread to finish.
        5. Ask the user whether to save the results via ``save_output``.

    Raises:
        ConnectionRefusedError: If the server never starts accepting
            connections (its thread has died or the retry window elapsed).
    """
    # Define server parameters
    HOST = "127.0.0.1"
    PORT = 1337
    DURATION = 1
    CONNECT_RETRY_DELAY = 0.05  # Seconds between connection attempts.

    # Prepare test data and perform byte stuffing on it
    test_data = bytes([0x01, 0x7E, 0x02, 0x7D, 0x03])
    stuffed_data = byte_stuffing(test_data)
    logging.info("Byte-stuffed data: %s", stuffed_data.hex())

    # List to store received data tuples
    received_data_list: List[Tuple[bytes, bytes]] = []

    # Start the server in a separate thread
    server_thread = threading.Thread(
        target=start_server, args=(HOST, PORT, DURATION, received_data_list)
    )
    server_thread.start()

    # Thread.start() returns before the server has necessarily reached
    # bind()/listen(), so the first connect can be refused. Retry briefly
    # while the server thread is alive instead of failing on that race.
    retry_until = time.monotonic() + DURATION
    while True:
        try:
            received_data = send_data(HOST, PORT, stuffed_data)
            break
        except ConnectionRefusedError:
            if not server_thread.is_alive() or time.monotonic() >= retry_until:
                raise  # The server is genuinely unavailable.
            time.sleep(CONNECT_RETRY_DELAY)

    if received_data is not None:
        received_data_list.append(
            (stuffed_data, received_data)
        )  # Append received data only

    # Wait for the server thread to complete
    server_thread.join()

    # Prompt user to save the output
    save = input("\nDo you want to save the output? (y/n): ")
    if save.lower() == "y":
        # Format example data for saving
        example_data = f"\nSent: {stuffed_data.hex()}\nReceived: {received_data.hex() if received_data else 'None'}\n"
        # Save the output to a file
        save_output(example_data, received_data_list)
        logging.info("\nOutput saved successfully!\n")
if __name__ == "__main__":
    # Start each run with an empty output file.
    with open("output.txt", "w", encoding="utf-8"):
        pass

    # Configure logging settings.
    # The file handler uses append mode on purpose: save_output() appends to
    # the same file through its own handle. A handler opened with mode="w"
    # keeps its own file offset and would write later log records over the
    # data appended in between.
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        handlers=[
            logging.FileHandler("output.txt", mode="a", encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    # Execute the main function
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement