Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import argparse
- import math
- import paho.mqtt.client
- import numbers
- import time
- import random
- import sys
- from influxdb_client import InfluxDBClient
- from influxdb_client.client.write_api import SYNCHRONOUS
- from threading import Thread
- from collections import deque
- import logging
- logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.WARNING)
class DBWriterThread(Thread):
    """Background thread that batches queued device readings into InfluxDB.

    Producers call :meth:`schedule_item`; the thread drains the queue in
    small batches and writes each batch as one request.

    Args:
        influx_client: Client object exposing ``write_api(write_options=...)``.
        *args, **kwargs: Passed through unchanged to ``threading.Thread``.
    """

    def __init__(self, influx_client, *args, **kwargs):
        self.influx_client = influx_client
        # deque append/popleft are atomic, so no explicit lock is used here.
        self.data_queue = deque()
        super().__init__(*args, **kwargs)

    def schedule_item(self, device):
        """Queue ``device`` so its current reading is written by the thread."""
        logging.debug(
            "schedule_item: %s",
            (device.device_id, device.device_name, device.control_id, device.value),
        )
        self.data_queue.append(device)

    def get_items(self, mininterval, maxitems):
        """Collect queued items until ``mininterval`` seconds have elapsed
        or ``maxitems`` items have been gathered, whichever comes first.

        Returns:
            list: The collected items, possibly empty.
        """
        # Monotonic clock: a wall-clock adjustment must not stretch or cut
        # the collection window.
        started = time.monotonic()
        items = []
        while (time.monotonic() - started < mininterval) and (len(items) < maxitems):
            try:
                item = self.data_queue.popleft()
            except IndexError:
                # Queue is empty: back off briefly instead of spinning.
                time.sleep(mininterval * 0.1)
            else:
                items.append(item)
        logging.debug(f"get_items: {len(items)}")
        return items

    def run(self):
        """Drain the queue forever, writing each batch to InfluxDB.

        The target bucket is read from the module-level ``args`` namespace
        (``args.idb_database``), which the script's ``__main__`` section sets
        before starting the thread.
        """
        # One write API for the whole thread lifetime instead of one per batch.
        write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)
        while True:
            items = self.get_items(mininterval=0.05, maxitems=50)
            db_req_body = []
            for device in items:
                ser_item = self.serialize_data_item(device)
                logging.debug(f"device_id = {device.device_id}, device_name = {device.device_name}, control_id = {device.control_id}, value = {device.value}")
                if ser_item:
                    db_req_body.append(ser_item)
            logging.debug(f"\n\n db_req_body = {db_req_body} \n\n")
            if db_req_body:
                logging.info(f"Write {len(items)} items")
                try:
                    write_api.write(bucket=args.idb_database, record=db_req_body)
                except Exception:
                    # Thread boundary: a failed write must not kill the writer,
                    # otherwise the queue would grow unbounded while nothing is
                    # stored. The failed batch is dropped.
                    logging.exception(
                        "Failed to write %d points to InfluxDB", len(db_req_body)
                    )
            time.sleep(0.01)

    def serialize_data_item(self, device):
        """Convert ``device`` into a point dictionary for the write API.

        Numeric values are stored in the ``value_f`` field; everything else
        (including NaN) is stored as-is in ``value_s``.

        Returns:
            dict | None: The point, or ``None`` when the device carries no
            value (``None``, empty string or another empty non-numeric).
        """
        value = device.value
        # A numeric zero is a real reading and must be kept; only genuinely
        # empty values are skipped.
        if not value and value != 0:
            return None

        try:
            value_f = float(value)
        except (TypeError, ValueError):
            value_f = None

        fields = {}
        if value_f is not None and not math.isnan(value_f):
            fields["value_f"] = value_f
        else:
            fields["value_s"] = value

        return {
            'measurement': 'Serebrica',
            'tags': {
                "device": f"{device.device_id}",
                "device_name": f"{device.device_name}",
                "control": f"{device.control_id}",
            },
            "fields": fields,
        }
class Device:
    """Mutable record of one device: id, display name, control id and value."""

    def __init__(self, _device_id=None, _device_name="", _control_id=None, _value=-999999):
        # The default value -999999 is the same number the rest of this file
        # compares against to detect "no reading".
        self.device_id, self.device_name = _device_id, _device_name
        self.control_id, self.value = _control_id, _value
def add_or_update_device(devices_dict, new_device):
    """Merge ``new_device`` into ``devices_dict`` (keyed by ``device_id``).

    * Unknown device id: ``new_device`` is stored as-is.
    * Known device id: a name already stored wins and is copied onto
      ``new_device``; a stored entry without a name adopts the new one.
      If ``new_device`` carries a control id, a matching control only has
      its value refreshed on the stored entry, while a different control
      replaces the stored entry with ``new_device``.

    Args:
        devices_dict: Mapping of device id to device object; mutated in place.
        new_device: Object with ``device_id``, ``device_name``,
            ``control_id`` and ``value`` attributes; its ``device_name`` may
            be overwritten.
    """
    device_id = new_device.device_id
    if device_id not in devices_dict:
        # First time this device is seen.
        devices_dict[device_id] = new_device
        return

    existing_device = devices_dict[device_id]

    # Keep the name in sync in whichever direction is missing it.
    if existing_device.device_name is not None:
        new_device.device_name = existing_device.device_name
    elif new_device.device_name is not None:
        existing_device.device_name = new_device.device_name

    if new_device.control_id is None:
        # Name-only update: nothing more to merge.
        return

    if new_device.control_id == existing_device.control_id:
        existing_device.value = new_device.value
    else:
        # A different control of the same device takes over the slot.
        devices_dict[device_id] = new_device
def convert_to_number(s):
    """Parse ``s`` as an int, falling back to float.

    Returns -999999 when neither conversion accepts the input.
    """
    for parse in (int, float):
        try:
            return parse(s)
        except ValueError:
            continue
    return -999999
# Writer thread that receives readings; assigned in the __main__ section.
db_writer = None
# Last known state per device, keyed by device id.
devices = {}


def on_mqtt_message(arg0, arg1, arg2=None):
    """MQTT message callback: track device state and queue readings.

    The message object is the last positional argument, so the function
    works whether it is called with two or three arguments (presumably to
    support both ``(client, msg)`` and ``(client, userdata, msg)`` callback
    shapes).

    Only topics of the form ``/devices/<device>/meta/name`` and
    ``/devices/<device>/controls/<control>`` are handled. A reading is queued
    on ``db_writer`` once the device has both a name and a numeric value.
    """
    msg = arg1 if arg2 is None else arg2

    # Retained messages are deliberately NOT skipped: skipping them would
    # prevent reading the meta topics, which carry the device name.
    parts = msg.topic.split('/')[1:]

    # NOTE: if devices other than Wiren Board are attached to the bus, this
    # fixed four-level layout check should be reworked.
    if len(parts) != 4 or parts[0] != 'devices':
        return

    device_id, section, leaf = parts[1], parts[2], parts[3]

    # Filter out unwanted devices. Only the first six characters are
    # compared, which is what the original "system_"[:6] / "telegram2wb"[:6]
    # slicing amounted to.
    if device_id.startswith(("system", "telegr")):
        return

    device = Device(device_id, None, None, -999999)

    if section == 'meta' and leaf == 'name':
        # errors='replace': a malformed payload must not raise inside the
        # MQTT callback.
        device.device_name = msg.payload.decode('utf8', errors='replace')
        add_or_update_device(devices, device)

    if section != 'controls':
        return

    device.control_id = leaf
    device.value = convert_to_number(msg.payload.decode('utf8', errors='replace'))
    if device.value == -999999:
        # Non-numeric payload: nothing to store.
        return
    if isinstance(device.value, numbers.Number):
        add_or_update_device(devices, device)

    updated_device = devices.get(device.device_id)
    if updated_device is None:
        return
    # Only write once the name is known, so every point carries a
    # device_name tag.
    if updated_device.device_name is not None and updated_device.value != -999999:
        db_writer.schedule_item(updated_device)
# Script entry point. This stays at module level (not wrapped in a main()
# function) because other code in this file reads `args` and `db_writer`
# as module globals.
if __name__ == '__main__':
    # add_help=False keeps "-h" unclaimed next to the "-hmq"/"-hdb" options;
    # "--help" is still offered explicitly below.
    parser = argparse.ArgumentParser(
        description='MQTT to InfluxDB bridge for device control values',
        add_help=False,
    )
    parser.add_argument('--help', action='help',
                        help='Show this help message and exit')

    # --- MQTT ---
    parser.add_argument('-hmq', '--host-mq', dest='mq_host', type=str,
                        help='MQTT host', default='localhost')
    parser.add_argument('-umq', '--username-mq', dest='mq_username', type=str,
                        help='MQTT username', default='')
    parser.add_argument('-Pmq', '--password-mq', dest='mq_password', type=str,
                        help='MQTT password', default='')
    parser.add_argument('-pmq', '--port-mq', dest='mq_port', type=int,
                        help='MQTT port', default=1883)
    parser.add_argument('-tmq', '--topic-mq', dest='mq_topic', type=str,
                        help='Topic mask to subscribe to. For example: "/devices/my-device/#"',
                        default='/devices/#')

    # --- InfluxDB ---
    parser.add_argument('-hdb', '--host-idb', dest='idb_host', type=str,
                        help='InfluxDB host', default='localhost')
    parser.add_argument('-pdb', '--port-idb', dest='idb_port', type=int,
                        help='InfluxDB port', default=8086)
    # Username and password are accepted but not used below; the client
    # is created with the token only.
    parser.add_argument('-udb', '--username-idb', dest='idb_username', type=str,
                        help='InfluxDB username', default='')
    parser.add_argument('-Pdb', '--password-idb', dest='idb_password', type=str,
                        help='InfluxDB password', default='')
    parser.add_argument('-tdb', '--token-idb', dest='idb_token', type=str,
                        help='InfluxDB access token', default='')
    parser.add_argument('-ddb', '--database-idb', dest='idb_database', type=str,
                        help='InfluxDB database', default='mqtt_data')
    parser.add_argument('-odb', '--organization-idb', dest='idb_org', type=str,
                        help='InfluxDB organization', default='main-org')

    args = parser.parse_args()

    client = paho.mqtt.client.Client(client_id=None, clean_session=True,
                                     protocol=paho.mqtt.client.MQTTv31)
    if args.mq_username:
        client.username_pw_set(args.mq_username, args.mq_password)
    client.connect(args.mq_host, args.mq_port)
    client.on_message = on_mqtt_message
    client.subscribe(args.mq_topic)

    influx_client = InfluxDBClient(
        url=f"http://{args.idb_host}:{args.idb_port}",
        token=f"{args.idb_token}",
        verify_ssl=False,
        org=args.idb_org,
    )

    # Assigned at module level so the message callback can reach the writer.
    db_writer = DBWriterThread(influx_client, daemon=True)
    db_writer.start()

    # Pump the MQTT network loop until it reports an error.
    while True:
        rc = client.loop()
        if rc != 0:
            logging.error("MQTT loop returned error code %s, exiting", rc)
            break
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement