Advertisement
den4ik2003

Untitled

Feb 12th, 2025
16
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 18.57 KB | None | 0 0
  1. import asyncio
  2. import logging
  3. import os
  4. import json
  5. import subprocess
  6. import signal
  7. import psutil
  8. from datetime import datetime
  9. from pathlib import Path
  10. from aiogram.types import Message
  11. from aiogram.enums import ParseMode
  12. from aiogram import Bot, Dispatcher, types
  13. from aiogram.filters.command import Command
  14. from aiogram import F
  15. from aiogram.exceptions import TelegramBadRequest
  16. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  17. import numpy as np
  18.  
  19. logging.basicConfig(level=logging.INFO)
  20.  
  21. scheduler = AsyncIOScheduler()
  22. bot = Bot(token="7848354217:AAFqy-o0vsCFVUwvOZCFeEwN4BQEGcAvfME")
  23. dp = Dispatcher()
  24.  
  25. available_uids = {
  26. "7905047482": "Mansur",
  27. "6049031402":"Murat",
  28. "377206035":"Denis",
  29. "5885283978":"Ibragim",
  30. "7604822901":"Eric",
  31. "377264131": "Murat"
  32. }
  33.  
  34. running_pids = {
  35. 'marketmaking': {
  36. "lott_usdt": -1
  37. },
  38. "volume": {
  39. "lott_usdt": -1
  40. }
  41. }
  42. token_exchanges = {
  43. 'lott': [
  44. 'mexc_spot'
  45. ]
  46. }
  47. path_to_mm_balances_dir = '/mnt/mm_telemetry'
  48. balance_state = {}
  49. prev_wide_spread = {}
  50. stopped = []
  51.  
  52. async def verification(uid):
  53. if f"{uid}" in available_uids:
  54. return True
  55. else:
  56. return False
  57.  
  58. @dp.message(Command("start"))
  59. async def start(message: types.Message):
  60. is_verified = await verification(message.from_user.id)
  61. if not is_verified:
  62. await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
  63. return
  64.  
  65. await message.answer(f"Welcome to EchoBot controller, {available_uids[str(message.from_user.id)]}!\n") # Call \"/help\" too see the available commands")
  66.  
  67.  
  68. @dp.message(Command("clear"))
  69. async def cmd_clear(message: Message, bot: Bot) -> None:
  70. is_verified = await verification(message.from_user.id)
  71. if not is_verified:
  72. await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  73. return
  74.  
  75. try:
  76. for i in range(message.message_id, 0, -1):
  77. await bot.delete_message(message.from_user.id, i)
  78. except TelegramBadRequest as ex:
  79. if ex.message == "Bad Request: message to delete not found":
  80. print("All messages was deleted")
  81.  
  82.  
  83. #########################################################################################
  84. ################################## Market Making ########################################
  85. #########################################################################################
  86.  
  87.  
  88. @dp.message(Command("mm_aliveness"))
  89. async def mm_check_alivness(message: types.Message):
  90. is_verified = await verification(message.from_user.id)
  91. if not is_verified:
  92. await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  93. return
  94.  
  95. aliveness = {}
  96.  
  97. for token, pid in running_pids['marketmaking'].items():
  98. if pid == -1:
  99. aliveness[token] = "false"
  100. continue
  101. try:
  102. process = psutil.Process(pid)
  103. if str(process.status()) == 'zombie': # why psutil.ZombieProcess is not working???
  104. aliveness[token] = "false"
  105. else:
  106. aliveness[token] = "true"
  107. except psutil.NoSuchProcess:
  108. aliveness[token] = "false"
  109.  
  110. await message.answer(json.dumps(aliveness))
  111.  
  112.  
  113. @dp.message(Command("mm_balances"))
  114. async def mm_balance(message: types.Message):
  115. files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  116. balances = {}
  117. prices = {}
  118.  
  119. for balance_file in files_path:
  120. p = subprocess.Popen(['tail', '-20', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  121. res, err = p.communicate()
  122. if not err:
  123. name = balance_file.split('/')[-1]
  124. decoded = res.decode().split('\n')[::-1]
  125. for line in decoded:
  126. splitted = line.split(' ')
  127.  
  128. if len(splitted) < 3:
  129. continue
  130.  
  131. if splitted[1] == 'balance':
  132. time = datetime.fromtimestamp(int(int(splitted[0]) / 1000))
  133. info_dict = json.loads(''.join(splitted[2:]).replace("'", '"'))
  134. base, quote = float(info_dict['base']), float(info_dict['quote'])
  135. balances[name] = f"\ntime: {str(time)}\nbase: {base}\nquote: {quote}\n"
  136. elif splitted[1] == 'book':
  137. data = eval(''.join(splitted[2:]))
  138. ask = data['ask_price']
  139. bid = data['bid_price']
  140. spread = f'%.2f' % ((float(ask) / float(bid) - 1) * 100) if bid > 0 else '0'
  141. prices[name] = f"ask_price={ask}\nbid_price={bid}\nspread={spread}%\n"
  142.  
  143. else:
  144. print(err.decode())
  145.  
  146. answers = []
  147. balances_answer = ''
  148. for name in sorted(balances.keys(), key=lambda s: s.lower()):
  149. balances_answer += f'{name}:{balances[name]}{prices[name]}\n'
  150. # if len(balances_answer) > 1000:
  151. # answers.append(balances_answer)
  152. # balances_answer = ''
  153.  
  154. # for answer in answers:
  155. await message.answer(balances_answer)
  156.  
  157.  
  158. @dp.message(Command("vol_aliveness"))
  159. async def vol_check_alivness(message: types.Message):
  160. is_verified = await verification(message.from_user.id)
  161. if not is_verified:
  162. await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  163. return
  164.  
  165. aliveness = {}
  166.  
  167. for token, pid in running_pids['volume'].items():
  168. if pid == -1:
  169. aliveness[token] = "false"
  170. continue
  171. try:
  172. process = psutil.Process(pid)
  173. if str(process.status()) == 'zombie': # why psutil.ZombieProcess is not working???
  174. aliveness[token] = "false"
  175. else:
  176. aliveness[token] = "true"
  177. except psutil.NoSuchProcess:
  178. aliveness[token] = "false"
  179.  
  180. await message.answer(json.dumps(aliveness))
  181.  
  182.  
  183. @dp.message(F.text.lower().contains('tag:')) # tag:start:marketmaking:DAG
  184. async def handler(message: Message):
  185. is_verified = await verification(message.from_user.id)
  186. if not is_verified:
  187. await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  188. return
  189.  
  190. operation = message.text.split(':')[1] # start OR stop
  191. bot_type = message.text.split(':')[2] # marketmaking OR volume
  192. token = message.text.split(':')[3]
  193.  
  194. if operation == 'start':
  195. await start_bot(message, bot_type, token)
  196. elif operation == 'stop':
  197. await stop_bot(message, bot_type, token)
  198. else:
  199. await message.answer("unknown operation")
  200.  
  201.  
  202. async def start_bot(message: Message, type, token: str):
  203. if running_pids[type][token] != -1:
  204. await message.answer(f'Firstly stop already launched bot')
  205. return
  206.  
  207. path_to_launch_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../')
  208.  
  209. # if type == 'marketmaking':
  210. # path_to_exec = os.path.join(path_to_launch_dir, 'market_making.py')
  211. # path_to_config_dir = os.path.join(path_to_launch_dir, 'configs/marketmaking/', token)
  212. # elif type == 'volume':
  213. # path_to_exec = os.path.join(path_to_launch_dir, 'volume.py')
  214. # path_to_config_dir = os.path.join(path_to_launch_dir, 'configs/volume/', token)
  215. # else:
  216. # message.answer("unknown bot type")
  217.  
  218. # config_files = [file.name for file in Path(path_to_config_dir).iterdir() if file.is_file()]
  219.  
  220. if type == 'marketmaking':
  221. exec_name = token.lower() + '_mm.py'
  222. config_name = token.lower() + '_mm.key'
  223. elif type == 'volume':
  224. exec_name = token.lower() + '_volume.py'
  225. config_name = token.lower() + '_volume.key'
  226. else:
  227. await message.answer("unknown bot type")
  228. return
  229.  
  230. path_to_exec = os.path.join(path_to_launch_dir, exec_name)
  231.  
  232. configs = [os.path.join(path_to_launch_dir, exchange, config_name) for exchange in token_exchanges[token]]
  233.  
  234. p = subprocess.Popen(["python3", path_to_exec] + configs, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
  235. running_pids[type][token] = p.pid
  236. print(p.pid)
  237.  
  238. await message.answer(f'Process with id = {p.pid} was started')
  239.  
  240.  
  241. async def stop_bot(message: Message, type, token):
  242. if running_pids[type][token] == -1:
  243. await message.answer("No process is running")
  244. else:
  245. # check whether stopped
  246. try:
  247. process = psutil.Process(running_pids[type][token])
  248. if str(process.status()) == 'zombie': # it consumes no resources and is dead, but it still exists
  249. running_pids[type][token] = -1
  250. await message.answer(f"The process with PID {running_pids[type][token]} is not running")
  251. return
  252. except psutil.NoSuchProcess:
  253. running_pids[type][token] = -1
  254. await message.answer(f"No process found with PID {running_pids[type][token]}")
  255. return
  256.  
  257. pid = running_pids[type][token]
  258. running_pids[type][token] = -1
  259. os.kill(pid, signal.SIGTERM)
  260.  
  261. await message.answer(f'Process with id = {pid} was stopped')
  262.  
  263.  
  264. @dp.message(F.text) # КОСТЫЛЬ!! нужны фильтры, сейчас НЕЛЬЗЯ эту функцию поднимать выше
  265. async def bad_message(message: Message):
  266. is_verified = await verification(message.from_user.id)
  267. if not is_verified:
  268. await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
  269. return
  270.  
  271. await message.answer("I don't understand you")
  272.  
  273.  
  274. async def send_message_to_users(msg):
  275. for user_id in available_uids:
  276. await bot.send_message(text=msg, chat_id=user_id)
  277.  
  278.  
  279. async def healthcheck_alarm(bot: Bot):
  280. print('healthcheck')
  281. for token, pid in running_pids['volume'].items():
  282. if pid != -1:
  283. try:
  284. process = psutil.Process(pid)
  285. if str(process.status()) == 'zombie': # why psutil.ZombieProcess is not working???
  286. await send_message_to_users(f'ALARM: {token} volume bot is dead')
  287. running_pids['volume'][token] = -1
  288. except psutil.NoSuchProcess:
  289. await send_message_to_users(f'ALARM: {token} volume bot is dead')
  290. running_pids['volume'][token] = -1
  291.  
  292. for token, pid in running_pids['marketmaking'].items():
  293. if pid != -1:
  294. try:
  295. process = psutil.Process(pid)
  296. if str(process.status()) == 'zombie': # why psutil.ZombieProcess is not working???
  297. await send_message_to_users(f'ALARM: {token} marketmaking bot is dead')
  298. running_pids['marketmaking'][token] = -1
  299. except psutil.NoSuchProcess:
  300. await send_message_to_users(f'ALARM: {token} marketmaking bot is dead')
  301. running_pids['marketmaking'][token] = -1
  302.  
  303.  
  304. async def balance_checker(bot: Bot):
  305. mm_files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  306. volume_files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
  307.  
  308. for balance_file in mm_files_path:
  309. p = subprocess.Popen(['tail', '-20', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  310. res, err = p.communicate()
  311. if not err:
  312. name = balance_file.split('/')[-1]
  313. decoded = res.decode().split('\n')
  314. used = set()
  315. for line in decoded[::-1]:
  316. try:
  317. time = str(datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000)))
  318. except:
  319. print(f'error parse {name} ')
  320. continue
  321. splitted = line.split(' ')
  322.  
  323. if len(splitted) < 3:
  324. continue
  325.  
  326. if 'stopped' in line:
  327. if name in stopped:
  328. continue
  329. stopped.append(name)
  330. await send_message_to_users(f'ALARM: {name} bot was stopped at {time}')
  331. continue
  332.  
  333. if name in stopped:
  334. stopped.remove(name)
  335. await send_message_to_users(f'ALARM: {name} bot was restarted')
  336. continue
  337. msg_type = splitted[1]
  338. if msg_type in used:
  339. continue
  340. used.add(msg_type)
  341. print(''.join(splitted[2:]).replace("'", '"'))
  342. info_dict = eval(''.join(splitted[2:]).replace("'", '"'))
  343.  
  344. if msg_type == 'book':
  345. if float(info_dict['bid_price']) == 0:
  346. pass
  347. # await send_message_to_users(f'ALARM: {name} bid price = 0')
  348. else:
  349. spread_size = (float(info_dict['ask_price']) - float(info_dict['bid_price'])) / float(info_dict['bid_price'])
  350. if spread_size > 0.05:
  351. ask = float(info_dict['ask_price'])
  352. bid = float(info_dict['bid_price'])
  353. if name in prev_wide_spread: # т.е уже кидали аларм
  354. if spread_size - prev_wide_spread[name] >= 0.01:
  355. prev_wide_spread[name] = spread_size
  356. # await send_message_to_users(f'ALARM: {name} spread = {round(spread_size * 100, 2)}%; ask = {ask} bid = {bid}')
  357. else:
  358. prev_wide_spread[name] = spread_size
  359. # await send_message_to_users(f'ALARM: {name} spread = {round(spread_size * 100, 2)}%; ask = {ask} bid = {bid}')
  360. else:
  361. ask = float(info_dict['ask_price'])
  362. bid = float(info_dict['bid_price'])
  363. if name in prev_wide_spread:
  364. # await send_message_to_users(f'ALARM: {name} REGAIN spread = {round(spread_size * 100, 2)}%; ask = {ask} bid = {bid} -> OK')
  365. prev_wide_spread.pop(name)
  366. elif msg_type == 'balance':
  367. base, quote = float(info_dict['base']), float(info_dict['quote'])
  368. new_balances = {"time": time, "base": base, 'quote': quote}
  369.  
  370. if base == 0 or quote == 0: # в теории норм обработать надо
  371. # await send_message_to_users(f'ALARM: {name} quote or base balance = 0')
  372. continue
  373.  
  374. if name not in balance_state:
  375. balance_state[name] = new_balances
  376. continue
  377.  
  378. free_delta = abs(new_balances['base'] - balance_state[name]['base']) / balance_state[name]['base']
  379. locked_delta = abs(new_balances['quote'] - balance_state[name]['quote']) / balance_state[name]['quote']
  380.  
  381. if free_delta > 0.1 or locked_delta > 0.1:
  382. prev_bal = json.dumps(balance_state[name])
  383. new_bal = json.dumps(new_balances)
  384. balance_state[name] = new_balances # update
  385. # await send_message_to_users(f'ALARM: {name} balance > 10% change:\n new: {new_bal}\n previous: {prev_bal}')
  386. else:
  387. print(err.decode())
  388.  
  389. for balance_file in volume_files_path:
  390. p = subprocess.Popen(['tail', '-1', balance_file],shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  391. res, err = p.communicate()
  392. if not err:
  393. name = balance_file.split('/')[-1]
  394. try:
  395. time = str(datetime.fromtimestamp(int(int(res.decode().split(' ')[0]) / 1000)))
  396. except:
  397. print(f'error parse {name} ')
  398. continue
  399.  
  400. if 'stopped' in res.decode():
  401. if name in stopped:
  402. continue
  403. stopped.append(name)
  404. await send_message_to_users(f'VOLUME ALARM: {name} bot was stopped at {time}')
  405. continue
  406.  
  407. if name in stopped:
  408. stopped.remove(name)
  409. await send_message_to_users(f'VOLUME ALARM: {name} bot was restarted')
  410.  
  411. msg_type = res.decode().split(' ')[1]
  412. info_dict = eval(''.join(res.decode().split(' ')[2:]).replace("'", '"'))
  413.  
  414. if msg_type == 'balance':
  415. base, quote = float(info_dict['base']), float(info_dict['quote'])
  416. new_balances = {"time": time, "base": base, 'quote': quote}
  417.  
  418. if base < 10 and quote < 10: # в теории норм обработать надо
  419. await send_message_to_users(f'VOLUME ALARM: {name} quote or base balance = 0')
  420. continue
  421.  
  422. if name not in balance_state:
  423. balance_state[name] = new_balances
  424. continue
  425.  
  426. free_delta = abs(new_balances['base'] - balance_state[name]['base']) / max(30, balance_state[name]['base'])
  427. locked_delta = abs(new_balances['quote'] - balance_state[name]['quote']) / max(30, balance_state[name]['quote'])
  428.  
  429. if free_delta > 0.1 or locked_delta > 0.1:
  430. prev_bal = json.dumps(balance_state[name])
  431. new_bal = json.dumps(new_balances)
  432. balance_state[name] = new_balances # update
  433. # await send_message_to_users(f'VOLUME ALARM: {name} balance > 10% change:\n previous: {prev_bal}\n new: {new_bal}')
  434. else:
  435. print(err.decode())
  436.  
  437. print(balance_state)
  438.  
  439.  
  440. async def main():
  441. scheduler = AsyncIOScheduler()
  442. # scheduler.add_job(healthcheck_alarm, "interval", seconds=30, args=[bot])
  443. scheduler.add_job(balance_checker, "interval", seconds=45, args=[bot])
  444. scheduler.start()
  445.  
  446. await dp.start_polling(bot)
  447.  
  448. if __name__ == "__main__":
  449. asyncio.run(main())
  450.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement