Concurrency in Java
1.0
Ein gutes Verständnis von nebenläufiger Programmierung ist für die Entwicklung von verteilten Anwendungen unerlässlich, da Server immer mehrere Anfragen gleichzeitig bearbeiten.
Prozesse sind voneinander isoliert und können nur über explizite Mechanismen miteinander kommunizieren; Prozesse teilen sich nicht denselben Adressraum.
Alle Threads eines Prozesses teilen sich denselben Adressraum. Native Threads sind vom Betriebssystem unterstützte Threads, die direkt vom Betriebssystem verwaltet werden. Standard Java Threads sind Native Threads.
Fibres (auch Coroutines) nutzen immer kooperatives Multitasking. D. h. ein Fibre gibt die Kontrolle an eine andere Fibre explizit ab. (Früher auch als Green Threads bezeichnet.) Diese sind für das Betriebssystem unsichtbar.
Ab Java 21 unterstützt Java nicht nur klassische (native) Threads sondern zusätzlich auf Virtual Threads. Letztere erlauben insbesondere eine sehr natürliche Programmierung von Middleware, die sich um die Parallelisierung/Nebenläufigkeit kümmert.
Ein Monitor ist ein Objekt, bei dem die Methoden im wechselseitigen Ausschluss (engl. mutual exclusion) ausgeführt werden.
Bedingungs-Synchronisation
drückt eine Bedingung für die Reihenfolge der Ausführung von Operationen aus.
z. B. können Daten erst dann aus einem Puffer entfernt werden, wenn Daten in den Puffer eingegeben wurden.
Java unterstützt pro Monitor nur eine (anonyme) Bedingungs-Variable, mit den klassischen Methoden wait
und notify
bzw. notifyAll
.
Monitore sind nur ein Modell (Alternativen: Semaphores, Message Passing), das die Kommunikation und Synchronisation von Threads ermöglicht. Es ist das Standardmodell in Java und wird von der Java Virtual Machine (JVM) unterstützt.
Durch Lesen und Schreiben von Daten, die in gemeinsamen Objekten gekapselt sind, die durch Monitore geschützt werden.
Jedes Objekt ist implizit von der Klasse Object
abgeleitet, welche eine gegenseitige Ausschlusssperre definiert.
Methoden in einer Klasse können als synchronized
gekennzeichnet werden. Die Methode wird erst dann ausgeführt, wenn die Sperre vorliegt. Bis dahin wird gewartet. Dieser Prozess geschieht automatisch.
Die Sperre kann auch über eine synchronized
Anweisung erworben werden, die das Objekt benennt.
Ein Thread kann auf eine einzelne (anonyme) Bedingungsvariable warten und diese benachrichtigen.
Threads werden in Java über die vordefinierte Klasse java.lang.Thread bereitgestellt.
Alternativ kann das Interface:
public interface Runnable { void run(); }
implementiert werden und an ein Thread-Objekt übergeben werden.
Threads beginnen ihre Ausführung erst, wenn die start
-Methode in der Thread-Klasse aufgerufen wird. Die Thread.start
-Methode ruft die run
-Methode auf. Ein Aufruf der run
-Methode direkt führt nicht zu einer parallelen Ausführung.
Der aktuelle Thread kann mittels der statischen Methode Thread.currentThread()
ermittelt werden.
Ein Thread wird beendet, wenn die Ausführung seiner Run-Methode entweder normal oder als Ergebnis einer unbehandelten Ausnahme endet.
Java unterscheidet User-Threads und Daemon-Threads.
Daemon-Threads sind Threads, die allgemeine Dienste bereitstellen und normalerweise nie beendet werden.
Wenn alle Benutzer-Threads beendet sind, werden die Daemon-Threads von der JVM beendet, und das Hauptprogramm wird beendet.
Die Methode setDaemon
muss aufgerufen werden, bevor der Thread gestartet wird.
Ein Thread kann (mit oder ohne Zeitüberschreitung) auf die Beendigung eines anderen Threads (des Ziels) warten, indem er die join
-Methode für das Thread-Objekt des Ziels aufruft.
Mit der Methode isAlive
kann ein Thread feststellen, ob der Ziel-Thread beendet wurde.
synchronized
-Methoden und synchronized
-BlöckeJedem Objekt ist eine gegenseitige Ausschlusssperre zugeordnet. Auf die Sperre kann von der Anwendung nicht explizit zugegriffen werden. Dies geschieht implizit, wenn:
eine Methode den Methodenmodifikator synchronized
verwendet
Blocksynchronisierung mit dem Schlüsselwort synchronized
verwendet wird
Wenn eine Methode als synchronisiert gekennzeichnet ist, kann der Zugriff auf die Methode nur erfolgen, wenn das System die Sperre erhalten hat.
Daher haben synchronisierte Methoden einen sich gegenseitig ausschließenden Zugriff auf die vom Objekt gekapselten Daten, wenn auf diese Daten nur von anderen synchronisierten Methoden zugegriffen wird.
Nicht-synchronisierte Methoden benötigen keine Sperre und können daher jederzeit aufgerufen werden.
1public class SynchronizedCounter {
23
private int count = 0;
45
public synchronized void increment() {
6count++;
7}
89
public synchronized int getCount() {
10return count;
11}
12}
1public class SharedLong {
23
private long theData; // reading and writing longs is not atomic
45
public SharedLong(long initialValue) {
6theData = initialValue;
7}
89
public synchronized long read() { return theData; }
1011
public synchronized void write(long newValue) { theData = newValue; }
1213
public synchronized void incrementBy(long by) {
14theData = theData + by;
15}
16}
1718
SharedLong myData = new SharedLong(42);
1public class SynchronizedCounter {
23
private int count = 0;
45
public void increment() {
6synchronized(this) {
7count++;
8}
9}
1011
public int getCount() {
12synchronized(this) {
13return count;
14}
15}
16}
Dies liegt daran, dass es nicht möglich ist, die mit einem bestimmten Objekt verbundene Synchronisation zu verstehen, indem man sich nur das Objekt selbst ansieht. Andere Objekte können bgzl. des Objekts eine synchronized
-Block verwenden.
1public class SharedCoordinate {
23
private int x, y;
45
public SharedCoordinate(int initX, int initY) {
6this.x = initX; this.y = initY;
7}
89
public synchronized void write(int newX, int newY) {
10this.x = newX; this.y = newY;
11}
1213
/*⚠️*/ public /* synchronized irrelevant */ int readX() { return x; } /*⚠️*/
14/*⚠️*/ public /* synchronized irrelevant */ int readY() { return y; } /*⚠️*/
1516
public synchronized SharedCoordinate read() {
17return new SharedCoordinate(x, y);
18} }
Die beiden Methoden: readX
und readY
sind nicht synchronisiert, da das Lesen von int
-Werten atomar ist. Allerdings erlauben sie das Auslesen eines inkonsistenten Zustands! Es ist denkbar, dass direkt nach einem readX
der entsprechende Thread unterbrochen wird und ein anderer Thread die Werte von x
und y
verändert. Wird dann der ursprüngliche Thread fortgesetzt, und ruft readY
auf, so erhält er den neuen Wert von y
und hat somit ein paar x
, y
vorliegen, dass in dieser Form nie existiert hat.
Ein konsistenter Zustand kann nur durch die Methode read
ermittelt werden, die die Werte von x
und y
in einem Schritt ausliest und als Paar zurückgibt.
Kann sichergestellt werden, dass ein auslesender Thread die Instanz in einem synchronized
Block benennt, dann kann die Auslesung eines konsistenten Zustands auch bei mehreren Methodenaufrufen hintereinander sichergestellt werden.
1SharedCoordinate point = new SharedCoordinate(0,0);
2synchronized (point1) {
3var x = point1.readX();
4var y = point1.readY();
5}
6// do something with x and y
Diese „Lösung“ muss jedoch als sehr kritisch betrachtet werden, da die Wahrscheinlichkeit von Programmierfehlern sehr hoch ist und es dann entweder zur Race Conditions oder zu Deadlocks kommen kann.
Zum Zwecke der bedingten Synchronisation können in Java die Methoden
wait
,notify
undnotifyAll
verwendet werden. Diese Methoden erlauben es auf bestimmte Bedingungen zu warten und andere Threads zu benachrichtigen, wenn sich die Bedingung geändert hat.
Diese Methoden können nur innerhalb von Methoden verwendet werden, die die Objektsperre halten; andernfalls wird eine IllegalMonitorStateException
ausgelöst.
Die wait
-Methode blockiert immer den aufrufenden Thread und gibt die mit dem Objekt verbundene Sperre frei.
Die notify
-Methode weckt einen wartenden Thread auf. Welcher Thread aufgeweckt wird, ist nicht spezifiziert.
notify
gibt die Sperre nicht frei; daher muss der aufgeweckte Thread warten, bis er die Sperre erhalten kann, bevor er fortfahren kann.
Um alle wartenden Threads aufzuwecken, muss die Methode notifyAll
verwendet werden.
Warten die Threads aufgrund unterschiedlicher Bedingungen, so ist immer notifyAll
zu verwenden.
Wenn kein Thread wartet, dann haben notify
und notifyAll
keine Wirkung.
Wenn ein Thread auf eine Bedingung wartet, kann kein anderer Thread auf die andere Bedingung warten.
Mit den bisher vorgestellten Primitiven ist eine direkte Modellierung dieses Szenarios so nicht möglich. Stattdessen müssen immer alle Threads aufgeweckt werden, um sicherzustellen, dass auch der intendierte Thread aufgeweckt wird. Deswegen ist auch das Überprüfen der Bedingung in einer Schleife notwendig.
Ein BoundedBuffer hat z. B. traditionell zwei Bedingungsvariablen: BufferNotFull und BufferNotEmpty.
1public class BoundedBuffer {
2private final int buffer[];
3private int first;
4private int last;
5private int numberInBuffer = 0;
6private final int size;
78
public BoundedBuffer(int length) {
9size = length;
10buffer = new int[size];
11last = 0;
12first = 0;
13};
14public synchronized void put(int item) throws InterruptedException {
15while (numberInBuffer == size)
16wait();
17last = (last + 1) % size;
18numberInBuffer++;
19buffer[last] = item;
20notifyAll();
21};
22public synchronized int get() throws InterruptedException {
23while (numberInBuffer == 0)
24wait();
25first = (first + 1) % size;
26numberInBuffer--;
27notifyAll();
28return buffer[first];
29}
30}
Fehlersituation, die bei der Verwendung von notify
statt notifyAll
auftreten könnte:
1BoundedBuffer bb = new BoundedBuffer(1);
2Thread g1,g2 = new Thread(() => { bb.get(); });
3Thread p1,p2 = new Thread(() => { bb.put(new Object()); });
4g1.start(); g2.start(); p1.start(); p2.start();
Aktionen |
(Änderung des) Zustand(s) des Buffers |
Auf die Sperre (Lock) wartend |
An der Bedingung wartend |
|
---|---|---|---|---|
1 |
g1:bb.get() |
empty |
{g2,p1,p2} |
{g1} |
2 |
g2:bb.get() |
empty |
{p1,p2} |
{g1,g2} |
3 |
p1:bb.put() |
empty → not empty |
{p2,g1} |
{g2} |
4 |
p2:bb.put() |
not empty |
{g1} |
{g2,p2} |
5 |
g1:bb.get() |
not empty → empty |
{g2} |
{p2} |
6 |
g2:bb.get() |
empty |
∅ |
{g2,p2} |
In Schritt 5 wurde von der VM - aufgrund des Aufrufs von notify
durch g1
- der Thread g2
aufgeweckt - anstatt des Threads p2
. Der aufgeweckte Thread g2
prüft die Bedingung (Schritt 6) und stellt fest, dass der Buffer leer ist. Er geht wieder in den Wartezustand. Jetzt warten sowohl ein Thread, der ein Wert schreiben möchte als auch ein Thread, der einen Wert lesen möchte.
Bietet verschiedene Klassen zur Unterstützung gängiger nebenläufiger Programmierparadigmen, z. B. Unterstützung für BoundedBuffers oder Thread-Pools.
Bietet Unterstützung für sperrfreie (lock-free), thread-sichere Programmierung auf einfachen Variablen — wie zum Beispiel atomaren Integern — an.
Bietet verschiedene Sperralgorithmen an, die die Java-Sprachmechanismen ergänzen, z. B. Schreib-Lese-Sperren und Bedingungsvariablen. Dies ermöglicht zum Beispiel: „Hand-over-Hand“ oder „Chain Locking“.
Ein BoundedBuffer hat z. B. traditionell zwei Bedingungsvariablen: BufferNotFull und BufferNotEmpty.
1public class BoundedBuffer<T> {
23
private final T buffer[];
4private int first;
5private int last;
6private int numberInBuffer;
7private final int size;
89
private final Lock lock = new ReentrantLock();
10private final Condition notFull = lock.newCondition();
11private final Condition notEmpty = lock.newCondition();
12public BoundedBuffer(int length) { /* Normaler Constructor. */
13size = length;
14buffer = (T[]) new Object[size];
15last = 0;
16first = 0;
17numberInBuffer = 0;
18}
19public void put(T item) throws InterruptedException {
20lock.lock();
21try {
2223
while (numberInBuffer == size) { notFull.await(); }
24last = (last + 1) % size;
25numberInBuffer++;
26buffer[last] = item;
27notEmpty.signal();
2829
} finally {
30lock.unlock();
31}
32}
33public T get() ... {
34lock.lock();
35try {
3637
while (numberInBuffer == 0) { notEmpty.await(); }
38first = (first + 1) % size;
39numberInBuffer--;
40notFull.signal();
41return buffer[first];
4243
} finally {
44lock.unlock();
45}
46}
47}
Obwohl den Java-Threads Prioritäten zugewiesen werden können (setPriority
), dienen sie dem zugrunde liegenden Scheduler nur als Richtschnur für die Ressourcenzuweisung.
Sobald ein Thread läuft, kann er die Prozessorressourcen explizit aufgeben, indem er die yield
-Methode aufruft.
yield
setzt den Thread an das Ende der Warteschlange für seine Prioritätsstufe.
Die Scheduling- und Prioritätsmodelle von Java sind jedoch schwach:
Es gibt keine Garantie dafür, dass immer der Thread mit der höchsten Priorität ausgeführt wird, der lauffähig ist.
Threads mit gleicher Priorität können in Zeitscheiben unterteilt sein oder auch nicht.
Bei der Verwendung nativer Threads können unterschiedliche Java-Prioritäten auf dieselbe Betriebssystempriorität abgebildet werden.
synchronized
Code sollte so kurz wie möglich gehalten werden.
Verschachtelte Monitoraufrufe sollten vermieden werden, da die äußere Sperre nicht freigegeben wird, wenn der innere Monitor wartet. Dies kann leicht zum Auftreten eines Deadlocks führen.
Threadsicherheit
Damit eine Klasse thread-sicher ist, muss sie sich in einer single-threaded Umgebung korrekt verhalten.
D. h. wenn eine Klasse korrekt implementiert ist, dann sollte keine Abfolge von Operationen (Lesen oder Schreiben von öffentlichen Feldern und Aufrufen von öffentlichen Methoden) auf Objekten dieser Klasse in der Lage sein:
das Objekt in einen ungültigen Zustand versetzen,
das Objekt in einem ungültigen Zustand zu beobachten oder
eine der Invarianten, Vorbedingungen oder Nachbedingungen der Klasse verletzen.
Die Klasse muss das korrekte Verhalten auch dann aufweisen, wenn auf sie von mehreren Threads aus zugegriffen wird.
Unabhängig vom Scheduling oder der Verschachtelung der Ausführung dieser Threads durch die Laufzeitumgebung,
Ohne zusätzliche Synchronisierung auf Seiten des aufrufenden Codes.
Dies hat zur Folge, dass Operationen auf einem thread-sicheren Objekt für alle Threads so erscheinen als ob die Operationen in einer festen, global konsistenten Reihenfolge erfolgen würden.
Die Objekt sind konstant und können nicht geändert werden.
Die Objekte sind veränderbar, unterstützen aber nebenläufigen Zugriff, da die Methoden entsprechend synchronisiert sind.
All solche Objekte bei denen jede einzelne Operation thread-sicher ist, aber bestimmte Sequenzen von Operationen eine externe Synchronisierung erfordern können.
Alle Objekte die keinerlei Synchronisierung aufweisen. Der Aufrufer kann die Synchronisierung jedoch ggf. extern übernehmen.
Objekte, die nicht thread-sicher sind und auch nicht thread-sicher gemacht werden können, da sie zum Beispiel globalen Zustand manipulieren.
Virtueller Puffer
Implementieren Sie einen virtuellen Puffer, der Tasks (Instanzen von java.lang.Runable
) entgegennimmt und nach einer bestimmten Zeit ausführt. Der Puffer darf währenddessen nicht blockieren bzw. gesperrt sein.
Nutzen Sie ggf. virtuelle Threads, um auf ein explizites Puffern zu verzichten. Ein virtueller Thread kann zum Beispiel mit: Thread.ofVirtual()
erzeugt werden. Danach kann an die Methode start
ein Runnable
Objekt übergeben werden.
Verzögern Sie die Ausführung (Thread.sleep()
) im Schnitt um 100ms mit einer Standardabweichung von 20ms. (Nutzen Sie Random.nextGaussian(mean,stddev)
)
Starten Sie 100 000 virtuelle Threads. Wie lange dauert die Ausführung? Wie lange dauert die Ausführung bei 100 000 platform (native) Threads.
Nutzen Sie ggf. die Vorlage.
1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
45
public class VirtualBuffer {
67
private final Random random = new Random();
89
private Thread runDelayed(int id, Runnable task) {
10// TODO
11}
1213
public static void main(String[] args) throws Exception {
14var start = System.nanoTime();
15VirtualBuffer buffer = new VirtualBuffer();
16List<Thread> threads = new ArrayList<>();
17for (int i = 0; i < 100000; i++) {
18final var no = i;
19var thread = buffer.runDelayed(
20i,
21() -> System.out.println("i'm no.: " + no));
22threads.add(thread);
23}
24System.out.println("finished starting all threads");
25for (Thread thread : threads) {
26thread.join();
27}
28var runtime = (System.nanoTime() - start)/1_000_000;
29System.out.println(
30"all threads finished after: " + runtime + "ms"
31);
32}
33}
Thread-sichere Programmierung
Implementieren Sie eine Klasse ThreadsafeArray
zum Speichern von nicht-null
Objekten (java.lang.Object
) an ausgewählten Indizes — vergleichbar mit einem normalen Array. Im Vergleich zu einem normalen Array sollen die Aufrufer jedoch ggf. blockiert werden, wenn die Zelle belegt ist. Die Klasse soll folgende Methoden bereitstellen:
get(int index)
:Liefert den Wert an der Position index
zurück. Der aufrufende Thread wird ggf. blockiert, bis ein Wert an der Position index
gespeichert wurde. (Die get
-Methode entfernt den Wert nicht aus dem Array.)
set(int index, Object value)
:Speichert den Wert value an der Position index
. Falls an der Position index
bereits ein Wert gespeichert wurde, wird der aufrufende Thread blockiert, bis der Wert an der Position index
gelöscht wurde.
delete(int index)
:Löscht ggf. den Wert an der Position index
wenn ein Wert vorhanden ist. Andernfalls wird der Thread blockiert, bis es einen Wert gibt, der gelöscht werden kann.
Implementieren Sie die Klasse ThreadsafeArray
nur unter Verwendung der Standardprimitive: synchronized
, wait
, notify
und notifyAll
. Nutzen Sie die Vorlage.
Können Sie sowohl notify
als auch notifyAll
verwenden?
Implementieren Sie die Klasse ThreadsafeArray
unter Verwendung von ReentrantLock
s und Condition
s. Nutzen Sie die Vorlage.
Welche Vorteile hat die Verwendung von ReentrantLock
s?
Sie können sich die Klasse ThreadsafeArray
auch als ein Array von BoundedBuffers mit der Größe 1 vorstellen.
1public class ThreadsafeArray {
23
private final Object[] array;
45
public ThreadsafeArray(int size) {
6this.array = new Object[size];
7}
89
// Methodensignaturen ggf. vervollständigen
10// und Implementierungen ergänzen
11Object get(int index)
12void set(int index, Object value)
13void remove(int index)
1415
public static void main(String[] args) throws Exception {
16final var ARRAY_SIZE = 2;
17final var SLEEP_TIME = 1; // ms
18var array = new ThreadsafeArray(ARRAY_SIZE);
19for (int i = 0; i < ARRAY_SIZE; i++) {
20final var threadId = i;
2122
final var readerThreadName = "Reader";
23var t2 = new Thread(() -> {
24while (true) {
25int j = (int) (Math.random() * ARRAY_SIZE);
26try {
27out.println(readerThreadName + "[" + j + "]" );
28var o = array.get(j);
29out.println(readerThreadName +
30"[" + j + "] ⇒ #" + o.hashCode());
31Thread.sleep(SLEEP_TIME);
32} catch (InterruptedException e) {
33e.printStackTrace();
34}
35}
36}, readerThreadName);
37t2.start();
3839
// One Thread for each slot that will eventually
40// write some content
41final var writerThreadName = "Writer[" + threadId + "]";
42var t1 = new Thread(() -> {
43while (true) {
44try {
45var o = new Object();
46out.println(writerThreadName + " = #" + o.hashCode());
47array.set(threadId, o);
48out.println(writerThreadName + " done");
49Thread.sleep(SLEEP_TIME);
50} catch (InterruptedException e) {
51e.printStackTrace();
52}
53}
54}, writerThreadName);
55t1.start();
5657
// One Thread for each slot that will eventually
58// delete the content
59final var deleterThreadName = "Delete[" + threadId + "]";
60var t3 = new Thread(() -> {
61while (true) {
62try {
63out.println(deleterThreadName);
64array.delete(threadId);
65Thread.sleep(SLEEP_TIME);
66} catch (InterruptedException e) {
67e.printStackTrace();
68}
69}
70}, deleterThreadName);
71t3.start();
72}
73}
74}