A good understanding of concurrent programming is essential for the development of distributed applications, as servers typically process several requests simultaneously.
Processes are isolated from each other and can only communicate with each other via explicit mechanisms; processes do not share the same address space.
All threads of a process share the same address space. Native threads are threads that are supported and managed directly by the operating system. Standard Java threads are native threads.
Fibres (also called coroutines) always use cooperative multitasking: a fibre explicitly passes control to another fibre. Fibres are invisible to the operating system and were formerly also referred to as green threads.
As of Java 21, Java supports not only classic (native) threads but also virtual threads, which sit "somewhere" between green threads and native threads. Virtual threads in particular allow very natural programming of middleware that takes care of parallelization/concurrency.
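To make the distinction between platform (native) and virtual threads concrete, the following is a minimal sketch that assumes Java 21 or later. The class name VirtualThreadSketch and the helper runVirtual are hypothetical and chosen only for this illustration; Thread.ofVirtual(), Thread.ofPlatform() and isVirtual() are part of the standard API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadSketch {

    // Starts `count` virtual threads that each sleep briefly and then
    // increment a shared counter; returns the counter after joining all.
    static int runVirtual(int count) {
        AtomicInteger done = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Thread.ofVirtual() returns a builder; start(...) creates
            // and starts the virtual thread.
            threads.add(Thread.ofVirtual().start(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            }));
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        Thread platform = Thread.ofPlatform().start(() -> { });
        Thread virtual = Thread.ofVirtual().start(() -> { });
        System.out.println("platform.isVirtual() = " + platform.isVirtual());
        System.out.println("virtual.isVirtual()  = " + virtual.isVirtual());
        System.out.println("finished virtual threads: " + runVirtual(10_000));
    }
}
```

Because a sleeping virtual thread does not occupy an operating system thread, starting thousands of them is usually unproblematic; the exact cost depends on the JVM and the machine.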
A monitor is an object whose methods are executed in mutual exclusion.
Condition synchronisation expresses a condition on the order in which operations are executed.
For example, data can only be removed from a buffer once data has been entered into the buffer.
Java only supports one (anonymous) condition variable per monitor, with the classic methods wait and notify or notifyAll.
Monitors are just one model (alternatives: Semaphores, Message Passing) that enables the communication and synchronisation of threads. It is the standard model in Java and is directly supported by the Java Virtual Machine (JVM).
Threads communicate by reading and writing data encapsulated in shared objects that are protected by monitors.
Each object is implicitly derived from the class java.lang.Object, which defines a mutual exclusion lock.
Methods in a class can be marked as synchronized. Such a method is only executed once the calling thread has acquired the lock; until then, the thread waits. This happens automatically.
The lock can also be acquired via a synchronized statement that names the object.
A thread can wait for a single (anonymous) condition variable and notify it.
Threads are provided in Java via the predefined class java.lang.Thread.
Alternatively, the interface
public interface Runnable { void run(); }
can be implemented and an instance can then be passed to a Thread object.
Threads only start executing when the start method is called on the Thread object. The start method causes the run method to be executed in the new thread. Calling the run method directly does not lead to parallel execution.
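The difference between start and run can be observed by recording which thread actually executes the task. The following is a small sketch; the class name StartVsRun and the helper executedBy are hypothetical names introduced for this illustration.

```java
class StartVsRun {

    // Returns the name of the thread that actually executed the task.
    static String executedBy(boolean useStart) {
        String[] executor = new String[1];
        Thread worker = new Thread(
                () -> executor[0] = Thread.currentThread().getName(),
                "worker");
        if (useStart) {
            worker.start();      // new thread of control: run() executes in "worker"
            try {
                worker.join();   // wait for the worker so its result is visible
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        } else {
            worker.run();        // plain method call: executes in the calling thread
        }
        return executor[0];
    }

    public static void main(String[] args) {
        System.out.println("start(): executed by " + executedBy(true));  // worker
        System.out.println("run():   executed by " + executedBy(false)); // main
    }
}
```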
The current thread can be determined using the static method Thread.currentThread().
A thread is terminated when the execution of its run method ends either normally or as the result of an unhandled exception.
Java distinguishes between user threads and daemon threads.
Daemon threads are threads that provide general services and are normally never terminated.
When all user threads are terminated, the daemon threads are terminated by the JVM and the main programme is terminated.
To mark a thread as a daemon thread, the method setDaemon must be called before the thread is started.
A thread can wait (with or without a timeout) for another thread (the target) to finish by calling the join method of the target thread.
A thread can use the isAlive method to determine whether the target thread has ended.
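The rules for setDaemon, join and isAlive can be tried out with a few lines of code. This is only a sketch; the class name DaemonJoinSketch and both helper methods are hypothetical and exist just for this illustration.

```java
class DaemonJoinSketch {

    // Starts a short-lived user thread, waits for it with join and
    // reports whether it is still alive afterwards.
    static boolean aliveAfterJoin() {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join();           // blocks until the target has terminated
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return worker.isAlive();     // false: the target has ended
    }

    // Returns true if setDaemon is rejected for an already started thread.
    static boolean setDaemonAfterStartFails() {
        Thread service = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.setDaemon(true);     // allowed: the thread has not been started yet
        service.start();
        try {
            service.setDaemon(false); // too late: the thread is already running
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("alive after join: " + aliveAfterJoin());
        System.out.println("setDaemon after start rejected: "
                + setDaemonAfterStartFails());
    }
}
```

Since the second thread is a daemon thread, the JVM does not wait for it when main returns.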
synchronized methods and synchronized blocks
A mutual exclusion lock is assigned to each object. The lock cannot be accessed explicitly by the application; it is acquired implicitly if:
a method uses the method modifier synchronized
block synchronization with the keyword synchronized is used
If a method is marked as synchronized, its body is only executed once the calling thread has acquired the lock.
Therefore, synchronized methods have mutually exclusive access to the data encapsulated by the object, provided this data is only accessed in other synchronized contexts.
Non-synchronized methods do not require a lock and can therefore be called at any time.
    public class SynchronizedCounter {

        private int count = 0;

        public synchronized void increment() {
            count++;
        }

        public synchronized int getCount() {
            return count;
        }
    }
    public class SharedLong {

        private long theData; // reading and writing longs is not guaranteed to be atomic

        public SharedLong(long initialValue) {
            theData = initialValue;
        }

        public synchronized long read() { return theData; }

        public synchronized void write(long newValue) { theData = newValue; }

        public synchronized void incrementBy(long by) {
            theData = theData + by;
        }
    }

    SharedLong myData = new SharedLong(42);
    public class SynchronizedCounter {

        private int count = 0;

        public void increment() {
            synchronized (this) {
                count++;
            }
        }

        public int getCount() {
            synchronized (this) {
                return count;
            }
        }
    }
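The effect of the lock becomes visible when several threads increment the same counter. The sketch below repeats the counter from above as a nested class so that it is self-contained; the wrapper class CounterDemo and the helper countWith are hypothetical names used only here.

```java
class CounterDemo {

    // Same counter as above, nested here to keep the example self-contained.
    static class SynchronizedCounter {
        private int count = 0;
        public synchronized void increment() { count++; }
        public synchronized int getCount() { return count; }
    }

    // Lets `threads` threads increment one shared counter and returns
    // the final value after all of them have finished.
    static int countWith(int threads, int incrementsPerThread) {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 100_000)); // prints 400000
    }
}
```

If synchronized were removed from increment, count++ would no longer be executed atomically and updates could be lost, so the result could be smaller than expected.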
    public class SharedCoordinate {

        private int x, y;

        public SharedCoordinate(int initX, int initY) {
            this.x = initX; this.y = initY;
        }

        public synchronized void write(int newX, int newY) {
            this.x = newX; this.y = newY;
        }

        // ⚠️ not synchronized (synchronized is irrelevant for a single int read)
        public int readX() { return x; }
        // ⚠️ not synchronized (synchronized is irrelevant for a single int read)
        public int readY() { return y; }

        public synchronized SharedCoordinate read() {
            return new SharedCoordinate(x, y);
        }
    }
The two methods readX and readY are not synchronised, as reading int values is atomic. However, they do allow an inconsistent state to be read! It is conceivable that the reading thread is interrupted directly after a readX and another thread changes the values of x and y. If the original thread then continues and calls readY, it receives the new value of y and thus has a pair of x, y that never existed in this form.
A consistent state can only be determined by the method read, which reads the values of x and y in one step and returns them as a pair.
If it can be ensured that a reading thread names the instance in a synchronized block, then the reading of a consistent state can also be ensured for several consecutive method calls.
    SharedCoordinate point = new SharedCoordinate(0, 0);
    int x, y;
    synchronized (point) { // hold the lock across both reads
        x = point.readX();
        y = point.readY();
    }
    // do something with x and y
However, this "solution" is very dangerous, as the probability of programming errors is very high and this can lead to either race conditions (here) or deadlocks (in general).
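The safer alternative is to take a snapshot with read and to work on the private copy. The following sketch checks this under an assumed invariant (the writer always stores x == y); the wrapper class SnapshotDemo and the helper inconsistentSnapshots are hypothetical names for this illustration.

```java
class SnapshotDemo {

    // Same class as in the text, nested here to keep the example self-contained.
    static class SharedCoordinate {
        private int x, y;
        SharedCoordinate(int initX, int initY) { this.x = initX; this.y = initY; }
        synchronized void write(int newX, int newY) { this.x = newX; this.y = newY; }
        int readX() { return x; }
        int readY() { return y; }
        synchronized SharedCoordinate read() { return new SharedCoordinate(x, y); }
    }

    // A writer keeps the invariant x == y; the caller takes `rounds`
    // snapshots and counts how many of them violate the invariant.
    static int inconsistentSnapshots(int rounds) {
        SharedCoordinate point = new SharedCoordinate(0, 0);
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= rounds; i++) {
                point.write(i, i);
            }
        });
        writer.start();
        int inconsistent = 0;
        for (int i = 0; i < rounds; i++) {
            SharedCoordinate copy = point.read(); // x and y are read under one lock
            // The copy is only visible to this thread, so the
            // unsynchronized getters are safe to use on it.
            if (copy.readX() != copy.readY()) {
                inconsistent++;
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return inconsistent;
    }

    public static void main(String[] args) {
        System.out.println("inconsistent snapshots: " + inconsistentSnapshots(100_000));
    }
}
```

Calling point.readX() and point.readY() directly on the shared object instead of on the copy would reintroduce the problem described above.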
For the purpose of condition synchronisation, the methods wait, notify and notifyAll can be used in Java. These methods allow you to wait for certain conditions and notify other threads when the condition has changed.
These methods can only be called by a thread that holds the object's lock; otherwise an IllegalMonitorStateException is thrown.
The wait method always blocks the calling thread and releases the lock associated with the object.
The notify method wakes up a waiting thread. Which thread is woken up is not specified.
notify does not release the lock; therefore, the awakened thread must wait until it can acquire the lock before it can continue.
Use notifyAll to wake up all waiting threads.
If the threads are waiting due to different conditions, notifyAll must always be used.
If no thread is waiting, then notify and notifyAll have no effect.
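The rules above can be illustrated with a one-shot gate, probably the simplest use of wait and notifyAll. This is a sketch, not a recommended utility; the classes GateSketch and Gate and their methods are hypothetical names introduced for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class GateSketch {

    // Hypothetical one-shot gate: threads block in await() until open() is called.
    static class Gate {
        private boolean open = false;

        public synchronized void await() throws InterruptedException {
            while (!open) {   // re-check the condition after every wake-up
                wait();       // releases the lock while waiting
            }
        }

        public synchronized void open() {
            open = true;
            notifyAll();      // wake all waiters; each one re-checks the condition
        }
    }

    // Starts `waiters` threads that wait at the gate, opens the gate and
    // returns how many threads got through.
    static int passThrough(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    // Calling notify() without holding the object's lock is rejected.
    static boolean notifyWithoutLockFails() {
        try {
            new Object().notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("passed: " + passThrough(3));
        System.out.println("notify without lock rejected: " + notifyWithoutLockFails());
    }
}
```

Threads that reach await only after the gate has been opened see open == true and never call wait, so no notification can be missed.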
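In a bounded buffer, for example, if a thread is waiting for one condition (the buffer is no longer full), no other thread can be waiting for the other condition (the buffer is no longer empty), because the buffer cannot be full and empty at the same time.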
With the primitives presented so far, direct modelling of this scenario is not possible. Instead, all threads must always be woken up to ensure that the intended thread is also woken up. This is why it is also necessary to check the condition in a loop.
A BoundedBuffer traditionally uses two condition variables: BufferNotFull and BufferNotEmpty.
    public class BoundedBuffer {

        private final int[] buffer;
        private int first;
        private int last;
        private int numberInBuffer = 0;
        private final int size;

        public BoundedBuffer(int length) {
            size = length;
            buffer = new int[size];
            last = 0;
            first = 0;
        }

        public synchronized void put(int item) throws InterruptedException {
            while (numberInBuffer == size)
                wait();
            last = (last + 1) % size;
            numberInBuffer++;
            buffer[last] = item;
            notifyAll();
        }

        public synchronized int get() throws InterruptedException {
            while (numberInBuffer == 0)
                wait();
            first = (first + 1) % size;
            numberInBuffer--;
            notifyAll();
            return buffer[first];
        }
    }
Error situation that could occur when using notify instead of notifyAll:
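    BoundedBuffer bb = new BoundedBuffer(1);
    // get and put may throw InterruptedException, which the lambdas have to handle
    Runnable getter = () -> { try { bb.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } };
    Runnable putter = () -> { try { bb.put(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } };
    Thread g1 = new Thread(getter), g2 = new Thread(getter);
    Thread p1 = new Thread(putter), p2 = new Thread(putter);
    g1.start(); g2.start(); p1.start(); p2.start();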
| Step | Operation | Change of state of the buffer | Waiting for the lock | Waiting for the condition |
|---|---|---|---|---|
| 1 | g1:bb.get() | empty | {g2,p1,p2} | {g1} |
| 2 | g2:bb.get() | empty | {p1,p2} | {g1,g2} |
| 3 | p1:bb.put() | empty → not empty | {p2,g1} | {g2} |
| 4 | p2:bb.put() | not empty | {g1} | {g2,p2} |
| 5 | g1:bb.get() | not empty → empty | {g2} | {p2} |
| 6 | g2:bb.get() | empty | ∅ | {g2,p2} |
In step 5, the VM woke up the g2 thread (instead of the p2 thread) due to the call of notify by g1. The awakened thread g2 checks the condition (step 6) and realises that the buffer is empty. It goes back to the wait state. Now both a thread that wants to write a value and a thread that wants to read a value are waiting, and no running thread is left to wake them up.
java.util.concurrent provides various classes to support common concurrent programming paradigms, e.g. support for BoundedBuffers or thread pools.
java.util.concurrent.atomic provides support for lock-free, thread-safe programming on simple variables, such as atomic integers.
java.util.concurrent.locks provides various lock algorithms that complement the Java language mechanisms, e.g. read-write locks and condition variables. This enables, for example, "Hand-over-Hand" or "Chain Locking".
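As a brief illustration of these library facilities, the following sketch combines a bounded queue (ArrayBlockingQueue), a thread pool (ExecutorService) and an atomic variable (AtomicInteger). The class name ConcurrentUtilitiesSketch and the helper sumViaQueue are hypothetical; the library classes themselves are part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentUtilitiesSketch {

    // Sums the numbers 1..n by handing them from a producer task to a
    // consumer task through a bounded queue.
    static int sumViaQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // bounded buffer
        AtomicInteger sum = new AtomicInteger();                    // lock-free accumulator
        ExecutorService pool = Executors.newFixedThreadPool(2);     // thread pool

        // Both lambdas return a value, so they are submitted as Callables
        // and may throw the InterruptedException of put/take.
        pool.submit(() -> {
            for (int i = 1; i <= n; i++) {
                queue.put(i);                    // blocks while the queue is full
            }
            return null;
        });
        pool.submit(() -> {
            for (int i = 0; i < n; i++) {
                sum.addAndGet(queue.take());     // blocks while the queue is empty
            }
            return null;
        });

        pool.shutdown();                         // accept no new tasks, finish running ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumViaQueue(100)); // prints 5050
    }
}
```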
A BoundedBuffer, for example, traditionally has two condition variables: BufferNotFull and BufferNotEmpty.
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class BoundedBuffer<T> {

        private final T[] buffer;
        private int first;
        private int last;
        private int numberInBuffer;
        private final int size;

        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        @SuppressWarnings("unchecked") // generic arrays have to be created via Object[]
        public BoundedBuffer(int length) { /* Normal constructor. */
            size = length;
            buffer = (T[]) new Object[size];
            last = 0;
            first = 0;
            numberInBuffer = 0;
        }

        public void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (numberInBuffer == size) { notFull.await(); }
                last = (last + 1) % size;
                numberInBuffer++;
                buffer[last] = item;
                notEmpty.signal(); // only threads waiting for "not empty" are woken
            } finally {
                lock.unlock();
            }
        }

        public T get() throws InterruptedException {
            lock.lock();
            try {
                while (numberInBuffer == 0) { notEmpty.await(); }
                first = (first + 1) % size;
                numberInBuffer--;
                notFull.signal(); // only threads waiting for "not full" are woken
                return buffer[first];
            } finally {
                lock.unlock();
            }
        }
    }
Although priorities can be assigned to Java threads (setPriority), they only serve the underlying scheduler as a guideline for resource allocation.
A thread can explicitly give up the processor by calling the yield method.
Conceptually, yield places the thread at the end of the queue for its priority level; in practice it is only a hint that the scheduler is free to ignore.
However, Java's scheduling and priority models are weak:
There is no guarantee that the thread with the highest priority that can run will always be executed.
Threads with the same priority may or may not be divided into time slices.
When using native threads, different Java priorities can be mapped to the same operating system priority.
synchronized code should be kept as short as possible.
Nested monitor calls should be avoided, as the outer lock is not released while the thread is waiting in the inner monitor. This can easily lead to a deadlock.
For a class to be thread-safe, it must first of all behave correctly in a single-threaded environment.
I.e. if a class is implemented correctly, then no sequence of operations (reading or writing public fields and calling public methods) on objects of this class should be able to
set the object to an invalid state,
observe the object in an invalid state, or
violate one of the invariants, preconditions or postconditions of the class.
The class must also behave correctly when accessed by multiple threads:
independent of scheduling or the interleaving of the execution of these threads by the runtime environment,
without additional synchronisation on the part of the calling code.
As a result, operations on a thread-safe object appear to all threads as if the operations were performed in a fixed, globally consistent order.
Immutable: The objects are constant and cannot be changed.
Thread-safe: The objects can be changed, but support concurrent access as the methods are synchronized accordingly.
Conditionally thread-safe: All objects where each individual operation is thread-safe, but certain sequences of operations may require external synchronization.
Thread-compatible: All objects that have no synchronization at all. However, the caller can take over the synchronization externally if necessary.
Thread-hostile: Objects that are not thread-safe and cannot be made thread-safe as they manipulate global state, for example.
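A list wrapped with Collections.synchronizedList is a typical case where every single call is thread-safe, but a sequence such as "check, then add" still needs external synchronisation. The sketch below shows one way to do this; the class ConditionallyThreadSafeSketch and its helper methods are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ConditionallyThreadSafeSketch {

    // contains() and add() are each thread-safe on the wrapper, but the
    // check-then-act sequence must be made atomic by the caller.
    static boolean addIfAbsent(List<String> syncList, String value) {
        synchronized (syncList) {   // the wrapper uses itself as its lock
            if (syncList.contains(value)) {
                return false;
            }
            return syncList.add(value);
        }
    }

    // Lets several threads try to add the same value and returns the
    // resulting list size.
    static int sizeAfterConcurrentAdds(int threads) {
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> addIfAbsent(list, "x"));
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterConcurrentAdds(8)); // prints 1
    }
}
```

Without the synchronized block, two threads could both pass the contains check before either of them adds the value, and the list could end up with duplicates.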
Delayed Execution
Implement a class (DelayingExecutor) that accepts tasks (instances of java.lang.Runnable) and executes them after a certain time. The class must not block or be locked during this time.
Consider using virtual threads. A virtual thread can be created via the builder returned by Thread.ofVirtual(); a Runnable object can then be passed to the builder's start method.
Delay the execution (Thread.sleep()) by an average of 100ms with a standard deviation of 20ms. (Use Random.nextGaussian(mean,stddev).)
Start 100 000 virtual threads. How long does the execution take? How long does the execution take with 100 000 platform (native) threads?
It is recommended to use the template.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class DelayingExecutor {

        private final Random random = new Random();

        private Thread runDelayed(int id, Runnable task) {
            // TODO
        }

        public static void main(String[] args) throws Exception {
            var start = System.nanoTime();
            DelayingExecutor executor = new DelayingExecutor();
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 100000; i++) {
                final var no = i;
                var thread = executor.runDelayed(
                    i,
                    () -> System.out.println("i'm no.: " + no));
                threads.add(thread);
            }
            System.out.println("finished starting all threads");
            for (Thread thread : threads) {
                thread.join();
            }
            var runtime = (System.nanoTime() - start) / 1_000_000;
            System.out.println(
                "all threads finished after: " + runtime + "ms"
            );
        }
    }
Thread-safe programming
Implement a class ThreadsafeArray to store non-null objects (java.lang.Object) at selected indices, comparable to a normal array. Unlike a normal array, however, a thread that wants to read a value is blocked while the cell is empty, and a thread that wants to write a value is blocked while the cell is occupied. The class should provide the following methods:
get(int index): Returns the value at the position index. The calling thread may be blocked until a value has been saved at the index position. (The get method does not remove the value from the array.)
set(int index, Object value): Stores the value value at the position index. If a value has already been saved at the position index, the calling thread is blocked until the value at the position index has been deleted.
delete(int index): Deletes the value at position index if a value exists. Otherwise, the thread is blocked until there is a value that can be deleted.
Implement the ThreadsafeArray class using only the standard primitives: synchronized, wait, notify and notifyAll. Use the template.
Can you use both notify and notifyAll?
Implement the ThreadsafeArray class using ReentrantLocks and Conditions. Use the template.
What are the advantages of using ReentrantLocks?
You can also consider the class ThreadsafeArray as an array of BoundedBuffers with the size 1.
    import static java.lang.System.out;

    public class ThreadsafeArray {

        private final Object[] array;

        public ThreadsafeArray(int size) {
            this.array = new Object[size];
        }

        // TODO: complete method signatures and implementations
        Object get(int index)
        void set(int index, Object value)
        void delete(int index)

        public static void main(String[] args) throws Exception {
            final var ARRAY_SIZE = 2;
            final var SLEEP_TIME = 1; // ms
            var array = new ThreadsafeArray(ARRAY_SIZE);
            for (int i = 0; i < ARRAY_SIZE; i++) {
                final var threadId = i;

                // One reader thread per iteration that reads from a random slot
                final var readerThreadName = "Reader";
                var t2 = new Thread(() -> {
                    while (true) {
                        int j = (int) (Math.random() * ARRAY_SIZE);
                        try {
                            out.println(readerThreadName + "[" + j + "]");
                            var o = array.get(j);
                            out.println(readerThreadName +
                                "[" + j + "] ⇒ #" + o.hashCode());
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, readerThreadName);
                t2.start();

                // One Thread for each slot that will eventually
                // write some content
                final var writerThreadName = "Writer[" + threadId + "]";
                var t1 = new Thread(() -> {
                    while (true) {
                        try {
                            var o = new Object();
                            out.println(writerThreadName + " = #" + o.hashCode());
                            array.set(threadId, o);
                            out.println(writerThreadName + " done");
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, writerThreadName);
                t1.start();

                // One Thread for each slot that will eventually
                // delete the content
                final var deleterThreadName = "Delete[" + threadId + "]";
                var t3 = new Thread(() -> {
                    while (true) {
                        try {
                            out.println(deleterThreadName);
                            array.delete(threadId);
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, deleterThreadName);
                t3.start();
            }
        }
    }