Android/Java Multi-Threaded Execution with the Executor Framework (ThreadPoolExecutor)

These days multi-core CPUs have almost become the standard in Android devices. This means multiple operations (possibly long-running, resource-intensive tasks) don’t have to wait to be executed on a single thread. They can instead run on multiple threads in parallel, with each thread scheduled on its own core. For example, imagine an operation where multiple images have to be downloaded and shown in a gallery view. The downloads and any decoding required for the different images could happen concurrently on multiple threads, speeding up the entire operation and leading to a faster app experience.

Android has an Executor framework with which you can automatically manage a pool of threads with a task queue. Multiple threads run in parallel, executing tasks from the queue (usually a BlockingQueue). The Android training material has a chapter that teaches you how to do this, mainly by making use of the ThreadPoolExecutor class. You should definitely read all the lessons in that chapter.


In summary, the training resources mentioned above teach how to create a ThreadPoolExecutor instance that manages a pool of threads with a task queue (we pass this task queue along with the number of threads during instantiation) and then pass Runnables to its execute() method so that they are processed on one of the threads from the pool. Finally, once the execution is done, the response is communicated back to the UI thread using Handlers.

We should now look into different classes/interfaces of the Executor framework (java.util.concurrent) like Executor, ExecutorService, Executors, ThreadPoolExecutor, Future, Callable, FutureTask, ThreadFactory, ExecutorCompletionService.

Executor

The Executor interface is the most basic component of the Executor framework. It has only one method, execute(), which accepts a Runnable and runs it in a new thread, a pooled thread, or the calling thread, depending upon the implementation.

Quickly go over to the Executor reference to see how it can be implemented and used to run a command rather than starting a new thread with new Thread(new RunnableTask()).start(). As you’ll see, all you have to do is implement Executor and then run the Runnable command in its execute() method, either on the same thread or a new thread. Executing runnables is as easy as doing this:

Executor executor = new MyExecutor();
executor.execute(new RunnableTaskOne());
executor.execute(new RunnableTaskTwo());
// and so on...
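MyExecutor is not defined in the snippet above. As a minimal sketch of what such an implementation could look like, here are two tiny executors: one that runs the task on the calling thread and one that starts a new thread per task. The class names (SimpleExecutors, DirectExecutor, ThreadPerTaskExecutor) and the directThreadName() helper are illustrative choices, not something prescribed by the framework.

```java
import java.util.concurrent.Executor;

class SimpleExecutors {
    /** Runs each task synchronously on whichever thread calls execute(). */
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /** Starts a brand-new thread for every submitted task. */
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    /** Helper for illustration: name of the thread a task ran on under DirectExecutor. */
    static String directThreadName() {
        final String[] name = new String[1];
        new DirectExecutor().execute(() -> name[0] = Thread.currentThread().getName());
        return name[0];
    }

    public static void main(String[] args) {
        Executor direct = new DirectExecutor();
        direct.execute(() -> System.out.println("direct ran on " + Thread.currentThread().getName()));

        Executor perTask = new ThreadPerTaskExecutor();
        perTask.execute(() -> System.out.println("per-task ran on " + Thread.currentThread().getName()));
    }
}
```

The calling code is identical in both cases; only the Executor implementation decides where the Runnable actually runs.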

The API reference also has an example of a serial executor, where all the submitted runnable tasks are executed in series, one after the other, though not necessarily on the same thread (that again depends upon the Executor implementation used in scheduleNext()). The scheduleNext() method uses the executor instance member, which could be a ThreadPoolExecutor instance, or for now you could just make use of AsyncTask.THREAD_POOL_EXECUTOR for quick testing. The SerialExecutor class actually implements a producer-consumer pattern where producer threads create Runnable tasks and place them in a queue, whereas consumer threads take tasks off the queue and execute them. I’ll just paste that same code here for quick reference:

class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }


    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
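To see the serial behaviour in action, here is a rough sketch that wraps a four-thread pool in the SerialExecutor and records the order in which tasks run. The SerialExecutor is repeated so that the example compiles on its own; SerialExecutorDemo and runInOrder() are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerialExecutorDemo {
    // Same SerialExecutor as above, repeated so this sketch is self-contained.
    static class SerialExecutor implements Executor {
        final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
        final Executor executor;
        Runnable active;

        SerialExecutor(Executor executor) {
            this.executor = executor;
        }

        public synchronized void execute(final Runnable r) {
            tasks.offer(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }

    /** Submits `count` numbered tasks and returns the order in which they ran. */
    static List<Integer> runInOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Executor serial = new SerialExecutor(pool);
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int id = i;
            serial.execute(() -> {
                order.add(id);
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```

Even though the backing pool has four threads, only one task is handed to it at a time, so the tasks run in submission order.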

So how is this Executor interface useful? Well, if you look at new Thread(runnableTask).start(), it always runs the task in a new thread, starting right now (when it’s called). That dictates the mechanics of how the task is run. The code submitting tasks to an Executor (through the execute() method), on the other hand, need not know when threads are created (if at all) or when the tasks are run (it could be in the future). All those details and much more can be encapsulated in the Executor implementation, and the code that submits tasks doesn’t need to know about any of them. This decoupling is especially useful in larger implementations that combine several execution behaviours.

In a while we’ll cover other members of the same Java package, like ExecutorService, which is a richer interface, and ThreadPoolExecutor, which provides an extensible thread pool implementation.

At this point it is important to understand the concept of thread pools.

Thread Pools

A thread pool manages a pool of worker threads (the exact number depends upon the implementation) with a task queue that holds tasks waiting to be executed by any one of the idle threads in the pool. Tasks are added to the queue by producers whereas the worker threads act as consumers by consuming the tasks from the queue whenever there’s an idle thread ready to perform a new background execution.

Thread pools have clear advantages over executing all the tasks on a single thread or spawning a new thread per task. I think you’d have already guessed or at least imagined why. Threads don’t have to be created and destroyed for every task, as the worker threads can be kept alive for new tasks. This cuts per-task overhead and resource wastage. The maximum number of threads can also be capped so that the system isn’t overloaded with threads running background tasks.
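To make the producer/consumer idea concrete, here is a deliberately tiny hand-rolled pool. It is only a teaching sketch under simplifying assumptions (a fixed number of workers, no rejection handling, a worker dies if a task throws); TinyThreadPool and its methods are made-up names, and real code should use the framework classes discussed below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class TinyThreadPool {
    // Sentinel task telling a worker to exit; one is queued per worker on shutdown.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final List<Thread> workers = new ArrayList<Thread>();

    TinyThreadPool(int threadCount) {
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(this::workLoop, "tiny-pool-" + i);
            worker.start();
            workers.add(worker);
        }
    }

    // Consumer side: each worker keeps taking tasks off the shared queue.
    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks while the queue is empty
                if (task == STOP) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: callers just drop tasks into the queue.
    void submit(Runnable task) {
        queue.add(task);
    }

    // Lets queued tasks finish, then stops every worker and waits for it.
    void shutdownAndWait() {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Runs `tasks` increments on `threads` workers and returns the final count. */
    static int countWithPool(int tasks, int threads) {
        TinyThreadPool pool = new TinyThreadPool(threads);
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(counter::incrementAndGet);
        }
        pool.shutdownAndWait();
        return counter.get();
    }
}
```

The same four worker threads are reused for all submitted tasks, which is exactly the saving a thread pool offers over one thread per task.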

Executors (Factory Class)

Using the Executors factory class, you can instantly create thread pools.

So firstly, there’s the Executors.newFixedThreadPool(n) method that creates a thread pool with `n` number of threads. This thread pool uses an unbounded task queue that grows freely as new tasks are added. Hence, a producer won’t fail at insertions.

Then we’ve Executors.newCachedThreadPool() that creates a dynamically sized thread pool where a new thread is created as needed. So when there’s a task to process, it’ll use previously constructed idle threads if they’re available. If not then a new thread will be created. Threads not used for sixty seconds will be terminated and removed from the cache/pool.

There’s also Executors.newSingleThreadExecutor(), which creates an executor with only one worker thread that executes tasks from an unbounded queue serially, so tasks never run concurrently with each other. If this single thread terminates due to a failure, a new one will take its place. The difference between Executors.newSingleThreadExecutor() and Executors.newFixedThreadPool(1) (discussed earlier) is that the former is guaranteed to always use one thread, whereas the latter can be reconfigured to use additional threads, for example with setMaximumPoolSize() and setCorePoolSize().
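The reconfiguration difference can be checked with a small experiment. This sketch assumes that newFixedThreadPool() hands back a plain ThreadPoolExecutor, which is how common JDK implementations behave but is an implementation detail rather than a documented guarantee; PoolReconfigureDemo and its methods are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class PoolReconfigureDemo {
    /** Tries to grow a newFixedThreadPool(1) to four threads. */
    static boolean fixedPoolCanGrow() {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        try {
            if (!(fixed instanceof ThreadPoolExecutor)) {
                return false;
            }
            ThreadPoolExecutor pool = (ThreadPoolExecutor) fixed;
            pool.setMaximumPoolSize(4); // raise the maximum first so core <= max holds
            pool.setCorePoolSize(4);
            return pool.getCorePoolSize() == 4;
        } finally {
            fixed.shutdown();
        }
    }

    /** The single-thread executor is wrapped, so it cannot be cast and reconfigured. */
    static boolean singleThreadExecutorCanBeCast() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return single instanceof ThreadPoolExecutor;
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool can grow: " + fixedPoolCanGrow());
        System.out.println("single executor castable: " + singleThreadExecutorCanBeCast());
    }
}
```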

Once the thread pool is ready, let’s see how to submit tasks to it:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
    public void run() {
        // Do some long running operation on a worker thread
    }
});

All these methods return an ExecutorService object.

ExecutorService

ExecutorService is an interface that extends Executor (which we discussed earlier). It provides methods to manage the execution of tasks and the termination of the thread pool. It also provides methods like submit() that produce a Future, which can be used to check whether an asynchronous task has completed, wait for its completion and retrieve the result of the operation.

ThreadPoolExecutor

Finally we’re down to ThreadPoolExecutor, which is what you came across if you went through the training materials mentioned in this article’s introduction. This class is an ExecutorService implementation that you can configure or subclass to create custom thread pools.

Let’s first see how to instantiate the class after which the executor instance can be directly used to submit tasks. Also this is what you could use in the SerialExecutor class example above. This is what its most basic constructor looks like:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue
)

Let’s discuss the parameters quickly:

  • corePoolSize – The number of threads to keep in the pool, even if they are idle. Initially there are zero threads in the pool, but as tasks are submitted, new threads are created. As long as the thread count is lower than corePoolSize, a new thread is created for each submitted task, even if existing threads are idle.
  • maximumPoolSize – Maximum number of threads to allow in the pool. If this is more than the corePoolSize and the current number of threads is >= corePoolSize, then new worker threads will be created only if the queue is full.
  • keepAliveTime – When the number of threads is greater than the core pool size, the non-core (excess idle) threads will wait for new tasks for the time defined by this parameter before terminating.
  • unit – Time unit for keepAliveTime.
  • workQueue – The task queue, which holds only Runnable tasks. It has to be a BlockingQueue.

You should configure your core pool size carefully. Some say it should be N+1 whereas some say it should be a small multiple (1 or 2) of N, where N is the number of processors available on the system/device. There’s some interesting relevant information available in this SO thread.

You can get the number of processor cores that could be made available using this piece of code:

int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();

You could use the same value for maximum pool size, that you use for core pool size.

Remember a long keep alive time could slightly improve performance as there’ll be no overhead of destroying the old thread and creating a new one, at the cost of consuming more resources (memory).

The work/task queue can be unbounded or bounded. If it is unbounded, make sure you don’t push so many tasks to it that it grows huge and eats up a lot of memory, making the system slow. If it is bounded, make sure you handle rejected tasks when the queue is full. We’ll cover that in a bit.

Submitting Tasks

Now that we know how to instantiate ThreadPoolExecutor, we’ll see how to submit a Runnable task to it for execution.

int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    NUMBER_OF_CORES*2,
    NUMBER_OF_CORES*2,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>()
);
executor.execute(new Runnable() {
    public void run() {
        // Do some long running operation in background
        // on a worker thread in the thread pool of
        // ThreadPoolExecutor
    }
});

Now there are a few things (sort of rules) to keep in mind when multiple tasks are executed on the Executor:

  • If the core pool size has not been reached, the Executor will add a new thread to the pool even if other worker threads are idle.
  • If the core pool size (or more, as the max pool size can be higher) has been reached, the Executor will queue task requests instead of creating new threads, as long as the queue is not full.
  • If the core pool size has been reached and the queue is full, a new thread is created, unless that would exceed the max pool size, in which case the task is rejected.

In our example, we used an unbounded queue – new LinkedBlockingQueue() – that has a capacity of Integer.MAX_VALUE, so we don’t have to worry about the queue becoming full but in case of a bounded queue you should keep the rejection part in mind.
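The three rules can be observed directly with a small bounded setup. The sketch below assumes a core size of 1, a max size of 2 and a queue capacity of 1, and submits tasks that block until released so that the pool state stays put while we inspect it. PoolGrowthDemo and observe() are made-up names for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    /** Returns {pool size after three tasks, queue length after three tasks, 1 if the fourth was rejected}. */
    static int[] observe() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 2,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1)
        );
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until we release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // below core size: starts thread #1
            executor.execute(blocker); // core thread busy: waits in the queue
            executor.execute(blocker); // queue full: starts thread #2 (the max)
            int poolSize = executor.getPoolSize();
            int queued = executor.getQueue().size();

            boolean rejected = false;
            try {
                executor.execute(blocker); // threads and queue saturated
            } catch (RejectedExecutionException e) {
                rejected = true; // the default handler throws
            }
            return new int[] { poolSize, queued, rejected ? 1 : 0 };
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = observe();
        System.out.println("pool size=" + result[0] + ", queued=" + result[1] + ", rejected=" + (result[2] == 1));
    }
}
```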

Rejecting Tasks

New tasks submitted will be rejected when:

  • The Executor has shut down.
  • Both the worker threads (at max pool size) and a bounded queue are saturated.

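The rejection can be handled by providing an implementation of RejectedExecutionHandler, whose rejectedExecution(Runnable r, ThreadPoolExecutor executor) method will be called for each rejected task.

The ThreadPoolExecutor class has four predefined inner classes that can be used as rejected task handlers: AbortPolicy (the default, which throws a RejectedExecutionException), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy.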
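As a minimal sketch of a custom handler, the example below simply counts the rejected tasks instead of throwing. It assumes a pool of one thread with a one-slot queue; RejectionDemo and countRejected() are hypothetical names. In a real app you might log the task, retry it later or run it on the calling thread instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionDemo {
    /** Submits five blocking tasks to a 1-thread, 1-slot pool and counts the rejections. */
    static int countRejected() {
        final AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                rejected.incrementAndGet(); // called on the submitting thread
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            handler
        );
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Task 1 occupies the thread, task 2 the queue slot, tasks 3-5 are rejected.
            for (int i = 0; i < 5; i++) {
                executor.execute(blocker);
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected());
    }
}
```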

Hook Methods

There are a few hook methods available in ThreadPoolExecutor that you can use to manipulate the execution environment or perform some special processing by overriding them while subclassing: beforeExecute(Thread, Runnable), afterExecute(Runnable, Throwable) and terminated().
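Here is a rough sketch of a subclass that overrides the hooks just to count how often they fire. HookedExecutor, its counters and demo() are names invented for this example; typical real uses would be logging, timing tasks or re-initialising thread-local state.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    volatile boolean terminatedCalled;

    HookedExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Runs on the worker thread right before each task.
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    // Runs on the worker thread after each task; t is non-null if the task threw.
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();
    }

    // Runs once, when the pool has fully terminated.
    @Override
    protected void terminated() {
        terminatedCalled = true;
        super.terminated();
    }

    /** Runs three no-op tasks and returns {started, finished, 1 if terminated() ran}. */
    static int[] demo() {
        HookedExecutor executor = new HookedExecutor(2);
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> { });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {
            executor.started.get(),
            executor.finished.get(),
            executor.terminatedCalled ? 1 : 0
        };
    }
}
```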

Lifecycle

An Executor has a lifecycle that is managed and observed through the ExecutorService interface implemented by ThreadPoolExecutor. Let’s go through the different states:

  • Ready – When the Executor is created and it’s ready to accept tasks for execution.
  • Running – Once a task is submitted for execution, the Executor moves to this state where it accepts incoming tasks and executes them on a worker thread in the thread pool.
  • Shutdown – State after ExecutorService.shutdown() is called.
  • Stop – The state after ExecutorService.shutdownNow() is called.
  • Tidying – Internal cleaning. This is the state when both task queue and thread pool are empty.
  • Terminated – Final state when the terminated() hook method has completed. Threads waiting (blocked) in ExecutorService.awaitTermination() will also return (stop blocking).

Once a state is reached, the Executor cannot go back to a previous one, i.e., the states are not reversible. It is important to understand the difference between shutdown() and shutdownNow(). The former will reject new tasks but allow previously submitted tasks to finish before terminating, so it is graceful. On the other hand, shutdownNow() interrupts the worker threads in an attempt to stop the currently executing tasks (a task that ignores interruption keeps running) and prevents queued tasks from starting by removing them from the queue. It’ll return the list of pending Runnables that were not executed so that you can execute them later elsewhere. The fact that states are irreversible also means that once shutdown has been initiated, an Executor (thread pool) cannot be reused to process tasks, so a new thread pool has to be created.

If a thread pool is not shut down manually, then once there are no remaining threads inside it (the keep-alive time has elapsed) and it’s no longer referenced by the application, it’ll shut down/terminate automatically.

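You’ll notice constants for these run states (RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED) in the source code of ThreadPoolExecutor. Ready has no constant of its own, since a newly created pool is already in RUNNING.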
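The difference between the two shutdown methods is easy to see in a small experiment. In this sketch (ShutdownDemo and its methods are illustrative names), the graceful variant lets every queued task finish, while shutdownNow() interrupts the running task and hands back the ones still waiting in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    /** Graceful: all three submitted tasks still run after shutdown(). */
    static int completedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown(); // no new tasks accepted, queued ones still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    /** Abrupt: returns how many queued tasks shutdownNow() handed back unexecuted. */
    static int pendingAfterShutdownNow() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a long-running operation
            } catch (InterruptedException e) {
                // shutdownNow() interrupted us, so return promptly
            }
        });
        executor.execute(() -> { }); // stays in the queue
        executor.execute(() -> { }); // stays in the queue
        try {
            started.await(); // make sure the first task is running
            List<Runnable> pending = executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```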

Core Thread Timeouts

When you create a ThreadPoolExecutor, the thread pool’s core threads won’t time out and terminate by default. If you want the keep-alive policy of non-core threads to apply to core threads as well, use executor.allowCoreThreadTimeOut(true). Core threads will then time out and terminate when idle for longer than the keep-alive time, and will be replaced if needed when new tasks arrive.

Pre-start Core Threads

By default, core threads are created only when tasks are submitted. If you want to override this policy and create idle worker threads that wait for work up front, you could make use of prestartCoreThread() (starts a single core thread) or prestartAllCoreThreads() (starts all of them).

You’d mostly want to use one of these methods when you create an executor with preloaded tasks.

BlockingQueue<Runnable> preloadedQueue = new LinkedBlockingQueue<Runnable>();
preloadedQueue.add(new Runnable() {
    public void run() {
        // Do long running operation
    }
});

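ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4,
    60L, TimeUnit.SECONDS,
    preloadedQueue
);
// Start the core threads now so that the preloaded task gets picked up
executor.prestartAllCoreThreads();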

Although we covered a lot about ThreadPoolExecutor, you should also go through the reference API once.

ThreadFactory

Using a ThreadFactory, threads can be created on demand. With a thread factory you no longer call new Thread() directly at each call site; instead, one place decides things like which Thread subclass to use and what priority, name, group, etc. each thread gets. Let’s see a quick example:

class MyThreadFactory implements ThreadFactory {
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setPriority(5);
        return t;
    }
}

// Usage later...
Thread t = new MyThreadFactory().newThread(new Runnable() { ... });
t.start(); // If you want to start immediately

The Executors.defaultThreadFactory() method provides a really useful yet simple implementation of ThreadFactory. It does some useful configurations like setting the thread’s priority and name. You can see what it does in the source.

You’ll also notice that several ThreadPoolExecutor constructors (and setThreadFactory()) accept a ThreadFactory object in order to create threads according to your specifications. By default a thread pool’s worker threads will get the same priority as the UI thread when you create an Executor (from the UI thread itself). It’s a good idea to lower the priority of these worker threads so that they don’t compete with the UI thread for CPU time. This can be done with a ThreadFactory that sets a lower priority, passed in while creating the Executor. For example, one of the ThreadPoolExecutor constructors accepts a ThreadFactory as an extra argument, and that factory could set a priority below Thread.NORM_PRIORITY.
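A minimal sketch of that idea follows. The factory name, the "bg-worker-" naming scheme and the choice of NORM_PRIORITY - 1 are assumptions made for illustration; pick whatever priority suits your app.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LowPriorityThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        // Name the threads so they are easy to spot in logs and debuggers.
        Thread t = new Thread(r, "bg-worker-" + counter.getAndIncrement());
        t.setPriority(Thread.NORM_PRIORITY - 1); // slightly below the default priority
        return t;
    }

    /** Runs one task on a pool built with this factory and reports "name:priority". */
    static String describeWorker() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new LowPriorityThreadFactory()
        );
        try {
            return executor.submit(() -> {
                Thread current = Thread.currentThread();
                return current.getName() + ":" + current.getPriority();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorker());
    }
}
```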

Future and Callable

Till now we’ve seen tasks represented by the Runnable interface, but now we’ll see tasks represented by the Callable interface, which can return a result and throw a checked exception. The result is retrieved with the help of the Future interface. The Callable interface defines a single method called call(), which implementors have to implement.

public interface Callable<V> {
    V call() throws Exception;
}

Callable tasks cannot be passed directly to Thread instances like Runnable tasks can. They have to be submitted to and executed by an ExecutorService implementation (thread pools). On submission, a Future object is returned that can be used to check the status of the Callable, retrieve results from it or control it in some way.

Submitting Tasks

Now we’ll see how a Callable task is submitted to an Executor and then its result is retrieved with Future:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Object> future = executor.submit(new Callable<Object>() {
    public Object call() throws Exception {
        Object object = doSomeTask();
        return object;
    }
});

// To retrieve the result returned by Callable.call()
Object result = future.get(); // Blocking call

You could also execute a Runnable object with Future if you want to:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
    public void run() {
        // Do some operation
    }
});

// Will return null
Object result = future.get(); // Blocking call

With Future you can cancel the execution of a task using cancel() and check for completion or cancelled status using isDone() and isCancelled() respectively.
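As a quick sketch of cancellation, the example below starts a task that would sleep for a minute, cancels it with cancel(true) once it is running, and then reads the status flags. FutureCancelDemo and cancelRunningTask() are made-up names; the sleep stands in for any interruptible long-running work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureCancelDemo {
    /** Returns {cancel() result, isCancelled(), isDone()} for a task cancelled mid-run. */
    static boolean[] cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Future<String> future = executor.submit(() -> {
            started.countDown();
            Thread.sleep(60_000); // interrupted by cancel(true)
            return "never returned";
        });
        try {
            started.await(); // wait until the task is actually running
            boolean cancelAccepted = future.cancel(true); // true = interrupt if running
            // Calling future.get() now would throw a CancellationException.
            return new boolean[] { cancelAccepted, future.isCancelled(), future.isDone() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that a cancelled task also counts as done, so isDone() alone does not tell you that a result is available.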

ExecutorService.invokeAll()

Using ExecutorService.invokeAll() independent Callable tasks can be executed concurrently. This method blocks the calling thread until all the asynchronous tasks are finished after which the results are returned in a List.

Now it is important to note that the results are stored in the returned List of Futures in the same order as the input tasks. That means the position of a task in the input collection is the same as the position of the task’s corresponding Future in the returned result List. This is truly amazing, isn’t it ?!

Since the calling thread is blocked, make sure you don’t call invokeAll() from the UI thread but from a background thread for example by creating a simple executor and posting a Runnable to that which invokes invokeAll().

// Sample usage

List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new Callable<String>() {
    public String call() throws Exception {
        return stringFromMethodOne();
    }
});
tasks.add(new Callable<String>() {
    public String call() throws Exception {
        return stringFromMethodTwo();
    }
});

ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<String>> futures = executor.invokeAll(tasks);

for (Future<String> future : futures) {
    // play with the `future` you have now
}

ExecutorService.invokeAny()

ExecutorService.invokeAny() takes a collection of Callable tasks as input but returns the result of the first task that completes successfully and ignores the rest. The invokeAny() method blocks until one of the tasks has returned a result and then cancels the remaining ones with a call to future.cancel(true) on each of them. If the remaining tasks do not respond to interruption then they’ll continue to execute but their results will be ignored.
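A small sketch of invokeAny() with one slow and one fast task is shown below. InvokeAnyDemo and firstResult() are hypothetical names, and the one-minute sleep merely simulates a slow source such as a sluggish mirror.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyDemo {
    /** Races a slow task against a fast one and returns the winner's result. */
    static String firstResult() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> {
            Thread.sleep(60_000); // gets interrupted once the fast task wins
            return "slow";
        });
        tasks.add(() -> "fast");
        try {
            return executor.invokeAny(tasks); // blocks until one task succeeds
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```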

FutureTask

A FutureTask implements both the Future and Runnable interfaces. Its usage is similar to that of a Future. On creation you pass a Callable or Runnable to it. Later you can cancel the task or check whether it has completed or been cancelled.

I think the best part about this class is the done() callback, which gets called when the task becomes done (isDone() returns true), whether normally or via cancellation.
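Here is a rough sketch of a FutureTask with an overridden done(). Since done() runs on the worker thread and may fire slightly after get() is able to return, the example uses a latch to wait for it; FutureTaskDemo and demo() are names invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class FutureTaskDemo {
    /** Returns "result:doneCalled:isDone" for a FutureTask run on a worker thread. */
    static String demo() {
        final CountDownLatch doneCalled = new CountDownLatch(1);
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7) {
            @Override
            protected void done() {
                // Invoked on the worker thread once the task completes or is cancelled.
                doneCalled.countDown();
            }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task); // a FutureTask is a Runnable, so execute() accepts it
            boolean called = doneCalled.await(5, TimeUnit.SECONDS);
            return task.get() + ":" + called + ":" + task.isDone();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```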

ExecutorCompletionService

The java.util.concurrent package has a class called ExecutorCompletionService to which you can supply an executor and then submit multiple tasks (Callable or Runnable) to be executed, sort of like the invokeAll() that we discussed earlier. When a submitted task finishes, its Future is placed in an internal BlockingQueue, from which it can be retrieved using take(). The take() method retrieves and removes the Future representing the next completed task, blocking if none is present yet.

The reference API has simple and short code samples that you could go through. Feel free to go through the source code of this class which’ll give you a lot more insights.

Using an Executor’s thread pool, you can manage worker threads and a task queue, but by taking help of this class, you can also manage the results of the finished tasks.
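The sketch below shows results arriving in completion order rather than submission order. The slow task is held back with a latch (an artificial device for this example) until the fast result has been taken; CompletionOrderDemo and completionOrder() are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    /** Submits a slow task first and a fast one second; returns results as they complete. */
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> completionService = new ExecutorCompletionService<String>(pool);
        CountDownLatch gate = new CountDownLatch(1);

        completionService.submit(() -> {
            gate.await(); // held back until the fast result has been consumed
            return "slow";
        });
        completionService.submit(() -> "fast");

        List<String> order = new ArrayList<String>();
        try {
            order.add(completionService.take().get()); // first finished task
            gate.countDown();
            order.add(completionService.take().get()); // next finished task
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

With invokeAll() you would have to wait for the whole batch, whereas here each result can be processed as soon as its task finishes.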

Wrapping Up

The Executor framework is really powerful and gives us full control over serial/concurrent asynchronous execution with a great amount of flexibility and scalability. Use it when you want concurrent execution with a great deal of control over the threads and the execution of your tasks (cancellation, rejection, etc.).


Author: Rishabh

Rishabh is a full stack web and mobile developer from India. Follow me on Twitter.
