microprofile-reactive-streams-operators

Rationale

Reactive Streams is an integration SPI – it allows two different libraries that provide asynchronous streaming to be able to stream data to and from each other.

Reactive Streams is not, however, designed to be used directly by application developers. The semantics defined by Reactive Streams are very strict and are non-trivial to implement correctly, particularly in the area of thread safety. Typically, application developers are expected to use third-party libraries that provide the tools necessary to manipulate and control streams. Examples include Akka Streams, RxJava and Reactor.

However, MicroProfile cannot depend on third-party libraries for this essential, application-developer-facing functionality. MicroProfile specifications are not allowed to depend on anything other than the JDK and other MicroProfile specifications.

Hence, for MicroProfile to provide application developer APIs that use Reactive Streams, MicroProfile must provide its own Reactive Streams manipulation and control library. In future, it is hoped that this library will be adopted by the JDK itself, after a period of suitable incubation in the MicroProfile project. Some JDK maintainers have indicated that this would be a suitable path to get this type of functionality into the JDK.

Influences and History

The naming and scope of this API is inspired by the JDK8 java.util.stream API. The JDK8 stream API however does not capture all typical functionality that an application developer using Reactive Streams would need. For additional API naming and scope, Akka Streams, RxJava and Reactor have been used as an inspiration.

Implementations

MicroProfile Reactive Streams does not contain an implementation itself but only provides the specified API, a TCK and documentation.

The following Implementations are available:

Design

MicroProfile Reactive Streams offers a series of builders for creating instances of Reactive Streams Publisher, Subscriber and Processor. Subscribers are associated with a CompletionStage of the result of consuming the stream. This may be the result of a reduction on the elements, or in some cases it just indicates the termination of the stream, either as a success or with an error.

The API builds a graph from a series of graph stages, which are provided as an SPI. A Reactive Streams engine, which is implemented by implementations of the specification, and can also manually be provided by end users, is responsible for building the graph into a running stream.
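The split between describing a graph and running it can be illustrated with a toy sketch. All names here (ToyStage, ToyMap, ToyFilter, ToyGraphBuilder, ToyEngine) are hypothetical and are not the real SPI types. The toy engine also runs synchronously over a list, which a real Reactive Streams engine would not do. The point is only that the builder records stages as data, and a separate engine decides how to execute them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stage descriptions: plain data with no behaviour of their own.
interface ToyStage {}

final class ToyMap implements ToyStage {
    final Function<Object, Object> mapper;
    ToyMap(Function<Object, Object> mapper) { this.mapper = mapper; }
}

final class ToyFilter implements ToyStage {
    final Predicate<Object> predicate;
    ToyFilter(Predicate<Object> predicate) { this.predicate = predicate; }
}

// Hypothetical builder: each operator call only records a stage in the graph.
final class ToyGraphBuilder {
    private final List<ToyStage> stages = new ArrayList<>();

    ToyGraphBuilder map(Function<Object, Object> mapper) {
        stages.add(new ToyMap(mapper));
        return this;
    }

    ToyGraphBuilder filter(Predicate<Object> predicate) {
        stages.add(new ToyFilter(predicate));
        return this;
    }

    List<ToyStage> stages() { return new ArrayList<>(stages); }
}

// Hypothetical engine: interprets the recorded stages against a source.
final class ToyEngine {
    static CompletionStage<List<Object>> run(List<?> source, List<ToyStage> stages) {
        List<Object> out = new ArrayList<>();
        for (Object element : source) {
            Object current = element;
            boolean keep = true;
            for (ToyStage stage : stages) {
                if (stage instanceof ToyMap) {
                    current = ((ToyMap) stage).mapper.apply(current);
                } else if (stage instanceof ToyFilter) {
                    if (!((ToyFilter) stage).predicate.test(current)) {
                        keep = false;
                        break;
                    }
                } else {
                    throw new IllegalArgumentException("Unknown stage: " + stage);
                }
            }
            if (keep) {
                out.add(current);
            }
        }
        // The result of consuming the stream is exposed as a CompletionStage.
        return CompletableFuture.completedFuture(out);
    }
}

final class ToyDemo {
    public static void main(String[] args) {
        List<ToyStage> graph = new ToyGraphBuilder()
            .map(x -> ((Integer) x) * 2)
            .filter(x -> ((Integer) x) > 2)
            .stages();
        // Prints [4, 6]
        System.out.println(ToyEngine.run(Arrays.asList(1, 2, 3), graph)
            .toCompletableFuture().join());
    }
}
```

Because the graph is only data until an engine interprets it, a different engine could run the same stages on a different runtime without any change to the code that built the graph.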

Reactive Streams is available in JDK9 in the java.util.concurrent.Flow API. However, MicroProfile is not ready to require a baseline JDK version above 8. For this reason, the same interfaces provided by https://www.reactive-streams.org in the org.reactivestreams package are used instead. This does place a dependency from MicroProfile on a third-party library. However, that library is nothing more than the four Reactive Streams interfaces (Publisher, Subscriber, Subscription and Processor), and these have been copied as is into JDK9. As an interim solution while JDK9 adoption catches up, this has been deemed an acceptable exception to the rule. The approach documented in MicroProfile Approach to Reactive for ensuring a smooth transition to the JDK9 APIs has been adopted.
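For readers who have not seen the raw interfaces, here is a minimal sketch using the JDK9 java.util.concurrent.Flow copies together with the JDK's SubmissionPublisher. CollectingSubscriber and FlowDemo are hypothetical names made up for this illustration. The org.reactivestreams interfaces have the same shape, but this sketch uses the JDK versions so that it runs without extra dependencies. It also shows the idea of pairing a Subscriber with a CompletionStage that is redeemed when the stream terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that collects elements and exposes the outcome
// of consuming the stream as a CompletionStage.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    CompletionStage<List<T>> completion() { return result; }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request an effectively unbounded number of elements.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) { items.add(item); }

    @Override
    public void onError(Throwable failure) { result.completeExceptionally(failure); }

    @Override
    public void onComplete() { result.complete(items); }
}

final class FlowDemo {
    static List<Integer> run() {
        CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
        // Closing the publisher signals onComplete to its subscribers.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
        }
        return subscriber.completion()
            .toCompletableFuture()
            .orTimeout(5, TimeUnit.SECONDS)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

Even this small subscriber relies on the signalling and ordering rules of the specification, which is part of why application developers are steered towards a higher-level library rather than implementing these interfaces by hand.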

Building

The whole MicroProfile Reactive Streams project can be built via Apache Maven.

$> mvn clean install

Diagrams

The API documentation makes extensive use of marble diagrams to visualize what each operator does. These diagrams are generated using a DSL written specifically for this purpose, rather than creating them in a visual diagram editor, which would make it very difficult to ensure consistency, especially between different contributors.

The DSL is implemented in JavaScript, and the diagrams are created using SVG. This combination of technologies allows rapid development of the diagrams, since they can be run, inspected, debugged and tweaked directly in a web browser. They are then exported to PNG files using Puppeteer, a high level JavaScript API for controlling headless Chrome, running on node.

Editing diagrams

All the diagrams are declared in mp-rs-ops-marbles.js. This contains a map of all the graphs. Each graph has an array of stages, and stages have zero to many elements. The stages are as follows:

  • ins – An input stream. Called ins because in is a JavaScript keyword, and ins makes the width 3 characters for nice alignment with other stages. The first parameter may optionally be a map of options; the only option currently read is label, which is used to label the stage.

  • sub – A sub stream. This will automatically have a label generated for it, stream[n], where n is the number of the sub stage, starting from one.

  • out – An output stream. The difference between out and ins is that out marbles inherit their colour from the previous ins marble.

  • op – The operator stage. The first argument must be the operator, this may be followed by zero to many intermediate value marbles.

  • eff – An effect stream. Used to visualize side effects (eg, callbacks executed by forEach).

  • res – A result stage. This displays the result that gets redeemed by a CompletionSubscriber when it completes.

Each of the stages supports zero to many marbles, which are declared as follows:

  • n – an ‘onNext’ marble. A regular element passed through the stream. The colour is automatically computed – each marble in an ins stream gets a new colour, while all other marbles have their colour selected based on the previous ins marble, searching up vertically first, then left. May optionally take an options argument, supporting a link option which indicates that this marble should be linked to another marble (eg as a feedback loop from the output back to the input).

  • term – an onComplete marble, indicating successful termination of the stream.

  • nterm – a hybrid between n and term, allowing a marble to also carry a termination signal. This is used to make it clear that when a certain marble is emitted, that is the last marble and the stream will be terminated immediately.

  • err – an onError marble, indicating failed termination of the stream.

  • e – a side effect. Should usually contain an example callback invocation.

  • i – an intermediate value. This is used for operators that compute intermediate values that are fed back into the operator callbacks with the next element.

  • r – a result value. This is used with the res stage, to indicate the value that gets redeemed by the result CompletionSubscriber.

  • none – No marble. Inserts a blank space between marbles. Needed to ensure alignment of cause and effect marbles.

After editing or creating a new diagram, you can test your changes by opening them in a browser. Before you do this for the very first time, you need to run npm install in the api module to ensure the JavaScript dependencies are installed. This will be done automatically if you run mvn process-resources (or any lifecycle phase after process-resources, such as compile or install): the build uses the Maven frontend plugin to install Node, install npm, then run npm to install the dependencies. The dependencies include an installation of Chromium, which is used to generate the PNG diagrams for inclusion in the javadocs; this may take a while to download.

Once the dependencies are installed, you can then open api/src/docs/js/index.html, which shows all the rendered diagrams. No generation step is required to view these diagrams; you can simply hit refresh in the browser after making any changes.

Generating diagrams

We convert the diagrams to SVG, then to PNG, by using Puppeteer, a high-level API on top of Chrome running in headless mode. The SVG diagrams are generated in Chrome, and then screenshotted to create the PNGs. This is automatically done by Puppeteer. However, we can’t run this as a part of the regular build because the MicroProfile CI and release server does not have the necessary dependencies to run Chrome. We’ve investigated a variety of different alternatives, including using different strategies for generating the diagrams, but nothing viable has come up. Unfortunately, installing shared libraries in the Eclipse CBI is too high a maintenance burden for the Eclipse CBI maintainers, so they’ve refused to do it. Consequently, we need to check the diagrams into git, which means whenever they are changed, they need to be manually regenerated.

To generate the diagrams, run:

mvn -Pmarble-diagrams clean package

The diagrams will be saved to api/src/main/java/org/eclipse/microprofile/reactive/streams/doc-files, from there they can be included in the javadocs using an image tag, eg:

<img src="doc-files/map.png" alt="map marble diagram">

Make sure to include the alt text; the CI build will fail if it’s not there.

You can then view the diagrams in the api docs by opening api/target/apidocs/index.html, and navigating to the class that you added the marble diagram to.

Before committing your changes, make sure to use the above command to generate the diagrams, and then check the results into git, including the updated marble-diagram-hashes.json file. As part of the verification of the build, we have a task that checks that the hashes of all the input and output files from the diagram generation process match the hashes recorded when the diagrams were last generated. If you don’t regenerate and commit the diagrams, the build will fail in CI, and so it won’t pass PR validation.
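The idea behind such a hash check can be sketched as follows. This is not the project's actual verification task. The class name DiagramHashCheck, the choice of SHA-256 and the use of in-memory maps in place of files and marble-diagram-hashes.json are all assumptions made for illustration, since the real algorithm and file layout are not described here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a "hashes must match what was recorded" check.
final class DiagramHashCheck {

    // Assumption: SHA-256; the real task may use a different digest.
    static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(content)) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    // Returns the names of files whose current hash differs from the
    // recorded hash, or that have no recorded hash at all.
    static List<String> staleFiles(Map<String, byte[]> currentFiles,
                                   Map<String, String> recordedHashes) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, byte[]> entry : new TreeMap<>(currentFiles).entrySet()) {
            String actual = sha256Hex(entry.getValue());
            if (!actual.equals(recordedHashes.get(entry.getKey()))) {
                stale.add(entry.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Map<String, String> recorded = new TreeMap<>();
        recorded.put("map.png", sha256Hex("marble v1".getBytes(StandardCharsets.UTF_8)));

        // The file content changed after the hashes were recorded.
        Map<String, byte[]> current = new TreeMap<>();
        current.put("map.png", "marble v2".getBytes(StandardCharsets.UTF_8));

        System.out.println(staleFiles(current, recorded)); // prints [map.png]
    }
}
```

A non-empty result corresponds to the situation where the diagrams or their sources were edited without regenerating and committing the outputs.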

Contributing

Do you want to contribute to this project? Find out how you can help here.
