Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Client:
- package multithreading.blocking_queue;
- import java.util.ArrayList;
- import java.util.List;
- public class Client {
- private static final List<Thread> producers = new ArrayList<>();
- private static final List<Thread> consumers = new ArrayList<>();
- private static MyBlockingQueue<Long> queue;
- public static void main(String[] args) {
- final int capacity = 30;
- queue = new MyBlockingQueue<>(capacity);
- initProducers(4);
- initConsumers(3);
- startProducers();
- startConsumers();
- waitProducers();
- waitConsumers();
- }
- private static void initProducers(int count) {
- for (int i = 0; i < count; i++) {
- Thread producer = new Thread(() -> {
- while(true) {
- try {
- queue.produce(System.currentTimeMillis());
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- });
- producers.add(producer);
- }
- }
- private static void initConsumers(int count) {
- for (int i = 0; i < count; i++) {
- Thread producer = new Thread(() -> {
- while(true) {
- try {
- Long item = queue.consume();
- System.out.println(item + " has been consumed by " + Thread.currentThread().getName());
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- });
- producers.add(producer);
- }
- }
- private static void startProducers() {
- producers.forEach(Thread::start);
- }
- private static void startConsumers() {
- consumers.forEach(Thread::start);
- }
- private static void waitProducers() {
- for (Thread producer : producers) {
- try {
- producer.join();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- private static void waitConsumers() {
- for (Thread consumer : consumers) {
- try {
- consumer.join();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
- package multithreading.blocking_queue;
- import java.util.ArrayDeque;
- import java.util.Queue;
- public class MyBlockingQueue<T> {
- private final int capacity;
- private final Queue<T> queue;
- public MyBlockingQueue(int capacity) {
- this.capacity = capacity;
- queue = new ArrayDeque<>(capacity);
- }
- public synchronized void produce(T item) throws InterruptedException {
- while(queue.size() >= capacity) wait();
- queue.add(item);
- notifyAll();
- }
- public synchronized T consume() throws InterruptedException {
- while(queue.isEmpty()) wait();
- T item = queue.remove();
- notifyAll();
- return item;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement