Advertisement
den4ik2003

Untitled

Mar 21st, 2025
277
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.33 KB | None | 0 0
  1. import os
  2. import sys
  3. import signal
  4. import psycopg2
  5. import random
  6. import json
  7. from datetime import datetime
  8. import time
  9. from pathlib import Path
  10. import subprocess
  11. from multiprocessing import Process, Queue
  12. from configparser import ConfigParser
  13. import numpy as np
  14. import pika
  15.  
  16. last_orders_mtype_time = {}
  17.  
  18. class Killer:
  19.   procs = []
  20.  
  21.   def __init__(self):
  22.     signal.signal(signal.SIGINT, self.exit)
  23.     signal.signal(signal.SIGTERM, self.exit)
  24.  
  25.   def exit(self, signum, frame):
  26.     for p in self.procs:
  27.         os.killpg(os.getpgid(p.pid), signum)
  28.  
  29. class DbWriterClass:
  30.     def __init__(self):
  31.         self.server = None
  32.         self.connection = None
  33.         self.symbol_id = {}
  34.  
  35.     def load_config(self, filename, section):
  36.         parser = ConfigParser()
  37.         parser.read(filename)
  38.  
  39.         database = {}
  40.         if parser.has_section(section):
  41.             params = parser.items(section)
  42.             for param in params:
  43.                 database[param[0]] = param[1]
  44.  
  45.         else:
  46.             raise Exception(
  47.                 "Section {0} not found in the {1} file".format(section, filename)
  48.             )
  49.  
  50.         return database
  51.  
  52.     def start_connection(self, filename = '/home/ubuntu/database.ini', section = 'postgresql'):
  53.         while 1:
  54.             try:
  55.                 self.connection = psycopg2.connect(**self.load_config(filename, section))
  56.                 break
  57.             except Exception as e:
  58.                 print(e)
  59.                 delay = random.randint(1, 10) / 5
  60.                 time.sleep(delay)
  61.  
  62.         print("Database connected")
  63.  
  64.         self.query_string = {'orders': {}, 'book': {}, 'public_trades': {}, 'private_trades': {}, 'balance': {}, 'position': {}, 'status': {}}
  65.         self.s_hash = {}
  66.  
  67.     def execute_select(self, query):
  68.         with self.connection.cursor() as cursor:
  69.             cursor.execute(query)
  70.             return cursor.fetchall()
  71.  
  72.     def init_symbol(self, symbol):
  73.         sym = symbol.split('_')
  74.         market = '_'.join(sym[2:]).lower()
  75.         sym = '_'.join(sym[:2])
  76.         with self.connection.cursor() as cursor:
  77.             print(sym)
  78.             print(market)
  79.             cursor.execute(f"SELECT s.* FROM symbols s JOIN exchanges e USING (exchange_id) WHERE symbol = '{sym}' AND exchange_name = '{market}'")
  80.             try:
  81.                 self.symbol_id[symbol] = cursor.fetchall()[0][0]
  82.             except Exception as e:
  83.                 print(e)
  84.                 return False
  85.  
  86.         book_table = f"""
  87.            CREATE TABLE IF NOT EXISTS book_{self.symbol_id[symbol]} (
  88.                id SERIAL PRIMARY KEY,
  89.                time TIMESTAMP,
  90.                ask_price DOUBLE PRECISION,
  91.                bid_price DOUBLE PRECISION
  92.            )
  93.        """
  94.         with self.connection.cursor() as cursor:
  95.             cursor.execute(book_table)
  96.         self.connection.commit()
  97.  
  98.         pub_trade_table = f"""
  99.            CREATE TABLE IF NOT EXISTS public_trade_{self.symbol_id[symbol]} (
  100.                id SERIAL PRIMARY KEY,
  101.                time TIMESTAMP,
  102.                price DOUBLE PRECISION,
  103.                quantity DOUBLE PRECISION,
  104.                is_buy BOOLEAN
  105.            )
  106.        """
  107.         with self.connection.cursor() as cursor:
  108.             cursor.execute(pub_trade_table)
  109.         self.connection.commit()
  110.  
  111.         usr_trade_table = f"""
  112.            CREATE TABLE IF NOT EXISTS user_trade_{self.symbol_id[symbol]} (
  113.                id SERIAL PRIMARY KEY,
  114.                time TIMESTAMP,
  115.                price DOUBLE PRECISION,
  116.                quantity DOUBLE PRECISION,
  117.                is_buy BOOLEAN,
  118.                strategy CHAR
  119.            )
  120.        """
  121.         with self.connection.cursor() as cursor:
  122.             cursor.execute(usr_trade_table)
  123.         self.connection.commit()
  124.    
  125.         balance_table = f"""
  126.            CREATE TABLE IF NOT EXISTS balance_{self.symbol_id[symbol]} (
  127.                id SERIAL PRIMARY KEY,
  128.                time TIMESTAMP,
  129.                base DOUBLE PRECISION,
  130.                quote DOUBLE PRECISION,
  131.                price DOUBLE PRECISION,
  132.                strategy CHAR
  133.            )
  134.        """
  135.         with self.connection.cursor() as cursor:
  136.             cursor.execute(balance_table)
  137.         self.connection.commit()
  138.    
  139.         position_table = f"""
  140.            CREATE TABLE IF NOT EXISTS position_{self.symbol_id[symbol]} (
  141.                id SERIAL PRIMARY KEY,
  142.                time TIMESTAMP,
  143.                ask_bound DOUBLE PRECISION,
  144.                bid_bound DOUBLE PRECISION
  145.            )
  146.        """
  147.         with self.connection.cursor() as cursor:
  148.             cursor.execute(position_table)
  149.         self.connection.commit()
  150.  
  151.         active_orders_table = f"""
  152.            CREATE TABLE IF NOT EXISTS active_orders_{self.symbol_id[symbol]} (
  153.                id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  154.                time TIMESTAMP,
  155.                price DOUBLE PRECISION,
  156.                quantity DOUBLE PRECISION,
  157.                is_bid BOOLEAN,
  158.                strategy CHAR
  159.            )
  160.        """
  161.         with self.connection.cursor() as cursor:
  162.             cursor.execute(active_orders_table)
  163.         self.connection.commit()
  164.         print(symbol)
  165.         self.query_string['orders'][symbol] = f'INSERT INTO active_orders_new_{self.symbol_id[symbol]} (time, price, quantity, is_bid, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
  166.         self.query_string['book'][symbol] = f'INSERT INTO book_{self.symbol_id[symbol]} (time, ask_price, bid_price)\n' + "VALUES (%s, %s, %s)"
  167.         self.query_string['public_trades'][symbol] = f'INSERT INTO public_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy)\n' + "VALUES (%s, %s, %s, %s)"
  168.         self.query_string['private_trades'][symbol] = f'INSERT INTO user_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
  169.         self.query_string['balance'][symbol] =  f'INSERT INTO balance_{self.symbol_id[symbol]} (time, base, quote, strategy)\n' + "VALUES (%s, %s, %s, %s)"
  170.         self.query_string['position'][symbol] =  f'INSERT INTO position_{self.symbol_id[symbol]} (time, bound, base, quote)\n' + "VALUES (%s, %s, %s, %s)"
  171.         self.query_string['status'][symbol] = f'INSERT INTO status (time, symbol_id, status, strategy)\n' + "VALUES (%s, %s, %s, %s)"
  172.  
  173.         return True
  174.  
  175. def pipe(q, filename, symbol, strat):
  176.     p = subprocess.Popen(['tail', '-F', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  177.     for line in iter(p.stdout.readline, b''):
  178.         q.put((line.decode('utf-8'), symbol, strat))
  179.  
  180. def send(channels, symbol, strat, line):
  181.     if line == 'init':
  182.         message = {
  183.             'name': f"{'_'.join(symbol.split('_')[:2])}.{'_'.join(symbol.split('_')[2:])}.{strat}",
  184.             'ts': int(time.time()),
  185.             'mtype': 'init',
  186.             'msg': ''
  187.         }
  188.     else:
  189.         if len(line.split(' ')) < 3:
  190.             print(f'can not parse line: {line}')
  191.             return
  192.        
  193.         global last_orders_mtype_time
  194.  
  195.         if line.split(' ')[1] == 'orders':
  196.             if symbol not in last_orders_mtype_time:
  197.                 last_orders_mtype_time[symbol] = 0
  198.             if time.time() - last_orders_mtype_time[symbol] < 10:
  199.                 return
  200.             last_orders_mtype_time[symbol] = time.time()
  201.  
  202.         message = {
  203.             'name': f"{'_'.join(symbol.split('_')[:2])}.{'_'.join(symbol.split('_')[2:])}.{strat}",
  204.             'ts': int(line.split(' ')[0]),
  205.             'mtype': line.split(' ')[1],
  206.             'msg': eval(' '.join(line.split(' ')[2:]))
  207.         }
  208.     if 'status' in line:
  209.         print(line)
  210.     for channel in channels:
  211.         channel.basic_publish(
  212.             exchange="",
  213.             routing_key="log_queue.old_prod",
  214.             body=json.dumps(message),
  215.             properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
  216.         )
  217.  
  218. if __name__ == '__main__':
  219.     # Rabbit init
  220.     RABBITMQ_HOST = "18.178.242.45"
  221.     RABBITMQ_TERMINAL_HOST = "80.93.176.68"
  222.     connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, port=9669))
  223.     connection_terminal = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_TERMINAL_HOST, port=5672))
  224.     channel = connection.channel()
  225.     channel_terminal = connection_terminal.channel()
  226.     # Declare a queue
  227.     channel.queue_declare(queue="log_queue.old_prod", durable=True)
  228.     channel_terminal.queue_declare(queue="log_queue.old_prod", durable=True)
  229.     channels = [channel, channel_terminal]
  230.  
  231.     old_files = set()
  232.     q = Queue()
  233.     killer = Killer()
  234.     writer = DbWriterClass()
  235.     args = {}
  236.     if len(sys.argv) > 1:
  237.         args['filename'] = sys.argv[1]
  238.     writer.start_connection(**args)
  239.     prev_time = 0
  240.     working = {}
  241.     print(writer.connection.closed)
  242.     while True:
  243.         cur_time = int(time.time())
  244.         if cur_time - prev_time > 60:
  245.             files_path = [(file, '/mnt/mm_telemetry/' + file.name) for file in Path('/mnt/mm_telemetry').iterdir() if file.is_file()]
  246.             files_path += [(file, '/mnt/volume_telemetry/' + file.name) for file in Path('/mnt/volume_telemetry').iterdir() if file.is_file()]
  247.             files_path += [(file, '/mnt/wolume_telemetry/' + file.name) for file in Path('/mnt/wolume_telemetry').iterdir() if file.is_file()]
  248.             files_path += [(file, '/mnt/tb_telemetry/' + file.name) for file in Path('/mnt/tb_telemetry').iterdir() if file.is_file()]
  249.             for raw_file, file in files_path:
  250.                 strat = file[5].upper()
  251.                 mtime = int(raw_file.stat().st_mtime * 1000)
  252.                 ctime = int(time.time() * 1000)
  253.                 if not file in old_files:
  254.                     name = file.split('/')[-1]
  255.                     symbol = name
  256.                     if not writer.init_symbol(symbol):
  257.                         continue
  258.                     send(channels, symbol, strat, 'init')
  259.                     working[file] = 0
  260.                     old_files.add(file)
  261.                     # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
  262.                     # symbol += name.split('usdt')[1]
  263.                     killer.procs.append(Process(target=pipe, args=(q, file, symbol, strat)))
  264.                     killer.procs[-1].start()
  265.                 if strat == 'V':
  266.                     cd = 180
  267.                 else:
  268.                     cd = 60
  269.                 print(f'{file}: {mtime} {ctime}; {working[file]}')
  270.                 if mtime < ctime - cd * 1000:
  271.                     if working[file] != -1:
  272.                         name = file.split('/')[-1]
  273.                         symbol = name
  274.                         # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
  275.                         # symbol += name.split('usdt')[1]
  276.                         working[file] = -1
  277.                         send(channels, symbol, strat, f"{ctime} status {{'status': False}}")
  278.                 elif working[file] != 1:
  279.                     name = file.split('/')[-1]
  280.                     symbol = name
  281.                     working[file] = 1
  282.                     send(channels, symbol, strat, f"{ctime + 1000} status {{'status': True}}")
  283.             prev_time = cur_time
  284.         z = q.get()
  285.         send(channels, z[1], z[2], z[0])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement