Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from telethon import events, types,functions
- from telethon.sync import TelegramClient
- from telethon.tl.types import InputMediaPoll
- import random, asyncio
- from telethon.tl.functions.channels import GetFullChannelRequest
# Telegram API credentials; these are empty placeholders in this file.
api_id = ''
api_hash = ''
# Session identifier handed to the client (presumably a local session name;
# confirm against the Telethon session documentation).
session_name = ''

# Module-level client instance; the event handlers in this file register on it.
client = TelegramClient(
    session=session_name,
    api_id=api_id,
    api_hash=api_hash,
)
# Option bytes of the right answer. Votes are matched on these bytes because
# they stay the same however the answers are shuffled or labelled.
CAPTCHA_CORRECT_OPTION = b'4'
# How long a user gets to answer before the pending poll handler is dropped.
CAPTCHA_TIMEOUT_SECONDS = 10 * 60


def _build_captcha_media():
    """Return the quiz poll media ("How much is 3+1?") with shuffled answers."""
    answers = [types.PollAnswer(text, text.encode()) for text in ('1', '2', '3', '4')]
    random.shuffle(answers)
    poll = types.Poll(
        # Fixed client-side id; the id used to match votes is read back from
        # the sent message by the caller.
        id=653342256788,
        question='How much is 3+1?',
        answers=answers,
        public_voters=False,
        quiz=True,
        multiple_choice=False,
    )
    return types.InputMediaPoll(poll=poll, correct_answers=[CAPTCHA_CORRECT_OPTION])


def _voters_by_option(update):
    """Map option bytes to voter count, or return None if there is no tally."""
    results = getattr(update.results, 'results', None)
    if not results:
        return None
    return {result.option: result.voters for result in results}


# A join request to a channel the bot administers sends a quiz poll to the
# requesting user; the request is approved or declined based on their answer.
@client.on(events.Raw(types.UpdateBotChatInviteRequester))
async def join_requested(event):
    """Quiz a user who requested to join and resolve the request accordingly.

    Args:
        event: The raw ``UpdateBotChatInviteRequester`` update. Its ``peer``
            and ``user_id`` attributes identify the channel and the requester.

    Returns:
        None. Returns early, without sending a poll, when the channel details
        cannot be fetched.
    """
    print("\nNEW JOIN REQUEST\n\n" + event.stringify() + "\n")
    user_id = event.user_id
    channel_peer = event.peer
    try:
        entity = await client.get_entity(channel_peer)
        full = await client(GetFullChannelRequest(entity))
        channel_name = entity.title
        channel_id = channel_peer.channel_id
    except Exception as e:
        # Without the channel details the quiz cannot be worded or resolved,
        # so stop here instead of failing later on undefined names.
        print("Error fetching channel entity:", e)
        return

    # The invite link is only used for logging, so its absence is not fatal.
    invite = getattr(full.full_chat.exported_invite, 'link', None)
    print("User ID:", user_id)
    print("Channel Name:", channel_name)
    print("Channel ID:", channel_id)
    print("Channel Link:", invite)
    newsub = f'\nNew sub!\n\nUser ID: {user_id}\nChannel ID: {channel_id}\nChannel Name: {channel_name}\n{invite}\n'

    # Send the intro message followed by the quiz poll.
    await client.send_message(user_id, f'To join **{channel_name}**, successfully answer the following question:')
    sent = await client.send_file(user_id, _build_captcha_media())
    poll_id = sent.poll.poll.id
    wait_vote = asyncio.Event()

    @client.on(events.Raw(types.UpdateMessagePoll, func=lambda u: u.poll_id == poll_id))
    async def check_answer(update):
        """Approve or decline the join request based on the poll tally."""
        print("\nPOLL RESULT\n\n" + update.stringify() + "\n")
        print(f"Poll ID: {update.poll_id}")
        tally = _voters_by_option(update)
        print(tally)
        if tally is None:
            # The update carries no per-option results; keep waiting for a vote.
            return
        wait_vote.set()

        passed = tally.get(CAPTCHA_CORRECT_OPTION, 0) == 1
        if passed:
            reply = "You seem like a clever person, I'm going to let you in 😏"
        else:
            reply = "Womp womp woooooomp. Brush yourself off and try again."
        try:
            await client.send_message(user_id, reply)
            print('Success' if passed else 'Fail')
            if passed:
                print(newsub)
            # Pass the original peer object rather than a bare integer id so
            # the channel reference is unambiguous.
            await client(functions.messages.HideChatJoinRequestRequest(channel_peer, user_id, approved=passed))
        except Exception as e:
            print(e)

    try:
        await asyncio.wait_for(wait_vote.wait(), timeout=CAPTCHA_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError only became an alias of the builtin TimeoutError
        # in Python 3.11; naming it explicitly works on older versions too.
        await client.send_message(user_id, "Too slow, try again.")
    finally:
        print('vote reached, clean handler')
        client.remove_event_handler(check_answer)
async def main():
    """Start the client, report the login, then run until it disconnects."""
    await client.start()
    print('\nLogged in')
    await client.run_until_disconnected()
# Run the bot only when this file is executed directly as a script.
if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement