Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class Demonstration {
- public static void main(String[] args) throws InterruptedException {
- BlockingQueueWithSemaphore<Integer> queue = new BlockingQueueWithSemaphore<Integer>(5);
- Thread t1 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < 10; i++) {
- queue.enqueue(i);
- safePrint("Enqueued: " + i);
- }
- } catch (InterruptedException e) {
- }
- }
- });
- Thread t2 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < 5; i++) {
- Integer item = queue.dequeue();
- safePrint("Dequeued T2: " + item);
- }
- } catch (InterruptedException e) {
- }
- }
- });
- Thread t3 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < 5; i++) {
- Integer item = queue.dequeue();
- safePrint("Dequeued T3: " + item);
- }
- } catch (InterruptedException e) {
- }
- }
- });
- t1.start();
- t2.start();
- t3.start();
- t1.join();
- t2.join();
- t3.join();
- }
- private static synchronized void safePrint(String message) {
- System.out.println(message);
- }
- }
- class BlockingQueueWithSemaphore<T> {
- private int head = 0;
- private int tail = 0;
- private int capacity;
- T[] items;
- private final CountingSemaphore mutex = new CountingSemaphore(0, 1);
- private CountingSemaphore producerSemaphore;
- private CountingSemaphore consumerSemaphore;
- @SuppressWarnings("unchecked")
- public BlockingQueueWithSemaphore(int capacity) {
- this.capacity = capacity;
- this.items = (T[]) new Object[capacity];
- this.producerSemaphore = new CountingSemaphore(0, capacity);
- this.consumerSemaphore = new CountingSemaphore(capacity, capacity);
- }
- public void enqueue(T item) throws InterruptedException {
- producerSemaphore.acquire();
- mutex.acquire();
- if (tail == capacity) {
- tail = 0;
- }
- items[tail] = item;
- tail++;
- mutex.release();
- consumerSemaphore.release();
- }
- public T dequeue() throws InterruptedException {
- T item = null;
- consumerSemaphore.acquire();
- mutex.acquire();
- if (head == capacity) {
- head = 0;
- }
- item = items[head];
- head++;
- mutex.release();
- producerSemaphore.release();
- return item;
- }
- }
- class CountingSemaphore {
- private int usedPermits;
- private final int maxPermits;
- public CountingSemaphore(int usedPermits, int maxPermits) {
- this.usedPermits = usedPermits;
- this.maxPermits = maxPermits;
- }
- public synchronized void acquire() throws InterruptedException {
- while (usedPermits >= maxPermits) {
- wait();
- }
- usedPermits++;
- notify();
- }
- public synchronized void release() throws InterruptedException {
- while (usedPermits <= 0) {
- wait();
- }
- usedPermits--;
- notify();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement