Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package flames.concurrent.execution.events
- import scala.concurrent.ExecutionContext
- import flames.concurrent.execution.{Cancellable, Shutdown}
- import java.net.{InetSocketAddress, SocketAddress}
- import java.nio.*
- import java.nio.channels.*
- import scala.collection.mutable
- import scala.concurrent.duration.*
- class SingleThreadedScheduler(
- nioLoop: NIOEventLoop = NIOEventLoop(),
- yieldCount: Int = 8,
- yieldTime: Duration = 1 minute span,
- ) extends ExecutionContext with Runnable with Shutdown {
- export nioLoop.listenChannel
- private val tasks = mutable.Queue.empty[Runnable]
- override def execute(runnable: Runnable): Unit =
- tasks.enqueue(runnable)
- override def reportFailure(cause: Throwable): Unit =
- cause.printStackTrace()
- private var stop: Boolean = false
- override def shutdown(): Unit = {
- nioLoop.shutdown()
- stop = true
- }
- override def isShutdown: Boolean = stop
- override def run(): Unit =
- while (!stop) {
- nioLoop.poll().drain { x =>
- tasks.enqueue(x)
- }
- var count = 0
- val deadline = System.nanoTime() + yieldTime.toNanos
- while {
- count < yieldCount && deadline > System.nanoTime() && tasks.nonEmpty
- } do {
- tasks.dequeue().run()
- count += 1
- }
- if (!nioLoop.haveWork && tasks.isEmpty) stop = true
- }
- end run
- }
- object SingleThreadedScheduler {
- val serverAddress = InetSocketAddress(
- "localhost",
- 9532,
- )
- val scheduler = SingleThreadedScheduler()
- given ExecutionContext = scheduler
- @main def server(): Unit =
- val serverChannel = ServerSocketChannel.open()
- serverChannel.bind(serverAddress)
- scheduler.listenChannel(serverChannel) { key => () =>
- if(key.isAcceptable) {
- val client = serverChannel.accept()
- val buffer = ByteBuffer.allocateDirect(1024)
- var write = false
- scheduler.listenChannel(client) { key => () =>
- if(client.isConnected) {
- if (key.isReadable) {
- println("Read")
- val bytes = client.read(buffer)
- if(bytes > 0) {
- write = true
- buffer.flip()
- } else scheduler.shutdown()
- }
- if (write && key.isWritable) {
- println("Echoing")
- client.write(buffer)
- write = false
- buffer.clear()
- }
- } else scheduler.shutdown()
- }
- }
- }
- scheduler.run()
- end server
- @main def client(): Unit =
- val channel = SocketChannel.open()
- channel.connect(serverAddress)
- val buffer = ByteBuffer.allocateDirect(1024)
- var write = true
- scheduler.listenChannel(channel) { key => () =>
- if(key.isWritable && write) {
- println("Write")
- buffer.put("Hello world".getBytes())
- buffer.flip()
- channel.write(buffer)
- buffer.clear()
- write = false
- }
- if (key.isReadable) {
- println("Read")
- channel.read(buffer)
- buffer.flip()
- printBuffer(buffer)
- buffer.rewind()
- scheduler.shutdown()
- }
- }
- scheduler.run()
- end client
- def printBuffer(buffer: ByteBuffer): Unit =
- while (buffer.hasRemaining) {
- print(buffer.get().toChar)
- }
- println
- end printBuffer
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement