Advertisement
Jaydeep999997

Blocking Queue - Semaphore

Oct 21st, 2024
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.39 KB | Source Code | 0 0
  1. class Demonstration {
  2.   public static void main(String[] args) throws InterruptedException {
  3.     BlockingQueueWithSemaphore<Integer> queue = new BlockingQueueWithSemaphore<Integer>(5);
  4.  
  5.     Thread t1 =
  6.         new Thread(
  7.             new Runnable() {
  8.               @Override
  9.               public void run() {
  10.                 try {
  11.                   for (int i = 0; i < 10; i++) {
  12.                     queue.enqueue(i);
  13.                     safePrint("Enqueued: " + i);
  14.                   }
  15.                 } catch (InterruptedException e) {
  16.  
  17.                 }
  18.               }
  19.             });
  20.  
  21.     Thread t2 =
  22.         new Thread(
  23.             new Runnable() {
  24.               @Override
  25.               public void run() {
  26.                 try {
  27.                   for (int i = 0; i < 5; i++) {
  28.                     Integer item = queue.dequeue();
  29.                     safePrint("Dequeued T2: " + item);
  30.                   }
  31.                 } catch (InterruptedException e) {
  32.  
  33.                 }
  34.               }
  35.             });
  36.  
  37.     Thread t3 =
  38.         new Thread(
  39.             new Runnable() {
  40.               @Override
  41.               public void run() {
  42.                 try {
  43.                   for (int i = 0; i < 5; i++) {
  44.                     Integer item = queue.dequeue();
  45.                     safePrint("Dequeued T3: " + item);
  46.                   }
  47.                 } catch (InterruptedException e) {
  48.  
  49.                 }
  50.               }
  51.             });
  52.  
  53.     t1.start();
  54.     t2.start();
  55.     t3.start();
  56.  
  57.     t1.join();
  58.     t2.join();
  59.     t3.join();
  60.   }
  61.  
  62.   private static synchronized void safePrint(String message) {
  63.     System.out.println(message);
  64.   }
  65. }
  66.  
  67. class BlockingQueueWithSemaphore<T> {
  68.  
  69.   private int head = 0;
  70.   private int tail = 0;
  71.   private int capacity;
  72.   T[] items;
  73.   private final CountingSemaphore mutex = new CountingSemaphore(0, 1);
  74.   private CountingSemaphore producerSemaphore;
  75.   private CountingSemaphore consumerSemaphore;
  76.  
  77.   @SuppressWarnings("unchecked")
  78.   public BlockingQueueWithSemaphore(int capacity) {
  79.     this.capacity = capacity;
  80.     this.items = (T[]) new Object[capacity];
  81.     this.producerSemaphore = new CountingSemaphore(0, capacity);
  82.     this.consumerSemaphore = new CountingSemaphore(capacity, capacity);
  83.   }
  84.  
  85.   public void enqueue(T item) throws InterruptedException {
  86.     producerSemaphore.acquire();
  87.     mutex.acquire();
  88.  
  89.     if (tail == capacity) {
  90.       tail = 0;
  91.     }
  92.     items[tail] = item;
  93.     tail++;
  94.  
  95.     mutex.release();
  96.     consumerSemaphore.release();
  97.   }
  98.  
  99.   public T dequeue() throws InterruptedException {
  100.     T item = null;
  101.  
  102.     consumerSemaphore.acquire();
  103.     mutex.acquire();
  104.  
  105.     if (head == capacity) {
  106.       head = 0;
  107.     }
  108.     item = items[head];
  109.     head++;
  110.  
  111.     mutex.release();
  112.     producerSemaphore.release();
  113.  
  114.     return item;
  115.   }
  116. }
  117.  
  /**
   * Bounded counting semaphore built on the intrinsic monitor of the instance.
   *
   * <p>{@link #acquire()} blocks while all {@code maxPermits} permits are in use, and
   * {@link #release()} blocks while no permit is in use, so the used-permit count always stays
   * within {@code [0, maxPermits]} once it starts there.
   */
  class CountingSemaphore {
    private int usedPermits;
    private final int maxPermits;

    /**
     * @param usedPermits number of permits considered taken initially
     * @param maxPermits upper bound on permits that may be taken at once
     */
    public CountingSemaphore(int usedPermits, int maxPermits) {
      this.usedPermits = usedPermits;
      this.maxPermits = maxPermits;
    }

    /**
     * Takes one permit, waiting until one is available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void acquire() throws InterruptedException {
      while (usedPermits >= maxPermits) {
        wait();
      }

      usedPermits++;
      // Acquirers and releasers wait on the same monitor. notify() could wake a waiter of the
      // wrong kind, which would go straight back to sleep and strand a thread that could run.
      // Wake everyone and let each waiter re-check its own condition in its while loop.
      notifyAll();
    }

    /**
     * Returns one permit, waiting until at least one permit is in use.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void release() throws InterruptedException {
      while (usedPermits <= 0) {
        wait();
      }

      usedPermits--;
      // See acquire(): notifyAll() avoids lost wakeups between the two kinds of waiters.
      notifyAll();
    }
  }
  145.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement