Advertisement
Cool_Dalek

Simple actor based async workers

Mar 23rd, 2022 (edited)
1,042
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.73 KB | None | 0 0
  1. import java.util.concurrent.ConcurrentLinkedQueue
  2. import java.util.concurrent.atomic.AtomicReference
  3. import scala.concurrent.ExecutionContext
  4. import scala.util.control.NonFatal
  5.  
  /** A minimal actor-style worker.
    *
    * Messages sent with [[tell]] are queued and handed, one at a time, to the current
    * [[Behavior]] on the supplied `ExecutionContext`. Every handler returns the behavior to
    * use for the next message: `receive(...)` to switch handlers, `same` to keep the current
    * one, or `stop` to cease processing.
    *
    * If a handler throws a non-fatal exception that is not recovered via `onFailure` /
    * `onException`, the exception is passed to `ExecutionContext.reportFailure` and the
    * worker stops.
    *
    * NOTE: the initial behavior is obtained by calling `act()` while this trait is being
    * initialised, so `act()` must not depend on fields that the implementing class
    * initialises in its own body.
    *
    * @tparam T type of the messages this worker accepts
    */
  trait Worker[T](using ec: ExecutionContext) {

    /** What the worker does with the next message. */
    sealed trait Behavior
    // `Pass` is what `same` returns; it is also stored as the "busy" marker while a
    // message is in flight, so that only one handler runs at a time.
    private case object Pass extends Behavior
    private case class Receive(act: T => Behavior) extends Behavior
    private case object Stop extends Behavior

    extension (behavior: Behavior) {

      /** Wraps a `Receive` so that non-fatal exceptions thrown by its handler are turned
        * into a behavior by `handler`; any other behavior is returned unchanged.
        */
      private def handleWith(handler: Throwable => Behavior): Behavior =
        // A runtime match: the concrete behavior is not known statically, so an
        // `inline match` could never be reduced at the call site.
        behavior match {
          case Receive(act) =>
            receive { message =>
              try act(message)
              catch {
                case NonFatal(exc) => handler(exc)
              }
            }
          case other => other
        }

      /** Recovers from the non-fatal exceptions `handle` is defined for; anything else is
        * rethrown and therefore stops the worker.
        */
      inline def onFailure(handle: PartialFunction[Throwable, Behavior]): Behavior =
        handleWith { exc =>
          // The fallback must be a function: a bare `throw exc` argument would be
          // evaluated eagerly, before the partial function is ever consulted.
          handle.applyOrElse(exc, (unhandled: Throwable) => throw unhandled)
        }

      /** Recovers from every non-fatal exception thrown by the handler. */
      inline def onException(handle: Throwable => Behavior): Behavior =
        handleWith(handle)

    }

    private val behavior: AtomicReference[Behavior] = AtomicReference(act())

    private val messages = ConcurrentLinkedQueue[T]()

    /** Enqueues `message` and makes sure a processing run is scheduled.
      *
      * @param message the message to deliver to this worker
      */
    def tell[R <: T](message: R): Unit = {
      messages.offer(message)
      // Always schedule a run. Deciding from an `isEmpty` check made before the `offer`
      // can strand the message when the queue is drained between the check and the offer.
      // A redundant run is harmless: it finds the queue empty or the worker busy and exits.
      process()
    }

    /** Schedules a run that hands the next queued message (if any) to the current handler. */
    private def process(): Unit =
      ec.execute { () =>
        if (!messages.isEmpty) {
          behavior.get() match {
            case current @ Receive(_) =>
              // Claim the worker by swapping in the busy marker; losing the race means
              // another run is already handling a message and will re-check the queue.
              if (behavior.compareAndSet(current, Pass)) {
                Option(messages.poll()) match {
                  case Some(message) =>
                    dispatch(current, message)
                  case None =>
                    // A concurrent run consumed the message and restored this same
                    // `Receive` before our CAS. Release the claim and re-check so a
                    // message offered in the meantime is not left behind.
                    behavior.set(current)
                    process()
                }
              }
            case _ => ()
          }
        }
      }

    /** Runs `current`'s handler on `message`, installs the resulting behavior and
      * schedules the next run.
      */
    private def dispatch(current: Receive, message: T): Unit =
      ec.execute { () =>
        try {
          current.act(message) match {
            case Pass => behavior.set(current) // `same`: keep the handler that just ran
            case next => behavior.set(next)
          }
          process()
        } catch {
          case NonFatal(exc) =>
            ec.reportFailure(exc)
            behavior.set(Stop)
        }
      }

    /** Behavior that handles the next message with `receiver`. */
    protected final def receive(receiver: T => Behavior): Behavior = Receive(receiver)

    /** Behavior that makes the worker ignore all further messages. */
    inline protected final def stop: Behavior = Stop

    /** Behavior that keeps the handler which processed the current message. */
    inline protected final def same: Behavior = Pass

    /** The initial behavior of this worker. */
    def act(): Behavior

  }
  86.  
  /** Mixin for the companion object of a type class over type constructors: extending it
    * lets `Companion[F]` return the contextual `Effect[F]` instance.
    *
    * @tparam Effect the type class, parameterised by a type constructor
    */
  trait SummonerK[Effect[_[_]]] {

    /** Returns the `Effect[F]` instance available in the contextual scope. */
    inline final def apply[F[_]](using instance: Effect[F]): Effect[F] = instance

  }
  92.  
  93. import scala.concurrent.{ExecutionContext, Future, Promise}
  94. import scala.util.Success
  95.  
  /** Type class for sending a request to a [[Worker]] and obtaining the reply wrapped in `F`.
    *
    * @tparam F the effect in which the reply is delivered
    */
  trait AskWorker[F[_]] {

    /** Sends a request to `worker` and returns the reply in `F`.
      *
      * @param worker  the worker that receives the request
      * @param request builds the request message from a worker that accepts the reply
      * @tparam T   message type accepted by `worker`
      * @tparam Req type of the request message
      * @tparam Res type of the reply
      */
    def ask[T, Req <: T, Res <: T](
      worker: Worker[T]
    )(
      request: Worker[Res] => Req
    )(using ExecutionContext): F[Res]

  }
  /** Companion of the [[AskWorker]] type class; `AskWorker[F]` summons the instance for `F`. */
  object AskWorker extends SummonerK[AskWorker] {

    /** Worker that completes `promise` with the first message it receives and then stops.
      *
      * @param promise the promise to complete with the received message
      */
    private[concurrent] class PromiseWorker[T](promise: Promise[T])
                                              (using ExecutionContext) extends Worker[T] {

      // Completes the promise successfully with the reply, then stops the worker.
      private def fulfil(reply: T): Behavior = {
        val outcome = Success(reply)
        promise.complete(outcome)
        stop
      }

      override def act(): Behavior = receive(fulfil)

    }

  }
  extension [T](worker: Worker[T]) {

    /** Sends the message built by `request` to this worker and returns the reply in `F`,
      * delegating to the contextual `AskWorker[F]` instance.
      *
      * @param request builds the request message from a worker that accepts the reply
      */
    inline def ask[F[_]: AskWorker, Req <: T, Res <: T](request: Worker[Res] => Req)(using ExecutionContext): F[Res] = {
      val asker = summon[AskWorker[F]]
      asker.ask[T, Req, Res](worker)(request)
    }

  }
  /** `AskWorker` instance that delivers the reply as a `Future`. */
  given AskWorker[Future] = new AskWorker[Future] {

    /** Sends the request built around a reply-collecting worker to `worker` and returns a
      * future that is completed with the first message that collecting worker receives.
      */
    override def ask[T, Req <: T, Res <: T](worker: Worker[T])
                                           (request: Worker[Res] => Req)
                                           (using ExecutionContext): Future[Res] = {
      val reply = Promise[Res]()
      // The request embeds a one-shot worker; whatever is sent to it completes `reply`.
      worker.tell(request(AskWorker.PromiseWorker(reply)))
      reply.future
    }

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement