Selasa, 14 Juni 2022

Belajar Coroutines DI Kotlin Terlengkap


Memulai Coroutines

Untuk lebih memahami tentang coroutines, mari kita mulai mencobanya langkah demi langkah. Hal pertama yang wajib Anda tahu adalah bahwa coroutines bukanlah bagian dari bahasa Kotlin. Coroutines hanyalah library lain yang disediakan oleh JetBrains. Untuk itu, agar bisa menggunakannya Anda perlu menambahkan dependensi berikut pada build.gradle.kts:


  1. dependencies {

  2.     implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")

  3. }


Dengan menambahkan dependensi di atas, kini Anda sudah siap untuk mencoba menggunakan fitur-fitur coroutines dalam membuat program concurrency. Yuk kita mulai dari kode yang sangat sederhana berikut ini:

  1. import kotlinx.coroutines.*

  2.  

  3. fun main() = runBlocking{

  4.     launch {

  5.         delay(1000L)

  6.         println("Coroutines!")

  7.     }

  8.     println("Hello,")

  9.     delay(2000L)

  10. }


Kode di atas menggambarkan bagaimana coroutines bekerja. Kita menggunakan fungsi runBlocking untuk memulai coroutine utama dan launch untuk menjalankan coroutine baru. Jika Anda menjalankan program tersebut, maka konsol akan menampilkan hasil ini:
Hello,
Coroutines!
Kata Hello, akan ditampilkan lebih awal dan kata Coroutines! Akan ditampilkan 1 detik setelahnya. Mengapa demikian? Padahal jika diperhatikan, kode untuk menampilkan kata Coroutines! dituliskan lebih dulu.
Fungsi delay(1000L) di dalam launch digunakan untuk menunda kode berikutnya selama 1 detik. delay adalah fungsi yang spesial pada coroutines. Ia merupakan sebuah suspending function yang tidak akan memblokir sebuah thread.
Selama proses penundaan tersebut, main thread akan terus berjalan sehingga fungsi println("Hello,") akan langsung dijalankan. Setelah 1 detik, baru fungsi println("Coroutines!") akan dijalankan.
delay(2000L) digunakan untuk menunda selama 2 detik sebelum JVM berakhir. Tanpa kode ini, JVM akan langsung berhenti ketika kode terakhir dijalankan, sehingga kode di dalam launch tidak akan pernah dijalankan.
Ini baru sekedar permulaan loh. Masih banyak lagi fungsi-fungsi menarik lain pada coroutines yang dapat mempermudah kita dalam membuat program concurrency. 
Anda bisa memanfaatkan kumpulan library yang dapat ditemukan pada repositori kotlinx.coroutines. JetBrains selaku tim pengembang juga berkomitmen untuk mempertahankan backward compatibility untuk tiap perubahan yang akan dirilis. Itulah mengapa coroutines sudah diperkenalkan pada Kotlin versi 1.1.
Tersedia juga panduan resmi untuk langkah-langkah penerapan coroutines. Ikuti saja tautan ini

Coroutines Builder

Pada modul sebelumnya kita sudah mencoba menggunakan fungsi runBlocking dan launch untuk memulai sebuah coroutines. Kedua fungsi tersebut merupakan coroutines builder, yaitu sebuah fungsi yang mengambil suspending lambda dan membuat coroutine untuk menjalankannya.
Kotlin menyediakan beberapa coroutine builder yang bisa disesuaikan dengan berbagai macam skenario, seperti:
  • launch
    Seperti yang sudah kita coba sebelumnya, fungsi ini digunakan untuk memulai sebuah coroutines yang tidak akan mengembalikan sebuah hasil. launch akan menghasilkan Job yang bisa kita gunakan untuk membatalkan eksekusi.
  • runBlocking
    Fungsi ini dibuat untuk menjembatani blocking code menjadi kode yang dapat ditangguhkan. runBlocking akan memblokir sebuah thread yang sedang berjalan hingga eksekusi coroutine selesai. Seperti contoh sebelumnya, kita bisa menggunakannya pada fungsi main() dan bisa juga untuk penerapan unit test
  • async
    Kebalikan dari launch, fungsi ini digunakan untuk memulai sebuah coroutine yang akan mengembalikan sebuah hasil. Ketika menggunakannya, Anda harus berhati-hati karena ia akan menangkap setiap exception yang terjadi di dalam coroutine. Jadi async akan mengembalikan Deferred yang berisi hasil atau exception. Ketika yang dikembalikan adalah exception, maka Anda harus siap untuk menanganinya.
Sekarang giliran kita untuk mencoba contoh penerapan coroutine dengan async. Bayangkan jika kita memiliki 2 (dua) suspending function seperti berikut:

  1. suspend fun getCapital(): Int {

  2.     delay(1000L)

  3.     return 50000

  4. }

  5.  

  6. suspend fun getIncome(): Int {

  7.     delay(1000L)

  8.     return 75000

  9. }


Anggap saja bahwa delay pada kedua fungsi tersebut adalah waktu yang dibutuhkan untuk melakukan operasi sebelum hasilnya didapatkan. Selanjutnya kita ingin memanfaatkan keduanya, misalnya untuk menghitung keuntungan seperti berikut:

  1. import kotlinx.coroutines.*

  2.  

  3. fun main() = runBlocking {

  4.     val capital = getCapital()

  5.     val income = getIncome()

  6.     println("Your profit is ${income - capital}")

  7. }


Pada kode di atas, kita menggunakan pendekatan sequential. Kenapa? Pada dasarnya kode di dalam coroutines juga dijalankan secara berurutan seperti kode normal lain. Dalam praktiknya kita melakukan ini jika kita menggunakan hasil dari fungsi pertama untuk membuat keputusan apakah kita perlu memanggil fungsi kedua.
Bagaimana jika tidak ada ketergantungan antara fungsi getCapital dan getIncome dan kita ingin menjalankan keduanya secara bersamaan? Di sinilah async dibutuhkan. Kita bisa menuliskan kode seperti berikut:

  1. import kotlinx.coroutines.*

  2.  

  3. fun main() = runBlocking {

  4.     val capital = async { getCapital() }

  5.     val income = async { getIncome() }

  6.     println("Your profit is ${income.await() - capital.await()}")

  7. }


Dengan kode tersebut, kita telah memanggil fungsi getCapital dan getIncome di dalam async.Maka async akan mengembalikan hasil dari masing-masing fungsi. Lalu untuk mengakses hasil tersebut, kita perlu menggunakan fungsi await.
Wait.. adakah perbedaan dengan kode sebelumnya? Dengan async seolah-olah kedua fungsi tersebut berjalan bersamaan dan membutuhkan waktu yang lebih singkat dari kode sebelumnya. Untuk membuktikannya, yuk coba jalankan kode berikut:
  1. import kotlinx.coroutines.*
  2. import kotlin.system.measureTimeMillis
  3.  
  4. fun main() = runBlocking {
  5.     val timeOne = measureTimeMillis {
  6.         val capital = getCapital()
  7.         val income = getIncome()
  8.         println("Your profit is ${income - capital}")
  9.     }
  10.  
  11.     val timeTwo = measureTimeMillis {
  12.         val capital = async { getCapital() }
  13.         val income = async { getIncome() }
  14.         println("Your profit is ${income.await() - capital.await()}")
  15.     }
  16.  
  17.     println("Completed in $timeOne ms vs $timeTwo ms")
  18.  
  19. }

Konsol akan menampilkan hasil berikut:
Your profit is 25000
Your profit is 25000
Completed in 2013 ms vs 1025 ms
Kita bisa lihat bahwa kode yang dijalankan di dalam async bisa selesai hampir 2 kali lebih cepat dibandingkan tanpa async!

Job and Deferred

Secara umum, fungsi asynchronous pada coroutines terbagi menjadi 2 (dua) jenis, yaitu fungsi yang mengembalikan hasil dan sebaliknya, fungsi yang tidak mengembalikan hasil. Fungsi yang mengembalikan hasil biasanya digunakan jika kita menginginkan sebuah data ketika fungsi tersebut selesai dijalankan. Sebagai contoh, fungsi untuk mengambil informasi dari web service yang menghasilkan respon berupa JSON atau yang lainnya. Sedangkan fungsi yang tidak mengembalikan hasil biasanya digunakan untuk mengirimkan analitik, menuliskan log, atau tugas sejenis lainnya.
Sebagai developer, tentunya kita menginginkan tetap bisa mengakses fungsi yang sudah dijalankan. Misalnya, ketika kita ingin membatalkan tugasnya atau memberikan instruksi tambahan ketika fungsi tersebut telah mencapai kondisi tertentu. Untuk bisa melakukannya, Anda perlu memahami tentang Job dan Deferred pada coroutines.

Job

Job adalah sebuah hasil dari perintah asynchronous yang dijalankan. Objek dari job akan merepresentasikan coroutine yang sebenarnya. Sebuah job akan memiliki 3 (tiga) properti yang nantinya bisa dipetakan ke dalam setiap state atau keadaan. Berikut adalah ketiga properti tersebut:
  1. isActive
    Sebuah properti yang menunjukkan ketika sebuah job sedang aktif.
  2. isCompleted
    Sebuah properti yang menunjukkan ketika sebuah job telah selesai.
  3. isCancelled
    Sebuah properti yang menunjukkan ketika sebuah job telah dibatalkan.
Pada dasarnya, job akan segera dijalankan setelah ia dibuat. Namun kita juga bisa membuat sebuah job tanpa menjalankannya. Job memiliki beberapa siklus hidup mulai dari pertama kali ia dibuat hingga akhirnya selesai. Kira-kira seperti inilah siklus dari sebuah job jika digambarkan dalam sebuah diagram:
2022042910202201a0e80e2a6d2f6cdbd36cb544a69416.png

Dari diagram di atas, kita bisa melihat bahwa job akan melewati beberapa state. Pada setiap state tersebut nantinya kita bisa memberikan instruksi sesuai yang kita inginkan. Sebelum kita mengolahnya, mari pahami terlebih dahulu semua state yang ada pada sebuah job.
  • New
    Keadaan di mana sebuah job telah diinisialisasi namun belum pernah dijalankan.
  • Active
    Sebuah job akan memiliki status aktif ketika ia sedang berjalan. Dalam hal ini, job yang sedang ditangguhkan (suspended job) juga termasuk ke dalam job yang aktif.
  • Completed
    Ketika job sudah tidak berjalan lagi. Ini berlaku untuk job yang berakhir secara normal, dibatalkan, ataupun karena suatu pengecualian.
  • Cancelling
    Suatu kondisi ketika fungsi cancel() dipanggil pada job yang sedang aktif dan memerlukan waktu untuk pembatalan tersebut selesai.
  • Cancelled
    Keadaan yang dimiliki oleh sebuah job yang sudah berhasil dibatalkan. Perlu diketahui bahwa job yang dibatalkan juga dapat dianggap sebagai Completed job

Membuat Job Baru

Job dapat diinisialisasikan menggunakan fungsi launch() maupun Job() seperti berikut:

  1. //menggunakan launch():

  2. fun main() = runBlocking {

  3.     val job = launch {

  4.         // Do background task here

  5.     }

  6. }

  7.  

  8. //menggunakan Job():

  9. fun main() = runBlocking {

  10.     val job = Job()

  11. }


Setelah diinisialisasikan, job akan memiliki state New dan akan langsung dijalankan. Jika Anda ingin membuat sebuah job tanpa langsung menjalankannya, Anda bisa memanfaatkan CoroutineStart.LAZY seperti berikut:

  1. fun main() = runBlocking {

  2.     val job = launch(start = CoroutineStart.LAZY) {

  3.         TODO("Not implemented yet!")

  4.     }

  5. }


Dengan begitu job tersebut bisa dijalankan saat nanti dibutuhkan.

Menjalankan Job

Setelah membuat sebuah job, kini kita bisa mulai menjalankan job tersebut. Caranya pun cukup sederhana, kita bisa menggunakan fungsi start() seperti berikut:

  1. fun main() = runBlocking {

  2.     val job = launch(start = CoroutineStart.LAZY) {

  3.         delay(1000L)

  4.         println("Start new job!")

  5.     }

  6.  

  7.     job.start()

  8.     println("Other task")

  9. }


Atau bisa juga dengan menggunakan fungsi join():

  1. fun main() = runBlocking {

  2.     val job = launch(start = CoroutineStart.LAZY) {

  3.         delay(1000L)

  4.         println("Start new job!")

  5.     }

  6.  

  7.     job.join()

  8.     println("Other task")

  9. }


Perbedaan dari keduanya adalah bahwa yang start() akan memulai job tanpa harus menunggu job tersebut selesai, sedangkan join() akan menunda eksekusi sampai job selesai. Jika kode pertama dijalankan, maka konsol akan menampilkan hasil berikut:
Other task
Start new job!
Sedangkan kode kedua akan menampilkan hasil:
Start new job!
Other task
Setelah dijalankan,  job akan memiliki state Active

Membatalkan Job

Ibarat pekerjaan di dunia nyata, sebuah job seharusnya bisa dibatalkan. Hanya job yang sedang aktif yang dapat dibatalkan. Anda bisa melakukannya dengan memanggil fungsi cancel() seperti berikut:

  1. fun main() = runBlocking {

  2.     val job = launch {

  3.         delay(5000)

  4.         println("Start new job!")

  5.     }

  6.  

  7.     delay(2000)

  8.     job.cancel()

  9.     println("Cancelling job...")

  10.     if (job.isCancelled){

  11.         println("Job is cancelled")

  12.     }

  13. }


Kode di atas menggambarkan sebuah job membutuhkan waktu 5 detik untuk dijalankan. Namun ketika mencapai waktu 2 detik, job tersebut telah dibatalkan. Saat fungsi cancel() dipanggil, job akan memasuki state Cancelling sampai pembatalan tersebut berhasil. Kemudian setelah pembatalan berhasil, job akan memiliki state Cancelled dan Completed.
Perlu diketahui bahwa jika cancel() dipanggil dalam job baru yang belum dijalankan, job tersebut tidak akan melalui state Cancelling, melainkan akan langsung memasuki state Cancelled.
Kita juga bisa menambahkan parameter terhadap fungsi cancel(), yaitu parameter cause yang bisa digunakan untuk memberitahu kenapa sebuah job dibatalkan.

  1. job.cancel(cause = CancellationException("Time is up!"))


CancellationException akan mengirimkan nilainya sebagai pengecualian dari job tersebut. Kita pun bisa mengakses nilai tersebut dengan fungsi getCancellationException. Karena getCancellationException masih tahap eksperimen, Anda perlu menambahkan anotasi @InternalCoroutinesApi. Cobalah modifikasi dan jalankan kode Anda:

  1. @InternalCoroutinesApi

  2. fun main() = runBlocking {

  3.     val job = launch {

  4.         delay(5000)

  5.         println("Start new job!")

  6.     }

  7.  

  8.     delay(2000)

  9.     job.cancel(cause = CancellationException("time is up!"))

  10.     println("Cancelling job...")

  11.     if (job.isCancelled){

  12.         println("Job is cancelled because ${job.getCancellationException().message}")

  13.     }

  14. }


Konsol akan menampilkan hasil berikut:
Cancelling job...
Job is cancelled because time is up!

Deferred

Seperti yang sudah disampaikan sebelumnya di bagian coroutines builder,  fungsi async akan mengembalikan nilai deferred yang berupa hasil atau exception. Deferred adalah nilai tangguhan yang dihasilkan dari proses coroutines. Nilai ini nantinya bisa kita kelola sesuai dengan kebutuhan. 
Deferred dapat kita ciptakan secara manual. Meskipun begitu, dalam praktiknya, jarang kita membuat deferred secara manual. Biasanya kita hanya bekerja dengan deferred yang dihasilkan oleh async.
Deferred juga memiliki life cycle yang sama dengan job. Perbedaanya hanyalah pada tipe hasil yang diberikan. Selain memberikan hasil ketika proses komputasi sukses, ia juga bisa memberikan hasil saat proses tersebut gagal. Hasil dari deferred tersedia ketika mencapai state completed dan dapat diakses dengan fungsi await. Deferred akan mengirimkan pengecualian jika ia telah gagal. Kita bisa mengakses nilai pengecualian tersebut dengan fungsi getCompletionExceptionOrNull.
Pada dasarnya, nilai deferred juga merupakan sebuah job. Ia diciptakan dan dimulai pada saat coroutines mencapai state active. Bagaimanapun, fungsi async juga memiliki opsional parameter seperti CoroutineStart.LAZY untuk memulainya. Dengan begitu, deferred juga bisa diaktifkan saat fungsi startjoin, atau await dipanggil.
Di modul sebelumnya kita sudah membahas kode berikut ini:

  1. import kotlinx.coroutines.*

  2.  

  3. fun main() = runBlocking {

  4.     val capital = async { getCapital() }

  5.     val income = async { getIncome() }

  6.     println("Your profit is ${income.await() - capital.await()}")

  7. }


capital dan income adalah contoh dari nilai deferred yang untuk mengaksesnya kita membutuhkan fungsi await

Coroutine Dispatcher

Seperti yang sudah kita ketahui, coroutines berjalan di atas sebuah thread. Tentunya kita harus mengetahui thread mana yang akan digunakan untuk menjalankan dan melanjutkan sebuah coroutine. Untuk menentukannya kita membutuhkan sebuah base class bernama CoroutineDispatcher. Di dalam kelas tersebut kita akan menemukan beberapa objek yang nantinya bisa digunakan untuk menentukan thread yang berfungsi menjalankan coroutines.
  • Dispatcher.Default
    Merupakan dispatcher dasar yang digunakan oleh semua standard builders seperti launchasync, dll jika tidak ada dispatcher lain yang ditentukan. Dispatcher.Default menggunakan kumpulan thread yang ada pada JVM. Pada dasarnya, jumlah maksimal thread yang digunakan adalah sama dengan jumlah core dari CPU.
    Untuk menggunakannya, Anda cukup menggunakan coroutines builder tanpa harus menuliskan dispatcher secara spesifik:

    1. launch {

    2.     // TODO: Implement suspending lambda here

    3. }


    Namun Anda juga tetap diperbolehkan untuk menuliskannya secara eksplisit:

    1. launch(Dispatcher.Default){

    2.     // TODO: Implement suspending lambda here

    3. }


  • Dispatcher.IOSebuah dispatcher yang dapat digunakan untuk membongkar pemblokiran operasi I/O. Ia akan menggunakan kumpulan thread yang dibuat berdasarkan permintaan. Anda bisa menerapkannya dengan menambahkan Dispatcher.IO pada coroutines builder:

    1. launch(Dispatcher.IO){

    2.     // TODO: Implement algorithm here

    3. }


  • Dispatcher.Unconfined
    Dispatcher ini akan menjalankan coroutines pada thread yang sedang berjalan sampai mencapai titik penangguhan. Setelah penangguhan, coroutines akan dilanjutkan pada thread dimana komputasi penangguhan yang dipanggil.
    Sebagai contoh, ketika fungsi a memanggil fungsi b, yang dijalankan dengan dispatcher dalam thread tertentu, fungsi a akan dilanjutkan dalam thread yang sama dengan fungsi b dijalankan. Perhatikan kode berikut:

    1. import kotlinx.coroutines.*

    2.  

    3. fun main() = runBlocking<Unit> {

    4.     launch(Dispatchers.Unconfined) {

    5.         println("Starting in ${Thread.currentThread().name}")

    6.         delay(1000)

    7.         println("Resuming in ${Thread.currentThread().name}")

    8.     }.start()

    9. }


    Jika dijalankan maka konsol akan menampilkan hasil berikut:
    Starting in mainResuming in kotlinx.coroutines.DefaultExecutorArtinya, coroutine telah dimulai dari main thread, kemudian tertunda oleh fungsi delay selama 1 detik. Setelah itu, coroutine dilanjutkan kembali pada thread DefaultExecutor.

Bersamaan dengan objek-objek tersebut, ada beberapa builder yang dapat digunakan untuk menentukan thread yang dibutuhkan:
  • Single Thread ContextDispatcher ini menjamin bahwa setiap saat coroutine akan dijalankan pada thread yang Anda tentukan. Untuk menerapkannya, Anda bisa memanfaatkan newSingleThreadContext() seperti kode dibawah ini:

    1. import kotlinx.coroutines.*

    2.  

    3. fun main() = runBlocking<Unit> {

    4.     val dispatcher = newSingleThreadContext("myThread")

    5.     launch(dispatcher) {

    6.         println("Starting in ${Thread.currentThread().name}")

    7.         delay(1000)

    8.         println("Resuming in ${Thread.currentThread().name}")

    9.     }.start()

    10. }


    Jalankan kode tersebut, seharusnya konsol akan menampilkan hasil berikut:
    Starting in myThreadResuming in myThread
    Walaupun sudah menjalankan fungsi delay, coroutine akan tetap berjalan pada myThread.
  • Thread PoolSebuah dispatcher yang memiliki kumpulan thread. Ia akan memulai dan melanjutkan coroutine di salah satu thread yang tersedia pada kumpulan tersebut. Runtime akan menentukan thread mana yang tersedia dan juga menentukan bagaimana proses distribusi bebannya.
    Anda bisa menerapkan thread pool dengan fungsi newFixedThreadPoolContext() seperti berikut:

    1. import kotlinx.coroutines.*

    2.  

    3. fun main() = runBlocking<Unit> {

    4.     val dispatcher = newFixedThreadPoolContext(3, "myPool")

    5.  

    6.     launch(dispatcher) {

    7.         println("Starting in ${Thread.currentThread().name}")

    8.         delay(1000)

    9.         println("Resuming in ${Thread.currentThread().name}")

    10.     }.start()

    11. }


    Pada kode di atas, kita telah menetapkan thread myPool sebanyak 3 thread. Runtime akan secara otomatis menentukan pada thread mana coroutine akan dijalankan dan dilanjutkan. Hasil dari kode tersebut adalah:
    Starting in myPool-1
    Resuming in myPool-2

Channels

Kita sudah belajar bagaimana membuat dan mengelola coroutines. Seperti kita ketahui, sebuah program dapat memiliki banyak thread dan dalam beberapa thread bisa terdapat jutaan coroutines. Lalu, bagaimana jika ada 2 (dua) coroutines yang saling ingin berinteraksi satu sama lain? Channels adalah jawabnya.
Beberapa masalah yang muncul pada concurrency seperti deadlock, race conditions, dan lainnya, sering kali dipicu oleh satu hal, apa itu? Rupanya problem pembagian memori atau sumber daya antar thread. Untuk mengatasinya, banyak programming language seperti GoDart, dan juga Kotlin telah menyediakan channels.
Channels adalah nilai deferred yang menyediakan cara mudah untuk mentransfer nilai tunggal antara coroutine. Pada dasarnya, channels sangat mirip dengan BlockingQueue. Namun, alih-alih memblokir sebuah thread, channels menangguhkan sebuah coroutine yang jauh lebih ringan. Untuk lebih memahaminya, mari simak kode di bawah ini:

  1. import kotlinx.coroutines.*

  2. import kotlinx.coroutines.channels.Channel

  3.  

  4. fun main() = runBlocking(CoroutineName("main")) {

  5.     val channel = Channel<Int>()

  6.     launch(CoroutineName("v1coroutine")){

  7.         println("Sending from ${Thread.currentThread().name}")

  8.         for (x in 1..5) channel.send(x * x)

  9.     }

  10.  

  11.     repeat(5) { println(channel.receive()) }

  12.     println("received in ${Thread.currentThread().name}")

  13. }


Kode di atas akan menghasilkan hasil berikut:
Sending from main @v1coroutine#2
1
4
9
16
25
received in main @main#1
Bisa dilihat bahwa pada coroutine v1coroutine bahwa channels telah mengirimkan nilai dari hasil komputasi dengan menggunakan fungsi send. Setelah itu, di coroutine lain (main) channel menerima nilai dengan menggunakan fungsi receive.
Kesimpulannya, channels memungkinkan komunikasi yang aman antar kode concurrent. Ia membuat kode concurrent dapat berkomunikasi dengan mengirim dan menerima pesan tanpa harus peduli di thread mana coroutine berjalan. 
Selengkapnya tentang channel Anda bisa mempelajarinya pada tautan ini

Posting Komentar