Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import logging
- import os
- import json
- import subprocess
- import signal
- import psutil
- from datetime import datetime
- from pathlib import Path
- from aiogram.types import Message
- from aiogram.enums import ParseMode
- from aiogram import Bot, Dispatcher, types
- from aiogram.filters.command import Command
- from aiogram import F
- from aiogram.exceptions import TelegramBadRequest
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- import numpy as np
# Root logger at INFO so library and bot activity is visible on the console.
logging.basicConfig(level=logging.INFO)

# Module-level scheduler instance.
scheduler = AsyncIOScheduler()

# SECURITY(review): a bot token committed to source must be treated as leaked.
# The TELEGRAM_BOT_TOKEN environment variable now takes precedence; the literal
# fallback only keeps existing deployments running and should be removed once
# the token has been rotated.
bot = Bot(
    token=os.environ.get(
        "TELEGRAM_BOT_TOKEN",
        "7848354217:AAFqy-o0vsCFVUwvOZCFeEwN4BQEGcAvfME",
    )
)

# Dispatcher on which all message handlers below are registered.
dp = Dispatcher()
# Whitelist: Telegram user id (stored as a string) -> display name.
# Insertion order is the order in which broadcast alarms are delivered.
available_uids = {
    "7905047482": "Mansur",
    "6049031402": "Murat",
    "377206035": "Denis",
    "5885283978": "Ibragim",
    "7604822901": "Eric",
    "377264131": "Murat",
}

# PID of the launched process per bot kind and pair; -1 means "not running".
# Each kind gets its own inner dict so they can be updated independently.
running_pids = {kind: {"lott_usdt": -1} for kind in ("marketmaking", "volume")}

# Exchange sub-directories that hold the key/config file for each token.
token_exchanges = {"lott": ["mexc_spot"]}

# Directory whose files are tailed for balance/book telemetry.
path_to_mm_balances_dir = "/mnt/mm_telemetry"

# Mutable monitoring state shared by the handlers and the periodic job.
balance_state = dict()      # file name -> last stored balance snapshot
prev_wide_spread = dict()   # file name -> last spread recorded as "wide"
stopped = list()            # file names currently reported as stopped
async def verification(uid):
    """Tell whether ``uid`` is one of the whitelisted Telegram users.

    ``uid`` is formatted to text before the lookup because the whitelist
    keys are strings. Returns ``True`` for a known id, ``False`` otherwise.
    """
    is_known = f"{uid}" in available_uids
    return is_known
@dp.message(Command("start"))
async def start(message: types.Message):
    """Greet a whitelisted user by name; turn everyone else away."""
    sender_id = message.from_user.id
    if await verification(sender_id):
        # Greeting uses the display name stored in the whitelist.
        await message.answer(f"Welcome to EchoBot controller, {available_uids[str(sender_id)]}!\n")
        return
    await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
@dp.message(Command("clear"))
async def cmd_clear(message: Message, bot: Bot) -> None:
    """Delete the chat history with the sender, newest message first.

    Walks message ids downwards from the ``/clear`` message itself and stops
    at the first id Telegram refuses to delete. A "message to delete not
    found" refusal is the expected end of the walk; any other refusal is now
    logged instead of being dropped silently.
    """
    is_verified = await verification(message.from_user.id)
    if not is_verified:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    try:
        for message_id in range(message.message_id, 0, -1):
            await bot.delete_message(message.from_user.id, message_id)
    except TelegramBadRequest as ex:
        if ex.message == "Bad Request: message to delete not found":
            print("All messages was deleted")
        else:
            # Previously invisible: the walk stopped early for another reason.
            logging.warning(
                "Clearing chat %s stopped early: %s",
                message.from_user.id,
                ex.message,
            )
- #########################################################################################
- ################################## Market Making ########################################
- #########################################################################################
@dp.message(Command("mm_aliveness"))
async def mm_check_alivness(message: types.Message):
    """Reply with a JSON map of market-making token -> "true"/"false".

    A bot counts as alive when a PID is recorded for it, the process exists
    and its status is not ``zombie``.
    """
    if not await verification(message.from_user.id):
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    report = {}
    for token, pid in running_pids['marketmaking'].items():
        alive = False
        if pid != -1:
            try:
                # Status is compared as text, as in the rest of the file.
                alive = str(psutil.Process(pid).status()) != 'zombie'
            except psutil.NoSuchProcess:
                alive = False
        report[token] = "true" if alive else "false"

    await message.answer(json.dumps(report))
@dp.message(Command("mm_balances"))
async def mm_balance(message: types.Message):
    """Reply with the newest balance and order-book snapshot of every telemetry file.

    For each regular file in ``path_to_mm_balances_dir`` the last 20 lines
    are read with ``tail``. Lines are expected to look like
    ``<epoch_ms> <kind> <payload>``; the newest ``balance`` line and the
    newest ``book`` line of each file are reported.

    Changes compared with the previous version:
    * the sender is checked against the whitelist like in every other handler;
    * the newest matching line wins (older lines used to overwrite it);
    * a file without a ``book`` line no longer raises ``KeyError``;
    * ``bid`` is converted to ``float`` before it is compared with zero;
    * a malformed line is logged and skipped instead of aborting the reply;
    * the reply is split to respect Telegram's 4096-character limit and an
      empty result produces an explicit message.
    """
    if not await verification(message.from_user.id):
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    balances = {}
    prices = {}
    for entry in Path(path_to_mm_balances_dir).iterdir():
        if not entry.is_file():
            continue
        name = entry.name
        tail = subprocess.run(
            ['tail', '-20', path_to_mm_balances_dir + '/' + name],
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if tail.stderr:
            print(tail.stderr.decode())
            continue

        # Walk newest -> oldest and keep only the first hit of each kind.
        for line in reversed(tail.stdout.decode().split('\n')):
            if name in balances and name in prices:
                break
            splitted = line.split(' ')
            if len(splitted) < 3:
                continue
            kind = splitted[1]
            payload = ''.join(splitted[2:])
            try:
                if kind == 'balance' and name not in balances:
                    stamp = datetime.fromtimestamp(int(int(splitted[0]) / 1000))
                    info_dict = json.loads(payload.replace("'", '"'))
                    base, quote = float(info_dict['base']), float(info_dict['quote'])
                    balances[name] = f"\ntime: {str(stamp)}\nbase: {base}\nquote: {quote}\n"
                elif kind == 'book' and name not in prices:
                    # SECURITY(review): eval() runs whatever text is in the
                    # telemetry file. Kept because the exact payload format is
                    # not visible here; replace with a real parser once
                    # confirmed.
                    data = eval(payload)
                    ask = data['ask_price']
                    bid = data['bid_price']
                    spread = '%.2f' % ((float(ask) / float(bid) - 1) * 100) if float(bid) > 0 else '0'
                    prices[name] = f"ask_price={ask}\nbid_price={bid}\nspread={spread}%\n"
            except Exception:
                # eval()/json on arbitrary text can raise anything; one bad
                # line must not hide the remaining files.
                logging.exception("Skipping malformed %s line in %s", kind, name)

    # Telegram rejects empty messages and texts longer than 4096 characters.
    max_len = 4096
    chunks = []
    current = ''
    for name in sorted(balances, key=str.lower):
        section = f'{name}:{balances[name]}{prices.get(name, "")}\n'
        if current and len(current) + len(section) > max_len:
            chunks.append(current)
            current = ''
        current += section
    if current:
        chunks.append(current)

    if not chunks:
        await message.answer("No balance data found")
        return
    for chunk in chunks:
        await message.answer(chunk)
@dp.message(Command("vol_aliveness"))
async def vol_check_alivness(message: types.Message):
    """Reply with a JSON map of volume-bot token -> "true"/"false".

    "true" requires a recorded PID whose process exists and is not a zombie.
    """
    if not await verification(message.from_user.id):
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    def _state(pid):
        # -1 is the "never started / already stopped" marker.
        if pid == -1:
            return "false"
        try:
            status = str(psutil.Process(pid).status())
        except psutil.NoSuchProcess:
            return "false"
        return "false" if status == 'zombie' else "true"

    aliveness = {token: _state(pid) for token, pid in running_pids['volume'].items()}
    await message.answer(json.dumps(aliveness))
@dp.message(F.text.lower().contains('tag:'))  # tag:start:marketmaking:DAG
async def handler(message: Message):
    """Dispatch a ``tag:<operation>:<bot type>:<token>`` control message.

    ``operation`` is ``start`` or ``stop``; ``bot type`` is ``marketmaking``
    or ``volume``. A message with fewer than four ``:``-separated fields used
    to raise ``IndexError``; it is now answered with a usage hint. Fields are
    stripped of surrounding whitespace and the operation is matched
    case-insensitively, in line with the case-insensitive filter above.
    """
    is_verified = await verification(message.from_user.id)
    if not is_verified:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    parts = message.text.split(':')
    if len(parts) < 4:
        await message.answer("expected format: tag:<start|stop>:<marketmaking|volume>:<token>")
        return

    operation = parts[1].strip().lower()  # start OR stop
    bot_type = parts[2].strip()           # marketmaking OR volume
    token = parts[3].strip()

    if operation == 'start':
        await start_bot(message, bot_type, token)
    elif operation == 'stop':
        await stop_bot(message, bot_type, token)
    else:
        await message.answer("unknown operation")
async def start_bot(message: Message, type, token: str):
    """Launch the bot script for ``token`` and remember its PID.

    Args:
        message: Incoming message; used only to send the reply.
        type: ``'marketmaking'`` or ``'volume'``.
        token: Key into ``running_pids[type]`` and ``token_exchanges``.

    The script ``<token>_mm.py`` / ``<token>_volume.py`` is looked up in the
    parent directory of this file and started with ``python3``; one
    ``<exchange>/<token>_mm.key`` (or ``_volume.key``) path per configured
    exchange is passed as an argument.

    Unknown types/tokens used to raise ``KeyError`` before the
    "unknown bot type" reply could ever be sent; they are now answered
    explicitly.
    """
    suffix_by_type = {'marketmaking': '_mm', 'volume': '_volume'}
    if type not in suffix_by_type or type not in running_pids:
        await message.answer("unknown bot type")
        return
    if token not in running_pids[type]:
        await message.answer(f"unknown token: {token}")
        return
    if running_pids[type][token] != -1:
        await message.answer('Firstly stop already launched bot')
        return

    # NOTE(review): running_pids is keyed by 'lott_usdt' while token_exchanges
    # is keyed by 'lott', so with the tables as written no token passes both
    # lookups. Confirm which spelling is intended.
    exchanges = token_exchanges.get(token)
    if not exchanges:
        await message.answer(f"no exchanges configured for token: {token}")
        return

    path_to_launch_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../')
    stem = token.lower() + suffix_by_type[type]
    path_to_exec = os.path.join(path_to_launch_dir, stem + '.py')
    configs = [os.path.join(path_to_launch_dir, exchange, stem + '.key') for exchange in exchanges]

    # Child output is discarded; only the PID is kept for later checks.
    p = subprocess.Popen(["python3", path_to_exec] + configs, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    running_pids[type][token] = p.pid
    print(p.pid)
    await message.answer(f'Process with id = {p.pid} was started')
async def stop_bot(message: Message, type, token):
    """Send SIGTERM to the recorded process for ``type``/``token``.

    Args:
        message: Incoming message; used only to send the reply.
        type: Bot kind, a key of ``running_pids``.
        token: Key inside ``running_pids[type]``.

    The recorded PID is always reset to -1 once it has been looked at.

    Fixes: the "not running"/"no process" replies used to print ``-1``
    because the PID was reset before the message was formatted; unknown
    type/token raised ``KeyError``; a process that vanished between the
    check and ``os.kill`` raised ``ProcessLookupError``.
    """
    pids = running_pids.get(type)
    if pids is None or token not in pids:
        await message.answer("unknown bot type or token")
        return

    pid = pids[token]
    if pid == -1:
        await message.answer("No process is running")
        return

    # Check whether the process is already gone before signalling it.
    try:
        process = psutil.Process(pid)
        if str(process.status()) == 'zombie':  # dead, but still in the process table
            pids[token] = -1
            await message.answer(f"The process with PID {pid} is not running")
            return
    except psutil.NoSuchProcess:
        pids[token] = -1
        await message.answer(f"No process found with PID {pid}")
        return

    pids[token] = -1
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the status check and the signal.
        await message.answer(f"No process found with PID {pid}")
        return
    await message.answer(f'Process with id = {pid} was stopped')
@dp.message(F.text)  # HACK!! proper filters are needed; for now this function MUST NOT be moved higher
async def bad_message(message: Message):
    """Catch-all for text that no earlier handler matched."""
    if await verification(message.from_user.id):
        await message.answer("I don't understand you")
    else:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
async def send_message_to_users(msg):
    """Send ``msg`` to every whitelisted user.

    Delivery is attempted for each user independently: a failure for one
    recipient is logged and the remaining users are still notified.
    Previously the first failed send aborted the broadcast and propagated
    into the caller.
    """
    for user_id in available_uids:
        try:
            await bot.send_message(text=msg, chat_id=user_id)
        except Exception:
            # Broadcast boundary: log and continue so one broken recipient
            # cannot silence an alarm for everybody else.
            logging.exception("Failed to deliver message to user %s", user_id)
async def healthcheck_alarm(bot: Bot):
    """Alarm about, and forget, every recorded bot process that has died.

    Volume bots are checked first, then market-making bots. A process is
    dead when it no longer exists or its status is ``zombie``; its PID slot
    is reset to -1 after the alarm is sent. The ``bot`` argument is not used
    by the body.
    """
    print('healthcheck')
    for kind in ('volume', 'marketmaking'):
        for token, pid in running_pids[kind].items():
            if pid == -1:
                continue
            try:
                is_dead = str(psutil.Process(pid).status()) == 'zombie'
            except psutil.NoSuchProcess:
                is_dead = True
            if not is_dead:
                continue
            await send_message_to_users(f'ALARM: {token} {kind} bot is dead')
            running_pids[kind][token] = -1
def _tail_text(path, count):
    """Return the last ``count`` lines of ``path`` as text, or None if ``tail`` reported an error."""
    proc = subprocess.run(
        ['tail', f'-{count}', path],
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.stderr:
        print(proc.stderr.decode())
        return None
    return proc.stdout.decode()


def _line_time(line):
    """Turn the leading millisecond epoch of a telemetry line into text, or None if unparsable."""
    try:
        return str(datetime.fromtimestamp(int(int(line.split(' ')[0]) / 1000)))
    except (ValueError, OverflowError, OSError):
        return None


def _payload_floats(name, tokens, *keys):
    """Evaluate a line's payload and return ``keys`` as floats, or None if the line is malformed."""
    text = ''.join(tokens[2:]).strip().replace("'", '"')
    try:
        # SECURITY(review): eval() executes whatever text is in the telemetry
        # file. Kept because the exact payload format is not visible here;
        # replace with a real parser once the format is confirmed.
        info_dict = eval(text)
        return tuple(float(info_dict[key]) for key in keys)
    except Exception:
        # eval() on arbitrary text can raise anything; skip just this line.
        logging.exception("Skipping malformed %s line in %s", tokens[1], name)
        return None


def _track_spread(name, ask, bid):
    """Maintain ``prev_wide_spread`` for one book snapshot (spread alarms are currently disabled)."""
    if bid == 0:
        # await send_message_to_users(f'ALARM: {name} bid price = 0')
        return
    spread_size = (ask - bid) / bid
    if spread_size > 0.05:
        # Record a first wide spread, or one that widened by another 1%.
        if name not in prev_wide_spread or spread_size - prev_wide_spread[name] >= 0.01:
            prev_wide_spread[name] = spread_size
            # await send_message_to_users(f'ALARM: {name} spread = {round(spread_size * 100, 2)}%; ask = {ask} bid = {bid}')
    else:
        # await send_message_to_users(f'ALARM: {name} REGAIN spread = {round(spread_size * 100, 2)}%; ask = {ask} bid = {bid} -> OK')
        prev_wide_spread.pop(name, None)


def _track_balance(name, new_balances, floor=None):
    """Store the first snapshot for ``name``, or replace it after a >10% move in base or quote."""
    previous = balance_state.get(name)
    if previous is None:
        balance_state[name] = new_balances
        return
    for key in ('base', 'quote'):
        denominator = previous[key] if floor is None else max(floor, previous[key])
        delta = abs(new_balances[key] - previous[key])
        # A stored zero used to raise ZeroDivisionError; any move away from
        # zero now counts as a change.
        changed = delta > 0 if denominator == 0 else delta / denominator > 0.1
        if changed:
            balance_state[name] = new_balances  # update
            # (balance > 10% change alarm is currently disabled)
            return


async def balance_checker(bot: Bot):
    """Periodic job: scan the telemetry files and raise stop/restart/balance alarms.

    Pass 1 ("mm") reads the last 20 lines of every file, newest first, and
    uses the newest line of each message type. Pass 2 ("volume") reads only
    the last line. Lines are expected to look like
    ``<epoch_ms> <kind> <payload>``. The ``bot`` argument is not used by the
    body.

    Fixes compared with the previous version:
    * the timestamp is parsed from the current line, not from the first
      token of the whole ``tail`` output;
    * stop/restart state is decided by the newest line only, so a "stopped"
      line followed by older normal lines no longer produces a "stopped" and
      a "restarted" alarm on every run;
    * bare ``except`` replaced by specific exceptions;
    * a malformed payload is logged and skipped instead of aborting the job;
    * short lines in the volume pass no longer raise ``IndexError``;
    * a stored zero balance no longer raises ``ZeroDivisionError``.
    """
    mm_files_path = [
        path_to_mm_balances_dir + '/' + file.name
        for file in Path(path_to_mm_balances_dir).iterdir()
        if file.is_file()
    ]
    # NOTE(review): both passes read the same directory, as in the original;
    # confirm whether the volume bots were meant to have their own directory.
    volume_files_path = list(mm_files_path)

    for balance_file in mm_files_path:
        text = _tail_text(balance_file, 20)
        if text is None:
            continue
        name = balance_file.split('/')[-1]
        used = set()
        status_checked = False
        for line in reversed(text.split('\n')):
            splitted = line.split(' ')
            if len(splitted) < 3:
                continue  # blank or truncated line
            stamp = _line_time(line)
            if stamp is None:
                print(f'error parse {name} ')
                continue

            is_stop_line = 'stopped' in line
            if not status_checked:
                # Only the newest well-formed line says whether the bot is
                # stopped right now.
                status_checked = True
                if is_stop_line:
                    if name not in stopped:
                        stopped.append(name)
                        await send_message_to_users(f'ALARM: {name} bot was stopped at {stamp}')
                elif name in stopped:
                    stopped.remove(name)
                    await send_message_to_users(f'ALARM: {name} bot was restarted')
            if is_stop_line:
                continue

            msg_type = splitted[1]
            if msg_type in used:
                continue
            used.add(msg_type)

            if msg_type == 'book':
                prices = _payload_floats(name, splitted, 'ask_price', 'bid_price')
                if prices is not None:
                    _track_spread(name, *prices)
            elif msg_type == 'balance':
                amounts = _payload_floats(name, splitted, 'base', 'quote')
                if amounts is None:
                    continue
                base, quote = amounts
                if base == 0 or quote == 0:
                    # await send_message_to_users(f'ALARM: {name} quote or base balance = 0')
                    continue
                _track_balance(name, {"time": stamp, "base": base, 'quote': quote})

    for balance_file in volume_files_path:
        text = _tail_text(balance_file, 1)
        if text is None:
            continue
        name = balance_file.split('/')[-1]
        stamp = _line_time(text)
        if stamp is None:
            print(f'error parse {name} ')
            continue
        if 'stopped' in text:
            if name not in stopped:
                stopped.append(name)
                await send_message_to_users(f'VOLUME ALARM: {name} bot was stopped at {stamp}')
            continue
        if name in stopped:
            stopped.remove(name)
            await send_message_to_users(f'VOLUME ALARM: {name} bot was restarted')

        splitted = text.split(' ')
        if len(splitted) < 3 or splitted[1] != 'balance':
            continue
        amounts = _payload_floats(name, splitted, 'base', 'quote')
        if amounts is None:
            continue
        base, quote = amounts
        # NOTE(review): the condition fires when BOTH are below 10 while the
        # text says "quote or base balance = 0"; kept as in the original.
        if base < 10 and quote < 10:
            await send_message_to_users(f'VOLUME ALARM: {name} quote or base balance = 0')
            continue
        _track_balance(name, {"time": stamp, "base": base, 'quote': quote}, floor=30)

    print(balance_state)
async def main():
    """Schedule the periodic balance check and poll Telegram for updates."""
    # Scheduler instance owned by this coroutine.
    job_runner = AsyncIOScheduler()
    # job_runner.add_job(healthcheck_alarm, "interval", seconds=30, args=[bot])
    job_runner.add_job(balance_checker, trigger="interval", args=[bot], seconds=45)
    job_runner.start()
    # Runs until polling is stopped.
    await dp.start_polling(bot)
# Script entry point: start the bot only when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement