Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import telethon
- from telethon import TelegramClient, Button, events, types, functions, errors
- MEDIA_PATH = '...'
- BOT_TOKEN='...'
- bot = TelegramClient('bot', API_ID, API_HASH)
- import logging
- logging.basicConfig(
- format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s', level=logging.INFO
- )
- from telethon.tl.functions import InvokeWithBusinessConnectionRequest
- from telethon.tl.types import UpdateBotNewBusinessMessage, UpdateBotEditBusinessMessage
- from telethon.tl.functions.messages import (
- EditMessageRequest,
- SendMessageRequest,
- SendMediaRequest,
- UploadMediaRequest,
- )
- from telethon.errors import UserMigrateError
- def parse_response(updates, peer):
- for e in updates.updates:
- if isinstance(e, (UpdateBotNewBusinessMessage, UpdateBotEditBusinessMessage)):
- e.message._finish_init(
- bot, {i.id: i for i in updates.chats + updates.users}, peer
- )
- return e.message
- async def invoke_business(conn_id, req, chat_id):
- await req.resolve(bot, telethon.utils)
- req = InvokeWithBusinessConnectionRequest(conn_id, req)
- result = await bot(req)
- return parse_response(result, await bot.get_input_entity(chat_id))
- async def edit_businsess_message(conn_id, chat_id, msg_id, new_message, **kwargs):
- message, entities = bot.parse_mode.parse(new_message)
- return await invoke_business(
- conn_id,
- EditMessageRequest(
- peer=chat_id, id=msg_id, message=message, entities=entities, **kwargs
- ),
- chat_id,
- )
- async def send_business_message(conn_id, chat_id, message):
- message, entities = bot.parse_mode.parse(message)
- return await invoke_business(
- conn_id,
- SendMessageRequest(
- peer=chat_id,
- message=message,
- entities=entities,
- ),
- chat_id,
- )
- async def send_business_media(conn_id, chat_id, file, caption=None):
- message, entities = bot.parse_mode.parse(caption) if caption else '', None
- media = await bot(
- UploadMediaRequest('me', (await bot._file_to_media(file))[1], conn_id)
- )
- return await invoke_business(
- conn_id,
- SendMediaRequest(
- peer=chat_id,
- media=media,
- message=message,
- entities=entities,
- ),
- chat_id,
- )
- @bot.on(events.Raw(UpdateBotNewBusinessMessage))
- async def new_biz_message(event):
- message = await send_business_message(
- event.connection_id, event.message.chat_id, 'Hi! how can i help you?'
- )
- message = await send_business_media(
- event.connection_id, event.message.chat_id, MEDIA_PATH
- )
- await edit_businsess_message(
- event.connection_id,
- event.message.chat_id,
- message.id,
- 'Hi! how can i help you?',
- )
- bot.start(bot_token=BOT_TOKEN)
- bot.run_until_disconnected()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement