Reactive Programming in Java

Nil Seri
9 min readFeb 16, 2024

--

Introduction to Reactive Manifesto, Reactive Streams, Project Reactor and Spring WebFlux

Photo by Johannes Plenio on Unsplash

Reactive Manifesto

Reactive systems: Systems that are responsive, resilient, elastic and message-driven. They are flexible, loosely-coupled and scalable. With these, they can respond to fast evolving, modern day requirements.

You can read the Reactive Manifesto here.

Asynchrony Options in Java — Previously

  • Callbacks: Asynchronous methods that accept a callback as a parameter and invokes it when the blocking call completes.
    In large systems -> callback hell
  • Future (came with Java 5) provided async code but it was not easy to combine response from multiple Futures.
  • CompletableFuture (came with Java 8) provides async code in functional style. It is easy to compose/combine multiple Futures. The disadvantage is that it cannot handle infinite values.

Reactive Streams

Handling streams of data — especially “live” data whose volume is not predetermined — requires special care in an asynchronous system. We need to make sure that a fast data source does not overwhelm the stream destination, by passing elements on to another thread or thread-pool.

Reactive Streams is a standard and specification for stream-oriented libraries for the JVM that

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure (flow control).

Backpressure: allows the queues which mediate between threads to be bounded for a fully non-blocking and asynchronous behavior.

With backpressure in place, Reactive Streams corresponds to a combination of Push-Pull based data flow model.

Data flow of the Push/Pull Modes between the Publisher and Subscribers

Strategies for Backpressure

Buffer — buffer the deficit data and process it later when the server has capacity.

Drop — not processing events; data sampling combined with buffering to achieve less data loss may be a better option instead.

Control —the best option; control the data that is being produced and sent to the consumer in both push and pull-based streams.

Reactive Streams are only concerned with mediating the stream of data between different API Components. These are Publisher, Subscriber, Subscription and Processor.

A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Processor represents a processing stage — which is both a Subscriber and a Publisher and must obey the contracts of both.

Reactive Streams in Java

  • Publisher interface has 1 method called “subscribe(Subscriber <? super T> s)”. It represents datasource (database, remote server)
  • Subscriber interface has 4 methods; “onSubscribe(Subscription s)”, “onNext(T t)”, “onError(Throwable t)”, “onComplete()”. onNext — how data is sent to the caller from data source; onComplete — how datasource notifies the app that there’s no more data. Exceptions are handled similarly to data, implying that the exception will be dispatched to the Subscriber as an event (via onError method). You can handle the error gracefully in onError method. If not handled, the reactive stream will be terminated afterwards.
  • Subscription interface has 2 methods; “request(long n)” and “cancel()”. request — the app requesting for data; cancel — once the app decides it does not require any more data.
    Subscription is the one which connects the app to the datasource.
  • Processor interface has no methods, it extends Publisher and Subscriber. It is rarely used.
Success Scenario
Error/Exception Scenario

Project Reactor

Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

  • It directly interacts with Java’s functional API; CompletableFuture, Stream and Duration.
  • It offers two reactive and composable APIs which implement Reactive Streams spec; Flux [N] — a reactive type to represent 0 to N elements and Mono [0|1] — a reactive type to represent 0 to 1 elements (retrieves one single data at most).
  • It offers backpressure-ready network engines for HTTP (including web sockets), TCP, and UDP.

Spring WebFlux

Spring WebFlux is the new module introduced in Spring 5. Spring WebFlux is the first step towards reactive programming model in Spring Framework.

Project Reactor is the foundation of the reactive stack in the Spring ecosystem and is featured in projects such as Spring WebFlux, Spring Data, and Spring Cloud Gateway.

https://spring.io/reactive

Netty:
To handle requests using non-blocking style at server side, Netty is used. Netty is a non-blocking server that uses event loop model. Spring Webflux uses Netty and Project Reactor for building non-blocking/reactive APIs.

Reactive libraries take care of releasing the server thread and handle this whole execution in a totally different thread abstraction and sends a response once the operation completes.

Every time a client makes a request, a Channel is created.
Channel: represents an open connection between the server and the current request; response is sent via the Channel.

Channel has ChannelHandler:
- accepting the client connection
- reading the data as bytes from the network to a Java object — transformation
- writing data back to the client.

EventLoop: Netty uses EventLoop model to handle the connections in a non-blocking fashion.
- powered by 1 single thread (NodeJS uses the same pattern — it has just 1 thread / 1 event loop to handle client requests).

NodeJS Server

- number of EventLoops to handle the request = number of cores on your machine
- processes in order
- EventLoops are part of the EventLoopGroup.

How are Channel and EventLoop Linked?
- Any time a Channel is created, it gets assigned to an EventLoop.
- EventLoop is responsible for handling different events that occur in the lifetime of a channel.

Channel Lifecycle — executed by EventLoop
1. ChannelUnregistered: Channel is created but not registered with EventLoop yet.
2. ChannelRegistered: Channel is registered with EventLoop.
3. ChannelActive: Channel is active, now it is possible to send/receive data.
4. ChannelInactive: Channel is not connected to the client anymore, it is ready to be closed.

How Does Netty Handle the Request?
Netty has 2 EventLoop groups.
1 — to accept the connection and create channels.
2 — to handle the channel — all the created channels are handed over to the second (read/write/close…)

Netty Threading Model

Simple examples with Flux and Mono

Project Dependencies
The dependencies required in your project are:

<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor-core.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
<version>${reactor-tools.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor-test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

Reactor tools are a set of tools to improve Project Reactor’s debugging and development experience.

Some Coding Examples:

Flux<String> fluxStream = Flux.just("Athos", "Porthos", "Aramis");
Mono<String> monoStream = Mono.just("hola"); // will return 1 element

Unless you subscribe to it, you cannot consume data:

fluxStream.subscribe(musketeer -> System.out.println(musketeer));
// Simple example: emit every second, starting immediately
Flux<Long> simpleFlux = Flux.interval(Duration.ofSeconds(1));
simpleFlux.subscribe(number -> System.out.println("Simple: Tick " + number));

If you append .log() method at the end of Flux or Mono methods, it prints logs about what has happened between subscriber and publisher via Slf4j logging framework.

Flux.just(A”, “B”, “C”).log() output

In request(unbounded), unbounded means requesting for “all the data”.

Testing Flux and Mono using StepVerifier

StepVerifier comes from Project Reactor’s reactor-test dependency.

Flux<String> fluxStream = Flux.just("Athos", "Porthos", "Aramis");
StepVerifier
.create(fluxStream)
.expectNext("Athos", "Porthos", "Aramis")
.verifyComplete();
Flux<String> fluxStream = Flux.just("Athos", "Porthos", "Aramis");
StepVerifier
.create(fluxStream)
.expectNextCount(3)
.verifyComplete();
Flux<String> fluxStream = Flux.just("Athos", "Porthos", "Aramis");
StepVerifier
.create(fluxStream)
.expectNext("Athos")
.expectNextCount(2)
.verifyComplete();

Transforming Data Using Operators

  • No operations occur until the stream is subscribed.
  • Chaining multiple functions = Pipeline in functional programming
  • Reactive streams are immutable.

map and filter:

// convert to uppercase
Flux<String> threeMusketeersStream = Flux.just("Athos", "Porthos", "Aramis");
Flux<String> uppercaseStream = threeMusketeersStream.map(str -> str.toUpperCase());
// filter even numbers
Flux<Integer> oneToTenStream = Flux.range(1, 10);
Flux<Integer> filteredStream = oneToTenStream.filter(num -> num % 2 == 0);

flatmap:

Function<String, Publisher<String>> toUpperCaseFunction = s -> Flux.just(s.toUpperCase().split(""));
Flux<String> threeMusketeersFlux = Flux.just("Athos", "Porthos", "Aramis");
Flux<String> outputFlux = threeMusketeersFlux.flatMap(toUpperCaseFunction);

map vs. flatmap:

  • map: 1–1 transformation.
    flatmap: 1-N transformation.
  • map: simple transformation from T to V.
    flatmap: more than transformation, subscribes to Flux/Mono to flatten it and send it downstream.
  • map: used for simple synchronous transformations.
    flatmap: used for asynchronous transformations.
  • map: no support for transformations that returns Publisher.
    flatmap: use it with transformations that returns Publisher.
  • If the transformation is going to return a Mono, then use flatMap — it always returns a Mono<T>.
  • Use flatmap if transformation involves making a REST API call or any kind of functionality that can be done asynchronously.

concatMap:

  • similar to flatMap
  • flatMap vs concatMap: preserves the ordering sequence — an asynchronous call but preserves the order; overall processing time is higher than flatMap

flatMapMany:

  • If a Mono is to be transformed to a Flux, then use flatMapMany().

transform:

  • used to transform from 1 type to another.
  • accepts “Function” Functional Interface (Java 8)
    Input — Publisher (Flux or Mono)
    Output — Publisher (Flux or Mono)
Flux<String> threeMusketeersStream = Flux.just("Athos", "Porthos", "Aramis");
Function<Flux<String>, Flux<String>> filterMap = name -> name.map(String::toUpperCase).filter(s -> s.length > 5);
Flux<String> outputFlux = threeMusketeersFlux.transform(filterMap);

defaultIfEmpty & switchIfEmpty:

  • data source may not emit data all the time.
  • if request(n) then onComplete() — or we can use these 2 operators to provide default values.
  • defaultIfEmpty accepts the actual type.
    switchIfEmpty accepts a Publisher (Flux or Mono).

concat & concatWith:

  • used to combine Flux and Mono
  • concatenation happens in a sequence — first one completes, second one is subscribed after that.
  • concat — static method in Flux
    concatWith — instance method in Flux and Mono.
  • Flux + Mono = Flux
    Mono + Mono = Flux
Flux<String> abcStream = Flux.just("a", "b", "c");
Flux<String> defStream = Flux.just("d", "e", "f");
// the output will be: a, b, c, d, e, f
Flux<Integer> concattedFlux = Flux.concat(abcStream, defStream);
Flux<Integer> concattedWithFlux = abcStream.concatWith(defStream);

merge & mergeWith:

  • both publishers are subscribed at the same time eagerly and in interleaved fashion.
  • concat was a sequential subscription.
  • merge — static method in Flux
    mergeWith — instance method in Flux and Mono.

mergeSequential:

  • used to combine 2 publishers (Flux) in to 1.
  • static method in Flux.
  • both publishers are subscribed at the same time and eagerly but merge happens in a sequence.

zip & zipWith:

  • zips multiple publishers into 1.
  • waits for all the publishers to emit 1 element and then move on to the next element.
  • continues until 1 of them sends onComplete().
  • zip — static method in Flux. can be used to zip up to 2–8 Flux or Mono publishers into 1.
    zipWith — instance method in Flux and Mono. can be used to zip 2 publishers into 1.
  • eagerly subscription.
Flux<String> abcStream = Flux.just("a", "b", "c");
Flux<String> defStream = Flux.just("d", "e", "f");
// the output will be: ab, cd, ef
Flux<Integer> zippedFlux = Flux.zip(abcStream, defStream, (a, b) -> a + b);
Flux<String> one = Flux.just("a", "b", "c");
Flux<String> two = Flux.just("d", "e", "f");
Flux<String> three = Flux.just("1", "2", "3");
Flux<String> four = Flux.just("4", "5", "6");
// the output will be: ad14, be25, cf36
Flux<Integer> zippedFlux =
Flux.zip(one, two, three, four) // Tuple4 (can be up to Tuple8)
.map(t4 -> t4.getT1() + t4.getT2() + t4.getT3() + t4.getT4());

Sinks

A sink is a class that allows safe manual triggering of signals. It can either be associated to a subscription (from inside an operator) or completely standalone.

Sinks.One & Sinks.Many:

Any time a subscriber is connected to it, to replay all events:

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();

Sink Categories

  • many().multicast(): transmit only newly pushed data to its subscribers.
  • many().unicast(): data pushed before the first subscriber registers is buffered. It allows only a single subscriber.
  • many().replay(): replay a specified history size of pushed data to new subscribers then continue pushing new data live.
  • one(): will play a single element to its subscribers.
  • empty(): will play a terminal signal (Mono)

Happy Coding!

--

--

Nil Seri
Nil Seri

Written by Nil Seri

I would love to change the world, but they won’t give me the source code | coding 👩🏻‍💻 | coffee ☕️ | jazz 🎷 | anime 🐲 | books 📚 | drawing 🎨

No responses yet