Advertisement
Mad_Axell

mqtt_to_influxdb2

May 1st, 2025
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.04 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. import argparse
  3. import math
  4.  
  5. import paho.mqtt.client
  6.  
  7. import numbers
  8. import time
  9. import random
  10. import sys
  11.  
  12. from influxdb_client import InfluxDBClient
  13. from influxdb_client.client.write_api import SYNCHRONOUS
  14. from threading import Thread
  15. from collections import deque
  16.  
  17. import logging
  18. logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.WARNING)
  19.  
  20. class DBWriterThread(Thread):
  21.     def __init__(self, influx_client, *args, **kwargs):
  22.         self.influx_client = influx_client
  23.         self.data_queue = deque()
  24.  
  25.         super(DBWriterThread, self).__init__(*args, **kwargs)
  26.  
  27.     def schedule_item(self, device):
  28.         item = (device.device_id, device.device_name,device.control_id, device.value)
  29.         logging.debug(f"schedule_item: {item}")
  30.         self.data_queue.append(device)
  31.  
  32.     def get_items(self, mininterval, maxitems):
  33.         """ This will collect items from queue until either 'mininterval'
  34.        is over or 'maxitems' items are collected """
  35.         started = time.time()
  36.         items = []
  37.  
  38.         while (time.time() - started < mininterval) and (len(items) < maxitems):
  39.             try:
  40.                 item = self.data_queue.popleft()
  41.             except IndexError:
  42.                 time.sleep(mininterval * 0.1)
  43.             else:
  44.                 items.append(item)
  45.            
  46.         logging.debug(f"get_items: {len(items)}")
  47.         return items
  48.  
  49.     def run(self):
  50.         while True:
  51.             items = self.get_items(mininterval=0.05, maxitems=50)
  52.             db_req_body = []
  53.             for device in items:
  54.                 ser_item = self.serialize_data_item(device)
  55.                 logging.debug(f"device_id = {device.device_id}, device_name = {device.device_name}, control_id = {device.control_id}, value = {device.value}")
  56.                 if ser_item:
  57.                     db_req_body.append(ser_item)
  58.                     #stat_clients.add(client)
  59.                    
  60.             logging.debug(f"\n\n db_req_body = {db_req_body} \n\n")
  61.             if db_req_body:
  62.                 logging.info(f"Write {len(items)} items")
  63.                 write_client = self.influx_client.write_api(write_options=SYNCHRONOUS).write(bucket=args.idb_database, record=db_req_body)
  64.  
  65.         time.sleep(0.01)
  66.  
  67.  
  68.  
  69.     def serialize_data_item(self, device):
  70.  
  71.         if not device.value:
  72.             return
  73.  
  74.         fields = {}
  75.         try:
  76.             value_f = float(device.value)
  77.             if not math.isnan(value_f):
  78.                 fields["value_f"] = value_f
  79.         except ValueError:
  80.             pass
  81.         if "value_f" not in fields:
  82.             fields["value_s"] = device.value
  83.  
  84.         item = {
  85.             'measurement': 'Serebrica',
  86.             'tags' : {
  87.                 "device" : f"{device.device_id}",
  88.                 "device_name" : f"{device.device_name}",
  89.                 "control" : f"{device.control_id}"
  90.             },
  91.             "fields" : fields
  92.         }
  93.  
  94.         return item
  95.  
  96. class Device:
  97.     def __init__(self, _device_id=None, _device_name="", _control_id=None, _value=-999999):
  98.         self.device_id = _device_id
  99.         self.device_name = _device_name
  100.         self.control_id = _control_id
  101.         self.value = _value
  102.  
  103. def add_or_update_device(devices_dict, new_device):
  104.     if new_device.device_id in devices_dict:
  105.         # Обновляем существующий экземпляр
  106.         existing_device = devices_dict[new_device.device_id]
  107.         if existing_device.device_name != None:
  108.             new_device.device_name = existing_device.device_name
  109.         if new_device.device_name != None and existing_device.device_name == None:
  110.             existing_device.device_name = new_device.device_name
  111.         if new_device.control_id != None:
  112.             if new_device.control_id == existing_device.control_id:
  113.                 existing_device.value = new_device.value
  114.             else:
  115.                 devices_dict[new_device.device_id] = new_device
  116.         #logging.info(f"U/{existing_device.device_id}/{existing_device.device_name}/{existing_device.control_id}/{existing_device.value}")
  117.         #with open(filename, mode='a', encoding='utf-8') as file:
  118.         #    file.write(f"U/{existing_device.device_id}/{existing_device.device_name}/{existing_device.control_id}/{existing_device.value}\n")
  119.     else:
  120.         # Добавляем новый экземпляр
  121.         devices_dict[new_device.device_id] = new_device
  122.         #logging.info(f"A/{new_device.device_id}/{new_device.device_name}/{new_device.control_id}/{new_device.value}")
  123.         #with open(filename, mode='a', encoding='utf-8') as file:
  124.         #    file.write(f"U/{new_device.device_id}/{new_device.device_name}/{new_device.control_id}/{new_device.value}\n")
  125.  
  126. def convert_to_number(s):
  127.     try:
  128.         return int(s)
  129.     except ValueError:
  130.         try:
  131.             return float(s)
  132.         except ValueError:
  133.             return -999999
  134.  
  135. db_writer = None
  136. devices = {}
  137.  
  138. def on_mqtt_message(arg0, arg1, arg2=None):
  139.     if arg2 is None:
  140.         msg = arg1
  141.     else:
  142.         msg = arg2
  143.  
  144.     # Эта штука не даёт получить мету из топиков. мета даёт имя устройства
  145.     #if msg.retain:
  146.     #    return
  147.  
  148.     parts = msg.topic.split('/')[1:]
  149.  
  150.     #logging.warning(f"_____________________topic path = {parts}")
  151.  
  152.     #if len(parts) < 4:
  153.     #    return
  154.  
  155.     #if (parts[0] == 'client'):
  156.     #    client = parts[1]
  157.     #    parts = parts[2:]
  158.  
  159.     if len(parts) != 4:
  160.         return
  161.  
  162.     ## Если подключать в шину не только wirenboard - стоит переделать на классический вариант
  163.     if parts[0] != 'devices':
  164.         return
  165.    
  166.     # Отсеивает ненужные топики
  167.     if parts[1][:6] == "system_"[:6] or parts[1][:6] == "telegram2wb"[:6]:
  168.         return
  169.    
  170.     device_id = parts[1]
  171.     device = Device(device_id, None, None, -999999)
  172.  
  173.     if parts[2] == 'meta' and parts[3] == 'name':
  174.         device.device_name = msg.payload.decode('utf8')
  175.         add_or_update_device(devices, device)
  176.    
  177.     if parts[2] == 'controls':
  178.         device.control_id = parts[3]
  179.         device.value = convert_to_number(msg.payload.decode('utf8'))
  180.     else:
  181.         return
  182.        
  183.     if device.value != -999999:
  184.         if isinstance(device.value, numbers.Number):
  185.                 add_or_update_device(devices, device)
  186.     else:
  187.         return
  188.  
  189.     updated_device = devices[device.device_id]
  190.     if updated_device.device_name != None:
  191.         if updated_device.value != -999999:
  192.             db_writer.schedule_item(updated_device)
  193.  
  194.  
  195. if __name__ == '__main__':
  196.     parser = argparse.ArgumentParser(description='MQTT retained message deleter', add_help=False)
  197.     ###---Common---###
  198.    
  199.     ###---MQTT---###
  200.     parser.add_argument('-hmq', '--host-mq', dest='mq_host', type=str,
  201.                         help='MQTT host', default='localhost')
  202.  
  203.     parser.add_argument('-umq', '--username-mq', dest='mq_username', type=str,
  204.                         help='MQTT username', default='')
  205.  
  206.     parser.add_argument('-Pmq', '--password-mq', dest='mq_password', type=str,
  207.                         help='MQTT password', default='')
  208.  
  209.     parser.add_argument('-pmq', '--port-mq', dest='mq_port', type=int,
  210.                         help='MQTT port', default='1883')
  211.    
  212.     parser.add_argument('-tmq', '--topic-mq', dest='mq_topic',  type=str,
  213.                         help='Topic mask to unpublish retained messages from. For example: "/devices/my-device/#"', default='/devices/#')
  214.  
  215.     ###---InfluxDB---###
  216.     parser.add_argument('-hdb', '--host-idb', dest='idb_host', type=str,
  217.                         help='InfluxDB host', default='localhost')
  218.  
  219.     parser.add_argument('-pdb', '--port-idb', dest='idb_port', type=int,
  220.                         help='InfluxDB port', default='8086')
  221.  
  222.     parser.add_argument('-udb', '--username-idb', dest='idb_username', type=str,
  223.                         help='InfluxDB username', default='')
  224.  
  225.     parser.add_argument('-Pdb', '--password-idb', dest='idb_password', type=str,
  226.                         help='InfluxDB password', default='')
  227.  
  228.     parser.add_argument('-tdb', '--token-idb', dest='idb_token', type=str,
  229.                         help='InfluxDB access token', default='')
  230.  
  231.     parser.add_argument('-ddb', '--database-idb', dest='idb_database', type=str,
  232.                         help='InfluxDB database', default='mqtt_data')
  233.    
  234.     parser.add_argument('-odb', '--organization-idb', dest='idb_org', type=str,
  235.                         help='InfluxDB organization', default='main-org')
  236.  
  237.    
  238.     mqtt_device_id = str(time.time()) + str(random.randint(0, 100000))
  239.  
  240.     args = parser.parse_args()
  241.  
  242.    
  243.     ## Переназначение аргументов для тестирования
  244.     #args.mq_host = "***.***.***.***"
  245.     #args.mq_port = 1883
  246.     #args.mq_username = "***"
  247.     #args.mq_password = "***"
  248.     #args.mq_topic = "/devices/#"
  249.  
  250.     #args.idb_host = "***.***.***.***"
  251.     #args.idb_port = 8086
  252.     #args.idb_username = "***"
  253.     #args.idb_password = "***"
  254.     #args.idb_token = "***" ##write access token
  255.     #args.idb_database = "wirenboard"
  256.     #args.idb_org = "main-org"
  257.  
  258.  
  259.     client = paho.mqtt.client.Client(client_id=None, clean_session=True, protocol=paho.mqtt.client.MQTTv31)
  260.  
  261.     if args.mq_username:
  262.         client.username_pw_set(args.mq_username, args.mq_password)
  263.  
  264.     client.connect(args.mq_host, args.mq_port)
  265.     client.on_message = on_mqtt_message
  266.  
  267.     client.subscribe(args.mq_topic)
  268.  
  269.  
  270.     #influx_client = InfluxDBClient(args.idb_host, args.idb_port, args.idb_database)
  271.     influx_client = InfluxDBClient(
  272.         url=f"http://{args.idb_host}:{args.idb_port}",
  273.         token=f"{args.idb_token}",
  274.         verify_ssl=False,
  275.         org=args.idb_org
  276.         )
  277.  
  278.     #args.idb_database
  279.     db_writer =  DBWriterThread(influx_client, daemon=True)
  280.     db_writer.start()
  281.  
  282.  
  283.     while 1:
  284.         rc = client.loop()
  285.         if rc != 0:
  286.             break
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement