{{announcement.body}}
{{announcement.title}}

The Good, the Bad, and the Ugly: Propagating Data Through Reactive Streams

DZone 's Guide to

The Good, the Bad, and the Ugly: Propagating Data Through Reactive Streams

Propagating data through reactive pipelines is a very common concern when building reactive applications.

· Java Zone ·
Free Resource

Image title

How to propagate data through reactive pipelines the Good, the Bad, and the Ugly way.

Propagating data through reactive pipelines is a very common development concern that arises when building reactive applications based on any Reactive Streams implementation (e.g. Project Reactor, RxJava, and Akka Streams).

You may also like: 5 Things to Know About Reactive Programming

We’ll be going through the Good, the Bad and the Ugly of propagating information downstream, using Project Reactor as our Reactive Streams implementation of choice.

NOTE: If you’re quite familiar with Project Reactor and reactive programming already, you can jump to my demo Spring Boot application on GitHub and dig through the source code; it’s quite straightforward!

The Bad

One of the most common solutions employed to solve the data propagation issue is the usage of local (effectively final) variables, which can either be used immediately in the scope of the current method or passed on as extra parameters to other methods.

The pros:

  • Quick and dirty;
  • That’s it…

The cons:

  • Encourages you to build longer methods in order to re-use the same local variable in multiple pipeline steps;
  • Alternatively, it pollutes your API by adding extra method parameters whenever you need to refactor the code into smaller pieces;
  • Your code becomes hard to maintain very quickly.

Check out the following example.

Controller snippet:

    @Autowired
    private PrefixingService service;

    @GetMapping(value = "/localvar/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Mono<ResponseEntity<Flux<String>>> getMultiplesWithLocalVarPrefix(
            @PathVariable int base,
            @RequestParam int multiplier,
            Authentication authentication) {
        // saving the prefix as a local variable
        String prefix = DATA_REQUESTED_BY + authentication.getName();

        Flux<String> body = 
                // passing the prefix onto the next method, polluting its API
                service.doPrefix(prefix, getMultiplierFlux(base, multiplier))
                .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG);

        return Mono.just(ResponseEntity.ok().body(body));
    }

    private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
        return Flux
                .<Integer>create(sink -> {
                    if(base < 0 || multiplier < 0)
                        sink.error(new IllegalArgumentException());

                    for(int i = 1; i <= 10; i++)
                        sink.next(base * multiplier * i);

                    sink.complete();
                })
                .delayElements(ofSeconds(1));
    }


The controller uses local variables to pass data through reactive pipelines.

Furthermore, here is a polluted service snippet:

    // Data propagation via local variable polluted the method signature!
    public Flux<String> doPrefix(String prefix, Flux<Integer> toPrefix) {
        return toPrefix
                .map(data -> prefix + " " + data);
    }


This demonstrates the corresponding service with extra parameters in the method signature.

The Ugly

Another prevalent solution consists of the usage of Tuples, which aggregate multiple pieces of data together into a single object that gets propagated downstream and allows specific access to any and every component.

The pros:

  • No extra parameters are needed to propagate data downstream
  • No need to create our own aggregator POJOs
  • Good for propagating mutable data downstream
  • Tuples are a Project Reactor component; therefore, we must be doing things the Reactor way, am I right? 

The cons:

  • Methods signatures become quite long and are filled with generics declarations
  • The code required to handle tuples is definitely ugly
  • The code becomes hard to read at first glance.

Let's look at the following controller snippet:

    @Autowired
    private PrefixingService service;

    @GetMapping(value = "/tuples/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Mono<ResponseEntity<Flux<String>>> getMultiplesWithTuplesPrefix(
            @PathVariable int base,
            @RequestParam int multiplier) {

        Flux<String> body = 
                service.doPrefixWithTuple(
                            getMultiplierFlux(base, multiplier)
                            .map(multiple -> Tuples.of(multiple, ReactiveSecurityContextHolder.getContext()))
                        )
                .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG);

        return Mono.just(ResponseEntity.ok().body(body));
    }

    private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
        return Flux
                .<Integer>create(sink -> {
                    if(base < 0 || multiplier < 0)
                        sink.error(new IllegalArgumentException());

                    for(int i = 1; i <= 10; i++)
                        sink.next(base * multiplier * i);

                    sink.complete();
                })
                .delayElements(ofSeconds(1));
    }



The controller uses tuples to pass data through reactive pipelines. Here is an ugly service code snippet:
    // Data propagation via Tuples alters the method signature and makes the code ugly really quickly!
    public Flux<String> doPrefixWithTuple(Flux<Tuple2<Integer, Mono<SecurityContext>>> toPrefix) {
        return toPrefix
                .flatMap(tuple -> tuple.getT2().map(securityContext -> Tuples.of(tuple.getT1(), securityContext.getAuthentication().getName())))
                .map(tuple -> DATA_REQUESTED_BY + tuple.getT2() + " " + tuple.getT1());
    }


The corresponding service with an ugly method signature, and even uglier code, can handle the tuple.

The Good

A way less common and often unknown solution consists of the usage of the Project Reactor’s context, a map-like structure that is automatically and transparently propagated throughout the whole reactive pipeline and can be easily accessed at any moment by calling the Mono.subscriberContext() static method.

The context can be populated at subscription time by adding either the subscriberContext(Function) or the subscriberContext(Context) method invocation at the end of your reactive pipeline.

It is an excellent solution for propagating static, technical data about the current process and dealing with cross-cutting concerns. Therefore, it should be used for things such as propagation of authentication contexts, static logging information, correlation ids, and transaction contexts.

The pros:

  • There are no extra parameters needed to propagate data downstream
  • The methods’ signatures are completely unscathed
  • A very elegant solution for dealing with cross-cutting concerns
  • We are still able to do things the Reactor way.

The cons:

  • It's not the best tool for the propagation of functional, highly mutable data;
  • It is a bit verbose when compared to alternatives.

Here's an example of a controller snippet:

    @Autowired
    private PrefixingService service;

    @GetMapping(value = "/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Mono<ResponseEntity<Flux<String>>> getMultiples(
            @PathVariable int base,
            @RequestParam int multiplier) {

        Flux<String> body = 
                service.doPrefix(getMultiplierFlux(base, multiplier))
                .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG)
                .subscriberContext(ctx -> ctx.put(PREFIX_KEY, getPrefix(ctx)));

        return Mono.just(ResponseEntity.ok().body(body));
    }

    private Mono<String> getPrefix(Context ctx) {
        return ctx.getOrDefault(SecurityContext.class, Mono.just(new SecurityContextImpl()))
                .map(securityCtx -> DATA_REQUESTED_BY + securityCtx.getAuthentication().getName());
    }

   private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
        return Flux
                .<Integer>create(sink -> {
                    if(base < 0 || multiplier < 0)
                        sink.error(new IllegalArgumentException());

                    for(int i = 1; i <= 10; i++)
                        sink.next(base * multiplier * i);

                    sink.complete();
                })
                .delayElements(ofSeconds(1));
    }


A controller uses Project Reactor’s context to pass data through reactive pipelines

NOTE: In the snippet above, the Spring Security authentication context is retrieved from Project Reactor’s context since the latter is already filled with it by the Spring WebFlux Security module. Neat!

Here is an example of a clean service snippet:

    // Data propagation via Reactor Context is transparent, the method signature is unscathed!
    public Flux<String> doPrefix(Flux<Integer> toPrefix) {
        return toPrefix
                .flatMap(data -> 
                    Mono.subscriberContext()
                    .flatMap(ctx -> ctx.getOrDefault(PREFIX_KEY, Mono.just("")))
                    .map(prefix -> prefix + " " + data)
                );
    }


The corresponding service signature is unscathed as data is propagated transparently!

Wrapping Up

We went through three instances of the same — simple but definitely overcomplicated — example showing different approaches to data propagation in reactive pipelines.

There is no clear and absolute winner for every use case, but Project Reactor’s context certainly deserves an honorable mention for dealing with cross-cutting concerns in a way that’s both elegant and transparent.

References

For more detailed information about Project Reactor’s context, you can refer to the official Project Reactor documentation.

If you want to give a deeper look at some of the code shown above or try it out yourself, feel free to check out my demo project on GitHub.

Further Reading

5 Things to Know About Reactive Programming

Reactive Programming With Project Reactor

What Are Reactive Streams in Java?

Topics:
reactive programming ,reactive development ,reactive applications ,project reactor ,rxjava ,akka streams ,java ,spring boot ,spring ,spring security

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}