RxJava FAQ: Loading Files With Backpressure

This guide will help you read large files, XML in this case, while managing resources efficiently by using RxJava Flowables.

By Tomasz Nurkiewicz · Sep. 02, 17 · Tutorial


Processing files as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!), we have been able to very easily turn any file into a stream of lines:

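import java.io.BufferedReader;
import java.io.FileReader;
import java.util.stream.Stream;

String filePath = "foobar.txt";
// try-with-resources closes the reader, and with it the file handle
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
    reader.lines()
            .filter(line -> !line.startsWith("#"))          // drop comment lines
            .map(String::toLowerCase)
            .flatMap(line -> Stream.of(line.split(" ")))    // explode each line into words
            .forEach(System.out::println);
}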


reader.lines() returns a Stream<String>, which you can transform further. In this example, we discard lines starting with "#", lowercase the rest, and explode each line into words by splitting it on spaces. This way, we get a stream of words rather than a stream of lines. Working with text files is almost as simple as working with normal Java collections. Earlier in this RxJava series, we learned about the generate() operator. It can be used here as well to create a robust stream of lines from a file:

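import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.FileReader;

Flowable<String> file = Flowable.generate(
        // 1. state factory: invoked once per subscriber, opens the file
        () -> new BufferedReader(new FileReader(filePath)),
        // 2. generator: invoked whenever downstream wants the next item
        (reader, emitter) -> {
            final String line = reader.readLine();
            if (line != null) {
                emitter.onNext(line);
            } else {
                emitter.onComplete();
            }
        },
        // 3. cleanup: runs on completion, error, or early cancellation
        reader -> reader.close()
);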


The generate() call above is a bit more involved. The first argument is a state factory: every time someone subscribes to this stream, the factory is invoked and a new, stateful BufferedReader is created. Then, whenever downstream operators or subscribers want to receive data, the second lambda (the one with two parameters) is invoked. This lambda tries to pull exactly one line from the file and either sends it downstream (onNext()) or completes when the end of the file is reached. It's fairly straightforward. The optional third argument to generate() is a lambda that can clean up the state. It's very convenient in our case, as we have to close the file not only when EOF is reached, but also when consumers unsubscribe prematurely.
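To see the cleanup lambda fire on early cancellation, here is a minimal sketch that assumes RxJava 2.x (package io.reactivex). It reuses the same three-argument generate() shape, but reads from an in-memory StringReader instead of a file so it runs anywhere. The GenerateDemo class and the AtomicBoolean flag are hypothetical and exist only to make the cleanup observable.

```java
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.atomic.AtomicBoolean;

final class GenerateDemo {

    // Same generate() structure as the file-based example, with a StringReader
    // standing in for the file and a flag recording when cleanup runs
    static Flowable<String> lines(String text, AtomicBoolean closed) {
        return Flowable.generate(
                // state factory: runs once per subscriber
                () -> new BufferedReader(new StringReader(text)),
                // generator: emits at most one line per invocation
                (reader, emitter) -> {
                    final String line = reader.readLine();
                    if (line != null) {
                        emitter.onNext(line);
                    } else {
                        emitter.onComplete();
                    }
                },
                // disposeState: runs after completion or cancellation
                reader -> {
                    reader.close();
                    closed.set(true);
                });
    }
}
```

Subscribing with lines(text, flag).take(2) cancels the upstream after two items, and the flag is still set: this is the "consumers unsubscribe prematurely" case described above.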

Meet the Flowable.using() Operator

This seems like a lot of work, especially since JDK 8 already gives us a stream of lines. It turns out there is a similar factory operator named using() that is quite handy. But first, the simplest way of translating a Java Stream into a Flowable is to convert the Stream to an Iterator (checked exception handling ignored):

Flowable.fromIterable(new Iterable<String>() {
    @Override
    public Iterator<String> iterator() {
        final BufferedReader reader = new BufferedReader(new FileReader(filePath));
        final Stream<String> lines = reader.lines();
        return lines.iterator();
    }
});


This can be simplified to:

Flowable.<String>fromIterable(() -> {
    final BufferedReader reader = new BufferedReader(new FileReader(filePath));
    final Stream<String> lines = reader.lines();
    return lines.iterator();
});


But we forgot to close the BufferedReader, and therefore the FileReader and the underlying file handle, so we introduced a resource leak. Under such circumstances, the using() operator works like a charm. In a way, it's similar to a try-with-resources statement: you create a stream based on some external resource, and the lifecycle of that resource (creation and disposal) is managed for you when someone subscribes or unsubscribes:

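import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.FileReader;

Flowable<String> lines = Flowable.using(
        () -> new BufferedReader(new FileReader(filePath)),               // acquire on subscribe
        reader -> Flowable.fromIterable(() -> reader.lines().iterator()), // build the whole stream once
        reader -> reader.close()                                          // release on complete/error/cancel
);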


It's fairly similar to the last generate() example; however, the most important lambda expression, the one in the middle, is quite different. We get a resource (reader) as an argument and are supposed to return a whole Flowable (not a single element). This lambda is called only once, not every time downstream requests a new item. What the using() operator gives us is management of the BufferedReader's lifecycle. using() is useful when we have a piece of state (just like with generate()) that is capable of producing the whole Flowable at once, as opposed to one item at a time.
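A minimal sketch of the same using() pattern, again assuming RxJava 2.x and an in-memory StringReader in place of the file. The UsingDemo class and the AtomicBoolean flag are hypothetical; the flag only records that the disposer ran, both when the stream completes and when take() cancels it early.

```java
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.atomic.AtomicBoolean;

final class UsingDemo {

    // using(): acquire the reader once per subscription, build the whole
    // Flowable from it in one call, release it when the subscription ends
    static Flowable<String> lines(String text, AtomicBoolean closed) {
        return Flowable.using(
                // resource factory
                () -> new BufferedReader(new StringReader(text)),
                // source supplier: called once, returns the entire stream
                reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
                // disposer: runs on completion, error, or cancellation
                reader -> {
                    reader.close();
                    closed.set(true);
                });
    }
}
```

Compared with the generate() sketch, the middle lambda here is invoked only once per subscriber, and iteration over reader.lines() is left to fromIterable(), which already honors backpressure.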

Streaming XML files

...or JSON for that matter. Imagine you have a very large XML file that consists of the following entries, hundreds of thousands of them:

<trkpt lat="52.23453" lon="21.01685">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166">
    <ele>116</ele>
</trkpt>


This is a snippet from the standard GPS Exchange Format (GPX), which can describe geographical routes of arbitrary length. Each <trkpt> is a single point with latitude, longitude, and elevation. We would like to have a stream of track points (ignoring elevation for simplicity) so that the file can be consumed partially, as opposed to loading everything at once. We have three choices:

  • DOM/JAXB: Everything must be loaded into memory and mapped to Java objects. This won't work for infinitely long files (or even very large ones).
  • SAX: A push-based library that invokes callbacks whenever it discovers an XML tag opening or closing. It seems a bit better, but it can't possibly support backpressure: it's the library that decides when to invoke callbacks, and there is no way of slowing it down.
  • StAX: Like SAX, but we must actively pull data from the XML file. This is essential to support backpressure: we decide when to read the next chunk of data.

Let's try to implement parsing and streaming of a possibly very large XML file using StAX and RxJava. First, we need to learn how to use StAX itself. The parser is called XMLStreamReader and is created with the following sequence of spells and curses:

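import java.io.*;
import javax.xml.stream.*;

XMLStreamReader staxReader(String name) throws XMLStreamException, FileNotFoundException {
    final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));
    // Note: closing the returned XMLStreamReader does not close this stream
    return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}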


Just close your eyes and make sure you always have a place to copy-paste the snippet above from. It gets even worse. In order to read the first <trkpt> tag, including its attributes, we must write some fairly complex code:

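import java.math.BigDecimal;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import lombok.Value;

@Value
class Trackpoint {
    private final BigDecimal lat;
    private final BigDecimal lon;
}

// Advances to the next <trkpt> and parses it; returns null at the end of the document
Trackpoint nextTrackpoint(XMLStreamReader r) throws XMLStreamException {
    while (r.hasNext()) {
        int event = r.next();
        switch (event) {
            case XMLStreamConstants.START_ELEMENT:
                if (r.getLocalName().equals("trkpt")) {
                    return parseTrackpoint(r);
                }
                break;
            case XMLStreamConstants.END_ELEMENT:
                if (r.getLocalName().equals("gpx")) {
                    return null;   // closing </gpx>: no more track points
                }
                break;
        }
    }
    return null;
}

// Must be called while the reader is positioned on a <trkpt> START_ELEMENT
Trackpoint parseTrackpoint(XMLStreamReader r) {
    return new Trackpoint(
            new BigDecimal(r.getAttributeValue("", "lat")),
            new BigDecimal(r.getAttributeValue("", "lon"))
    );
}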


The API is quite low-level and almost adorably antique. Everything happens in a gigantic loop that reads... something of type int. This int can be START_ELEMENT, END_ELEMENT, or a few other things that we are not interested in. Remember, we are reading an XML file not line-by-line or char-by-char, but by logical XML tokens (tags). So, if we discover an opening <trkpt> element, we parse it; otherwise, we continue. The second important condition is when we find a closing </gpx>, which should be the last thing in a GPX file. We return null in such a case, signaling the end of the XML file.
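The pull loop can be tried out without a file. The sketch below uses only the JDK's javax.xml.stream API; the StaxPullDemo class, its firstTrackpoints() method, and the plain Point class (standing in for the Lombok-based Trackpoint) are hypothetical. It parses an in-memory GPX fragment and stops once it has collected a given number of <trkpt> elements. Because the caller drives every r.next() call, stopping early simply means no further tokens are pulled, which is the property that makes StAX a fit for backpressure.

```java
import java.io.StringReader;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

final class StaxPullDemo {

    // Hypothetical stand-in for the Lombok-based Trackpoint
    static final class Point {
        final BigDecimal lat;
        final BigDecimal lon;

        Point(BigDecimal lat, BigDecimal lon) {
            this.lat = lat;
            this.lon = lon;
        }
    }

    // Pulls tokens only until `limit` <trkpt> elements have been read
    static List<Point> firstTrackpoints(String xml, int limit) throws XMLStreamException {
        XMLStreamReader r = XMLInputFactory.newInstance()
                .createXMLStreamReader(new StringReader(xml));
        List<Point> result = new ArrayList<>();
        try {
            while (result.size() < limit && r.hasNext()) {
                // getLocalName() is only valid on element events, so check the event first
                if (r.next() == XMLStreamConstants.START_ELEMENT
                        && r.getLocalName().equals("trkpt")) {
                    // null namespace URI: match the attribute by local name only
                    result.add(new Point(
                            new BigDecimal(r.getAttributeValue(null, "lat")),
                            new BigDecimal(r.getAttributeValue(null, "lon"))));
                }
            }
        } finally {
            r.close();
        }
        return result;
    }
}
```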

Feels complex? This is actually the simplest way to read large XML files with constant memory usage, irrespective of file size. How does all of this relate to RxJava? At this point, we can very easily build a Flowable<Trackpoint>. Flowable, not Observable (see: Observable vs. Observable). Such a stream will have full support for backpressure, meaning it will read the file only as fast as subscribers request track points:

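import io.reactivex.Emitter;
import io.reactivex.Flowable;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

Flowable<Trackpoint> trackpoints = Flowable.generate(
        () -> staxReader("track.gpx"),   // a new parser for every subscriber
        this::pushNextTrackpoint,        // emits at most one Trackpoint per call
        XMLStreamReader::close);         // cleanup on completion or cancellation

void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) throws XMLStreamException {
    final Trackpoint trkpt = nextTrackpoint(reader);
    if (trkpt != null) {
        emitter.onNext(trkpt);
    } else {
        emitter.onComplete();
    }
}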


Wow, so simple, such backpressure![1] We first create an XMLStreamReader and make sure it's closed when the file ends or someone unsubscribes (note, though, that XMLStreamReader.close() does not close the underlying input source, so the FileInputStream opened in staxReader() has to be closed separately to actually release the file handle). Remember that each subscriber will open and parse the same file from the beginning, over and over again. The lambda expression in the middle simply takes the state (the XMLStreamReader) and emits one more trackpoint. All of this seems quite obscure, and it is! But we now have a backpressure-aware stream taken from a possibly very large file, using very few resources. We can process the trackpoints concurrently or combine them with other sources of data. In the next article, we will learn how to load JSON in a very similar way.
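Because XMLStreamReader.close() leaves its input source open, one possible fix is to let the generate() state own both objects. The sketch below is a hypothetical StaxResource holder (not a StAX or RxJava type) built only on the JDK; its close() closes the parser and then the stream it reads from.

```java
import java.io.IOException;
import java.io.InputStream;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical holder that owns both the parser and its input stream,
// so a single close() releases everything
final class StaxResource implements AutoCloseable {
    final InputStream input;
    final XMLStreamReader reader;

    StaxResource(InputStream input) throws XMLStreamException, IOException {
        this.input = input;
        try {
            this.reader = XMLInputFactory.newInstance().createXMLStreamReader(input);
        } catch (XMLStreamException e) {
            input.close();   // don't leak the stream if the parser can't be created
            throw e;
        }
    }

    @Override
    public void close() throws XMLStreamException, IOException {
        try {
            reader.close();  // frees parser state; leaves the input source open
        } finally {
            input.close();   // releases the underlying file handle
        }
    }
}
```

Under this assumption, the state factory would create a StaxResource around the FileInputStream, the generator would read from its reader field, and StaxResource::close would serve as the cleanup callback. This wiring is a suggestion, not the article's code.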


Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

