How to efficiently read NdJson in dotnet with Pipes

Introduction to the problem

Recently I was working on a project that handled large volumes of data, and handling that data efficiently was one of our team's main targets. As well as being quick, we wanted to keep resource usage (and costs) low, which meant finding optimal ways of doing seemingly simple data processing tasks.

One of these data processing tasks required parsing and processing a significant volume of NdJson (newline-delimited JSON, if you're unfamiliar with it) being uploaded from a client application. The existing implementation worked just fine, but it wasn't massively efficient, so I went in search of a better method, which I'm now sharing here.
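If you haven't come across the format before, NdJson is simply one complete JSON object per line, with a newline character separating each record. A tiny made-up example might look like this:

{"id": 1, "name": "first"}
{"id": 2, "name": "second"}
{"id": 3, "name": "third"}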

Now I can hear you asking: "Can't System.Text.Json do this for me?" It certainly seems like something that would be supported natively within dotnet, but it isn't; you have to handle splitting the stream yourself. So while the solutions below do use System.Text.Json and its JsonSerializer.Deserialize<T>() method, System.Text.Json can't handle NdJson without us first splitting out each line for it to consume individually.

Note

For the purposes of the examples below, only the code responsible for parsing the NdJson is shown. Obtaining the stream (from a file, HTTP request, or elsewhere) is out of scope for this post.
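The DataContainer type used throughout the examples isn't shown either, and its exact shape doesn't really matter. If you want to follow along, any System.Text.Json-serializable POCO will do; something like this hypothetical definition is enough for the snippets to compile:

namespace Benchmarks.NdJson;

// Hypothetical stand-in for the DataContainer used in the examples.
// Any JSON-serializable POCO works the same way with the code below.
public sealed class DataContainer
{
    public int Id { get; set; }
    public string? Name { get; set; }
}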

The simple approach

The simplest approach to this problem is to split the stream into lines using a StreamReader and process them individually using JsonSerializer.Deserialize<T>(). Below is an example of reading NdJson from a stream into a list:

using System.Text.Json;
 
namespace Benchmarks.NdJson;
 
public static class Original
{
    public static async Task<List<DataContainer>> ReadFromStreamAsync(
        Stream stream, CancellationToken cancellationToken = default)
    {
        using var reader = new StreamReader(stream);
 
        var containers = new List<DataContainer>();
        while (await reader.ReadLineAsync(cancellationToken) is { } line)
        {
            var container = JsonSerializer.Deserialize<DataContainer>(line);
            if (container is not null)
                containers.Add(container);
        }
 
        return containers;
    }
}

So, for a quick explanation, the code above does the following:

  • Line 10: Initializes a new StreamReader using our input stream. As stated above, this could be any stream, such as a FileStream, a GZipStream, or the response stream from an HttpClient.
  • Line 13: We read a single line with await reader.ReadLineAsync(cancellationToken); if the result is a non-null string, we continue the loop. Once the call returns null we know we've reached the end of the stream and have finished processing.
  • Lines 15-17: Using JsonSerializer.Deserialize<DataContainer>(line) we parse the line we just read into our desired object, checking that the result isn't null before adding it to our List<T>.
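Although obtaining the stream is out of scope for this post, for completeness a call site might look something like this (the file path is just a placeholder):

using Benchmarks.NdJson;

// Hypothetical usage of the simple reader; "data.ndjson" is a placeholder path.
await using var fileStream = File.OpenRead("data.ndjson");
var containers = await Original.ReadFromStreamAsync(fileStream);
Console.WriteLine($"Read {containers.Count} objects");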

If all you're interested in is a way to read and parse an NdJson stream, and you aren't too concerned about performance, then you probably don't need to read the rest of this post. But if you want your NdJson-ing to be as performant as possible, then I thoroughly recommend you continue reading, as the optimized solution is around 4.2x faster while allocating less than half the memory 🤯...

Thinking with pipes

Eh? What do pipes have to do with Json?

If you haven't heard of System.IO.Pipelines yet, it's a library built by Microsoft to make it easier to write high-performance I/O code in dotnet. It's available via a NuGet package which, once installed into your project, provides access to pipes and a bunch of helper methods that make working with streams much easier and more performant.

You can create a pipe, or more specifically a PipeReader, from a stream. Once created, it manages all memory and buffering on your behalf and provides a neat API to read the data efficiently. Upon reading, it gives you allocation-free access to the data via types like ReadOnlySequence<T>, ReadOnlySpan<T>, and ReadOnlyMemory<T>.

The PipeReader lets us tell it how much of the data we have read and consumed using AdvanceTo. Say, for instance, you try to read the currently buffered data but it doesn't contain a newline character; you'll want the reader to keep buffering data until you're able to process it. In that case you can tell the reader how much you have "consumed" and how much you have "examined"; the "examined" data will be returned the next time ReadAsync completes, along with any newly buffered data.
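Stripped of any JSON handling, the general shape of a PipeReader loop looks roughly like this (a minimal sketch; the per-buffer processing is elided):

using System.IO.Pipelines;

// Minimal sketch of the standard PipeReader loop: read, process what you can,
// report progress with AdvanceTo, and stop once the reader reports completion.
static async Task ConsumeAsync(Stream stream, CancellationToken cancellationToken = default)
{
    var reader = PipeReader.Create(stream);
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        var buffer = result.Buffer;

        // ... process as much of 'buffer' as possible, slicing it as you go ...

        // Everything before buffer.Start counts as "consumed", everything up to
        // buffer.End as "examined". Examined-but-unconsumed data is handed back
        // on the next ReadAsync along with any newly buffered data.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
            break;
    }

    await reader.CompleteAsync();
}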

Optimized implementation

So let's take a look at writing an optimized version. First we need to install the System.IO.Pipelines NuGet package so that we can use PipeReader:

dotnet add package System.IO.Pipelines

With optimization often comes more complicated code, so the optimized implementation is split into two parts to make it a bit easier to digest. Let's start with the main method:

using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;
 
namespace Benchmarks.NdJson;
 
public static class OptimizedList
{
    public static async Task<List<DataContainer>> ReadFromStreamAsync(
        Stream stream, CancellationToken cancellationToken = default)
    {
        var reader = PipeReader.Create(stream);
 
        var containers = new List<DataContainer>();
        while (true)
        {
            var result = await reader.ReadAsync(cancellationToken);
            var buffer = result.Buffer;
 
            while (TryReadLine(ref buffer, out var jsonData))
                containers.Add(DeserializeJsonData(jsonData));
 
            reader.AdvanceTo(buffer.Start, buffer.End);
 
            if (!result.IsCompleted)
                continue;
 
            if (buffer.Length > 0 && buffer.FirstSpan[0] != (byte)'\n')
                containers.Add(DeserializeJsonData(buffer));
 
            break;
        }
 
        await reader.CompleteAsync();
        return containers;
 
        static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
        {
            /* Implementation of local function TryReadLine shown and explained later */
        }
 
        static DataContainer DeserializeJsonData(ReadOnlySequence<byte> jsonData)
        {
            var jsonReader = new Utf8JsonReader(jsonData);
            return JsonSerializer.Deserialize<DataContainer>(ref jsonReader) ?? throw new InvalidOperationException();
        }
    }
}

In the ReadFromStreamAsync function defined above we are doing the following:

  • Line 12: Creates a new PipeReader from our input stream. Again, this could be from any source, as long as it's a Stream.
  • Lines 17-18: We start a new while (true) loop and begin each iteration by awaiting a read from our pipe with var result = await reader.ReadAsync(cancellationToken);. This returns a result containing the currently buffered data from the pipe.
  • Line 20: Next we attempt to read a line of data from our newly obtained buffer; the contents of TryReadLine are detailed below. For now you just need to know that it attempts to read up to the next newline character and returns the current line to the caller via jsonData. If it can't find a newline it returns false.
  • Line 21: If TryReadLine returned true, jsonData will be a ReadOnlySequence<byte> representing the current line in our file as UTF-8 bytes. Luckily System.Text.Json can consume UTF-8 bytes directly, so from here we simply pass it to DeserializeJsonData (implemented on lines 42-46) and get back our deserialized dotnet object, which is then added to our results list.
  • Line 23: Next we tell the PipeReader how much data we have consumed and examined by calling AdvanceTo with buffer.Start and buffer.End respectively. Thanks to TryReadLine, these values are already set as required; again, that will be covered below.
  • Lines 25-26: Next we check whether the result is completed; if we've consumed all the data in the stream this value will be true. If not, we know there is still data to process, so we continue into another iteration of our while (true) loop.
  • Lines 28-29: If the result was completed then we're at the end of the file. We perform a final check to see if there are any bytes left in the buffer and, if there are, whether the first byte is something other than a \n character. If it is, we have one final JSON line without a trailing newline, so we call DeserializeJsonData one last time to process it.
  • Line 34: Finally we tell the reader that we've finished reading and that it can release any resources it's holding internally.

Above I referenced TryReadLine multiple times but didn't show its implementation, mainly to keep the explanations in smaller, more consumable chunks. So now let's look at how TryReadLine works:

static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    var reader = new SequenceReader<byte>(buffer);
    while (!reader.End)
    {
        if (reader.TryReadToAny(out ReadOnlySequence<byte> sequence, "\r\n"u8))
        {
            if (sequence.Length == 0)
                continue;
 
            buffer = buffer.Slice(reader.Position);
 
            line = sequence;
            return true;
        }
 
        reader.Advance(buffer.Length - reader.Consumed);
    }
 
    line = default;
    return false;
}

Taking in our buffer from the parent function, we initialize a new SequenceReader<byte>(buffer), which is a helper for working with ReadOnlySequence<byte>. We then loop until the reader signals that it has reached the end of the data with while (!reader.End).

Then, using another helper method, TryReadToAny, we read until we find either a \r or \n line-ending character. If no line ending is found, we drop down to the reader.Advance(buffer.Length - reader.Consumed) call and advance the reader past everything we've already examined. Otherwise, TryReadToAny returns our "line" of data in the sequence variable.

Finally, we slice the input buffer past the data we've consumed; this updates the buffer.Start we saw above in the first section, which is what tells the pipe how much we have "consumed", and then we return true along with our line data.
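If SequenceReader<byte> and TryReadToAny are new to you, here's a tiny standalone example (made-up data, separate from the reader above) showing how they split a sequence of bytes on \r / \n delimiters:

using System;
using System.Buffers;
using System.Text;

// Made-up input: two JSON objects, the first terminated with "\r\n".
var bytes = Encoding.UTF8.GetBytes("{\"id\":1}\r\n{\"id\":2}\n");
var reader = new SequenceReader<byte>(new ReadOnlySequence<byte>(bytes));

// TryReadToAny hands back everything before the first '\r' or '\n'
// and advances the reader past that delimiter.
while (reader.TryReadToAny(out ReadOnlySequence<byte> line, "\r\n"u8))
{
    if (line.Length == 0)
        continue; // the empty segment between a '\r' and its following '\n'

    Console.WriteLine(Encoding.UTF8.GetString(line.ToArray()));
}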

And that's it! You now have a function that reads NdJson from a stream using a PipeReader and System.Text.Json. I told you it was a lot more complex than the original, but as you'll see in the benchmarks below it pays dividends!
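Calling it is identical to the simple version, and because it accepts any Stream you can point it at compressed data too. For example (hypothetical file name), reading gzip-compressed NdJson straight from disk:

using System.IO.Compression;
using Benchmarks.NdJson;

// Hypothetical usage: decompress on the fly and feed the pipe-based reader.
await using var fileStream = File.OpenRead("data.ndjson.gz");
await using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress);
var containers = await OptimizedList.ReadFromStreamAsync(gzipStream);
Console.WriteLine($"Read {containers.Count} objects");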

Benchmarks

To show the impact of the optimized implementation I wrote a quick benchmark using BenchmarkDotNet. We run both the original and optimized versions against three separate NdJson files containing 65 MB, 679 MB, and 6.5 GB of data respectively. The results are below:

| Method    | Input                | Mean         | Ratio | Gen0  | Gen1  | Gen2  | Allocated   | Alloc Ratio |
|-----------|----------------------|--------------|-------|-------|-------|-------|-------------|-------------|
| Original  | 100 Lines - 65MB     | 130.51 ms    | 1.00  | 2000  | 2000  | 2000  | 242.20 MB   | 1.00        |
| Optimized | 100 Lines - 65MB     | 37.26 ms     | 0.28  | 1533  | 1533  | 1533  | 112.72 MB   | 0.47        |
| Original  | 1,000 Lines - 679MB  | 1,365.14 ms  | 1.00  | 7000  | 6000  | 6000  | 2524.87 MB  | 1.00        |
| Optimized | 1,000 Lines - 679MB  | 350.63 ms    | 0.26  | 5000  | 5000  | 5000  | 1181.42 MB  | 0.47        |
| Original  | 10,000 Lines - 6.5GB | 12,853.32 ms | 1.00  | 33000 | 20000 | 16000 | 24697.14 MB | 1.00        |
| Optimized | 10,000 Lines - 6.5GB | 3,056.79 ms  | 0.24  | 10000 | 9000  | 8000  | 11536.37 MB | 0.47        |

As you can see, the optimized version is up to 4.2x faster with less than half the memory allocations! With larger files like our 6.5GB example, that can cut Gen1/Gen2 garbage collections in half and reduce Gen0 collections even further. This in turn can benefit your application as a whole by reducing the number of garbage collections and the percentage of time .NET spends collecting garbage instead of running your business logic.
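For reference, a benchmark harness along these lines (the file names and setup here are illustrative rather than the exact code used) is enough to reproduce this kind of comparison:

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

namespace Benchmarks.NdJson;

[MemoryDiagnoser]
public class NdJsonBenchmarks
{
    // Illustrative file names; point these at your own generated NdJson files.
    [Params("100-lines.ndjson", "1000-lines.ndjson", "10000-lines.ndjson")]
    public string FileName { get; set; } = string.Empty;

    [Benchmark(Baseline = true)]
    public async Task<List<DataContainer>> Original()
    {
        await using var stream = File.OpenRead(FileName);
        // Fully qualified to avoid clashing with this benchmark method's own name.
        return await Benchmarks.NdJson.Original.ReadFromStreamAsync(stream);
    }

    [Benchmark]
    public async Task<List<DataContainer>> Optimized()
    {
        await using var stream = File.OpenRead(FileName);
        return await OptimizedList.ReadFromStreamAsync(stream);
    }
}

public static class Program
{
    public static void Main() => BenchmarkRunner.Run<NdJsonBenchmarks>();
}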

Final thoughts

So should you go and replace everywhere you read a stream with a pipe? Well, that depends on your performance margins. If you're working on an application that needs the absolute best-in-class performance, then I'd thoroughly recommend investigating whether pipes and low-level JSON handling can improve it.

Otherwise, if you aren't as worried about performance or don't have as strict an SLA to hit, then I'd recommend sticking with the simple approach. It's a matter of weighing the maintenance cost of the much more complex optimized code against the performance benefits you'd gain. I'm sure you've heard of premature optimization; there's no sense spending time optimizing code that's already "fast enough" for your use case.

Additionally, if you're reading smaller files with much less data, the simple version is almost definitely sufficient. But it's good to know that when you need to handle larger volumes of data, dotnet has you covered!


Thanks for taking the time to read my post, I hope you enjoyed reading it! If you did I would greatly appreciate it if you shared it with your friends and colleagues.

Whether you did or you didn't, I would love to hear your feedback: what works, what doesn't, did I leave anything out? Unfortunately I haven't implemented comments yet, but my socials are linked in the footer of this page if you wish to contact me.