Advertisement
den4ik2003

Untitled

Dec 6th, 2024
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.23 KB | None | 0 0
  1. import os
  2. import signal
  3. import psycopg2
  4. import random
  5. from datetime import datetime
  6. import time
  7. from pathlib import Path
  8. import subprocess
  9. from multiprocessing import Process, Queue
  10.  
  11. class Killer:
  12.   procs = []
  13.  
  14.   def __init__(self):
  15.     signal.signal(signal.SIGINT, self.exit)
  16.     signal.signal(signal.SIGTERM, self.exit)
  17.  
  18.   def exit(self, signum, frame):
  19.     for p in self.procs:
  20.         os.killpg(os.getpgid(p.pid), signum)
  21.  
  22. class DbWriterClass:
  23.     def __init__(self):
  24.         self.server = None
  25.         self.connection = None
  26.         self.symbol_id = {}
  27.  
  28.     def start_connection(self):
  29.         while 1:
  30.             try:
  31.                 self.connection = psycopg2.connect("dbname='template1' user='postgres' host='18.178.242.45' password='hamdAp-xazvyp-1bysvi'")
  32.                 break
  33.             except Exception as e:
  34.                 delay = random.randint(1, 10) / 5
  35.                 time.sleep(delay)
  36.  
  37.         print("Database connected")
  38.  
  39.         self.query_string = {'orders': {}, 'book': {}, 'public_trades': {}, 'private_trades': {}, 'balance': {}, 'status': {}}
  40.         self.s_hash = {}
  41.  
  42.     def init_symbol(self, symbol):
  43.         sym = symbol.split('_')
  44.         market = '_'.join(sym[2:]).lower()
  45.         sym = '_'.join(sym[:2])
  46.         with self.connection.cursor() as cursor:
  47.             print(sym)
  48.             print(market)
  49.             cursor.execute(f"SELECT s.* FROM symbols s JOIN exchanges e ON s.exchange_id = e.id WHERE s.symbol = '{sym}' AND e.name = '{market}'")
  50.             try:
  51.                 self.symbol_id[symbol] = cursor.fetchall()[0][0]
  52.             except Exception as e:
  53.                 print(e)
  54.                 return False
  55.  
  56.         # # i - just serial identifier
  57.         # # t - time when added to the database
  58.         # # id - exchange order_id, casted to int
  59.         # # u - update time, milliseconds from epoch
  60.         # # ot - original order type as in excel table ('l' - limit, 'm' - market, 't' - take profit, 's' - stop loss)
  61.         # # ct - current order type
  62.         # # s - order side ('b' - buy, 's' - sell)
  63.         # # st - order status as in excel table ('n' - new, 'p' - partially filled, 't' - triggered, 'c' - canceled, 'f' - filled, 'r' - rejected, 'e' - expired)
  64.         # # p - order price
  65.         # # q - order quantity
  66.         # # f - total filled quantity
  67.         # # fq - last fill quantity
  68.         # # fp - last fill price
  69.         # # historical_orders_table = f"""
  70.         # #     CREATE TABLE IF NOT EXISTS HISTORICAL_ORDERS_{symbol} (
  71.         # #         i SERIAL PRIMARY KEY,
  72.         # #         t TIMESTAMP,
  73.         # #         id BIGINT,
  74.         # #         u BIGINT,
  75.         # #         ot CHAR,
  76.         # #         ct CHAR,
  77.         # #         s CHAR,
  78.         # #         st CHAR,
  79.         # #         p DOUBLE PRECISION,
  80.         # #         q DOUBLE PRECISION,
  81.         # #         f DOUBLE PRECISION,
  82.         # #         fq DOUBLE PRECISION,
  83.         # #         fp DOUBLE PRECISION
  84.         # #     )
  85.         # # """
  86.         # # with self.connection.cursor() as cursor:
  87.         # #    cursor.execute(historical_orders_table)
  88.         # # self.connection.commit()
  89.  
  90.         # # i - plain serial identifier
  91.         # # t - timestamp when added to the table
  92.         # # a - best ask price
  93.         # # b - best bid price
  94.         book_table = f"""
  95.            CREATE TABLE IF NOT EXISTS book_{self.symbol_id[symbol]} (
  96.                id SERIAL PRIMARY KEY,
  97.                time TIMESTAMP,
  98.                ask_price DOUBLE PRECISION,
  99.                bid_price DOUBLE PRECISION
  100.            )
  101.        """
  102.         with self.connection.cursor() as cursor:
  103.             cursor.execute(book_table)
  104.         self.connection.commit()
  105.  
  106.         # # i - plain serial identifier
  107.         # # t - timestamp when added to the table
  108.         # # p - trade price
  109.         # # q - trade quantity
  110.         # # b - buying trade identifier (true - buy, false - sell)
  111.         pub_trade_table = f"""
  112.            CREATE TABLE IF NOT EXISTS public_trade_{self.symbol_id[symbol]} (
  113.                id SERIAL PRIMARY KEY,
  114.                time TIMESTAMP,
  115.                price DOUBLE PRECISION,
  116.                quantity DOUBLE PRECISION,
  117.                is_buy BOOLEAN
  118.            )
  119.        """
  120.         with self.connection.cursor() as cursor:
  121.             cursor.execute(pub_trade_table)
  122.         self.connection.commit()
  123.  
  124.         # # i - plain serial identifier
  125.         # # t - timestamp when added to the table
  126.         # # p - trade price
  127.         # # q - trade quantity
  128.         # # b - buying trade identifier (true - buy, false - sell)
  129.         usr_trade_table = f"""
  130.            CREATE TABLE IF NOT EXISTS user_trade_{self.symbol_id[symbol]} (
  131.                id SERIAL PRIMARY KEY,
  132.                time TIMESTAMP,
  133.                price DOUBLE PRECISION,
  134.                quantity DOUBLE PRECISION,
  135.                is_buy BOOLEAN,
  136.                strategy CHAR
  137.            )
  138.        """
  139.         with self.connection.cursor() as cursor:
  140.             cursor.execute(usr_trade_table)
  141.         self.connection.commit()
  142.    
  143.         # # i - plain serial identifier
  144.         # # t - timestamp when added to the table
  145.         # # bf - base asset, free
  146.         # # bl - base asset, locked
  147.         # # qf - quote asset, free
  148.         # # ql - quote asset, locked
  149.         # # p - current price
  150.         balance_table = f"""
  151.            CREATE TABLE IF NOT EXISTS balance_{self.symbol_id[symbol]} (
  152.                id SERIAL PRIMARY KEY,
  153.                time TIMESTAMP,
  154.                base DOUBLE PRECISION,
  155.                quote DOUBLE PRECISION,
  156.                price DOUBLE PRECISION,
  157.                strategy CHAR
  158.            )
  159.        """
  160.         with self.connection.cursor() as cursor:
  161.             cursor.execute(balance_table)
  162.         self.connection.commit()
  163.         # i - exchange order_id, casted to int
  164.         # t - update time
  165.         # p - order price
  166.         # q - order quantity
  167.         # b - buy order indicator, boolean flag (buy if true, sell if false)
  168.         active_orders_table = f"""
  169.            CREATE TABLE IF NOT EXISTS active_orders_{self.symbol_id[symbol]} (
  170.                id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  171.                time TIMESTAMP,
  172.                price DOUBLE PRECISION,
  173.                quantity DOUBLE PRECISION,
  174.                is_bid BOOLEAN,
  175.                strategy CHAR
  176.            )
  177.        """
  178.         with self.connection.cursor() as cursor:
  179.             cursor.execute(active_orders_table)
  180.         self.connection.commit()
  181.         print(symbol)
  182.         self.query_string['orders'][symbol] = f'INSERT INTO active_orders_new_{self.symbol_id[symbol]} (time, price, quantity, is_bid, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
  183.         # self.query_string['historical_orders'][symbol] = f'INSERT INTO HISTORICAL_ORDERS_{symbol} (i, t, id, u, ot, ct, s, st, p, q, f, fq, fp)' + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
  184.         self.query_string['book'][symbol] = f'INSERT INTO book_{self.symbol_id[symbol]} (time, ask_price, bid_price)\n' + "VALUES (%s, %s, %s)"
  185.         self.query_string['public_trades'][symbol] = f'INSERT INTO public_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy)\n' + "VALUES (%s, %s, %s, %s)"
  186.         self.query_string['private_trades'][symbol] = f'INSERT INTO user_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
  187.         self.query_string['balance'][symbol] =  f'INSERT INTO balance_{self.symbol_id[symbol]} (time, base, quote, strategy)\n' + "VALUES (%s, %s, %s, %s)"
  188.         self.query_string['status'][symbol] = f'INSERT INTO status (time, symbol_id, status, strategy)\n' + "VALUES (%s, %s, %s, %s)"
  189.  
  190.         return True
  191.  
  192.         # mod = 10 ** 9 + 7
  193.         # self.s_hash[symbol] = 0
  194.         # for i in range(len(symbol)):
  195.         #     self.s_hash[symbol] = (self.s_hash[symbol] * 257 + ord(symbol[i])) % mod
  196.         # self.s_hash[symbol] *= 3 * (10 ** 9)
  197.  
  198.     def process(self, symbol, line, strat):
  199.         try:
  200.             line = line.split()
  201.             ts = datetime.fromtimestamp(int(line[0]) / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
  202.             mtype = line[1]
  203.             obj = eval(' '.join(line[2:]))
  204.             args = []
  205.             if mtype == 'orders':
  206.                 if strat == 'V':
  207.                     return
  208.                 with self.connection.cursor() as cursor:
  209.                     cursor.execute(f"""
  210.                        DROP TABLE IF EXISTS active_orders_new_{self.symbol_id[symbol]};
  211.                        CREATE TABLE active_orders_new_{self.symbol_id[symbol]} (
  212.                            id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  213.                            time TIMESTAMP,
  214.                            price DOUBLE PRECISION,
  215.                            quantity DOUBLE PRECISION,
  216.                            is_bid BOOLEAN,
  217.                            strategy CHAR
  218.                        );
  219.                    """)
  220.                 for price in obj['ask'].keys():
  221.                     args.append((ts, price, obj['ask'][price]['quantity'], False, strat))
  222.                 for price in obj['bid'].keys():
  223.                     args.append((ts, price, obj['bid'][price]['quantity'], True, strat))
  224.             elif mtype == 'book':
  225.                 if strat == 'V':
  226.                     return
  227.                 args.append((ts, obj['ask_price'], obj['bid_price']))
  228.             elif mtype == 'public_trades':
  229.                 if strat == 'V':
  230.                     return
  231.                 for trade in obj:
  232.                     args.append((ts, trade['price'], trade['quantity'], trade['side'] == 'buy'))
  233.             elif mtype == 'private_trades':
  234.                 for trade in obj:
  235.                     args.append((ts, trade['price'], trade['quantity'], trade['side'] == 'buy', strat))
  236.             elif mtype == 'balance':
  237.                 args.append((ts, obj['base'], obj['quote'], strat))
  238.             elif mtype == 'status':
  239.                 args.append((ts, self.symbol_id[symbol], obj['status'], strat))
  240.             for arg in args:
  241.                 with self.connection.cursor() as cursor:
  242.                     cursor.execute(self.query_string[mtype][symbol], arg)
  243.             if mtype == 'orders':
  244.                 with self.connection.cursor() as cursor:
  245.                     cursor.execute(f"""
  246.                        ALTER TABLE "active_orders_{self.symbol_id[symbol]}" RENAME TO "active_orders_old_{self.symbol_id[symbol]}";
  247.                        ALTER TABLE "active_orders_new_{self.symbol_id[symbol]}" RENAME TO "active_orders_{self.symbol_id[symbol]}";
  248.                        DROP TABLE "active_orders_old_{self.symbol_id[symbol]}";
  249.                    """)
  250.                        
  251.             self.connection.commit()
  252.         except Exception as e:
  253.             print(e)
  254.  
  255. def pipe(q, filename, symbol, strat):
  256.     p = subprocess.Popen(['tail', '-F', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  257.     for line in iter(p.stdout.readline, b''):
  258.         q.put((line.decode('utf-8'), symbol, strat))
  259.  
  260. if __name__ == '__main__':
  261.     old_files = set()
  262.     q = Queue()
  263.     killer = Killer()
  264.     writer = DbWriterClass()
  265.     writer.start_connection()
  266.     prev_time = 0
  267.     working = {}
  268.     while True:
  269.         cur_time = int(time.time())
  270.         if cur_time - prev_time > 60:
  271.             files_path = [(file, '/mnt/mm_telemetry/' + file.name) for file in Path('/mnt/mm_telemetry').iterdir() if file.is_file()]
  272.             files_path += [(file, '/mnt/volume_telemetry/' + file.name) for file in Path('/mnt/volume_telemetry').iterdir() if file.is_file()]
  273.             for raw_file, file in files_path:
  274.                 strat = file[5].upper()
  275.                 mtime = int(raw_file.stat().st_mtime * 1000)
  276.                 ctime = int(time.time() * 1000)
  277.                 if not file in old_files:
  278.                     name = file.split('/')[-1]
  279.                     symbol = name
  280.                     if not writer.init_symbol(symbol):
  281.                         continue
  282.                     working[file] = 0
  283.                     old_files.add(file)
  284.                     # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
  285.                     # symbol += name.split('usdt')[1]
  286.                     killer.procs.append(Process(target=pipe, args=(q, file, symbol, strat)))
  287.                     killer.procs[-1].start()
  288.                 if strat == 'V':
  289.                     cd = 180
  290.                 else:
  291.                     cd = 60
  292.                 print(f'{file}: {mtime} {ctime}')
  293.                 if mtime < ctime - cd * 1000 and working[file] != -1:
  294.                     name = file.split('/')[-1]
  295.                     symbol = name
  296.                     # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
  297.                     # symbol += name.split('usdt')[1]
  298.                     working[file] = -1
  299.                     writer.process(symbol, f"{ctime} status {{'status': False}}", strat)
  300.                 elif working[file] != 1:
  301.                     working[file] = 1
  302.                     writer.process(symbol, f"{ctime + 1000} status {{'status': True}}", strat)
  303.             prev_time = cur_time
  304.         z = q.get()
  305.         writer.process(z[1], z[0], z[2])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement