Decoding Streaming JSON with Tokio

October 14, 2017

Applies to hyper 0.11.* and reqwest 0.8.* with unstable features

Tokio is too hard

For the past week, I have been working on an IPFS API client in Rust and struggling to grasp some of the abstractions surrounding asynchronous I/O. The hyper tutorials provide some great examples of reading an entire response and parsing it as JSON, which is sufficient for the majority of responses returned by the IPFS API. However, a small number of API requests stream data back, making it inefficient to wait for the entire response.

A ping response stream

Some responses will even leave it up to the client to close the stream.

A pubsub response stream

These kinds of responses posed a huge challenge: I needed a way to start parsing the response body BEFORE the entire response body was fully received.

Figuring out how this could work was a real challenge, given the difficulty I was already having with the futures library. One of the biggest obstacles was wrapping my head around the laziness of the Body type (note that I am using the reqwest::unstable module, whose Body type functions almost identically to the one defined in the hyper crate).

Consider the example from the hyper tutorial:

// (Uses the Future and Stream combinator traits from the futures crate.)
client.get(uri).and_then(|res| {
    println!("Response: {}", res.status());

    res.body().concat2().and_then(move |body| {
        let v: Value = serde_json::from_slice(&body).map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                e
            )
        })?;
        println!("current IP address is {}", v["origin"]);
        Ok(())
    })
});

In this example, part of the response is received before ANY of the response body is received. Most of the time, you can just wait until the full response body has arrived, which is exactly what the line res.body().concat2() is doing. The docs describe the Concat2 type returned by concat2 as a stream combinator to concatenate the results of a stream into the first yielded item. So how does a Concat2 actually do the concatenation? Unfortunately, the call to concat2 doesn't make it transparent how this is done. Understanding it requires a little more information about what a Body actually is.

The reqwest documentation shows that a Body is a Stream that yields a series of Chunk instances (a Chunk is essentially just a slice of bytes). In the most common case, after everything is concatenated together, you can just dereference the finalized chunk and parse the bytes as JSON. The concatenation works because the Chunk type implements Extend. Going back to Concat2, we can see that Concat2 is only a Future if the Item of the stream it wraps implements Extend + IntoIterator + Default.
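To make the Extend mechanics concrete, here is a minimal, futures-free sketch of the same folding idea: any type that is Default + Extend can accumulate a series of chunks into a single value, which is essentially what Concat2 does one poll at a time. The concat_chunks function and the Vec<u8> chunks are my own stand-ins, not the real hyper/reqwest types.

```rust
// Fold a sequence of byte chunks into one accumulator, the same way
// Concat2 folds every yielded stream item into the first one.
fn concat_chunks<T, I>(chunks: I) -> T
where
    T: Default + Extend<u8>,
    I: IntoIterator<Item = Vec<u8>>,
{
    // Start from the Default value, then Extend it with each chunk's bytes.
    let mut acc = T::default();
    for chunk in chunks {
        acc.extend(chunk);
    }
    acc
}

fn main() {
    // Two partial chunks that only form valid JSON once concatenated.
    let chunks = vec![b"{\"ori".to_vec(), b"gin\":\"1.2.3.4\"}".to_vec()];
    let body: Vec<u8> = concat_chunks(chunks);
    println!("{}", String::from_utf8(body).unwrap());
}
```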

The key here is that in order to parse streaming JSON, we need to drop the call to concat2() and replace it with something that processes chunks as they are received. One complication is that a Chunk doesn't necessarily contain exactly one full JSON object -- it could contain part of an object, or more than one. I haven't solved this properly yet, but buffering bytes while receiving the response should work around it. You can also see from the screen captures above that the IPFS API separates each JSON object with a newline delimiter, which makes it easy to tell where objects start and end.
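As a sketch of that buffering idea (LineBuffer is a hypothetical helper I made up for illustration, not code from my client): accumulate bytes across chunks and only hand back complete newline-delimited records, keeping any trailing partial object around for the next chunk.

```rust
// Buffers incoming byte chunks and yields only complete
// newline-delimited records.
struct LineBuffer {
    buf: Vec<u8>,
}

impl LineBuffer {
    fn new() -> Self {
        LineBuffer { buf: Vec::new() }
    }

    // Feed one chunk; return every complete line received so far.
    // A trailing partial record stays in the buffer.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            // Drop the trailing '\n' before handing the record back.
            let line = &line[..line.len() - 1];
            lines.push(String::from_utf8_lossy(line).into_owned());
        }
        lines
    }
}

fn main() {
    let mut lines = LineBuffer::new();
    // Simulate a record split across two network chunks.
    for chunk in [&b"{\"Success\":true}\n{\"Ti"[..], &b"me\":618957}\n"[..]] {
        for line in lines.push(chunk) {
            println!("complete object: {}", line);
        }
    }
}
```

Each complete line could then be handed to serde_json::from_slice, so a partial object at the end of one chunk never reaches the parser.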

For my solution, I ended up just assuming that each Chunk contained exactly one full JSON object. This turned out to be a fairly safe assumption, but I did run into occasional JSON parse errors, presumably because the IPFS API was sending partial objects.

let url = format!(...);

url.parse::<Url>()
    // Build a GET request from the parsed URL.
    .map(|url| self.client.request(Method::Get, url))
    .map_err(From::from)
    // Result implements IntoFuture, so the parse error flows into the chain.
    .into_future()
    .and_then(|mut req| req.send().from_err())
    // Swap the response for its body stream...
    .map(|res| res.into_body().from_err())
    // ...and flatten the future-of-a-stream into a plain stream of chunks.
    .flatten_stream()
    // Parse each chunk as a single JSON object.
    .and_then(|chunk| serde_json::from_slice(&chunk).map_err(From::from));

So, what is going on here? The first thing I am doing is creating a reqwest Url and building a Request from it. One interesting thing I found out is that all Result types implement IntoFuture and can be converted into a future using into_future, which is what I am doing immediately after building the request.

The request is sent, and when the response is received, a stream representing the response body is returned. The resulting future is then flattened (you can think of this like doing a flat_map), and each chunk is processed as it comes in and parsed as a JSON object.
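The flat_map analogy can be made concrete without any futures at all: flattening a sequence of chunk-sized batches into one sequence of records is the same shape of operation that flatten_stream performs on a future that resolves to a stream. The string records below are made-up stand-ins for the IPFS responses.

```rust
fn main() {
    // Each inner Vec stands in for one network chunk that happens to
    // carry whole JSON records.
    let chunks = vec![
        vec!["{\"Success\":true}"],
        vec!["{\"Text\":\"hello\"}", "{\"Text\":\"world\"}"],
    ];
    // Flattening yields a single sequence of records, ready for parsing.
    let records: Vec<&str> = chunks.into_iter().flatten().collect();
    for r in &records {
        println!("{}", r);
    }
}
```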

The code is littered with from_err and From::from because the library I am writing has its own error type that can be built from the various errors returned by reqwest and serde. These errors need to be converted to a single type because futures and streams can only be combined if they have the same error type.
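The pattern looks roughly like this (a sketch with made-up names: ApiError is hypothetical, and std's ParseIntError stands in for the serde and reqwest error types):

```rust
use std::fmt;

// One unified error type for the whole client, so every future and
// stream in a combinator chain shares the same Error.
#[derive(Debug)]
enum ApiError {
    Parse(String),
}

impl fmt::Display for ApiError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ApiError::Parse(msg) => write!(f, "parse error: {}", msg),
        }
    }
}

// In the real client these would be `impl From<serde_json::Error>` and
// `impl From<reqwest::Error>`; ParseIntError stands in here.
impl From<std::num::ParseIntError> for ApiError {
    fn from(e: std::num::ParseIntError) -> Self {
        ApiError::Parse(e.to_string())
    }
}

fn parse_seq(s: &str) -> Result<u32, ApiError> {
    // `?` calls From::from to convert ParseIntError into ApiError,
    // which is exactly what from_err/map_err(From::from) do for futures.
    Ok(s.parse::<u32>()?)
}

fn main() {
    match parse_seq("not-a-number") {
        Ok(n) => println!("seq {}", n),
        Err(e) => println!("{}", e),
    }
}
```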

The result is this:

Pinging a peer with the rust client

You can see that the rust client allows you to either process the ping results as they come in, or wait until the response is finished.

It also works with Pubsub!

Pubsub with the rust client


The next step to make this work even better is to figure out how to turn a reqwest body (in the unstable reqwest module it is known as a Decoder) into a type that implements AsyncRead, so I can use the lines function to deal with partial responses. Still not really sure how to do this, so if you have any tips, shoot me a message!