Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import logging
- import os
- import json
- import subprocess
- import signal
- import psutil
- import time
- from datetime import datetime, timezone, timedelta
- from pathlib import Path
- from aiogram.types import Message
- from aiogram.enums import ParseMode
- from aiogram import Bot, Dispatcher, types
- from aiogram.filters.command import Command
- from aiogram import F
- from aiogram.exceptions import TelegramBadRequest
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
logging.basicConfig(level=logging.INFO)

# Scheduler instance; nothing in the visible part of the file adds jobs to it
# or starts it.
scheduler = AsyncIOScheduler()

# SECURITY: the bot token must never live in source control. It is read from
# the TELEGRAM_BOT_TOKEN environment variable; a missing variable raises
# KeyError at start-up instead of silently running with a wrong credential.
# Any token that was previously committed or published should be revoked.
bot = Bot(token=os.environ["TELEGRAM_BOT_TOKEN"])
dp = Dispatcher()
- # chat_ids = {
- # -4567096432: ["DOR", "DAG", "KRU", "LTX", "ABDS", "GHUB", "IMARO", "SENDOR", "AEROBUD", "OBORTECH", "VGX", "CHAMP", "CHEX", "RBX", "MEI", "DOGA", "SOLAR", "$SX"], # managing group
- # -1002150192437: ["DOR", "DAG", "KRU", "LTX", "ABDS", "GHUB", "IMARO", "SENDOR", "AEROBUD", "OBORTECH", "VGX", "CHAMP", "CHEX", "RBX", "MEI", "DOGA", "SOLAR", "$SX"], # managing group
- # -4147360362: ["DAG","IMARO", "$SX"], # test group
- # # client groups
- # -4531845094: ["DAG"],
- # -4573489347: ["GHUB"],
- # -1002385516473: ["ABDS"],
- # -1002404653479: ["KRU"],
- # -4537296744: ["DOR"],
- # -4541252605: ["LTX"],
- # -4742454014: ["AEROBUD"],
- # -4560694856: ["IMARO"],
- # -4743728721: ["OBORTECH"],
- # -1002264729612: ["VGX"],
- # -4679815932: ["CHAMP"],
- # -4770925676: ["CHEX"],
- # -4626936453: ["RBX"],
- # -4788418340: ["MEI"],
- # -4663628175: ["DOGA"],
- # -4747537835: ["SOLAR"],
- # -4770221167: ["$SX"]
- # }
# Whitelist: chat id (a string, because JSON object keys are strings) mapped to
# the list of token names that chat may query.
chat_ids = {}


def parse_clients_json():
    """Load the chat whitelist from ``clients.json`` into ``chat_ids``.

    The module-level dictionary is refreshed in place (clear + update) so that
    every handler reading ``chat_ids`` sees the loaded data; a plain assignment
    inside the function would only create a local variable.

    If ``clients.json`` does not exist the whitelist is left untouched and a
    warning is logged, so the bot can still start before any token was added.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    try:
        with open('clients.json', 'r') as clients_f:
            # json.load tolerates whitespace and newlines by itself; stripping
            # spaces by hand would corrupt string values that contain them.
            loaded = json.load(clients_f)
    except FileNotFoundError:
        logging.warning("clients.json not found, whitelist left unchanged")
        return
    chat_ids.clear()
    chat_ids.update(loaded)
# UTC offset per Telegram chat id (integer key). The balance handlers look a
# chat up with ``timezones.get(chat_id, timedelta(hours=0))``.
# Disabled entry kept for reference: -1002150192437 -> timedelta(hours=3)
timezones = {-4788418340: timedelta(hours=9)}

# Directories scanned by the balance / volume handlers.
path_to_mm_balances_dir = "/mnt/mm_telemetry"
path_to_volume_balances_dir = "/mnt/volume_telemetry"

# Module-level state containers; none of them is referenced by the handlers
# visible in this part of the file.
balance_state = dict()
prev_wide_spread = dict()
stopped = list()
@dp.message(Command("add_token"))
async def add_token(message: types.Message):
    """Handle ``/add_token <TOKEN>``: whitelist a token for the current chat.

    The first word after the command is upper-cased and appended to the token
    list stored under ``str(message.chat.id)`` in ``chat_ids``; the whole
    mapping is then written to ``clients.json``.

    Replies with a usage hint when no token is given and with a notice when
    the token is already present (nothing is written in either case).
    """
    # NOTE(review): there is no permission check here, so any chat that can
    # reach the bot can whitelist itself for any token — confirm whether this
    # command should be restricted to the managing groups.
    words = (message.text or '').split()
    # words[0] is the command itself ("/add_token" or "/add_token@botname").
    token_name = words[1].upper() if len(words) > 1 else ''
    if not token_name:
        await message.answer("Usage: /add_token <TOKEN>")
        return

    current_chat_id = str(message.chat.id)
    tokens = chat_ids.setdefault(current_chat_id, [])
    if token_name in tokens:
        await message.answer(f"{token_name} is already added")
        return

    tokens.append(token_name)
    with open('clients.json', 'w') as clients_f:
        clients_f.write(json.dumps(chat_ids))
    await message.answer(f"{token_name} was successfully added")
@dp.message(Command("liquidity_volume"))
async def volume_balances(message: types.Message):
    """Handle ``/liquidity_volume``: report 24-hour traded volume per file.

    For every regular file in ``path_to_volume_balances_dir`` whose name
    starts (before the first ``_``) with one of the chat's whitelisted coins,
    ``calculate_private_trades`` is called and its total and earliest trade
    time are added to the reply. Files for which it returns ``(-1, -1)`` are
    skipped.

    NOTE(review): this function shares its Python name with the
    ``/volume_balances`` handler defined later in the file. Both stay
    registered because the decorator captures the function object, but the
    module-level name ends up bound to the later definition — consider
    renaming this one.
    """
    chat_key = str(message.chat.id)
    # Check the whitelist once, up front, so the reply does not depend on
    # whether the telemetry directory happens to contain files.
    if chat_key not in chat_ids:
        await message.answer("Your chat is not in whitelist")
        return
    client_coins = {coin.lower() for coin in chat_ids[chat_key]}

    trades_size = {}
    for entry in Path(path_to_volume_balances_dir).iterdir():
        if not entry.is_file():
            continue
        name = entry.name
        if name.split('_')[0].lower() not in client_coins:
            continue
        balance_file = path_to_volume_balances_dir + '/' + name
        total_trades, last_time = await calculate_private_trades(balance_file)
        if total_trades == -1 and last_time == -1:  # helper reported an error
            continue
        if last_time != 0:
            # last_time is in milliseconds; 0 means no trade was found.
            last_time = str(datetime.fromtimestamp(int(last_time / 1000)))
        trades_size[name] = (
            f'\nliquidity_volume (24h): {round(total_trades, 2)}'
            f'\nearliest 24h trade time: {last_time}\n'
        )

    if not trades_size:
        # Telegram rejects empty messages, so answer explicitly.
        await message.answer("No data found")
        return

    answer_parts = []
    for name in sorted(trades_size, key=str.lower):
        parse_name = name.replace('-', ' ').replace('_', ' ').split(' ')
        if len(parse_name) == 2:
            print_name = parse_name[0].upper() + " " + parse_name[1]
        elif len(parse_name) == 3:
            print_name = (parse_name[0] + parse_name[1]).upper() + " " + parse_name[2]
        elif len(parse_name) == 4:
            print_name = (parse_name[0] + parse_name[1]).upper() + " " + parse_name[2] + parse_name[3]
        else:
            # Unexpected file-name shape: show the raw name instead of nothing.
            print_name = name
        answer_parts.append(f'{print_name}:{trades_size[name]}\n')
    await message.answer(''.join(answer_parts))
@dp.message(Command("volume_balances"))
async def volume_balances(message: types.Message):
    """Handle ``/volume_balances``: report the latest balance per file.

    Scans the regular files in ``path_to_volume_balances_dir`` whose names
    start with ``BASE_QUOTE``; files whose base asset is not whitelisted for
    the chat are skipped. For each remaining file the last 20 lines are read
    with ``tail`` and the newest ``<ms-timestamp> balance {...}`` record is
    reported, with the time shown in the chat's offset from ``timezones``
    (zero offset when the chat is not listed).
    """
    chat_key = str(message.chat.id)
    if chat_key not in chat_ids:
        await message.answer("Your chat is not in whitelist")
        return
    client_coins = {coin.upper() for coin in chat_ids[chat_key]}
    tz = timezone(timezones.get(message.chat.id, timedelta(hours=0)))

    balances = {}
    for entry in Path(path_to_volume_balances_dir).iterdir():
        if not entry.is_file():
            continue
        name = entry.name
        name_parts = name.split('_')
        if len(name_parts) < 2:
            # No BASE_QUOTE prefix: the assets cannot be derived from the name.
            continue
        base_asset, quote_asset = name_parts[0], name_parts[1]
        if base_asset not in client_coins:
            continue

        balance_file = path_to_volume_balances_dir + '/' + name
        p = subprocess.Popen(['tail', '-20', balance_file], shell=False,
                             stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        res, err = p.communicate()
        if err:
            print(err.decode())
            continue

        # Walk the lines newest-first and stop at the first balance record;
        # without the break the oldest record of the tail would win.
        for line in reversed(res.decode().split('\n')):
            splitted = line.split(' ')
            if len(splitted) < 3 or splitted[1] != 'balance':
                continue
            stamp = datetime.fromtimestamp(int(int(splitted[0]) / 1000), tz)
            # The payload is logged with single quotes; make it valid JSON.
            info_dict = json.loads(''.join(splitted[2:]).replace("'", '"'))
            base, quote = float(info_dict['base']), float(info_dict['quote'])
            balances[name] = f"\ntime: {str(stamp)}\n{base_asset}: {base}\n{quote_asset}: {quote}\n"
            break

    if not balances:
        # Telegram rejects empty messages, so answer explicitly.
        await message.answer("No data found")
        return

    answer_parts = []
    for name in sorted(balances, key=str.lower):
        z = name.split('_')
        parse_name = '/'.join(z[:2]) + ' ' + '_'.join(z[2:])
        answer_parts.append(f'{parse_name}:{balances[name]}\n')
    await message.answer(''.join(answer_parts))
async def calculate_private_trades(balance_file):
    """Sum ``price * quantity`` of the ``private_trades`` records of the last 24 hours.

    The file is filtered with ``grep private_trades`` and limited to the last
    100000 matching lines with ``tail``. Lines are expected to look like
    ``<ms-timestamp> private_trades <list literal of dicts>`` and are scanned
    newest-first until a record older than 24 hours is met.

    Args:
        balance_file: path of the telemetry file to scan.

    Returns:
        ``(total, last_time)`` where ``total`` is the summed volume and
        ``last_time`` is the millisecond timestamp of the oldest counted
        record (``0`` when nothing was counted), or ``(-1, -1)`` when
        ``grep``/``tail`` reported an error.
    """
    import ast  # local import keeps this block self-contained

    grep_command = ['grep', 'private_trades', balance_file]
    tail_command = ['tail', '-n', '100000']
    with subprocess.Popen(grep_command, stdout=subprocess.PIPE) as grep_process:
        # stderr must be piped, otherwise ``err`` is always None and the
        # error branch below can never run.
        with subprocess.Popen(tail_command, stdin=grep_process.stdout,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as tail_process:
            res, err = tail_process.communicate()
    if err:
        print(err.decode())
        return -1, -1
    # grep exits with 0 (matches) or 1 (no matches); anything else is an
    # error such as an unreadable file.
    if grep_process.returncode not in (0, 1):
        return -1, -1

    total = 0
    critical_time = int(time.time() * 1000) - 86400000  # now minus 24 h, in ms
    last_time = 0
    for line in reversed(res.decode().split('\n')):
        splitted = line.split(' ')
        if len(splitted) < 3:
            continue
        if int(splitted[0]) < critical_time:
            break
        if splitted[1] == 'private_trades':  # sanity check, just in case
            # literal_eval only accepts literals, so log content can never
            # execute code the way eval() could.
            trades = ast.literal_eval(''.join(splitted[2:]).replace("'", '"'))
            if len(trades) == 0:
                continue
            last_time = int(splitted[0])
            # NOTE(review): only the first trade of each record is counted —
            # confirm that a record never carries more than one trade.
            total += float(trades[0]['price']) * float(trades[0]['quantity'])
    return total, last_time
@dp.message(Command("mm_balances"))
async def mm_balance(message: types.Message):
    """Handle ``/mm_balances``: report the latest balance per file.

    Scans the regular files in ``path_to_mm_balances_dir`` whose names start
    with ``BASE_QUOTE``; files whose base asset is not whitelisted for the
    chat are skipped. For each remaining file the last 20 lines are read with
    ``tail`` and the newest ``<ms-timestamp> balance {...}`` record is
    reported, with the time shown in the chat's offset from ``timezones``
    (zero offset when the chat is not listed).
    """
    chat_key = str(message.chat.id)
    if chat_key not in chat_ids:
        await message.answer("Your chat is not in whitelist")
        return
    # Upper-case the whitelisted coins, as the other balance handlers do, so
    # a lower-case entry in clients.json still matches.
    client_coins = {coin.upper() for coin in chat_ids[chat_key]}
    tz = timezone(timezones.get(message.chat.id, timedelta(hours=0)))

    balances = {}
    for entry in Path(path_to_mm_balances_dir).iterdir():
        if not entry.is_file():
            continue
        name = entry.name
        name_parts = name.split('_')
        if len(name_parts) < 2:
            # No BASE_QUOTE prefix: the assets cannot be derived from the name.
            continue
        base_asset, quote_asset = name_parts[0], name_parts[1]
        if base_asset not in client_coins:
            continue

        balance_file = path_to_mm_balances_dir + '/' + name
        p = subprocess.Popen(['tail', '-20', balance_file], shell=False,
                             stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        res, err = p.communicate()
        if err:
            print(err.decode())
            continue

        # Walk the lines newest-first and stop at the first balance record;
        # without the break the oldest record of the tail would win.
        for line in reversed(res.decode().split('\n')):
            splitted = line.split(' ')
            if len(splitted) < 3 or splitted[1] != 'balance':
                continue
            stamp = datetime.fromtimestamp(int(int(splitted[0]) / 1000), tz)
            # The payload is logged with single quotes; make it valid JSON.
            info_dict = json.loads(''.join(splitted[2:]).replace("'", '"'))
            base, quote = float(info_dict['base']), float(info_dict['quote'])
            balances[name] = f"\ntime: {str(stamp)}\n{base_asset}: {base}\n{quote_asset}: {quote}\n"
            break

    if not balances:
        # Telegram rejects empty messages, so answer explicitly.
        await message.answer("No data found")
        return

    answer_parts = []
    for name in sorted(balances, key=str.lower):
        z = name.split('_')
        parse_name = '/'.join(z[:2]) + ' ' + '_'.join(z[2:])
        answer_parts.append(f'{parse_name}:{balances[name]}\n')
    await message.answer(''.join(answer_parts))
@dp.message(Command("cashout_balances"))
async def cashout_balances(message: types.Message):
    """Handle ``/cashout_balances``: report the latest balance per file.

    Scans the regular files in ``/mnt/tb_telemetry`` whose names start with
    ``BASE_QUOTE``; files whose name contains ``FUCK`` (case-insensitive) or
    whose base asset is not whitelisted for the chat are skipped. For each
    remaining file the last 20 lines are read with ``tail`` and the newest
    ``<ms-timestamp> balance {...}`` record is reported, with the time shown
    in the chat's offset from ``timezones`` (zero offset when not listed).
    """
    chat_key = str(message.chat.id)
    if chat_key not in chat_ids:
        await message.answer("Your chat is not in whitelist")
        return
    client_coins = {coin.upper() for coin in chat_ids[chat_key]}
    tz = timezone(timezones.get(message.chat.id, timedelta(hours=0)))

    balances = {}
    for entry in Path('/mnt/tb_telemetry').iterdir():
        if not entry.is_file():
            continue
        name = entry.name
        if 'FUCK' in name.upper():
            continue
        name_parts = name.split('_')
        if len(name_parts) < 2:
            # No BASE_QUOTE prefix: the assets cannot be derived from the name.
            continue
        base_asset, quote_asset = name_parts[0], name_parts[1]
        if base_asset not in client_coins:
            continue

        balance_file = '/mnt/tb_telemetry/' + name
        p = subprocess.Popen(['tail', '-20', balance_file], shell=False,
                             stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        res, err = p.communicate()
        if err:
            print(err.decode())
            continue

        # Walk the lines newest-first and stop at the first balance record;
        # without the break the oldest record of the tail would win.
        for line in reversed(res.decode().split('\n')):
            splitted = line.split(' ')
            if len(splitted) < 3 or splitted[1] != 'balance':
                continue
            stamp = datetime.fromtimestamp(int(int(splitted[0]) / 1000), tz)
            # The payload is logged with single quotes; make it valid JSON.
            info_dict = json.loads(''.join(splitted[2:]).replace("'", '"'))
            base, quote = float(info_dict['base']), float(info_dict['quote'])
            balances[name] = f"\ntime: {str(stamp)}\n{base_asset}: {base}\n{quote_asset}: {quote}\n"
            break

    if not balances:
        # Telegram rejects empty messages, so answer explicitly.
        await message.answer("No data found")
        return

    answer_parts = []
    for name in sorted(balances, key=str.lower):
        z = name.split('_')
        parse_name = '/'.join(z[:2]) + ' ' + '_'.join(z[2:])
        answer_parts.append(f'{parse_name}:{balances[name]}\n')
    await message.answer(''.join(answer_parts))
@dp.message(F.text)
async def bad_message(message: Message):
    """Handler registered with the ``F.text`` filter.

    Prints the id of the chat the message came from to stdout and replies
    with a fixed "not understood" text.
    """
    source_chat = message.chat
    print(source_chat.id)
    reply_text = "I don't understand you"
    await message.answer(reply_text)
async def main():
    """Run the dispatcher's polling for ``bot`` and wait until it returns."""
    polling = dp.start_polling(bot)
    await polling


if __name__ == "__main__":
    # Script entry point: load the whitelist first, then run the bot.
    parse_clients_json()
    entry_point = main()
    asyncio.run(entry_point)
Add Comment
Please, Sign In to add comment