Create worker threads using Runnable and Callable, manage the thread life cycle, including automations provided by different Executor services and concurrent API.
Develop thread-safe code, using different locking mechanisms and concurrent API.
Process Java collections concurrently including the use of parallel streams.
Threads allow multiple paths of execution to occur concurrently within a single program. Each thread represents a separate path of execution, allowing different parts of the code to run simultaneously.
You can think of threads like lanes on a highway. Just as multiple lanes allow many cars to drive down the road simultaneously, multiple threads allow different segments of code to execute concurrently within the same application. However, just as cars in different lanes need to coordinate when merging or exiting, threads must coordinate carefully when accessing shared resources to avoid conflicts.
To create a new thread, you can extend the Thread class or implement the Runnable interface. When extending Thread, you override the run() method to define the code that will execute in the new thread:
public class MyThread extends Thread {
    public void run() {
        System.out.println("New thread is running");
    }
}
To launch the new thread, create an instance of the class and call its start() method:
MyThread myThread = new MyThread();
myThread.start();
The start() method initiates a new thread that executes the code defined in run(). Alternatively, you can create a new thread by implementing Runnable:
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
And passing an instance to the Thread constructor:
public class MyRunnable implements Runnable {
    public void run() {
        System.out.println("New thread is running");
    }
}
MyRunnable myRunnable = new MyRunnable();
Thread myThread = new Thread(myRunnable);
myThread.start();
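Because Runnable is a functional interface, the same thread can be created more concisely with a lambda expression instead of a named class. A minimal sketch (the class name below is my own, for illustration):

```java
public class LambdaThreadExample {
    public static void main(String[] args) throws InterruptedException {
        // The lambda body becomes the run() method of the Runnable
        Thread myThread = new Thread(() -> System.out.println("New thread is running"));
        myThread.start();
        myThread.join(); // wait for the new thread to finish
        System.out.println("State after join: " + myThread.getState()); // TERMINATED
    }
}
```

This form is the one you will most often see in modern code, and it is used in the examples that follow.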
Java distinguishes between daemon and non-daemon threads. Daemon threads are those that do not prevent the JVM from exiting when the program finishes. They run in the background and are typically used for tasks like garbage collection, background cleanup, etc. The JVM will continue running as long as there is at least one active non-daemon thread. Daemon threads are terminated when all non-daemon threads complete. To make a thread a daemon, call its setDaemon(true) method before starting it:
public class DaemonThreadExample {
    public static void main(String[] args) {
        Thread daemonThread = new Thread(() -> {
            while (true) {
                System.out.println("Daemon thread is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        daemonThread.setDaemon(true);
        daemonThread.start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Main thread exiting");
    }
}
In this example, we create a daemon thread using a lambda expression. The daemon thread runs in an infinite loop, printing a message every second. We set the thread to be a daemon by calling setDaemon(true) before starting it.
The main thread sleeps for 5 seconds and then continues its execution. When the main thread (which is a non-daemon thread) terminates, the JVM will automatically terminate the daemon thread.
A thread progresses through several states during its life cycle:
┌─────┐
│ NEW │
└──┬──┘
   │ start()
   ▼
┌──────────┐     run() completes     ┌────────────┐
│ RUNNABLE │────────────────────────▶│ TERMINATED │
└─┬──────▲─┘                         └────────────┘
  │      │
  │      └── back to RUNNABLE when:
  │            BLOCKED:       the lock is acquired
  │            WAITING:       notify(), notifyAll(), or interrupt()
  │            TIMED_WAITING: the timeout elapses, notify(), or interrupt()
  │
  ├── waiting for a monitor lock ─────────────────▶ ┌─────────┐
  │                                                 │ BLOCKED │
  │                                                 └─────────┘
  ├── wait(), join(), LockSupport.park() ─────────▶ ┌─────────┐
  │                                                 │ WAITING │
  │                                                 └─────────┘
  └── sleep(long), wait(long), join(long),          ┌───────────────┐
      LockSupport.parkNanos(long),         ───────▶ │ TIMED_WAITING │
      LockSupport.parkUntil(long)                   └───────────────┘
Here’s a brief explanation of the transitions:
When a thread is created, it starts in the NEW state.
When the start() method is called, the thread moves to the RUNNABLE state, indicating that it’s eligible for execution by the thread scheduler.
If the thread’s run() method completes normally, the thread transitions to the TERMINATED state.
If the thread attempts to acquire a lock that is currently held by another thread, it moves to the BLOCKED state until the lock becomes available.
When the thread is waiting indefinitely for another thread to perform a specific action (for example, after calling wait() or join() with no timeout), it transitions to the WAITING state.
If the thread is waiting for a specified amount of time (using methods like sleep(long), wait(long), join(long), etc.), it moves to the TIMED_WAITING state.
From the WAITING or TIMED_WAITING state, the thread can be brought back to the RUNNABLE state by a call to notify() or notifyAll() on the object it’s waiting on, or to interrupt() on the thread itself.
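You can observe these states directly with Thread.getState(). The following sketch (class name is my own) starts a thread that sleeps for a while and samples its state at three points; the middle print typically shows TIMED_WAITING, though the exact timing depends on the scheduler:

```java
public class ThreadStateExample {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(500); // the thread sits in TIMED_WAITING here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(t.getState()); // NEW: created but not started
        t.start();
        Thread.sleep(100); // give the thread time to reach sleep()
        System.out.println(t.getState()); // typically TIMED_WAITING
        t.join();
        System.out.println(t.getState()); // TERMINATED: run() completed
    }
}
```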
The static Thread.sleep(long millis) method causes the current thread to suspend execution for the specified number of milliseconds:
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    // Handle interruption
}
To prematurely wake a sleeping or waiting thread, you can call its interrupt() method. This will throw an InterruptedException in the target thread, which must be handled:
public class InterruptExample {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                System.out.println("Thread is going to sleep");
                Thread.sleep(5000);
                System.out.println("Thread woke up");
            } catch (InterruptedException e) {
                System.out.println("Thread was interrupted");
            }
        });
        thread.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.interrupt();
    }
}
In this example, we create a thread that goes to sleep for 5 seconds. The main thread sleeps for 2 seconds and then calls interrupt() on the other thread.
When interrupt() is called, it sets the interrupted status of the target thread. If the target thread is sleeping or waiting, it will immediately throw an InterruptedException. The thread can then handle the interruption appropriately.
In the example, the output will be:
Thread is going to sleep
Thread was interrupted
The thread’s sleep is prematurely interrupted after 2 seconds, and it catches the InterruptedException and prints a message.
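Calling interrupt() on a thread that is not sleeping or waiting does not throw an exception; it only sets the thread’s interrupted status. A common pattern is therefore to poll that status with Thread.currentThread().isInterrupted(). A minimal sketch (class name is my own):

```java
public class InterruptFlagExample {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // Poll the interrupted status instead of relying on an exception
            while (!Thread.currentThread().isInterrupted()) {
                // busy "work" loop
            }
            System.out.println("Worker noticed the interrupt and is stopping");
        });
        worker.start();
        Thread.sleep(200);
        worker.interrupt(); // sets the flag; no InterruptedException is thrown here
        worker.join();
    }
}
```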
When working with threads, it’s important to be aware of potential problems that can arise due to the complex nature of concurrent programming. These issues can lead to unexpected behavior, reduced performance, or even complete program failure.
In the context of multi-threaded programming, problems start to occur when threads get stuck in a state where they cannot proceed, preventing the program from moving forward. Let’s talk about some of the most common problems.
A deadlock occurs when two or more threads are unable to proceed because each thread is waiting for a resource that another thread holds, resulting in a circular dependency. It’s a situation where threads are permanently blocked, waiting for each other to release the resources they need.
Imagine two friends, Anne and Joe, who are each trying to cross a narrow bridge from opposite ends. The bridge is so narrow that only one person can cross at a time. Anne starts walking from one end, and Joe starts walking from the other end. When they meet in the middle, neither can continue forward, and neither can go back because there’s no space to turn around. They’re stuck in a situation where neither can proceed, and neither can retreat. This deadlock situation halts their progress, similar to how a deadlock in Java halts the execution of threads waiting on each other to release resources.
In the context of multi-threaded programming, resources are typically locks or other synchronization mechanisms used to control access to shared data. Deadlocks occur when the following four conditions are simultaneously met:
Mutual Exclusion: At least one resource must be held in a non-sharable mode, meaning only one thread can use the resource at a time.
Hold and Wait: A thread must be holding at least one resource while waiting to acquire additional resources held by other threads.
No Preemption: Resources cannot be forcibly taken away from a thread; they must be released voluntarily by the thread holding them.
Circular Wait: There must be a circular chain of two or more threads, each waiting for a resource held by the next thread in the chain.
Consider this class that illustrates the deadlock analogy:
public class DeadlockExample {
    private static Object narrowBridgePart1 = new Object();
    private static Object narrowBridgePart2 = new Object();

    public static void main(String[] args) {
        Thread anne = new Thread(() -> {
            synchronized (narrowBridgePart1) {
                System.out.println("Anne: Holding part 1 of the bridge...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Anne: Waiting for part 2 of the bridge...");
                synchronized (narrowBridgePart2) {
                    System.out.println("Anne: Holding part 1 and part 2 of the bridge...");
                }
            }
        });
        Thread joe = new Thread(() -> {
            synchronized (narrowBridgePart2) {
                System.out.println("Joe: Holding part 2 of the bridge...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Joe: Waiting for part 1 of the bridge...");
                synchronized (narrowBridgePart1) {
                    System.out.println("Joe: Holding part 1 and part 2 of the bridge...");
                }
            }
        });
        anne.start();
        joe.start();
    }
}
In this example, we have two threads, Anne and Joe, and two locks, narrowBridgePart1 and narrowBridgePart2. The program encounters a deadlock when the following sequence of events occurs:
Anne acquires narrowBridgePart1 and enters the first synchronized block.
Joe acquires narrowBridgePart2 and enters the first synchronized block.
Anne attempts to acquire narrowBridgePart2 in the second synchronized block but is blocked because narrowBridgePart2 is held by Joe.
Joe attempts to acquire narrowBridgePart1 in the second synchronized block but is blocked because narrowBridgePart1 is held by Anne.
At this point, both threads are waiting for each other to release the parts of the bridge they hold, resulting in a deadlock. Anne and Joe are stuck in the middle of the bridge, unable to proceed or retreat, just like the threads in a deadlock situation in Java. The program will hang indefinitely, with no thread being able to proceed.
To avoid deadlocks, it’s important to follow best practices such as:
Acquiring locks in a consistent order: If multiple locks need to be acquired, they should be acquired in the same order by all threads to avoid circular wait conditions.
Timeout mechanisms: Use timeout mechanisms when attempting to acquire locks, so that threads don’t wait indefinitely if they are unable to acquire a lock.
Resource ordering: Assign a numerical order to resources and ensure that threads acquire resources in ascending order to prevent circular wait conditions.
Lock granularity: Use fine-grained locks when possible, locking only the necessary sections of code to reduce the likelihood of contention and deadlocks.
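To see consistent lock ordering in action, here’s a sketch of the bridge example rewritten so that both threads acquire narrowBridgePart1 before narrowBridgePart2 (the class and method names are my own). With a single agreed order, the circular wait condition can never form:

```java
public class LockOrderingExample {
    private static final Object narrowBridgePart1 = new Object();
    private static final Object narrowBridgePart2 = new Object();

    // Both threads take part 1 first, then part 2 — no circular wait is possible
    private static void cross(String name) {
        synchronized (narrowBridgePart1) {
            synchronized (narrowBridgePart2) {
                System.out.println(name + " crossed the bridge");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread anne = new Thread(() -> cross("Anne"));
        Thread joe = new Thread(() -> cross("Joe"));
        anne.start();
        joe.start();
        anne.join();
        joe.join();
        System.out.println("No deadlock occurred");
    }
}
```

Whichever thread wins the race for part 1, the other simply waits and then proceeds; the program always terminates.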
Starvation occurs when a thread is perpetually denied access to a shared resource, preventing it from making progress. In other words, a thread is starved of the resources it needs to complete its task. Starvation can happen when other threads continuously acquire the shared resource, causing the starved thread to wait indefinitely.
Think of a scenario where a group of people is waiting in line to buy tickets for a popular concert. If someone cuts in line repeatedly or if the ticket seller keeps serving only a certain group of people, some individuals may never get a chance to buy tickets. They are essentially starved of the opportunity to make their purchase.
In multi-threaded programs, starvation often arises when threads are assigned different priorities. Java assigns priorities to threads ranging from 1 (lowest) to 10 (highest), with 5 being the default priority. When threads with higher priorities are continuously given preference over threads with lower priorities, the lower-priority threads may suffer from starvation.
Let’s review this class that illustrates the concert ticket analogy:
public class ConcertTicketStarvationExample {
    private static Object ticketSeller = new Object();

    public static void main(String[] args) {
        Thread impatientFan = new Thread(() -> {
            while (true) {
                synchronized (ticketSeller) {
                    System.out.println("Impatient Fan: Bought a ticket");
                    // Simulate buying a ticket
                }
            }
        });
        Thread patientFan = new Thread(() -> {
            while (true) {
                synchronized (ticketSeller) {
                    System.out.println("Patient Fan: Bought a ticket");
                    // Simulate buying a ticket
                }
            }
        });
        // Impatient fan has higher priority and keeps getting tickets
        impatientFan.setPriority(Thread.MAX_PRIORITY);
        // Patient fan has lower priority and might get starved
        patientFan.setPriority(Thread.MIN_PRIORITY);
        impatientFan.start();
        patientFan.start();
    }
}
In this example, we have two threads, impatientFan and patientFan, competing for the same lock object, ticketSeller. The threads are assigned different priorities: impatientFan has the maximum priority (10), while patientFan has the minimum priority (1).
When the program runs, impatientFan, having a higher priority, is likely to acquire the lock more frequently than patientFan. As a result, patientFan may starve, waiting for its turn to access the shared resource. The output of the program might show that impatientFan acquires the lock repeatedly, while patientFan doesn’t get a chance to execute as often as impatientFan.
It’s important to note that thread priorities are not guaranteed to be strictly followed by the Java Virtual Machine (JVM). The JVM’s thread scheduler uses priorities as a hint for making scheduling decisions but may not always adhere to them. Nevertheless, assigning proper priorities to threads can help reduce the risk of starvation.
To mitigate starvation, consider the following approaches:
Fair scheduling: Use fair scheduling mechanisms, such as fair locks or semaphores, which ensure that threads are granted access to shared resources in the order they requested them.
Avoid long-running tasks: Break down long-running tasks into smaller units of work, allowing other threads to have a chance to execute in between.
Adjust thread priorities: Assign appropriate priorities to threads based on their importance and resource requirements. However, be careful when manipulating thread priorities, as it can lead to complex and hard-to-predict behavior.
Timeout mechanisms: Implement timeout mechanisms that allow threads to abandon waiting for a resource if they have been waiting for too long.
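As a sketch of the fair-scheduling idea, the ticket-seller example can be rewritten with a fair ReentrantLock: passing true to the constructor asks the lock to grant access to the longest-waiting thread. The class name and loop counts below are my own, and the loops are bounded so the program terminates:

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    // 'true' requests a fair ordering policy: waiting threads acquire
    // the lock roughly in the order they requested it
    private static final ReentrantLock ticketSeller = new ReentrantLock(true);

    public static void main(String[] args) throws InterruptedException {
        Runnable fan = () -> {
            for (int i = 0; i < 3; i++) {
                ticketSeller.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + ": Bought a ticket");
                } finally {
                    ticketSeller.unlock(); // always release in a finally block
                }
            }
        };
        Thread impatientFan = new Thread(fan, "Impatient Fan");
        Thread patientFan = new Thread(fan, "Patient Fan");
        impatientFan.start();
        patientFan.start();
        impatientFan.join();
        patientFan.join();
    }
}
```

With fairness enabled, both fans buy their tickets; neither can monopolize the seller indefinitely. (Fair locks trade some throughput for this guarantee.)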
Livelock occurs when two or more threads are actively responding to each other’s actions but are unable to make progress. Unlike deadlock, where threads are stuck in a waiting state, threads in a livelock are constantly changing their state in response to the actions of other threads. However, despite the continuous activity, no real progress is made towards completing the intended task.
Imagine a scenario where a husband and wife are sitting at a table with only one spoon to share for their meal. Both are extremely polite and insist that the other should eat first. The husband, holding the spoon, offers it to the wife, but she refuses and insists that he eats first. This back-and-forth continues indefinitely, with neither of them ever eating because they keep offering the spoon to each other.
In the context of multi-threaded programming, livelock often occurs when threads are repeatedly yielding to each other without making any meaningful progress. Livelock can also happen when threads keep retrying an operation that persistently fails due to the actions of other threads.
Consider this program:
public class LivelockExample {
    static class Spoon {
        private Diner owner;

        public Spoon(Diner d) {
            owner = d;
        }

        public Diner getOwner() {
            return owner;
        }

        public synchronized void setOwner(Diner d) {
            owner = d;
        }

        public synchronized void use() {
            System.out.println(owner.name + " is eating.");
        }
    }

    static class Diner {
        private String name;
        private boolean isHungry;

        public Diner(String n) {
            name = n;
            isHungry = true;
        }

        public String getName() {
            return name;
        }

        public boolean isHungry() {
            return isHungry;
        }

        public void eatWith(Spoon spoon, Diner spouse) {
            while (isHungry) {
                if (spoon.getOwner() != this) {
                    try {
                        Thread.sleep(1); // wait for the spoon to be free
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                if (spouse.isHungry()) {
                    System.out.println(name + ": " + spouse.getName() + " you eat first.");
                    spoon.setOwner(spouse);
                    continue;
                }
                spoon.use();
                isHungry = false;
                System.out.println(name + ": I am done eating.");
                spoon.setOwner(spouse);
            }
        }
    }

    public static void main(String[] args) {
        Diner husband = new Diner("Husband");
        Diner wife = new Diner("Wife");
        Spoon spoon = new Spoon(husband);
        new Thread(() -> husband.eatWith(spoon, wife)).start();
        new Thread(() -> wife.eatWith(spoon, husband)).start();
    }
}
This program is kind of complex, so let me walk you through it step by step.
First, we have a Spoon class:
static class Spoon {
    private Diner owner;

    public Spoon(Diner d) {
        owner = d;
    }

    public Diner getOwner() {
        return owner;
    }

    public synchronized void setOwner(Diner d) {
        owner = d;
    }

    public synchronized void use() {
        System.out.println(owner.name + " is eating.");
    }
}
Think of the spoon as a shared resource. This class keeps track of who currently has the spoon. It has a few methods:
A constructor to set the initial owner of the spoon.
getOwner() to find out who currently has the spoon.
setOwner(Diner d) to change the owner of the spoon.
use() to simulate the action of using the spoon to eat, which just prints a message.
Next, we have the Diner class:
static class Diner {
    private String name;
    private boolean isHungry;

    public Diner(String n) {
        name = n;
        isHungry = true;
    }

    public String getName() {
        return name;
    }

    public boolean isHungry() {
        return isHungry;
    }

    public void eatWith(Spoon spoon, Diner spouse) {
        while (isHungry) {
            if (spoon.getOwner() != this) {
                try {
                    Thread.sleep(1); // wait for the spoon to be free
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            if (spouse.isHungry()) {
                System.out.println(name + ": " + spouse.getName() + " you eat first.");
                spoon.setOwner(spouse);
                continue;
            }
            spoon.use();
            isHungry = false;
            System.out.println(name + ": I am done eating.");
            spoon.setOwner(spouse);
        }
    }
}
It represents each person who wants to eat. Each diner has a name and a flag to indicate if they are hungry. The key part of this class is the eatWith method, which is where the livelock happens. This method does the following:
It checks if the diner owns the spoon.
If they don’t, they wait a little and check again.
If they do own the spoon, they check if their spouse is hungry.
If the spouse is hungry, they offer the spoon to the spouse and wait.
If the spouse is not hungry, they use the spoon to eat and then stop being hungry.
In the main method, we create two Diner objects: husband and wife. We also create one Spoon object and give it to the husband initially. Then we start two threads, one for each diner. Each thread runs the eatWith method for their respective diner, trying to use the spoon:
public static void main(String[] args) {
    Diner husband = new Diner("Husband");
    Diner wife = new Diner("Wife");
    Spoon spoon = new Spoon(husband);
    new Thread(() -> husband.eatWith(spoon, wife)).start();
    new Thread(() -> wife.eatWith(spoon, husband)).start();
}
When the program runs, both the husband and wife are trying to eat using the spoon. Here’s what happens step by step:
The husband starts with the spoon.
The husband checks if the wife is hungry (she is), so he offers the spoon to her.
The wife now has the spoon. She checks if the husband is hungry (he is), so she offers the spoon back to him.
This process repeats endlessly, with both the husband and wife constantly offering the spoon to each other without either of them ever eating.
This continuous back-and-forth without making any progress is a livelock. Both threads (the husband and wife) are active and continuously changing their state, but they are not able to proceed with eating because they keep deferring to each other.
To resolve livelocks, consider the following approaches:
Randomized backoff: Introduce randomness in the yielding mechanism. Instead of immediately yielding, threads can wait for a random amount of time before retrying. This reduces the likelihood of threads continuously yielding to each other in a synchronized manner.
Resource ordering: Assign a specific order to the resources or conditions that threads are waiting for. Ensure that threads acquire resources or check conditions in a consistent order to avoid circular dependencies.
Timeout mechanisms: Implement timeout mechanisms that allow threads to abandon waiting and take alternative actions if they have been waiting for too long. This prevents threads from indefinitely yielding to each other.
Locking strategies: Use appropriate locking strategies, such as read-write locks or fine-grained locks, to minimize contention and reduce the chances of livelock.
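Here is a minimal sketch of the randomized-backoff idea, using a hypothetical tryOperationWithBackoff helper (all names are my own, not a standard API). Instead of retrying immediately, the thread sleeps for a random interval between attempts so two threads stop mirroring each other, and it gives up after a bounded number of retries:

```java
import java.util.Random;
import java.util.function.BooleanSupplier;

public class BackoffSketch {
    private static final Random random = new Random();

    // Hypothetical retry loop: wait a random interval before each new attempt,
    // and abandon the operation after a fixed number of failed tries
    static boolean tryOperationWithBackoff(BooleanSupplier attempt)
            throws InterruptedException {
        for (int retry = 0; retry < 10; retry++) {
            if (attempt.getAsBoolean()) {
                return true; // made progress
            }
            Thread.sleep(random.nextInt(50) + 1); // random backoff of 1-50 ms
        }
        return false; // give up (timeout mechanism)
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy attempt that succeeds on the third try
        int[] calls = {0};
        boolean done = tryOperationWithBackoff(() -> ++calls[0] >= 3);
        System.out.println("Succeeded: " + done + " after " + calls[0] + " attempts");
    }
}
```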
Race conditions occur when multiple threads access shared data concurrently, and the final outcome depends on the relative timing of their executions. In other words, the behavior of the program becomes unpredictable and inconsistent because the threads race each other to perform operations on the shared data. Race conditions can lead to incorrect results, data corruption, and unexpected program behavior.
Imagine a scenario where two people, Anne and Joe, have a joint bank account. They both independently decide to withdraw money from an ATM at the same time. Suppose the account initially has a balance of $100. Anne tries to withdraw $50, while Joe tries to withdraw $70. If the ATM processes their requests concurrently without proper synchronization, the outcomes become unpredictable. The final balance could be $50, $30, or even negative $20, depending on the order in which the withdrawals are processed.
In multi-threaded programs, race conditions typically arise when multiple threads access shared variables or resources without appropriate synchronization mechanisms. The threads may read and write the shared data simultaneously, leading to inconsistent or unexpected results.
The following class simulates the race condition described in the analogy:
public class BankAccount {
    private int balance;

    public BankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    public void withdraw(String name, int amount) {
        if (balance >= amount) {
            System.out.println(name + " is going to withdraw " + amount);
            try {
                // Simulate the time taken to process withdrawal
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            balance -= amount;
            System.out.println(name + " completed the withdrawal of " + amount);
        } else {
            System.out.println(name + " tried to withdraw " + amount + " but insufficient balance.");
        }
        System.out.println("Current balance: " + balance);
    }

    public static void main(String[] args) {
        BankAccount account = new BankAccount(100);
        Runnable anneWithdrawal = () -> {
            account.withdraw("Anne", 50);
        };
        Runnable joeWithdrawal = () -> {
            account.withdraw("Joe", 70);
        };
        Thread anneThread = new Thread(anneWithdrawal);
        Thread joeThread = new Thread(joeWithdrawal);
        anneThread.start();
        joeThread.start();
        try {
            anneThread.join();
            joeThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Final balance: " + account.balance);
    }
}
The class uses threads to represent Anne and Joe withdrawing money from a shared bank account:
The BankAccount class has a balance that both Anne and Joe will try to withdraw from.
The withdraw method checks if there is enough balance, simulates the processing time with Thread.sleep(100), and then deducts the amount from the balance.
The main method creates a BankAccount instance with an initial balance of $100.
It defines two Runnable tasks for Anne and Joe, each trying to withdraw money.
Two threads are created and started to simulate concurrent withdrawals.
The join method ensures the main thread waits for both withdrawal operations to complete before printing the final balance.
Running this code multiple times can produce different final balances, illustrating the race condition caused by unsynchronized access to the shared balance variable.
To prevent race conditions, it’s essential to use synchronization mechanisms that ensure exclusive access to shared resources. Some common techniques include:
Locks: Use lock objects, such as ReentrantLock (from java.util.concurrent.locks) or synchronized blocks, to ensure that only one thread can access the shared resource at a time.
Atomic variables: Use atomic variables, such as AtomicInteger, which provide thread-safe operations for reading and writing shared variables.
Concurrent data structures: Utilize thread-safe data structures from the java.util.concurrent package, such as ConcurrentHashMap or CopyOnWriteArrayList, which are designed to handle concurrent access.
Synchronization primitives: Employ synchronization primitives like semaphores, barriers, or latches to coordinate thread execution and access to shared resources.
It’s important to note that while synchronization is necessary to prevent race conditions, excessive synchronization can cause performance overhead and liveness issues such as deadlocks. Therefore, strike a balance: synchronize only when necessary, use granular locks, and minimize the scope of synchronized regions.
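As a sketch of the simplest fix for the bank-account race, marking withdraw as synchronized makes the check-then-act sequence atomic (the class name below is my own, and the simulated processing delay is omitted for brevity). The balance can now end at 50 or 30 depending on which thread runs first, but never at -20:

```java
public class SafeBankAccount {
    private int balance;

    public SafeBankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    // synchronized makes the balance check and the deduction one atomic step:
    // only one thread at a time can be inside this method
    public synchronized void withdraw(String name, int amount) {
        if (balance >= amount) {
            balance -= amount;
            System.out.println(name + " withdrew " + amount);
        } else {
            System.out.println(name + " tried to withdraw " + amount + " but insufficient balance.");
        }
    }

    public synchronized int getBalance() {
        return balance;
    }

    public static void main(String[] args) throws InterruptedException {
        SafeBankAccount account = new SafeBankAccount(100);
        Thread anne = new Thread(() -> account.withdraw("Anne", 50));
        Thread joe = new Thread(() -> account.withdraw("Joe", 70));
        anne.start();
        joe.start();
        anne.join();
        joe.join();
        // Exactly one withdrawal succeeds; the balance is never negative
        System.out.println("Final balance: " + account.getBalance());
    }
}
```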
In general, identifying and resolving threading problems requires careful analysis and understanding of the program’s behavior. By being aware of issues like deadlocks, starvation, livelocks, and race conditions, you can design and implement thread-safe code in concurrent programs.
In the next section, we’ll explore techniques for synchronizing access to shared resources and coordinating thread execution to prevent these common problems.
When developing multi-threaded applications, it’s important to ensure that the code is thread-safe.
Thread-safety is the property of a program or a piece of code that guarantees its correct execution in a multi-threaded environment. A thread-safe code ensures that the shared data remains consistent and the program produces the expected output, regardless of the interleaving or timing of thread execution.
To achieve thread-safety, we need to address two main concerns:
Data Visibility: Ensuring that changes made by one thread are visible to other threads.
Data Consistency: Maintaining the integrity and correctness of shared data when multiple threads access and modify it concurrently.
Java provides several mechanisms to tackle these concerns and facilitate thread-safe programming:
┌───────────────────────────────────────────────────────────┐
│ Thread-Safety Mechanisms │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ volatile │ │ Atomic │ │ synchronized │ │
│ │ │ │ Classes │ │ │ │
│ │ Visibility │ │ Atomicity of │ │ Exclusivity │ │
│ │ guarantee │ │ operations │ │ of execution │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌─────────────┐ ┌───────────────┐ ┌─────────────┐ │
│ │ Lock │ │ Cyclic │ │ Concurrent │ │
│ │ Interface │ │ Barrier │ │ Collections │ │
│ │ │ │ │ │ │ │
│ │ Fine-grained│ │Synchronization│ │ Thread-safe │ │
│ │ control │ │ point │ │ data struct │ │
│ └─────────────┘ └───────────────┘ └─────────────┘ │
│ │
└───────────────────────────────────────────────────────────┘
Let’s explore them in more detail.
volatile
The volatile keyword in Java is used to indicate that a variable may be modified by multiple threads concurrently. When a variable is declared as volatile, it guarantees that any write to that variable will be immediately visible to other threads, and any subsequent read will always see the most up-to-date value.
Here’s an example:
public class VolatileExample {
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            while (!flag) {
                // Wait for the flag to become true
            }
            System.out.println("Thread 1 finished");
        });
        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("Thread 2 set the flag");
        });
        thread1.start();
        thread2.start();
    }
}
In this example, we have a volatile variable named flag. thread1 continuously checks the value of flag and waits for it to become true. thread2 sleeps for a second and then sets flag to true.
By declaring flag as volatile, we ensure that when thread2 modifies its value, the change is immediately visible to thread1. This guarantees that thread1 will see the updated value and exit the waiting loop.
However, it’s important to note that volatile only ensures visibility and does not provide atomicity or mutual exclusion. If multiple threads perform compound operations (such as read-modify-write) on a volatile variable concurrently, it can still lead to race conditions. In such cases, additional synchronization mechanisms are required.
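The following sketch (class name is my own) makes this concrete: two threads each increment a volatile counter 100,000 times, but because count++ is a read-modify-write sequence, increments are routinely lost and the final value usually falls short of 200,000:

```java
public class VolatileCounterRace {
    // volatile guarantees visibility, but count++ is still three steps:
    // read, add one, write back — and those steps can interleave
    private static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                count++; // NOT atomic: two threads can read the same value
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Often prints less than 200000 because some increments are lost
        System.out.println("Count: " + count);
    }
}
```

The atomic classes described next close exactly this gap.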
Java provides a set of atomic classes in the java.util.concurrent.atomic package that offer thread-safe operations on single variables. These classes ensure that the operations performed on the variables are atomic, meaning they are executed as a single, indivisible unit of work.
Some commonly used atomic classes are:
AtomicBoolean: Provides atomic operations on a boolean value.
AtomicInteger: Provides atomic operations on an integer value.
AtomicLong: Provides atomic operations on a long value.
AtomicReference<V>: Provides atomic operations on an object reference of type V.
Here’s an example using AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                count.incrementAndGet();
            }
        });
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                count.incrementAndGet();
            }
        });
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("Final count: " + count.get());
    }
}
In this example, we have two threads that increment the count variable 1000 times each. The count variable is an instance of AtomicInteger, which provides thread-safe operations for incrementing and retrieving its value.
By using AtomicInteger, we ensure that the increment operation is performed atomically, avoiding race conditions. The incrementAndGet() method atomically increments the value and returns the updated value. The get() method retrieves the current value of the AtomicInteger.
Atomic classes provide various methods for performing thread-safe operations on variables. Some common methods include:
get(): Returns the current value.
set(type newValue): Sets the value to newValue.
getAndSet(type newValue): Sets the value to newValue and returns the previous value.
incrementAndGet(): Atomically increments the value by one and returns the updated value.
getAndIncrement(): Atomically increments the value by one and returns the previous value.
decrementAndGet(): Atomically decrements the value by one and returns the updated value.
getAndDecrement(): Atomically decrements the value by one and returns the previous value.
Here’s an example that demonstrates the usage of these methods:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMethodsExample {
    private static AtomicInteger value = new AtomicInteger(0);

    public static void main(String[] args) {
        // get(): Returns the current value
        int currentValue = value.get();
        System.out.println("Current value: " + currentValue);

        // set(type newValue): Sets the value to newValue
        value.set(10);
        System.out.println("Value after set(10): " + value.get());

        // getAndSet(type newValue): Sets the value to newValue and returns the previous value
        int previousValue = value.getAndSet(20);
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndSet(20): " + value.get());

        // incrementAndGet(): Atomically increments the value by one and returns the updated value
        int incrementedValue = value.incrementAndGet();
        System.out.println("Value after incrementAndGet(): " + incrementedValue);

        // getAndIncrement(): Atomically increments the value by one and returns the previous value
        previousValue = value.getAndIncrement();
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndIncrement(): " + value.get());

        // decrementAndGet(): Atomically decrements the value by one and returns the updated value
        int decrementedValue = value.decrementAndGet();
        System.out.println("Value after decrementAndGet(): " + decrementedValue);

        // getAndDecrement(): Atomically decrements the value by one and returns the previous value
        previousValue = value.getAndDecrement();
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndDecrement(): " + value.get());
    }
}
This is the output of the program:
Current value: 0
Value after set(10): 10
Previous value: 10
Value after getAndSet(20): 20
Value after incrementAndGet(): 21
Previous value: 21
Value after getAndIncrement(): 22
Value after decrementAndGet(): 21
Previous value: 21
Value after getAndDecrement(): 20
This example demonstrates the usage of the various methods provided by the AtomicInteger class:
get(): Retrieves the current value of the AtomicInteger and prints it.
set(type newValue): Sets the value of the AtomicInteger to 10 using set(10) and then prints the updated value.
getAndSet(type newValue): Sets the value of the AtomicInteger to 20 using getAndSet(20). This method returns the previous value, which we store in the previousValue variable and print. We also print the updated value after the operation.
incrementAndGet(): Atomically increments the value of the AtomicInteger by one. This method returns the updated value after the increment, which we store in the incrementedValue variable and print.
getAndIncrement(): Atomically increments the value of the AtomicInteger by one. This method returns the previous value before the increment, which we store in the previousValue variable and print. We also print the updated value after the operation.
decrementAndGet(): Atomically decrements the value of the AtomicInteger by one. This method returns the updated value after the decrement, which we store in the decrementedValue variable and print.
getAndDecrement(): Atomically decrements the value of the AtomicInteger by one. This method returns the previous value before the decrement, which we store in the previousValue variable and print. We also print the updated value after the operation.
You can use similar methods for other atomic classes like AtomicLong
, AtomicBoolean
, etc., depending on the type of variable you need to work with.
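As a brief illustration that these classes share the same style of API, here is a small sketch (the class and variable names are ours) using AtomicLong and AtomicBoolean. The compareAndSet() method, available on the atomic classes, atomically updates the value only if it currently holds the expected value:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class OtherAtomicsExample {
    public static void main(String[] args) {
        // AtomicLong works like AtomicInteger, but for long values
        AtomicLong total = new AtomicLong(0L);
        total.addAndGet(500L); // atomically adds 500 and returns the new value
        System.out.println("Total: " + total.get());

        // AtomicBoolean is handy for one-time initialization flags
        AtomicBoolean initialized = new AtomicBoolean(false);
        // compareAndSet succeeds only if the current value equals the expected one
        boolean firstCall = initialized.compareAndSet(false, true);
        boolean secondCall = initialized.compareAndSet(false, true);
        System.out.println("First call won: " + firstCall);
        System.out.println("Second call won: " + secondCall);
    }
}
```

Even if many threads raced to call compareAndSet(false, true), exactly one of them would see true returned, which is why this pattern is useful for run-once initialization.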
In Java, the synchronized
keyword is used to achieve mutual exclusion and synchronize access to shared resources. When a block of code is marked as synchronized
, only one thread can execute that block at a time, while other threads attempting to enter the synchronized
block will be blocked until the lock is released.
The general syntax for using a synchronized
block is as follows:
synchronized (lockObject) {
// Code block that requires synchronization
}
Here, lockObject
is an object that serves as the lock. The thread that enters the synchronized
block must acquire the lock on lockObject
before executing the code inside the block. Once the thread exits the synchronized
block, it automatically releases the lock, allowing other threads to acquire it and enter the block.
Here’s an example that demonstrates the usage of a synchronized
block:
public class SynchronizedExample {
private static int count = 0;
private static Object lock = new Object();
public static void increment() {
synchronized (lock) {
count++;
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
increment();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("Final count: " + count);
}
}
In this example, we have a shared variable count
that needs to be incremented by multiple threads. To ensure thread-safety, we use a synchronized
block inside the increment()
method. The lock
object serves as the lock for synchronization.
When a thread enters the increment()
method, it acquires the lock on lock
before entering the synchronized
block. Once inside the block, the thread increments the count
variable. After exiting the block, the lock is automatically released, allowing other threads to acquire it and enter the block.
By synchronizing access to the count
variable using a synchronized
block, we ensure that only one thread can increment the variable at a time, preventing race conditions and maintaining data consistency.
In addition to using synchronized
blocks, Java allows you to synchronize entire methods using the synchronized
keyword. The lock associated with the method depends on whether the method is an instance method or a static method.
For instance methods, the lock is associated with the object on which the method is invoked. Each instance of the class has its own lock, so multiple threads can simultaneously execute synchronized instance methods on different instances of the class.
On the other hand, for static methods, the lock is associated with the class itself, rather than any specific instance. Since there is only one class object per JVM, only one thread can execute a synchronized static method in the class at a time, regardless of the number of instances of that class.
Here’s an example of synchronizing a method:
public class SynchronizedMethodExample {
private static int count = 0;
public static synchronized void increment() {
count++;
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
increment();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("Final count: " + count);
}
}
In this example, the increment()
method is declared as synchronized
. When a thread invokes the increment()
method, it automatically acquires the lock associated with the object on which the method is called (in this case, the class itself since the method is static).
Only one thread can execute the increment()
method at a time, while other threads attempting to invoke the method will be blocked until the lock is released. This ensures that the count
variable is incremented atomically and avoids race conditions.
It’s important to note that synchronizing static methods can potentially reduce concurrency, since there is only one lock associated with the entire class. If multiple threads need to access different shared resources within the class, synchronizing at the method level may be too coarse-grained, and using synchronized blocks or more fine-grained locking mechanisms might be more appropriate.
Synchronizing methods provides a cleaner and more concise way of achieving thread-safety than synchronized blocks, but locking an entire method can likewise reduce concurrency if the method contains code that doesn’t require synchronization.
In general, it’s recommended to synchronize only the critical sections of code that access shared resources, using synchronized blocks or methods judiciously to strike a balance between thread-safety and performance.
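To make fine-grained locking concrete, here is a small sketch (the class and field names are ours) that protects two independent counters with two separate lock objects, so a thread updating one counter never waits for a thread updating the other:

```java
public class FineGrainedLockExample {
    private int hits = 0;
    private int misses = 0;
    // One lock object per independent piece of state
    private final Object hitsLock = new Object();
    private final Object missesLock = new Object();

    public void recordHit() {
        synchronized (hitsLock) { // blocks only other hit updates
            hits++;
        }
    }

    public void recordMiss() {
        synchronized (missesLock) { // independent lock: no contention with hits
            misses++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FineGrainedLockExample stats = new FineGrainedLockExample();
        Thread hitter = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                stats.recordHit();
            }
        });
        Thread misser = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                stats.recordMiss();
            }
        });
        hitter.start();
        misser.start();
        hitter.join();
        misser.join();
        System.out.println("Hits: " + stats.hits + ", Misses: " + stats.misses);
    }
}
```

Had both methods been declared synchronized instead, they would share the single per-instance lock, and the two threads would serialize even though they touch unrelated fields.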
Lock Interface
Java provides the Lock interface in the java.util.concurrent.locks package as an alternative to the synchronized keyword. The Lock interface offers more flexibility and control over lock acquisition and release compared to the implicit locking mechanism of synchronized.
The main methods provided by the Lock interface are:
void lock(): Acquires the lock, blocking until the lock is available.
void unlock(): Releases the lock. Always call unlock() in a finally block to ensure proper lock release.
boolean tryLock(): Attempts to acquire the lock without blocking. Returns true if the lock is acquired, false otherwise.
boolean tryLock(long time, TimeUnit unit): Attempts to acquire the lock while blocking for a specified amount of time. Returns true if the lock is acquired within the specified time, false otherwise.
Condition newCondition(): Creates a new Condition instance associated with the lock for coordinating thread execution based on conditions.
The java.util.concurrent.locks package provides a few implementations of the Lock interface, including:
ReentrantLock: The most commonly used implementation, providing the same basic functionality as synchronized but with additional features such as fairness control and lock status queries.
ReentrantReadWriteLock.ReadLock and ReentrantReadWriteLock.WriteLock: Provide a pair of associated locks for read and write access. Multiple threads can acquire the read lock simultaneously, while only one thread can acquire the write lock at a time.
To use a Lock, follow these steps:
Create an instance of the desired Lock implementation.
Acquire the lock using lock(), tryLock(), or tryLock(long time, TimeUnit unit).
Perform the critical section operations while holding the lock.
Release the lock using unlock() in a finally block.
Here’s an example of using a ReentrantLock
:
Lock lock = new ReentrantLock();
try {
lock.lock();
// Critical section
} finally {
lock.unlock();
}
The Lock
interface provides additional features compared to synchronized
, such as:
tryLock():
Lock lock = new ReentrantLock();
if (lock.tryLock()) {
try {
// Critical section
} finally {
lock.unlock();
}
} else {
// Lock not acquired, perform alternative actions
}
tryLock(long time, TimeUnit unit):
Lock lock = new ReentrantLock();
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// Critical section
} finally {
lock.unlock();
}
} else {
// Lock not acquired within the specified time
}
} catch (InterruptedException e) {
// Handle interruption
}
Fairness control:
Lock lock = new ReentrantLock(true); // Creating a fair lock
try {
lock.lock();
// Critical section
} finally {
lock.unlock();
}
The ReentrantLock
class is the most common implementation of the Lock
interface. It provides explicit lock acquisition and release, exception handling for incorrect lock usage, and lock reentrancy.
Here’s an example comparing synchronized
and ReentrantLock
:
// Using synchronized
synchronized (lock) {
// Critical section
}
// Using ReentrantLock
Lock lock = new ReentrantLock();
try {
lock.lock();
// Critical section
} finally {
lock.unlock();
}
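The lock reentrancy mentioned above means a thread that already holds a ReentrantLock can acquire it again without deadlocking; it just has to release the lock the same number of times. Here is a short sketch (class and method names are ours) that uses getHoldCount() to show the nesting depth:

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyExample {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void outer() {
        lock.lock();
        try {
            System.out.println("Hold count in outer(): " + lock.getHoldCount());
            inner(); // re-acquiring the same lock from the same thread is safe
        } finally {
            lock.unlock();
        }
    }

    public static void inner() {
        lock.lock(); // the hold count goes to 2 instead of deadlocking
        try {
            System.out.println("Hold count in inner(): " + lock.getHoldCount());
        } finally {
            lock.unlock(); // the lock is fully released only after the matching
        }                  // unlock() in outer() brings the hold count back to 0
    }

    public static void main(String[] args) {
        outer();
    }
}
```

The same behavior holds for synchronized blocks, which are also reentrant; ReentrantLock simply makes the hold count observable.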
The ReentrantReadWriteLock.ReadLock
and ReentrantReadWriteLock.WriteLock
classes provide a way to handle concurrent read and write access to a shared resource. Here’s a simplified example:
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
private int sharedResource = 0;
public void write(int value) {
writeLock.lock();
try {
sharedResource = value;
System.out.println("Written: " + value);
} finally {
writeLock.unlock();
}
}
public void read() {
readLock.lock();
try {
System.out.println("Read: " + sharedResource);
} finally {
readLock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockExample example = new ReadWriteLockExample();
Thread writer = new Thread(() -> {
example.write(42);
});
Thread reader = new Thread(() -> {
example.read();
});
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
As you can see, the Lock
interface provides more advanced features and control compared to the synchronized
keyword, allowing for fine-grained locking, non-blocking lock attempts, and fairness control. However, it also requires explicit management of lock acquisition and release, which can be error-prone if not handled properly.
CyclicBarrier Class
In concurrent programming, there are scenarios where multiple threads need to work together and synchronize their progress at certain points. The java.util.concurrent.CyclicBarrier class provides a synchronization aid that allows a set of threads to wait for each other to reach a common barrier point before proceeding further.
The CyclicBarrier class is designed to facilitate coordination between a fixed number of threads. It is particularly useful when you have a group of threads that need to perform tasks in parallel and then wait for each other to finish before moving on to the next stage.
Here’s how the CyclicBarrier works:
When creating a CyclicBarrier, you specify the number of threads that need to reach the barrier before they can all proceed.
Each thread performs its task and then calls the await() method on the CyclicBarrier to indicate that it has reached the barrier.
The thread calling await() is blocked until the specified number of threads have reached the barrier.
Once all threads have reached the barrier, the barrier is released, and all threads can proceed.
If desired, you can specify a barrier action, which is a Runnable task that is executed by one of the threads after all threads have reached the barrier but before they are released.
Here’s a simple example that demonstrates the usage of CyclicBarrier
:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final int NUM_THREADS = 3;
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> {
System.out.println("All threads reached the barrier");
});
for (int i = 0; i < NUM_THREADS; i++) {
int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is performing task");
Thread.sleep(1000); // Simulating task execution
System.out.println("Thread " + threadId + " reached the barrier");
barrier.await();
System.out.println("Thread " + threadId + " continued after the barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
In this example, we create a CyclicBarrier
with a count of NUM_THREADS
(3 in this case). We also specify a barrier action that will be executed once all threads have reached the barrier.
We then start three threads, each performing a task (simulated by sleeping for a short duration). After completing its task, each thread calls await()
on the barrier to indicate that it has reached the synchronization point.
The output of the program will be similar to the following:
Thread 0 is performing task
Thread 1 is performing task
Thread 2 is performing task
Thread 0 reached the barrier
Thread 1 reached the barrier
Thread 2 reached the barrier
All threads reached the barrier
Thread 0 continued after the barrier
Thread 1 continued after the barrier
Thread 2 continued after the barrier
As you can see, all threads perform their tasks concurrently. Once all threads have reached the barrier, the barrier action is executed, and then all threads proceed further.
The CyclicBarrier
is called cyclic because it can be reused after all threads have passed the barrier. You can call await()
again on the same barrier object, and it will wait for the specified number of threads to reach the barrier again.
It’s important to note that if any thread leaves the barrier prematurely by interrupting itself or throwing an exception, all other threads waiting on the barrier will receive a BrokenBarrierException
. In such cases, you need to handle the exception appropriately and decide whether to continue or terminate the execution.
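To see the cyclic behavior in action, here is a small sketch (the class and variable names are ours) in which the same barrier object is crossed twice; the barrier action runs once per completed cycle:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierReuseExample {
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 2;
        CyclicBarrier barrier = new CyclicBarrier(numThreads,
                () -> System.out.println("Cycle complete"));
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int cycle = 1; cycle <= 2; cycle++) {
                        // the barrier resets automatically after all parties
                        // arrive, so the same object works for the next cycle
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

The program prints "Cycle complete" twice: once when both threads finish the first await() and again after the second, without any explicit reset call.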
Java provides a powerful and flexible Concurrency API in the java.util.concurrent
package, which offers a wide range of classes and interfaces for managing concurrent operations. This API simplifies the development of concurrent applications by providing high-level abstractions and utilities for managing threads, coordinating tasks, and synchronizing access to shared resources.
The Concurrency API was introduced in Java 5 and has been continuously enhanced in subsequent versions. It includes several key components, such as:
Executors: The Executor and ExecutorService interfaces provide a way to manage the execution of tasks in a thread pool, allowing you to focus on defining the tasks rather than managing the threads directly.
Concurrent Collections: The java.util.concurrent package offers thread-safe collections, such as ConcurrentHashMap, CopyOnWriteArrayList, and BlockingQueue, which provide better performance and scalability compared to using synchronized collections.
Synchronizers: Classes like CountDownLatch, CyclicBarrier, Semaphore, and Phaser help coordinate the actions of multiple threads, allowing them to wait for each other or control access to shared resources.
Locks: The Lock interface and its implementations provide more advanced locking mechanisms compared to the synchronized keyword.
Atomic Variables: The java.util.concurrent.atomic package provides atomic variables, such as AtomicInteger and AtomicReference, which offer thread-safe operations on single variables without the need for explicit synchronization.
These components work together to provide a comprehensive framework for building concurrent and parallel applications in Java.
In previous sections, we have covered atomic variables, locks, and CyclicBarrier
. In this section, we are going to focus on executors.
ExecutorService Interface
The ExecutorService interface is a central part of the Concurrency API and extends the Executor interface. It provides methods for submitting tasks for execution and managing the lifecycle of the underlying thread pool.
A thread pool is a collection of pre-created and reusable threads that are ready to perform tasks. It acts as a pool of worker threads that can be used to execute tasks concurrently.
Imagine you have a big task that needs to be done, like painting a house. You could do it all by yourself, but it would take a long time. Instead, you decide to hire a group of workers to help paint the house. These workers are like a thread pool. When you have a task that needs to be executed, such as painting a room, you assign it to one of the workers in the pool. The worker takes the task, performs it, and when finished, returns to the pool, ready to take on another task.
The advantage of using a thread pool is that you don’t have to create a new worker (thread) every time you have a task to execute. Creating a new thread for each task can be expensive in terms of time and resources. Instead, you have a pre-created pool of workers (threads) that are ready to take on tasks as they come in. The thread pool manages the lifecycle of the threads, meaning it creates the threads when the pool is initialized and destroys them when the pool is shut down. It also handles the allocation of tasks to the available threads in the pool.
Here’s an example of creating an ExecutorService
using the Executors
factory class:
ExecutorService executorService = Executors.newFixedThreadPool(5);
In this case, we create a fixed thread pool with 5 threads using the Executors.newFixedThreadPool()
method.
The primary methods of the ExecutorService interface include:
void execute(Runnable command): Submits a Runnable task for execution without returning a result.
<T> Future<T> submit(Callable<T> task): Submits a Callable task for execution and returns a Future representing the pending result of the task.
<T> Future<T> submit(Runnable task, T result): Submits a Runnable task for execution and returns a Future representing the given result upon completion.
Future<?> submit(Runnable task): Submits a Runnable task for execution and returns a Future representing the task’s completion.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Submits a collection of Callable tasks for execution and returns a list of Future objects representing the results of each task.
<T> T invokeAny(Collection<? extends Callable<T>> tasks): Submits a collection of Callable tasks for execution and returns the result of one of the successfully completed tasks.
The ExecutorService interface also provides methods for managing the lifecycle of the thread pool:
void shutdown(): Initiates an orderly shutdown of the ExecutorService, in which previously submitted tasks are executed, but no new tasks are accepted. This method does not wait for the running tasks to complete.
List<Runnable> shutdownNow(): Attempts to stop all actively executing tasks and halts the processing of waiting tasks. It returns a list of the tasks that were awaiting execution.
boolean isShutdown(): Returns true if the ExecutorService has been shut down, either by calling shutdown() or shutdownNow().
boolean isTerminated(): Returns true if all tasks have completed following a shutdown request.
boolean awaitTermination(long timeout, TimeUnit unit): Blocks until all tasks have completed after a shutdown request, the timeout elapses, or the current thread is interrupted, whichever happens first, returning true if the executor terminated in time.
It’s important to properly shut down an ExecutorService
when it’s no longer needed to allow graceful termination of the threads and to release any resources held by the thread pool.
Here’s an example showing the use of these methods:
ExecutorService executorService = Executors.newFixedThreadPool(5);
// Submit tasks for execution
executorService.execute(() -> {
System.out.println("Task 1 executed by " + Thread.currentThread().getName());
});
executorService.execute(() -> {
System.out.println("Task 2 executed by " + Thread.currentThread().getName());
});
// Initiate orderly shutdown
executorService.shutdown();
// Check if the ExecutorService has been shut down
boolean isShutdown = executorService.isShutdown();
System.out.println("ExecutorService is shut down: " + isShutdown);
// Wait for all tasks to complete and check if the ExecutorService has terminated
try {
boolean isTerminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("ExecutorService is terminated: " + isTerminated);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
In this example, we create an ExecutorService
, submit tasks for execution using the execute()
method, initiate an orderly shutdown using shutdown()
, and check the status of the ExecutorService
using isShutdown()
and isTerminated()
methods.
The ExecutorService interface provides several methods for submitting tasks for execution:
void execute(Runnable command): Submits a Runnable task for execution without returning a result. The execute() method is inherited from the Executor interface.
<T> Future<T> submit(Callable<T> task): Submits a Callable task for execution and returns a Future representing the pending result of the task. The Future allows you to retrieve the result once the task completes.
<T> Future<T> submit(Runnable task, T result): Submits a Runnable task for execution and returns a Future representing the given result upon completion. This is useful when you want to return a specific result from a Runnable task.
Future<?> submit(Runnable task): Submits a Runnable task for execution and returns a Future representing the task’s completion. The Future’s get() method will return null upon completion.
In addition to submitting individual tasks, the ExecutorService interface also provides methods for submitting multiple tasks at once:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Submits a collection of Callable tasks for execution and returns a list of Future objects representing the results of each task. This method blocks until all tasks have completed.
<T> T invokeAny(Collection<? extends Callable<T>> tasks): Submits a collection of Callable tasks for execution and returns the result of one of the completed tasks. This method blocks until at least one task has completed successfully.
These methods allow you to submit multiple tasks concurrently and retrieve their results using the Future
interface.
Consider the following example:
ExecutorService executorService = Executors.newFixedThreadPool(5);
// Submit a Runnable task using execute()
executorService.execute(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
});
// Submit a Callable task using submit()
Future<String> future = executorService.submit(() -> {
// Perform some computation
return "Result of the task";
});
try {
// Submit multiple Callable tasks using invokeAll()
List<Callable<Integer>> tasks = Arrays.asList(
() -> 1,
() -> 2,
() -> 3
);
List<Future<Integer>> futures = executorService.invokeAll(tasks);
// Submit multiple Callable tasks using invokeAny()
Integer result = executorService.invokeAny(tasks);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
executorService.shutdown();
This example demonstrates submitting tasks using execute()
for a Runnable
task, submit()
for a Callable
task, invokeAll()
for submitting multiple Callable
tasks and retrieving their results as a list of Future
objects, and invokeAny()
for submitting multiple Callable
tasks and retrieving the result of one of the completed tasks.
When submitting tasks using the submit() or invokeAll()/invokeAny() methods, you receive Future objects representing the pending results of the tasks. The Future interface provides methods to check the status of a task and retrieve its result:
boolean isDone(): Returns true if the task has completed, either normally or through an exception.
boolean isCancelled(): Returns true if the task was cancelled before it completed normally.
boolean cancel(boolean mayInterruptIfRunning): Attempts to cancel the execution of the task. If the task has already completed or been cancelled, this method has no effect.
V get(): Waits if necessary for the task to complete and retrieves its result. If the task throws an exception, it is wrapped in an ExecutionException.
V get(long timeout, TimeUnit unit): Waits if necessary for at most the given time for the task to complete and retrieves its result. If the timeout expires before the task completes, a TimeoutException is thrown.
These methods allow you to synchronize the main thread with the completion of the submitted tasks and retrieve their results when needed.
Consider the following example:
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Submit a Callable task using submit()
Future<String> future = executorService.submit(() -> {
// Simulate a long-running task
Thread.sleep(2000);
return "Result of the task";
});
// Check if the task is done
boolean isDone = future.isDone();
System.out.println("Task is done: " + isDone);
// Cancel the task
boolean isCancelled = future.cancel(true);
System.out.println("Task is cancelled: " + isCancelled);
// Retrieve the result of the task
String result = null;
try {
result = future.get(1, TimeUnit.SECONDS);
} catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
System.out.println("Result: " + result);
executorService.shutdown();
In this example, we submit a Callable task using submit(), check if the task is done using isDone(), and attempt to cancel the task using cancel(true). Because the task is still sleeping when cancel(true) is called, the cancellation succeeds, and a subsequent call to get() throws a CancellationException instead of returning a result, so the program prints Result: null. If the task had not been cancelled, get() with a timeout would return the result if the task completed within the specified time, or throw a TimeoutException otherwise.
Callable Interface
As you have seen from the previous examples, the Callable interface is similar to the Runnable interface but with a few key differences. While Runnable represents a task that can be executed concurrently, Callable represents a task that returns a result and may throw an exception.
Here’s the declaration of the Callable
interface:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
The Callable
interface has a single method, call()
, which returns a value of type V
and may throw an exception. This is in contrast to the Runnable
interface, which has a void run()
method that does not return a value or throw checked exceptions.
The main differences between Callable and Runnable are:
Return Value: Callable tasks can return a result, whereas Runnable tasks cannot. The call() method of Callable returns a value of the specified type V, while the run() method of Runnable is void and does not return a value.
Exception Handling: Callable tasks can throw checked exceptions, whereas Runnable tasks cannot. The call() method of Callable declares that it may throw an Exception, while the run() method of Runnable does not declare any checked exceptions.
Here’s an example that demonstrates the usage of Callable
:
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Create a Callable task
Callable<Integer> task = () -> {
// Perform some computation
int result = 0;
for (int i = 1; i <= 10; i++) {
result += i;
}
return result;
};
// Submit the Callable task to the ExecutorService
Future<Integer> future = executorService.submit(task);
// Retrieve the result of the task
try {
Integer result = future.get();
System.out.println("Result: " + result); // Prints 55
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
In this example, we create a Callable
task that performs a simple computation and returns the result. We submit the task to the ExecutorService
using the submit()
method, which returns a Future
object representing the pending result. We then use the get()
method of Future
to retrieve the result of the task. If the task throws an exception, the exception is wrapped in an ExecutionException
, but an InterruptedException
can also be thrown if the current thread was interrupted while waiting.
The choice between using Callable
and Runnable
depends on whether you need to return a result from the task and handle checked exceptions. If your task does not need to return a value and does not throw checked exceptions, you can use Runnable
. However, if your task needs to return a result or throws checked exceptions, you should use Callable
.
In addition to executing tasks immediately, the Concurrency API provides the ability to schedule tasks for execution at a later time or to execute tasks repeatedly with a fixed delay or at a fixed rate. This functionality is provided by the ScheduledExecutorService
interface, which extends the ExecutorService
interface.
The ScheduledExecutorService interface provides the following methods for scheduling tasks:
schedule(Runnable command, long delay, TimeUnit unit): Schedules a Runnable task to be executed after the specified delay, expressed in the given TimeUnit.
schedule(Callable<V> callable, long delay, TimeUnit unit): Schedules a Callable task to be executed after the specified delay, expressed in the given TimeUnit, and returns a ScheduledFuture representing the pending result.
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Schedules a Runnable task to be executed periodically, with a fixed time interval between the start of one execution and the start of the next. The initialDelay parameter specifies the delay before the first execution, and the period parameter specifies the interval between the start times of successive executions.
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): Schedules a Runnable task to be executed repeatedly, with a fixed delay between the end of one execution and the start of the next. The initialDelay parameter specifies the delay before the first execution, and the delay parameter specifies the fixed delay between executions.
Here’s an example that demonstrates the usage of ScheduledExecutorService
methods:
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// Schedule a task to run after a delay of 2 seconds
scheduledExecutorService.schedule(() -> {
System.out.println("Task executed after 2 seconds delay");
}, 2, TimeUnit.SECONDS);
// Schedule a task to run repeatedly at a fixed rate of 1 second
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("Task executed at fixed rate");
}, 0, 1, TimeUnit.SECONDS);
// Schedule a task to run repeatedly with a fixed delay of 500 milliseconds
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("Task executed with fixed delay");
}, 0, 500, TimeUnit.MILLISECONDS);
// Keep the main thread alive for 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutorService.shutdown();
In this example, we create a ScheduledExecutorService
using the Executors.newSingleThreadScheduledExecutor()
method. We then demonstrate the usage of the schedule()
, scheduleAtFixedRate()
, and scheduleWithFixedDelay()
methods.
The schedule()
method is used to schedule a task to run after a delay of 2 seconds. The scheduleAtFixedRate()
method is used to schedule a task to run repeatedly at a fixed rate of 1 second, meaning that successive executions are scheduled to start 1 second apart; if a task takes longer than the period, the next execution starts late rather than running concurrently. The scheduleWithFixedDelay()
method is used to schedule a task to run repeatedly with a fixed delay of 500 milliseconds between the end of one execution and the start of the next.
It’s important to note that the ScheduledExecutorService
does not automatically terminate after the scheduled tasks are executed. You need to explicitly shut it down using the shutdown()
method once you no longer need it.
Remember that the ScheduledExecutorService
uses a limited number of threads to execute the scheduled tasks, so it’s essential to choose the appropriate execution method based on your requirements and ensure the scheduled tasks do not overwhelm the available resources.
Throughout this section, we have used various factory methods provided by the Executors
class to create instances of ExecutorService
and ScheduledExecutorService
. The Executors
class is a utility class that offers several static factory methods for creating different types of thread pools and executor services.
Here’s an overview of the commonly used factory methods provided by the Executors
class:
ExecutorService newSingleThreadExecutor()
: Creates an ExecutorService
that uses a single worker thread to execute tasks. Tasks are guaranteed to be executed sequentially, and no more than one task will be active at any given time.
ScheduledExecutorService newSingleThreadScheduledExecutor()
: Creates a single-threaded ScheduledExecutorService
that can schedule tasks to run after a given delay or to execute periodically.
ExecutorService newCachedThreadPool()
: Creates a thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. Idle threads are kept in the pool for 60 seconds before being terminated and removed from the pool.
ExecutorService newFixedThreadPool(int nThreads)
: Creates a thread pool with a fixed number of threads. The nThreads
parameter specifies the number of threads in the pool. If additional tasks are submitted when all threads are active, they will wait in a queue until a thread becomes available.
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
: Creates a thread pool that can schedule tasks to run after a given delay or to execute periodically. The corePoolSize
parameter specifies the number of threads to keep in the pool, even if they are idle.
Here are code examples demonstrating the usage of each factory method:
// newSingleThreadExecutor()
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.submit(() -> {
System.out.println("Task executed by single thread");
});
singleThreadExecutor.shutdown();
// newSingleThreadScheduledExecutor()
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
singleThreadScheduledExecutor.schedule(() -> {
System.out.println("Task scheduled by single thread scheduled executor");
}, 2, TimeUnit.SECONDS);
singleThreadScheduledExecutor.shutdown();
// newCachedThreadPool()
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
cachedThreadPool.submit(() -> {
System.out.println("Task executed by cached thread pool");
});
}
cachedThreadPool.shutdown();
// newFixedThreadPool(int nThreads)
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
fixedThreadPool.submit(() -> {
System.out.println("Task executed by fixed thread pool");
});
}
fixedThreadPool.shutdown();
// newScheduledThreadPool(int corePoolSize)
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("Task scheduled by scheduled thread pool");
}, 0, 1, TimeUnit.SECONDS);
// Keep the main thread alive for 3 seconds
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledThreadPool.shutdown();
In these examples, we create different types of executor services using the respective factory methods provided by the Executors
class.
The newSingleThreadExecutor()
method creates an ExecutorService
with a single worker thread, ensuring that tasks are executed sequentially. The newSingleThreadScheduledExecutor()
method creates a single-threaded ScheduledExecutorService
for scheduling tasks with delays or periodic execution.
The newCachedThreadPool()
method creates a thread pool that creates new threads as needed and reuses idle threads. The newFixedThreadPool(int nThreads)
method creates a thread pool with a fixed number of threads specified by the nThreads
parameter.
The newScheduledThreadPool(int corePoolSize)
method creates a ScheduledExecutorService
with a fixed number of threads specified by the corePoolSize
parameter. It allows scheduling tasks with delays or periodic execution.
These factory methods provide convenient ways to create different types of executor services based on specific requirements. They encapsulate the complexities of thread creation, management, and termination, allowing developers to focus on defining and submitting tasks.
It’s important to choose the appropriate factory method based on your application’s needs. Consider factors such as the number of tasks, concurrency requirements, scheduling needs, and resource constraints when selecting a suitable executor service.
In any case, remember to properly shut down the executor services using the shutdown()
method when they are no longer needed to ensure graceful termination and resource cleanup.
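One common shutdown pattern (a sketch; the 5-second timeout is an arbitrary choice) combines shutdown() with awaitTermination() and falls back to shutdownNow() for tasks that don't finish in time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {
    // Stop accepting new tasks, wait a bounded time for running ones,
    // then interrupt whatever is still running
    static void shutdownGracefully(ExecutorService executor) {
        executor.shutdown(); // no new tasks accepted; queued tasks still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt still-running tasks
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("Working..."));
        shutdownGracefully(executor);
        System.out.println("Terminated: " + executor.isTerminated());
    }
}
```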
When working with Java collections like ArrayList
, HashMap
, etc., in a multi-threaded environment, you may have encountered a ConcurrentModificationException
. This exception is thrown when one thread is iterating over a collection while another thread tries to modify it structurally, for example, by adding or removing elements.
The solution is to use thread-safe, concurrent collections instead. Java provides several concurrent collection classes that allow multiple threads to access and modify them safely, without the risk of ConcurrentModificationException
.
Some key concurrent collection classes include:
java.util.concurrent.ConcurrentHashMap<K,V>
A thread-safe version of HashMap
that achieves high concurrency using advanced techniques like CAS (Compare-And-Swap) operations. This allows multiple threads to read and write to the map simultaneously:
Map<String, Integer> map = new ConcurrentHashMap<>();
map.put("apple", 1);
map.put("banana", 2);
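Note that while each put() call is thread-safe, a separate get() followed by a put() is not atomic as a pair. For read-modify-write updates, ConcurrentHashMap offers atomic methods like merge() and compute(). A small sketch counting words from multiple threads:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WordCount {
    public static void main(String[] args) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        List<String> words = Arrays.asList("apple", "banana", "apple", "apple");

        // merge() performs the read-modify-write atomically, so concurrent
        // threads cannot lose updates the way a separate get() + put() can
        words.parallelStream()
             .forEach(w -> counts.merge(w, 1, Integer::sum));

        System.out.println(counts.get("apple")); // 3
    }
}
```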
java.util.concurrent.ConcurrentLinkedQueue<E>
A thread-safe queue based on linked nodes. It allows multiple threads to add elements at the tail and remove elements from the head concurrently:
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("task1");
queue.add("task2");
String task = queue.poll();
java.util.concurrent.ConcurrentSkipListMap<K,V>
A concurrent version of TreeMap
that maintains the elements in sorted order based on their natural ordering or a provided Comparator
. It allows concurrent access by multiple threads:
ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(1, "one");
map.put(2, "two");
String value = map.get(1);
java.util.concurrent.ConcurrentSkipListSet<E>
A scalable concurrent version of TreeSet
that maintains elements in sorted order according to their natural ordering, or by a Comparator
provided at set creation time, depending on which constructor is used. It offers expected average log(n) time cost for the add, remove, and contains operations:
Set<String> set = new ConcurrentSkipListSet<>();
set.add("apple");
set.add("banana");
set.add("orange");
System.out.println(set); // [apple, banana, orange]
java.util.concurrent.CopyOnWriteArrayList<E>
and java.util.concurrent.CopyOnWriteArraySet<E>
These classes are thread-safe variants of ArrayList
and HashSet
respectively. They achieve thread-safety by creating a fresh copy of the underlying array every time a write operation (add, set, remove, etc.) is performed. This means that multiple threads can safely iterate over the collection without the need for synchronization. However, the copy-on-write behavior can consume significant memory if the collection is large and write operations are frequent:
List<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
System.out.println(list); // [1, 2, 3]
Set<String> set = new CopyOnWriteArraySet<>();
set.add("apple");
set.add("banana");
set.add("apple");
System.out.println(set); // [apple, banana]
java.util.concurrent.LinkedBlockingQueue<E>
An optionally bounded, thread-safe queue based on linked nodes that implements the BlockingQueue
interface. It’s useful for implementing the producer-consumer pattern, where one or more threads produce items and put them into the queue, and one or more consumer threads take items out of the queue and process them. If the queue is empty, consumers will block until an item becomes available. If the queue reaches its maximum capacity, producers will block until space becomes available:
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// Producer thread
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put("item-" + i);
System.out.println("Produced: " + "item-" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// Consumer thread
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
String item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
In this example, the producer thread tries to put 20 items into the queue, but the queue has a maximum capacity of 10. When the limit is reached, the producer will block until the consumer has taken some items out. The consumer thread continually takes items from the queue and processes them. If the queue becomes empty, the consumer will block until the producer puts more items in.
When running the example above, the exact output may vary due to the concurrent execution of threads.
In addition to these purpose-built concurrent classes, in the java.util.Collections
class, Java also provides methods to obtain synchronized versions of regular collections. These synchronization wrappers add a layer of thread-safety around an existing non-concurrent collection.
Some examples of these methods are:
synchronizedCollection(Collection<T> c)
synchronizedList(List<T> list)
synchronizedMap(Map<K,V> m)
synchronizedNavigableMap(NavigableMap<K,V> m)
synchronizedNavigableSet(NavigableSet<T> s)
synchronizedSet(Set<T> s)
synchronizedSortedMap(SortedMap<K,V> m)
synchronizedSortedSet(SortedSet<T> s)
For example, to create a synchronized version of an ArrayList
:
List<String> list = new ArrayList<>();
List<String> syncList = Collections.synchronizedList(list);
Now syncList
is a thread-safe collection that can be safely accessed and modified by multiple threads. However, the synchronization is done at the method level, meaning each method of the collection is synchronized. This can limit concurrency compared to the purpose-built concurrent collections that often use more sophisticated techniques like CAS operations and non-blocking algorithms.
In general, it’s preferable to use the concurrent collection classes directly, as they are designed from the ground up for high concurrency. The synchronization wrappers are useful when you need to add thread-safety to an existing collection or when using a less common collection type that doesn’t have a direct concurrent equivalent.
In the world of Java streams, there’s a feature that can greatly enhance performance when working with large datasets: parallel streams.
A parallel stream is a stream that splits its elements into multiple chunks, processing each chunk with a different thread in parallel. This can significantly speed up operations on large datasets by leveraging the power of multi-core processors.
However, there’s an important concern to keep in mind when using parallel streams: the order of elements. Unlike regular sequential streams, the order of elements in a parallel stream is not guaranteed unless specifically enforced. This means that operations like forEach
, which rely on encounter order, may produce unexpected results.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream().forEach(System.out::println);
When running the above example, the output will show an unpredictable order. For example:
7
6
8
9
10
1
3
5
4
2
Another key consideration when using parallel streams is to avoid stateful lambda expressions. A stateful lambda is one that modifies shared state across invocations. In a parallel stream, multiple threads may be executing the same lambda concurrently, which can lead to race conditions and unpredictable behavior if the lambda is stateful:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] state = {0}; // Shared state
numbers.parallelStream().forEach(n -> {
// Simulate some processing time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
state[0] += n; // Stateful lambda, unsafe
});
System.out.println(state[0]); // Unpredictable result due to race conditions
In this example, we use an array to hold the shared state, which allows us to modify it inside the lambda expression. The Thread.sleep(100)
call introduces a small delay, increasing the likelihood of race conditions. Sometimes, the output will be 15. Other times, it will be 13 or something else.
To avoid these issues, it’s important to use stateless lambda expressions when working with parallel streams.
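A stateless way to compute the same total is to let the stream framework perform the reduction itself; a minimal sketch:

```java
import java.util.Arrays;
import java.util.List;

public class StatelessSum {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // The stream framework does the accumulation: each thread sums its
        // own substream and the partial sums are combined, so no shared
        // mutable state is involved
        int sum = numbers.parallelStream()
                         .mapToInt(Integer::intValue)
                         .sum();

        System.out.println(sum); // Always 15
    }
}
```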
There are a few ways to create a parallel stream in Java:
Using the parallelStream()
method on a collection:
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();
Using the parallel()
method on an existing stream:
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream = list.stream();
Stream<String> parallelStream = stream.parallel();
Using StreamSupport.stream()
with a specified parallelism flag:
List<String> list = Arrays.asList("a", "b", "c");
boolean isParallel = true;
Stream<String> parallelStream = StreamSupport.stream(list.spliterator(), isParallel);
Parallel decomposition is the process of breaking a task into smaller, independent subtasks that can be processed concurrently, and then combining the results to produce the final output. This is a fundamental concept in parallel computing, and it’s key to understanding how parallel streams work under the hood.
When you invoke a terminal operation on a parallel stream, the Java runtime performs a parallel decomposition of the stream behind the scenes. This involves several steps:
Splitting the Stream into Substreams: The original stream is divided into multiple smaller substreams. The division is typically recursive, and does not necessarily match the number of processor cores directly. Each substream represents a portion of the original stream that can be processed independently, allowing for optimal utilization of computing resources.
Processing Each Substream Independently: Each substream is processed by a separate thread from ForkJoinPool
, Java’s built-in thread pool for parallel execution. ForkJoinPool
uses a work-stealing algorithm to balance the load, and dynamically allocates tasks among threads. This allows multiple substreams to be processed concurrently, leveraging the power of multi-core processors. Each thread applies the stream operations to its assigned substream independently of the others.
Combining the Results: Once all the substreams have been processed, their individual results need to be combined to produce the final result. The combining process also leverages ForkJoinPool
’s capabilities to parallelize this step, especially for associative operations. The specific way in which the results are combined depends on the terminal operation. For example, with a reduce
operation, the results of each substream’s reduction are combined using the provided accumulator function. For a collect
operation, the results are combined using the provided combiner function.
Consider the following example:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum); // Output: 55
In this example, the reduce operation is performed in parallel. The stream is split into substreams, each substream is summed independently, and then the results are combined to produce the final sum.
Here’s a visual representation of this process:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
Split into
substreams
______________|________________
| | | | | |
[1, 2] [3, 4] [5, 6] [7, 8] [9, 10]
| | | | | |
Process each substream independently
| | | | | |
[3] [7] [11] [15] [19]
\ | | | /
\ | | | /
Combine the subresults
|
[55]
The power of parallel decomposition lies in its ability to break down a large task into smaller, more manageable pieces that can be processed concurrently. This can lead to significant performance improvements, especially for computationally intensive tasks operating on large datasets.
However, it’s important to note that not all operations can be parallelized effectively. For parallel decomposition to work, the subtasks must be independent: the processing of one subtask should not depend on the results of another. This is why stateful lambda expressions can cause problems in parallel streams, as they introduce dependencies between subtasks.
Additionally, the cost of splitting the stream and combining the results should be taken into account. For small streams or simple operations, the overhead of parallel decomposition may outweigh the benefits of concurrent processing. The Java runtime attempts to make intelligent decisions about when to parallelize a stream based on factors like the stream size and the complexity of the operations, but it’s still important to understand the implications of using parallel streams in your particular use case.
There are certain operations that rely on the encounter order of elements. These operations are known as order-based tasks, and they can behave differently when used with parallel streams compared to sequential streams. Let’s take a closer look at some of these methods and their implications.
forEach
and forEachOrdered
It’s important to understand the difference between the forEach
and forEachOrdered
terminal operations.
The forEach
operation, as we’ve seen earlier, is used to perform an action on each element of a stream. When used with a parallel stream, forEach
does not guarantee the order in which the elements will be processed. Each substream is processed independently by a different thread, and the order in which the threads are scheduled is non-deterministic.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().forEach(System.out::println);
// Possible output: 3, 1, 4, 2, 5
In this example, the numbers may be printed in any order, depending on how the parallel stream is split and how the threads are scheduled.
This non-deterministic ordering can be beneficial in certain scenarios. For example, if you’re performing an operation where the order doesn’t matter, such as adding elements to a thread-safe collection or updating counters in a thread-safe manner, forEach
can significantly boost performance by allowing operations to be performed in parallel without the overhead of maintaining order.
On the other hand, forEachOrdered
guarantees that the action will be performed on the elements in the encounter order, even when used with a parallel stream. This means that the elements will be processed in the same order as they would be in a sequential stream, even though the processing is happening in parallel.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().forEachOrdered(System.out::println);
// Output: 1, 2, 3, 4, 5
In this case, the numbers are always printed in their original order, regardless of how the parallel stream is split and processed.
However, this ordering guarantee comes at a cost. To maintain the encounter order, forEachOrdered
introduces a degree of synchronization and communication between the threads processing the substreams. This can reduce the performance benefits of parallelism, especially for large streams or complex operations.
So, when should you use forEach
, and when should you use forEachOrdered
? The answer depends on your specific use case.
Use forEach
when:
The order in which elements are processed doesn’t matter.
Each element is processed independently, for example when updating a thread-safe structure (such as a ConcurrentHashMap
).
Use forEachOrdered
when:
The action must observe the elements in their encounter order, for example when printing results or writing them to an ordered destination (such as a List
).
It’s worth noting that in many cases, if you need deterministic ordering, it may be more efficient to use a sequential stream instead of a parallel stream with forEachOrdered
. The sequential stream will maintain the encounter order naturally, without the overhead of parallel decomposition and synchronization.
findFirst()
The findFirst()
method returns an Optional
describing the first element of the stream, or an empty Optional
if the stream is empty. In a sequential stream, this is straightforward: it simply returns the first element encountered in the stream.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> first = numbers.stream().findFirst();
System.out.println(first.get()); // Output: 1
Even when used with a parallel stream over an ordered source (such as a List
), findFirst()
still respects the encounter order and deterministically returns the first element. Enforcing that guarantee constrains the parallel execution, however, so it can reduce the benefit of parallelism. When any matching element will do, findAny()
relaxes the constraint and may return a different element from run to run.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> first = numbers.parallelStream().findFirst();
System.out.println(first.get()); // Output: 1, even in parallel
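A sketch of the order-relaxed alternative: findAny() lets whichever worker thread finds a match first supply the result, so the value can vary between runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FindAnyDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // findAny() makes no ordering promise, so the runtime may return
        // whichever matching element a worker thread reaches first
        Optional<Integer> any = numbers.parallelStream()
                                       .filter(n -> n % 2 == 0)
                                       .findAny();

        System.out.println(any.get()); // 2 or 4, depending on scheduling
    }
}
```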
limit()
The limit()
method returns a stream consisting of the first n
elements of the original stream. In a sequential stream, this is again straightforward: it simply returns the first n
elements in the encounter order.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream().limit(3).forEach(System.out::println);
// Output: 1, 2, 3
When used with a parallel stream over an ordered source, limit()
is still constrained to return the first n
elements in encounter order, which can make it an expensive operation in parallel pipelines. Note, however, that a subsequent forEach
may print those elements in any order, because forEach itself ignores encounter order.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().limit(3).forEach(System.out::println);
// Possible output: 1, 3, 2
skip()
The skip()
method is the complement of limit()
. It returns a stream consisting of the remaining elements of the original stream after discarding the first n
elements. In a sequential stream, this skips the first n
elements in the encounter order.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream().skip(3).forEach(System.out::println);
// Output: 4, 5
In a parallel stream over an ordered source, skip()
still discards exactly the first n
elements in encounter order, but, like limit()
, enforcing this guarantee can be expensive in parallel pipelines. The remaining elements may then be printed in any order by forEach
:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().skip(3).forEach(System.out::println);
// Possible output: 5, 4 (the remaining elements, in either order)
Maintaining encounter order in parallel pipelines has a cost, and mixing order-sensitive operations with order-insensitive ones (like forEach
) can still produce surprising output. If your operation relies on the encounter order of elements, it’s often simpler, and sometimes faster, to use a sequential stream.
However, there are situations where ordering is irrelevant, or relaxing it is desirable. For example, if you’re using findAny()
to find some element matching a certain predicate, and you don’t care which matching element is returned, a parallel stream can provide a performance boost.
As with all aspects of parallel programming, the key is to understand the behavior and implications of the methods you’re using, and to carefully consider whether the potential performance benefits outweigh the risks of non-deterministic results.
Reduction operations, such as reduce()
, collect()
, and sum()
, are powerful tools for combining the elements of a stream into a single result. When used with parallel streams, these operations can provide significant performance benefits by allowing the reduction to be performed concurrently on multiple substreams. However, there are certain pitfalls to be aware of, particularly when it comes to the choice of accumulator function.
The accumulator function combines elements during a reduction operation. For example, in the reduce()
method, the accumulator function takes two parameters: the partial result of the reduction so far, and the next element to be incorporated.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum); // Output: 15
In this example, the accumulator function is Integer::sum
, which simply adds two integers together.
For a reduction in a parallel stream to produce correct results, the accumulator function must be associative and stateless. An associative function is one in which the order of application doesn’t matter. That is, (a op b) op c
is equal to a op (b op c)
, where op
is the accumulator function.
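Subtraction is a classic non-associative operation, and it illustrates the problem well; a minimal sketch comparing sequential and parallel reductions:

```java
import java.util.Arrays;
import java.util.List;

public class NonAssociativeReduce {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Subtraction is not associative ((a - b) - c != a - (b - c)), and
        // 0 is not a true identity for it (0 - x != x), so the parallel
        // result depends on how the substreams happen to be grouped
        int sequential = numbers.stream().reduce(0, (a, b) -> a - b);
        int parallel = numbers.parallelStream().reduce(0, (a, b) -> a - b);

        System.out.println(sequential); // Always -15
        System.out.println(parallel);   // Varies from run to run
    }
}
```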
However, certain accumulator functions can lead to issues in parallel streams. For example, using a mutable accumulator:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
ArrayList<Integer> list = numbers.parallelStream().reduce(
new ArrayList<>(),
(l, i) -> { l.add(i); return l; },
(l1, l2) -> { l1.addAll(l2); return l1; });
System.out.println(list); // Output: non-deterministic (could be [1, 2, 3, 4, 5], [1, 3, 5, 2, 4], etc.)
The output is non-deterministic because we’re using a mutable ArrayList
as the identity value. In a parallel reduce, the same identity instance is handed to every substream, so the lambda expressions modify the very same ArrayList
concurrently from multiple threads, leading to race conditions, duplicated elements, and possibly even runtime exceptions.
To avoid these issues and ensure deterministic, correct results from parallel reductions, follow these best practices:
Use associative and stateless accumulator functions. If your accumulator function is not associative, consider using a sequential stream instead.
Avoid using mutable accumulators. If you need to collect results into a mutable container, use the collect()
method with a concurrent collector, such as toConcurrentMap()
, instead of reduce()
. Concurrent collectors are designed to handle parallel modifications safely.
Be careful with floating-point arithmetic. Due to the limitations of floating-point representation, floating-point addition and multiplication are not strictly associative. If absolute precision is required, consider using a sequential stream or a different numerical representation.
Test your reductions thoroughly. Do it with different stream sizes and different levels of parallelism to ensure that they produce consistent, correct results.
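The list-building reduce shown earlier can be rewritten with the three-argument collect() method, which gives each thread its own container; a sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SafeCollect {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // collect() calls the supplier once per substream, so every thread
        // mutates its own ArrayList; the combiner then merges the per-thread
        // lists, preserving the encounter order of the source
        List<Integer> list = numbers.parallelStream()
                .collect(ArrayList::new, List::add, List::addAll);

        System.out.println(list); // [1, 2, 3, 4, 5]
    }
}
```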
The collect()
method is a terminal operation that allows you to accumulate elements of a stream into a collection or other data structure. When used with parallel streams, collect()
can provide significant performance benefits by allowing the accumulation to be performed concurrently on multiple substreams. However, to ensure correct and efficient operation, there are certain considerations to keep in mind.
Remember, the collect()
method takes a Collector
, which specifies how the elements of the stream should be accumulated. A Collector
is defined by four components:
A supplier function that creates a new result container.
An accumulator function that adds an element to the result container.
A combiner function that merges two result containers into one.
A finisher function that performs an optional final transformation on the result container.
The Java Collectors
class provides a wide variety of predefined collectors, such as toList()
, toSet()
, toMap()
, groupingBy()
, and more.
When using collect()
with a parallel stream, there are several key considerations to ensure correct and efficient operation:
The collector should be concurrent. This means that the accumulator and combiner functions must be thread-safe and should not depend on the order in which elements are processed. The Collectors
class provides several concurrent collectors, such as toConcurrentMap()
, groupingByConcurrent()
, etc:
List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
ConcurrentMap<String, Integer> map = strings.parallelStream()
.collect(Collectors.toConcurrentMap(s -> s, s -> 1, Integer::sum));
System.out.println(map); // Output: {a=1, b=1, c=1, d=1, e=1}
If the collector is not concurrent, consider using a concurrent result container. For example, you can collect into a ConcurrentHashMap
or a CopyOnWriteArrayList
:
List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
ConcurrentHashMap<String, Integer> map = strings.parallelStream()
.collect(ConcurrentHashMap::new,
(m, s) -> m.put(s, 1),
ConcurrentHashMap::putAll);
System.out.println(map); // Output: {a=1, b=1, c=1, d=1, e=1}
Understand the ordering trade-off. Ordered collectors like Collectors.toList()
and Collectors.toCollection(ArrayList::new)
do preserve the encounter order of elements even in a parallel stream, but merging ordered partial results adds overhead. Concurrent collectors such as toConcurrentMap()
give up that ordering in exchange for speed. If you use an unordered or concurrent collector but still need a deterministic result, you can impose an order afterward, for example by sorting:
List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
List<String> list = strings.parallelStream()
.collect(Collectors.toCollection(CopyOnWriteArrayList::new))
.stream()
.sorted()
.collect(Collectors.toList());
System.out.println(list); // Output: [a, b, c, d, e]
Consider the characteristics of the collector. The Collector
interface defines three characteristics (java.util.stream.Collector.Characteristics
):
CONCURRENT: Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.
UNORDERED: Indicates that the collection operation does not commit to preserving the encounter order of input elements.
IDENTITY_FINISH: Indicates that the finisher function is the identity function and can be left out.
These characteristics provide hints to the stream framework about how the collector can be optimized. For example, if a collector is UNORDERED
, the stream framework can freely rearrange the elements, which can enable certain optimizations:
List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
Set<String> set = strings.parallelStream()
.collect(Collectors.toUnmodifiableSet()); // UNORDERED collector
System.out.println(set); // Output: [a, b, c, d, e] (possibly in a different order)
By understanding these considerations and choosing the appropriate collector for your use case, you can effectively harness the power of collect()
with parallel streams to achieve significant performance improvements in your stream-based operations.
In addition to the predefined collectors provided by the Collectors
class, you can also create your own custom collectors using the Collector.of()
method. This allows you to define your own supplier, accumulator, combiner, and finisher functions to collect elements into a custom data structure or perform a custom accumulation operation.
Here’s an example using a sequential stream for string concatenation, which ensures deterministic output and better performance:
List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
String concatenated = strings.stream() // Using a sequential stream
.collect(Collector.of(
StringBuilder::new, // Supplier
StringBuilder::append, // Accumulator
(sb1, sb2) -> {
sb1.append(sb2);
return sb1;
}, // Combiner
StringBuilder::toString // Finisher
));
System.out.println(concatenated); // Output: abcde
In this example, we use a sequential stream to concatenate a list of strings into a single string. A custom collector is created using Collector.of()
, with StringBuilder
as the container for accumulating the strings. The StringBuilder::append
method is used as the accumulator, appending each string to the builder in encounter order. The combiner, which merges two StringBuilder
instances, is only invoked during parallel processing; with a sequential stream it is never called. Finally, the StringBuilder::toString
method is used as the finisher to produce the final concatenated string.
However, string concatenation gains little from parallelism: the combiner has to copy characters when merging StringBuilder instances, so a parallel stream is likely to be slower here than a sequential one. That is why this example uses stream() rather than parallelStream().
A better example for a custom collector with parallel streams is a task whose accumulation is associative and order-independent, so that the substream results can be combined in any grouping. Consider this example that adds up integers, where parallel processing can provide performance benefits:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Integer sum = numbers.parallelStream() // Using a parallel stream
.collect(Collector.of(
() -> new int[1], // Supplier
(a, t) -> a[0] += t, // Accumulator
(a1, a2) -> {
a1[0] += a2[0];
return a1;
}, // Combiner
a -> a[0] // Finisher
));
System.out.println(sum); // Output: 15
In this example, a custom collector is defined using Collector.of()
, with an integer array as the container to hold the sum. The accumulator function adds each integer to the array’s single element, and the combiner function merges two arrays by summing their elements. The finisher function extracts the summed value from the array.
However, creating custom collectors that are both correct and efficient for parallel streams can be challenging. It requires a deep understanding of concurrency, thread safety, and the characteristics of the stream and collector. If possible, it’s generally recommended to use the predefined collectors or compose them to achieve your desired operation.
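For instance, a word count over a parallel stream can be expressed entirely with predefined collectors, composing groupingByConcurrent() with counting(). A minimal sketch (the class name is illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class PredefinedCollectorsExample {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");

        // groupingByConcurrent() is a CONCURRENT, UNORDERED collector, so the
        // parallel stream can accumulate into one shared ConcurrentMap
        ConcurrentMap<String, Long> counts = words.parallelStream()
            .collect(Collectors.groupingByConcurrent(w -> w, Collectors.counting()));

        System.out.println(counts.get("a")); // 3
    }
}
```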
Threads allow multiple paths of execution to occur concurrently within a single program; each thread represents a separate path of execution.
To create a new thread, you can either extend the Thread
class and override the run()
method, or implement the Runnable
interface and pass an instance to the Thread
constructor.
Java distinguishes between daemon and non-daemon threads. Daemon threads are terminated when all non-daemon threads complete.
A thread progresses through several states during its life cycle: NEW
, RUNNABLE
, BLOCKED
, WAITING
, TIMED_WAITING
, and TERMINATED
.
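These states can be observed with Thread.getState(). A small sketch (the TIMED_WAITING observation is timing-dependent, so treat it as illustrative):

```java
public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(200); // thread is TIMED_WAITING while sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        System.out.println(t.getState()); // NEW: created but not yet started
        t.start();
        Thread.sleep(50);                 // give it time to reach sleep()
        System.out.println(t.getState()); // typically TIMED_WAITING
        t.join();                         // wait for it to finish
        System.out.println(t.getState()); // TERMINATED
    }
}
```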
The Thread.sleep(long millis)
method causes the current thread to suspend execution for the specified number of milliseconds.
Calling the interrupt()
method on a thread sets its interrupted status and can prematurely wake a sleeping or waiting thread.
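A short sketch of waking a sleeping thread with interrupt():

```java
public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000); // would sleep for a full minute
                System.out.println("Woke up normally");
            } catch (InterruptedException e) {
                // sleep() throws InterruptedException and clears the interrupted status
                System.out.println("Interrupted while sleeping");
            }
        });

        sleeper.start();
        Thread.sleep(100);   // let it enter sleep()
        sleeper.interrupt(); // wakes it immediately with InterruptedException
        sleeper.join();
    }
}
```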
Deadlock occurs when two or more threads are unable to proceed because each thread is waiting for a resource that another thread holds, resulting in a circular dependency.
Starvation occurs when a thread is perpetually denied access to a shared resource, preventing it from making progress.
Livelock occurs when two or more threads are actively responding to each other’s actions but are unable to make progress.
Race conditions occur when multiple threads access shared data concurrently, and the final outcome depends on the relative timing of their executions, leading to unpredictable behavior.
To prevent threading problems, it’s important to use synchronization mechanisms such as locks, atomic variables, concurrent data structures, and synchronization primitives to ensure exclusive access to shared resources and coordinate thread execution.
Thread-safety ensures correct execution of code in a multi-threaded environment by addressing data visibility and data consistency.
The volatile
keyword guarantees that changes made by one thread are immediately visible to other threads, but it doesn’t provide atomicity or mutual exclusion.
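The classic use of volatile is a stop flag; without it, the worker thread might cache the flag and never observe the update. A minimal sketch:

```java
public class VolatileFlagDemo {
    // volatile guarantees the worker sees the write from the main thread
    private static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (running) {
                // spin until another thread clears the flag
            }
            System.out.println("Worker stopped");
        });

        worker.start();
        Thread.sleep(100);
        running = false; // immediately visible to the worker thanks to volatile
        worker.join();
    }
}
```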
Java provides atomic classes (AtomicBoolean
, AtomicInteger
, AtomicLong
, AtomicReference<V>
) for thread-safe operations on single variables.
Atomic classes offer methods like get()
, set()
, getAndSet()
, incrementAndGet()
, getAndIncrement()
, decrementAndGet()
, and getAndDecrement()
for atomic operations.
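The difference between the "and-get" and "get-and" variants is whether the returned value reflects the update. For example:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);

        System.out.println(counter.incrementAndGet()); // 1 (increments, then returns)
        System.out.println(counter.getAndIncrement()); // 1 (returns old value, then increments)
        System.out.println(counter.get());             // 2
        System.out.println(counter.getAndSet(10));     // 2 (returns old value, sets 10)
        System.out.println(counter.decrementAndGet()); // 9
    }
}
```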
The synchronized
keyword is used to achieve mutual exclusion. Only one thread can execute a synchronized block or method at a time.
For instance methods, the lock is associated with the object. For static methods, the lock is associated with the class itself.
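A sketch illustrating the two kinds of monitor (the class name is illustrative):

```java
public class SyncCounter {
    private static int staticCount = 0;
    private int instanceCount = 0;

    // Locks on the SyncCounter instance (this)
    public synchronized void incrementInstance() {
        instanceCount++;
    }

    // Locks on SyncCounter.class, shared by all instances
    public static synchronized void incrementStatic() {
        staticCount++;
    }

    public int getInstanceCount() { return instanceCount; }
    public static int getStaticCount() { return staticCount; }

    public static void main(String[] args) throws InterruptedException {
        SyncCounter c = new SyncCounter();
        Runnable task = () -> {
            for (int i = 0; i < 1_000; i++) {
                c.incrementInstance();
                incrementStatic();
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(c.getInstanceCount()); // 2000
        System.out.println(getStaticCount());     // 2000
    }
}
```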
The Lock
interface provides more flexibility and control over lock acquisition and release compared to synchronized
.
ReentrantLock
is a common implementation of Lock
, offering features like non-blocking lock attempts, timed lock attempts, and fairness control.
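The tryLock() methods illustrate the flexibility that synchronized lacks, as in this sketch:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        // Non-blocking attempt: returns immediately with true or false
        if (lock.tryLock()) {
            try {
                System.out.println("Acquired without blocking");
            } finally {
                lock.unlock(); // always release in a finally block
            }
        }

        // Timed attempt: waits up to 500 ms before giving up
        if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
            try {
                System.out.println("Acquired within the timeout");
            } finally {
                lock.unlock();
            }
        }
    }
}
```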
ReentrantReadWriteLock
provides ReadLock
and WriteLock
for handling concurrent read and write access to a shared resource.
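A read-mostly cache is the typical use case; many readers may hold the read lock at once, while the write lock is exclusive. A minimal sketch (the class name is illustrative):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteCache {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int value = 0;

    // Multiple threads may hold the read lock simultaneously
    public int read() {
        lock.readLock().lock();
        try {
            return value;
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock is exclusive: no readers or other writers allowed
    public void write(int newValue) {
        lock.writeLock().lock();
        try {
            value = newValue;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteCache cache = new ReadWriteCache();
        cache.write(42);
        System.out.println(cache.read()); // 42
    }
}
```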
The CyclicBarrier
class allows multiple threads to wait for each other at a common barrier point before proceeding further.
If a thread leaves a CyclicBarrier
prematurely, other waiting threads will receive a BrokenBarrierException
.
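A sketch of three threads meeting at a barrier, with an optional barrier action that runs once when the last thread arrives:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        // The barrier action runs once, when the third thread arrives
        CyclicBarrier barrier = new CyclicBarrier(3,
            () -> System.out.println("All threads reached the barrier"));

        Runnable task = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " waiting");
                barrier.await(); // blocks until 3 threads are waiting
                System.out.println(Thread.currentThread().getName() + " proceeding");
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        for (int i = 0; i < 3; i++) {
            new Thread(task).start();
        }
    }
}
```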
The Concurrency API in Java provides high-level abstractions and utilities for managing concurrent operations in the java.util.concurrent
package.
The ExecutorService
interface is a central part of the Concurrency API and provides methods for submitting tasks for execution and managing the lifecycle of the underlying thread pool.
A thread pool is a collection of pre-created and reusable threads that are ready to perform tasks concurrently.
The Executors
class provides factory methods for creating different types of ExecutorService
and ScheduledExecutorService
instances.
Tasks can be submitted to an ExecutorService
using methods like execute()
for Runnable
tasks and submit()
for Callable
tasks.
The Callable
interface represents a task that returns a result and may throw an exception, unlike the Runnable
interface.
The Future
interface represents the pending result of a task submitted to an ExecutorService
and provides methods to check the status and retrieve the result of the task.
The ScheduledExecutorService
interface extends ExecutorService
and provides methods for scheduling tasks to run after a delay or to execute periodically.
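A sketch of both scheduling styles (delays chosen only for demonstration):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // Run once after a 100 ms delay
        scheduler.schedule(() -> System.out.println("One-shot task"),
                100, TimeUnit.MILLISECONDS);

        // Run repeatedly: first immediately, then every 200 ms
        scheduler.scheduleAtFixedRate(() -> System.out.println("Periodic task"),
                0, 200, TimeUnit.MILLISECONDS);

        Thread.sleep(500);    // let a few runs happen
        scheduler.shutdown(); // stop accepting tasks; the periodic task ends
    }
}
```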
It’s important to properly shut down an ExecutorService
or ScheduledExecutorService
using the shutdown()
method when it’s no longer needed to ensure graceful termination and resource cleanup.
Concurrent collections like ConcurrentHashMap
, ConcurrentLinkedQueue
, ConcurrentSkipListMap
, ConcurrentSkipListSet
, CopyOnWriteArrayList
, and CopyOnWriteArraySet
are thread-safe and allow concurrent access and modification by multiple threads.
The Collections
class provides methods to obtain synchronized versions of regular collections, but these are less efficient than purpose-built concurrent collections.
LinkedBlockingQueue
is useful for implementing the producer-consumer pattern, where producers put items into the queue and consumers take items out, with blocking behavior when the queue is full or empty.
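A minimal producer-consumer sketch with a bounded queue (capacity chosen only for demonstration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 2: put() blocks when full, take() blocks when empty
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks if the queue is full
                    System.out.println("Produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Integer item = queue.take(); // blocks if the queue is empty
                    System.out.println("Consumed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```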
Parallel streams split elements into multiple chunks and process each chunk with a different thread in parallel, potentially speeding up operations on large datasets.
The order of elements in a parallel stream is not guaranteed unless specifically enforced, which can lead to unexpected results for order-dependent operations.
Stateful lambda expressions should be avoided with parallel streams to prevent race conditions and unpredictable behavior.
Parallel streams can be created using parallelStream()
, stream().parallel()
, or StreamSupport.stream()
with a parallelism flag.
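The three creation styles side by side (the class name is illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.StreamSupport;

public class ParallelCreationDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // 1. Directly from a collection
        long count1 = numbers.parallelStream().count();

        // 2. By converting an existing sequential stream
        long count2 = numbers.stream().parallel().count();

        // 3. From a Spliterator, with the parallel flag set to true
        long count3 = StreamSupport.stream(numbers.spliterator(), true).count();

        System.out.println(count1 + " " + count2 + " " + count3); // 5 5 5
    }
}
```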
Parallel decomposition breaks a task into smaller, independent subtasks that can be processed concurrently and then combined to produce the final output.
forEach
does not guarantee encounter order in parallel streams, while forEachOrdered
does but with potential performance costs.
findFirst()
, limit()
, and skip()
can return non-deterministic results when used with parallel streams.
Reduction operations like reduce()
and collect()
can provide performance benefits with parallel streams, but the accumulator function must be associative and stateless.
Collectors used with parallel streams should be concurrent or use concurrent result containers, and the characteristics of the collector (like UNORDERED
) can enable optimizations.
Custom collectors can be created with Collector.of()
, but ensuring correctness and efficiency for parallel streams can be challenging.
1. Which of the following lines of code correctly creates and starts a new thread?
public class Main {
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 5; i++) {
System.out.println("Task is running");
}
};
// Insert code here to create and start a new thread
}
}
A) Thread thread = new Thread(); thread.start(task);
B) Thread thread = new Thread(task).run();
C) Thread thread = new Thread(task); thread.start();
D) Thread thread = new Thread(); task.run();
E) Thread thread = Thread.start(task);
2. Which of the options correctly uses a synchronized block to ensure that only one thread at a time can execute a critical section that increments a shared counter?
public class Main {
private static int counter = 0;
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
// Insert synchronized block here
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counter);
}
}
A) synchronized (this) { counter++; }
B) synchronized (Main.class) { counter++; }
C) synchronized (task) { counter++; }
D) synchronized (counter) { counter++; }
E) synchronized (System.out) { counter++; }
3. Which of the following statements about atomic classes is correct? (Choose all that apply)
A) AtomicInteger
is part of the java.util.concurrent.atomic
package, but it does not provide atomic operations for increment and decrement.
B) AtomicReference
can only be used with reference types, not primitive types.
C) AtomicLong
supports atomic operations on long
values, including getAndIncrement()
and compareAndSet()
methods.
D) AtomicBoolean
can be used to perform atomic arithmetic operations on boolean
values.
4. Which of the following code snippets correctly uses the Lock
interface to ensure thread-safe access to a shared resource?
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
// Insert code here
}
public int getCount() {
return count;
}
}
A)
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
B)
lock.lock();
count++;
lock.unlock();
C)
try {
lock.lock(() -> {
count++;
});
} finally {
lock.unlock();
}
D)
synchronized(lock) {
count++;
}
5. Which of the following code snippets correctly demonstrates the shutdown of an ExecutorService
?
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
// Insert code here
}
}
A)
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.MINUTES);
B)
executor.awaitTermination(1, TimeUnit.MINUTES);
executor.shutdown();
C)
executor.shutdown();
executor.shutdownNow();
D)
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
6. Which of the following code snippets correctly demonstrates how to get a result from a Callable
task using an ExecutorService
?
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> task = () -> {
return 123;
};
// Insert code here
}
}
A)
Future<Integer> future = executor.submit(task);
executor.shutdown();
Integer result = future.get();
System.out.println(result);
B)
Future<Integer> future = executor.submit(task);
Integer result = future.get();
executor.shutdownNow();
System.out.println(result);
C)
Future<Integer> future = executor.submit(task);
System.out.println(future.get(1, TimeUnit.SECONDS));
executor.shutdown();
D)
Future<Integer> future = executor.submit(task);
try {
Integer result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
7. Which of the following statements about Java’s concurrent collections is correct?
A) ConcurrentHashMap
allows concurrent read and write operations, and retrieval operations do not block even when updates are being made.
B) CopyOnWriteArrayList
is optimized for scenarios with a high number of write operations compared to read operations.
C) ConcurrentSkipListSet
does not keep elements sorted.
D) BlockingQueue
implementations like LinkedBlockingQueue
allow elements to be added and removed concurrently without any internal locking mechanisms.
8. Which of the following statements about parallel streams is correct?
A) Parallel streams always improve the performance of a program by utilizing multiple threads.
B) Parallel streams can lead to incorrect results if the operations performed are not thread-safe.
C) The order of elements in a parallel stream is always preserved compared to the original stream.
D) Using parallel streams guarantees that the operations on elements will execute in a fixed order.
9. Which of the following code snippets correctly demonstrates how to reduce a parallel stream to compute the sum of its elements?
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
// Insert code here
}
}
A)
int sum = numbers.parallelStream().reduce(1, Integer::sum);
System.out.println(sum);
B)
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum);
C)
int sum = numbers.stream().reduce(0, Integer::sum);
System.out.println(sum);
D)
int sum = numbers.parallelStream().collect(reduce(0, Integer::sum));
System.out.println(sum);