14 minutes read

We've already learned how to create threads by extending the Thread class or implementing the Runnable interface. Both ways allow you to create an object that represents a thread and start it to perform a piece of code in a separate thread. While it is easy to create several threads and start them, it becomes a problem when your application has hundreds or even thousands of threads running concurrently.

In addition, Thread is a low-level class, and mixing it with the high-level code of your application may lead to unreadable code and poor architecture in the future. It may also produce some well-known errors, such as invoking run() instead of start().

Now we will learn how we can work with a pool of threads by creating thread services which can manage multi-threading.

Tasks and executors

To simplify the development of multi-threaded applications, Kotlin provides an interface called ExecutorService (or simply executor). It encapsulates one or more threads into a single pool and puts submitted tasks in an internal queue to execute them in threads.

an interface ExecutorService

This approach clearly isolates tasks from threads and allows you to focus on the tasks. You do not need to worry about creating and managing threads because the executor does it for you.

Creating executors

All types of executors are located in the java.util.concurrent package, which you need to import first. This package also contains a handy utility class Executors for creating different types of ExecutorService's.

First of all, let's create an executor with exactly four threads in the pool:

val executor: ExecutorService = Executors.newFixedThreadPool(4)

It can execute multiple tasks concurrently and speed up your program by performing somewhat parallel computations. If one of the threads dies, the executor creates a new one. We will further consider how to determine the required number of threads.

Submitting tasks

An executor has the submit function, which accepts a Runnable task to be executed. Since Runnable is a functional interface, it is possible to use a lambda expression as a task.

As an example, here we submit a task that prints "Hello!" to the standard output.

executor.submit { println("Hello!") }

Of course, we can declare a class that implements Runnable for our task and then submit an object of this class. However, for short tasks, it is very convenient to use lambda expressions together with executors.

After invoking submit, the current thread does not wait for the task to complete. It just adds the task to the executor's internal queue to be executed asynchronously by one of the threads.

The function also has several overloads, which we will consider in the following topics.

Stopping executors

An executor continues to work after the completion of a task, since the threads in the pool are waiting for new tasks. Your program will never stop while at least one executor still works.

There are two functions that can stop executors:

  • shutdown(): Unit waits until all running tasks are completed and prohibits submitting new tasks;

  • shutdownNow(): MutableList<Runnable!> immediately stops all running tasks and returns a list of the tasks that were awaiting execution.

Note that, unlike join() of Thread, shutdown() does not block the current thread. If you need to wait until the execution is complete, you can invoke awaitTermination(...) with the specified waiting time.

val executor: ExecutorService = Executors.newFixedThreadPool(4)

// submitting tasks

executor.shutdown()

val terminated = executor.awaitTermination(60, TimeUnit.MILLISECONDS)

if (terminated) {
    println("The executor was successfully stopped")
} else {
    println("Timeout elapsed before termination")
}

An example: names of threads and tasks

In the following example, we create an executor with a pool of four threads. We submit ten tasks to it and then analyze the results. Each task prints the name of the thread that executes it, as well as the name of the task.

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun main() {
    val NUMBER_OF_TASKS = 10
    val POOL_SIZE = 4
    val executor: ExecutorService = Executors.newFixedThreadPool(POOL_SIZE)

    for (i in 0..NUMBER_OF_TASKS) {
        executor.submit {
            val taskName: String = "task-$i"
            val threadName: String = Thread.currentThread().name
            println("$threadName executes $taskName")
        }
    }
    executor.shutdown()
}

If you launch this program several times, you will get a different output. Below is one of the possible outputs:

pool-1-thread-1 executes task-0
pool-1-thread-2 executes task-1
pool-1-thread-4 executes task-3
pool-1-thread-3 executes task-2
pool-1-thread-3 executes task-7
pool-1-thread-3 executes task-8
pool-1-thread-3 executes task-9
pool-1-thread-1 executes task-6
pool-1-thread-4 executes task-5
pool-1-thread-2 executes task-4

It clearly demonstrates that the executor uses all four threads to solve the tasks. The number of tasks solved by each thread can vary. We cannot know for sure what exactly we'll get.

If you do not know how many threads are needed in your pool, you can take the number of available processors as the pool size.

val poolSize = Runtime.getRuntime().availableProcessors()
val executor = Executors.newFixedThreadPool(poolSize)

Types of executors

We have considered the most commonly used executor with a fixed size of the pool. Here are a few other types:

  • An executor with a single thread

The simplest executor has only a single thread in the pool. It may be enough for the async execution of rarely submitted and small tasks.

val executor: ExecutorService = Executors.newSingleThreadExecutor()

Important: one thread may not have time to process all incoming tasks, and the queue will grow significantly, consuming all the memory.

  • An executor with a growing pool

There is also an executor that automatically increases the number of threads when needed and reuses previously constructed threads.

val executor: ExecutorService = Executors.newCachedThreadPool()

It can usually improve the performance of programs with many short-lived asynchronous tasks. However, it can also lead to problems when the number of threads increases too much. It is preferable to choose the fixed thread-pool executor whenever possible.

  • An executor that schedules a task

If you need to perform the same task periodically or only once after a given delay, use the following executor:

val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

The function scheduleAtFixedRate() submits a periodic Runnable task that becomes enabled first after the given initDelay, and subsequently with the given period.

Here is a quick example with scheduling:

val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
executor.scheduleAtFixedRate(
    { println(LocalTime.now().toString() + ": Hello!") },
     1000,
     1000,
     TimeUnit.MILLISECONDS
)

Here is a fragment of the output:

02:30:06.375392: Hello!
02:30:07.375356: Hello!
02:30:08.375376: Hello!
...and even more...

It can be stopped the way we did it before.

This kind of executor also has a function named schedule, which starts a task only once after the given delay, and another function scheduleWithFixedDelay, which starts the task with a fixed wait after the previous one is completed.

Exception handling

In our examples, we often ignore error handling to simplify code. Here, we demonstrate one feature related to the handling of exceptions in executors (namely, the unchecked ones).

What do you think the following code will print?

val executor = Executors.newSingleThreadExecutor()
executor.submit { println(2 / 0) }

It does not print anything at all, not even the exception! That is why it is common practice to wrap a task in the try-catch block not to lose the exception.

val executor = Executors.newSingleThreadExecutor()
executor.submit {
    try {
        println(2 / 0)
    } catch (e: Exception) {
        println(e.stackTraceToString())
    }
}

Now you will see the exception. In real applications, it is better to use some kind of logging to output it. Note that the executor will still work after the exception because it dynamically creates a new thread.

Conclusion

In this topic, you've learned how to create services which manage threads and tasks. In a nutshell:

1. You can create different types of executor services by using functions of the Executors class: newCachedThreadPool, newFixedThreadPool, etc.

2. You can add tasks to a service by using the function submit.

3. You can stop the executor by using the function shutdown.

Now, you can create and configure your own thread services, just remember about exceptions in tasks. This skill will come in handy when you'll work on a big project. Meanwhile, let's have some practice!

30 learners liked this piece of theory. 0 didn't like it. What about you?
Report a typo