Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import logging
- import os
- import json
- import subprocess
- import signal
- import psutil
- from datetime import datetime
- from pathlib import Path
- from aiogram.types import Message
- from aiogram.enums import ParseMode
- from aiogram import Bot, Dispatcher, types
- from aiogram.filters.command import Command
- from aiogram import F
- from aiogram.exceptions import TelegramBadRequest
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
logging.basicConfig(level=logging.INFO)

# Module-level scheduler instance. main() creates its own local scheduler, so
# nothing visible in this file reads this one; it is kept for any external user.
scheduler = AsyncIOScheduler()

# SECURITY: the token below is committed in source and must be treated as
# leaked - rotate it. ECHOBOT_TOKEN, when set, takes precedence; the literal
# is kept only as a fallback so existing deployments keep starting.
bot = Bot(token=os.environ.get("ECHOBOT_TOKEN", "7848354217:AAFqy-o0vsCFVUwvOZCFeEwN4BQEGcAvfME"))
dp = Dispatcher()

# Telegram user ids (as strings) that pass verification(), mapped to the name
# used in the /start greeting. The same ids receive the ALARM broadcasts.
available_uids = {
    "7905047482": "Mansur",
    "6049031402": "Murat",
    "377206035": "Denis",
    "5885283978": "Ibragim",
    "7604822901": "Eric",
    "377264131": "Murat",
}

# PID of the process launched for each bot type / token; -1 means "nothing
# tracked" (never started, stopped, or found dead by the healthcheck).
running_pids = {
    'marketmaking': {
        "lott_usdt": -1,
    },
    "volume": {
        "lott_usdt": -1,
    },
}

# Exchange directory names per token; start_bot() passes one config path per
# exchange to the launched script.
# NOTE(review): the key here is 'lott' while running_pids uses "lott_usdt" -
# confirm which spelling the "tag:" commands are supposed to carry.
token_exchanges = {
    'lott': [
        'mexc_spot',
    ],
}

# Directory whose files are tailed for the latest balance line.
path_to_mm_balances_dir = '/mnt/mm_telemetry'

# Reference balance snapshot per file name, maintained by balance_checker().
balance_state = {}
async def verification(uid):
    """Tell whether *uid* is allowed to use the bot.

    The id is converted to a string and looked up among the keys of
    ``available_uids``; returns True on a hit, False otherwise.
    """
    return str(uid) in available_uids
@dp.message(Command("start"))
async def start(message: types.Message):
    """Handle /start: greet a verified sender by name, turn anyone else away."""
    sender_id = message.from_user.id
    if await verification(sender_id):
        # The display name comes from the same table that grants access.
        greeting = f"Welcome to EchoBot controller, {available_uids[str(sender_id)]}!\n"
        await message.answer(greeting)
        return
    await message.answer("You are not verified so i don't know who are you\nPlease contact @saushkin_den if you think this is a mistake")
@dp.message(Command("clear"))
async def cmd_clear(message: Message, bot: Bot) -> None:
    """Handle /clear: delete messages walking back from the command's own id.

    Message ids are tried in descending order starting at the /clear message
    itself. The walk ends at the first ``TelegramBadRequest``: a "not found"
    answer is treated as "nothing left to delete", any other bad-request
    answer is logged instead of being dropped silently.
    """
    is_verified = await verification(message.from_user.id)
    if not is_verified:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return
    try:
        for message_id in range(message.message_id, 0, -1):
            # The sender's user id is used as the chat id.
            await bot.delete_message(message.from_user.id, message_id)
    except TelegramBadRequest as ex:
        if ex.message == "Bad Request: message to delete not found":
            print("All messages was deleted")
        else:
            # Previously swallowed without a trace; keep stopping here, but
            # leave a record of why the clean-up ended early.
            logging.warning("clear aborted by Telegram: %s", ex.message)
#########################################################################################
################################## Market Making ########################################
#########################################################################################
@dp.message(Command("mm_aliveness"))
async def mm_check_alivness(message: types.Message):
    """Handle /mm_aliveness: report which market-making processes are alive.

    The reply is a JSON object mapping every token under
    ``running_pids['marketmaking']`` to the string "true" or "false".
    """
    if not await verification(message.from_user.id):
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return

    def is_running(pid):
        # -1 marks "no process tracked".
        if pid == -1:
            return False
        try:
            # A zombie still has a process entry, so it is detected through
            # its status string rather than through an exception.
            return str(psutil.Process(pid).status()) != 'zombie'
        except psutil.NoSuchProcess:
            return False

    aliveness = {
        token: "true" if is_running(pid) else "false"
        for token, pid in running_pids['marketmaking'].items()
    }
    await message.answer(json.dumps(aliveness))
@dp.message(Command("mm_balances"))
async def mm_balance(message: types.Message):
    """Handle /mm_balances: reply with the latest balance line of every file.

    For each regular file in ``path_to_mm_balances_dir`` the last line is read
    with ``tail -1`` and parsed. The parsing implies a line shaped like
    ``<epoch_ms> (<base>, <quote>)`` - confirm against whatever writes these
    files. Files whose last line is empty or malformed are skipped with a log
    record instead of failing the whole command.

    Like every other handler, the command is only served to verified users.
    """
    is_verified = await verification(message.from_user.id)
    if not is_verified:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return
    files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
    balances = {}
    for balance_file in files_path:
        p = subprocess.Popen(['tail', '-1', balance_file], shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        res, err = p.communicate()
        if err:
            print(err.decode())
            continue
        last_line = res.decode()
        print(last_line)
        fields = last_line.split(' ')
        try:
            # First field is a millisecond timestamp.
            time = datetime.fromtimestamp(int(int(fields[0]) / 1000))
            # Second field drops its first and last character, third field
            # drops its last character, before conversion to float.
            base = float(fields[1].strip()[1:-1])
            quote = float(fields[2].strip()[:-1])
        except (IndexError, ValueError):
            logging.warning("skipping %s: cannot parse last line %r", balance_file, last_line)
            continue
        balances[balance_file.split('/')[-1]] = f"time: {str(time)} base: {base} quote: {quote}"
    # One JSON field per line: the dump is cut at every comma.
    balances_answer = ''.join("{}\n".format(field) for field in json.dumps(balances).split(','))
    await message.answer(balances_answer)
@dp.message(Command("vol_aliveness"))
async def vol_check_alivness(message: types.Message):
    """Handle /vol_aliveness: report which volume processes are alive.

    Replies with a JSON object mapping every token under
    ``running_pids['volume']`` to the string "true" or "false".
    """
    sender_ok = await verification(message.from_user.id)
    if not sender_ok:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return
    report = {}
    for token, pid in running_pids['volume'].items():
        alive = False
        if pid != -1:  # -1 marks "no process tracked"
            try:
                # Zombies keep a process entry; they count as not alive.
                alive = str(psutil.Process(pid).status()) != 'zombie'
            except psutil.NoSuchProcess:
                alive = False
        report[token] = "true" if alive else "false"
    await message.answer(json.dumps(report))
@dp.message(F.text.lower().contains('tag:'))  # tag:start:marketmaking:DAG
async def handler(message: Message):
    """Dispatch a ``tag:<operation>:<bot type>:<token>`` control message.

    The text is split on ':'; fields 1, 2 and 3 are the operation
    ('start' or 'stop'), the bot type and the token. Any further fields are
    ignored. A message with fewer than four fields is answered with a usage
    hint instead of raising IndexError.
    """
    is_verified = await verification(message.from_user.id)
    if not is_verified:
        await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
        return
    parts = message.text.split(':')
    if len(parts) < 4:
        await message.answer("bad command format, expected tag:<start|stop>:<marketmaking|volume>:<token>")
        return
    operation = parts[1]  # start OR stop
    bot_type = parts[2]  # marketmaking OR volume
    token = parts[3]
    if operation == 'start':
        await start_bot(message, bot_type, token)
    elif operation == 'stop':
        await stop_bot(message, bot_type, token)
    else:
        await message.answer("unknown operation")
async def start_bot(message: Message, type, token: str):
    """Launch the script for *token* of the given bot *type* and track its PID.

    *type* must be 'marketmaking' or 'volume'. The script
    ``<token>_mm.py`` / ``<token>_volume.py`` (token lower-cased) is looked up
    in the parent directory of this file and started with ``python3``; one
    ``<exchange>/<token>_mm.key`` / ``<exchange>/<token>_volume.key`` path per
    exchange listed in ``token_exchanges`` is passed as an argument. Output of
    the child is discarded.

    Every rejected request is answered with a message: unknown bot type,
    unknown token, a bot that is already tracked as running, or a token with
    no exchanges configured.
    """
    # Validate the bot type before touching running_pids, otherwise an
    # unknown type raises KeyError and this reply can never be sent.
    if type == 'marketmaking':
        suffix = '_mm'
    elif type == 'volume':
        suffix = '_volume'
    else:
        await message.answer("unknown bot type")
        return
    if token not in running_pids[type]:
        await message.answer("unknown token")
        return
    if running_pids[type][token] != -1:
        await message.answer(f'Firstly stop already launched bot')
        return
    # NOTE(review): running_pids is keyed by "lott_usdt" while token_exchanges
    # is keyed by 'lott'; with the tables as written no token is present in
    # both. Confirm the intended key before relying on this lookup.
    exchanges = token_exchanges.get(token)
    if not exchanges:
        await message.answer(f"no exchanges configured for token {token}")
        return
    path_to_launch_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../')
    exec_name = token.lower() + suffix + '.py'
    config_name = token.lower() + suffix + '.key'
    path_to_exec = os.path.join(path_to_launch_dir, exec_name)
    configs = [os.path.join(path_to_launch_dir, exchange, config_name) for exchange in exchanges]
    p = subprocess.Popen(["python3", path_to_exec] + configs, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    running_pids[type][token] = p.pid
    print(p.pid)
    await message.answer(f'Process with id = {p.pid} was started')
async def stop_bot(message: Message, type, token):
    """Send SIGTERM to the tracked process for *type* / *token* and untrack it.

    Replies "No process is running" when nothing is tracked (-1). When the
    tracked PID no longer exists or is a zombie, the entry is reset to -1 and
    the reply names the stale PID. Otherwise the entry is reset, SIGTERM is
    sent and the reply confirms the stop. An unknown bot type or token is
    answered with a message instead of raising KeyError.
    """
    pid = running_pids.get(type, {}).get(token)
    if pid is None:
        await message.answer("unknown bot type or token")
        return
    if pid == -1:
        await message.answer("No process is running")
        return
    # Check whether the process is already gone. The PID was captured above,
    # before the entry is reset, so the replies name it rather than -1.
    try:
        is_zombie = str(psutil.Process(pid).status()) == 'zombie'
    except psutil.NoSuchProcess:
        running_pids[type][token] = -1
        await message.answer(f"No process found with PID {pid}")
        return
    if is_zombie:  # consumes no resources and is dead, but still has an entry
        running_pids[type][token] = -1
        await message.answer(f"The process with PID {pid} is not running")
        return
    running_pids[type][token] = -1
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the status check and the signal.
        await message.answer(f"No process found with PID {pid}")
        return
    await message.answer(f'Process with id = {pid} was stopped')
@dp.message(F.text)  # HACK: proper filters are needed; for now this function must NOT be moved above the other handlers
async def bad_message(message: Message):
    """Catch-all for text messages: tell the sender the text was not understood."""
    if await verification(message.from_user.id):
        await message.answer("I don't understand you")
        return
    await message.answer("You are not verified so i don't know who are you\nplease contact @saushkin_den if you think this is a mistake")
async def send_message_to_users(msg):
    """Send *msg* to every user id listed in ``available_uids``.

    Delivery is best-effort per recipient: a failed send is logged with its
    traceback and the remaining users are still notified, so one unreachable
    chat cannot suppress an alarm for everybody else.
    """
    for user_id in available_uids:
        try:
            await bot.send_message(text=msg, chat_id=user_id)
        except Exception:
            # Broadcast boundary: record the failure and keep going.
            logging.exception("failed to deliver message to user %s", user_id)
async def healthcheck_alarm(bot: Bot):
    """Scan the tracked volume and marketmaking PIDs and alarm on dead ones.

    A tracked process counts as dead when it no longer exists or its status
    is 'zombie'. For each dead one an ALARM message is broadcast and the
    entry is reset to -1. Volume bots are scanned first, then marketmaking.
    The *bot* argument is not used by the body.
    """
    print('healthcheck')
    for kind in ('volume', 'marketmaking'):
        tracked = running_pids[kind]
        for token, pid in tracked.items():
            if pid == -1:  # nothing tracked for this token
                continue
            try:
                dead = str(psutil.Process(pid).status()) == 'zombie'
            except psutil.NoSuchProcess:
                dead = True
            if dead:
                await send_message_to_users(f'ALARM: {token} {kind} bot is dead')
                tracked[token] = -1
async def balance_checker(bot: Bot):
    """Compare the latest balance of every telemetry file with its reference.

    For each regular file in ``path_to_mm_balances_dir`` the last line is read
    with ``tail -1`` and parsed (the parsing implies a line shaped like
    ``<epoch_ms> (<base>, <quote>)`` - confirm against the writer). Then:

    * a zero base or quote balance raises an ALARM and the file is skipped;
    * the first valid reading of a file becomes its reference in
      ``balance_state``;
    * a relative change above 10% in base or quote against the reference
      raises an ALARM and the reading becomes the new reference.

    A file whose last line is empty or malformed is logged and skipped, so it
    no longer aborts the check for the remaining files. The *bot* argument is
    not used by the body.
    """
    files_path = [path_to_mm_balances_dir + '/' + file.name for file in Path(path_to_mm_balances_dir).iterdir() if file.is_file()]
    for balance_file in files_path:
        p = subprocess.Popen(['tail', '-1', balance_file], shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        res, err = p.communicate()
        if err:
            print(err.decode())
            continue
        name = balance_file.split('/')[-1]
        last_line = res.decode()
        fields = last_line.split(' ')
        try:
            # First field is a millisecond timestamp.
            time = str(datetime.fromtimestamp(int(int(fields[0]) / 1000)))
            base = float(fields[1].strip()[1:-1])
            quote = float(fields[2].strip()[:-1])
        except (IndexError, ValueError):
            logging.warning("skipping %s: cannot parse last line %r", balance_file, last_line)
            continue
        new_balances = {"time": time, "base": base, 'quote': quote}
        if base == 0 or quote == 0:  # in theory this should be handled properly
            await send_message_to_users(f'ALARM: {name} quote or base balance = 0')
            continue
        if name not in balance_state:
            balance_state[name] = new_balances
            continue
        # The reference never holds a zero balance (zeros are skipped above),
        # so these divisions are safe.
        reference = balance_state[name]
        base_delta = abs(new_balances['base'] - reference['base']) / reference['base']
        quote_delta = abs(new_balances['quote'] - reference['quote']) / reference['quote']
        if base_delta > 0.1 or quote_delta > 0.1:
            prev_bal = json.dumps(reference)
            new_bal = json.dumps(new_balances)
            balance_state[name] = new_balances  # update
            await send_message_to_users(f'ALARM: {name} balance > 10% change:\n previous: {prev_bal}\n new: {new_bal}')
    print(balance_state)
async def main():
    """Schedule the periodic checks, then poll Telegram for updates.

    A scheduler local to this coroutine runs ``healthcheck_alarm`` every 30
    seconds and ``balance_checker`` every 60 seconds, each receiving the bot.
    """
    periodic_jobs = (
        (healthcheck_alarm, 30),
        (balance_checker, 60),
    )
    scheduler = AsyncIOScheduler()
    for job, period_seconds in periodic_jobs:
        scheduler.add_job(job, "interval", seconds=period_seconds, args=[bot])
    scheduler.start()
    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement