Advertisement
den4ik2003

Untitled

Nov 4th, 2024
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 12.32 KB | None | 0 0
  1. import asyncio
  2. import logging
  3. import os
  4. import json
  5. import subprocess
  6. import signal
  7. import psutil
  8. from datetime import datetime
  9. from pathlib import Path
  10. from aiogram.types import Message
  11. from aiogram.enums import ParseMode
  12. from aiogram import Bot, Dispatcher, types
  13. from aiogram.filters.command import Command
  14. from aiogram import F
  15. from aiogram.exceptions import TelegramBadRequest
  16. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  17.  
  18. logging.basicConfig(level=logging.INFO)
  19.  
  20. scheduler = AsyncIOScheduler()
  21. bot = Bot(token="7848354217:AAFqy-o0vsCFVUwvOZCFeEwN4BQEGcAvfME")
  22. dp = Dispatcher()
  23.  
  24. available_uids = {
  25.     "7905047482": "Mansur",
  26.     "6049031402":"Murat",
  27.     "377206035":"Denis",
  28.     "5885283978":"Ibragim",
  29.     "7604822901":"Eric",
  30.     "377264131": "Murat"
  31. }
  32.  
  33. running_pids = {
  34.     'marketmaking': {
  35.         "lott_usdt": -1
  36.     },
  37.     "volume": {
  38.         "lott_usdt": -1  
  39.     }
  40. }
  41. token_exchanges = {
  42.     'lott': [
  43.         'mexc_spot'
  44.     ]
  45. }
  46. path_to_mm_balances_dir = '/mnt/mm_telemetry'
  47. balance_state = {}
  48.  
  49. async def verification(uid):
  50.     if f"{uid}" in available_uids:
  51.         return True
  52.     else:
  53.         return False
  54.  
  55.  
  56. @dp.message(Command("start"))
  57. async def start(message: types.Message):
  58.     is_verified = await verification(message.from_user.id)
  59.     if not is_verified:
  60.         await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
  61.         return
  62.  
  63.     await message.answer(f"Welcome to EchoBot controller, {available_uids[str(message.from_user.id)]}!\n") # Call \"/help\" too see the available commands")
  64.  
  65.  
  66. @dp.message(Command("clear"))
  67. async def cmd_clear(message: Message, bot: Bot) -> None:
  68.     is_verified = await verification(message.from_user.id)
  69.     if not is_verified:
  70.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  71.         return
  72.  
  73.     try:
  74.         for i in range(message.message_id, 0, -1):
  75.             await bot.delete_message(message.from_user.id, i)
  76.     except TelegramBadRequest as ex:
  77.         if ex.message == "Bad Request: message to delete not found":
  78.             print("All messages was deleted")
  79.  
  80.  
  81. #########################################################################################
  82. ################################## Market Making ########################################
  83. #########################################################################################
  84.  
  85.  
  86. @dp.message(Command("mm_aliveness"))
  87. async def mm_check_alivness(message: types.Message):
  88.     is_verified = await verification(message.from_user.id)
  89.     if not is_verified:
  90.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  91.         return
  92.  
  93.     aliveness = {}
  94.  
  95.     for token, pid in running_pids['marketmaking'].items():
  96.         if pid == -1:
  97.             aliveness[token] = "false"
  98.             continue
  99.         try:
  100.             process = psutil.Process(pid)
  101.             if str(process.status()) == 'zombie':  # why psutil.ZombieProcess is not working???
  102.                 aliveness[token] = "false"
  103.             else:
  104.                 aliveness[token] = "true"
  105.         except psutil.NoSuchProcess:
  106.            aliveness[token] = "false"
  107.  
  108.     await message.answer(json.dumps(aliveness))
  109.  
  110.  
  111. @dp.message(Command("mm_balances"))
  112. async def mm_balance(message: types.Message):
  113.     files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  114.     balances = {}
  115.  
  116.     for balance_file in files_path:
  117.         p = subprocess.Popen(['tail', '-1', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  118.         res, err = p.communicate()
  119.         if not err:
  120.             print(res.decode())
  121.             time = datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000))
  122.             base, quote = float(res.decode().split(' ')[1].strip()[1:-1]), float(res.decode().split(' ')[2].strip()[:-1])
  123.             balances[balance_file.split('/')[-1]] = f"time: {str(time)} base: {base} quote: {quote}"
  124.         else:
  125.             print(err.decode())
  126.  
  127.     balances_answer = ''.join("{}\n".format(field) for field in json.dumps(balances).split(','))
  128.     await message.answer(balances_answer)
  129.  
  130.  
  131. @dp.message(Command("vol_aliveness"))
  132. async def vol_check_alivness(message: types.Message):
  133.     is_verified = await verification(message.from_user.id)
  134.     if not is_verified:
  135.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  136.         return
  137.  
  138.     aliveness = {}
  139.  
  140.     for token, pid in running_pids['volume'].items():
  141.         if pid == -1:
  142.             aliveness[token] = "false"
  143.             continue
  144.         try:
  145.             process = psutil.Process(pid)
  146.             if str(process.status()) == 'zombie':  # why psutil.ZombieProcess is not working???
  147.                 aliveness[token] = "false"
  148.             else:
  149.                 aliveness[token] = "true"
  150.         except psutil.NoSuchProcess:
  151.            aliveness[token] = "false"
  152.  
  153.     await message.answer(json.dumps(aliveness))
  154.  
  155.  
  156. @dp.message(F.text.lower().contains('tag:')) # tag:start:marketmaking:DAG
  157. async def handler(message: Message):
  158.     is_verified = await verification(message.from_user.id)
  159.     if not is_verified:
  160.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  161.         return
  162.  
  163.     operation = message.text.split(':')[1] # start OR stop
  164.     bot_type = message.text.split(':')[2] # marketmaking OR volume
  165.     token = message.text.split(':')[3]
  166.    
  167.     if operation == 'start':
  168.         await start_bot(message, bot_type, token)
  169.     elif operation == 'stop':
  170.         await stop_bot(message, bot_type, token)
  171.     else:
  172.         await message.answer("unknown operation")
  173.  
  174.  
  175. async def start_bot(message: Message, type, token: str):
  176.     if running_pids[type][token] != -1:
  177.         await message.answer(f'Firstly stop already launched bot')
  178.         return
  179.  
  180.     path_to_launch_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../')
  181.  
  182.     # if type == 'marketmaking':
  183.     #     path_to_exec = os.path.join(path_to_launch_dir, 'market_making.py')
  184.     #     path_to_config_dir = os.path.join(path_to_launch_dir, 'configs/marketmaking/', token)
  185.     # elif type == 'volume':
  186.     #     path_to_exec = os.path.join(path_to_launch_dir, 'volume.py')
  187.     #     path_to_config_dir = os.path.join(path_to_launch_dir, 'configs/volume/', token)
  188.     # else:
  189.     #     message.answer("unknown bot type")
  190.  
  191.     # config_files = [file.name for file in Path(path_to_config_dir).iterdir() if file.is_file()]
  192.  
  193.     if type == 'marketmaking':
  194.         exec_name = token.lower() + '_mm.py'
  195.         config_name = token.lower() + '_mm.key'
  196.     elif type == 'volume':
  197.         exec_name = token.lower() + '_volume.py'
  198.         config_name = token.lower() + '_volume.key'
  199.     else:
  200.         await message.answer("unknown bot type")
  201.         return
  202.  
  203.     path_to_exec = os.path.join(path_to_launch_dir, exec_name)
  204.    
  205.     configs = [os.path.join(path_to_launch_dir, exchange, config_name) for exchange in token_exchanges[token]]
  206.  
  207.     p = subprocess.Popen(["python3", path_to_exec] + configs, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
  208.     running_pids[type][token] = p.pid
  209.     print(p.pid)
  210.  
  211.     await message.answer(f'Process with id = {p.pid} was started')
  212.  
  213.  
  214. async def stop_bot(message: Message, type, token):
  215.     if running_pids[type][token] == -1:
  216.         await message.answer("No process is running")
  217.     else:
  218.         # check whether stopped
  219.         try:
  220.             process = psutil.Process(running_pids[type][token])
  221.             if str(process.status()) == 'zombie':  # it consumes no resources and is dead, but it still exists
  222.                 running_pids[type][token] = -1
  223.                 await message.answer(f"The process with PID {running_pids[type][token]} is not running")
  224.                 return
  225.         except psutil.NoSuchProcess:
  226.             running_pids[type][token] = -1
  227.             await message.answer(f"No process found with PID {running_pids[type][token]}")
  228.             return
  229.  
  230.         pid = running_pids[type][token]
  231.         running_pids[type][token] = -1
  232.         os.kill(pid, signal.SIGTERM)
  233.  
  234.         await message.answer(f'Process with id = {pid} was stopped')
  235.  
  236.  
  237. @dp.message(F.text) # КОСТЫЛЬ!! нужны фильтры, сейчас НЕЛЬЗЯ эту функцию поднимать выше
  238. async def bad_message(message: Message):
  239.     is_verified = await verification(message.from_user.id)
  240.     if not is_verified:
  241.         await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  242.         return
  243.  
  244.     await message.answer("I don't understand you")
  245.  
  246.  
  247. async def send_message_to_users(msg):
  248.     for user_id in available_uids:
  249.         await bot.send_message(text=msg, chat_id=user_id)
  250.  
  251.  
  252. async def healthcheck_alarm(bot: Bot):
  253.     print('healthcheck')
  254.     for token, pid in running_pids['volume'].items():
  255.         if pid != -1:
  256.             try:
  257.                 process = psutil.Process(pid)
  258.                 if str(process.status()) == 'zombie':  # why psutil.ZombieProcess is not working???
  259.                     await send_message_to_users(f'ALARM: {token} volume bot is dead')
  260.                     running_pids['volume'][token] = -1
  261.             except psutil.NoSuchProcess:
  262.                 await send_message_to_users(f'ALARM: {token} volume bot is dead')
  263.                 running_pids['volume'][token] = -1
  264.        
  265.     for token, pid in running_pids['marketmaking'].items():
  266.         if pid != -1:
  267.             try:
  268.                 process = psutil.Process(pid)
  269.                 if str(process.status()) == 'zombie':  # why psutil.ZombieProcess is not working???
  270.                     await send_message_to_users(f'ALARM: {token} marketmaking bot is dead')
  271.                     running_pids['marketmaking'][token] = -1
  272.             except psutil.NoSuchProcess:
  273.                 await send_message_to_users(f'ALARM: {token} marketmaking bot is dead')
  274.                 running_pids['marketmaking'][token] = -1
  275.  
  276.  
  277. async def balance_checker(bot: Bot):
  278.     files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  279.  
  280.     for balance_file in files_path:
  281.         p = subprocess.Popen(['tail', '-1', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  282.         res, err = p.communicate()
  283.         if not err:
  284.             name = balance_file.split('/')[-1]
  285.             time = str(datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000)))
  286.             base, quote = float(res.decode().split(' ')[1].strip()[1:-1]), float(res.decode().split(' ')[2].strip()[:-1])
  287.             new_balances = {"time": time, "base": base, 'quote': quote}
  288.            
  289.             if base == 0 or quote == 0: # в теории норм обработать надо
  290.                 await send_message_to_users(f'ALARM: {name} quote or base balance = 0')
  291.                 continue
  292.  
  293.             if name not in balance_state:
  294.                 balance_state[name] = new_balances
  295.                 continue
  296.  
  297.             free_delta = abs(new_balances['base'] - balance_state[name]['base']) / balance_state[name]['base']
  298.             locked_delta = abs(new_balances['quote'] - balance_state[name]['quote']) / balance_state[name]['quote']
  299.  
  300.             if free_delta > 0.1 or locked_delta > 0.1:
  301.                 prev_bal = json.dumps(balance_state[name])
  302.                 new_bal = json.dumps(new_balances)
  303.                 balance_state[name] = new_balances # update
  304.                 await send_message_to_users(f'ALARM: {name} balance > 10% change:\n  previous: {prev_bal}\n  new: {new_bal}')
  305.         else:
  306.             print(err.decode())
  307.  
  308.     print(balance_state)
  309.  
  310.  
  311. async def main():
  312.     scheduler = AsyncIOScheduler()
  313.     scheduler.add_job(healthcheck_alarm, "interval", seconds=30, args=[bot])
  314.     scheduler.add_job(balance_checker, "interval", seconds=60, args=[bot])
  315.     scheduler.start()
  316.  
  317.     await dp.start_polling(bot)
  318.  
  319. if __name__ == "__main__":
  320.     asyncio.run(main())
  321.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement