Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.ConcurrentLinkedQueue
- import java.util.concurrent.atomic.AtomicReference
- import scala.concurrent.ExecutionContext
- import scala.util.control.NonFatal
- trait Worker[T](using ec: ExecutionContext) {
- sealed trait Behavior
- private case object Pass extends Behavior
- private case class Receive(act: T => Behavior) extends Behavior
- private case object Stop extends Behavior
- extension (behavior: Behavior) {
- private inline def handleWith(handler: Throwable => Behavior): Behavior =
- inline behavior match {
- case Receive(act) =>
- receive { message =>
- try {
- act(message)
- } catch {
- case NonFatal(exc) => handler(exc)
- }
- }
- case pass => pass
- }
- inline def onFailure(handle: PartialFunction[Throwable, Behavior]): Behavior =
- handleWith { exc =>
- handle.applyOrElse(exc, throw exc)
- }
- inline def onException(handle: Throwable => Behavior): Behavior =
- handleWith(handle)
- }
- private val behavior = AtomicReference(act())
- private val messages = ConcurrentLinkedQueue[T]()
- def tell[R <: T](message: R): Unit = {
- val run = messages.isEmpty
- messages.offer(message)
- if(run) process()
- }
- private def process(): Unit =
- ec.execute { () =>
- if(!messages.isEmpty) {
- behavior.get() match {
- case receive @ Receive(act) =>
- val continue = behavior.compareAndSet(receive, Pass)
- if(continue) {
- val message = messages.poll()
- ec.execute { () =>
- try {
- act(message) match {
- case Pass =>
- behavior.set(receive)
- case next =>
- behavior.set(next)
- }
- process()
- } catch {
- case NonFatal(exc) =>
- ec.reportFailure(exc)
- behavior.set(Stop)
- }
- }
- }
- case _ => ()
- }
- }
- }
- protected final def receive(receiver: T => Behavior): Behavior = Receive(receiver)
- inline protected final def stop: Behavior = Stop
- inline protected final def same: Behavior = Pass
- def act(): Behavior
- }
- trait SummonerK[Effect[_[_]]] {
- inline final def apply[F[_]: Effect]: Effect[F] = summon[Effect[F]]
- }
- import scala.concurrent.{ExecutionContext, Future, Promise}
- import scala.util.Success
- trait AskWorker[F[_]] {
- def ask[T, Req <: T, Res <: T](worker: Worker[T])
- (request: Worker[Res] => Req)
- (using ExecutionContext): F[Res]
- }
- object AskWorker extends SummonerK[AskWorker] {
- private[concurrent] class PromiseWorker[T](promise: Promise[T])
- (using ExecutionContext) extends Worker[T] {
- override def act(): Behavior =
- receive { result =>
- promise.complete(Success(result))
- stop
- }
- }
- }
- extension [T](worker: Worker[T]) {
- inline def ask[F[_]: AskWorker, Req <: T, Res <: T](request: Worker[Res] => Req)(using ExecutionContext): F[Res] =
- AskWorker[F].ask(worker)(request)
- }
- given AskWorker[Future] = new AskWorker[Future] {
- override def ask[T, Req <: T, Res <: T](worker: Worker[T])
- (request: Worker[Res] => Req)
- (using ExecutionContext): Future[Res] = {
- val promise = Promise[Res]()
- val waiter = AskWorker.PromiseWorker(promise)
- val message = request(waiter)
- worker.tell(message)
- promise.future
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement