In a recent post, we learned how to create publishers such as Mono and Flux, and we already know how to manage threads. Now we can learn about hot publishers.
Hot Publishers
As you already know, the publishers we have created so far start publishing only once a subscriber exists. But what if we want to publish data as it is produced, even when nobody has subscribed to our publisher?
First of all, we can create a ConnectableFlux; there’s nothing fancy about it. We take a standard Flux and add .publish() at the end, which gives us a ConnectableFlux. This kind of Flux starts to emit when we call the .connect() method, that’s it. In this case, we decide when emission starts. Alternatively, we can use an auto-connecting Flux, which connects itself once enough subscribers have arrived. Let’s see an example.
    ConnectableFlux<Integer> connectableFlux = Flux.range(0, 4)
            .delayElements(Duration.ofMillis(100))
            .publish(); // create a connectable Flux
    connectableFlux.connect(); // start emitting, even though there are no subscribers
Check this out, and subscribe:
    Thread.sleep(200); // wait 200 ms before subscribing; 0 and 1 should already have been emitted
    connectableFlux.subscribe(i -> log.info("subscriber1: {}", i));
    Thread.sleep(150); // wait another 150 ms and subscribe once more
    connectableFlux.subscribe(i -> log.info("subscriber2: {}", i));
Result:
    22:07:12.668 [parallel-3] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 2
    22:07:12.771 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 3
    22:07:12.771 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 3
As expected, the first subscriber received 2 and 3, and the second received only the last number. Simple as that!
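If you want to try the "late subscribers miss earlier items" behaviour without Reactor on the classpath, the JDK’s own SubmissionPublisher (Java 9+) behaves in a similar way. The following is only a sketch under that assumption: the class name HotPublisherSketch and its helper methods are made up for illustration, and manual submit() calls stand in for the timed emissions used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: uses the JDK Flow API, not Reactor.
final class HotPublisherSketch {

    /** Collects every item it receives and signals when the stream ends. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); } // unbounded demand
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Returns the items seen by an early and a late subscriber, in that order. */
    static List<List<Integer>> run() {
        Collector first = new Collector();
        Collector second = new Collector();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.submit(0);         // nobody is subscribed yet, so nobody receives this
            publisher.submit(1);         // same here
            publisher.subscribe(first);
            publisher.submit(2);         // only the first subscriber gets this
            publisher.subscribe(second);
            publisher.submit(3);         // both subscribers get this
        }                                // close() signals onComplete to both
        await(first.done);
        await(second.done);
        return List.of(first.items, second.items);
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[2, 3], [3]]
    }
}
```

The two lists match what the Reactor example logged: the first subscriber sees 2 and 3, the second sees only 3.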
Replay method
In the example above, subscribers received only items published after they subscribed. If we also want the most recent item, or several of them, we can use the .replay() method. It takes the number of past items to replay to each new subscriber. Let’s update our example.
    ConnectableFlux<Integer> connectableFlux = Flux.range(0, 4)
            .delayElements(Duration.ofMillis(100))
            .replay(1); // replay instead of publish: a new subscriber also gets the latest item emitted before it subscribed
Result:
    22:23:23.480 [main] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 1
    22:23:23.574 [parallel-3] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 2
    22:23:23.632 [main] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 2
    22:23:23.675 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 3
    22:23:23.675 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 3
As expected, each subscriber receives one more value. The replay method lets us send the last N elements emitted before the subscription. Nice!
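To make the idea behind replay(n) concrete, here is a tiny single-threaded model of a bounded replay buffer in plain Java. It is a hypothetical illustration only: ReplaySketch, emit, subscribe and demo are invented names, and this is not how Reactor implements replay (there is no threading, backpressure or completion handling here).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of replay(n); not Reactor's implementation.
final class ReplaySketch<T> {
    private final int history;                          // how many past items to keep
    private final Deque<T> buffer = new ArrayDeque<>(); // the most recent items, oldest first
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ReplaySketch(int history) {
        this.history = history;
    }

    /** Delivers the item to current subscribers and remembers it for late ones. */
    void emit(T item) {
        if (history > 0) {
            if (buffer.size() == history) {
                buffer.removeFirst();                   // drop the oldest remembered item
            }
            buffer.addLast(item);
        }
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    /** A new subscriber first receives the remembered items, then live ones. */
    void subscribe(Consumer<T> subscriber) {
        buffer.forEach(subscriber);
        subscribers.add(subscriber);
    }

    /** Mirrors the timeline of the example above with a history of one item. */
    static List<List<Integer>> demo() {
        ReplaySketch<Integer> source = new ReplaySketch<>(1);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        source.emit(0);
        source.emit(1);
        source.subscribe(first::add);   // immediately replays 1
        source.emit(2);
        source.subscribe(second::add);  // immediately replays 2
        source.emit(3);
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2, 3], [2, 3]]
    }
}
```

The result lines up with the log above: the first subscriber gets 1, 2, 3 and the second gets 2, 3.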
AutoConnect method
Sometimes you don’t want to start emitting explicitly. In some cases, we want to start publishing only once, say, two subscribers exist. In this case we can use the .autoConnect() method, which returns a plain Flux.
You can pass the number of subscribers required before emission starts as a parameter. Let’s modify our example.
    Flux<Integer> connectableFlux = Flux.range(0, 4)
            .delayElements(Duration.ofMillis(100))
            .publish()
            .autoConnect(2);
    Thread.sleep(200);
    connectableFlux.subscribe(i -> log.info("subscriber1: {}", i));
    Thread.sleep(150);
    connectableFlux.subscribe(c -> log.info("subscriber2: {}", c));
As you might guess, each subscriber receives all four elements, because the publisher starts to emit only after the second subscription.
    15:13:25.850 [parallel-1] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 0
    15:13:25.857 [parallel-1] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 0
    15:13:25.959 [parallel-2] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 1
    15:13:25.959 [parallel-2] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 1
    15:13:26.063 [parallel-3] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 2
    15:13:26.063 [parallel-3] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 2
    15:13:26.167 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber1: 3
    15:13:26.168 [parallel-4] INFO com.jvmfy.webflux.reactor.ReactorSamples - subscriber2: 3
What if we change the .autoConnect() parameter to 3? Nothing is emitted: we have only two subscribers, so our publisher never starts.
If you don’t pass any argument to .autoConnect(), it is equivalent to .autoConnect(1), so the publisher starts to emit as soon as the first subscriber subscribes.
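The threshold rule can be modelled in a few lines of plain Java. Again, this is a hypothetical single-threaded sketch (AutoConnectSketch and demo are invented names, and it is not Reactor’s implementation): subscribers are parked until the required number has arrived, and only then is the finite source run for all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of publish().autoConnect(n); not Reactor's implementation.
final class AutoConnectSketch {
    private final List<Integer> source;      // stands in for Flux.range(0, 4)
    private final int minSubscribers;        // the autoConnect threshold
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private boolean connected;

    AutoConnectSketch(List<Integer> source, int minSubscribers) {
        this.source = source;
        this.minSubscribers = minSubscribers;
    }

    void subscribe(Consumer<Integer> subscriber) {
        if (connected) {
            return; // the finite source has already run; this model ignores late subscribers
        }
        subscribers.add(subscriber);
        if (subscribers.size() >= minSubscribers) {
            connected = true;
            for (Integer item : source) {    // each item goes to every subscriber in turn
                for (Consumer<Integer> s : subscribers) {
                    s.accept(item);
                }
            }
        }
    }

    /** Subscribes exactly two collectors and returns what each of them received. */
    static List<List<Integer>> demo(int minSubscribers) {
        AutoConnectSketch flux = new AutoConnectSketch(List.of(0, 1, 2, 3), minSubscribers);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        flux.subscribe(first::add);
        flux.subscribe(second::add);
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(demo(2)); // [[0, 1, 2, 3], [0, 1, 2, 3]]
        System.out.println(demo(3)); // [[], []]
    }
}
```

With a threshold of 2 both collectors receive all four elements; with a threshold of 3 the source never runs, just like the Reactor example.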
Backpressure
As you already know from the previous post, the backpressure mechanism is very important: it lets the subscriber communicate with the publisher. For example, the subscriber can limit the amount of incoming data. Let’s take a look at an example.
    import java.time.Duration;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;

    Flux<Integer> flux = Flux.range(0, 20)
            .log()
            .delaySubscription(Duration.ofSeconds(3))
            .delaySequence(Duration.ofSeconds(3));

    flux.subscribe(new Subscriber<Integer>() {

        private Subscription subscription;
        private int dataCounter;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(3); // initial demand: three elements
        }

        @Override
        public void onNext(Integer value) {
            this.dataCounter++;
            if (this.dataCounter % 3 == 0) {
                this.subscription.request(3); // the batch is done, ask for three more
            }
        }

        @Override
        public void onError(Throwable t) {
            // pass the Throwable as the last argument so SLF4J logs the stack trace
            log.error("Cannot obtain value", t);
        }

        @Override
        public void onComplete() {
            log.info("Completed");
        }
    });
The .log() method lets us observe the flow: the signals passing through that point of the chain are written to the logs.
In the .onSubscribe() method we declare what we want to do at the beginning: we want only 3 elements from the publisher, so we call .request(3). In the .onNext() method we decide what to do when the next element is delivered. Here, we check whether we have finished processing the most recent request (3 elements). If so, we request another 3 elements; otherwise, we just increment our counter.
The last two methods are self-explanatory: they handle the error and completion events, nothing fancy. Let’s see the result.
    15:34:47.013 [parallel-1] INFO reactor.Flux.Range.1 - | request(3)
    15:34:47.014 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(0)
    15:34:47.022 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(1)
    15:34:47.022 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(2)
    15:34:50.024 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:34:50.024 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(3)
    15:34:50.024 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(4)
    15:34:50.024 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(5)
    15:34:53.026 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:34:53.026 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(6)
    15:34:53.026 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(7)
    15:34:53.026 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(8)
    15:34:56.027 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:34:56.027 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(9)
    15:34:56.027 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
    15:34:56.027 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(11)
    15:34:59.027 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:34:59.027 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(12)
    15:34:59.027 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(13)
    15:34:59.028 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(14)
    15:35:02.029 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:35:02.029 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(15)
    15:35:02.030 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(16)
    15:35:02.030 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(17)
    15:35:05.031 [parallel-2] INFO reactor.Flux.Range.1 - | request(3)
    15:35:05.031 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(18)
    15:35:05.031 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(19)
    15:35:05.032 [parallel-2] INFO reactor.Flux.Range.1 - | onComplete()
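The same request-in-batches protocol can be tried with nothing but the JDK, because java.util.concurrent.Flow (Java 9+) mirrors the Reactive Streams interfaces used above. The sketch below is an assumption-laden stand-in, not the Reactor code from this post: BatchRequestSketch and BatchingSubscriber are invented names, a SubmissionPublisher replaces Flux.range(0, 20), and there are no delays. It counts the request calls so the batching can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: uses the JDK Flow API, not Reactor.
final class BatchRequestSketch {

    /** Asks for items three at a time and records how often it had to ask. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        static final int BATCH = 3;
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        int requestCalls;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            requestCalls++;
            s.request(BATCH);                    // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() % BATCH == 0) {  // the current batch is complete
                requestCalls++;
                subscription.request(BATCH);
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 0..19 and returns the subscriber once the stream has completed. */
    static BatchingSubscriber run() {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < 20; i++) {
                publisher.submit(i);             // buffered until the subscriber has demand
            }
        }                                        // close() signals onComplete after pending items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run();
        System.out.println(s.received.size() + " items, " + s.requestCalls + " request calls");
        // 20 items, 7 request calls
    }
}
```

Seven request calls for twenty items is the same pattern as the seven request(3) lines in the log above: one initial request plus one after every third element.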
Summary
In this article, we’ve learned how to use hot publishers and the backpressure mechanism. It’s a big step forward. I hope you can feel the power of these features. Finally, we are ready to use them in a Spring application. Stay tuned!
GitHub with examples.