Advertisement
den4ik2003

Untitled

Nov 11th, 2024
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.16 KB | None | 0 0
  1. import socket
  2. import threading
  3. import collections
  4.  
  5. class UDPBasedProtocol:
  6.     def __init__(self, *, local_addr, remote_addr):
  7.         self.udp_socket = socket.socket(
  8.             family=socket.AF_INET, type=socket.SOCK_DGRAM)
  9.         self.remote_addr = remote_addr
  10.         self.udp_socket.bind(local_addr)
  11.  
  12.     def sendto(self, data):
  13.         return self.udp_socket.sendto(data, self.remote_addr)
  14.  
  15.     def recvfrom(self, n):
  16.         msg, addr = self.udp_socket.recvfrom(n)
  17.         return msg
  18.  
  19.     def close(self):
  20.         self.udp_socket.close()
  21.  
  22.  
  23. class MyTCPProtocol(UDPBasedProtocol):
  24.     def __init__(self, *args, **kwargs):
  25.         super().__init__(*args, **kwargs)
  26.         self.udp_socket.settimeout(0.0005)
  27.         self.seq = 0
  28.         self.ack = 0
  29.         self.ack_data = b''
  30.         self.received_data = b''
  31.         self.saved_data_lock = threading.Lock()
  32.         self.buffer_size = 12000
  33.         self.status = True # активно ли соединение
  34.         self.send_lock = threading.Condition()
  35.         self.recv_lock = threading.Condition()
  36.         self.recv_result_lock = threading.Condition()
  37.         self.ack_check_lock = threading.Lock()
  38.         self.data_to_send = collections.deque()
  39.         self.num_to_receive = 0
  40.         self.recv_data = b''
  41.         self.send_thread = threading.Thread(target=self.send_call)
  42.         self.recv_thread = threading.Thread(target=self.recv_call)
  43.         self.send_thread.daemon = True
  44.         self.recv_thread.daemon = True
  45.         self.send_thread.start()
  46.         self.recv_thread.start()
  47.    
  48.     @staticmethod
  49.     def parse_ack_message(data):
  50.         return int.from_bytes(data[0: 1], 'big'), int.from_bytes(data[1: 5], 'big')
  51.  
  52.     @staticmethod
  53.     def parse_message_with_data(data):
  54.         return int.from_bytes(data[0: 1], 'big'), int.from_bytes(data[1: 5], 'big'), int.from_bytes(data[5: 9], 'big'), data[9: len(data)]
  55.  
  56.     @staticmethod
  57.     def create_ack_message(ack):
  58.         return int.to_bytes(0, 1, 'big') + int.to_bytes(ack, 4, 'big')
  59.  
  60.     @staticmethod
  61.     def create_package(num_of_bytes, seq, data):
  62.         return int.to_bytes(1, 1, 'big') + int.to_bytes(num_of_bytes, 4, 'big') + int.to_bytes(seq, 4, 'big') + data
  63.  
  64.     def send_call(self):
  65.         current_length_of_data = 0
  66.         while self.status:
  67.             data = b''
  68.             with self.send_lock:
  69.                 while self.status and len(self.data_to_send) == 0:
  70.                     self.send_lock.wait()
  71.  
  72.                 if not self.status:
  73.                     break
  74.  
  75.                 if current_length_of_data == 0:
  76.                     current_length_of_data = len(self.data_to_send[0])
  77.  
  78.                 length = min(self.buffer_size, current_length_of_data)
  79.                 data += self.data_to_send[0][0:length]
  80.                 self.data_to_send[0] = self.data_to_send[0][length:]
  81.  
  82.                 package = MyTCPProtocol.create_package(
  83.                     current_length_of_data, self.seq, data)
  84.  
  85.                 if len(self.data_to_send[0]) == 0:
  86.                     self.data_to_send.popleft()
  87.                     current_length_of_data = 0
  88.  
  89.             while True:
  90.                 final_flag = False
  91.                 while True:
  92.                     if not self.status:
  93.                         return
  94.                     self.sendto(package)
  95.                     while True:
  96.                         received = b''
  97.                         try:
  98.                             received = self.recvfrom(self.buffer_size + 10)
  99.                         except Exception as e:
  100.                             with self.ack_check_lock:
  101.                                 if len(self.ack_data) != 0:
  102.                                     received = self.ack_data
  103.                                     self.ack_data = b''
  104.                                 else:
  105.                                     break
  106.                         flag, ack = MyTCPProtocol.parse_ack_message(received)
  107.                         if flag != 0:
  108.                             with self.saved_data_lock:
  109.                                 self.received_data = received
  110.                             continue
  111.  
  112.                         if ack > self.seq:
  113.                             self.seq = ack
  114.                             final_flag = True
  115.                             break
  116.  
  117.                         break
  118.                     if final_flag:
  119.                         break
  120.                 if final_flag:
  121.                     break
  122.  
  123.     def send(self, data: bytes):
  124.         with self.send_lock:
  125.             self.data_to_send.append(data)
  126.             self.send_lock.notify_all()
  127.         return len(data)
  128.  
  129.     def recv_call(self):
  130.         while self.status:
  131.             result = b''
  132.             with self.recv_lock:
  133.                 while self.status and self.num_to_receive == 0:
  134.                     self.recv_lock.wait()
  135.                 if not self.status:
  136.                     break
  137.             prev_ack = self.ack
  138.             while True:
  139.                 try:
  140.                     received = self.recvfrom(self.buffer_size + 10)
  141.                 except Exception as e:
  142.                     with self.saved_data_lock:
  143.                         if len(self.received_data) != 0:
  144.                             received = self.received_data
  145.                             self.received_data = b''
  146.                         else:
  147.                             continue
  148.                 flag, num_of_bytes, seq, data = MyTCPProtocol.parse_message_with_data(received)
  149.                 if flag != 1:
  150.                     with self.ack_check_lock:
  151.                         self.ack_data = received
  152.                     continue
  153.  
  154.                 if seq != self.ack or (len(data) != self.buffer_size and len(data) != num_of_bytes - (self.ack - prev_ack)):
  155.                     message = MyTCPProtocol.create_ack_message(self.ack)
  156.                     self.sendto(message)
  157.                 else:
  158.                     self.ack += len(data)
  159.                     result += data
  160.                     message = MyTCPProtocol.create_ack_message(
  161.                         self.ack)
  162.                     self.sendto(message)
  163.                 if self.ack - prev_ack == num_of_bytes:
  164.                     for _ in range(5):
  165.                         self.sendto(message)
  166.                     break
  167.  
  168.             self.recv_data = result
  169.             self.num_to_receive = 0
  170.             with self.recv_result_lock:
  171.                 self.recv_result_lock.notify_all()
  172.         return result
  173.  
  174.     def recv(self, n: int):
  175.         result = bytes()
  176.         with self.recv_lock:
  177.             self.num_to_receive = n
  178.             self.recv_lock.notify_all()
  179.         with self.recv_result_lock:
  180.             if len(self.recv_data) == 0:
  181.                 self.recv_result_lock.wait()
  182.                 result += self.recv_data
  183.                 self.recv_data = b''
  184.             else:
  185.                 self.recv_data = b''
  186.             return result
  187.  
  188.     def close(self):
  189.         self.status = False
  190.         with self.send_lock:
  191.             self.send_lock.notify_all()
  192.         with self.recv_lock:
  193.             self.recv_lock.notify_all()
  194.         self.send_thread.join()
  195.         self.recv_thread.join()
  196.         super().close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement