Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import html
- import logging
- import urllib.parse
- from uuid import uuid4
- import requests
- import wolframalpha
- from more_itertools import peekable
- from telegram import InlineQueryResultArticle, InputTextMessageContent, Update, InlineQueryResultPhoto
- from telegram.constants import ParseMode
- from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, InlineQueryHandler
- from telegram.helpers import mention_html
- import os
- import vertexai
- from vertexai.generative_models import GenerativeModel, Part
- os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/Icarussiano/provagemini-417819.json'
- #setup Gemini
- project_id = "provagemini-417819"
- location = "us-central1"
- vertexai.init(project=project_id, location=location)
- model = GenerativeModel("gemini-1.0-pro")
- #setup logger
- logging.basicConfig(
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
- )
- # set higher logging level for httpx to avoid all GET and POST requests being logged
- logging.getLogger("httpx").setLevel(logging.WARNING)
- logger = logging.getLogger(__name__)
- TOKEN = os.getenv('TOKEN')
- appid = os.getenv('API_key')
- wolfram_client = wolframalpha.Client(appid)
- async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Send a message when the command /start is issued."""
- user_id = update.effective_user.id
- name = update.effective_user.full_name
- menzione = mention_html(user_id, name)
- await update.message.reply_html(
- f"Ciao {menzione}! Sono un bot che ti aiuta a rispondere alle domande usando WolframAlpha. Scrivi /help per "
- f"sapere come usarmi.")
- async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Send a message when the command /help is issued."""
- help_text = "Puoi usare i seguenti comandi:\n\n"
- help_text += "/short <query>: Risponderò usando l'API short answers di WolframAlpha\n"
- help_text += "/img <query>: Risponderò con l'immagine del risultato di WolframAlpha\n"
- help_text += ("/query <query>: Risponderò in modo dettagliato, riportando tutte le informazioni testuali "
- "disponibili\n")
- help_text += "\nPuoi anche usare il bot inline, scrivendo @simplewolframbot <query> ;"
- await update.message.reply_text(help_text)
- async def simple_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Handle the /simple command. This is run when you type: /simple <query>"""
- query = ' '.join(update.message.text.split()[1:])
- query_url = urllib.parse.quote_plus(query)
- short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
- res = requests.get(short_url).text
- await update.message.reply_text(res)
- async def img(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Handle the /img command. This is run when you type: /img <query>"""
- query = ' '.join(update.message.text.split()[1:])
- query_url = urllib.parse.quote_plus(query)
- photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
- res = requests.get(photo_url)
- with open('output.png', 'wb') as f:
- f.write(res.content)
- await update.message.reply_photo(open('output.png', 'rb'))
- async def reply_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Handle the /query command. This is run when you type: /query <query>"""
- query = ' '.join(update.message.text.split()[1:])
- res = wolfram_client.query(query)
- result_text = ""
- if peekable(res.results):
- for pod in res.results:
- for subpod in pod.subpods:
- result_text += f"{subpod.plaintext}\n"
- else:
- for pod in res.pods:
- result_text += f"\n{pod.title}\n"
- for subpod in pod.subpods:
- result_text += f"{subpod.plaintext}\n"
- if len(result_text) > 4096:
- result_text = result_text[:4096]
- await update.message.reply_text(result_text)
- async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Handle the inline query. This is run when you type: @simplewolframbot <query>"""
- query = update.inline_query.query
- if not query: # empty query should not be handled
- return
- elif ';' in query:
- query = query[:-1]
- query_url = urllib.parse.quote_plus(query)
- photo_url = f"http://api.wolframalpha.com/v1/simple?appid={appid}&i={query_url}"
- res = wolfram_client.query(query)
- result_text = ""
- if peekable(res.results):
- for pod in res.results:
- for subpod in pod.subpods:
- if subpod.plaintext:
- result_text += f"{html.escape(subpod.plaintext)}\n"
- else:
- for pod in res.pods:
- result_text += f"\n<b>{pod.title}</b>\n"
- length = len(f"\n<b>{pod.title}</b>\n")
- for subpod in pod.subpods:
- if subpod.plaintext:
- result_text += f"{html.escape(subpod.plaintext)}\n"
- else:
- result_text = result_text[:-length]
- short_url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={query_url}"
- req = requests.get(short_url).text
- message = InputTextMessageContent(f"<b>{html.escape(query)}</b> \n{result_text}", parse_mode=ParseMode.HTML)
- rapido = InputTextMessageContent(f"<b>{html.escape(query)}</b> \n{html.escape(req)}", parse_mode=ParseMode.HTML)
- if len(message.message_text) > 4096:
- message = InputTextMessageContent(message.message_text[:4096], parse_mode=ParseMode.HTML)
- if len(rapido.message_text) > 4096:
- rapido = InputTextMessageContent(rapido.message_text[:4096], parse_mode=ParseMode.HTML)
- latex = f"Convert the following text in LaTex inline expression. Answer only with the LaTex code delimited by dollar sign. \nText: {query}={result_text}"
- responses = model.generate_content(latex, stream=True)
- latex_text = ''.join([response.text for response in responses])
- #latex_text = responses.text
- latex_text = latex_text.lstrip('$')
- latex_text = latex_text.rstrip('$')
- latex_text = f"${latex_text}$"
- results = [
- InlineQueryResultArticle(
- id=str(uuid4()),
- title="Risposta rapida",
- input_message_content=rapido,
- ),
- InlineQueryResultArticle(
- id=str(uuid4()),
- title="Risposta testuale completa",
- input_message_content=message,
- ),
- InlineQueryResultArticle(
- id=str(uuid4()),
- title="LaTeX(sperimental)",
- input_message_content=InputTextMessageContent(latex_text, parse_mode=ParseMode.HTML),
- ),
- InlineQueryResultPhoto(
- id=str(uuid4()),
- title="Immagine",
- photo_url=photo_url,
- thumbnail_url=photo_url,
- ),
- ]
- await update.inline_query.answer(results)
- app = ApplicationBuilder().token(TOKEN).build()
- app.add_handler(CommandHandler("query", reply_query))
- app.add_handler(CommandHandler("start", start))
- app.add_handler(CommandHandler("help", help_command))
- app.add_handler(CommandHandler("img", img))
- app.add_handler(CommandHandler("short", simple_query))
- app.add_handler(InlineQueryHandler(inline_query))
- app.run_polling()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement