Advertisement
microrobotics

Huabang DDS228 Modbus Parser

Apr 12th, 2024 (edited)
970
0
Never
1
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.99 KB | None | 0 0
  1. import serial
  2. import struct
  3. import time
  4.  
  5. # CRC-16 calculation function (Modbus flavor)
  6. def crc16(data):
  7.     crc = 0xFFFF
  8.     for byte in data:
  9.         crc ^= byte
  10.         for _ in range(8):
  11.             if crc & 0x0001:
  12.                 crc = (crc >> 1) ^ 0xA001
  13.             else:
  14.                 crc >>= 1
  15.     return crc
  16.  
  17. # Function to convert IEEE 754 floating point bytes to a float value
  18. def ieee754_to_float(bytes):
  19.     return struct.unpack('>f', struct.pack('>L', bytes))[0]
  20.  
  21. # Function to parse a Modbus response
  22. def parse_modbus_response(response_bytes):
  23.     # Slave ID
  24.     slave_id = response_bytes[0]
  25.    
  26.     # Function code
  27.     function_code = response_bytes[1]
  28.    
  29.     # Byte count
  30.     byte_count = response_bytes[2]
  31.    
  32.     # Data bytes
  33.     data_bytes = response_bytes[3:-2]
  34.    
  35.     # CRC bytes
  36.     crc_bytes = response_bytes[-2:]
  37.    
  38.     # Calculate CRC and check
  39.     calculated_crc = crc16(response_bytes[:-2])
  40.     received_crc = int.from_bytes(crc_bytes, byteorder='little')
  41.     if calculated_crc != received_crc:
  42.         print("CRC error in the response")
  43.         return
  44.    
  45.     # Check for error response
  46.     if function_code == 0x83:
  47.         exception_code = data_bytes[0]
  48.         print(f"Error response: Exception code {exception_code}")
  49.         return
  50.    
  51.     # Parse data based on expected data type
  52.     if function_code == 0x03:  # Read holding registers
  53.         if byte_count == 4:  # 32-bit float
  54.             value = ieee754_to_float(int.from_bytes(data_bytes, byteorder='big'))
  55.             return value
  56.         elif byte_count == 2:  # 16-bit unsigned int
  57.             value = int.from_bytes(data_bytes, byteorder='big')
  58.             return value
  59.         else:
  60.             print("Unexpected byte count for read holding registers response")
  61.             return None
  62.     else:
  63.         print(f"Unhandled function code: {function_code}")
  64.         return None
  65.  
  66. # Function to construct a Modbus request
  67. def construct_modbus_request(register_choice, slave_id=0x01):
  68.     requests = {
  69.         1: bytes.fromhex(f'01 03 00 00 00 02'),  # Total active energy (kWh)
  70.         7: bytes.fromhex(f'01 03 00 90 00 02'),  # Grid frequency (Hz)
  71.     }
  72.    
  73.     if register_choice in requests:
  74.         request_bytes = requests[register_choice]
  75.         crc = crc16(request_bytes)
  76.         crc_bytes = crc.to_bytes(2, byteorder='little')
  77.         return request_bytes + crc_bytes
  78.     else:
  79.         print("Invalid choice, please try again.")
  80.         return None
  81.  
  82. # Main program
  83. # Get the COM port from the user
  84. com_port = input("Enter the COM port (e.g., COM1, /dev/ttyUSB0): ")
  85.  
  86. # Device ID is fixed to 26
  87. device_id = 26
  88.  
  89. # Open the serial port
  90. try:
  91.     ser = serial.Serial(com_port, baudrate=9600, bytesize=8, parity=serial.PARITY_EVEN, stopbits=1)
  92. except serial.SerialException as e:
  93.     print(f"Error opening serial port: {e}")
  94.     exit(1)
  95.  
  96. while True:
  97.     print("\nSelect the register you want to request:")
  98.     print("1. Total active energy (kWh)")
  99.     print("7. Grid frequency (Hz)")
  100.     print("0. Exit")
  101.    
  102.     choice = int(input("Enter your choice (0, 1, or 7): "))
  103.    
  104.     if choice == 0:
  105.         break
  106.    
  107.     request_bytes = construct_modbus_request(choice, device_id)
  108.    
  109.     if request_bytes:
  110.         print(f"Modbus request: {request_bytes.hex()}")
  111.        
  112.         # Send the request over the serial port
  113.         ser.write(request_bytes)
  114.        
  115.         # Wait for the response
  116.         response_bytes = ser.read(size=9)
  117.        
  118.         if len(response_bytes) == 9:
  119.             value = parse_modbus_response(response_bytes)
  120.             if value is not None:
  121.                 if choice == 1:
  122.                     print(f"Total active energy (kWh): {value}")
  123.                 elif choice == 7:
  124.                     print(f"Grid frequency (Hz): {value}")
  125.         else:
  126.             print("Invalid response length")
  127.            
  128.         # Add a delay to avoid flooding the meter with requests
  129.         time.sleep(0.1)
  130.  
  131. print("Exiting program...")
  132. ser.close()
  133.  
Advertisement
Comments
  • microrobotics
    243 days
    # text 2.59 KB | 0 0
    1. Install Dependencies:
    2. Ensure that you have the pyserial library installed. You can install it using pip:
    3.  
    4. pip install pyserial
    5.  
    6. Connect the Energy Meter:
    7. Connect the energy meter to the device (e.g., a computer or a Raspberry Pi) running the Python code using a serial cable (e.g., RS-485 or RS-232).
    8. Make sure the serial connection settings (baud rate, data bits, parity, and stop bits) on both the energy meter and the device match. According to the documentation, the settings should be:
    9. Baud rate: 9600
    10. Data bits: 8
    11. Parity: Even
    12. Stop bits: 1
    13.  
    14. Configure the Code:
    15. Open the Python script in a text editor or an IDE.
    16. Locate the construct_modbus_request function and ensure that the slave ID (device address) is set correctly. In the provided code, the slave ID is set to 0x01 (decimal 1). If your energy meter has a different slave ID, update the line:
    17.  
    18. def construct_modbus_request(register_choice, slave_id=0x01):
    19.  
    20. Replace 0x01 with the hexadecimal representation of your energy meter's slave ID.
    21.  
    22. Run the Code:
    23. Save the Python script.
    24. Open a terminal or command prompt and navigate to the directory containing the Python script.
    25. Run the script by executing the following command:
    26.  
    27. python script_name.py
    28.  
    29. Replace script_name.py with the actual name of your Python script file.
    30.  
    31. Interact with the Script:
    32. The script will prompt you to enter the COM port (e.g., COM1, /dev/ttyUSB0) to which the energy meter is connected. Enter the appropriate COM port and press Enter.
    33. The script will then display a menu with options to request different registers from the energy meter:
    34.  
    35. Select the register you want to request:
    36. 1. Total active energy (kWh)
    37. 2. Reverse active energy (kWh)
    38. 3. Voltage (V)
    39. 4. Current (A)
    40. 5. Active power (kW)
    41. 6. Power factor
    42. 7. Grid frequency (Hz)
    43. 0. Exit
    44.  
    45. Enter the corresponding number (1-7) for the register you want to read, and press Enter.
    46.  
    47. The script will send the Modbus request to the energy meter and display the received response value.
    48.  
    49. After displaying the response, the script will wait for a short period (0.1 seconds) before prompting you for the next register choice. This delay is to avoid flooding the energy meter with requests.
    50. To exit the script, enter 0 and press Enter.
    51.  
    52. Interpreting the Output:
    53. The script will print the received value from the energy meter for the requested register.
    54. For 32-bit float values (e.g., voltage, current, active power), the script will display the value with the appropriate number of decimal places based on the expected accuracy.
    55. For 16-bit unsigned integer values (if any), the script will display the integer value directly.
Add Comment
Please, Sign In to add comment
Advertisement