PROJECT REACTOR – FIRST STEPS

Project Reactor is an implementation of the Reactive Streams standard, which we learned about in one of the recent posts. Finally, we are ready to write our first publishers and subscribers. Let’s do it!

Mono and Flux

Mono and Flux are the Publisher implementations in Project Reactor.
Mono represents 0 or 1 element, whereas Flux represents 0 to N elements. Why do we need Mono if Flux can also contain 0 or 1 elements?
You can think of Mono as a simple, plain object, and of Flux as the equivalent of a List. Collections have methods like add, addAll, contains, get, etc., which we don’t need on a single object.
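To make the object-versus-List analogy concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class and method names are my own illustration. The block() calls are there only to pull the values out for printing; they subscribe and wait for the result.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoVsFlux {

    // A Mono carries at most one element, like a plain object.
    static String single() {
        return Mono.just("hello").block();
    }

    // A Flux can carry many elements; collectList() gathers them into a Mono<List<T>>.
    static List<Integer> many() {
        return Flux.just(1, 2, 3).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(single());
        System.out.println(many());
    }
}
```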

Flux provides some static methods to create an instance, so let’s use one of them.
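A minimal sketch of such a publisher, assuming reactor-core is on the classpath. The class name and the `printed` list (kept only so the result can be inspected afterwards) are my own illustration, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class RangeWithoutSubscribe {

    // Declares the pipeline and returns whatever doOnNext has recorded.
    static List<String> run() {
        List<String> printed = new ArrayList<>();
        // range(start, count): start at 0 and emit 4 consecutive numbers.
        Flux<Integer> numbers = Flux.range(0, 4)
                .doOnNext(value -> {
                    String line = "Published value: " + value;
                    printed.add(line);
                    System.out.println(line);
                });
        // Note: the pipeline is only declared here; nothing subscribes to it.
        return printed;
    }

    public static void main(String[] args) {
        run();
    }
}
```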

Flux.range creates a Publisher which will emit four numbers: 0, 1, 2, 3.
The chained doOnNext method is called every time our publisher publishes data, so in this case we should get four numbers in the console.
Let’s run the program.
The result is surprising, because nothing happens… As we mentioned in the introduction, in Reactive Programming nothing happens until you subscribe.
That’s the reason why we got an empty console.
Take a look at how to subscribe to our publisher.
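Subscribing could look like the sketch below, again assuming reactor-core is on the classpath. The class name and the `lines` list are illustrative; the list only records what would otherwise go straight to the console.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class RangeWithSubscribe {

    // Subscribes to the range and records every line in the order it is produced.
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        Flux.range(0, 4)
                .doOnNext(value -> lines.add("Published value: " + value))
                // subscribe(consumer) triggers the pipeline and receives each element.
                .subscribe(value -> lines.add("Subscribed value: " + value));
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```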

The output should look like the following:

Published value: 0
Subscribed value: 0
Published value: 1
Subscribed value: 1
Published value: 2
Subscribed value: 2
Published value: 3
Subscribed value: 3

Congrats, you’ve just created your first publisher and subscriber!

The second way to create a Flux is the ‘interval’ method, which lets you set the time between emissions.
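A sketch of an interval-based publisher, assuming reactor-core is on the classpath. I use java.util.logging here so the example needs no extra logging library; the class name, the message format and the one-second sleep are my own choices.

```java
import java.time.Duration;
import java.util.logging.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class IntervalExample {

    private static final Logger LOG = Logger.getLogger(IntervalExample.class.getName());

    // Emits 0, 1, 2, ... every 100 ms and logs each value with the emitting thread's name.
    static Flux<Long> ticks() {
        return Flux.interval(Duration.ofMillis(100))
                .doOnNext(value -> LOG.info(
                        "[" + Thread.currentThread().getName() + "] Published value: " + value));
    }

    public static void main(String[] args) throws InterruptedException {
        Disposable subscription = ticks().subscribe();
        // The ticks arrive on a background thread, so keep main alive for a while.
        Thread.sleep(1000);
        // Stop the otherwise endless stream.
        subscription.dispose();
    }
}
```

Without the sleep, main would return before the first tick arrives and the program would exit silently.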

In the above sample, I’ve used a logger, but you can use System.out.println as well.
The output is infinite: every 100 milliseconds we get an incremented number along with info about the current thread.

As you can see, our flux publishes data on two threads called parallel-1 and parallel-2, but let’s go back to the previous example and print the thread names in the same way.
This time we’ve got a different situation.
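The range example with thread names could be sketched as follows, assuming reactor-core is on the classpath. The class name and the `threads` list are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class RangeThreadNames {

    // Records the name of the thread on which each element is published.
    static List<String> run() {
        List<String> threads = new ArrayList<>();
        Flux.range(0, 4)
                .doOnNext(value -> threads.add(Thread.currentThread().getName()))
                .subscribe();
        return threads;
    }

    public static void main(String[] args) {
        run().forEach(name -> System.out.println("Published on thread: " + name));
    }
}
```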

Our publisher is working in the main thread. What is the difference? How does Project Reactor decide which threads to use?
Let’s dive inside the above methods.

There is nothing interesting in the first method, but in the second we can notice some Schedulers usage. That is the secret.
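As far as I can tell from the Reactor sources, interval(Duration) delegates to an overload that also takes a Scheduler and passes Schedulers.parallel() to it. The sketch below calls both variants and reports the thread on which the first tick arrives. It assumes reactor-core is on the classpath, and the class and method names are my own.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class IntervalSchedulers {

    // No scheduler given: the ticks arrive on a thread of the parallel scheduler.
    static String defaultThread() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> Thread.currentThread().getName())
                .blockFirst();
    }

    // Scheduler passed explicitly: the ticks arrive on the single scheduler's thread.
    static String singleThread() {
        return Flux.interval(Duration.ofMillis(100), Schedulers.single())
                .map(tick -> Thread.currentThread().getName())
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println("default: " + defaultThread());
        System.out.println("single:  " + singleThread());
    }
}
```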

Managing threads

Project Reactor lets you manage threads with schedulers. As you can guess, the Schedulers class provides some static methods to obtain instances.

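You can use any of them: just call publishOn or subscribeOn on the publisher. publishOn switches the thread for the operators that follow it, while subscribeOn decides on which thread the subscription, and therefore the emission from the source, takes place. In this simple way, you can manage threads during publishing and subscribing.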
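A small sketch of both operators in one pipeline, assuming reactor-core is on the classpath. The class name and the "upstream -> downstream" output format are my own illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class PublishOnSubscribeOn {

    // Returns "<thread of the first map> -> <thread of the second map>" for the first element.
    static String run() {
        return Flux.range(0, 4)
                // Runs where the source emits, which subscribeOn moves to the single scheduler.
                .map(value -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single())
                // Everything below this line runs on the parallel scheduler.
                .publishOn(Schedulers.parallel())
                .map(upstream -> upstream + " -> " + Thread.currentThread().getName())
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The position of subscribeOn in the chain does not matter much, because it acts on the source; the position of publishOn does, because it only affects the operators placed after it.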
The default strategy is Schedulers.immediate(), which runs the work on the current thread. That means that if you subscribe to a publisher on the main thread, the main thread will be used to emit data. The single() method uses one dedicated thread, which is not necessarily the one that created the publisher or subscriber; that thread will probably be called single-1.
The elastic() method gives you an elastic thread pool. It creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are disposed. This is a good choice for blocking I/O work, for instance. Note that newer Reactor versions deprecate elastic() in favor of boundedElastic(), which caps the number of threads it creates.
The parallel() method creates as many workers as you have CPU cores.
Of course, you can still create your own schedulers by calling one of the Schedulers.newXXX() methods. There you can set the number of threads, their names, etc.
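A sketch of a custom scheduler, assuming reactor-core is on the classpath. The pool name "my-pool", the parallelism of 2 and the class name are arbitrary values chosen for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class CustomSchedulerExample {

    // Publishes on a custom pool and returns the name of the thread that handled the first element.
    static String run() {
        // newParallel(name, parallelism): a fixed pool whose threads are named after the prefix.
        Scheduler custom = Schedulers.newParallel("my-pool", 2);
        try {
            return Flux.range(0, 4)
                    .publishOn(custom)
                    .map(value -> Thread.currentThread().getName())
                    .blockFirst();
        } finally {
            // Schedulers you create yourself should be disposed when no longer needed.
            custom.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```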

Summary

In this post, you learned how to create publishers and subscribers and how to manage threads, which is super important in the Reactive World.
I highly recommend you play with Schedulers.
In the next post, we will create Hot Publishers and add some mapping. Then we will be ready to start our adventure with Spring WebFlux.

jvmfy