Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import ast
import asyncio
import json
import logging
import os
import signal
import subprocess
import time
from datetime import datetime
from pathlib import Path

import numpy as np
import psutil
from aiogram import Bot, Dispatcher, types
from aiogram import F
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters.command import Command
from aiogram.types import Message
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Emit INFO-and-above log records through the root logger.
logging.basicConfig(level=logging.INFO)

# Module-level scheduler instance. NOTE(review): main() builds its own
# AsyncIOScheduler, so nothing in the visible source uses this one.
scheduler = AsyncIOScheduler()

# SECURITY: the bot token is a credential and must not be committed to source.
# It is read from the environment instead. The token that used to be
# hard-coded on this line has been published and should be revoked and reissued.
_bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
if not _bot_token:
    raise RuntimeError(
        "Set the TELEGRAM_BOT_TOKEN environment variable to the Telegram bot token"
    )

bot = Bot(token=_bot_token)
dp = Dispatcher()
def parse_admins_json(path='admins.json'):
    """Load the admin registry from a JSON file.

    The file is decoded with ``json.load`` directly. The previous version
    removed every space and newline from the raw text before decoding, which
    also removed spaces inside string values (``"Mary Ann"`` became
    ``"MaryAnn"``).

    Args:
        path: Location of the JSON document. Defaults to ``admins.json``
            relative to the current working directory.

    Returns:
        The decoded JSON document. The rest of this module uses it as a dict
        mapping Telegram user id strings to display names.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(path, 'r', encoding='utf-8') as admins_f:
        return json.load(admins_f)
# Registry of verified users, loaded once at import time. Used below as a
# dict of user id string -> display name; add_admin mutates and persists it.
available_uids = parse_admins_json()

# Users allowed to register new admins. Keys are Telegram user ids as strings.
super_admins = {
    "7905047482": "Mansur",
    "6049031402": "Murat",
    "377206035": "Denis",
    "5885283978": "Ibragim",
    "7604822901": "Eric",
    "377264131": "Murat",
}

# [checkpoint time in epoch seconds, checkpoint mid price] for price_checker.
# This must be a list: price_checker assigns to its elements, which raises
# TypeError on a tuple.
price_checker_upd_p = [0, 0]

# Directories whose files are tailed by the checker jobs.
path_to_mm_balances_dir = '/mnt/mm_telemetry'
path_to_vol_balances_dir = '/mnt/volume_telemetry'

# Last recorded balances per telemetry file name, as
# {"time": str, "base": float, "quote": float}.
balance_state = {}

# NOTE(review): not referenced anywhere in the visible source.
prev_wide_spread = {}

# Names of bots currently reported as stopped: index 0 is used by the market
# making checker, index 1 by the volume checker.
stopped = ([], [])

# pair -> alarm type -> time of the last alarm in epoch milliseconds.
alarm_last_time = {}
async def verification(uid):
    """Tell whether ``uid`` belongs to a registered admin.

    Args:
        uid: Telegram user id; it is converted to ``str`` because the
            registry keys are strings.

    Returns:
        ``True`` when the stringified id is a key of ``available_uids``,
        ``False`` otherwise.
    """
    return str(uid) in available_uids
@dp.message(Command("start"))
async def start(message: types.Message):
    """Answer the /start command.

    Verified senders are greeted with the display name stored for them in
    ``available_uids``; everyone else is told they are not verified.
    """
    sender_id = message.from_user.id
    if await verification(sender_id):
        display_name = available_uids[str(sender_id)]
        # NOTE: a hint to call "/help" is commented out of this greeting.
        await message.answer(f"Welcome to EchoBot controller, {display_name}!\n")
        return
    await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
@dp.message(F.text.lower().contains('/add_admin_'))
async def add_admin(message: Message):
    """Register a new admin from a ``/add_admin_<user id>_<name>`` message.

    Only senders listed in ``super_admins`` may do this. On success the new
    entry is stored in ``available_uids`` and the whole registry is written
    back to ``admins.json``.

    Fixes over the previous version:
      * ``super_admins`` is keyed by strings, so the integer
        ``message.from_user.id`` never matched and nobody was authorised.
      * A command without both an id and a name raised IndexError before any
        reply was sent; it now gets a usage hint.
    """
    # Authorise first so that unauthorised users never reach the parser.
    if str(message.from_user.id) not in super_admins:
        await message.reply("You haven't super admin rights to do it")
        return

    # Expected pieces: [..., 'admin', '<user id>', '<name>', ...]
    parts = message.text.split('_')
    add_id = parts[2].strip() if len(parts) > 2 else ''
    name = parts[3].strip() if len(parts) > 3 else ''

    # The id is later used as a Telegram chat id, so it has to be numeric
    # (a leading minus sign is tolerated).
    if not add_id.lstrip('-').isdigit() or not name:
        await message.reply("Usage: /add_admin_<numeric user id>_<name>")
        return

    display_name = name[0].upper() + name[1:].lower()
    available_uids[add_id] = display_name
    with open('admins.json', 'w') as admins_f:
        admins_f.write(json.dumps(available_uids))
    await message.reply(f"{display_name} with {add_id} uid was added as admin")
# HACK!! Proper filters are needed; for now this function must NOT be moved
# any higher in the file.
@dp.message(F.text)
async def bad_message(message: Message):
    """Fallback reply for any text message.

    Unverified senders are told they are not verified; verified senders are
    told the message was not understood.
    """
    if await verification(message.from_user.id):
        reply_text = "I don't understand you"
    else:
        reply_text = "You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake"
    await message.answer(reply_text)
async def send_message_to_users(msg):
    """Send ``msg`` to every user id registered in ``available_uids``.

    Delivery is best effort per recipient: a failure for one chat is logged
    and the remaining recipients are still notified. Previously the first
    failed send aborted the whole broadcast.

    Args:
        msg: Text of the message to send.
    """
    # Iterate over a snapshot: add_admin can insert into available_uids while
    # this coroutine is suspended in send_message, and iterating the live
    # dict would then raise "dictionary changed size during iteration".
    for user_id in tuple(available_uids):
        try:
            await bot.send_message(text=msg, chat_id=user_id)
        except Exception:
            logging.exception("Failed to deliver message to chat %s", user_id)
async def broadcast_alarm_message_to_users(msg, pair, alarm_type):
    """Broadcast an alarm, at most once per 10 minutes per (pair, alarm type).

    Fixes over the previous version:
      * The first alarm for a pair raised KeyError, because
        ``alarm_last_time[pair]`` was indexed before it existed.
      * The timestamp was refreshed even when the alarm was suppressed, so a
        persistent alarm was announced once and then silenced for as long as
        it kept firing. The timestamp is now recorded only when the alarm is
        actually broadcast.

    Args:
        msg: Text to send.
        pair: Key identifying the monitored bot/pair.
        alarm_type: Key identifying the kind of alarm.
    """
    cooldown_ms = 600000  # 10 minutes
    now_ms = int(time.time() * 1000)

    last_sent_ms = alarm_last_time.get(pair, {}).get(alarm_type)
    if last_sent_ms is not None and now_ms - last_sent_ms < cooldown_ms:
        return

    # Record the time before awaiting so that an overlapping check for the
    # same alarm does not broadcast it a second time.
    alarm_last_time.setdefault(pair, {})[alarm_type] = now_ms
    await send_message_to_users(msg)
def _parse_log_payload(line):
    """Decode the payload of a ``<timestamp> <type> <payload>`` log line.

    The payload is everything after the second space. It is first read as
    JSON after turning single quotes into double quotes, and otherwise as a
    Python literal.

    Raises:
        ValueError or SyntaxError: If the payload is neither.
        IndexError: If the line has fewer than three space-separated fields.
    """
    payload = line.split(' ', 2)[2]
    try:
        return json.loads(payload.replace("'", '"'))
    except json.JSONDecodeError:
        # SECURITY: this text used to be passed to eval(), which executes
        # arbitrary code found in the telemetry file. literal_eval accepts
        # only literals.
        return ast.literal_eval(payload)


def _format_log_time(line):
    """Return the line's leading millisecond timestamp as text, or None."""
    try:
        return str(datetime.fromtimestamp(int(line.split(' ')[0]) // 1000))
    except (ValueError, OverflowError, OSError):
        return None


def _relative_change(new, old):
    """Return ``|new - old| / |old|`` without dividing by zero."""
    if old == 0:
        return 0.0 if new == 0 else float('inf')
    return abs(new - old) / abs(old)


async def mm_calculate_lob_occupancy_rate(orders_log, decoded, name="unknown"):
    """Alarm when resting orders cover less than 10% of the configured totals.

    Bid volume is summed as quantity * price and ask volume as quantity, then
    each is compared with 10% of ``total_buy_quote`` / ``total_sell_base``
    taken from the first line in ``decoded`` that contains ``position``.

    Fixes over the previous version: ``.replace`` was called on a list, the
    per-order values were never summed before being compared with a float,
    ``name`` was undefined, and one f-string nested its own quote character.

    Args:
        orders_log: The ``orders`` log line.
        decoded: Log lines to search for the ``position`` line.
        name: Label of the bot used in the alarm text and as throttle key.
    """
    orders = _parse_log_payload(orders_log)
    asks_total = sum(float(order['quantity']) for order in orders['ask'])
    bids_total = sum(float(order['quantity']) * float(order['price']) for order in orders['bid'])

    position_line = next((line for line in decoded if 'position' in line), None)
    if position_line is None:
        return
    position = _parse_log_payload(position_line)
    # These fields do not exist yet in some logs; bail out quietly so that
    # exceptions are not raised on every run.
    if 'total_buy_quote' not in position or 'total_sell_base' not in position:
        return

    total_buy_quote = float(position['total_buy_quote'])
    total_sell_base = float(position['total_sell_base'])
    if 0.1 * total_buy_quote > bids_total:
        await broadcast_alarm_message_to_users(
            f'MARKET MAKING ALARM: {name} buy order book occupancy rate < 10%\nCurrent bids amount: {bids_total}\nTotal buy quote parameter: {total_buy_quote}',
            name, 'mm_buy_occupancy_rate'
        )
    if 0.1 * total_sell_base > asks_total:
        await broadcast_alarm_message_to_users(
            f"MARKET MAKING ALARM: {name} sell order book occupancy rate < 10%\nCurrent asks quantity: {asks_total}\nTotal sell base parameter: {total_sell_base}",
            name, 'mm_sell_occupancy_rate'
        )


async def _check_spread(name, book):
    """Alarm when the relative bid/ask spread is above 2% or below 0.05%."""
    bid = float(book['bid_price'])
    if bid == 0:
        # The relative spread is undefined without a bid.
        return
    ask = float(book['ask_price'])
    spread_size = (ask - bid) / bid
    if spread_size > 0.02:
        await broadcast_alarm_message_to_users(
            f'MARKET MAKING ALARM: {name} spread > 2%\nspread: {round(spread_size * 100, 2)}%\nask: {ask}\nbid: {bid}',
            name, 'mm_wide_spread'
        )
    elif spread_size < 0.0005:
        await broadcast_alarm_message_to_users(
            f'MARKET MAKING ALARM: {name} spread < 0.05%\nspread: {round(spread_size * 100, 2)}%\nask: {ask}\nbid: {bid}',
            name, 'mm_narrow_spread'
        )


async def _check_balance(name, info, line_time, newest_first):
    """Alarm on critically low balances and maintain ``balance_state``."""
    base, quote = float(info['base']), float(info['quote'])
    new_balances = {"time": line_time, "base": base, 'quote': quote}

    # TODO: handle freshly created files. Until then a missing position line
    # only skips the low-balance check instead of failing on unpacking None.
    zero_position = await extract_zero_position(newest_first)
    if zero_position is not None:
        zero_base, zero_quote = zero_position
        if base < 0.1 * zero_base or quote < 0.1 * zero_quote:
            await broadcast_alarm_message_to_users(f'MARKET MAKING ALARM: {name} balance lower than critical', name, 'mm_low_balance')
            return

    previous = balance_state.get(name)
    if previous is None:
        balance_state[name] = new_balances
        return

    base_delta = _relative_change(base, previous['base'])
    quote_delta = _relative_change(quote, previous['quote'])
    if base_delta > 0.1 or quote_delta > 0.1:
        # Move the baseline. The "balance > 10% change" alarm that used to
        # accompany this update is currently disabled.
        balance_state[name] = new_balances


async def _check_mm_file(balance_file):
    """Run the stop/restart, spread, balance and occupancy checks on one file."""
    name = balance_file.name
    result = subprocess.run(['tail', '-6', str(balance_file)], capture_output=True)
    if result.stderr:
        logging.error(result.stderr.decode(errors='replace'))
        return

    lines = [line for line in result.stdout.decode(errors='replace').split('\n') if line.strip()]
    if not lines:
        return
    newest_first = lines[::-1]

    # Only the newest line decides whether the bot is stopped. Judging every
    # line separately produced a "stopped" alarm for the stop line and a
    # "restarted" alarm for the line before it on each run.
    newest = newest_first[0]
    if 'stop' in newest:
        if name not in stopped[0]:
            stopped[0].append(name)
            stopped_at = _format_log_time(newest) or 'unknown time'
            await send_message_to_users(f'ALARM: {name} bot was stopped at {stopped_at}')
        return
    if name in stopped[0]:
        stopped[0].remove(name)
        await send_message_to_users(f'ALARM: {name} bot was restarted')

    seen_types = set()
    for line in newest_first:
        parts = line.split(' ')
        if len(parts) < 3 or 'stop' in line:
            continue
        # The timestamp is taken from this line, not from the whole output.
        line_time = _format_log_time(line)
        if line_time is None:
            logging.warning('error parse %s', name)
            continue
        msg_type = parts[1]
        if msg_type in seen_types:
            # Only the newest line of each message type is evaluated.
            continue
        seen_types.add(msg_type)
        try:
            info = _parse_log_payload(line)
        except (ValueError, SyntaxError):
            logging.warning('cannot decode %s payload in %s', msg_type, name)
            continue

        if msg_type == 'book':
            await _check_spread(name, info)
        elif msg_type == 'balance':
            await _check_balance(name, info, line_time, newest_first)
        elif msg_type == 'orders':
            await mm_calculate_lob_occupancy_rate(line, newest_first, name)


async def balance_checker(bot: "Bot"):
    """Scheduled job: inspect the tail of every market making telemetry file.

    For each regular file in ``path_to_mm_balances_dir`` the last six lines
    are read with ``tail`` and checked for bot stop/restart, spread width,
    low balances and order book occupancy. A failure in one file is logged
    and does not prevent the remaining files from being checked.

    NOTE: an equivalent balance check for the volume telemetry directory was
    present here only as commented-out code and has been removed; restore it
    from version control if it is needed.

    Args:
        bot: Supplied by the scheduler; not used by this function.
    """
    mm_files = [entry for entry in Path(path_to_mm_balances_dir).iterdir() if entry.is_file()]
    for balance_file in mm_files:
        try:
            await _check_mm_file(balance_file)
        except Exception:
            logging.exception('balance_checker failed for %s', balance_file.name)
    logging.info('balance_state: %s', balance_state)
async def volume_health_trades_checker(bot: Bot):
    """Alarm when a volume bot has stopped or shows no recent private trades.

    For every regular file in ``path_to_vol_balances_dir`` the last 5000
    lines are read newest first. A newest line containing ``stop`` marks the
    bot as stopped. Otherwise the lines are scanned until either a
    non-empty ``private_trades`` entry is found (healthy) or a line older
    than 15 minutes is reached (alarm).

    Fixes over the previous version:
      * The time bound subtracted milliseconds from a value in seconds.
        NOTE(review): this assumes log timestamps are in epoch milliseconds,
        as balance_checker treats them - confirm for the volume files.
      * The "stopped at" message interpolated the ``time`` module.
      * ``decoded[-1]`` was the empty string after the trailing newline, so a
        stop was never detected.
      * The payload list was compared with the string ``'\\[\\]'``, which is
        always unequal, so even an empty trade list counted as a trade.
      * Blank lines raised ValueError when their timestamp was parsed.

    Args:
        bot: Supplied by the scheduler; not used by this function.
    """
    bound_time_ms = int(time.time() * 1000) - 900000  # 15 minutes ago
    volume_files = [entry for entry in Path(path_to_vol_balances_dir).iterdir() if entry.is_file()]

    for balance_file in volume_files:
        name = balance_file.name
        result = subprocess.run(['tail', '-5000', str(balance_file)], capture_output=True)
        if result.stderr:
            logging.error(result.stderr.decode(errors='replace'))
            continue

        lines = [line for line in result.stdout.decode(errors='replace').split('\n') if line.strip()]
        if not lines:
            continue

        newest = lines[-1]
        if 'stop' in newest:
            if name not in stopped[1]:
                stopped[1].append(name)
                try:
                    stopped_at = str(datetime.fromtimestamp(int(newest.split(' ')[0]) // 1000))
                except (ValueError, OverflowError, OSError):
                    stopped_at = 'unknown time'
                await send_message_to_users(f'VOLUME ALARM: {name} bot was stopped at {stopped_at}')
            continue

        if name in stopped[1]:
            stopped[1].remove(name)
            await send_message_to_users(f'VOLUME ALARM: {name} bot was restarted')

        for line in reversed(lines):
            parts = line.split(' ')
            try:
                line_time_ms = int(parts[0])
            except ValueError:
                # Not a timestamped log line; ignore it.
                continue
            if line_time_ms < bound_time_ms:
                await broadcast_alarm_message_to_users(f'VOLUME ALARM: {name} no private trades within 15 minutes', name, 'vol_no_trades')
                break
            if 'private_trades' in line and ''.join(parts[2:]).strip() != '[]':
                # A recent non-empty trade list: the bot is trading.
                break
async def extract_zero_position(decoded):
    """Find the zero position recorded in a sequence of log lines.

    The first line containing ``position`` is used. Its payload (everything
    after the second space) is decoded as JSON after single quotes are
    turned into double quotes.

    The previous version called ``.replace`` on the list returned by
    ``split`` and therefore always raised AttributeError.

    Args:
        decoded: Iterable of log lines.

    Returns:
        ``(zero_base, zero_quote)`` as floats, or ``None`` when there is no
        position line or it lacks either field.

    Raises:
        json.JSONDecodeError: If the position payload cannot be decoded.
    """
    for line in decoded:
        if 'position' not in line:
            continue
        payload = ' '.join(line.split(' ')[2:])
        position = json.loads(payload.replace("'", '"'))
        if 'zero_base' not in position or 'zero_quote' not in position:
            return None
        return (float(position['zero_base']), float(position['zero_quote']))
    return None
# Per-bot checkpoint: file name -> (checkpoint epoch seconds, mid price).
_price_checkpoints = {}


async def price_checker(bot: Bot):
    """Scheduled job: alarm when a mid price drifts more than 5% from its checkpoint.

    For every regular file in ``path_to_mm_balances_dir`` the newest ``book``
    line among the last 100 lines is read, unless a ``stop`` line is met
    first. Its mid price is compared with that bot's checkpoint. A deviation
    above 5% raises an alarm and resets the checkpoint; otherwise the
    checkpoint is refreshed once it is 15 minutes old.

    Fixes over the previous version:
      * ``tail`` was given the undefined name ``path_to_file``.
      * ``.replace`` was called on the list returned by ``split``.
      * The checkpoint was a tuple whose elements were assigned to.
      * One checkpoint was shared by every file, so each bot was compared
        with another bot's price. Checkpoints are now kept per file name in
        ``_price_checkpoints``.

    Args:
        bot: Supplied by the scheduler; not used by this function.
    """
    mm_files = [entry for entry in Path(path_to_mm_balances_dir).iterdir() if entry.is_file()]
    for balance_file in mm_files:
        name = balance_file.name
        result = subprocess.run(['tail', '-100', str(balance_file)], capture_output=True)
        if result.stderr:
            logging.error(result.stderr.decode(errors='replace'))
            continue

        for line in reversed(result.stdout.decode(errors='replace').split('\n')):
            if 'stop' in line:
                break
            if 'book' not in line:
                continue
            try:
                payload = ' '.join(line.split(' ')[2:])
                book = json.loads(payload.replace("'", '"'))
                mid_price = (float(book['ask_price']) + float(book['bid_price'])) / 2
            except (ValueError, KeyError, TypeError):
                # Unreadable book line; fall back to the next older one.
                logging.warning('cannot decode book line in %s', name)
                continue

            now = int(time.time())
            checkpoint_time, checkpoint_price = _price_checkpoints.get(name, (0, 0))
            drifted = checkpoint_price != 0 and (
                mid_price > 1.05 * checkpoint_price or mid_price < 0.95 * checkpoint_price
            )
            if drifted:
                prev_time = str(datetime.fromtimestamp(checkpoint_time))
                cur_time = str(datetime.fromtimestamp(now))
                await send_message_to_users(
                    f'MARKET MAKING ALARM: {name} 15 minutes mid price delta > 5%\n{prev_time}\nmid price: {checkpoint_price}\n{cur_time}\nmid price: {mid_price}'
                )
                _price_checkpoints[name] = (now, mid_price)
            elif checkpoint_time == 0 or now - checkpoint_time >= 900:  # 15 min
                _price_checkpoints[name] = (now, mid_price)
            # Only the newest book line is evaluated.
            break
async def main():
    """Start the periodic checker jobs, then poll Telegram for updates.

    ``balance_checker`` and ``price_checker`` are each scheduled every 45
    seconds on a scheduler created inside this coroutine. Polling then runs
    until ``dp.start_polling`` returns.

    NOTE(review): ``volume_health_trades_checker`` is defined in this file
    but is not scheduled here - confirm whether that is intended.
    """
    job_scheduler = AsyncIOScheduler()
    for job in (balance_checker, price_checker):
        job_scheduler.add_job(job, "interval", seconds=45, args=[bot])
    job_scheduler.start()
    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement