Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from telethon import TelegramClient
- from telethon.tl import types
- from telethon.tl.functions.updates import GetDifferenceRequest
- import argparse
- def dict_from_state(state):
- return {'pts': state.pts, 'qts': state.qts, 'date': state.date}
- async def search_pts(state, found_pts):
- while found_pts['bottom'] <= found_pts['top']:
- state['pts'] = (found_pts['bottom'] + found_pts['top']) // 2
- try:
- response = await client(GetDifferenceRequest(**state))
- except: response = None
- if not response or isinstance(response, types.updates.DifferenceTooLong):
- found_pts['bottom'] = state['pts'] + 1
- else:
- found_pts['top'] = state['pts'] - 1
- state['pts'] = found_pts['bottom']
- return state, found_pts
- async def cache_all_bot_chats(state, total_pts):
- found_pts = {'bottom': 0, 'top': 0}
- state, found_pts = await search_pts(state, found_pts)
- while True:
- try:
- response = await client(GetDifferenceRequest(**state))
- if isinstance(response, types.updates.DifferenceEmpty):
- break
- elif isinstance(response, types.updates.Difference):
- state = dict_from_state(response.state)
- elif isinstance(response, types.updates.DifferenceSlice):
- state = dict_from_state(response.intermediate_state)
- elif isinstance(response, types.updates.DifferenceTooLong):
- bottom = state['pts']
- top = response.pts
- state, found_pts = await search_pts(bottom, found_pts)
- except Exception as e:
- return print(f'Error getting difference: {type(e)}: {e}')
- print(f'Fetching peers {(state["pts"] / total_pts) * 100:0.2f}%', flush=True, end='\r')
- print('\nFinished')
- return True
- async def get_bot_dialog_ids() -> list[int]:
- success = await cache_all_bot_chats(
- {'qts': 1, 'pts': 1, 'date': 1, 'pts_total_limit': 2**31 - 1}
- , client._message_box.session_state()[0]['pts'])
- if success:
- client.session.save()
- return [
- en[0]
- for en in client.session._cursor()
- .execute('SELECT id FROM entities')
- .fetchall()
- ]
- async def main(client):
- old_count = client.session._cursor().execute('SELECT COUNT(*) FROM entities').fetchone()[0]
- print('current peer count in session:', old_count)
- chats = await get_bot_dialog_ids()
- if chats:
- print('Total new cached chats:', len(chats) - old_count)
- print('Users:', len([u for u in chats if u > 0]))
- print('Chats:', len([c for c in chats if c < 0]))
- else:
- print('Failed')
- parser = argparse.ArgumentParser()
- parser.add_argument("session", help="path to .session file of bot")
- parser.add_argument("api_id", type=int)
- parser.add_argument("api_hash")
- args = parser.parse_args()
- client = TelegramClient(args.session, args.api_id, args.api_hash).start()
- client.loop.run_until_complete(main(client))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement