Dev Journal #13 - Asynchronous Programming



Threads in Android

Whenever an application launches, the Android system creates an execution thread which we often refer to as the main thread or the UI thread. This main thread is responsible for dispatching all events to the visual interface of the screen such as the input event or the layout inflation. Hence, if you try to share the main thread to work on any tasks that may be resource-intensive which may take more than several seconds to complete, it can hold up the UI event queue resulting in an unresponsive screen. This is called blocking and the app may even throw an ANR (Application Not Responding) error if it hangs for too long.

source: https://developer.android.com/topic/performance/vitals/anr

Asynchrony

The problem illustrated above can be resolved with asynchronous programming which uses non-blocking function calls. Generally, there are two different ways to approach asynchronous programming: concurrency and parallelism.

Parallelism means that multiple tasks are executed at the same time. Multithreading is one of the common implementations of parallelism. Your Android machine is capable of creating multiple background threads to handle long-running operations in parallel with the main thread.

On the other hand, concurrency is when multiple tasks are executed during overlapping periods but not exactly at the same instance. For example, single-threaded asynchrony can be achieved through cooperative multitasking (also known as non-preemptive multitasking) where processes voluntarily yield CPUs when they are complete or idle.

Here is an analogy of preparing breakfast items to describe these two similar but different concepts. Let’s say I would like some coffee, scrambled eggs and a piece of toast for breakfast. I can sequentially prepare one menu after the other in order. That would be an example of synchronous programming where tasks queued at the end like making coffee is blocked until toasts and eggs are ready. This is the complete opposite of what we want to achieve.

Sequential Processing

To achieve parallelism, I can get help from two of my friends, Cleo and Leon to prepare breakfast with me in parallel. For the purpose of this analogy, I’m going to assume that the quantity of the meal remains the same. If each of us takes on an item, we can complete our breakfast in the fastest achievable time.

Parallel Processing

As my friends and I are making breakfast, I come to realize that the tasks don’t require our attention 100% of the time. In fact, we are mostly standing around idling. Leon is waiting for the coffee to finish brewing, Cleo is stirring eggs once in a while but mostly waiting for them to be cooked. I’m twiddling my thumbs waiting for the toaster machine to shoot out the bread. This is somewhat similar to what happens in programming tasks. Big chunks of time can be spent fetching data from disks or waiting for network responses.

The next morning, I come up with a grandmaster plan to shorten my preparation time even without the help of my friends. I concurrently make coffee, toasts and eggs with my multitasking skills. Although I cannot mend to all three menus at the same time, I’m still able to process multiple menus together by interleaving tasks when possible.

If I fully utilize the idle time by working on other tasks, I can complete all of my tasks in a shorter time compared to when I was handling them sequentially. This concurrency example illustrates how coroutines work in a simplified form. Coroutines can run multiple tasks from a single thread, suspending tasks whenever it is idle waiting for some result to return, and resuming from where it was paused when it’s ready to work again. If all the coroutines are suspended, the main thread is free to continue working on other important jobs such as responding to UI events.

Concurrent Processing

Asynchronous Programming Techniques

Now that we have refreshed our knowledge on asynchrony. Let’s explore different types of programming techniques that apply this concept to prevent our apps from blocking.

Callbacks

Callbacks are executable code that is passed as an argument to another function, where the latter function is expected to call back the argument upon completion of itself. Callbacks are useful ways to update the main thread about the results returned from the task that it was working on from a separate thread.

Take a look at the following function that calculates the nth Fibonacci number as an example:

fun fib(n: Int): Int {
    var result = 0
    var a = 0
    var b = 1
    var c: Int
    when {
        n < 0 -> {
            throw Exception("Input must be greater or equal to 0")
        }
        n == 0 -> {
            result = a
        }
        else -> {
            for (i in 3..n + 1) {
                c = Math.addExact(a, b) // throws ArithmeticException on overflow
                a = b
                b = c
            }
            result = b
        }
    }
    Thread.sleep(1000)
    return result
}

I have added Thread.sleep(1000) to exaggerate the computation delay. To avoid blocking the main thread, we can split up the function into two parts:

  1. Asynchronous function that runs calculation on a background thread
import kotlin.concurrent.thread

fun callbackFib(
    n: Int,
    onSuccess: (Int, Int) -> Unit,
    onError: (Int, Exception) -> Unit
) {
    thread(start = true) {
        println("${Thread.currentThread().name}: Start async task for input: $n")
        var result = 0
        try {
            result = fib(n)
            onSuccess(n, result)
        } catch (e: Exception) {
            onError(n, e)
        }
    }
}
  1. Callback function to announce the result to the foreground thread
val onSuccess = { n: Int, result: Int ->
    println("${Thread.currentThread().name}: The ${n}th term of Fibonacci sequence is $result")
}

val onError = { n: Int, e: Exception ->
    println("${Thread.currentThread().name}: ERROR! Input $n threw error: ${e.message}");
}

Combining the two, I can now run the calculation in the background without blocking my main thread even if call multiple calculations.

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")
    fib(5, onSuccess, onError)
    fib(15, onSuccess, onError)
    fib(50, onSuccess, onError)
    println("Thread-${Thread.currentThread().name}: Next task on main")
    Thread.sleep(10000)
}

The above code snippet will return the following output:

Thread-main: Start main
Thread-0: Start async task for input: 5
Thread-main: Next task on main
Thread-2: Start async task for input: 50
Thread-1: Start async task for input: 15
Thread-2: ERROR! Input 50 threw error: integer overflow
Thread-0: The 5th term of Fibonacci sequence is 5
Thread-1: The 15th term of Fibonacci sequence is 610

As you can see from the above console output, the async tasks each ran on a separate thread independent from the main thread. You can also see that the order of the threads does not determine the order of task completion. Some async tasks will run faster than some other tasks that started before them. But most importantly, the main thread is not blocked no matter how many async tasks are initiated.

Promises/Futures

Promises or futures are another common constructs practiced in many different programming languages to execute asynchronous computations. Future is the consumer of the implementation which Promise produces. Although the name might seem a bit confusing at first, CompletableFuture from the Java library can be used as a promise because you can implement the chaining steps. On top of that, it can also be used like a future where you can consume the result like the typical FutureTask with the get method.

Understanding these concepts, I can translate the callback function into CompletableFuture as shown in the following block of code:

fun fibFuture(n: Int) {
    CompletableFuture
        .supplyAsync {
            println("${Thread.currentThread().name}: Start async task for input: $n")
            fib(n)
        }
        .exceptionally { e ->
            println("${Thread.currentThread().name}: ERROR! Input $n threw error: ${e.message}");
            null
        }
        .thenAccept {
            println("${Thread.currentThread().name}: The ${n}th term of Fibonacci sequence is $it")
        }
}

This creates a promise chain that will process the task introduced in the supplyAsync followed by thenAccept while handling the exceptions through exceptionally.

Calling the same list of inputs into our Fibonacci functions, we get the following output.

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")
    fibFuture(5)
    fibFuture(15)
    fibFuture(50)
    println("Thread-${Thread.currentThread().name}: Next task on main")
    Thread.sleep(10000)
}
Thread-main: Start main
ForkJoinPool.commonPool-worker-3: Start async task for input: 5
Thread-main: Next task on main
ForkJoinPool.commonPool-worker-7: Start async task for input: 50
ForkJoinPool.commonPool-worker-5: Start async task for input: 15
ForkJoinPool.commonPool-worker-7: ERROR! Input 50 threw error: java.lang.ArithmeticException: integer overflow
ForkJoinPool.commonPool-worker-7: The 50th term of Fibonacci sequence is null
ForkJoinPool.commonPool-worker-3: The 5th term of Fibonacci sequence is 5
ForkJoinPool.commonPool-worker-5: The 15th term of Fibonacci sequence is 610

Once again, the main thread remains unblocked.

Rx (Reactive extensions)

ReactiveX is yet another tool that can be used for event-based asynchronous programming. In ReactiveX an Observer subscribes to an Observable. The observable then emits a sequence of items that the observer can consume. These items can also be further transformed by chaining Operator calls. If many events occur asynchronously, the items must be stored in a queue or be dropped. Rx is especially good at handling asynchronous data streams like input events, push notifications, and sensor updates by using hot observables which will begin emitting items as soon as it is created. But it also works well with a one-time event like network response, reading a file or querying a database which can be handled with cold observables where the observable waits until an observer subscribes to it before it begins to emit items. Lastly, RxJava provides an option to control the threads by specifying the Scheduler as well.

In Android, reactivity will be most commonly be needed by network calls, in which case will be an example of a cold observable. This can be best represented using Single which is an Observable variant that emits just one value instead of series of values.

To use RxJava, I must first add the following line to the build.gradle.kts script. The latest version can be found here.

dependencies {
    implementation("io.reactivex.rxjava3:rxjava:3.0.13")
}

Here is the wrapper Fibonacci function that returns the single observables:

fun singleFib(n: Int): Single<List<Int>> {
    return Single.create { it ->
        it.onSuccess(listOf(n, fib(n)))
    }
}

And the definition for the single observer:

val singleObserver = object: SingleObserver<List<Int>> {
    override fun onSubscribe(d: Disposable) {
        println("${Thread.currentThread().name}: onSubscribe")
    }

    override fun onSuccess(data: List<Int>) {
        println("${Thread.currentThread().name}: The ${data[0]}th term of Fibonacci sequence is ${data[1]}")
    }

    override fun onError(e: Throwable) {
        println("${Thread.currentThread().name}: ERROR! Input threw error: ${e.message}");
    }
}

Now subscribing to each observable with our observer:

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")

    singleFib(5)
        .subscribeOn(Schedulers.io())
        .subscribe(singleObserver)

    singleFib(15)
        .subscribeOn(Schedulers.io())
        .subscribe(singleObserver)

    singleFib(50)
        .subscribeOn(Schedulers.io())
        .subscribe(singleObserver)

    println("Thread-${Thread.currentThread().name}: Next task on main")
    sleep(10000)
}

I subscribed the observer to the Schedulers.io to mock network calls. This implementation is backed by a pool of single-threaded ScheduledExecutorService that will try to reuse started instances or start a new one if no threads are available.

Thread-main: Start main
main: onSubscribe
main: onSubscribe
main: onSubscribe
Thread-main: Next task on main
RxCachedThreadScheduler-3: ERROR! Input threw error: integer overflow
RxCachedThreadScheduler-1: The 5th term of Fibonacci sequence is 5
RxCachedThreadScheduler-2: The 15th term of Fibonacci sequence is 610

Coroutines

With the release of the new support library AndroidX, the official documentation now recommends the Android developers to use coroutines. Coroutines are lightweight. You can run many coroutines on a single thread using suspending functions. Coroutines also follow the principle of structured concurrency which means that new coroutines can be only launched in a specific CoroutineScope. This ensures that coroutines are not lost and do not leak. Most Jetpack libraries include extensions that provide full coroutine support with some even providing their own coroutine scope. So coroutines should be our first choice of tool when it comes to asynchronous programming on Android.

I must first add the following dependency to the bulid.gradle.kts script to migrate my code to use coroutines. The latest version can be found here.

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0")
}

Next, the original Fibonacci function needs to be wrapped in a suspending function. Suspending functions can be paused and resumed at a later time without blocking the main thread. They can only be invoked by another suspending function or within a coroutine.

suspend fun suspendFib(n: Int) {
    withContext(Dispatchers.Default) {
        try {
            val result = fib(n)
            println("${Thread.currentThread().name}: The ${n}th term of Fibonacci sequence is $result")
        } catch(e: Exception) {
            println("${Thread.currentThread().name}: ERROR! Input $n threw error: ${e.message}");
        }
    }
}

withContext is a scope function that creates a cancelable coroutine. Dispatchers can be passed as an arg so that function block can be executed from a thread from the dispatcher.

Finally, the suspending wrapper functions must be called inside the CoroutineScope. To run multiple Fibonacci functions in parallel, each function must be called from its own scope.

This is the function definition provided from the documentation:

CoroutineScope defines a new scope for new coroutines. Every coroutine builder (like launch, async, etc) is an extension on CoroutineScope and inherits its coroutineContext to automatically propagate all its elements and cancellation.

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")
    CoroutineScope(Dispatchers.Default).launch {
        suspendFib(5)
    }
    CoroutineScope(Dispatchers.Default).launch{
        suspendFib(15)
    }
    CoroutineScope(Dispatchers.Default).launch{
        suspendFib(50)
    }
    println("Thread-${Thread.currentThread().name}: Next task on main")
    Thread.sleep(10000)
}

Running this block of code will output the following:

Thread-main: Start main
Thread-main: Next task on main
DefaultDispatcher-worker-3: ERROR! Input 50 threw error: integer overflow
DefaultDispatcher-worker-1: The 5th term of Fibonacci sequence is 5
DefaultDispatcher-worker-2: The 15th term of Fibonacci sequence is 610

Kotlin Flow

Flow is built on top of coroutines. It can emit multiple values sequentially as opposed to suspending functions that return only a single value. Flow is in many ways similar to RxJava’s Observable.

Modifying the suspending function from the previous coroutine example to return the result at the end of the function:

suspend fun suspendFib(n: Int): Int {
    var result: Int
    withContext(Dispatchers.Default) {
        result = fib(n)
    }
    return result
}

Use the flow builder APIs to create new flows and manually emit new values to the stream of data when the suspending function completes.

fun flowFib(n: Int): Flow<String> = flow {
    try {
        val result = suspendFib(n)
        emit("${Thread.currentThread().name}: The ${n}th term of Fibonacci sequence is $result")
    } catch (e: Exception) {
        emit("${Thread.currentThread().name}: ERROR! Input $n threw error: ${e.message}")
    }
}

Because collect is a suspending function, it needs to be executed within a coroutine scope.

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")
    CoroutineScope(Dispatchers.Default).launch {
        flowFib(5).collect {
            println(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch {
        flowFib(15).collect {
            println(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch {
        flowFib(50).collect {
            println(it)
        }
    }
    println("Thread-${Thread.currentThread().name}: Next task on main")
    Thread.sleep(10000)
}

The above main code block will output the following:

Thread-main: Start main
Thread-main: Next task on main
DefaultDispatcher-worker-3: ERROR! Input 50 threw error: integer overflow
DefaultDispatcher-worker-2: The 15th term of Fibonacci sequence is 610
DefaultDispatcher-worker-1: The 5th term of Fibonacci sequence is 5

We can update the flowFib function to take in a list of values if we want to simulate the same API or database object updating the value multiple times.

fun flowFibStream(nums: List<Int>): Flow<String> = flow {
    for (n in nums) {
        try {
            val result = suspendFib(n)
            emit("${Thread.currentThread().name}: The ${n}th term of Fibonacci sequence is $result")
        } catch (e: Exception) {
            emit("${Thread.currentThread().name}: ERROR! Input $n threw error: ${e.message}")
        }
    }
}

The main code block will also update to the following:

fun main() {
    println("Thread-${Thread.currentThread().name}: Start main")
    CoroutineScope(Dispatchers.Default).launch {
        flowFibStream(listOf(5, 15, 50)).collect {
            println(it)
        }
    }
    println("Thread-${Thread.currentThread().name}: Next task on main")
    Thread.sleep(10000)
}

This will sequentially emit the result in the order of input.

Thread-main: Start main
Thread-main: Next task on main
DefaultDispatcher-worker-1: The 5th term of Fibonacci sequence is 5
DefaultDispatcher-worker-1: The 15th term of Fibonacci sequence is 610
DefaultDispatcher-worker-1: ERROR! Input 50 threw error: integer overflow

Conclusion

There are many programming techniques and libraries that can handle asynchronous programming. I covered several popular concepts used in web/mobile development including the relatively newer solution from the Flow API. No one solution will be perfect and it will ultimately be up to the developer to decide on the final design. In my opinion, RxJava or Coroutine + Flow combined can provide most of the features you are looking for to build any typical application.

Previous Post Next Post