Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.locks.ReentrantLock;
- class Demonstration {
- public static void main(String args[]) throws Exception {
- test();
- }
- private static synchronized void safePrint(String message) {
- System.out.println(message);
- }
- private static void test() throws Exception {
- final BlockingQueue<Integer> queue = new BlockingQueue<Integer>(5);
- Thread producer1 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- int i = 0;
- while (true) {
- queue.enqueue(new Integer(i));
- safePrint("producer1 Enqueued: " + i);
- i++;
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- Thread producer2 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- int i = 5001;
- while (true) {
- queue.enqueue(new Integer(i));
- safePrint("producer2 Enqueued: " + i);
- i++;
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- Thread producer3 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- int i = 100001;
- while (true) {
- queue.enqueue(new Integer(i));
- safePrint("producer3 Enqueued: " + i);
- i++;
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- Thread consumer1 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- safePrint("Dequeued by Consumer 1: " + queue.deque());
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- Thread consumer2 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- safePrint("Dequeued by Consumer 1: " + queue.deque());
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- Thread consumer3 =
- new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- safePrint("Dequeued by Consumer 1: " + queue.deque());
- }
- } catch (InterruptedException ie) {
- }
- }
- });
- producer1.setDaemon(true);
- producer2.setDaemon(true);
- producer3.setDaemon(true);
- consumer1.setDaemon(true);
- consumer2.setDaemon(true);
- consumer3.setDaemon(true);
- producer1.start();
- producer2.start();
- producer3.start();
- consumer1.start();
- consumer2.start();
- consumer3.start();
- Thread.sleep(1000);
- }
- }
- // Blocking queue class
- class BlockingQueue<T> {
- private int currentSize = 0;
- private int head = 0, tail = 0;
- private int maxCapacity;
- T[] queue;
- ReentrantLock lock = new ReentrantLock();
- @SuppressWarnings("unchecked")
- public BlockingQueue(int maxCapacity) {
- this.maxCapacity = maxCapacity;
- this.queue = (T[]) new Object[maxCapacity];
- }
- public void enqueue(T value) throws InterruptedException {
- lock.lock();
- try {
- while (currentSize == maxCapacity) {
- lock.unlock();
- lock.lock();
- }
- if (tail == maxCapacity) {
- tail = 0;
- }
- queue[tail] = value;
- tail++;
- currentSize++;
- } finally {
- lock.unlock();
- }
- }
- public T deque() throws InterruptedException {
- T value = null;
- lock.lock();
- try {
- while (currentSize == 0) {
- lock.unlock();
- lock.lock();
- }
- if (head == maxCapacity) {
- head = 0;
- }
- value = queue[head];
- queue[head] = null;
- head++;
- currentSize--;
- } finally {
- lock.unlock();
- }
- return value;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement