Advertisement
den4ik2003

Untitled

Feb 12th, 2025
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 15.68 KB | None | 0 0
import asyncio
import json
import logging
import os
import signal
import subprocess
import time
from datetime import datetime
from pathlib import Path

import numpy as np
import psutil
from aiogram import Bot, Dispatcher, types
from aiogram import F
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters.command import Command
from aiogram.types import Message
from apscheduler.schedulers.asyncio import AsyncIOScheduler
  18.  
  19. logging.basicConfig(level=logging.INFO)
  20.  
  21. scheduler = AsyncIOScheduler()
  22. bot = Bot(token="7848354217:AAFqy-o0vsCFVUwvOZCFeEwN4BQEGcAvfME")
  23. dp = Dispatcher()
  24.  
  25. def parse_admins_json():
  26.     with open('admins.json', 'r') as admins_f:
  27.         uids = json.loads(''.join(admins_f.readlines()).replace(' ', '').replace('\n', ''))
  28.         return uids
  29.  
  30. available_uids = parse_admins_json()
  31.  
  32. super_admins = {
  33.     "7905047482": "Mansur",
  34.     "6049031402":"Murat",
  35.     "377206035":"Denis",
  36.     "5885283978":"Ibragim",
  37.     "7604822901":"Eric",
  38.     "377264131": "Murat"
  39. }
  40.  
  41. price_checker_upd_p = (0, 0)
  42.  
  43. path_to_mm_balances_dir = '/mnt/mm_telemetry'
  44. path_to_vol_balances_dir = '/mnt/volume_telemetry'
  45. balance_state = {}
  46. prev_wide_spread = {}
  47. stopped = ([], [])
  48.  
  49. alarm_last_time = {}
  50.  
  51. async def verification(uid):
  52.     if f"{uid}" in available_uids:
  53.         return True
  54.     else:
  55.         return False
  56.  
  57. @dp.message(Command("start"))
  58. async def start(message: types.Message):
  59.     is_verified = await verification(message.from_user.id)
  60.     if not is_verified:
  61.         await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
  62.         return
  63.  
  64.     await message.answer(f"Welcome to EchoBot controller, {available_uids[str(message.from_user.id)]}!\n") # Call \"/help\" too see the available commands")
  65.  
  66.  
  67. @dp.message(F.text.lower().contains('/add_admin_'))
  68. async def add_admin(message: Message):
  69.     add_id = message.text.split('_')[2].upper()
  70.     name = message.text.split('_')[3].upper()
  71.  
  72.     if message.from_user.id in super_admins:
  73.         available_uids[add_id] = name[0].upper() + name[1:].lower()
  74.        
  75.         with open('admins.json', 'w') as admins_f:
  76.             admins_f.write(json.dumps(available_uids))
  77.  
  78.         await message.reply(f"{available_uids[add_id]} with {add_id} uid was added as admin")
  79.  
  80.     else:
  81.         await message.reply("You haven't super admin rights to do it")
  82.  
  83.  
  84. @dp.message(F.text) # КОСТЫЛЬ!! нужны фильтры, сейчас НЕЛЬЗЯ эту функцию поднимать выше
  85. async def bad_message(message: Message):
  86.     is_verified = await verification(message.from_user.id)
  87.     if not is_verified:
  88.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  89.         return
  90.  
  91.     await message.answer("I don't understand you")
  92.  
  93.  
  94. async def send_message_to_users(msg):
  95.     for user_id in available_uids:
  96.         await bot.send_message(text=msg, chat_id=user_id)
  97.  
  98.  
  99. async def broadcast_alarm_message_to_users(msg, pair, alarm_type):
  100.     if pair in alarm_last_time and alarm_type in alarm_last_time[pair]:
  101.         if int(time.time() * 1000) - alarm_last_time[pair][alarm_type] >= 600000:
  102.             for user_id in available_uids:
  103.                 await bot.send_message(text=msg, chat_id=user_id)
  104.     else:
  105.         for user_id in available_uids:
  106.             await bot.send_message(text=msg, chat_id=user_id)
  107.  
  108.     alarm_last_time[pair][alarm_type] = int(time.time() * 1000)
  109.  
  110.  
  111. async def mm_calculate_lob_occupancy_rate(orders_log, decoded):
  112.     orders = json.loads(orders_log.split(' ')[2:].replace("'", '"'))
  113.     asks_sum = [float(order['quantity']) for order in orders['ask']]
  114.     bids_sum = [float(order['quantity']) * float(order['price']) for order in orders['bid']]
  115.     for line in decoded:
  116.         if 'position' in line:
  117.             position = json.loads(line.split(' ')[2:].replace("'", '"'))
  118.            
  119.             if 'total_buy_quote' not in position or 'total_sell_base' not in position: # пока нет этого поля, чтобы исключения не летели постоянно
  120.                 break
  121.            
  122.             if 0.1 * float(position['total_buy_quote']) > bids_sum:
  123.                 await broadcast_alarm_message_to_users(
  124.                     f'MARKET MAKING ALARM: {name} buy order book occupancy rate < 10%\nCurrent bids amount: {bids_sum}\nTotal buy quote parameter: {float(position['total_buy_quote'])}',
  125.                     name, 'mm_buy_occupancy_rate'
  126.                 )
  127.  
  128.             if 0.1 * float(position['total_sell_base']) > asks_sum:
  129.                 await broadcast_alarm_message_to_users(
  130.                     f"MARKET MAKING ALARM: {name} sell order book occupancy rate < 10%\nCurrent asks quantity: {asks_sum}\nTotal sell base parameter: {float(position['total_sell_base'])}",
  131.                     name, 'mm_sell_occupancy_rate'
  132.                 )
  133.  
  134.             break
  135.  
  136.  
  137. async def balance_checker(bot: Bot):
  138.     mm_files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  139.     volume_files_path = [path_to_vol_balances_dir + '/' + file.name for file in Path(path_to_vol_balances_dir).iterdir() if file.is_file()]
  140.  
  141.     for balance_file in mm_files_path:
  142.         p = subprocess.Popen(['tail', '-6', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  143.         res, err = p.communicate()
  144.         if not err:
  145.             name = balance_file.split('/')[-1]
  146.             decoded = res.decode().split('\n')
  147.             used = set()
  148.             for line in decoded[::-1]:
  149.                 try:
  150.                     time = str(datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000)))
  151.                 except:
  152.                     print(f'error parse {name} ')
  153.                     continue
  154.                 splitted = line.split(' ')
  155.  
  156.                 if len(splitted) < 3:
  157.                     continue
  158.  
  159.                 if 'stop' in line:
  160.                     if name in stopped[0]:
  161.                         continue
  162.                     stopped[0].append(name)
  163.                     await send_message_to_users(f'ALARM: {name} bot was stopped at {time}')
  164.                     continue
  165.  
  166.                 if name in stopped[0]:
  167.                     stopped[0].remove(name)
  168.                     await send_message_to_users(f'ALARM: {name} bot was restarted')
  169.                 continue
  170.                 msg_type = splitted[1]
  171.                 if msg_type in used:
  172.                     continue
  173.                 used.add(msg_type)
  174.                 print(''.join(splitted[2:]).replace("'", '"'))
  175.                 info_dict = eval(''.join(splitted[2:]).replace("'", '"'))
  176.  
  177.                 if msg_type == 'book':
  178.                     if float(info_dict['bid_price']) == 0:
  179.                         pass
  180.                         # await send_message_to_users(f'ALARM: {name} bid price = 0')
  181.                     else:
  182.                         spread_size = (float(info_dict['ask_price']) - float(info_dict['bid_price'])) / float(info_dict['bid_price'])
  183.                         ask = float(info_dict['ask_price'])
  184.                         bid = float(info_dict['bid_price'])
  185.                         if spread_size > 0.02:
  186.                             await broadcast_alarm_message_to_users(
  187.                                 f'MARKET MAKING ALARM: {name} spread > 2%\nspread: {round(spread_size * 100, 2)}%\nask: {ask}\nbid: {bid}',
  188.                                 name, 'mm_wide_spread'
  189.                             )
  190.                         elif spread_size < 0.0005:
  191.                             await broadcast_alarm_message_to_users(
  192.                                 f'MARKET MAKING ALARM: {name} spread < 0.05%\nspread: {round(spread_size * 100, 2)}%\nask: {ask}\nbid: {bid}',
  193.                                 name, 'mm_narrow_spread'
  194.                             )
  195.  
  196.                 elif msg_type == 'balance':
  197.                     base, quote = float(info_dict['base']), float(info_dict['quote'])
  198.                     new_balances = {"time": time, "base": base, 'quote': quote}
  199.  
  200.                     zero_base, zero_quote = await extract_zero_position(decoded[::-1]) # TODO: обработать только что созданные файлы
  201.  
  202.                     if base < 0.1 * zero_base or quote < 0.1 * zero_quote:
  203.                         await broadcast_alarm_message_to_users(f'MARKET MAKING ALARM: {name} balance lower than critical', name, 'mm_low_balance')
  204.                         continue
  205.  
  206.                     if name not in balance_state:
  207.                         balance_state[name] = new_balances
  208.                         continue
  209.  
  210.                     free_delta = abs(new_balances['base'] - balance_state[name]['base']) / balance_state[name]['base']
  211.                     locked_delta = abs(new_balances['quote'] - balance_state[name]['quote']) / balance_state[name]['quote']
  212.  
  213.                     if free_delta > 0.1 or locked_delta > 0.1:
  214.                         prev_bal = json.dumps(balance_state[name])
  215.                         new_bal = json.dumps(new_balances)
  216.                         balance_state[name] = new_balances # update
  217.                         # await send_message_to_users(f'ALARM: {name} balance > 10% change:\n  new: {new_bal}\n  previous: {prev_bal}')
  218.  
  219.                 elif msg_type == 'orders':
  220.                     await mm_calculate_lob_occupancy_rate(line, decoded[::-1])
  221.         else:
  222.             print(err.decode())
  223.  
  224.     # for balance_file in volume_files_path:
  225.     #     p = subprocess.Popen(['tail', '-1', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  226.     #     res, err = p.communicate()
  227.     #     if not err:
  228.     #         name = balance_file.split('/')[-1]
  229.     #         try:
  230.     #             time = str(datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000)))
  231.     #         except:
  232.     #             print(f'error parse {name} ')
  233.     #             continue
  234.  
  235.     #         if 'stop' in res.decode():
  236.     #             if name in stopped[1]:
  237.     #                 continue
  238.     #             stopped[1].append(name)
  239.     #             await send_message_to_users(f'VOLUME ALARM: {name} bot was stopped at {time}')
  240.     #             continue
  241.  
  242.     #         if name in stopped[1]:
  243.     #             stopped[1].remove(name)
  244.     #             await send_message_to_users(f'VOLUME ALARM: {name} bot was restarted')
  245.  
  246.     #         msg_type = res.decode().split(' ')[1]
  247.     #         info_dict = eval(''.join(res.decode().split(' ')[2:]).replace("'", '"'))
  248.  
  249.     #         if msg_type == 'balance':
  250.     #             base, quote = float(info_dict['base']), float(info_dict['quote'])
  251.     #             new_balances = {"time": time, "base": base, 'quote': quote}
  252.            
  253.     #             if base < 10 and quote < 10: # в теории норм обработать надо
  254.     #                 await send_message_to_users(f'VOLUME ALARM: {name} quote or base balance = 0')
  255.     #                 continue
  256.  
  257.     #             if name not in balance_state:
  258.     #                 balance_state[name] = new_balances
  259.     #                 continue
  260.  
  261.     #             free_delta = abs(new_balances['base'] - balance_state[name]['base']) / max(30, balance_state[name]['base'])
  262.     #             locked_delta = abs(new_balances['quote'] - balance_state[name]['quote']) / max(30, balance_state[name]['quote'])
  263.  
  264.     #             if free_delta > 0.1 or locked_delta > 0.1:
  265.     #                 prev_bal = json.dumps(balance_state[name])
  266.     #                 new_bal = json.dumps(new_balances)
  267.     #                 balance_state[name] = new_balances # update
  268.     #                 # await send_message_to_users(f'VOLUME ALARM: {name} balance > 10% change:\n  previous: {prev_bal}\n  new: {new_bal}')
  269.     #     else:
  270.     #         print(err.decode())
  271.  
  272.     print(balance_state)
  273.  
  274.  
  275. async def volume_health_trades_checker(bot: Bot):
  276.     volume_files_path = [path_to_vol_balances_dir + '/' + file.name for file in Path(path_to_vol_balances_dir).iterdir() if file.is_file()]
  277.  
  278.     bound_time = int(time.time()) - 900000 # -15 minutes
  279.     for balance_file in volume_files_path:
  280.         p = subprocess.Popen(['tail', '-5000', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  281.         res, err = p.communicate()
  282.         if not err:
  283.             name = balance_file.split('/')[-1]
  284.             decoded = res.decode().split('\n')
  285.             if 'stop' in decoded[-1]:
  286.                 if name not in stopped[1]:
  287.                     stopped[1].append(name)
  288.                     await send_message_to_users(f'VOLUME ALARM: {name} bot was stopped at {time}')
  289.  
  290.                 continue
  291.  
  292.             for line in decoded[::-1]:
  293.                 if name in stopped[1]:
  294.                     stopped[1].remove(name)
  295.                     await send_message_to_users(f'VOLUME ALARM: {name} bot was restarted')
  296.                     continue
  297.  
  298.                 if int(line.split(' ')[0]) < bound_time:
  299.                     await broadcast_alarm_message_to_users(f'VOLUME ALARM: {name} no private trades within 15 minutes', name, 'vol_no_trades')
  300.                     break
  301.  
  302.                 if 'private_trades' in line:
  303.                     if line.split(' ')[2:] != '\[\]':
  304.                         break
  305.  
  306.  
  307. async def extract_zero_position(decoded):
  308.     for line in decoded:
  309.         if 'position' in line:
  310.             position = json.loads(line.split(' ')[2:].replace("'", '"'))
  311.             return (float(position['zero_base']), float(position['zero_quote']))
  312.            
  313.  
  314. async def price_checker(bot: Bot):
  315.     mm_files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  316.  
  317.     for balance_file in mm_files_path:
  318.         p = subprocess.Popen(['tail', '-100', path_to_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  319.         res, err = p.communicate()
  320.  
  321.         if not err:
  322.             name = balance_file.split('/')[-1]
  323.             decoded = res.decode().split('\n')
  324.             for line in decoded[::-1]:
  325.                 if 'stop' in line:
  326.                     break
  327.                 if 'book' in line:
  328.                     book = json.loads(line.split(' ')[2:].replace("'", '"'))
  329.                     mid_price = (float(book['ask_price']) + float(book['bid_price'])) / 2
  330.  
  331.                     if price_checker_upd_p[1] != 0:
  332.                         if mid_price > 1.05 * price_checker_upd_p[1] or mid_price < 0.95 * price_checker_upd_p[1]:
  333.                             prev_time = str(datetime.fromtimestamp(price_checker_upd_p[0]))
  334.                             cur_time = str(datetime.fromtimestamp(int(time.time())))
  335.                             await send_message_to_users(
  336.                                 f'MARKET MAKING ALARM: {name} 15 minutes mid price delta > 5%\n{prev_time}\nmid price: {price_checker_upd_p[1]}\n{cur_time}\nmid price: {mid_price}'
  337.                             )
  338.                             price_checker_upd_p[0] = int(time.time())
  339.                             price_checker_upd_p[1] = mid_price
  340.  
  341.                     if price_checker_upd_p[0] == 0 or int(time.time()) - price_checker_upd_p[0] >= 900: # 15 min
  342.                         price_checker_upd_p[0] = int(time.time())
  343.                         price_checker_upd_p[1] = mid_price
  344.  
  345.                     break
  346.                
  347.  
  348. async def main():
  349.     scheduler = AsyncIOScheduler()
  350.     scheduler.add_job(balance_checker, "interval", seconds=45, args=[bot])
  351.     scheduler.add_job(price_checker, "interval", seconds=45, args=[bot])
  352.     scheduler.start()
  353.  
  354.     await dp.start_polling(bot)
  355.  
  356.  
  357. if __name__ == "__main__":
  358.     asyncio.run(main())
  359.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement