A good understanding of concurrent programming is essential for the development of distributed applications, as servers always process several requests simultaneously.
Processes are isolated from each other and can only communicate with each other via explicit mechanisms; processes do not share the same address space.
All threads of a process share the same address space. Native threads are threads supported by the operating system that are managed directly by the operating system. Standard Java threads are native threads.
Fibres* (also coroutines) always use cooperative multitasking. This means that a fibre explicitly passes control to another fibre. (Formerly also referred to as green threads.) These are invisible to the operating system.
As of Java 21, Java not only supports classic (native) threads but also virtual threads (which are "somewhere" between green threads and native threads. The latter in particular allow very natural programming of middleware that takes care of parallelization/concurrency.
A monitor is an object in which the methods are executed in mutual exclusion (mutual exclusion).
Condition synchronization
expresses a condition on the order in which operations are executed.
For example, data can only be removed from a buffer once data has been entered into the buffer.
Java only supports one (anonymous) condition variable per monitor, with the classic methods wait and notify or notifyAll.
Monitors are just one model (alternatives: Semaphores, Message Passing) that enables the communication and synchronization of threads. It is the standard model in Java and is directly supported by the Java Virtual Machine (JVM).
By reading and writing data encapsulated in shared objects that are protected by monitors.
Each object is implicitly derived from the class java.lang.Object, which defines a mutual exclusion lock.
Methods in a class can be marked as synchronized. The method is only executed when the lock is present. It waits until then. This process happens automatically.
The lock can also be acquired via a synchronized statement that names the object.
A thread can wait for a single (anonymous) condition variable and notify it.
Threads are provided in Java via the predefined class java.lang.Thread.
Alternatively, the interface:
public interface Runnable { void run(); }
can be implemented and an instance can then be passed to a Thread-Objekt.
Threads only start their execution when the start method is called in the thread class. The thread.start method calls the run method. Calling the run method directly does not lead to parallel execution.
The current thread can be determined using the static method Thread.currentThread().
A thread is terminated when the execution of its run method ends either normally or as the result of an unhandled exception.
Java distinguishes between user threads and daemon threads.
Daemon threads are threads that provide general services and are normally never terminated.
When all user threads are terminated, the daemon threads are terminated by the JVM and the main programme is terminated.
The method setDaemon must be called before the thread is started.
A thread can wait (with or without a timeout) for another thread (the target) to finish, by calling the join method of the target thread.
A thread can use the isAlive method to determine whether the target thread has ended.
synchronized-Methods and synchronized-BlocksA mutual exclusion lock is assigned to each object. The lock cannot be accessed explicitly by the application. This happens implicitly if:
a method uses the method modifier synchronized
block synchronization with the keyword synchronized is used
If a method is marked as synchronized, the method can only be accessed if the system has received the lock.
Therefore, synchronized methods have mutually exclusive access to the data encapsulated by the object, if this data is only accessed in other synchronized contexts.
Non-synchronized methods do not require a lock and can therefore be called at any time.
1public class SynchronizedCounter {23private int count = 0;45public synchronized void increment() {6count++;7}89public synchronized int getCount() {10return count;11}12}
1public class SharedLong {23private long theData; // reading and writing longs is not atomic45public SharedLong(long initialValue) {6theData = initialValue;7}89public synchronized long read() { return theData; }1011public synchronized void write(long newValue) { theData = newValue; }1213public synchronized void incrementBy(long by) {14theData = theData + by;15}16}1718SharedLong myData = new SharedLong(42);
1public class SynchronizedCounter {23private int count = 0;45public void increment() {6synchronized(this) {7count++;8}9}1011public int getCount() {12synchronized(this) {13return count;14}15}16}
1public class SharedCoordinate {23private int x, y;45public SharedCoordinate(int initX, int initY) {6this.x = initX; this.y = initY;7}89public synchronized void write(int newX, int newY) {10this.x = newX; this.y = newY;11}1213/*⚠️*/ public /* synchronized irrelevant */ int readX() { return x; } /*⚠️*/14/*⚠️*/ public /* synchronized irrelevant */ int readY() { return y; } /*⚠️*/1516public synchronized SharedCoordinate read() {17return new SharedCoordinate(x, y);18} }
The two methods: readX and readY are not synchronized, as reading int values is atomic. However, they do allow an inconsistent state to be read! It is conceivable that the corresponding thread is interrupted directly after a readX and another thread changes the values of x and y. If the original thread is then continued and calls readY, it receives the new value of y and thus has a pair of x, y that never existed in this form.
A consistent state can only be determined by the method read, which reads the values of x and y in one step and returns them as a pair.
If it can be ensured that a reading thread names the instance in a synchronized block, then the reading of a consistent state can also be ensured for several consecutive method calls.
1SharedCoordinate point = new SharedCoordinate(0,0);2synchronized (point1) {3var x = point1.readX();4var y = point1.readY();5}6// do something with x and y
However, this "solution" is very dangerous, as the probability of programming errors is very high and this can lead to either race conditions (here) or deadlocks (in general).
For the purpose of conditional synchronisation, the methods wait, notify and notifyAll can be used in Java. These methods allow you to wait for certain conditions and notify other threads when the condition has changed.
These methods can only be used within methods that hold the object lock; otherwise a IllegalMonitorStateException is thrown.
The wait method always blocks the calling thread and releases the lock associated with the object.
The notify method wakes up a waiting thread. Which thread is woken up is not specified.
notify does not release the lock; therefore, the awakened thread must wait until it can receive the lock before it can continue.
Use notifyAll to wake up all waiting threads.
If the threads are waiting due to different conditions, notifyAll must always be used.
If no thread is waiting, then notify and notifyAll have no effect.
If a thread is waiting for a condition, no other thread can wait for the other condition.
With the primitives presented so far, direct modelling of this scenario is not possible. Instead, all threads must always be woken up to ensure that the intended thread is also woken up. This is why it is also necessary to check the condition in a loop.
A BoundedBuffer traditionally uses two condition variables: BufferNotFull und BufferNotEmpty.
1public class BoundedBuffer {2private final int buffer[];3private int first;4private int last;5private int numberInBuffer = 0;6private final int size;78public BoundedBuffer(int length) {9size = length;10buffer = new int[size];11last = 0;12first = 0;13};
14public synchronized void put(int item) throws InterruptedException {15while (numberInBuffer == size)16wait();17last = (last + 1) % size;18numberInBuffer++;19buffer[last] = item;20notifyAll();21};
22public synchronized int get() throws InterruptedException {23while (numberInBuffer == 0)24wait();25first = (first + 1) % size;26numberInBuffer--;27notifyAll();28return buffer[first];29}30}
Error situation that could occur when using notify instead of notifyAll:
1BoundedBuffer bb = new BoundedBuffer(1);2Thread g1,g2 = new Thread(() => { bb.get(); });3Thread p1,p2 = new Thread(() => { bb.put(new Object()); });4g1.start(); g2.start(); p1.start(); p2.start();
Operation |
Change of State of the Buffer |
Waiting for the lock |
Waiting for the condition |
|
|---|---|---|---|---|
1 |
g1:bb.get() |
empty |
{g2,p1,p2} |
{g1} |
2 |
g2:bb.get() |
empty |
{p1,p2} |
{g1,g2} |
3 |
p1:bb.put() |
empty → not empty |
{p2,g1} |
{g2} |
4 |
p2:bb.put() |
not empty |
{g1} |
{g2,p2} |
5 |
g1:bb.get() |
not empty → empty |
{g2} |
{p2} |
6 |
g2:bb.get() |
empty |
∅ |
{g2,p2} |
In step 5, the VM woke up the g2 thread - instead of the p2 thread - due to the call of notify by g1. The awakened thread g2 checks the condition (step 6) and realises that the buffer is empty. It goes back to the wait state. Now both a thread that wants to write a value and a thread that wants to read a value are waiting.
Provides various classes to support common concurrent programming paradigms, e.g. support for BoundedBuffers or thread pools.
Provides support for lock-free, thread-safe programming on simple variables - such as atomic integers.
Provides various lock algorithms that complement the Java language mechanisms, e.g. read-write locks and conditional variables. This enables, for example: "Hand-over-Hand" or "Chain Locking".
A BoundedBuffer, for example, traditionally has two condition variables: BufferNotFull and BufferNotEmpty.
1public class BoundedBuffer<T> {23private final T buffer[];4private int first;5private int last;6private int numberInBuffer;7private final int size;89private final Lock lock = new ReentrantLock();10private final Condition notFull = lock.newCondition();11private final Condition notEmpty = lock.newCondition();
12public BoundedBuffer(int length) { /* Normal constructor. */13size = length;14buffer = (T[]) new Object[size];15last = 0;16first = 0;17numberInBuffer = 0;18}
19public void put(T item) throws InterruptedException {20lock.lock();21try {2223while (numberInBuffer == size) { notFull.await(); }24last = (last + 1) % size;25numberInBuffer++;26buffer[last] = item;27notEmpty.signal();2829} finally {30lock.unlock();31}32}
33public T get() ... {34lock.lock();35try {3637while (numberInBuffer == 0) { notEmpty.await(); }38first = (first + 1) % size;39numberInBuffer--;40notFull.signal();41return buffer[first];4243} finally {44lock.unlock();45}46}47}
Although priorities can be assigned to the Java threads (setPriority), they only serve the underlying scheduler as a guideline for resource allocation.
A thread can explicitly give up the processor resources by calling the yield method.
yield places the thread at the end of the queue for its priority level.
However, Java's scheduling and priority models are weak:
There is no guarantee that the thread with the highest priority that can run will always be executed.
Threads with the same priority may or may not be divided into time slices.
When using native threads, different Java priorities can be mapped to the same operating system priority.
synchronized code should be kept as short as possible.
Nested monitor calls should be avoided as the outer lock is not released when the inner monitor is waiting. This can easily lead to a deadlock occurring.
For a class to be thread-safe, it must behave correctly in a single-threaded environment.
I.e. if a class is implemented correctly, then no sequence of operations (reading or writing public fields and calling public methods) on objects of this class should be able to
set the object to an invalid state,
observe the object in an invalid state, or
violate one of the invariants, preconditions or postconditions of the class.
The class must also behave correctly when accessed by multiple threads.
Independent of scheduling or the interleaving of the execution of these threads by the runtime environment,
Without additional synchronisation on the part of the calling code.
As a result, operations on a thread-safe object appear to all threads as if the operations were performed in a fixed, globally consistent order.
The objects are constant and cannot be changed.
The objects can be changed, but support concurrent access as the methods are synchronized accordingly.
All objects where each individual operation is thread-safe, but certain sequences of operations may require external synchronization.
All objects that have no synchronization at all. However, the caller can take over the synchronization externally if necessary.
Objects that are not thread-safe and cannot be made thread-safe as they manipulate global status, for example.
Delayed Execution
Implement a class (DelayingExecutor) that accepts tasks (instances of java.lang.Runable) and executes them after a certain time. The class must not block or be locked during this time.
Consider using virtual threads. A virtual thread can be created using the method: Thread.ofVirtual(). A Runnable object can then be passed to the start method.
Delay the execution (Thread.sleep()) by an average of 100ms with a standard deviation of 20ms. (Use Random.nextGaussian(mean,stddev))
Start 100 000 virtual threads. How long does the execution take? How long does the execution take with 100 000 platform (native) threads?
It is recommended to use the template.
1import java.util.ArrayList;2import java.util.List;3import java.util.Random;45public class DelayingExecutor {67private final Random random = new Random();89private Thread runDelayed(int id, Runnable task) {10// TODO11}1213public static void main(String[] args) throws Exception {14var start = System.nanoTime();15DelayingExecutor executor = new DelayingExecutor();16List<Thread> threads = new ArrayList<>();17for (int i = 0; i < 100000; i++) {18final var no = i;19var thread = executor.runDelayed(20i,21() -> System.out.println("i'm no.: " + no));22threads.add(thread);23}24System.out.println("finished starting all threads");25for (Thread thread : threads) {26thread.join();27}28var runtime = (System.nanoTime() - start)/1_000_000;29System.out.println(30"all threads finished after: " + runtime + "ms"31);32}33}
Thread-safe programming
Implement a class ThreadsafeArray to store non-null objects (java.lang.Object) at selected indices - comparable to a normal array. Compared to a normal array, however, a thread which wants to read a value should be blocked if the cell is occupied. The class should provide the following methods:
get(int index):Returns the value at the position index. The calling thread may be blocked until a value has been saved at the index position. (The get method does not remove the value from the array).
set(int index, Object value):Stores the value value at the position index. If a value has already been saved at the position index, the calling thread is blocked until the value at the position index has been deleted.
delete(int index):Deletes the value at position index if a value exists. Otherwise, the thread is blocked until there is a value that can be deleted.
Implement the ThreadsafeArray class using only the standard primitives: synchronized, wait, notify and notifyAll. Use the template.
Can you use both notify and notifyAll?
Implement the ThreadsafeArray class using ReentrantLocks and Conditions. Use the template.
What are the advantages of using ReentrantLocks?
You can also consider the class ThreadsafeArray as an array of BoundedBuffers with the size 1.
1public class ThreadsafeArray {23private final Object[] array;45public ThreadsafeArray(int size) {6this.array = new Object[size];7}89// complete method signatures and implementations10Object get(int index)11void set(int index, Object value)12void remove(int index)1314public static void main(String[] args) throws Exception {15final var ARRAY_SIZE = 2;16final var SLEEP_TIME = 1; // ms17var array = new ThreadsafeArray(ARRAY_SIZE);18for (int i = 0; i < ARRAY_SIZE; i++) {19final var threadId = i;2021final var readerThreadName = "Reader";22var t2 = new Thread(() -> {23while (true) {24int j = (int) (Math.random() * ARRAY_SIZE);25try {26out.println(readerThreadName + "[" + j + "]" );27var o = array.get(j);28out.println(readerThreadName +29"[" + j + "] ⇒ #" + o.hashCode());30Thread.sleep(SLEEP_TIME);31} catch (InterruptedException e) {32e.printStackTrace();33}34}35}, readerThreadName);36t2.start();3738// One Thread for each slot that will eventually39// write some content40final var writerThreadName = "Writer[" + threadId + "]";41var t1 = new Thread(() -> {42while (true) {43try {44var o = new Object();45out.println(writerThreadName + " = #" + o.hashCode());46array.set(threadId, o);47out.println(writerThreadName + " done");48Thread.sleep(SLEEP_TIME);49} catch (InterruptedException e) {50e.printStackTrace();51}52}53}, writerThreadName);54t1.start();5556// One Thread for each slot that will eventually57// delete the content58final var deleterThreadName = "Delete[" + threadId + "]";59var t3 = new Thread(() -> {60while (true) {61try {62out.println(deleterThreadName);63array.delete(threadId);64Thread.sleep(SLEEP_TIME);65} catch (InterruptedException e) {66e.printStackTrace();67}68}69}, deleterThreadName);70t3.start();71}72}73}