Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import sys
- import time
- from threading import Thread
- import serial
- from pycad import Logger
- from pycad import Shared
- from pycad.Shared import TitanStatic
- from pycad.Shared import VMXStatic
- from pycad.Funcad import Funcad
class TitanCOM:
    """Fixed-size packet exchange with the Titan board over a serial port.

    A daemon thread (see ``start_com``) repeatedly reads a 48-byte packet
    from ``/dev/ttyACM0``, decodes it into ``TitanStatic`` fields, builds a
    48-byte reply from ``TitanStatic``/``VMXStatic`` and writes it back.
    Timing and rate diagnostics are published on ``Shared.info_holder``.
    """

    # Size in bytes of every packet read from and written to the port.
    _PACKET_SIZE = 48
    # Raw encoder values are assembled from two bytes, so they wrap modulo 2**16.
    _ENCODER_MODULUS = 65536
    # A jump between two samples larger than this is treated as a counter wrap.
    _WRAP_THRESHOLD = 30000

    @classmethod
    def start_com(cls) -> None:
        """Start ``com_loop`` in a background daemon thread."""
        Thread(target=cls.com_loop, daemon=True).start()

    @staticmethod
    def _ticks() -> int:
        """Return the current wall-clock time in units of 0.1 ms."""
        return round(time.time() * 10000)

    @classmethod
    def com_loop(cls) -> None:
        """Run the read/decode/encode/write cycle until an error occurs.

        Any exception ends the loop; its type, file name, line number and
        message are written to the COM log. The serial port is always closed
        on the way out so the device is not left held by a dead thread.
        """
        ticks = cls._ticks
        ser = None
        try:
            ser = serial.Serial(
                port='/dev/ttyACM0',
                baudrate=115200,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS
            )
            cycle_started: int = ticks()
            rate_window_started: float = time.time()
            exchanges = 0
            while True:
                rx_data: bytearray = bytearray(ser.read(cls._PACKET_SIZE))
                ser.reset_input_buffer()  # drop anything that queued up behind this packet

                rx_started: int = ticks()
                TitanCOM.set_up_rx_data(rx_data)
                Shared.info_holder.rx_com_time_dev = str(ticks() - rx_started)

                tx_started: int = ticks()
                tx_data = TitanCOM.set_up_tx_data()
                Shared.info_holder.tx_com_time_dev = str(ticks() - tx_started)

                ser.reset_output_buffer()  # discard any unsent bytes before writing
                ser.write(tx_data)
                ser.flush()

                # Publish the number of completed exchanges roughly once per second.
                exchanges += 1
                if time.time() - rate_window_started > 1:
                    rate_window_started = time.time()
                    Shared.info_holder.com_count_dev = str(exchanges)
                    exchanges = 0

                time.sleep(0.002)
                Shared.info_holder.com_time_dev = str(ticks() - cycle_started)
                cycle_started = ticks()
        except Exception as e:  # serial.SerialException is an Exception subclass
            exc_type, _, exc_tb = sys.exc_info()
            file_name = os.path.basename(exc_tb.tb_frame.f_code.co_filename)
            Logger.write_com_log(" ".join(map(str, [exc_type, file_name, exc_tb.tb_lineno])))
            Logger.write_com_log(str(e))
        finally:
            if ser is not None:
                ser.close()

    @staticmethod
    def _u16(data: bytearray, low_index: int) -> int:
        """Combine ``data[low_index]`` (low byte) and the next byte (high byte)."""
        return (data[low_index + 1] & 0xff) << 8 | (data[low_index] & 0xff)

    @staticmethod
    def set_up_rx_data(data: bytearray) -> None:
        """Decode a received packet into ``TitanStatic`` fields.

        The packet is ignored when ``data[42] == 33`` or when ``data[24]`` is
        not 111; it is logged as wrong data when ``data[0]`` is not 1.

        :param data: received packet; indices up to 42 are accessed.
        """
        # NOTE(review): the source this was recovered from had lost its
        # indentation, so which `if` the "received wrong data" branch belongs
        # to is reconstructed (paired with the data[0] check) - confirm.
        if data[42] == 33:
            return
        if data[0] != 1:
            Logger.write_com_log("received wrong data " + " ".join(map(str, data)))
            return
        if data[24] != 111:
            return

        u16 = TitanCOM._u16
        bit = Funcad.access_bit

        TitanCOM.set_up_encoders(u16(data, 1), u16(data, 3), u16(data, 5), u16(data, 7))

        # TODO: tidy this up later
        TitanStatic.limit_1_m_2 = bit(data[9], 5)
        TitanStatic.limit_l_m_3 = bit(data[9], 4)
        TitanStatic.limit_h_m_3 = bit(data[9], 6)
        TitanStatic.limit_1_m_0 = bit(data[10], 1)
        TitanStatic.limit_2_m_0 = bit(data[10], 2)
        TitanStatic.limit_1_m_1 = bit(data[10], 3)
        TitanStatic.limit_2_m_1 = bit(data[10], 4)
        TitanStatic.limit_2_m_2 = bit(data[10], 5)
        # whether the lift position has been reached
        TitanStatic.lift_pos_reached = bit(data[10], 6)

        # data[19] carries one sign flag per value: flag set -> positive.
        def sign(flag_index: int) -> int:
            return 1 if bit(data[19], flag_index) else -1

        sign_pid_0: int = sign(1)
        sign_pid_1: int = sign(2)
        sign_pid_2: int = sign(3)
        sign_pid_3: int = sign(4)
        sign_x: int = sign(5)
        sign_y: int = sign(6)

        # Speeds arrive as a 16-bit magnitude scaled so that 65535 == 100.
        TitanStatic.speed_motor_0_pid = u16(data, 11) / 65535 * 100 * sign_pid_0
        TitanStatic.speed_motor_1_pid = u16(data, 13) / 65535 * 100 * sign_pid_1
        TitanStatic.speed_motor_2_pid = u16(data, 15) / 65535 * 100 * sign_pid_2
        TitanStatic.speed_motor_3_pid = u16(data, 17) / 65535 * 100 * sign_pid_3

        # Odometry arrives as a 16-bit magnitude in hundredths.
        TitanStatic.odo_x_pos = u16(data, 20) / 100 * sign_x
        TitanStatic.odo_y_pos = u16(data, 22) / 100 * sign_y

    @staticmethod
    def set_up_encoders(enc0: int, enc1: int, enc2: int, enc3: int) -> None:
        """Accumulate encoder movement and remember the new raw readings.

        For each motor the wrap-corrected difference between the new raw value
        and the previously stored one is added to ``TitanStatic.enc_motor_N``;
        the raw value is then stored in ``TitanStatic.raw_enc_motor_N``.
        """
        delta = TitanCOM.get_normal_diff
        TitanStatic.enc_motor_0 += delta(enc0, TitanStatic.raw_enc_motor_0)
        TitanStatic.enc_motor_1 += delta(enc1, TitanStatic.raw_enc_motor_1)
        TitanStatic.enc_motor_2 += delta(enc2, TitanStatic.raw_enc_motor_2)
        TitanStatic.enc_motor_3 += delta(enc3, TitanStatic.raw_enc_motor_3)

        TitanStatic.raw_enc_motor_0 = enc0
        TitanStatic.raw_enc_motor_1 = enc1
        TitanStatic.raw_enc_motor_2 = enc2
        TitanStatic.raw_enc_motor_3 = enc3

    @staticmethod
    def get_normal_diff(curr: int, last: int) -> int:
        """Return ``curr - last`` corrected for 16-bit counter wrap-around.

        A difference whose magnitude exceeds 30000 is taken to mean the
        counter wrapped between the two samples, and is shifted by 65536 (the
        number of distinct 16-bit values). Using 65535 here would drop one
        tick on every wrap, e.g. 65535 -> 0 would count as 0 instead of +1.

        :param curr: latest raw counter value (0..65535).
        :param last: previous raw counter value (0..65535).
        :return: signed number of ticks moved between the two samples.
        """
        diff: int = curr - last
        if diff > TitanCOM._WRAP_THRESHOLD:
            # Counter went backwards through zero.
            diff -= TitanCOM._ENCODER_MODULUS
        elif diff < -TitanCOM._WRAP_THRESHOLD:
            # Counter went forwards through the maximum.
            diff += TitanCOM._ENCODER_MODULUS
        return diff

    @staticmethod
    def _flags_byte(*bits) -> int:
        """Pack eight truthy/falsy values into one byte, first value = MSB."""
        return int("".join("1" if flag else "0" for flag in bits), 2)

    @staticmethod
    def _put_u16(tx_data: bytearray, offset: int, value: int) -> None:
        """Store elements [2] and [3] of ``Funcad.int_to_byte_4(value)`` at ``offset``."""
        packed: bytearray = Funcad.int_to_byte_4(value)
        tx_data[offset] = packed[2]
        tx_data[offset + 1] = packed[3]

    @staticmethod
    def set_up_tx_data() -> bytearray:
        """Build the next 48-byte packet to send.

        When ``TitanStatic.send_pid`` is set, a PID-configuration packet
        (type byte 2) is produced and the flag is cleared; otherwise a regular
        control packet (type byte 1) with speeds, lift position, yaw and
        odometry-reset values is produced. Unused bytes stay zero.

        :return: the packet as a 48-element ``bytearray``.
        """
        tx_data: bytearray = bytearray(TitanCOM._PACKET_SIZE)
        flags = TitanCOM._flags_byte
        put_u16 = TitanCOM._put_u16

        if TitanStatic.send_pid:
            tx_data[0] = 2
            tx_data[1] = flags(
                False,
                TitanStatic.use_pid_motor_0,
                TitanStatic.use_pid_motor_1,
                TitanStatic.use_pid_motor_2,
                TitanStatic.use_pid_motor_3,
                False, False, False,
            )
            # Three coefficients per motor, three bytes per coefficient,
            # laid out back to back starting at index 2 (ends at index 37).
            offset = 2
            for coefficients in (
                TitanStatic.pid_motor_0,
                TitanStatic.pid_motor_1,
                TitanStatic.pid_motor_2,
                TitanStatic.pid_motor_3,
            ):
                for index in range(3):
                    packed: bytearray = Funcad.int_to_byte_4(int(coefficients[index] * 1000000))
                    tx_data[offset] = packed[1]
                    tx_data[offset + 1] = packed[2]
                    tx_data[offset + 2] = packed[3]
                    offset += 3
            tx_data[38] = 222
            TitanStatic.send_pid = False
            return tx_data

        tx_data[0] = 1
        tx_data[1] = 0b10000001

        # Read each speed once so its magnitude and direction bit agree.
        speeds = (
            TitanStatic.speed_motor_0,
            TitanStatic.speed_motor_1,
            TitanStatic.speed_motor_2,
            TitanStatic.speed_motor_3,
        )
        for motor, speed in enumerate(speeds):
            put_u16(tx_data, 2 + 2 * motor, abs(int(speed / 100 * 65535)))
        tx_data[10] = flags(
            True,
            speeds[0] >= 0,
            speeds[1] >= 0,
            speeds[2] >= 0,
            speeds[3] >= 0,
            False, False, True,
        )

        put_u16(tx_data, 11, abs(int(TitanStatic.lift_pos / 10 * 65535)))

        yaw = VMXStatic.yaw_unlim
        reset_x = TitanStatic.odo_reset_x
        reset_y = TitanStatic.odo_reset_y
        tx_data[13] = flags(
            True,
            TitanStatic.virtual_ems,
            TitanStatic.lift_init,
            TitanStatic.odo_reset_coords,
            not (yaw >= 0),
            reset_x >= 0,
            reset_y >= 0,
            True,
        )
        # just in case
        TitanStatic.odo_reset_coords = False

        put_u16(tx_data, 14, abs(int(-yaw * 10)))
        put_u16(tx_data, 16, abs(int(reset_x * 10)))
        put_u16(tx_data, 18, abs(int(reset_y * 10)))
        tx_data[20] = 222
        return tx_data
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement