Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import serial
- from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QComboBox, QLabel, QLineEdit, QTextEdit, QMessageBox, \
- QPushButton
- from PyQt5.QtCore import QThread, pyqtSignal, Qt, QTimer
- from PyQt5.QtGui import QFont, QTextCursor, QTextCharFormat
- from PyQt5 import QtCore
- class SerialDataReader(QThread):
- data_received = pyqtSignal(bytes)
- def __init__(self, serial_port):
- super().__init__()
- self.serial_port = serial_port
- self.running = True
- def stop(self):
- self.running = False
- def run(self):
- while self.running:
- if self.should_emit_data():
- data = self.read_data()
- self.data_received.emit(data)
- def should_emit_data(self):
- return self.serial_port and self.serial_port.is_open
- def read_data(self):
- return self.serial_port.read(self.serial_port.in_waiting)
- class TextEditCustom(QTextEdit):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.cursor_position = QTextCursor.End
- def keyPressEvent(self, event):
- if event.key() == Qt.Key_Backspace:
- event.ignore()
- else:
- super().keyPressEvent(event)
- self.move_cursor_to_end()
- def move_cursor_to_end(self):
- cursor = self.textCursor()
- cursor.movePosition(QTextCursor.End)
- self.setTextCursor(cursor)
- def serial_ports():
- ports = ['COM%s' % (i + 1) for i in range(9)]
- result = []
- for port in ports:
- try:
- s = serial.Serial(port, 9600)
- s.close()
- result.append(port[3::])
- except (OSError, serial.SerialException):
- pass
- return result
- class SerialCommunicationWindow(QMainWindow):
- error_occurred = pyqtSignal()
- def __init__(self):
- super().__init__()
- self.serial_port = None
- self.serial_baudrate = 9600
- self.serial_stopbits = serial.STOPBITS_ONE
- self.serial_data_bits = serial.EIGHTBITS
- self.serial_parity = serial.PARITY_NONE
- self.serial_flow_control = False
- self.serial_read_timeout = None
- self.serial_write_timeout = None
- font = QFont("Times New Roman", 12)
- app = QApplication.instance()
- if not app:
- app = QApplication([])
- app.setFont(font)
- self.resize(609, 530)
- self.setMinimumSize(1000, 530)
- self.setMaximumSize(1001, 530)
- # self.setFixedSize(QSize(609, 530))
- # self.setFixedWidth(609)
- # self.setFixedHeight(530)
- self.central_widget = QWidget(self)
- self.setCentralWidget(self.central_widget)
- self.central_widget.setStyleSheet("background-color: #73B1F4;")
- self.input_label = QLabel('Input:', self.central_widget)
- self.input_label.setGeometry(QtCore.QRect(30, 10, 121, 21))
- self.input_label.setFont(font)
- self.input_label.setAlignment(Qt.AlignCenter) # выравнивание по центру
- self.input_label.setGeometry(QtCore.QRect(30, 10, 351, 21))
- self.input_message = TextEditCustom(self.central_widget)
- self.input_message.setGeometry(QtCore.QRect(30, 50, 351, 161))
- self.input_message.setFont(font)
- self.input_message.setStyleSheet("background-color: lightBlue;")
- self.output_label = QLabel('Output:', self.central_widget)
- self.output_label.setGeometry(QtCore.QRect(30, 300, 121, 21))
- self.output_label.setFont(font)
- self.output_label.setAlignment(Qt.AlignCenter)
- self.output_label.setGeometry(QtCore.QRect(30, 300, 351, 21))
- self.output_message = QTextEdit(self.central_widget)
- self.output_message.setReadOnly(True)
- self.output_message.setGeometry(QtCore.QRect(30, 340, 351, 161))
- self.output_message.setFont(font)
- self.output_message.setStyleSheet("background-color: lightBlue;")
- self.com_port_label = QLabel('COM port:', self.central_widget)
- self.com_port_label.setGeometry(QtCore.QRect(410, 50, 110, 21))
- self.com_port_label.setFont(font)
- self.com_port_input = QLineEdit(self.central_widget)
- self.com_port_input.setGeometry(QtCore.QRect(520, 50, 50, 22))
- self.com_port_input.setStyleSheet("background-color: lightBlue;")
- self.stopbits_label = QLabel('Stop bits:', self.central_widget)
- self.stopbits_label.setGeometry(QtCore.QRect(410, 90, 110, 21))
- self.stopbits_label.setFont(font)
- self.stopbits_input = QComboBox(self.central_widget)
- self.stopbits_input.setGeometry(QtCore.QRect(520, 90, 60, 22))
- self.stopbits_input.addItems(['1', '1.5', '2'])
- self.stopbits_input.setStyleSheet("background-color: lightBlue;")
- self.bytes_received_label = QLabel('Bytes received:', self.central_widget)
- self.bytes_received_label.setGeometry(QtCore.QRect(410, 370, 220, 21))
- self.bytes_received_label.setFont(font)
- self.bytes_received_value = QLabel('0', self.central_widget)
- self.bytes_received_value.setGeometry(QtCore.QRect(537, 370, 230, 21))
- self.baudrate_label = QLabel('Baudrate:', self.central_widget)
- self.baudrate_label.setGeometry(QtCore.QRect(410, 340, 120, 21))
- self.baudrate_label.setFont(font)
- self.baudrate_value = QLabel(str(self.serial_baudrate), self.central_widget)
- self.baudrate_value.setGeometry(QtCore.QRect(500, 340, 130, 21))
- self.kadr_label = QLabel('Packet structure:', self.central_widget)
- self.kadr_label.setGeometry(QtCore.QRect(410, 400, 140, 21))
- self.kadr_window = QTextEdit(self.central_widget)
- self.kadr_window.setGeometry(QtCore.QRect(410, 430, 560, 70))
- # self.output_message.setGeometry(QtCore.QRect(30, 340, 351, 161))
- self.kadr_window.setReadOnly(True)
- self.kadr_window.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
- self.kadr_window.setStyleSheet("background-color: lightBlue;")
- clear_button = QPushButton('Clear', self.central_widget)
- clear_button.setGeometry(150, 240, 110, 30)
- clear_button.setFont(font)
- clear_button.clicked.connect(self.clear_text_areas)
- clear_button.setStyleSheet("background-color: lightBlue;")
- self.keyword_combinations = ["kw2", "kw1"]
- self.available_com_ports = serial_ports()
- self.last_used_port_index = 0
- self.previous_port = None
- self.user_ports = []
- if self.available_com_ports:
- self.set_available_port()
- self.text_to_send = ""
- self.error_occurred.connect(self.handle_error)
- self.stopbits_input.currentIndexChanged.connect(
- lambda index: setattr(self, 'serial_stopbits', float(self.stopbits_input.itemText(index))))
- self.com_port_input.textChanged.connect(self.connection)
- self.input_message.textChanged.connect(self.send_data)
- def clear_text_areas(self):
- self.input_message.clear()
- self.output_message.clear()
- def set_available_port(self):
- for i in range(self.last_used_port_index, len(self.available_com_ports)):
- try:
- s = serial.Serial('COM' + self.available_com_ports[i], 9600)
- s.close()
- self.com_port_input.setText(self.available_com_ports[i])
- self.last_used_port_index = i + 1
- self.connection()
- break
- except serial.SerialException:
- continue
- def connection(self):
- port_number_str = self.com_port_input.text()
- if not port_number_str:
- QMessageBox.critical(self, 'Error', "Please enter a valid COM port number.")
- return
- try:
- port_number = int(port_number_str)
- if not (1 <= port_number <= 9):
- QMessageBox.critical(self, 'Error', "COM port number should be between 1 and 9.")
- return
- port_name = f'COM{port_number}'
- except ValueError:
- QMessageBox.critical(self, 'Error', "Please enter a valid numeric COM port number.")
- return
- self.previous_port = port_number_str
- if self.previous_port not in self.user_ports:
- self.user_ports.append(self.previous_port)
- try:
- self.disconnection()
- self.setup_serial_port(port_name)
- self.start_read()
- except serial.SerialException as e:
- error_message = str(e)
- if "FileNotFoundError" in error_message:
- error_message = "COM port not found!"
- QMessageBox.critical(self, 'Error', error_message)
- QTimer.singleShot(1000, self.error_occurred.emit)
- def handle_error(self):
- if self.user_ports:
- for port in self.user_ports:
- if port in self.available_com_ports:
- self.com_port_input.setText(port)
- QTimer.singleShot(1000, lambda: self.connection()) # Add a delay before reconnecting
- return
- QTimer.singleShot(1000, lambda: self.set_available_port()) # Add a delay before setting an available port
- def setup_serial_port(self, port_name):
- self.serial_port = serial.Serial(
- port_name,
- self.serial_baudrate,
- stopbits=self.serial_stopbits,
- bytesize=self.serial_data_bits,
- parity=self.serial_parity,
- xonxoff=self.serial_flow_control,
- timeout=self.serial_read_timeout,
- write_timeout=self.serial_write_timeout
- )
- def start_read(self):
- self.read_thread = SerialDataReader(self.serial_port)
- self.read_thread.data_received.connect(self.data_received)
- self.read_thread.start()
- def disconnection(self):
- if self.serial_port:
- self.read_thread.stop()
- self.serial_port.close()
- self.serial_port = None
- ###############################################################################
- def send_data(self):
- packet_length = 31
- if self.serial_port:
- text = self.input_message.toPlainText()
- if text:
- self.text_to_send += text[-1]
- while len(self.text_to_send) >= packet_length:
- data_to_send = self.create_packet(self.text_to_send[:packet_length])
- self.text_to_send = self.text_to_send[packet_length:]
- self.show_packet(data_to_send)
- self.serial_port.write(data_to_send.encode())
- ################################################################################
- def data_received(self, data):
- if self.serial_port:
- self.bytes_received_value.setText(
- str(int(self.bytes_received_value.text()) + len(data)))
- data = self.byte_destuffing(data[1::].decode())
- self.output_message.moveCursor(QTextCursor.End)
- self.output_message.insertPlainText(data[3:-1])
- ################################################################################
- def show_packet(self, data):
- data = data.replace('\n', '\\n')
- self.kadr_window.clear()
- cursor = self.kadr_window.textCursor()
- cursor.beginEditBlock()
- cursor.movePosition(QTextCursor.End)
- format = QTextCharFormat()
- format.setFontWeight(QFont.Bold)
- format.setForeground(Qt.blue)
- normal_format = QTextCharFormat()
- normal_format.setFontWeight(QFont.Normal)
- normal_format.setForeground(Qt.black)
- i = 0
- while i < len(data):
- if any(data[i:i + len(keyword)] == keyword for keyword in self.keyword_combinations):
- for keyword in self.keyword_combinations:
- if data[i:i + len(keyword)] == keyword:
- cursor.setCharFormat(format)
- cursor.insertText(data[i:i + len(keyword)])
- i += len(keyword)
- break
- else:
- cursor.setCharFormat(normal_format)
- cursor.insertText(data[i])
- i += 1
- cursor.endEditBlock()
- def byte_stuffing(self, data):
- data = data.replace('$', 'kw2')
- data = data.replace('#x', 'kw1')
- return data
- def byte_destuffing(self, data):
- data = data.replace('kw1', '#x')
- data = data.replace('kw2', '$')
- return data
- def create_packet(self, data):
- n = 24
- flag = '#' + chr(ord('a') + n - 1)
- source_address = self.com_port_input.text()
- destination_address = '0'
- fcs = '0'
- packet = destination_address + source_address + data + fcs
- print(packet)
- packet = self.byte_stuffing(packet)
- print(packet)
- packet = flag + packet
- print(packet)
- return packet
- def closeEvent(self, event):
- if self.serial_port:
- self.read_thread.stop()
- self.serial_port.close()
- self.serial_port = None
- event.accept()
- if __name__ == '__main__':
- app = QApplication(sys.argv)
- window = SerialCommunicationWindow()
- window.show()
- sys.exit(app.exec_())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement