Advertisement
aa11111111111

Untitled

Jan 16th, 2025
46
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.46 KB | None | 0 0
  1. #modules >> silence_timer
  2.  
  3. import asyncio
  4. import random
  5. from typing import Callable, Dict
  6. from loguru import logger
  7. from voicex.models.data_models.silence_timer import (
  8.     SilenceTypes,
  9.     TimerState,
  10.     SilenceTimerConfig,
  11.     config as default_config
  12. )
  13. from copy import deepcopy
  14. from voicex.models.data_models.silence_timer import Silence_Types
  15. from voicex.models.data_models.silence_timer import LLMMessage
  16. from voicex.models.data_models.silence_timer import Config
  17.  
# Module-level Config instance. Note that SilenceTimer.__init__ takes a
# parameter that is also named `config` (defaulting to `default_config`), which
# shadows this name inside the constructor; the visible class only reads
# `self.config`, so nothing in it refers to this module-level object.
config = Config()
  19.  
  20.  
  21. class SilenceTimer:
  22.     def __init__(
  23.         self,
  24.         agent,
  25.         get_ctx: Callable,
  26.         participant,
  27.         metadata,
  28.         config: SilenceTimerConfig = default_config,
  29.     ):
  30.         self.timers: Dict[SilenceTypes, TimerState] = {
  31.             SilenceTypes.CHECK_CUSTOMER: TimerState(
  32.                 timeout=config.check_customer_timeout
  33.             ),
  34.             SilenceTypes.HANG_UP: TimerState(
  35.                 timeout=config.hangup_timeout
  36.             ),
  37.         }
  38.         self.agent = agent
  39.         self.get_ctx = get_ctx
  40.         self.participant = participant
  41.         self.metadata = metadata
  42.         self.config = config
  43.         self.hangup_in_progress = False
  44.  
  45.     async def _timer(self, silence_type: SilenceTypes):
  46.         try:
  47.             remaining_time = self.timers[silence_type]["timeout"]
  48.             while remaining_time > 0 and self.timers[silence_type]["is_running"]:
  49.                 if not self.timers[silence_type]["is_paused"]:
  50.                     await asyncio.sleep(0.1)  # Check pause state every 0.1 seconds
  51.                     remaining_time -= 0.1
  52.                 else:
  53.                     await asyncio.sleep(0.1)  # Minimal sleep while paused
  54.  
  55.             if (
  56.                 self.timers[silence_type]["is_running"]
  57.                 and not self.timers[silence_type]["is_paused"]
  58.             ):
  59.                 await self.handle_silence(silence_type)
  60.         except asyncio.CancelledError:
  61.             pass
  62.  
  63.     def sample_message(self, silence_type: SilenceTypes) -> str:
  64.         """Randomly sample a message for the given silence type."""
  65.         messages = self.config.default_messages[silence_type]
  66.         return random.choice(messages) if isinstance(messages, list) else messages
  67.  
  68.     async def handle_silence(self, silence_type: SilenceTypes):
  69.         """Handle silence based on timer type"""
  70.         if self.hangup_in_progress:
  71.             return
  72.  
  73.         if silence_type == SilenceTypes.HANG_UP:
  74.             self.hangup_in_progress = True
  75.             self.stop()
  76.             logger.info(
  77.                 f"Stopping check customer timer for lead {self.metadata['lead_id']}"
  78.             )
  79.  
  80.         try:
  81.             self.stop(SilenceTypes.CHECK_CUSTOMER)
  82.             try:
  83.                 chat_ctx = self.agent.chat_ctx.copy()
  84.                 llm_message = self.config.llm_messages[silence_type]
  85.                 chat_ctx.append(role=llm_message.role, text=llm_message.message)
  86.                 logger.debug(f"LLM message: {llm_message}")
  87.                 message = self.agent.llm.chat(chat_ctx=chat_ctx)
  88.  
  89.             except Exception as e:
  90.                 message = self.sample_message(silence_type)
  91.                 logger.debug(
  92.                     f"Error generating silence message: {str(e)}\nUsing fallback message: {message}"
  93.                 )
  94.             # await self.agent.say(message)
  95.             speech_handle = await self.agent.say(message)
  96.             logger.info(f"Starting {silence_type} speech...")
  97.  
  98.             await speech_handle.wait_for_initialization()
  99.             logger.info(f"Speech handle completed for lead {self.metadata['lead_id']}")
  100.  
  101.             if silence_type == SilenceTypes.HANG_UP:
  102.                 logger.info("Entering hangup block")
  103.                 if speech_handle.interrupted:
  104.                     logger.info("Hangup interrupted...")
  105.                     self.hangup_in_progress = False
  106.                     self.start()
  107.                     return
  108.                 else:
  109.                     logger.info("Hangup completed...")
  110.                     await self.cleanup()
  111.                     # await self.agent_hang_up_callback(ctx, self.participant, self.metadata)
  112.                     # TODO: Add hangup logic here. Currently, hangup is triggered when the agent says `goodbye`
  113.  
  114.         except Exception as e:
  115.             logger.error(
  116.                 f"Error during {silence_type} speech: {str(e)} for lead {self.metadata['lead_id']}"
  117.             )
  118.         finally:
  119.             self.start(SilenceTypes.CHECK_CUSTOMER)
  120.  
  121.     def start(self, silence_type: SilenceTypes = None):
  122.         """Start specific or all timers"""
  123.         if silence_type:
  124.             self._start_timer(silence_type)
  125.         else:
  126.             # Start both timers
  127.             self._start_timer(SilenceTypes.CHECK_CUSTOMER)
  128.             self._start_timer(SilenceTypes.HANG_UP)
  129.  
  130.     def _start_timer(self, silence_type: SilenceTypes):
  131.         if self.timers[silence_type]["task"]:
  132.             self.timers[silence_type]["task"].cancel()
  133.         self.timers[silence_type]["is_running"] = True
  134.         self.timers[silence_type]["is_paused"] = False
  135.         self.timers[silence_type]["pause_time"] = None
  136.         self.timers[silence_type]["remaining_time"] = None
  137.         self.timers[silence_type]["task"] = asyncio.create_task(
  138.             self._timer(silence_type)
  139.         )
  140.  
  141.     def stop(self, silence_type: SilenceTypes = None):
  142.         """Stop specific or all timers"""
  143.         if silence_type:
  144.             self._stop_timer(silence_type)
  145.         else:
  146.             # Stop both timers
  147.             self._stop_timer(SilenceTypes.CHECK_CUSTOMER)
  148.             self._stop_timer(SilenceTypes.HANG_UP)
  149.  
  150.     def _stop_timer(self, silence_type: SilenceTypes):
  151.         self.timers[silence_type]["is_running"] = False
  152.         self.timers[silence_type]["is_paused"] = False
  153.         self.timers[silence_type]["pause_time"] = None
  154.         self.timers[silence_type]["remaining_time"] = None
  155.         if self.timers[silence_type]["task"]:
  156.             self.timers[silence_type]["task"].cancel()
  157.             self.timers[silence_type]["task"] = None
  158.  
  159.     def is_running(self, silence_type: SilenceTypes) -> bool:
  160.         """Check if a specific timer is running"""
  161.         return self.timers[silence_type]["is_running"]
  162.  
  163.     def pause(self, silence_type: SilenceTypes = None):
  164.         """Pause specific or all timers without canceling them"""
  165.         if silence_type:
  166.             self._pause_timer(silence_type)
  167.         else:
  168.             # Pause both timers
  169.             self._pause_timer(SilenceTypes.CHECK_CUSTOMER)
  170.             self._pause_timer(SilenceTypes.HANG_UP)
  171.  
  172.     def _pause_timer(self, silence_type: SilenceTypes):
  173.         timer_info = self.timers[silence_type]
  174.         if timer_info["is_running"] and not timer_info["is_paused"]:
  175.             timer_info["is_paused"] = True
  176.             timer_info["pause_time"] = asyncio.get_event_loop().time()
  177.             logger.debug(f"Paused {silence_type} timer")
  178.  
  179.     def resume(self, silence_type: SilenceTypes = None):
  180.         """Resume specific or all paused timers"""
  181.         if silence_type:
  182.             self._resume_timer(silence_type)
  183.         else:
  184.             # Resume both timers
  185.             self._resume_timer(SilenceTypes.CHECK_CUSTOMER)
  186.             self._resume_timer(SilenceTypes.HANG_UP)
  187.  
  188.     def _resume_timer(self, silence_type: SilenceTypes):
  189.         timer_info = self.timers[silence_type]
  190.         if timer_info["task"] and timer_info["is_paused"]:
  191.             timer_info["is_paused"] = False
  192.             logger.debug(f"Resumed {silence_type} timer")
  193.  
  194.     def is_paused(self, silence_type: SilenceTypes) -> bool:
  195.         """Check if a specific timer is paused"""
  196.         return self.timers[silence_type]["is_paused"]
  197.  
  198.     def setup_event_handlers(self, agent):
  199.  
  200.         @agent.on("agent_started_speaking")
  201.         def on_agent_started_speaking():
  202.             logger.debug(
  203.                 "Agent started speaking. Pausing Hangup Timer & stopping Check Customer Timer"
  204.             )
  205.             self.stop(SilenceTypes.CHECK_CUSTOMER)
  206.             self.pause(SilenceTypes.HANG_UP)
  207.  
  208.         @agent.on("agent_stopped_speaking")
  209.         def on_agent_stopped_speaking():
  210.             # Start both timers when agent stops speaking
  211.             logger.debug("Agent stopped speaking. Starting Check Customer Timer")
  212.             self.start(SilenceTypes.CHECK_CUSTOMER)
  213.             # For hang_up timer, only start it if it's not already running
  214.             if self.is_running(SilenceTypes.HANG_UP):
  215.                 logger.debug("Hangup timer already running. Resuming...")
  216.                 self.resume(SilenceTypes.HANG_UP)
  217.             else:
  218.                 logger.debug("Starting hangup timer...")
  219.                 self.stop(SilenceTypes.HANG_UP)
  220.                 self.start(SilenceTypes.HANG_UP)
  221.  
  222.         @agent.on("user_started_speaking")
  223.         def on_user_started_speaking():
  224.             logger.debug("User started speaking. Stopping Both Timers")
  225.             # Stop both timers
  226.             self.stop()
  227.  
  228.         @agent.on("user_stopped_speaking")
  229.         def on_user_stopped_speaking():
  230.             logger.debug("User stopped speaking. Starting Both Timers from scratch")
  231.             # Only start hang up timer
  232.             self.start()
  233.  
  234.         @agent.on("agent_speech_interrupted")
  235.         def on_agent_speech_interrupted():
  236.             logger.debug("Agent speech interrupted. Stopping Both Timers")
  237.             # Stop both timers
  238.             self.stop()
  239.  
  240.     async def cleanup(self):
  241.         """Gracefully cleanup and delete the SilenceTimer instance"""
  242.         # Stop all running timers
  243.         self.stop()
  244.  
  245.         # Cancel any remaining tasks
  246.         for silence_type in self.timers:
  247.             if self.timers[silence_type]["task"]:
  248.                 self.timers[silence_type]["task"].cancel()
  249.                 try:
  250.                     await self.timers[silence_type]["task"]
  251.                 except asyncio.CancelledError:
  252.                     pass
  253.  
  254.         # Clear all references
  255.         self.agent = None
  256.         self.get_ctx = None
  257.         self.participant = None
  258.         self.metadata = None
  259.         self.agent_hang_up_callback = None
  260.         self.hangup_in_progress = False
  261.  
  262.         # Reinitialize timers with infinite timeout
  263.         self.timers = {
  264.             SilenceTypes.CHECK_CUSTOMER: TimerState(
  265.                 timeout=float("inf")
  266.             ),
  267.             SilenceTypes.HANG_UP: TimerState(
  268.                 timeout=float("inf")
  269.             ),
  270.         }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement