Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import sys
- import signal
- import psycopg2
- import random
- import json
- from datetime import datetime
- import time
- from pathlib import Path
- import subprocess
- from multiprocessing import Process, Queue
- from configparser import ConfigParser
- import numpy as np
- import pika
# Maps symbol -> time.time() of the last 'orders' message that send() forwarded;
# send() uses it to drop further 'orders' messages for that symbol for 10 seconds.
- last_orders_mtype_time = {}
class Killer:
    """Forward SIGINT/SIGTERM to the process groups of registered child processes.

    Creating an instance installs ``exit`` as the handler for SIGINT and
    SIGTERM. Callers register children by appending started (or about to be
    started) process objects exposing a ``pid`` attribute to ``procs``.
    """

    # Registered child processes. Kept as a class attribute (shared by all
    # instances) so existing ``killer.procs`` / ``Killer.procs`` users still work.
    procs = []

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit)
        signal.signal(signal.SIGTERM, self.exit)

    def exit(self, signum, frame):
        """Signal handler: forward ``signum`` to every child's process group, then exit.

        Args:
            signum: Number of the signal being handled; forwarded unchanged.
            frame: Current stack frame supplied by the ``signal`` module (unused).

        Raises:
            SystemExit: With code ``128 + signum`` when the forwarded signal
                did not already terminate this process.
        """
        # Children normally share this process's group, so killpg() re-delivers
        # the signal to us. Restore the default dispositions first so that the
        # re-delivered signal terminates the process instead of re-entering
        # this handler over and over.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

        groups = set()
        for p in self.procs:
            pid = getattr(p, 'pid', None)
            if pid is None:
                # Registered but never started: nothing to signal.
                continue
            try:
                groups.add(os.getpgid(pid))
            except ProcessLookupError:
                # Child already exited and was reaped.
                continue

        for pgid in groups:
            try:
                os.killpg(pgid, signum)
            except ProcessLookupError:
                pass

        # Reached only when no signalled group contained this process.
        raise SystemExit(128 + signum)
class DbWriterClass:
    """PostgreSQL helper that connects, resolves symbols and prepares per-symbol tables.

    Attributes are plain instance fields:
      * ``connection``   - psycopg2 connection, ``None`` until ``start_connection``.
      * ``symbol_id``    - maps the full symbol string to the value looked up in
                           the ``symbols`` table.
      * ``query_string`` - maps a message kind to ``{symbol: INSERT template}``.
      * ``s_hash``       - empty dict, reset on every ``start_connection``.
    """

    # Message kinds for which an INSERT template is kept per symbol.
    _QUERY_KINDS = ('orders', 'book', 'public_trades', 'private_trades',
                    'balance', 'position', 'status')

    # (table-name prefix, column definitions) for every per-symbol table, in
    # creation order. The table name is "<prefix>_<symbol id>".
    _TABLE_COLUMNS = (
        ('book', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'ask_price DOUBLE PRECISION',
            'bid_price DOUBLE PRECISION',
        )),
        ('public_trade', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_buy BOOLEAN',
        )),
        ('user_trade', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_buy BOOLEAN',
            'strategy CHAR',
        )),
        ('balance', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'base DOUBLE PRECISION',
            'quote DOUBLE PRECISION',
            'price DOUBLE PRECISION',
            'strategy CHAR',
        )),
        ('position', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'ask_bound DOUBLE PRECISION',
            'bid_bound DOUBLE PRECISION',
        )),
        ('active_orders', (
            'id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_bid BOOLEAN',
            'strategy CHAR',
        )),
    )

    def __init__(self):
        self.server = None
        self.connection = None
        self.symbol_id = {}
        # Also created here (not only in start_connection) so the object is
        # fully formed before a connection exists.
        self.query_string = {kind: {} for kind in self._QUERY_KINDS}
        self.s_hash = {}

    def load_config(self, filename, section):
        """Read one section of an INI file into a dict.

        Args:
            filename: Path of the INI file.
            section: Name of the section to read.

        Returns:
            Dict mapping each option name in ``section`` to its string value.

        Raises:
            Exception: If ``section`` is not present in ``filename`` (this also
                covers a missing file, which ``ConfigParser.read`` ignores).
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception(
                "Section {0} not found in the {1} file".format(section, filename)
            )
        return dict(parser.items(section))

    def start_connection(self, filename = '/home/ubuntu/database.ini', section = 'postgresql'):
        """Connect to PostgreSQL, retrying until the connection succeeds.

        Args:
            filename: INI file holding the connection parameters.
            section: Section of ``filename`` passed as keyword arguments to
                ``psycopg2.connect``.

        Raises:
            Exception: If the configuration section cannot be loaded. This is a
                permanent error, so it is raised immediately instead of being
                retried forever.
        """
        params = self.load_config(filename, section)
        while True:
            try:
                self.connection = psycopg2.connect(**params)
                break
            except psycopg2.OperationalError as e:
                # Connection-level failure: report it and retry after a
                # random 0.2-2.0 second pause.
                print(e)
                time.sleep(random.randint(1, 10) / 5)
        print("Database connected")
        self.query_string = {kind: {} for kind in self._QUERY_KINDS}
        self.s_hash = {}

    def execute_select(self, query):
        """Execute ``query`` on the current connection and return all fetched rows."""
        with self.connection.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()

    def init_symbol(self, symbol):
        """Resolve ``symbol`` in the database and prepare its tables and INSERT templates.

        ``symbol`` is split on ``'_'``: the first two parts joined by ``'_'``
        are matched against ``symbols.symbol`` and the remaining parts, joined
        and lower-cased, against ``exchanges.exchange_name``.

        Args:
            symbol: Full symbol string, e.g. ``"BTC_USDT_binance"``.

        Returns:
            ``True`` when the symbol was found and its tables/templates were
            prepared, ``False`` when the lookup returned no row.
        """
        parts = symbol.split('_')
        market = '_'.join(parts[2:]).lower()
        sym = '_'.join(parts[:2])

        with self.connection.cursor() as cursor:
            print(sym)
            print(market)
            # Values come from file names, so pass them as bound parameters
            # rather than interpolating them into the SQL text.
            cursor.execute(
                "SELECT s.* FROM symbols s JOIN exchanges e USING (exchange_id) "
                "WHERE symbol = %s AND exchange_name = %s",
                (sym, market),
            )
            rows = cursor.fetchall()

        if not rows:
            print(f"symbol {sym} on exchange {market} not found")
            # Close the transaction the SELECT implicitly opened.
            self.connection.rollback()
            return False

        # First column of the matching row; presumably the symbol id.
        sid = rows[0][0]
        self.symbol_id[symbol] = sid

        for prefix, columns in self._TABLE_COLUMNS:
            ddl = "CREATE TABLE IF NOT EXISTS {0}_{1} (\n    {2}\n)".format(
                prefix, sid, ",\n    ".join(columns)
            )
            with self.connection.cursor() as cursor:
                cursor.execute(ddl)
            # Commit after every table so earlier tables persist even if a
            # later statement fails.
            self.connection.commit()

        print(symbol)
        # NOTE(review): 'orders' targets active_orders_new_<id> while the table
        # created above is active_orders_<id> - confirm which one is intended.
        self.query_string['orders'][symbol] = f'INSERT INTO active_orders_new_{sid} (time, price, quantity, is_bid, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
        self.query_string['book'][symbol] = f'INSERT INTO book_{sid} (time, ask_price, bid_price)\n' + "VALUES (%s, %s, %s)"
        self.query_string['public_trades'][symbol] = f'INSERT INTO public_trade_{sid} (time, price, quantity, is_buy)\n' + "VALUES (%s, %s, %s, %s)"
        self.query_string['private_trades'][symbol] = f'INSERT INTO user_trade_{sid} (time, price, quantity, is_buy, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)"
        self.query_string['balance'][symbol] = f'INSERT INTO balance_{sid} (time, base, quote, strategy)\n' + "VALUES (%s, %s, %s, %s)"
        # NOTE(review): these columns (bound, base, quote) do not match the
        # position_<id> table created above (ask_bound, bid_bound) - confirm
        # against the live schema before using this template.
        self.query_string['position'][symbol] = f'INSERT INTO position_{sid} (time, bound, base, quote)\n' + "VALUES (%s, %s, %s, %s)"
        self.query_string['status'][symbol] = f'INSERT INTO status (time, symbol_id, status, strategy)\n' + "VALUES (%s, %s, %s, %s)"
        return True
def pipe(q, filename, symbol, strat):
    """Follow ``filename`` with ``tail -F`` and push every new line onto ``q``.

    Each queued item is the tuple ``(line, symbol, strat)`` where ``line`` is
    the decoded text including its trailing newline. The function returns only
    when ``tail`` closes its stdout.

    Args:
        q: Object with a ``put`` method (a ``multiprocessing.Queue`` in this file).
        filename: Path of the file to follow.
        symbol: Passed through unchanged in every queued tuple.
        strat: Passed through unchanged in every queued tuple.
    """
    # stderr goes to DEVNULL: it was previously a pipe that nobody read, so
    # tail could block forever once that pipe's buffer filled up.
    with subprocess.Popen(
        ['tail', '-F', filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ) as p:
        try:
            for raw in iter(p.stdout.readline, b''):
                # errors='replace' keeps a single undecodable byte from
                # killing the whole reader process.
                q.put((raw.decode('utf-8', errors='replace'), symbol, strat))
        finally:
            # Make sure tail does not outlive this reader if the loop ends
            # because of an exception.
            p.kill()
def send(channels, symbol, strat, line):
    """Build a message from ``line`` and publish it on every channel.

    ``line`` is either the literal ``'init'`` or a log line of the form
    ``"<ts> <mtype> <payload>"`` (single spaces separate the first two fields;
    the payload is everything after the second space). Lines that cannot be
    parsed are reported with ``print`` and skipped. Messages with mtype
    ``'orders'`` are limited to one per symbol every 10 seconds.

    Args:
        channels: Iterable of channel objects exposing ``basic_publish``.
        symbol: Symbol string; its first two ``'_'``-separated parts and the
            remaining parts form the first two components of the message name.
        strat: Strategy tag, the last component of the message name.
        line: ``'init'`` or a raw log line.

    Returns:
        None.
    """
    global last_orders_mtype_time

    sym_parts = symbol.split('_')
    name = f"{'_'.join(sym_parts[:2])}.{'_'.join(sym_parts[2:])}.{strat}"

    if line == 'init':
        message = {
            'name': name,
            'ts': int(time.time()),
            'mtype': 'init',
            'msg': ''
        }
    else:
        fields = line.split(' ', 2)
        if len(fields) < 3:
            print(f'can not parse line: {line}')
            return
        raw_ts, mtype, payload = fields

        if mtype == 'orders':
            now = time.time()
            if now - last_orders_mtype_time.get(symbol, 0) < 10:
                return

        try:
            ts = int(raw_ts)
            # SECURITY NOTE(review): eval() executes whatever expression the
            # log line contains. It is kept because the payload format is not
            # visible here and may not be a pure literal; if payloads are
            # always plain dict/list literals, switch to ast.literal_eval.
            msg = eval(payload)
        except Exception as e:
            # eval() can raise any exception type; a single malformed line
            # must not take down the caller's forwarding loop.
            print(f'can not parse line: {line} ({e})')
            return

        if mtype == 'orders':
            # Record the time only for a line that is actually forwarded.
            last_orders_mtype_time[symbol] = now

        message = {
            'name': name,
            'ts': ts,
            'mtype': mtype,
            'msg': msg
        }

    if 'status' in line:
        print(line)

    body = json.dumps(message)
    for channel in channels:
        channel.basic_publish(
            exchange="",
            routing_key="log_queue.old_prod",
            body=body,
            properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
        )
if __name__ == '__main__':
    # Local import: multiprocessing.Queue.get raises queue.Empty on timeout.
    from queue import Empty

    # Rabbit init
    RABBITMQ_HOST = "18.178.242.45"
    RABBITMQ_TERMINAL_HOST = "80.93.176.68"
    QUEUE_NAME = "log_queue.old_prod"
    # Directories scanned for telemetry files. The character right after
    # "/mnt/" (index 5 of the full path) is used as the strategy tag.
    TELEMETRY_DIRS = (
        '/mnt/mm_telemetry',
        '/mnt/volume_telemetry',
        '/mnt/wolume_telemetry',
        '/mnt/tb_telemetry',
    )
    SCAN_INTERVAL = 60  # seconds between directory scans

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, port=9669))
    connection_terminal = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_TERMINAL_HOST, port=5672))
    channel = connection.channel()
    channel_terminal = connection_terminal.channel()
    # Declare a queue
    channel.queue_declare(queue=QUEUE_NAME, durable=True)
    channel_terminal.queue_declare(queue=QUEUE_NAME, durable=True)
    channels = [channel, channel_terminal]

    old_files = set()   # paths that already have a tail process
    q = Queue()
    killer = Killer()
    writer = DbWriterClass()
    args = {}
    if len(sys.argv) > 1:
        args['filename'] = sys.argv[1]
    writer.start_connection(**args)
    prev_time = 0
    # path -> 0 (just registered), 1 (reported alive), -1 (reported stale)
    working = {}
    print(writer.connection.closed)

    while True:
        cur_time = int(time.time())
        if cur_time - prev_time > SCAN_INTERVAL:
            files_path = []
            for directory in TELEMETRY_DIRS:
                try:
                    files_path += [
                        (entry, directory + '/' + entry.name)
                        for entry in Path(directory).iterdir()
                        if entry.is_file()
                    ]
                except OSError as e:
                    # A missing or unreadable directory must not stop the
                    # daemon; it is retried on the next scan.
                    print(e)

            for raw_file, file in files_path:
                strat = file[5].upper()
                try:
                    mtime = int(raw_file.stat().st_mtime * 1000)
                except OSError as e:
                    # File disappeared between listing and stat().
                    print(e)
                    continue
                ctime = int(time.time() * 1000)
                symbol = file.split('/')[-1]

                if file not in old_files:
                    if not writer.init_symbol(symbol):
                        continue
                    send(channels, symbol, strat, 'init')
                    working[file] = 0
                    old_files.add(file)
                    # Register before starting so the signal handler knows
                    # about the child as early as possible.
                    killer.procs.append(Process(target=pipe, args=(q, file, symbol, strat)))
                    killer.procs[-1].start()

                # Files of strategy 'V' may stay silent three times longer.
                cd = 180 if strat == 'V' else 60
                print(f'{file}: {mtime} {ctime}; {working[file]}')
                if mtime < ctime - cd * 1000:
                    if working[file] != -1:
                        working[file] = -1
                        send(channels, symbol, strat, f"{ctime} status {{'status': False}}")
                elif working[file] != 1:
                    working[file] = 1
                    send(channels, symbol, strat, f"{ctime + 1000} status {{'status': True}}")
            prev_time = cur_time

        # Wait with a timeout: a blocking get() would stop the periodic scan
        # above whenever no log lines arrive, so a fully silent system would
        # never be reported as stale.
        # NOTE(review): the pika connections are not serviced while idle;
        # confirm heartbeat settings if long quiet periods are expected.
        try:
            z = q.get(timeout=1)
        except Empty:
            continue
        send(channels, z[1], z[2], z[0])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement