Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package flames.concurrent.execution.events
- import flames.concurrent.actor.*
- import flames.concurrent.actor.mailbox.SystemMessage.*
- import flames.concurrent.actor.behavior.Behavior
- import flames.concurrent.execution.*
- import flames.logging.*
- import sourcecode.{Enclosing, Line}
- import java.net.{InetSocketAddress, SocketAddress}
- import java.nio.*
- import java.nio.channels.*
- import scala.collection.mutable
- import scala.concurrent.duration.FiniteDuration
/** A cooperative scheduler that runs everything on whichever thread calls [[run]].
  *
  * Work comes from two places: tasks handed to [[execute]] / [[blocking]], and
  * tasks produced by polling the supplied `nioLoop`. Both end up in one FIFO
  * queue that [[run]] drains one task per iteration.
  *
  * The timed `schedule` overloads and `config` are not implemented and throw
  * `NotImplementedError` when called.
  *
  * NOTE(review): `tasks` and `stop` are plain, unsynchronised state; this looks
  * intended for use from the loop thread only — confirm before calling
  * `execute`/`shutdown` from other threads.
  *
  * @param nioLoop event loop polled for ready work; also provides `listenChannel`
  */
class SingleThreadedScheduler(
  nioLoop: NIOEventLoop = NIOEventLoop(),
) extends Scheduler with Runnable {

  // Re-expose channel registration so callers can use the scheduler directly.
  export nioLoop.listenChannel

  /** Pending tasks, executed in insertion order by [[run]]. */
  private val tasks = mutable.Queue.empty[Runnable]

  /** Set by [[shutdown]], or by [[run]] itself once there is nothing left to do. */
  private var stop: Boolean = false

  /** Queues `runnable`; it is executed by a later iteration of [[run]]. */
  override def execute(runnable: Runnable): Unit =
    tasks.enqueue(runnable)

  /** Prints the stack trace of `cause` to standard error. */
  override def reportFailure(cause: Throwable): Unit =
    cause.printStackTrace()

  /** Shuts down the NIO loop and asks [[run]] to exit after its current iteration. */
  override def shutdown(): Unit = {
    nioLoop.shutdown()
    stop = true
  }

  override def isShutdown: Boolean = stop

  /** Runs the scheduling loop on the calling thread.
    *
    * Each iteration moves whatever `nioLoop.poll()` yields into the task queue
    * and then executes at most one queued task. The loop ends when [[shutdown]]
    * was requested, or when the NIO loop reports no outstanding work and the
    * queue is empty.
    *
    * A task that throws a non-fatal exception no longer terminates the loop:
    * the exception is passed to [[reportFailure]] and scheduling continues.
    * Fatal errors still propagate.
    */
  override def run(): Unit = {
    while (!stop) {
      nioLoop.poll().drain { ready =>
        tasks.enqueue(ready)
      }
      if (tasks.nonEmpty) {
        val task = tasks.dequeue()
        try task.run()
        catch {
          case scala.util.control.NonFatal(cause) => reportFailure(cause)
        }
      }
      // Nothing registered with the NIO loop and nothing queued: the loop is done.
      if (!nioLoop.haveWork && tasks.isEmpty) stop = true
    }
  }

  /** There is no separate blocking pool: `action` is queued like any other task. */
  override def blocking(action: Runnable): Unit =
    tasks.enqueue(action)

  /** Not implemented; throws `NotImplementedError`. */
  override def schedule[T](delay: FiniteDuration)(action: => T): Cancellable = ???

  /** Not implemented; throws `NotImplementedError`. */
  override def schedule[T](delay: FiniteDuration, period: FiniteDuration)(action: => T): Cancellable = ???

  /** Not implemented; throws `NotImplementedError`. */
  override def config: SchedulerConfig = ???
}
- object SingleThreadedScheduler {
/** Endpoint shared by the demo `server` and `client` entry points. */
val serverAddress: InetSocketAddress = new InetSocketAddress("localhost", 9532)

/** The single scheduler instance that every actor of this demo runs on. */
val scheduler: SingleThreadedScheduler = new SingleThreadedScheduler()

/** Default actor runtime at debug log level, always handing out [[scheduler]]. */
given runtime: ActorRuntime =
  ActorRuntime.default(LogLevel.Debug, makeScheduler = (_, _) => scheduler)
/** Entry point of the echo server.
  *
  * Opens a listening channel bound to `serverAddress`, spawns a [[Server]]
  * actor for it and then drives the scheduler loop on the calling thread
  * until that loop returns.
  */
@main def server(): Unit = {
  val listener = ServerSocketChannel.open().bind(serverAddress)
  runtime.spawn(Server(listener))
  scheduler.run()
}
/** Base trait for actors that react to readiness events of one NIO channel.
  *
  * On start the channel is registered through `scheduler.listenChannel`; every
  * [[SelectionKey]] handed to the registration callback is forwarded to this
  * actor with `self.tell`, so subclasses only describe how to react to keys
  * in [[handle]].
  *
  * @param channel the channel whose readiness events drive this actor
  */
trait NetworkHandler(channel: SelectableChannel) extends Actor[SelectionKey, ExecutionModel.Async]:

  /** Registration handle returned by `listenChannel`; empty until [[act]] has run. */
  private var token: Option[Cancellable] = None

  /** Registers the channel for events and then switches to [[handle]]. */
  override def act(): Behavior[SelectionKey] =
    val registration = scheduler.listenChannel(channel) { key =>
      self.tell(key)
      // NOTE(review): the meaning of the callback's result is defined by
      // listenChannel, which is not visible here; `null` is kept as-is.
      null
    }
    token = Option(registration)
    handle
  end act

  /** Behavior that processes the selection keys delivered for `channel`. */
  protected def handle: Behavior[SelectionKey]

  /** Cancels the event registration (if any), closes the channel and stops the actor. */
  protected def shutdown: Behavior[SelectionKey] = {
    token.foreach(_.cancel())
    token = None
    channel.close()
    stop
  }

  /** Logs the remaining content of `buffer` (position to limit) as UTF-8 text.
    *
    * The bytes are read through a duplicate, so the caller's position and limit
    * are left untouched whether or not `logLevel` is enabled. Nothing is decoded
    * when the level is disabled.
    *
    * @param buffer   buffer whose remaining bytes are rendered
    * @param logLevel level to log at, `Debug` by default
    */
  inline protected def logBuffer(buffer: ByteBuffer, logLevel: LogLevel = LogLevel.Debug)(using Enclosing, Line): Unit =
    if (logger.isEnabled(logLevel)) {
      // Decode instead of widening signed bytes with `.toChar`, which mangles
      // every byte >= 0x80.
      val message = java.nio.charset.StandardCharsets.UTF_8.decode(buffer.duplicate()).toString
      logger.log(logLevel, message, null)
    }
  end logBuffer
end NetworkHandler
/** Accepting side of the echo demo.
  *
  * Spawns one [[ClientHandler]] child per accepted connection and shuts itself
  * down (closing the listening channel) as soon as any child reports that it
  * has stopped.
  *
  * @param channel listening channel to accept connections from
  */
class Server(channel: ServerSocketChannel)(using ActorEnv) extends NetworkHandler(channel) {
  override def handle: Behavior[SelectionKey] =
    receive { key =>
      if (key.isAcceptable) {
        // In non-blocking mode accept() returns null when no connection is
        // pending, so only spawn a handler for a real channel.
        Option(channel.accept()).foreach { clientChannel =>
          spawnRef(new ClientHandler(clientChannel))
        }
      }
      same
    } and receiveSystem {
      case ChildStopped(_, _) => shutdown
    }
}
/** Server-side handler for one connection: echoes back whatever it receives.
  *
  * Alternates between two behaviors that share one 1024-byte direct buffer:
  * [[read]] fills the buffer from the socket, [[write]] sends it back.
  *
  * @param channel the accepted client connection
  */
class ClientHandler(channel: SocketChannel)(using ActorEnv) extends NetworkHandler(channel) {
  private val buffer = ByteBuffer.allocateDirect(1024)

  override def handle: Behavior[SelectionKey] = read

  /** Waits for readable events and pulls data into the buffer.
    *
    * Moves on to [[write]] once at least one byte has arrived, keeps waiting
    * when the read produced nothing, and shuts down on end-of-stream.
    */
  def read: Behavior[SelectionKey] =
    receive { key =>
      if (key.isReadable) {
        val bytesRead = channel.read(buffer)
        if (bytesRead > 0) {
          buffer.flip()
          write
        }
        // 0 bytes only means "nothing available right now"; end-of-stream is
        // reported as a negative count.
        else if (bytesRead == 0) same
        else shutdown
      } else same
    }.ignoreSystem

  /** Waits for writable events and sends the buffered bytes back.
    *
    * Stays in this behavior until the buffer has been written out completely,
    * then clears it and returns to [[read]].
    */
  def write: Behavior[SelectionKey] =
    receive { key =>
      if (key.isWritable) {
        channel.write(buffer)
        // A write may accept only part of the buffer; keep the remainder for
        // the next writable event instead of discarding it.
        if (buffer.hasRemaining) same
        else {
          buffer.clear()
          read
        }
      } else same
    }.ignoreSystem
}
/** Entry point of the demo client.
  *
  * Connects to `serverAddress`, spawns a [[Client]] actor on the connection
  * and drives the scheduler loop on the calling thread until it returns.
  */
@main def client(): Unit = {
  // open(remote) opens the channel and connects it in one step.
  val connection = SocketChannel.open(serverAddress)
  runtime.spawn(new Client(connection))
  scheduler.run()
}
/** Client side of the echo demo: sends one greeting, logs the reply, stops.
  *
  * @param channel connection to the echo server
  */
class Client(channel: SocketChannel)(using ActorEnv) extends NetworkHandler(channel) {
  private val buffer = ByteBuffer.allocateDirect(1024)

  override def handle: Behavior[SelectionKey] = write

  /** On the first writable event, stages the greeting and starts sending it. */
  def write: Behavior[SelectionKey] =
    receive { key =>
      if (key.isWritable) {
        buffer.put("Hello World".getBytes)
        buffer.flip()
        flush
      } else same
    }.ignoreSystem

  /** Writes the staged bytes to the channel.
    *
    * Moves on to [[read]] once everything was written; otherwise waits for the
    * next writable event and continues with the unsent remainder.
    */
  private def flush: Behavior[SelectionKey] = {
    channel.write(buffer)
    if (buffer.hasRemaining)
      receive { key =>
        if (key.isWritable) flush else same
      }.ignoreSystem
    else {
      buffer.clear()
      read
    }
  }

  /** Waits for the reply, logs it and shuts the actor down.
    *
    * A read that yields no bytes keeps waiting; end-of-stream shuts down
    * without logging anything.
    */
  def read: Behavior[SelectionKey] =
    receive { key =>
      if (key.isReadable) {
        val bytesRead = channel.read(buffer)
        if (bytesRead == 0) same
        else {
          if (bytesRead > 0) {
            buffer.flip()
            logBuffer(buffer)
          }
          shutdown
        }
      } else same
    }.ignoreSystem
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement