Project Reactor – hot publishers and backpressure

In recent posts, we learned how to create publishers such as Mono and Flux, and how to manage threads. Now we can learn about hot publishers and backpressure.

Hot Publishers

As you already know, the publishers we have created so far start emitting only when a subscriber exists. But what if we want to publish current data even when nobody subscribes to our publisher?

First of all, we can create a Connectable Flux. There's nothing fancy here: we just add .publish() at the end of a standard Flux and get a ConnectableFlux back. This kind of Flux starts emitting only when we call its .connect() method, so we decide when emission begins. Alternatively, we can use an auto-connectable Flux, which connects itself without an explicit .connect() call. Let's see an example.

Check this out, and subscribe:

Result:

As expected, the first subscriber received the numbers 2 and 3, and the second one received only the last number. Simple as that!
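The scenario described above can be sketched as follows. This is not the original listing, only a minimal reconstruction under a few assumptions: reactor-core 3.4 or newer is on the classpath (for the Sinks API), and a best-effort sink named source stands in for a live data source that keeps producing values whether or not anyone listens. The class and variable names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Sinks;

class HotPublishSketch {

    /** Returns [items seen by the first subscriber, items seen by the second subscriber]. */
    static List<List<Integer>> run() {
        // Assumption: a best-effort sink plays the role of a live source.
        // It simply drops values when nothing is subscribed to it.
        Sinks.Many<Integer> source = Sinks.many().multicast().directBestEffort();
        ConnectableFlux<Integer> hot = source.asFlux().publish();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        source.tryEmitNext(1);       // nothing is connected yet, so 1 is lost
        hot.subscribe(first::add);   // subscribing alone does not start the flow
        hot.connect();               // from here on the source feeds the subscribers
        source.tryEmitNext(2);       // only the first subscriber exists
        hot.subscribe(second::add);  // late subscriber: it has already missed 2
        source.tryEmitNext(3);       // delivered to both subscribers
        source.tryEmitComplete();

        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run();
        System.out.println("first:  " + seen.get(0));
        System.out.println("second: " + seen.get(1));
    }
}
```

With this ordering, the first subscriber should end up with 2 and 3, and the second one with 3 only. Everything runs on the calling thread, so the outcome does not depend on timing.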

Replay method

In the example above, each subscriber received only the items published after it subscribed. If a late subscriber should also get the last item, or more, we can use the .replay() method. It takes the number of items that we want to repeat for each new subscriber. Let's update our example.

Result:

As expected, we have one more value in our subscribers. The replay method allows us to send the last N elements emitted before the subscription. Nice!
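A hedged sketch of the replay variant, under the same assumptions as before: reactor-core 3.4+, a hypothetical best-effort sink acting as the live source, and illustrative names. The only real change is .replay(1) where .publish() would otherwise be used.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Sinks;

class HotReplaySketch {

    /** Returns [items seen by the first subscriber, items seen by the second subscriber]. */
    static List<List<Integer>> run() {
        // Assumption: same stand-in live source as in a publish()-based setup.
        Sinks.Many<Integer> source = Sinks.many().multicast().directBestEffort();
        // replay(1) keeps the most recent item for subscribers that arrive late
        ConnectableFlux<Integer> hot = source.asFlux().replay(1);

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        source.tryEmitNext(1);       // nothing is connected yet, so 1 is lost
        hot.subscribe(first::add);
        hot.connect();
        source.tryEmitNext(2);       // seen by the first subscriber and cached
        hot.subscribe(second::add);  // late subscriber gets the cached 2 right away
        source.tryEmitNext(3);       // delivered to both subscribers
        source.tryEmitComplete();

        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run();
        System.out.println("first:  " + seen.get(0));
        System.out.println("second: " + seen.get(1));
    }
}
```

Here the second subscriber should see 2 and 3 instead of just 3, because the cached item is replayed to it on subscription. A larger argument, such as .replay(2), would keep more history.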

AutoConnect method

Sometimes we don't want to start emitting explicitly. Instead, we may want to start publishing when, for example, at least two subscribers exist. In this case we can use the .autoConnect() method, which returns a plain Flux.
Its parameter is the number of subscribers needed before emission starts. Let's modify our example.

As you might guess, every subscriber receives all four elements, because our publisher starts to emit only after the second subscription.
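A minimal sketch of this behaviour, assuming a simple four-element source built with Flux.range(1, 4). The source and the names are illustrative and not taken from the original listing.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class AutoConnectSketch {

    /** Returns [first subscriber before the second arrives, first subscriber, second subscriber]. */
    static List<List<Integer>> run() {
        // autoConnect(2) connects to the source once two subscribers are present
        Flux<Integer> shared = Flux.range(1, 4).publish().autoConnect(2);

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        shared.subscribe(first::add);
        // Snapshot taken before the second subscription: nothing has been emitted yet
        List<Integer> firstBeforeSecond = new ArrayList<>(first);

        shared.subscribe(second::add);  // second subscriber triggers the connection

        return List.of(firstBeforeSecond, first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run();
        System.out.println("first, before second subscribes: " + seen.get(0));
        System.out.println("first:  " + seen.get(1));
        System.out.println("second: " + seen.get(2));
    }
}
```

The snapshot should be empty, while both subscribers finish with 1, 2, 3 and 4.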

What happens if we change the .autoConnect() argument to 3? Nothing at all: we have only two subscribers, so our publisher never starts to emit.

If you don't pass any argument to the .autoConnect() method, it is equivalent to .autoConnect(1), so the publisher starts to emit as soon as the first subscriber subscribes.
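Both cases can be checked with a small sketch. As before, Flux.range(1, 4) is an assumed stand-in source and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class AutoConnectThresholdSketch {

    /** Two subscribers, but the threshold is three: the source is never connected. */
    static List<Integer> twoSubscribersThresholdThree() {
        Flux<Integer> shared = Flux.range(1, 4).publish().autoConnect(3);
        List<Integer> received = new ArrayList<>();
        shared.subscribe(received::add);
        shared.subscribe(received::add);
        return received;
    }

    /** No argument behaves like autoConnect(1): the first subscriber starts the flow. */
    static List<Integer> singleSubscriberDefault() {
        Flux<Integer> shared = Flux.range(1, 4).publish().autoConnect();
        List<Integer> received = new ArrayList<>();
        shared.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("autoConnect(3), two subscribers: " + twoSubscribersThresholdThree());
        System.out.println("autoConnect(), one subscriber:   " + singleSubscriberDefault());
    }
}
```

The first method should return an empty list, and the second one all four elements.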

Backpressure

As you already know from the previous post, the backpressure mechanism is very important: it lets the subscriber communicate with the publisher. For example, the subscriber can limit the amount of incoming data. Let's take a look at an example.

The .log() method lets us observe the flow: each signal (subscribe, request, next, complete, error, cancel) shows up in the logs.
In the .onSubscribe() method we declare what we want to do at the beginning. We want only 3 elements from the publisher, so we call .request(3). In the .onNext() method we decide what to do when the next element is delivered. In this case, we check whether the most recent request (3 elements) has been fully processed. If so, we request another 3 elements; otherwise, we do nothing but increment our counter.
The next two methods are straightforward: they handle the error and completion events, nothing fancy. Let's see the result.
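A subscriber along the lines described above might look like the sketch below. It is a reconstruction, not the original listing: the ten-element source, the Result holder and the doOnRequest() recorder are assumptions added so the request pattern can be inspected without reading the logs.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

class BackpressureSketch {

    static final int BATCH = 3;

    /** Hypothetical holder for what the run observed. */
    static final class Result {
        final List<Long> requests = new ArrayList<>();
        final List<Integer> received = new ArrayList<>();
        boolean completed;
    }

    static Result run() {
        Result result = new Result();

        Flux.range(1, 10)                                 // assumed source: ten elements
            .doOnRequest(n -> result.requests.add(n))     // record every request(n)
            .log()                                        // print each signal to the logs
            .subscribe(new Subscriber<Integer>() {
                private Subscription subscription;
                private int receivedInBatch;

                @Override
                public void onSubscribe(Subscription s) {
                    subscription = s;
                    s.request(BATCH);                     // ask for the first 3 elements only
                }

                @Override
                public void onNext(Integer value) {
                    result.received.add(value);
                    if (++receivedInBatch == BATCH) {     // current batch fully processed
                        receivedInBatch = 0;
                        subscription.request(BATCH);      // ask for the next 3
                    }
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    result.completed = true;
                }
            });

        return result;
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("requests:  " + r.requests);
        System.out.println("received:  " + r.received);
        System.out.println("completed: " + r.completed);
    }
}
```

With ten elements and batches of three, the recorded requests should be 3, 3, 3 and 3. The last batch is only partly filled, because the source completes after the tenth element.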

Summary

In this article, we've learned how to use hot publishers and the backpressure mechanism. It's a big step forward. I hope you can feel the power of these features. Finally, we are ready to use them in a Spring application. Stay tuned!

GitHub with examples.

jvmfy