Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import sys
- import can_emulator as ce
- # Init adapter CANalyst-II
- factory = ce.FactoryCanAnalyst2()
- # Назначение данных параметров описано в документации CANalyst-II
- ca2_vic = {
- 'AccCode': 0x4fa00000,
- 'AccMask': 0x00000000,
- 'Filter': 1,
- 'Timing0': 0x00,
- 'Timing1': 0x1C,
- 'Mode': 0
- }
- ca2_kwargs = {'DevType': 4, 'DevIndex': 0, 'CANIndex': 0, 'vic': ca2_vic}
- ca2 = factory.factory_method(0, 'CANAnalyst-II', **ca2_kwargs)
- # Establish connection with adapter. Returns 0 on successful connection
- ret = ca2.open_connection()
- if ret != 0:
- sys.exit('Could not establish connection to CAN-adapter CANalyst-II')
- # Импорт всех сообщений из dbc файла.
- # dbc_messages - словарь <ID, сообщение>
- dbc_messages = ce.init_messages(os.path.abspath('general.dbc'))
- # Добавление потока циклической отправки сообщения с ID 0x27d
- ca2.add_send_thread(dbc_messages[0x27d], t_int=0.01)
- # Старт потока циклического чтения всех сообщений, получаемых адаптером
- ca2.start_recv_thread()
- # Для чтения значения сигнала сообшения
- val = dbc_messages[0x27d].read_signal('ABS_VehicleSpeed')
- print('ABS_VehicleSpeed value = ', val)
- # Считываемые сообщения помещаются в очередь из которой можно считать сообщения
- while True:
- if not ca2.recv_queue.empty():
- serialized_msg = ca2.recv_queue.get()
- frame_id = serialized_msg.ID
- data = serialized_msg.Data
- dbc_messages[frame_id].decode_message(data, scaling=True)
- if frame_id == 0x27d:
- print(dbc_messages[0x27d].read_signal('ABS_VehicleSpeed'))
- break
- # Записи значения сигнала
- dbc_messages[0x27d].write_signal_value('ABS_VehicleSpeed', 10)
- # Методы для приостановки и продолжения потока отправки сообщения
- # dbc_messages[0x27d].send_thread_pause()
- # dbc_messages[0x27d].send_thread_pause()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement