Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import serial, time
# Message templates written to the serial port. A "%s" placeholder is either
# filled with a parameter or stripped out by sendMessage().
message_start = "/?%s!\r\n"                  # sent first, with the meter id
message_login = "\x01P1\x02(00000000)\x03"   # sent after the password prompt
message_read = "\x01R1\x02%s()\x03"          # sent once per address to read
message_exit = "\x01B0\x03"                  # sent on error and on logout
message_mode = "\x060:1\r\n"                 # sent after the identification

# Meters polled on every pass and the addresses read from each of them.
meter_list = ['001511420141', '001511420142', '001511420143']
address_list = ['00000000', '00000003', '00000004', '00000010', '00000036']

# One error counter per meter. Sized from meter_list so that adding or
# removing a meter cannot leave the two lists out of step.
error_list = [0] * len(meter_list)

# Session states used by read_meter_data() to pick the reply handler.
state_disconnected = 10
state_wait_identification = 11
state_wait_password_prompt = 12
state_wait_password_verification = 13
state_wait_data_read = 14
state_data_available = 15

state = state_disconnected
# Non-zero while another reply from the meter is expected.
wait_response = 1
# Position in address_list of the address currently being read.
current_index = 0
current_meter = 0
# Position in meter_list of the meter currently being read. Initialised here
# so that sendExit() cannot hit a NameError before the first read starts.
meter_index = 0
# Serial link used for all meter traffic: 9600 baud, 7 data bits, even
# parity, 0.1 s read timeout.
# Alternative port configuration kept for reference:
#   serial.Serial('/dev/pts/0', 115200, timeout=3)
ser = serial.Serial(
    port='/dev/ttyUSB1',
    baudrate=9600,
    timeout=0.1,
    bytesize=serial.SEVENBITS,
    parity=serial.PARITY_EVEN,
)
def addBCC(data):
    """Return ``data`` with a one-character XOR checksum appended.

    The checksum is the XOR of the character codes of every character
    after the first one; the first character itself is not included.
    """
    checksum = 0
    for code in map(ord, data[1:]):
        checksum ^= code
    return data + chr(checksum)
def sendMessage(message, param=None, bcc=0):
    """Build a message from a template and write it to the serial port.

    message -- template string, optionally containing a "%s" placeholder
    param   -- value substituted into the template; when falsy, any "%s"
               placeholder is removed instead
    bcc     -- when truthy, the checksum from addBCC() is appended
    """
    payload = message % param if param else message.replace("%s", "")
    if bcc:
        payload = addBCC(payload)
    ser.write(payload)
def sendExit():
    """Abort the current session and count it as an error.

    Writes the exit message (with checksum), marks the session as
    disconnected and increments the error counter of the meter selected by
    ``meter_index``.
    """
    global state
    sendMessage(message_exit, None, 1)
    state = state_disconnected
    # In-place update of the shared counter list; no rebinding involved.
    error_list[meter_index] = error_list[meter_index] + 1
def disconnect():
    """End the current session normally.

    Writes the exit message (with checksum), marks the session as
    disconnected and prints "logout". Unlike sendExit(), no error is
    counted.
    """
    global state
    sendMessage(message_exit, None, 1)
    state = state_disconnected
    print("logout")
def handle_wait_identification(data):
    """Handle the reply to the start message.

    A reply beginning with '/YTL' is accepted: the mode message is sent and
    the session moves on to waiting for the password prompt. Any other
    reply aborts the session via sendExit().
    """
    global state, wait_response
    if not data.startswith('/YTL'):
        sendExit()
        state = state_disconnected
        print("error")
        return

    print("ok. sending mode ...")
    sendMessage(message_mode)
    wait_response = 1
    state = state_wait_password_prompt
def handle_wait_password_prompt(data):
    """Handle the reply to the mode message.

    Prints the reply and the character codes of its first (up to) four
    characters for debugging. A reply beginning with the characters
    0x01, 'P', '0' is accepted: the login message is sent with a checksum
    and the session moves on to waiting for the password verification.
    Any other reply, including one that is too short, aborts the session
    via sendExit().
    """
    global state, wait_response
    print(data)
    # Debug dump. Slicing instead of indexing data[0]..data[3] keeps a
    # truncated reply from raising IndexError.
    for ch in data[:4]:
        print(ord(ch))

    # Same test as comparing the first three character codes with 1, 80, 48,
    # but simply false for replies shorter than three characters.
    if data.startswith('\x01P0'):
        print("ok. sending password ...")
        sendMessage(message_login, None, 1)
        state = state_wait_password_verification
        wait_response = 1
    else:
        sendExit()
        state = state_disconnected
        print("error handle_wait_password")
def send_data_request():
    """Request the value at the current address from the meter.

    Sends the read message (with checksum) for
    ``address_list[current_index]`` and moves the session on to waiting
    for the data reply.
    """
    global state, wait_response
    address = address_list[current_index]
    print("sending data request ... %s" % address)
    sendMessage(message_read, address, bcc=1)
    wait_response = 1
    state = state_wait_data_read
def handle_wait_password_verification(data):
    """Handle the reply to the login message.

    A reply beginning with the character 0x06 is treated as success and the
    first data request is sent. Any other reply aborts the session via
    sendExit().
    """
    global state, wait_response
    if not data.startswith('\x06'):
        sendExit()
        state = state_disconnected
        print("error")
        return

    print("password ok")
    send_data_request()
def handle_wait_data_read(data):
    """Handle the reply to a data request.

    Prints the reply, then either requests the next address in
    ``address_list`` or, once every address has been read, logs out via
    disconnect() and prints "done".
    """
    global state, wait_response, current_index
    print(data)
    current_index = current_index + 1
    if current_index >= len(address_list):
        disconnect()
        state = state_disconnected
        print("done")
        return

    send_data_request()
def read_meter_data(index):
    """Run one complete read session against ``meter_list[index]``.

    Sends the start message, then keeps reading replies and passing each
    one to the handler for the current session state for as long as a
    handler asks for another reply (by setting ``wait_response``). An empty
    read is treated as a failure and aborts the session via sendExit().
    """
    global state, wait_response, current_index, meter_index
    meter_index = index
    time.sleep(0.1)
    current_index = 0
    meter_id = meter_list[meter_index]
    print("start reading meter ... %s" % meter_id)
    sendMessage(message_start, meter_id)
    state = state_wait_identification
    wait_response = 1

    # Exactly one handler runs per received reply, chosen by the state the
    # session was in when the reply arrived.
    reply_handlers = {
        state_wait_identification: handle_wait_identification,
        state_wait_password_prompt: handle_wait_password_prompt,
        state_wait_password_verification: handle_wait_password_verification,
        state_wait_data_read: handle_wait_data_read,
    }

    while wait_response:
        # Handlers set this back to 1 when they expect a further reply.
        wait_response = 0
        # NOTE(review): the input buffer is flushed after the request has
        # already been written, so a very fast reply could be discarded
        # here; confirm against the meter's response timing.
        ser.flushInput()
        time.sleep(0.2)
        reply = ser.read(64)
        if not reply:
            print("fail")
            sendExit()
            state = state_disconnected
            continue

        print("received bytes: %d" % len(reply))
        handler = reply_handlers.get(state)
        if handler is not None:
            handler(reply)
# Number of polling passes; every pass reads each meter in meter_list once.
max_steps = 1000
try:
    for i in range(max_steps):
        print("loop %d of %d ..." % (i, max_steps))
        for meter in range(len(meter_list)):
            read_meter_data(meter)

    # Summary of how many sessions were aborted for each meter.
    for meter_id, error_count in zip(meter_list, error_list):
        print("Errors for meter[%s]: %d" % (meter_id, error_count))
finally:
    # Release the serial port even if a pass raises or the run is
    # interrupted from the keyboard.
    ser.close()
Add Comment
Please, Sign In to add comment