Advertisement
Kostiggig

Producer/Consumer - BlockingQueue

May 4th, 2023 (edited)
771
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.20 KB | None | 0 0
  1. Client:
  2. package multithreading.blocking_queue;
  3.  
  4. import java.util.ArrayList;
  5. import java.util.List;
  6.  
  7. public class Client {
  8.  
  9.     private static final List<Thread> producers = new ArrayList<>();
  10.     private static final List<Thread> consumers = new ArrayList<>();
  11.  
  12.     private static MyBlockingQueue<Long> queue;
  13.  
  14.     public static void main(String[] args) {
  15.         final int capacity = 30;
  16.         queue = new MyBlockingQueue<>(capacity);
  17.         initProducers(4);
  18.         initConsumers(3);
  19.  
  20.         startProducers();
  21.         startConsumers();
  22.  
  23.         waitProducers();
  24.         waitConsumers();
  25.     }
  26.  
  27.     private static void initProducers(int count) {
  28.         for (int i = 0; i < count; i++) {
  29.             Thread producer = new Thread(() -> {
  30.                 while(true) {
  31.                     try {
  32.                         queue.produce(System.currentTimeMillis());
  33.                         Thread.sleep(1000);
  34.                     } catch (InterruptedException e) {
  35.                         throw new RuntimeException(e);
  36.                     }
  37.                 }
  38.             });
  39.  
  40.             producers.add(producer);
  41.         }
  42.     }
  43.  
  44.     private static void initConsumers(int count) {
  45.         for (int i = 0; i < count; i++) {
  46.             Thread producer = new Thread(() -> {
  47.                 while(true) {
  48.                     try {
  49.                         Long item = queue.consume();
  50.                         System.out.println(item + " has been consumed by " + Thread.currentThread().getName());
  51.                         Thread.sleep(1000);
  52.                     } catch (InterruptedException e) {
  53.                         throw new RuntimeException(e);
  54.                     }
  55.                 }
  56.             });
  57.  
  58.             producers.add(producer);
  59.         }
  60.     }
  61.  
  62.     private static void startProducers() {
  63.         producers.forEach(Thread::start);
  64.     }
  65.  
  66.     private static void startConsumers() {
  67.         consumers.forEach(Thread::start);
  68.     }
  69.  
  70.     private static void waitProducers() {
  71.         for (Thread producer : producers) {
  72.             try {
  73.                 producer.join();
  74.             } catch (InterruptedException e) {
  75.                 throw new RuntimeException(e);
  76.             }
  77.         }
  78.     }
  79.  
  80.     private static void waitConsumers() {
  81.         for (Thread consumer : consumers) {
  82.             try {
  83.                 consumer.join();
  84.             } catch (InterruptedException e) {
  85.                 throw new RuntimeException(e);
  86.             }
  87.         }
  88.     }
  89. }
  90.  
  91. package multithreading.blocking_queue;
  92.  
  93. import java.util.ArrayDeque;
  94. import java.util.Queue;
  95.  
  96. public class MyBlockingQueue<T> {
  97.  
  98.     private final int capacity;
  99.     private final Queue<T> queue;
  100.  
  101.     public MyBlockingQueue(int capacity) {
  102.         this.capacity = capacity;
  103.         queue = new ArrayDeque<>(capacity);
  104.     }
  105.  
  106.     public synchronized void produce(T item) throws InterruptedException {
  107.         while(queue.size() >= capacity) wait();
  108.         queue.add(item);
  109.         notifyAll();
  110.     }
  111.  
  112.     public synchronized T consume() throws InterruptedException {
  113.         while(queue.isEmpty()) wait();
  114.         T item = queue.remove();
  115.         notifyAll();
  116.         return item;
  117.     }
  118. }
  119.  
  120.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement