Advertisement
Jaydeep999997

Blocking queue - Mutex

Oct 21st, 2024
39
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.54 KB | Source Code | 0 0
  1. import java.util.concurrent.locks.ReentrantLock;
  2.  
  3. class Demonstration {
  4.   public static void main(String args[]) throws Exception {
  5.     test();
  6.   }
  7.  
  8.   private static synchronized void safePrint(String message) {
  9.     System.out.println(message);
  10.   }
  11.  
  12.   private static void test() throws Exception {
  13.     final BlockingQueue<Integer> queue = new BlockingQueue<Integer>(5);
  14.  
  15.     Thread producer1 =
  16.         new Thread(
  17.             new Runnable() {
  18.  
  19.               @Override
  20.               public void run() {
  21.                 try {
  22.                   int i = 0;
  23.                   while (true) {
  24.                     queue.enqueue(new Integer(i));
  25.                     safePrint("producer1 Enqueued: " + i);
  26.                     i++;
  27.                   }
  28.                 } catch (InterruptedException ie) {
  29.  
  30.                 }
  31.               }
  32.             });
  33.  
  34.     Thread producer2 =
  35.         new Thread(
  36.             new Runnable() {
  37.  
  38.               @Override
  39.               public void run() {
  40.                 try {
  41.                   int i = 5001;
  42.                   while (true) {
  43.                     queue.enqueue(new Integer(i));
  44.                     safePrint("producer2 Enqueued: " + i);
  45.                     i++;
  46.                   }
  47.                 } catch (InterruptedException ie) {
  48.  
  49.                 }
  50.               }
  51.             });
  52.  
  53.     Thread producer3 =
  54.         new Thread(
  55.             new Runnable() {
  56.  
  57.               @Override
  58.               public void run() {
  59.                 try {
  60.                   int i = 100001;
  61.                   while (true) {
  62.                     queue.enqueue(new Integer(i));
  63.                     safePrint("producer3 Enqueued: " + i);
  64.                     i++;
  65.                   }
  66.                 } catch (InterruptedException ie) {
  67.  
  68.                 }
  69.               }
  70.             });
  71.  
  72.     Thread consumer1 =
  73.         new Thread(
  74.             new Runnable() {
  75.  
  76.               @Override
  77.               public void run() {
  78.                 try {
  79.                   while (true) {
  80.                     safePrint("Dequeued by Consumer 1: " + queue.deque());
  81.                   }
  82.                 } catch (InterruptedException ie) {
  83.  
  84.                 }
  85.               }
  86.             });
  87.  
  88.     Thread consumer2 =
  89.         new Thread(
  90.             new Runnable() {
  91.  
  92.               @Override
  93.               public void run() {
  94.                 try {
  95.                   while (true) {
  96.                     safePrint("Dequeued by Consumer 1: " + queue.deque());
  97.                   }
  98.                 } catch (InterruptedException ie) {
  99.  
  100.                 }
  101.               }
  102.             });
  103.  
  104.     Thread consumer3 =
  105.         new Thread(
  106.             new Runnable() {
  107.  
  108.               @Override
  109.               public void run() {
  110.                 try {
  111.                   while (true) {
  112.                     safePrint("Dequeued by Consumer 1: " + queue.deque());
  113.                   }
  114.                 } catch (InterruptedException ie) {
  115.  
  116.                 }
  117.               }
  118.             });
  119.  
  120.     producer1.setDaemon(true);
  121.     producer2.setDaemon(true);
  122.     producer3.setDaemon(true);
  123.     consumer1.setDaemon(true);
  124.     consumer2.setDaemon(true);
  125.     consumer3.setDaemon(true);
  126.  
  127.     producer1.start();
  128.     producer2.start();
  129.     producer3.start();
  130.  
  131.     consumer1.start();
  132.     consumer2.start();
  133.     consumer3.start();
  134.    
  135.     Thread.sleep(1000);
  136.   }
  137. }
  138.  
  139. // Blocking queue class
  140. class BlockingQueue<T> {
  141.   private int currentSize = 0;
  142.   private int head = 0, tail = 0;
  143.   private int maxCapacity;
  144.   T[] queue;
  145.   ReentrantLock lock = new ReentrantLock();
  146.  
  147.   @SuppressWarnings("unchecked")
  148.   public BlockingQueue(int maxCapacity) {
  149.     this.maxCapacity = maxCapacity;
  150.     this.queue = (T[]) new Object[maxCapacity];
  151.   }
  152.  
  153.   public void enqueue(T value) throws InterruptedException {
  154.  
  155.     lock.lock();
  156.     try {
  157.       while (currentSize == maxCapacity) {
  158.         lock.unlock();
  159.  
  160.         lock.lock();
  161.       }
  162.  
  163.       if (tail == maxCapacity) {
  164.         tail = 0;
  165.       }
  166.  
  167.       queue[tail] = value;
  168.       tail++;
  169.       currentSize++;
  170.     } finally {
  171.       lock.unlock();
  172.     }
  173.   }
  174.  
  175.   public T deque() throws InterruptedException {
  176.     T value = null;
  177.  
  178.     lock.lock();
  179.  
  180.     try {
  181.       while (currentSize == 0) {
  182.         lock.unlock();
  183.  
  184.         lock.lock();
  185.       }
  186.  
  187.       if (head == maxCapacity) {
  188.         head = 0;
  189.       }
  190.  
  191.       value = queue[head];
  192.       queue[head] = null;
  193.       head++;
  194.       currentSize--;
  195.     } finally {
  196.      
  197.       lock.unlock();
  198.     }
  199.  
  200.     return value;
  201.   }
  202. }
  203.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement