Advertisement
drinfernoo

Untitled

Jun 19th, 2023
831
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.07 KB | None | 0 0
  1. from dataclasses import asdict
  2. import json
  3. from typing import Type
  4.  
  5. from quart import jsonify, request, views
  6.  
  7. from betty.chat.api import ChatAPI
  8. from betty.types import Item
  9.  
  10.  
  11. class BaseModelView(views.MethodView):
  12.     def __init__(self, item_type: Type[Item]):
  13.         self.item_type = item_type
  14.  
  15.     @classmethod
  16.     def as_view(cls, name, *class_args, **class_kwargs):
  17.         view = super().as_view(name, *class_args, **class_kwargs)
  18.         view.model = cls(*class_args, **class_kwargs)
  19.         return view
  20.        
  21.  
  22. class StreamItemsView(BaseModelView):
  23.     async def handle_stream_request(self):
  24.         openai_api_key = request.headers.pop(
  25.             "Authorization", "Invalid Authorization"
  26.         ).split(" ")[1]
  27.  
  28.         story_generator = ChatAPI(ai_prefix="Betty", openai_api_key=openai_api_key)
  29.  
  30.         # implemented similarly to https://gist.github.com/mortymike/70711b028311681e5f3c6511031d5d43#solution
  31.         # `story_generator.stream` puts items into `queue` as they are streamed from the model
  32.         async def stream(*args, **kwargs):
  33.             queue = Queue()
  34.             job_done = object()
  35.             queue.put(job_done)
  36.  
  37.             t = Thread(target=asyncio.run, args=(story_generator.stream(*args, queue=queue, **kwargs),))
  38.             t.start()
  39.  
  40.             while True:
  41.                 try:
  42.                     next_obj = queue.get(True, timeout=1)
  43.                     if next_obj is job_done:
  44.                         if not t.is_alive():
  45.                             break
  46.                         continue
  47.                     json_obj = json.dumps(asdict(next_obj))
  48.                     yield f"{json_obj}\n"
  49.                 except Empty:
  50.                     if not t.is_alive():
  51.                         break
  52.                     continue
  53.  
  54.         data = await request.get_json()
  55.         item_request = (
  56.             self.item_type.get_completion_request_model().parse_obj(data).dict()
  57.         )
  58.  
  59.         return stream(self.item_type, **item_request)
  60.  
  61.     async def post(self):
  62.         return self.handle_stream_request()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement