Pasang Iklan Atas 1

Down Menu

Drop Down MenusCSS Drop Down MenuPure CSS Dropdown Menu

Wednesday, March 27, 2019

RxJava Vs Reactor by Tomasz Nurkiewicz

Table of contents:

  1. API
  2. Type-safety
  3. Checked exceptions
  4. Testing
  5. Debugging
  6. Spring support
  7. Android development
  8. Maturity
  9. Summary

Banyak orang bertanya kepada saya, perpustakaan mana yang akan digunakan dalam proyek baru mereka (jika ada). Beberapa khawatir bahwa mereka belajar RxJava 1.x dan kemudian 2.x datang, dan Reaktor. Yang mana yang harus mereka gunakan? Dan beberapa juga bertanya-tanya, ada apa dengan java.util.concurrent.Flow baru ini? Izinkan saya mengklarifikasi beberapa hal. Pertama-tama, kedua versi RxJava dan Reactor sangat mirip dari perspektif fungsional. Jika Anda tahu 1.x atau 2.x, Reactor akan sangat akrab, meskipun Anda masih harus belajar tentang perbedaannya. Kedua, kelas Flow (satu set antarmuka, tepatnya) adalah bagian dari spesifikasi stream reaktif, yang dibundel ke dalam JDK. Spesifikasi ini menentukan bahwa berbagai perpustakaan reaktif harus berperilaku sopan dan berinteraksi satu sama lain dengan bersih. Namun spesifikasi lahir sebelum Java 9 dan pengenalan Flow, oleh karena itu perpustakaan didasarkan pada eksternal reactive-streams.jar, bukan JDK.

Dalam hal perbandingan RxJava / Reactor, ada beberapa perspektif. Biarkan saya cepat melewati beberapa perbedaan. Saya berasumsi Anda memiliki keakraban dengan kedua library ini.

API

Flowable dan Flux memiliki API yang sangat mirip. Jelas, keduanya mendukung operator dasar seperti map (), filter (), flatMap (), dan yang lebih maju. Perbedaan utama adalah versi target Java. RxJava 2.x masih harus mendukung Java 6 karena banyak digunakan di Android (baca nanti). Reactor, di sisi lain, menargetkan Java 8+. Oleh karena itu Reactor dapat memanfaatkan API modern (-ish, Java 8 berumur 5 tahun, pada saat penulisan) seperti java.time dan java.util.function. Jauh lebih aman untuk mengetik:

  1. import java.time.Duration;
  2. ...
  3. flux.window(Duration.ofSeconds(1));

sebagai lawan:

  1. import java.util.concurrent.TimeUnit;
  2. ...
  3. flowable.window(1, TimeUnit.SECONDS);

Melewati instance Duration tunggal lebih mudah dan lebih aman daripada integer. Reactor juga memiliki konversi langsung dari CompletableFuture, Opsional, java.util.stream.Stream, dll +1 untuk Reactor.

Type-safety


Tapi berbicara tentang keamanan jenis, saya benar-benar kehilangan tipe berbutir halus yang diperkenalkan di RxJava 1/2. Tabel HTML bernilai seribu div:



RxJava 2ReactorPurpose
CompletableN/ACompletes successfully or with failure, without emitting any value. Like CompletableFuture<Void>
Maybe<T>Mono<T>Completes successfully or with failure, may or may not emit a single value. Like an asynchronous Optional<T>
Single<T>N/AEither complete successfully emitting exactly one item or fails.
Observable<T>N/AEmits an indefinite number of events (zero to infinite), optionally completes successfully or with failure. Does not support backpressure due to the nature of the source of events it represents.
Flowable<T>Flux<T>Emits an indefinite number of events (zero to infinite), optionally completes successfully or with failure. Support backpressure (the source can be slowed down when the consumer cannot keep up)


Kurangnya beberapa jenis dalam Reactor tidak berarti itu tidak mendukung beberapa kasus penggunaan. Jika Anda membutuhkan Completable, Anda menggunakan Mono canggung <Void> (seperti operator Mono.then ()). Anda tahu bahwa beberapa operasi harus mengeluarkan nilai? Nasib buruk, Anda terjebak dengan Mono <T> dan orang-orang menjadi bingung - apakah selalu memancarkan nilai itu? Misalnya Flux.count ()). Atau mungkin Flux Anda tidak terlalu mendukung tekanan balik? Sayang sekali, Anda harus menggunakan abstraksi yang sama. Perbedaan antara Observable dan Flowable memberi Anda petunjuk, seperti apa kontrol aliran yang harus Anda harapkan.

Saya percaya bahwa kompilator selalu mengalahkan tes unit, dan yang terakhir selalu mengalahkan dokumentasi. (Anda mungkin tidak setuju dengan pernyataan sebelumnya.) Sebagai contoh, Flux.next (): "Hanya memancarkan item pertama yang dipancarkan" - sesuai dengan dokumentasi. Apa yang terjadi jika Flux kosong? Apakah saya akan mendapatkan Mono kosong atau Mono dengan NoSuchElementException? Keduanya perilaku yang valid dan waras ... Dalam RxJava 2 saya memiliki firstElement () mengembalikan Mungkin dan firstOrError () mengembalikan Single. Cukup mudah, belum lagi penamaannya kurang membingungkan. Flux.next (), apa artinya?

RxJava 2 juga memisahkan tipe yang dapat diamati dan yang dapat mengalir. Jika sumbernya secara inheren tidak terkendali, kita dapat menyatakannya dalam tipe-safe Observable. Beberapa operator tidak masuk akal atau tidak mungkin diterapkan pada Observable. Tidak apa-apa. Di sisi lain Flowable memiliki dukungan tekanan balik penuh, yang berarti dapat melambat. Saya dapat dengan mudah mengkonversi dari Flowable ke Observable, mengubah sebaliknya membutuhkan saya untuk berpikir. Apa yang harus saya lakukan ketika konsumen tidak dapat bersaing dengan produsen, tetapi produsen tidak dapat diperlambat? Kirim pesan ekstra? Buffer mereka sebentar? Dalam Reactor kedua jenis aliran diwakili oleh Flux (seperti dalam RxJava 1.x) sehingga Anda selalu dapat mengharapkan kesalahan karena tidak ada tekanan balik yang hilang. Dalam RxJava 2 ini menjadi sedikit kurang umum karena jaminan yang jelas.

+1 untuk RxJava, untuk API yang lebih aman.


Checked exceptions

Reactor menggunakan tipe fungsional standar dari JDK, seperti Function di API-nya. Itu keren. Tetapi efek samping kecil dari pilihan itu adalah penanganan canggung terhadap pengecualian yang diperiksa di dalam transformasi. Pertimbangkan kode berikut, yang tidak dikompilasi:


  1. Flux
  2. .just("java.math.BigDecimal", "java.time.Instant")
  3. .map(Class::forName)


Class.forName () melempar dicentang ClassNotFoundException, sayangnya, Anda tidak diizinkan untuk melempar pengecualian yang diperiksa dari java.util.function.Function. Di RxJava, di sisi lain, io.reactivex.functions.Fungsi bebas dari kendala seperti itu dan kode yang sama akan dikompilasi dengan baik. Apakah Anda suka mengecek pengecualian atau tidak, begitu Anda harus menghadapinya, RxJava membuat pengalaman itu lebih menyenangkan. +1 untuk RxJava, meskipun saya tidak menganggap ini sebagai keuntungan utama.


Testing



Kehadiran penjadwal di kedua perpustakaan tidak hanya memungkinkan kontrol konkurensi yang baik. Penjadwal juga memainkan peran penting dalam pengujian unit. Baik dalam Reactor dan RxJava Anda dapat mengganti scheduler berdasarkan jam dinding dengan yang didasarkan pada jam virtual buatan. Ini sangat berguna ketika Anda menguji bagaimana stream Anda berperilaku ketika waktu berlalu. Peristiwa berkala, batas waktu, penundaan - semua ini dapat diuji dengan andal. Jadi +1 untuk keduanya? Tidak juga, Reactor melangkah lebih jauh. Di RxJava Anda harus mengeksternalisasi konfigurasi setiap penjadwal tunggal sehingga Anda dapat menggantinya dalam unit test. Tidak buruk, Anda seharusnya tetap dieksternalisasi. Namun, dengan cepat menjadi berantakan ketika Anda harus melewati TestScheduler ke banyak tempat dalam kode produksi Anda. Di Reactor, di sisi lain, itu cukup untuk mengelilingi kode yang sedang diuji dan semua penjadwal yang mendasari secara otomatis diganti dengan yang virtual:

  1. StepVerifier
  2. .withVirtualTime(() ->
  3. Flux
  4. .never()
  5. .timeout(ofMillis(100))
  6. )
  7. .expectSubscription()
  8. .expectNoEvent(ofMillis(99))
  9. .thenAwait(ofMillis(1))
  10. .expectError(TimeoutException.class)
  11. .verify(ofSeconds(1));

Tes khusus ini memastikan batas waktu berfungsi seperti yang diharapkan. Tes ini sangat akurat * dan 100% dapat diprediksi. Tidak ada tidur atau sibuk menunggu hasilnya. Keuntungan lebih dari RxJava adalah tidak masalah seberapa rumit aliran Anda, semua penjadwal terhenti. Di RxJava Anda dapat menulis tes serupa, tetapi Anda harus memastikan semua penjadwal dalam kode yang diuji diganti dengan TestScheduler. Reactor dengan mudah menyuntikkan jam virtual melalui semua lapisan. +1 untuk Reaktor


Debugging

Reactor menambahkan permata debug yang luar biasa:

  1. Hooks.onOperatorDebug();

Garis kecil ini ditempatkan di awal aplikasi Anda akan melacak bagaimana sinyal mengalir melalui aliran Anda. Mari kita ambil contoh praktis. Bayangkan aliran berikut:

  1. import reactor.core.publisher.Flux;
  2. import reactor.core.publisher.Hooks;
  3. import reactor.core.publisher.Mono;
  4.  
  5. import java.io.File;
  6.  
  7. public class StackTest {
  8.  
  9. public static void main(String[] args) {
  10.  
  11. Mono<Long> totalTxtSize = Flux
  12. .just("/tmp", "/home", "/404")
  13. .map(File::new)
  14. .concatMap(file -> Flux.just(file.listFiles()))
  15. .filter(File::isFile)
  16. .filter(file -> file.getName().endsWith(".txt"))
  17. .map(File::length)
  18. .reduce(0L, Math::addExact);
  19.  
  20.  
  21. totalTxtSize.subscribe(System.out::println);
  22. }
  23.  
  24. }

Ia menemukan semua file .txt di bawah direktori / tmp, / home dan / 404 dan menghitung ukuran totalnya. Program gagal saat runtime dengan stack-trace yang samar dan panjang:

  1. java.lang.NullPointerException
  2. at reactor.core.publisher.Flux.fromArray(Flux.java:953)
  3. at reactor.core.publisher.Flux.just(Flux.java:1161)
  4. at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16)
  5. at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
  6. at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
  7. at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
  8. at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
  9. at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
  10. at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
  11. at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:229)
  12. at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
  13. at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
  14. at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
  15. at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
  16. at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
  17. at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:49)
  18. at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:53)
  19. at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
  20. at reactor.core.publisher.MonoReduceSeed.subscribe(MonoReduceSeed.java:65)
  21. at reactor.core.publisher.Mono.subscribe(Mono.java:3695)
  22. at reactor.core.publisher.Mono.subscribeWith(Mono.java:3801)
  23. at reactor.core.publisher.Mono.subscribe(Mono.java:3689)
  24. at reactor.core.publisher.Mono.subscribe(Mono.java:3656)
  25. at reactor.core.publisher.Mono.subscribe(Mono.java:3603)
  26. at com.nurkiewicz.StackTest.main(StackTest.java:23)

Jika Anda sedikit membersihkan tumpukan, Anda mungkin akan merasakan operator mana yang melihat NullPointerException yang terkenal:

  1. at ...Flux.fromArray()
  2. at ...Flux.just()
  3. at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16)
  4. ...
  5. at ...FluxArray.subscribe()
  6. at ...FluxMapFuseable.subscribe()
  7. at ...FluxConcatMap.subscribe()
  8. at ...FluxFilter.subscribe()
  9. at ...FluxFilter.subscribe()
  10. at ...FluxMap.subscribe()
  11. at ...MonoReduceSeed.subscribe()
  12. ...
  13. at com.nurkiewicz.StackTest.main(StackTest.java:23)

Tetapi itu tidak banyak membantu dan sebagian besar tumpukan jejak menunjuk ke kode sumber Reactor (Anda tidak ingin pergi ke sana). Jauh lebih nyaman untuk melihat di mana kata operator dinyatakan dalam kode kita sendiri. Inilah yang ditunjukkan oleh Hooks.onOperatorDebug () di sebelah jejak stack yang disebutkan di atas:

  1. Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
  2. reactor.core.publisher.Flux.concatMap(Flux.java:3425)
  3. com.nurkiewicz.StackTest.main(StackTest.java:17)
  4. Error has been observed by the following operator(s):
  5. |_ Flux.concatMap com.nurkiewicz.StackTest.main(StackTest.java:17)
  6. |_ Flux.filter com.nurkiewicz.StackTest.main(StackTest.java:18)
  7. |_ Flux.filter com.nurkiewicz.StackTest.main(StackTest.java:19)
  8. |_ Flux.map com.nurkiewicz.StackTest.main(StackTest.java:20)
  9. |_ Flux.reduce com.nurkiewicz.StackTest.main(StackTest.java:21)

Paradoksnya, kami tidak tertarik di mana pengecualian dilemparkan pada saat runtime. Jawabannya hampir selalu: dalam keberanian Reactor. Kami lebih suka melihat bagaimana aliran yang salah dibangun. Reaktor tidak terkalahkan di sini. Debugging program reaktif sulit, sangat sulit. Operator ini membuatnya sedikit lebih mudah. Ngomong-ngomong, Anda tahu apa sumber NullPointerException? Dari JavaDoc dari File.listFiles ():

Mengembalikan nol jika [...] terjadi kesalahan I / O.

Pengembalian ... null ... jika ... kesalahan ... terjadi. Pada abad XXI.

Sudahlah, hapus +1 untuk Reactor.

Spring support

Anda bebas menggunakan Reactor tanpa kerangka Pegas. Anda juga dapat menggunakan kerangka kerja Spring tanpa Reactor. Tetapi kebetulan bahwa mereka mengintegrasikan sangat erat dan Spring WebFlux (ingat nama) menggunakan Mono dan Flux secara luas. Misalnya, Anda dapat mengembalikan Mono langsung dari pengontrol dan berperilaku seperti yang baik 'DeferredResult. Setelah Anda menempatkan RxJava di CLASSPATH Anda, Spring juga terintegrasi dengannya. Kerangka kerja web reaktif dan Data Musim Semi reaktif. Namun, mengapa Anda menambahkan ketergantungan lain jika Reactor sudah ada di sana? Sepertinya Spring tidak mendiskriminasi RxJava dengan cara apa pun. Hanya saja Reactor tampak lebih alami dan terintegrasi. +1 untuk Reaktor.

Android development

RxJava sangat populer di kalangan pengembang Android. Ini memecahkan dua masalah dengan sangat bersih:

  • menghindari panggilan balik neraka dengan memodelkan peristiwa UI sebagai aliran
  • mudah berpindah-pindah antar utas, terutama memastikan I / O tidak terjadi pada utas UI

Itulah salah satu alasan mengapa RxJava masih menargetkan versi Java yang lebih lama. Ini mungkin berubah di masa depan tetapi pada saat penulisan, RxJava adalah satu-satunya pilihan untuk pengembang Android. Dan itu adalah perpustakaan yang kuat sehingga saya tidak berpikir mereka akan kehilangan banyak hal dari Reactor. +1 untuk RxJava

Maturity

RxJava jauh lebih matang dan mapan di pasar (lihat: Android). Juga, ada banyak proyek independen yang memilih RxJava sebagai API mereka, misalnya, driver Couchbase resmi. Itu juga kasus untuk MongoDB, tetapi mereka pindah dari driver RcJava ke driver stream reaktif yang lebih umum yang kompatibel dengan RxJava dan Reactor. Hal yang sama berlaku untuk RxNetty, adiknya, reaktor-netty. Jumlah buku RxJava juga sangat melebihi yang ada di Reactor. Jadi, untuk saat ini, +1 untuk RxJava, tetapi ini kemungkinan besar akan berubah dalam beberapa bulan mendatang.

Summary

  • Tetap berpegang pada perpustakaan mana pun yang sudah Anda miliki di CLASSPATH Anda.
  • Jika Anda mendapatkan pilihan, Reactor lebih disukai, RxJava 2.x masih merupakan alternatif yang baik
  • Jika Anda menggunakan Android, maka RxJava 2.x adalah satu-satunya pilihan Anda
Saya tidak mengantisipasi itu, tetapi ternyata kami memiliki dasi. Namun, melihat ke depan, Reactor jelas lebih menjanjikan. Kinerjanya tampak lebih baik, pengembangan lebih aktif dan didukung oleh pemain yang lebih besar (Pivotal). Pustaka ini sangat mirip (setidaknya dari perspektif API) tetapi jika Anda mendapatkan pilihan, Reactor mungkin akan melayani Anda dengan lebih baik.

* untuk beberapa alasan tes ini gagal ketika saya mengubah batas waktu (100) menjadi 98 atau 101. Tetapi masih berhasil untuk 99 (?)