Threads

Overview

Thread vs thread

Creating and Starting a Thread

Passing a Runnable to a Thread

Runnable r = () -> {};
new Thread(r).start();
new Thread(r).start(); // Perfectly legal to pass r to multiple Threads as target

Determining the Thread executing the Runnable

Runnable r = () -> {
    System.out.println("Being executed by: " + Thread.currentThread().getName());
};
new Thread(r, "foo").start();
new Thread(r, "bar").start();

// Being executed by: foo
// Being executed by: bar

Thread Scheduler and Thread States

A Thread is

Alive Threads

An alive Thread can be in one of the following states: runnable, running, or blocked / waiting / sleeping.

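The thread scheduler is the part of the JVM that pulls alive threads from the thread pool and moves them between the runnable and running states. A thread can influence the scheduler, or notify it of its intent, using the following methods (sleep, yield and join are defined on Thread; wait, notify and notifyAll are defined on Object):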

static void sleep(long millis) throws InterruptedException
static void yield()
void join() throws InterruptedException
void wait() throws InterruptedException
void notify()
void notifyAll()
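
As a rough illustration of the states and methods above, the sketch below watches a thread through Thread.getState(). Note that Java's Thread.State enum does not match these notes one to one: both runnable and running are reported as RUNNABLE, and a thread inside sleep is reported as TIMED_WAITING. The class and method names (ThreadStateDemo, observeStates) are made up for this example.

```java
class ThreadStateDemo {

    /** Starts a thread that sleeps and records the states it is seen in. */
    static String observeStates() {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(500); // ask the scheduler to keep this thread off the CPU
            } catch (InterruptedException ignored) {}
        });

        StringBuilder seen = new StringBuilder();
        seen.append(sleeper.getState()); // NEW: created but not yet started

        sleeper.start();
        try {
            // Poll for at most ~400 ms until the thread is seen sleeping.
            for (int i = 0; i < 80 && sleeper.getState() != Thread.State.TIMED_WAITING; i++) {
                Thread.sleep(5);
            }
            seen.append(' ').append(sleeper.getState());

            sleeper.join(); // the current thread waits until sleeper finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.append(' ').append(sleeper.getState()); // TERMINATED once join has returned
        return seen.toString();
    }

    public static void main(String[] args) {
        // Typically prints: NEW TIMED_WAITING TERMINATED
        System.out.println(observeStates());
    }
}
```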

runnable Thread

running Thread

blocked / waiting / sleeping Thread

Preventing Thread Execution

Making a Thread Sleep

Making a Thread Yield

Making a Thread Join Another Thread

Runnable longRunning = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("I am done!");
    } catch (InterruptedException ignored) {}
};

Thread longRunningThread = new Thread(longRunning);
longRunningThread.start();

System.out.println("I will wait for my long running friend!");
longRunningThread.join();
System.out.println("I can continue!");

// I will wait for my long running friend!
// I am done!
// I can continue!

Thread Problems and Synchronisation

Race Condition

Race condition: a thread uses a resource (races in) while another thread is in the middle of an operation on that resource that is supposed to be atomic.

In my runs the following example prints 0 most of the time, but not every time: occasionally the value seen in the console is -10.

The race condition arises as follows, assuming the account balance is currently 10: both threads evaluate account.balance > 0 as true before either of them has withdrawn, so both call withdraw() and the balance ends up at -10.

class Account {
    int balance = 50;
    int withdraw() {
        balance = balance - 10;
        return 10;
    }
}

class Client implements Runnable {
    Account account;
    public Client(Account account) {
        this.account = account;
    }
    @Override
    public void run() {
        while (account.balance > 0) {
            account.withdraw();
            try {
                Thread.sleep(15);
            } catch (InterruptedException ignored) {}
        }
    }
}

final Account account = new Account();
final Client alice = new Client(account);
final Client bob = new Client(account);

final Thread threadAlice = new Thread(alice);
final Thread threadBob = new Thread(bob);

threadAlice.start();
threadBob.start();

threadAlice.join();
threadBob.join();

System.out.println(account.balance); // Most of the time 0, -10 from time to time

Using the synchronized Keyword for Converting Operations to Atomic

An example can be as seen below:

class Account {
    int balance = 50;
    synchronized int withdraw() {
        if (balance > 0) {
            balance = balance - 10;
            return 10;
        }
        return 0;
    }
}

In the example above, once a thread enters the withdraw method, it is guaranteed that no other thread can enter any synchronized method (including withdraw) on the same Account instance until the first thread leaves, which makes overdrawing the balance impossible.
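
To see the guarantee in action, here is a minimal sketch that lets two clients drain one account. It assumes a slightly different client than the one shown earlier: the loop relies on the return value of withdraw instead of reading balance directly, and getBalance and drainWithTwoClients are names invented for this example.

```java
class SynchronizedAccountDemo {

    static class Account {
        private int balance = 50;

        // The check and the update run under the same lock, so they act as one atomic step.
        synchronized int withdraw() {
            if (balance > 0) {
                balance = balance - 10;
                return 10;
            }
            return 0;
        }

        synchronized int getBalance() {
            return balance;
        }
    }

    /** Lets two clients drain one account and returns the final balance. */
    static int drainWithTwoClients() {
        Account account = new Account();
        Runnable client = () -> {
            // Keep withdrawing until the account has nothing left to give.
            while (account.withdraw() > 0) {
                try {
                    Thread.sleep(15);
                } catch (InterruptedException ignored) {
                    return;
                }
            }
        };
        Thread alice = new Thread(client);
        Thread bob = new Thread(client);
        alice.start();
        bob.start();
        try {
            alice.join();
            bob.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println(drainWithTwoClients()); // 0
    }
}
```

Exactly five withdrawals of 10 can succeed, whichever thread performs them, so the balance cannot go below 0.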

Synchronisation and Locks

Arbitrary Lock Example

class Foo {
    Object lock_1 = new Object();
    Object lock_2 = new Object();

    void foo() {
        synchronized (lock_1) {
        }
    }

    void bar() {
        synchronized (lock_2) {
        }
    }
}

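In the example above, assume an instance of Foo is shared between 2 threads, a and b. Because foo and bar synchronize on different lock objects, a can be inside foo while b is inside bar. However, a and b cannot both be inside foo (or both inside bar) at the same time.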
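
The independence of the two locks can be checked with a small sketch like the one below. The two CountDownLatch fields and the method barRunsWhileFooHoldsLock are hypothetical helpers added only to force the interleaving: thread b calls bar only after thread a is already inside foo, and a stays inside foo until b has entered bar. If both methods used the same lock, a would time out and the method would return false.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class IndependentLocksDemo {

    static class Foo {
        final Object lock_1 = new Object();
        final Object lock_2 = new Object();
        // Helpers for the demo only (not part of the original Foo).
        final CountDownLatch fooEntered = new CountDownLatch(1);
        final CountDownLatch barEntered = new CountDownLatch(1);

        /** Holds lock_1 and reports whether bar() was entered in the meantime. */
        boolean foo() throws InterruptedException {
            synchronized (lock_1) {
                fooEntered.countDown();
                return barEntered.await(2, TimeUnit.SECONDS);
            }
        }

        void bar() throws InterruptedException {
            fooEntered.await(); // wait until the other thread is inside foo()
            synchronized (lock_2) {
                barEntered.countDown();
            }
        }
    }

    /** Returns true if thread b entered bar() while thread a was still inside foo(). */
    static boolean barRunsWhileFooHoldsLock() {
        Foo shared = new Foo();
        AtomicBoolean overlapped = new AtomicBoolean(false);
        Thread a = new Thread(() -> {
            try {
                overlapped.set(shared.foo());
            } catch (InterruptedException ignored) {}
        });
        Thread b = new Thread(() -> {
            try {
                shared.bar();
            } catch (InterruptedException ignored) {}
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overlapped.get();
    }

    public static void main(String[] args) {
        System.out.println(barRunsWhileFooHoldsLock()); // true
    }
}
```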

Synchronized Method vs Synchronized Block

For an instance method, synchronized void foo() {} is basically saying void foo() {synchronized (this) {}}. For a static method, the lock is the Class object instead of this.
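
One way to convince yourself of this equivalence is Thread.holdsLock, which reports whether the current thread owns the monitor of a given object. The sketch below is only an illustration; the class and method names are made up.

```java
class SynchronizedEquivalenceDemo {

    // Synchronized instance method: the lock is the instance itself.
    synchronized boolean methodHoldsThis() {
        return Thread.holdsLock(this);
    }

    // The same thing written with an explicit block.
    boolean blockHoldsThis() {
        synchronized (this) {
            return Thread.holdsLock(this);
        }
    }

    // Unsynchronized method for comparison: no lock is held.
    boolean plainHoldsThis() {
        return Thread.holdsLock(this);
    }

    // Synchronized static method: the lock is the Class object.
    static synchronized boolean staticHoldsClass() {
        return Thread.holdsLock(SynchronizedEquivalenceDemo.class);
    }

    public static void main(String[] args) {
        SynchronizedEquivalenceDemo demo = new SynchronizedEquivalenceDemo();
        System.out.println(demo.methodHoldsThis()); // true
        System.out.println(demo.blockHoldsThis());  // true
        System.out.println(demo.plainHoldsThis());  // false
        System.out.println(staticHoldsClass());     // true
    }
}
```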

Thread Safety

A class is said to be thread-safe if its data is protected by synchronized methods / blocks. However, using thread-safe classes still requires care.

List<String> synchronizedList = Collections.synchronizedList(new ArrayList<>());
synchronizedList.addAll(Arrays.asList("foo"));

Runnable r = () -> {
    if (synchronizedList.size() > 0) {
        try {
            Thread.sleep(10);
            String remove = synchronizedList.remove(0);
            if (remove == null) {
                System.exit(-1);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
};

new Thread(r).start();
new Thread(r).start();

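The example above will typically throw an IndexOutOfBoundsException even though we are using a synchronized list. In a synchronized list each individual method is synchronized, but a sequence of calls is not. Nothing stops both threads from seeing size() > 0 before either of them calls remove(0), in which case the second remove(0) fails on an empty list.

It is better to use a plain old ArrayList and synchronise on the list itself, so that the check and the removal form one atomic step, as seen below.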

List<String> synchronizedList = new ArrayList<>();
synchronizedList.addAll(Arrays.asList("foo"));

Runnable r = () -> {
    synchronized (synchronizedList) {
        if (synchronizedList.size() > 0) {
            try {
                Thread.sleep(10);
                String remove = synchronizedList.remove(0);
                if (remove == null) {
                    System.exit(-1);
                }
            } catch (InterruptedException e) {}
        }
    }
};

new Thread(r).start();
new Thread(r).start();

Deadlock

A deadlock happens when a thread acquires lock_1 and starts waiting for lock_2 to be released, where lock_2 is acquired by another thread which is waiting for lock_1 to be released. Neither thread is ever able to acquire the lock it is waiting for, and neither thread ever releases the lock it is holding.

class DeadLockExample {

    Object lock_1 = new Object();
    Object lock_2 = new Object();

    void foo() {
        synchronized (lock_1) {
            try {Thread.sleep(10);}
            catch (InterruptedException e) {}
            synchronized (lock_2) {
                System.out.println("foo");
            }
        }
    }

    void bar() {
        synchronized (lock_2) {
            try {Thread.sleep(10);}
            catch (InterruptedException e) {}
            synchronized (lock_1) {
                System.out.println("bar");
            }
        }
    }
}

final DeadLockExample deadLockExample = new DeadLockExample();

Runnable foo = () -> deadLockExample.foo();
Runnable bar = () -> deadLockExample.bar();

new Thread(foo).start();
new Thread(bar).start();

Thread Starvation

Thread starvation happens when a thread acquires a lock on a shared resource and goes into a long-running process or an infinite loop, never giving the other threads a chance to proceed.
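
A minimal sketch of starvation, assuming a ReentrantLock as the shared resource and a latch standing in for the long-running work. All names here are invented for the example. The second thread uses tryLock with a timeout so that the demo can report the starvation instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class StarvationDemo {

    /** Returns true if the current thread could not get the lock while a greedy thread held it. */
    static boolean otherThreadIsStarved() {
        ReentrantLock resource = new ReentrantLock();
        CountDownLatch lockTaken = new CountDownLatch(1);
        CountDownLatch stopHogging = new CountDownLatch(1);

        Thread greedy = new Thread(() -> {
            resource.lock();
            try {
                lockTaken.countDown();
                stopHogging.await(); // stands in for a long-running task done while holding the lock
            } catch (InterruptedException ignored) {
            } finally {
                resource.unlock();
            }
        });
        greedy.start();

        boolean starved = false;
        try {
            lockTaken.await(); // make sure the greedy thread really holds the lock
            // Give up after 200 ms instead of waiting forever.
            boolean acquired = resource.tryLock(200, TimeUnit.MILLISECONDS);
            starved = !acquired;
            if (acquired) {
                resource.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stopHogging.countDown(); // let the greedy thread finish so the demo terminates
        }
        return starved;
    }

    public static void main(String[] args) {
        System.out.println(otherThreadIsStarved()); // true
    }
}
```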

wait - notify

A thread can acquire a lock on an object and then release that lock by calling the wait method on the object, blocking until some other thread calls notify on the same object. The four methods involved compare as follows:

wait(), wait(long timeout), notify() and notifyAll():
Can only be called on an object whose lock the calling thread has acquired.
Otherwise they throw IllegalMonitorStateException, which is an unchecked exception and hence need not be handled.

wait() and wait(long timeout):
Throw InterruptedException, which must be handled or declared.
Release the lock immediately and block the thread.
wait() waits until notify (or notifyAll) is called on the object.
wait(long timeout) waits until notify (or notifyAll) is called on the object or the timeout occurs.

notify() and notifyAll():
Do not throw any checked exceptions.
Do not release the lock immediately.
notify() notifies only one of the threads waiting on the object; which one is undefined.
notifyAll() notifies all of the threads waiting on the object; which one will run first is undefined.
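
The rules above can be checked with a short sketch. The class and method names are made up for illustration. The timed wait is only asserted to return; it is not asserted to take exactly the timeout, since wait may also return early on a spurious wakeup.

```java
class MonitorRulesDemo {

    /** Returns true if notify() without holding the lock throws IllegalMonitorStateException. */
    static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify(); // the current thread does not own the lock of this object
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Returns true once notify() has been called successfully while holding the lock. */
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notify(); // legal; no thread is waiting, so nothing else happens
        }
        return true;
    }

    /** Waits with a timeout although nobody notifies; returns the elapsed milliseconds. */
    static long timedWaitWithoutNotify(long timeoutMillis) {
        Object lock = new Object();
        long start = System.nanoTime();
        synchronized (lock) {
            try {
                lock.wait(timeoutMillis); // returns on notify, timeout, or a spurious wakeup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println(notifyWithoutLockFails()); // true
        System.out.println(notifyWithLockSucceeds()); // true
        System.out.println("Timed wait returned after ~" + timedWaitWithoutNotify(200) + " ms");
    }
}
```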

wait - notify Example

class Notifier implements Runnable {
    final Stack<String> messages;

    public Notifier(Stack<String> messages) {
        this.messages = messages;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (messages) {
                while (messages.empty()) { // loop, not if: guards against spurious wakeups
                    try {
                        messages.wait();
                    } catch (InterruptedException ignored) {}
                }
                final String mostRecentMsg = messages.pop();
                System.out.println(LocalTime.now() + " " + mostRecentMsg);
                if (mostRecentMsg.equals("q"))
                    System.exit(-1);
            } // synchronized
        } // while
    } // run
} // Notifier

class Receiver implements Runnable {
    final Stack<String> messages;

    public Receiver(Stack<String> messages) {
        this.messages = messages;
    }

    @Override
    public void run() {
        // Create the Scanner once; a new Scanner per iteration may lose input it has already buffered.
        final Scanner scanner = new Scanner(System.in);
        while (true) {
            final String msg = scanner.nextLine();
            synchronized (messages) {
                messages.push(msg);
                messages.notify();
            }
        }
    }
}

final Stack<String> messages = new Stack<>();

new Thread(new Notifier(messages)).start();
new Thread(new Receiver(messages)).start();

// Sample execution
// Hello World!
// 12:20:50.057 Hello World!
// Thank you!
// 12:20:52.113 Thank you!
// q
// 12:20:52.989 q
//
// Process finished with exit code 255

wait - notify Example - 2

Object lock = new Object();

new Thread(() -> {
    synchronized(lock) {
        try {
            lock.wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Notified!");
}).start();

try {
    Thread.sleep(500);
} catch (InterruptedException e) {}

System.out.print("Press any button to continue.");
new Scanner(System.in).nextLine();
synchronized (lock) {
    lock.notifyAll();
}

// Press any button to continue.
// (User hits enter in the console)
// Notified!

Atomic Variables

AtomicInteger Example

Sharing a counter between 2 threads without AtomicInteger

class Counter {
    int count;
    void increment() {
        count++; // Not atomic, not threadsafe.
    }
}

Counter counter = new Counter();
Thread a = new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        counter.increment();
    }
});
Thread b = new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        counter.increment();
    }
});

a.start();
b.start();

try {
    a.join();
    b.join();
} catch (InterruptedException e) {}

System.out.println(counter.count); // e.g. 1735; varies between runs and is often less than 2000

Sharing a counter between 2 threads with AtomicInteger

Using an AtomicInteger instead of an int as follows will ensure 2000 is printed every time.

import java.util.concurrent.atomic.AtomicInteger;

class Counter {
    AtomicInteger count = new AtomicInteger();
    void increment() {
        count.incrementAndGet();
    }
}
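
For completeness, here is a self-contained sketch that runs the AtomicInteger counter with two threads, mirroring the earlier non-atomic run. The wrapper class AtomicCounterDemo and the method countWithTwoThreads are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    static class Counter {
        final AtomicInteger count = new AtomicInteger();

        void increment() {
            count.incrementAndGet(); // read-modify-write performed as one atomic step
        }
    }

    /** Runs two threads that each increment the shared counter 1000 times. */
    static int countWithTwoThreads() {
        Counter counter = new Counter();
        Runnable incrementer = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        };
        Thread a = new Thread(incrementer);
        Thread b = new Thread(incrementer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.count.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads()); // 2000
    }
}
```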

Lock

ReentrantLock

Thread Safe Example Using synchronized

class Counter {
    int count;

    synchronized void increment() {
        count++;
    }
}

Counter counter = new Counter();

Runnable counterIncrementer = () -> {
    for (int i = 0; i < 1000; i++) {
        counter.increment();
    }
};

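Thread first = new Thread(counterIncrementer);
Thread second = new Thread(counterIncrementer);

first.start();
second.start();

try {
    // join() waits for both threads to finish; a fixed sleep would not guarantee that
    first.join();
    second.join();
} catch (InterruptedException ignored){}

System.out.println(counter.count); // 2000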

Thread Safe Example Using ReentrantLock

class Counter {
    int count;

    Lock lock = new ReentrantLock();

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // always release the lock, even if the guarded code throws
        }
    }
}

Avoiding DeadLock with tryLock

import java.util.concurrent.locks.*;

class Counter {
    int count;

    void increment() {
        count++;
    }
}

Counter counter = new Counter();

Lock a = new ReentrantLock();
Lock b = new ReentrantLock();

Runnable r = () -> {
    for (int i = 0; i < 1000;) {
        boolean lockA = a.tryLock();
        boolean lockB = b.tryLock();
        if (lockA && lockB) {
            i++; // only increment if both locks acquired!
            counter.increment();
            a.unlock();
            b.unlock();
        } else {
            if ((lockA && !lockB) || (lockB && !lockA)) {
                System.out.println("Acquired one lock but not the other!");
            }
            if (lockA) {
                a.unlock();
            }
            if (lockB) {
                b.unlock();
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {}
        }
    }
};

Thread first = new Thread(r);
Thread second = new Thread(r);

first.start();
second.start();

try {
    first.join();
    second.join();
} catch (InterruptedException ignored){}

System.out.println(counter.count);

// Acquired one lock but not the other!
// Acquired one lock but not the other!
// Acquired one lock but not the other!
// Acquired one lock but not the other!
// 2000

Condition

Condition Example

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

new Thread(() -> {
    lock.lock();
    try {
        condition.await();
    } catch (InterruptedException e) {}
    System.out.println("Notified!");
    lock.unlock();
}).start();

try {
    Thread.sleep(500);
} catch (InterruptedException e) {}

System.out.print("Press any button to continue.");
new Scanner(System.in).nextLine();
lock.lock();
condition.signalAll();
lock.unlock();

Executor and ExecutorService

Different Types of ExecutorServices

void tryService(ExecutorService es) throws InterruptedException {
    long start = System.currentTimeMillis();
    AtomicInteger atomicInteger = new AtomicInteger(0);

    Runnable zzz = () -> {
        System.out.print(atomicInteger.incrementAndGet() + " ");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {}
    };

    for (int i = 0; i < 16; i++) {
        es.execute(zzz);
    }

    es.shutdown();
    es.awaitTermination(2, TimeUnit.MINUTES);

    long end = System.currentTimeMillis();
    System.out.println("\nTook: " + (end - start) + " milliseconds");
}

tryService(Executors.newSingleThreadExecutor());
tryService(Executors.newFixedThreadPool(4));
tryService(Executors.newCachedThreadPool());

Output resembles the following

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 
Took: 16135 milliseconds
1 2 3 4 5 8 7 6 9 10 11 12 13 14 16 15 
Took: 4023 milliseconds
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 
Took: 1007 milliseconds

Observations

ScheduledExecutorService

Lets you schedule tasks to run after a delay, or repeatedly at a fixed rate or with a fixed delay.

Runnable r = () -> {
    System.out.println("Sleeping: " + LocalTime.now().format(DateTimeFormatter.ofPattern("mm:ss")));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Woke up: " + LocalTime.now().format(DateTimeFormatter.ofPattern("mm:ss")));
};
ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
es.scheduleAtFixedRate(r, 0, 5, TimeUnit.SECONDS);

Output

Sleeping: 25:58
Woke up: 25:59
Sleeping: 26:03
Woke up: 26:04
Sleeping: 26:08
Woke up: 26:09

Callable and Future

List<Future<Integer>> futures = new ArrayList<>();

Callable<Integer> rng = () -> {
    Thread.sleep(1000); // Long running task..
    return ThreadLocalRandom.current().nextInt(0, 10);
};

ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 16; i++) {
    futures.add(executorService.submit(rng));  // not execute, submit!
}

executorService.shutdown();
try {
    executorService.awaitTermination(1, TimeUnit.MINUTES);  // block until all tasks are done
} catch (InterruptedException e) {
    e.printStackTrace();
}

for (Future<Integer> future : futures) {
    try {
        System.out.printf("%s ", future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}