Project Reactor is an implementation of the Reactive Streams standard, which we learned about in one of the recent posts. Finally, we are ready to write our first publishers and subscribers. Let’s do it!
Mono and Flux
Mono and Flux are the Publisher implementations in Project Reactor.
Mono represents 0 or 1 element, whereas Flux represents 0 to N elements. Why do we need Mono if Flux can also contain 0 or 1 element?
You can think of Mono as a simple, plain object and of Flux as the equivalent of a List. Collections have methods such as add, addAll, contains, and get, which we do not need on a single object.
Flux provides some static methods to create an instance, so let’s use one of them.
    Flux<Integer> firstPublisher = Flux.range(0, 4).doOnNext(i -> System.out.println(i));
Flux.range creates a Publisher that will emit four numbers: 0, 1, 2, 3.
The chained doOnNext callback is invoked every time our publisher emits a value, so in this case we should see four numbers in the console.
Let’s run the program.
The result is surprising, because nothing happens… As we mentioned in the introduction, in Reactive Programming nothing happens until you subscribe.
That’s the reason why we got an empty console.
Take a look at how to subscribe to our publisher.
    Flux<Integer> firstPublisher = Flux.range(0, 4)
            .doOnNext(i -> System.out.println("Published value: " + i));

    firstPublisher.subscribe(value -> System.out.println("Subscribed value: " + value));
The output should look like the following:
Published value: 0
Subscribed value: 0
Published value: 1
Subscribed value: 1
Published value: 2
Subscribed value: 2
Published value: 3
Subscribed value: 3
Congrats, you’ve just created your first publisher and subscriber!
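If you wonder why nothing is printed before subscribe, the following sketch may help. It is not Reactor code and not how Flux.range is implemented: it uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones, so it runs without any dependency. The range helper, the events list, and the class name are made up for illustration, and the helper assumes count > 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class LazyRangeSketch {

    // Every side effect is recorded here so the order of events is visible.
    static final List<String> events = new ArrayList<>();

    // Hypothetical, simplified stand-in for Flux.range (assumes count > 0).
    static Flow.Publisher<Integer> range(int start, int count) {
        // Only the recipe is stored here; no value is produced yet.
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean done;

            @Override
            public void request(long n) {
                // Values are produced on the thread that calls request().
                while (n-- > 0 && !done) {
                    events.add("Published value: " + next);
                    subscriber.onNext(next++);
                    if (next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Creates the publisher, then subscribes, and returns what happened in order.
    static List<String> run() {
        events.clear();
        Flow.Publisher<Integer> publisher = range(0, 4);
        events.add("publisher created");

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // Ask for everything; this call is what starts the emission.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer value) {
                events.add("Subscribed value: " + value);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error: " + error);
            }

            @Override
            public void onComplete() {
                events.add("completed");
            }
        });
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "publisher created" first and only then the published and subscribed values, because the emitting loop lives in request(), which is reached only after subscribe is called.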
The second way to create a Flux is the interval method, which lets you set the time between consecutive emissions.
    Flux<Long> secondFlux = Flux.interval(Duration.ofMillis(100))
            .doOnNext(i -> log.info("{}", Thread.currentThread().getName()));

    secondFlux.subscribe(c -> log.info("subscriber 1: {}", c));
    secondFlux.subscribe(c -> log.info("subscriber 2: {}", c));
In the above sample I’ve used a logger, but you can use System.out.println as well.
The output is infinite: every 100 milliseconds we get an incremented number along with the name of the current thread.
    16:13:21.553 [parallel-1] INFO com.jvmfy.webflux.reactor.ReactorSamples - parallel-1
    16:13:21.560 [parallel-2] INFO com.jvmfy.webflux.reactor.ReactorSamples - parallel-2
    16:13:21.560 [parallel-2] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber 2: 0
    16:13:21.560 [parallel-1] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber 1: 0
As you can see, our flux publishes data on two threads called parallel-1 and parallel-2. Now let’s go back to the previous example and print the thread names in the same way.
This time we’ve got a different situation.
    16:32:15.481 [main] INFO com.jvmfy.webflux.reactor.ReactorSamples - main
Our publisher is working on the main thread. What is the difference? How does Project Reactor decide which threads to use?
Let’s dive into the methods above.
    public static Flux<Integer> range(int start, int count) {
        if (count == 1) {
            return just(start);
        }
        if (count == 0) {
            return empty();
        }
        return onAssembly(new FluxRange(start, count));
    }

    public static Flux<Long> interval(Duration period) {
        return interval(period, Schedulers.parallel());
    }
Nothing interesting in the first method, but in the second we can see a Schedulers call. That is the secret.
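Why would a timed source need a scheduler at all? A rough way to see it, using only JDK classes rather than Reactor: a range-like loop can simply run on the calling thread, while periodic ticks have to come from some other thread that wakes up on a timer. The class, the method names, and the thread name sketch-timer-1 below are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimerThreadSketch {

    // Range-like emission: a plain loop that runs on whichever thread calls it.
    static List<String> emitRange(int count) {
        List<String> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            threads.add(Thread.currentThread().getName());
        }
        return threads;
    }

    // Interval-like emission: ticks come from a timer thread, not from the caller.
    static List<String> emitInterval(long periodMillis, int ticks) {
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(ticks);
        // "sketch-timer-1" is a made-up name for this example.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sketch-timer-1");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(() -> {
            // Record only the requested number of ticks, then ignore later ones.
            if (remaining.getCount() > 0) {
                threads.add(Thread.currentThread().getName());
                remaining.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // The caller only waits here; it never produces a tick itself.
            if (!remaining.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timer did not tick in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(threads);
    }

    public static void main(String[] args) {
        System.out.println("range:    " + emitRange(4));
        System.out.println("interval: " + emitInterval(100, 4));
    }
}
```

When started from main, the first list contains only the caller's thread name and the second only the timer thread's name, which is the same split we saw between main and parallel-1 in the logs above.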
Managing threads
Project Reactor lets you manage threads through schedulers. As you might guess, Schedulers provides static methods to obtain scheduler instances.
    Schedulers.elastic();
    Schedulers.immediate();
    Schedulers.parallel();
    Schedulers.single();
You can use any of them, just call
The default strategy is Schedulers.immediate(), which tries to use the current thread. That means that if you create a publisher on the main thread, the main thread will be used to emit data. The single() method uses one thread, though not necessarily the one that created the publisher or subscriber; that thread will probably be called single-1.
An elastic thread pool. It creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (default is
The parallel approach creates as many workers as you have CPU cores.
Of course, you can still create your own Schedulers by calling one of the Schedulers.newXXX() methods. There you can set the number of threads, their names, etc.
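To get a feeling for the four strategies before touching Reactor itself, here is a loose analogy built from plain JDK executors. This is an assumption-laden sketch, not how Reactor implements its schedulers: the mapping (caller thread for immediate, single-thread executor for single, fixed pool sized to the CPU count for parallel, cached pool for elastic) and all names in the class are mine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulerAnalogySketch {

    // immediate-like: run the task right away on the calling thread.
    static final Executor IMMEDIATE = Runnable::run;

    // Names threads prefix-1, prefix-2, ... and marks them as daemons.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // single-like: one dedicated worker thread that is reused for every task.
    static ExecutorService single() {
        return Executors.newSingleThreadExecutor(named("single"));
    }

    // parallel-like: a fixed pool with as many workers as CPU cores.
    static ExecutorService parallel() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cores, named("parallel"));
    }

    // elastic-like: grows on demand and reuses idle workers.
    static ExecutorService elastic() {
        return Executors.newCachedThreadPool(named("elastic"));
    }

    // Runs one task on the executor and reports which thread executed it.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        ExecutorService single = single();
        ExecutorService parallel = parallel();
        ExecutorService elastic = elastic();
        System.out.println("immediate -> " + threadUsedBy(IMMEDIATE));
        System.out.println("single    -> " + threadUsedBy(single));
        System.out.println("parallel  -> " + threadUsedBy(parallel));
        System.out.println("elastic   -> " + threadUsedBy(elastic));
        single.shutdown();
        parallel.shutdown();
        elastic.shutdown();
    }
}
```

The named helper plays the role that thread names and thread counts play when you build your own scheduler: you decide how many workers exist and what they are called, which makes the logs much easier to read.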
Summary
In this post, you learned how to create publishers and subscribers and how to manage threads, which is super important in the Reactive World.
I highly recommend you play with Schedulers.
In the next post, we will create Hot Publishers and add some mapping. Then we will be ready to start our adventure with Spring WebFlux.