w3resource

Java Program: Implementing Thread-Safe Queue with ConcurrentLinkedQueue

Java Multithreading: Exercise-9 with Solution

ConcurrentLinkedQueue:

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
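The ordering and null-handling rules described above can be checked with a short single-threaded sketch. The class name FifoOrderDemo and its helper methods are illustrative names chosen for this example; only offer(), peek() and poll() come from ConcurrentLinkedQueue itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the exercise solution.
class FifoOrderDemo {
  // Removes elements from the head until the queue is empty and
  // returns them in the order they were retrieved.
  static List<String> drain(ConcurrentLinkedQueue<String> queue) {
    List<String> out = new ArrayList<>();
    String item;
    while ((item = queue.poll()) != null) {
      out.add(item);
    }
    return out;
  }

  // Returns true if offering null is rejected with NullPointerException.
  static boolean rejectsNull(ConcurrentLinkedQueue<String> queue) {
    try {
      queue.offer(null);
      return false;
    } catch (NullPointerException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    queue.offer("first");   // inserted at the tail
    queue.offer("second");
    queue.offer("third");

    System.out.println("Head: " + queue.peek());            // Head: first
    System.out.println("Drained: " + drain(queue));         // Drained: [first, second, third]
    System.out.println("Rejects null: " + rejectsNull(queue)); // Rejects null: true
  }
}
```

Because poll() uses null to signal an empty queue, allowing null elements would make that return value ambiguous, which is one practical reason the class rejects them.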

This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
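To give a feel for what "non-blocking" means here, the following is a simplified textbook-style sketch of a Michael and Scott style queue built on compare-and-set. It is not the JDK's actual implementation, which contains further optimizations; the class name MichaelScottQueueSketch and its enqueue()/dequeue() methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified illustration only; NOT the source of ConcurrentLinkedQueue.
class MichaelScottQueueSketch<E> {
  private static final class Node<E> {
    final E item;
    final AtomicReference<Node<E>> next = new AtomicReference<>(null);
    Node(E item) { this.item = item; }
  }

  // head always points at a dummy node; the first real element is head.next.
  private final AtomicReference<Node<E>> head;
  private final AtomicReference<Node<E>> tail;

  MichaelScottQueueSketch() {
    Node<E> dummy = new Node<>(null);
    head = new AtomicReference<>(dummy);
    tail = new AtomicReference<>(dummy);
  }

  void enqueue(E item) {
    if (item == null) throw new NullPointerException();
    Node<E> node = new Node<>(item);
    while (true) {
      Node<E> last = tail.get();
      Node<E> next = last.next.get();
      if (last != tail.get()) continue;       // tail moved, retry
      if (next == null) {
        // Try to link the new node after the current last node.
        if (last.next.compareAndSet(null, node)) {
          tail.compareAndSet(last, node);     // swing tail; failure is harmless
          return;
        }
      } else {
        tail.compareAndSet(last, next);       // help a lagging tail forward
      }
    }
  }

  // Returns the oldest element, or null if the queue is empty.
  E dequeue() {
    while (true) {
      Node<E> first = head.get();
      Node<E> last = tail.get();
      Node<E> next = first.next.get();
      if (first != head.get()) continue;      // head moved, retry
      if (first == last) {
        if (next == null) return null;        // queue is empty
        tail.compareAndSet(last, next);       // tail is lagging, help it
      } else {
        E item = next.item;
        if (head.compareAndSet(first, next)) return item;
      }
    }
  }

  public static void main(String[] args) {
    MichaelScottQueueSketch<String> queue = new MichaelScottQueueSketch<>();
    queue.enqueue("a");
    queue.enqueue("b");
    System.out.println(queue.dequeue()); // a
    System.out.println(queue.dequeue()); // b
    System.out.println(queue.dequeue()); // null
  }
}
```

No thread ever holds a lock: a thread that loses a compare-and-set race simply retries, and a thread that finds the tail pointer lagging moves it forward on behalf of the other thread.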

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
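The difference between a weakly consistent iterator and a fail-fast one can be seen by modifying a collection in the middle of an iteration. The sketch below is a single-threaded illustration; WeakIteratorDemo and its helper methods are hypothetical names, and ArrayList is used only as a fail-fast contrast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the exercise solution.
class WeakIteratorDemo {
  // Iterates over the collection and adds one extra element after the
  // first step. Returns how many elements the iterator visited.
  static int visitWhileAdding(Collection<Integer> collection, int extra) {
    int visited = 0;
    boolean added = false;
    Iterator<Integer> it = collection.iterator();
    while (it.hasNext()) {
      it.next();
      visited++;
      if (!added) {
        collection.add(extra);
        added = true;
      }
    }
    return visited;
  }

  // Returns true if the same access pattern fails fast on this collection.
  static boolean throwsCme(Collection<Integer> collection) {
    try {
      visitWhileAdding(collection, 99);
      return false;
    } catch (ConcurrentModificationException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    ConcurrentLinkedQueue<Integer> queue =
        new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));
    // The three original elements are each visited once; the element added
    // during iteration may or may not be visited as well.
    System.out.println("Visited: " + visitWhileAdding(queue, 99));
    System.out.println("Queue size afterwards: " + queue.size());
    System.out.println("ArrayList fails fast: "
        + throwsCme(new ArrayList<>(Arrays.asList(1, 2, 3))));
  }
}
```

The visited count is deliberately left without an expected value: the documentation only promises that elements present since the iterator was created are returned once, not whether later additions are seen.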

Write a Java program that utilizes the ConcurrentLinkedQueue class to implement a thread-safe queue.

Sample Solution:

Java Code:

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentQueueExercise {
  public static void main(String[] args) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    // Create and start the producer threads
    Thread producerThread1 = new Thread(new Producer(queue, "Producer-1"));
    Thread producerThread2 = new Thread(new Producer(queue, "Producer-2"));
    producerThread1.start();
    producerThread2.start();

    // Create and start the consumer threads
    Thread consumerThread1 = new Thread(new Consumer(queue, "Consumer-1"));
    Thread consumerThread2 = new Thread(new Consumer(queue, "Consumer-2"));
    consumerThread1.start();
    consumerThread2.start();
  }

  static class Producer implements Runnable {
    private final ConcurrentLinkedQueue<String> queue;
    private final String producerName;
    private int counter;

    public Producer(ConcurrentLinkedQueue<String> queue, String producerName) {
      this.queue = queue;
      this.producerName = producerName;
      this.counter = 0;
    }

    @Override
    public void run() {
      while (true) {
        String item = producerName + "-Item-" + counter++;
        queue.offer(item);
        System.out.println("Produced: " + item);

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop instead of looping forever
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  static class Consumer implements Runnable {
    private final ConcurrentLinkedQueue<String> queue;
    private final String consumerName;

    public Consumer(ConcurrentLinkedQueue<String> queue, String consumerName) {
      this.queue = queue;
      this.consumerName = consumerName;
    }

    @Override
    public void run() {
      while (true) {
        String item = queue.poll();
        if (item != null) {
          System.out.println(consumerName + " consumed: " + item);
        }

        try {
          Thread.sleep(1500);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop instead of looping forever
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }
}

Sample Output:

Produced: Producer-2-Item-0
Produced: Producer-1-Item-0
Produced: Producer-2-Item-1
Produced: Producer-1-Item-1
Consumer-2 consumed: Producer-1-Item-0
Consumer-1 consumed: Producer-2-Item-0
Produced: Producer-2-Item-2
Produced: Producer-1-Item-2
Produced: Producer-1-Item-3
Produced: Producer-2-Item-3
...

Explanation:

In the above exercise:

  • The "ConcurrentQueueExercise" class represents the main program. It creates an instance of ConcurrentLinkedQueue named queue.
  • The "Producer" class implements the Runnable interface and represents a producer thread. Each producer thread builds a unique item from the producer name and a counter, adds it to the queue with the offer() method, and then sleeps for one second.
  • The "Consumer" class implements the Runnable interface and represents a consumer thread. Each consumer thread retrieves an item from the queue with the poll() method, which returns null instead of blocking when the queue is empty, prints the item if there is one, and then sleeps for 1.5 seconds.
  • In the main() method, we create and start two producer threads and two consumer threads. Producers add items to the queue, and consumers retrieve and print them. Since ConcurrentLinkedQueue is thread-safe, multiple producers and consumers can access the queue concurrently without explicit synchronization. Because every thread loops indefinitely, the program runs until it is stopped externally.
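The sample solution runs until it is stopped, which makes its thread-safety hard to verify. As a complement, here is a minimal sketch of a bounded variant that terminates and checks that every produced item was consumed exactly once. The class name BoundedRunDemo, the run() helper and its parameters are assumptions made for this illustration and are not part of the original solution.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bounded variant of the exercise, for verification only.
class BoundedRunDemo {
  // Starts the given number of producer and consumer threads, waits for
  // them to finish, and returns the set of distinct items consumed.
  static Set<String> run(int producers, int consumers, int itemsPerProducer)
      throws InterruptedException {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    Set<String> consumed = ConcurrentHashMap.newKeySet();
    // Number of items that still have to be taken from the queue.
    AtomicInteger remaining = new AtomicInteger(producers * itemsPerProducer);

    Thread[] threads = new Thread[producers + consumers];
    for (int p = 0; p < producers; p++) {
      final String name = "Producer-" + (p + 1);
      threads[p] = new Thread(() -> {
        for (int i = 0; i < itemsPerProducer; i++) {
          queue.offer(name + "-Item-" + i);
        }
      });
    }
    for (int c = 0; c < consumers; c++) {
      threads[producers + c] = new Thread(() -> {
        while (remaining.get() > 0) {
          String item = queue.poll();
          if (item == null) {
            Thread.yield();          // queue is momentarily empty, try again
            continue;
          }
          consumed.add(item);
          remaining.decrementAndGet();
        }
      });
    }

    for (Thread t : threads) t.start();
    for (Thread t : threads) t.join();
    return consumed;
  }

  public static void main(String[] args) throws InterruptedException {
    Set<String> consumed = run(2, 2, 1000);
    // 2 producers x 1000 items: all 2000 item names are distinct, so a set
    // of size 2000 means no item was lost and none was delivered twice.
    System.out.println("Distinct items consumed: " + consumed.size());
  }
}
```

The consumers spin with Thread.yield() when the queue is empty because ConcurrentLinkedQueue has no blocking take operation; where waiting for elements is the norm, a BlockingQueue implementation such as LinkedBlockingQueue is usually the more natural fit.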

Flowchart:

Flowchart: Java Program: Implementing Thread-Safe Queue with ConcurrentLinkedQueue.
