Advertisement
icarussiano

simplewolfram

Apr 3rd, 2024
546
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.81 KB | None | 0 0
import asyncio
import html
import io
import logging
import os
import urllib.parse
from uuid import uuid4

import requests
import vertexai
import wolframalpha
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from more_itertools import peekable
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputTextMessageContent, \
    Update, InlineQueryResultPhoto
from telegram.constants import ParseMode
from telegram.ext import ApplicationBuilder, CallbackContext, CallbackQueryHandler, CommandHandler, ContextTypes, \
    InlineQueryHandler
from telegram.helpers import mention_html
from vertexai.generative_models import GenerativeModel
  22.  
  23. # Use any translator you like, in this example GoogleTranslator
  24. translator = GoogleTranslator(source='auto', target='en')
  25.  
  26. load_dotenv()
  27.  
  28. os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/Icarussiano/provagemini-417819-7eb2582ee0d4.json'
  29. project_id = "provagemini-417819"
  30. location = "us-central1"
  31. vertexai.init(project=project_id, location=location)
  32. model = GenerativeModel("gemini-1.0-pro")
  33.  
  34. # setup logger
  35. logging.basicConfig(
  36.     filename='app.log',
  37.     filemode='a',
  38.     format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  39.     level=logging.INFO,
  40. )
  41. # set higher logging level for httpx to avoid all GET and POST requests being logged
  42. logging.getLogger("httpx").setLevel(logging.WARNING)
  43. logger = logging.getLogger(__name__)
  44.  
  45. TOKEN = os.getenv('TOKEN2')
  46. appid = os.getenv('APPID')
  47. wolfram_client = wolframalpha.Client(appid)
  48.  
  49.  
  50. async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  51.     """Send a message when the command /start is issued."""
  52.     user_id = update.effective_user.id
  53.     name = update.effective_user.full_name
  54.     menzione = mention_html(user_id, name)
  55.     await update.message.reply_html(
  56.         f"Ciao {menzione}! Sono un bot che ti aiuta a rispondere alle domande usando WolframAlpha. Scrivi /help per "
  57.         f"sapere come usarmi.")
  58.  
  59.  
  60. async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  61.     """Send a message when the command /help is issued."""
  62.     help_text = "Puoi usare i seguenti comandi:\n\n"
  63.     help_text += "/short <query>: Risponderò usando l'API short answers di WolframAlpha\n"
  64.     help_text += "/img <query>: Risponderò con l'immagine del risultato di WolframAlpha\n"
  65.     help_text += ("/query <query>: Risponderò in modo dettagliato, riportando tutte le informazioni testuali "
  66.                   "disponibili\n")
  67.     help_text += "\nPuoi anche usare il bot inline, scrivendo @simplewolframbot <query> e poi selezionando uno dei " \
  68.                  "bottoni oppure direttamente @simplewolframbot img <query> per inviare inline l'immagine "
  69.     await update.message.reply_text(help_text)
  70.  
  71.  
  72. async def simple_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  73.     """Handle the /simple command. This is run when you type: /simple <query>"""
  74.     query = ' '.join(update.message.text.split()[1:])
  75.     query_url = urllib.parse.quote_plus(query)
  76.     short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  77.     res = requests.get(short_url).text
  78.     await update.message.reply_text(res)
  79.  
  80.  
  81. async def img(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  82.     """Handle the /img command. This is run when you type: /img <query>"""
  83.     query = ' '.join(update.message.text.split()[1:])
  84.     query_url = urllib.parse.quote_plus(query)
  85.     photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
  86.     res = requests.get(photo_url)
  87.     with open('output.png', 'wb') as f:
  88.         f.write(res.content)
  89.     await update.message.reply_photo(open('output.png', 'rb'))
  90.  
  91.  
  92. async def reply_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
  93.     """Handle the /query command. This is run when you type: /query <query>"""
  94.     query = ' '.join(update.message.text.split()[1:])
  95.     res = wolfram_client.query(query)
  96.     result_text = ""
  97.     if peekable(res.results):
  98.         for pod in res.results:
  99.             for subpod in pod.subpods:
  100.                 result_text += f"{subpod.plaintext}\n"
  101.     else:
  102.         for pod in res.pods:
  103.             result_text += f"\n{pod.title}\n"
  104.             for subpod in pod.subpods:
  105.                 result_text += f"{subpod.plaintext}\n"
  106.     if len(result_text) > 4096:
  107.         result_text = result_text[:4096]
  108.     await update.message.reply_text(result_text)
  109.  
  110.  
  111. async def inline_query(update: Update, context: CallbackContext) -> None:
  112.     """Handle the inline query. This is run when you type: @simplewolframbot <query>"""
  113.     query = update.inline_query.query
  114.     user_id = update.inline_query.from_user.id
  115.     if query.startswith("img "):
  116.         query = query[4:]
  117.         query_url = urllib.parse.quote_plus(query)
  118.         photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
  119.         results = [
  120.             InlineQueryResultPhoto(
  121.                 id=str(uuid4()),
  122.                 photo_url=photo_url,
  123.                 thumbnail_url="https://www.wolframalpha.com/_next/static/images/share_3eSzXbxb.png",
  124.             )
  125.         ]
  126.         await update.inline_query.answer(results)
  127.     # create keyboard with three buttons named Risposta, Immagine , LaTeX(sperimentale)
  128.     keyboard = [
  129.         [
  130.             InlineKeyboardButton("Risposta", callback_data=f"1:{query}:{user_id}"),
  131.             InlineKeyboardButton("Risposta breve", callback_data=f"2:{query}:{user_id}"),
  132.         ],
  133.         [InlineKeyboardButton("LaTeX(Sperimentale)", callback_data=f"3:{query}:{user_id}"),
  134.          InlineKeyboardButton("Immagine", switch_inline_query_current_chat=f"img {query}"), ],
  135.         [
  136.             InlineKeyboardButton("Modifica query", switch_inline_query_current_chat=f"{query}"),
  137.         ]
  138.     ]
  139.     reply_markup = InlineKeyboardMarkup(keyboard)
  140.     results = [
  141.  
  142.         InlineQueryResultArticle(
  143.             id=str(uuid4()),
  144.             title="Risultati",
  145.             input_message_content=InputTextMessageContent(
  146.                 f"Seleziona l'opzione per cercare <code>{html.escape(query)}</code>", parse_mode=ParseMode.HTML),
  147.             reply_markup=reply_markup
  148.         ),
  149.     ]
  150.  
  151.     await context.bot.answer_inline_query(update.inline_query.id, results=results)
  152.  
  153.  
  154. async def button(update: Update, CallbackContext) -> None:
  155.     """Parses the CallbackQuery and updates the message text."""
  156.     query = update.callback_query
  157.     callback_user_id = query.data.split(":")[2]
  158.     user_id = query.from_user.id
  159.     if callback_user_id != str(user_id):
  160.         await query.answer("NON SEI AUTORIZZATO, SOLO L'UTENTE CHE HA FATTO LA QUERY PUÒ USARE I BOTTONI PER OTTENERE LE RISPOSTE. ", show_alert=True)
  161.         return
  162.     global chunks
  163.     keyboard = []
  164.     # CallbackQueries need to be answered, even if no notification to the user is needed
  165.     # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
  166.     await query.answer()
  167.     search_query = query.data.split(":")[1]
  168.     if detect(search_query)=="it":
  169.         search_query=translator.translate(search_query)
  170.  
  171.     answer = "No answer"
  172.     if query.data.startswith("1:"):
  173.         res = wolfram_client.query(search_query)
  174.         result_text = ""
  175.         if peekable(res.results):
  176.             for pod in res.results:
  177.                 for subpod in pod.subpods:
  178.                     result_text += f"{subpod.plaintext}\n"
  179.         else:
  180.             for pod in res.pods:
  181.                 result_text += f"\n<b>{pod.title}</b>\n"
  182.                 for subpod in pod.subpods:
  183.                     result_text += f"{subpod.plaintext}\n"
  184.         if len(result_text) > 4000:
  185.             chunks = (result_text[i:i + 4000] for i in range(0, len(result_text), 4000))
  186.             keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("Next", callback_data=f"4")], ])
  187.             await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{next(chunks)}", reply_markup=keyboard,
  188.                                           parse_mode=ParseMode.HTML)
  189.         else:
  190.             await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{result_text}",
  191.                                           parse_mode=ParseMode.HTML)
  192.     elif query.data.startswith("2:"):
  193.         query_url = urllib.parse.quote_plus(search_query)
  194.         short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  195.         res = requests.get(short_url).text
  196.         await query.edit_message_text(f"<b>{html.escape(search_query)}</b>\n{res}", parse_mode=ParseMode.HTML)
  197.     elif query.data.startswith("3:"):
  198.         query_url = urllib.parse.quote_plus(search_query)
  199.         short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
  200.         res = requests.get(short_url).text
  201.         latex = f"Convert the following text in LaTex inline expression. Answer only with the LaTex code delimited by dollar sign. \nText: {search_query}={res}"
  202.         responses = model.generate_content(latex, stream=True)
  203.         latex_text = ''.join([response.text for response in responses])
  204.         latex_text = latex_text.lstrip('$')
  205.         latex_text = latex_text.rstrip('$')
  206.         latex_text = f"${latex_text}$"
  207.         await query.edit_message_text(latex_text)
  208.     elif query.data.startswith("4"):
  209.         await query.edit_message_text(next(chunks), parse_mode=ParseMode.HTML)
  210.  
  211.  
  212. app = ApplicationBuilder().token(TOKEN).build()
  213. app.add_handler(CommandHandler("query", reply_query))
  214. app.add_handler(CommandHandler("start", start))
  215. app.add_handler(CommandHandler("help", help_command))
  216. app.add_handler(CommandHandler("img", img))
  217. app.add_handler(CommandHandler("short", simple_query))
  218. app.add_handler(CallbackQueryHandler(button))
  219. app.add_handler(InlineQueryHandler(inline_query))
  220. app.run_polling()
  221.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement