Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import signal
- import psycopg2
- import random
- from datetime import datetime
- import time
- from pathlib import Path
- import subprocess
- from multiprocessing import Process, Queue
- class Killer:
- procs = []
- def __init__(self):
- signal.signal(signal.SIGINT, self.exit)
- signal.signal(signal.SIGTERM, self.exit)
- def exit(self, signum, frame):
- for p in self.procs:
- os.killpg(os.getpgid(p.pid), signum)
- class DbWriterClass:
- def __init__(self):
- self.server = None
- self.connection = None
- self.symbol_id = {}
- def start_connection(self):
- while 1:
- try:
- self.connection = psycopg2.connect("dbname='template1' user='postgres' host='18.178.242.45' password='hamdAp-xazvyp-1bysvi'")
- break
- except Exception as e:
- delay = random.randint(1, 10) / 5
- time.sleep(delay)
- print("Database connected")
- self.query_string = {'orders': {}, 'book': {}, 'public_trades': {}, 'private_trades': {}, 'balance': {}, 'status': {}}
- self.s_hash = {}
- def init_symbol(self, symbol):
- sym = symbol.split('_')
- market = '_'.join(sym[2:]).lower()
- sym = '_'.join(sym[:2])
- with self.connection.cursor() as cursor:
- print(sym)
- print(market)
- cursor.execute(f"SELECT s.* FROM symbols s JOIN exchanges e ON s.exchange_id = e.id WHERE s.symbol = '{sym}' AND e.name = '{market}'")
- try:
- self.symbol_id[symbol] = cursor.fetchall()[0][0]
- except Exception as e:
- print(e)
- return False
- # # i - just serial identifier
- # # t - time when added to the database
- # # id - exchange order_id, casted to int
- # # u - update time, milliseconds from epoch
- # # ot - original order type as in excel table ('l' - limit, 'm' - market, 't' - take profit, 's' - stop loss)
- # # ct - current order type
- # # s - order side ('b' - buy, 's' - sell)
- # # st - order status as in excel table ('n' - new, 'p' - partially filled, 't' - triggered, 'c' - canceled, 'f' - filled, 'r' - rejected, 'e' - expired)
- # # p - order price
- # # q - order quantity
- # # f - total filled quantity
- # # fq - last fill quantity
- # # fp - last fill price
- # # historical_orders_table = f"""
- # # CREATE TABLE IF NOT EXISTS HISTORICAL_ORDERS_{symbol} (
- # # i SERIAL PRIMARY KEY,
- # # t TIMESTAMP,
- # # id BIGINT,
- # # u BIGINT,
- # # ot CHAR,
- # # ct CHAR,
- # # s CHAR,
- # # st CHAR,
- # # p DOUBLE PRECISION,
- # # q DOUBLE PRECISION,
- # # f DOUBLE PRECISION,
- # # fq DOUBLE PRECISION,
- # # fp DOUBLE PRECISION
- # # )
- # # """
- # # with self.connection.cursor() as cursor:
- # # cursor.execute(historical_orders_table)
- # # self.connection.commit()
- # # i - plain serial identifier
- # # t - timestamp when added to the table
- # # a - best ask price
- # # b - best bid price
- book_table = f"""
- CREATE TABLE IF NOT EXISTS book_{self.symbol_id[symbol]} (
- id SERIAL PRIMARY KEY,
- time TIMESTAMP,
- ask_price DOUBLE PRECISION,
- bid_price DOUBLE PRECISION
- )
- """
- with self.connection.cursor() as cursor:
- cursor.execute(book_table)
- self.connection.commit()
- # # i - plain serial identifier
- # # t - timestamp when added to the table
- # # p - trade price
- # # q - trade quantity
- # # b - buying trade identifier (true - buy, false - sell)
- pub_trade_table = f"""
- CREATE TABLE IF NOT EXISTS public_trade_{self.symbol_id[symbol]} (
- id SERIAL PRIMARY KEY,
- time TIMESTAMP,
- price DOUBLE PRECISION,
- quantity DOUBLE PRECISION,
- is_buy BOOLEAN
- )
- """
- with self.connection.cursor() as cursor:
- cursor.execute(pub_trade_table)
- self.connection.commit()
- # # i - plain serial identifier
- # # t - timestamp when added to the table
- # # p - trade price
- # # q - trade quantity
- # # b - buying trade identifier (true - buy, false - sell)
- usr_trade_table = f"""
- CREATE TABLE IF NOT EXISTS user_trade_{self.symbol_id[symbol]} (
- id SERIAL PRIMARY KEY,
- time TIMESTAMP,
- price DOUBLE PRECISION,
- quantity DOUBLE PRECISION,
- is_buy BOOLEAN,
- strategy CHAR
- )
- """
- with self.connection.cursor() as cursor:
- cursor.execute(usr_trade_table)
- self.connection.commit()
- # # i - plain serial identifier
- # # t - timestamp when added to the table
- # # bf - base asset, free
- # # bl - base asset, locked
- # # qf - quote asset, free
- # # ql - quote asset, locked
- # # p - current price
- balance_table = f"""
- CREATE TABLE IF NOT EXISTS balance_{self.symbol_id[symbol]} (
- id SERIAL PRIMARY KEY,
- time TIMESTAMP,
- base DOUBLE PRECISION,
- quote DOUBLE PRECISION,
- price DOUBLE PRECISION,
- strategy CHAR
- )
- """
- with self.connection.cursor() as cursor:
- cursor.execute(balance_table)
- self.connection.commit()
- # i - exchange order_id, casted to int
- # t - update time
- # p - order price
- # q - order quantity
- # b - buy order indicator, boolean flag (buy if true, sell if false)
- active_orders_table = f"""
- CREATE TABLE IF NOT EXISTS active_orders_{self.symbol_id[symbol]} (
- id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
- time TIMESTAMP,
- price DOUBLE PRECISION,
- quantity DOUBLE PRECISION,
- is_bid BOOLEAN,
- strategy CHAR
- )
- """
- with self.connection.cursor() as cursor:
- cursor.execute(active_orders_table)
- self.connection.commit()
- print(symbol)
- self.query_string['orders'][symbol] = f'INSERT INTO active_orders_new_{self.symbol_id[symbol]} (time, price, quantity, is_bid, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
- # self.query_string['historical_orders'][symbol] = f'INSERT INTO HISTORICAL_ORDERS_{symbol} (i, t, id, u, ot, ct, s, st, p, q, f, fq, fp)' + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
- self.query_string['book'][symbol] = f'INSERT INTO book_{self.symbol_id[symbol]} (time, ask_price, bid_price)\n' + "VALUES (%s, %s, %s)"
- self.query_string['public_trades'][symbol] = f'INSERT INTO public_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy)\n' + "VALUES (%s, %s, %s, %s)"
- self.query_string['private_trades'][symbol] = f'INSERT INTO user_trade_{self.symbol_id[symbol]} (time, price, quantity, is_buy, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
- self.query_string['balance'][symbol] = f'INSERT INTO balance_{self.symbol_id[symbol]} (time, base, quote, strategy)\n' + "VALUES (%s, %s, %s, %s)"
- self.query_string['status'][symbol] = f'INSERT INTO status (time, symbol_id, status, strategy)\n' + "VALUES (%s, %s, %s, %s)"
- return True
- # mod = 10 ** 9 + 7
- # self.s_hash[symbol] = 0
- # for i in range(len(symbol)):
- # self.s_hash[symbol] = (self.s_hash[symbol] * 257 + ord(symbol[i])) % mod
- # self.s_hash[symbol] *= 3 * (10 ** 9)
- def process(self, symbol, line, strat):
- try:
- line = line.split()
- ts = datetime.fromtimestamp(int(line[0]) / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
- mtype = line[1]
- obj = eval(' '.join(line[2:]))
- args = []
- if mtype == 'orders':
- if strat == 'V':
- return
- with self.connection.cursor() as cursor:
- cursor.execute(f"""
- DROP TABLE IF EXISTS active_orders_new_{self.symbol_id[symbol]};
- CREATE TABLE active_orders_new_{self.symbol_id[symbol]} (
- id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
- time TIMESTAMP,
- price DOUBLE PRECISION,
- quantity DOUBLE PRECISION,
- is_bid BOOLEAN,
- strategy CHAR
- );
- """)
- for price in obj['ask'].keys():
- args.append((ts, price, obj['ask'][price]['quantity'], False, strat))
- for price in obj['bid'].keys():
- args.append((ts, price, obj['bid'][price]['quantity'], True, strat))
- elif mtype == 'book':
- if strat == 'V':
- return
- args.append((ts, obj['ask_price'], obj['bid_price']))
- elif mtype == 'public_trades':
- if strat == 'V':
- return
- for trade in obj:
- args.append((ts, trade['price'], trade['quantity'], trade['side'] == 'buy'))
- elif mtype == 'private_trades':
- for trade in obj:
- args.append((ts, trade['price'], trade['quantity'], trade['side'] == 'buy', strat))
- elif mtype == 'balance':
- args.append((ts, obj['base'], obj['quote'], strat))
- elif mtype == 'status':
- args.append((ts, self.symbol_id[symbol], obj['status'], strat))
- for arg in args:
- with self.connection.cursor() as cursor:
- cursor.execute(self.query_string[mtype][symbol], arg)
- if mtype == 'orders':
- with self.connection.cursor() as cursor:
- cursor.execute(f"""
- ALTER TABLE "active_orders_{self.symbol_id[symbol]}" RENAME TO "active_orders_old_{self.symbol_id[symbol]}";
- ALTER TABLE "active_orders_new_{self.symbol_id[symbol]}" RENAME TO "active_orders_{self.symbol_id[symbol]}";
- DROP TABLE "active_orders_old_{self.symbol_id[symbol]}";
- """)
- self.connection.commit()
- except Exception as e:
- print(e)
- def pipe(q, filename, symbol, strat):
- p = subprocess.Popen(['tail', '-F', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- for line in iter(p.stdout.readline, b''):
- q.put((line.decode('utf-8'), symbol, strat))
- if __name__ == '__main__':
- old_files = set()
- q = Queue()
- killer = Killer()
- writer = DbWriterClass()
- writer.start_connection()
- prev_time = 0
- working = {}
- while True:
- cur_time = int(time.time())
- if cur_time - prev_time > 60:
- files_path = [(file, '/mnt/mm_telemetry/' + file.name) for file in Path('/mnt/mm_telemetry').iterdir() if file.is_file()]
- files_path += [(file, '/mnt/volume_telemetry/' + file.name) for file in Path('/mnt/volume_telemetry').iterdir() if file.is_file()]
- for raw_file, file in files_path:
- strat = file[5].upper()
- mtime = int(raw_file.stat().st_mtime * 1000)
- ctime = int(time.time() * 1000)
- if not file in old_files:
- name = file.split('/')[-1]
- symbol = name
- if not writer.init_symbol(symbol):
- continue
- working[file] = 0
- old_files.add(file)
- # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
- # symbol += name.split('usdt')[1]
- killer.procs.append(Process(target=pipe, args=(q, file, symbol, strat)))
- killer.procs[-1].start()
- if strat == 'V':
- cd = 180
- else:
- cd = 60
- print(f'{file}: {mtime} {ctime}')
- if mtime < ctime - cd * 1000 and working[file] != -1:
- name = file.split('/')[-1]
- symbol = name
- # symbol = (name.split('usdt')[0].replace('_', '').replace('-', '') + '_usdt').upper()
- # symbol += name.split('usdt')[1]
- working[file] = -1
- writer.process(symbol, f"{ctime} status {{'status': False}}", strat)
- elif working[file] != 1:
- working[file] = 1
- writer.process(symbol, f"{ctime + 1000} status {{'status': True}}", strat)
- prev_time = cur_time
- z = q.get()
- writer.process(z[1], z[0], z[2])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement