One of the most popular kinds of concurrent collections is a concurrent queue. It is often used to organize communication and data exchange (messages, tasks, units of work, or something else) between multiple threads within an application. To achieve that, several threads should have a reference to a common queue and invoke its methods.

You already know that a queue is a collection that works according to the first-in-first-out principle (FIFO): the first element added to the queue will be the first one to be removed.

Thread safety of ConcurrentLinkedQueue

The simplest type of concurrent queue is ConcurrentLinkedQueue, which behaves like a standard queue but is thread-safe. It has two methods, add and offer, that insert an element at the tail of the queue. Because the queue is unbounded, both of them always succeed.
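To make the FIFO behavior concrete, here is a minimal single-threaded sketch. It assumes the standard Queue methods peek (look at the head without removing it) and poll (remove the head, or return null if the queue is empty); drain is a hypothetical helper written only for this illustration.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper: removes elements from the head until the queue is empty
// and returns them in removal order.
fun <T : Any> drain(queue: ConcurrentLinkedQueue<T>): List<T> {
    val result = mutableListOf<T>()
    while (true) {
        val next = queue.poll() ?: break // poll returns null when the queue is empty
        result.add(next)
    }
    return result
}

fun main() {
    val queue = ConcurrentLinkedQueue<String>()
    queue.add("a")   // inserts at the tail
    queue.offer("b") // also inserts at the tail

    println(queue.peek()) // a (the head stays in the queue)
    println(drain(queue)) // [a, b]
    println(queue.poll()) // null
}
```

The elements come out in the same order in which they were inserted, regardless of whether add or offer was used.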

The following example demonstrates the thread safety of this concurrent queue. The program adds new elements using two threads and then prints the total number of elements in this queue:

import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

fun addNumbers(target: ConcurrentLinkedQueue<Int>) {
    for (i in 0..99_999) target.add(i)
}

fun main() {
    // assigning thread-safe implementation
    val numbers = ConcurrentLinkedQueue<Int>()

    val writer = thread(start = false, name = "Thread 1", block = {
        addNumbers(numbers)
    })
    writer.start()

    addNumbers(numbers) // add numbers from the main thread

    writer.join() // wait for writer thread

    println(numbers.size) // it prints 200000
}

As expected, this program always prints 200000: no element gets lost, no matter how many times you run it. So, ConcurrentLinkedQueue is indeed thread-safe. In addition, iterating over this queue never throws ConcurrentModificationException, even if the queue is modified during the iteration.
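The difference in iteration behavior can be seen even in a single thread. The sketch below modifies a collection in the middle of a for loop; java.util.LinkedList is used here only as an example of a non-concurrent queue, and appendWhileIterating is a hypothetical helper name.

```kotlin
import java.util.ConcurrentModificationException
import java.util.LinkedList
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue

// Adds one extra element while iterating.
// Returns true if the loop finished without ConcurrentModificationException.
fun appendWhileIterating(queue: Queue<Int>): Boolean =
    try {
        for (x in queue) {
            if (x == 2) queue.add(99) // modify the queue in the middle of the iteration
        }
        true
    } catch (e: ConcurrentModificationException) {
        false
    }

fun main() {
    println(appendWhileIterating(LinkedList(listOf(1, 2, 3))))            // false
    println(appendWhileIterating(ConcurrentLinkedQueue(listOf(1, 2, 3)))) // true
}
```

The iterator of ConcurrentLinkedQueue is weakly consistent: it does not fail, but it may or may not see elements added after the iteration started, so the loop should not rely on either outcome.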

Note that any single operation provided by this queue is thread-safe. However, if we group such operations together in a single method or a sequence of statements, the whole group of operations will not be thread-safe.

Moreover, bulk operations of ConcurrentLinkedQueue that add, remove, or examine multiple elements, such as the addAll, removeIf, and forEach methods, are not guaranteed to be performed atomically.

Communication between threads

The following picture demonstrates how to organize communication between threads using a queue. One thread puts elements at the tail of a queue, while the other thread takes elements from the head of the same queue.

[Figure: organizing communication between threads using a queue]

We assume that the queue is thread-safe; otherwise, it will not work correctly.

It is also possible to have more than two threads interacting through a queue.

[Figure: organization of communication between more than two threads]

The number of threads can vary.

Let's consider the interaction of threads with a simple example. Suppose we want to exchange data between two threads using a concurrent queue. One thread will generate three numbers, while the other thread will accept those numbers and print them. To take the numbers, the second thread uses the poll method: it removes and returns the current first element of a concurrent queue, or returns null if the queue is empty.

Here is a snippet of code with additional sleep invocations to make the output more predictable. The generator and poller threads interact using a concurrent queue, and no data is lost because every single operation of the queue is thread-safe.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

fun main() {
    val queue = ConcurrentLinkedQueue<Int>()

    val generator = thread(start = false, name = "Thread 1", block = {
        try {
            queue.add(10)
            TimeUnit.MILLISECONDS.sleep(10)
            queue.add(20)
            TimeUnit.MILLISECONDS.sleep(10)
            queue.add(30)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    })

    val poller = thread(start = false, name = "Thread 2", block = {
        var countRead = 0
        while (countRead != 3) {
            val next: Int? = queue.poll()
            if (next != null) countRead++
            println(next)
            try {
                TimeUnit.MILLISECONDS.sleep(10)
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    })

    generator.start()
    poller.start()
}

Here is an example of the output:

null
10
20
30

It may be slightly different, but all numbers should be printed.
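The same pattern extends to more than two threads. Below is a minimal sketch, assuming several producer and several consumer threads that share one queue; exchange and its parameters are hypothetical names, and the consumers busy-wait on poll only to keep the example short.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Runs several writer and reader threads over one shared queue
// and returns how many elements the readers received in total.
fun exchange(producers: Int, consumers: Int, perProducer: Int): Int {
    val queue = ConcurrentLinkedQueue<Int>()
    val total = producers * perProducer
    val received = AtomicInteger(0) // shared counter of successfully polled elements

    val writers = List(producers) { id ->
        thread(name = "Producer $id") {
            for (i in 0 until perProducer) queue.add(id * perProducer + i)
        }
    }
    val readers = List(consumers) { id ->
        thread(name = "Consumer $id") {
            // Keep polling until all expected elements have been taken by some reader.
            while (received.get() < total) {
                if (queue.poll() != null) received.incrementAndGet() else Thread.yield()
            }
        }
    }

    (writers + readers).forEach { it.join() }
    return received.get()
}

fun main() {
    println(exchange(producers = 2, consumers = 3, perProducer = 1_000)) // 2000
}
```

Each element is returned by poll to exactly one consumer, so the counter reaches the total number of added elements and every thread terminates.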

Composite operations

Every standard method of a concurrent queue provides thread safety. However, if you want to put several methods together, there are no such guarantees.

Suppose you want to add two elements in a concurrent queue so that they follow each other in this queue. Here is a method:

fun addTwoElements(queue: ConcurrentLinkedQueue<Int>, e1: Int, e2: Int) {
    queue.add(e1) // (1)
    queue.add(e2) // (2)
}

The method adds the two elements one right after the other only if there is a single writing thread. If there are several writing threads, one thread may perform (1), and then another thread may intervene and add its own element. Only after that does the first thread perform (2), so a foreign element ends up between e1 and e2. This problem appears because the method as a whole is not atomic.

As mentioned above, bulk methods, such as addAll, are also not atomic and don't help to avoid this problem:

queue.addAll(listOf(e1, e2))

One way to solve the problem is external synchronization, for example, with the @Synchronized annotation:

@Synchronized fun addTwoElements(queue: ConcurrentLinkedQueue<Int>, e1: Int, e2: Int) {
    queue.add(e1) // (1)
    queue.add(e2) // (2)
}

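In such a case, you need to make sure that all the operations that update the queue are synchronized on the same lock, not only the method addTwoElements. Otherwise, an unsynchronized add from another thread can still get between (1) and (2).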
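One way to enforce this rule is to hide the queue inside a class, so that every access goes through the same lock. The following is only a sketch under that assumption: PairQueue and pairsStayAdjacent are hypothetical names, and the synchronized function from the Kotlin standard library is used instead of the annotation.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Hypothetical wrapper: every operation that touches the queue takes the same lock.
class PairQueue {
    private val lock = Any()
    private val queue = ConcurrentLinkedQueue<Int>()

    fun addTwoElements(e1: Int, e2: Int) {
        synchronized(lock) {
            queue.add(e1) // (1)
            queue.add(e2) // (2)
        }
    }

    fun add(e: Int) {
        synchronized(lock) { queue.add(e) }
    }

    fun poll(): Int? = synchronized(lock) { queue.poll() }
}

// Returns true if every first element of a pair is directly followed by its partner.
fun pairsStayAdjacent(writers: Int, pairsPerWriter: Int): Boolean {
    val queue = PairQueue()
    val threads = List(writers) { id ->
        thread {
            for (i in 0 until pairsPerWriter) {
                val first = 2 * (id * pairsPerWriter + i) // unique even number
                queue.addTwoElements(first, first + 1)
            }
        }
    }
    threads.forEach { it.join() }

    while (true) {
        val first = queue.poll() ?: return true // the queue is empty: all pairs were intact
        val second = queue.poll()
        if (second != first + 1) return false
    }
}

fun main() {
    println(pairsStayAdjacent(writers = 4, pairsPerWriter = 10_000)) // true
}
```

Since the queue is private, no caller can bypass the lock, which is harder to guarantee when the queue is passed around as a parameter.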

Conclusion

We've learned about concurrent queues and a thread-safe queue implementation available in Kotlin: ConcurrentLinkedQueue. Now you know how such queues work and how to make a group of operations atomic with the @Synchronized annotation. You've also learned such methods of concurrent queues as poll and add. Time for some practice!
