Advertisement
icarussiano

botwolframnew

Apr 1st, 2024 (edited)
637
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.25 KB | None | 0 0
  1. import html
  2. import logging
  3. import os
  4. import urllib.parse
  5. from uuid import uuid4
  6.  
  7. import requests
  8. import vertexai
  9. import wolframalpha
  10. from dotenv import load_dotenv
  11. from more_itertools import peekable
  12. from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputTextMessageContent, \
  13.     Update, InlineQueryResultPhoto
  14. from telegram.constants import ParseMode
  15. from telegram.ext import ApplicationBuilder, CallbackContext, CallbackQueryHandler, CommandHandler, ContextTypes, \
  16.     InlineQueryHandler
  17. from telegram.helpers import mention_html
  18. from vertexai.generative_models import GenerativeModel
  19.  
  20. load_dotenv()
  21.  
  22. os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/davide/Downloads/provagemini-417819-7eb2582ee0d4.json'
  23. project_id = "provagemini-417819"
  24. location = "us-central1"
  25. vertexai.init(project=project_id, location=location)
  26. model = GenerativeModel("gemini-1.0-pro")
  27.  
  28. # setup logger
  29. logging.basicConfig(
  30.     filename='app.log',
  31.     filemode='a',
  32.     format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  33.     level=logging.INFO,
  34. )
  35. # set higher logging level for httpx to avoid all GET and POST requests being logged
  36. logging.getLogger("httpx").setLevel(logging.WARNING)
  37. logger = logging.getLogger(__name__)
  38.  
  39. TOKEN = os.getenv('TOKEN')
  40. appid = os.getenv('APPID')
  41. wolfram_client = wolframalpha.Client(appid)
  42.  
  43.  
  44. async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  45.     """Send a message when the command /start is issued."""
  46.     user_id = update.effective_user.id
  47.     name = update.effective_user.full_name
  48.     menzione = mention_html(user_id, name)
  49.     await update.message.reply_html(
  50.         f"Ciao {menzione}! Sono un bot che ti aiuta a rispondere alle domande usando WolframAlpha. Scrivi /help per "
  51.         f"sapere come usarmi.")
  52.  
  53.  
  54. async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  55.     """Send a message when the command /help is issued."""
  56.     help_text = "Puoi usare i seguenti comandi:\n\n"
  57.     help_text += "/short <query>: Risponderò usando l'API short answers di WolframAlpha\n"
  58.     help_text += "/img <query>: Risponderò con l'immagine del risultato di WolframAlpha\n"
  59.     help_text += ("/query <query>: Risponderò in modo dettagliato, riportando tutte le informazioni testuali "
  60.                   "disponibili\n")
  61.     help_text += "\nPuoi anche usare il bot inline, scrivendo @simplewolframbot <query> e poi selezionando uno dei " \
  62.                  "bottoni oppure direttamente @simplewolframbot img <query> per inviare inline l'immagine "
  63.     await update.message.reply_text(help_text)
  64.  
  65.  
  66. async def simple_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  67.     """Handle the /simple command. This is run when you type: /simple <query>"""
  68.     query = ' '.join(update.message.text.split()[1:])
  69.     query_url = urllib.parse.quote_plus(query)
  70.     short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  71.     res = requests.get(short_url).text
  72.     await update.message.reply_text(res)
  73.  
  74.  
  75. async def img(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  76.     """Handle the /img command. This is run when you type: /img <query>"""
  77.     query = ' '.join(update.message.text.split()[1:])
  78.     query_url = urllib.parse.quote_plus(query)
  79.     photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
  80.     res = requests.get(photo_url)
  81.     with open('output.png', 'wb') as f:
  82.         f.write(res.content)
  83.     await update.message.reply_photo(open('output.png', 'rb'))
  84.  
  85.  
  86. async def reply_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  87.     """Handle the /query command. This is run when you type: /query <query>"""
  88.     query = ' '.join(update.message.text.split()[1:])
  89.     res = wolfram_client.query(query)
  90.     result_text = ""
  91.     if peekable(res.results):
  92.         for pod in res.results:
  93.             for subpod in pod.subpods:
  94.                 result_text += f"{subpod.plaintext}\n"
  95.     else:
  96.         for pod in res.pods:
  97.             result_text += f"\n{pod.title}\n"
  98.             for subpod in pod.subpods:
  99.                 result_text += f"{subpod.plaintext}\n"
  100.     if len(result_text) > 4096:
  101.         result_text = result_text[:4096]
  102.     await update.message.reply_text(result_text)
  103.  
  104.  
  105. async def inline_query(update: Update, context: CallbackContext) -> None:
  106.     """Handle the inline query. This is run when you type: @simplewolframbot <query>"""
  107.     query = update.inline_query.query
  108.     if query.startswith("img "):
  109.         query = query[4:]
  110.         query_url = urllib.parse.quote_plus(query)
  111.         photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
  112.         results = [
  113.             InlineQueryResultPhoto(
  114.                 id=str(uuid4()),
  115.                 photo_url=photo_url,
  116.                 thumbnail_url="https://www.wolframalpha.com/_next/static/images/share_3eSzXbxb.png",
  117.             )
  118.         ]
  119.         await update.inline_query.answer(results)
  120.     # create keyboard with three buttons named Risposta, Immagine , LaTeX(sperimentale)
  121.     keyboard = [
  122.         [
  123.             InlineKeyboardButton("Risposta", callback_data=f"1:{query}"),
  124.             InlineKeyboardButton("Risposta breve", callback_data=f"2:{query}"),
  125.         ],
  126.         [InlineKeyboardButton("LaTeX(Sperimentale)", callback_data=f"3:{query}"),
  127.          InlineKeyboardButton("Immagine", switch_inline_query_current_chat=f"img {query}"), ],
  128.         [
  129.             InlineKeyboardButton("Modifica query", switch_inline_query_current_chat=f"{query}"),
  130.         ]
  131.     ]
  132.     reply_markup = InlineKeyboardMarkup(keyboard)
  133.     results = [
  134.  
  135.         InlineQueryResultArticle(
  136.             id=str(uuid4()),
  137.             title="Risultati",
  138.             input_message_content=InputTextMessageContent(
  139.                 f"Seleziona l'opzione per cercare <code>{html.escape(query)}</code>", parse_mode=ParseMode.HTML),
  140.             reply_markup=reply_markup
  141.         ),
  142.     ]
  143.  
  144.     await context.bot.answer_inline_query(update.inline_query.id, results=results)
  145.  
  146.  
  147. async def button(update: Update, CallbackContext) -> None:
  148.     """Parses the CallbackQuery and updates the message text."""
  149.     query = update.callback_query
  150.     global chunks
  151.     keyboard = []
  152.     # CallbackQueries need to be answered, even if no notification to the user is needed
  153.     # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
  154.     await query.answer()
  155.     answer = "No answer"
  156.     if query.data.startswith("1:"):
  157.         search_query = query.data.split(":")[1]
  158.         res = wolfram_client.query(search_query)
  159.         result_text = ""
  160.         if peekable(res.results):
  161.             for pod in res.results:
  162.                 for subpod in pod.subpods:
  163.                     result_text += f"{subpod.plaintext}\n"
  164.         else:
  165.             for pod in res.pods:
  166.                 result_text += f"\n<b>{pod.title}</b>\n"
  167.                 for subpod in pod.subpods:
  168.                     result_text += f"{subpod.plaintext}\n"
  169.         if len(result_text) > 4000:
  170.             chunks = (result_text[i:i + 4000] for i in range(0, len(result_text), 4000))
  171.             keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("Next", callback_data=f"4")], ])
  172.             await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{next(chunks)}", reply_markup=keyboard,
  173.                                           parse_mode=ParseMode.HTML)
  174.         else:
  175.             await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{result_text}",
  176.                                           parse_mode=ParseMode.HTML)
  177.     elif query.data.startswith("2:"):
  178.         search_query = query.data.split(":")[1]
  179.         query_url = urllib.parse.quote_plus(search_query)
  180.         short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  181.         res = requests.get(short_url).text
  182.         await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{res}", parse_mode=ParseMode.HTML)
  183.     elif query.data.startswith("3:"):
  184.         search_query = query.data.split(":")[1]
  185.         query_url = urllib.parse.quote_plus(search_query)
  186.         short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  187.         res = requests.get(short_url).text
  188.         latex = f"Convert the following text in LaTex inline expression. Answer only with the LaTex code delimited by dollar sign. \nText: {search_query}={res}"
  189.         responses = model.generate_content(latex, stream=True)
  190.         latex_text = ''.join([response.text for response in responses])
  191.         latex_text = latex_text.lstrip('$')
  192.         latex_text = latex_text.rstrip('$')
  193.         latex_text = f"${latex_text}$"
  194.         await query.edit_message_text(latex_text)
  195.     elif query.data.startswith("4"):
  196.         await query.edit_message_text(next(chunks), parse_mode=ParseMode.HTML)
  197.  
  198.  
  199. app = ApplicationBuilder().token(TOKEN).build()
  200. app.add_handler(CommandHandler("query", reply_query))
  201. app.add_handler(CommandHandler("start", start))
  202. app.add_handler(CommandHandler("help", help_command))
  203. app.add_handler(CommandHandler("img", img))
  204. app.add_handler(CommandHandler("short", simple_query))
  205. app.add_handler(CallbackQueryHandler(button))
  206. app.add_handler(InlineQueryHandler(inline_query))
  207. app.run_polling()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement