How to efficiently read NdJson in dotnet with Pipes
Introduction to the problem
Recently I was working on a project that handled large volumes of data and efficiency in the handling of this data was one of the main targets for our team. As well as being quick we wanted to keep resource usage (and costs) low, which meant finding optimal ways of doing seemingly simple data processing tasks.
One of these tasks required parsing and processing a significant volume of NdJson (newline-delimited JSON, if you are unfamiliar) being uploaded from a client application. The existing implementation worked just fine but it wasn't massively efficient, so I went in search of a more efficient method, which I'm now sharing here.
Now I may hear you asking: "Can't System.Text.Json do this for me?" and it would seem like something that would be supported natively within dotnet. But it's not currently supported without manually handling the splitting of the stream. So while these solutions do utilize System.Text.Json and use the JsonSerializer.Deserialize<T>() method, System.Text.Json can't handle NdJson without us first splitting out each line for it to consume individually.
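To make that limitation concrete, here is a minimal sketch. The two-record payload is made up for illustration, and the line splitting uses a plain string.Split only to keep the example short. Handing the whole NdJson payload to JsonSerializer.Deserialize throws, while handing it one line at a time works:

```csharp
using System;
using System.Text.Json;

// Two JSON documents separated by a newline (made-up sample data).
var ndjson = "{\"Id\":1}\n{\"Id\":2}";

try
{
    // The serializer expects exactly one top-level JSON value.
    JsonSerializer.Deserialize<JsonElement>(ndjson);
    Console.WriteLine("parsed as a single document");
}
catch (JsonException)
{
    Console.WriteLine("JsonException: more than one top-level value");
}

// Splitting into lines first gives the serializer one document per call.
foreach (var line in ndjson.Split('\n', StringSplitOptions.RemoveEmptyEntries))
{
    var element = JsonSerializer.Deserialize<JsonElement>(line);
    Console.WriteLine(element.GetProperty("Id").GetInt32());
}
```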
For the purposes of the examples below, only the code responsible for parsing the NdJson is shown. Reading from a file or other stream into these examples is out of the scope of this post.
The simple approach
The simplest approach to this problem is to split the stream into lines using a StreamReader and process them individually using JsonSerializer.Deserialize<T>(). Below is an example of reading NdJson from a stream into a list:
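As a rough sketch of the approach just described (not the post's own listing, so the line numbers in the walkthrough will not line up with it): DataContainer here is a hypothetical two-property record, SimpleNdJsonReader is an illustrative class name, and the blank-line check is an extra safety net I've assumed rather than something the text calls for.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Usage: read two records from an in-memory stream.
var ndjson = "{\"Id\":1,\"Name\":\"a\"}\n{\"Id\":2,\"Name\":\"b\"}\n";
using var input = new MemoryStream(Encoding.UTF8.GetBytes(ndjson));
var items = await SimpleNdJsonReader.ReadFromStreamAsync(input);
Console.WriteLine(items.Count); // 2

// Hypothetical payload type standing in for the post's DataContainer.
public sealed record DataContainer(int Id, string Name);

public static class SimpleNdJsonReader
{
    public static async Task<List<DataContainer>> ReadFromStreamAsync(
        Stream stream, CancellationToken cancellationToken = default)
    {
        var results = new List<DataContainer>();
        using var reader = new StreamReader(stream, Encoding.UTF8);

        // ReadLineAsync returns null once the end of the stream is reached.
        while (await reader.ReadLineAsync(cancellationToken) is { } line)
        {
            // Assumption: blank lines are skipped rather than treated as errors.
            if (string.IsNullOrWhiteSpace(line)) continue;

            var item = JsonSerializer.Deserialize<DataContainer>(line);
            if (item is not null) results.Add(item);
        }

        return results;
    }
}
```

The ReadLineAsync overload that takes a CancellationToken needs .NET 7 or later; on older targets the parameterless overload would have to be used instead.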
So for a quick explanation the code above does the following:
- Line 10: Initializes a new StreamReader using our input stream. As previously stated this could be any stream, such as a FileStream, a GZipStream or the stream from an HttpClient.
- Line 13: We read a single line using await reader.ReadLineAsync(cancellationToken). If the result of that call is a non-null string then we continue the loop. Once the function returns null we know we are at the end of the file and have finished processing.
- Lines 15-17: Using JsonSerializer.Deserialize<DataContainer>(line) we parse the line we read in as a string into our desired object, checking that the function didn't return null before adding the result to our List<T>.
If all you are interested in is a way to read and parse an NdJson stream, and you aren't concerned about it being optimal regarding performance, then you probably don't need to read the rest of this post. But if you want your NdJson-ing to be as performant as possible, I thoroughly recommend you continue reading, as the optimized solution is up to around 4.2x faster while allocating less than half the memory 🤯...
Thinking with pipes
Eh? What do pipes have to do with Json?
If you haven't heard of System.IO.Pipelines yet, it's a library built by Microsoft to make it easier to write high performance I/O code in dotnet. It's available via a NuGet package which, once installed into your project, provides access to pipes and a bunch of helper methods that make working with streams much easier and more performant.
You can create a pipe, or more specifically a PipeReader, from a stream. Once created it manages all memory and buffering on your behalf, providing a neat API to efficiently read the data. Upon reading data it provides an allocation-free way to access and manipulate it with types like ReadOnlySequence<T>, ReadOnlySpan<T>, and ReadOnlyMemory<T>.
The PipeReader allows us to tell it how much of the data we have read and consumed using AdvanceTo. Say for instance you try to read the currently buffered data but it doesn't contain a newline character. You will want the reader to continue buffering data until you're able to process it. In this case you can tell the reader how much you have "consumed" and how much you have "examined". Data that was examined but not consumed stays in the buffer and is handed back the next time ReadAsync returns, along with any newly buffered data.
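A small sketch of that consumed/examined behaviour, using an in-memory Pipe rather than a stream so the writes can be controlled by hand. It assumes the System.IO.Pipelines package is referenced, and the payload is made up. The first read sees half a line, marks it as examined but not consumed, and the second read returns those same bytes together with the newly written ones:

```csharp
using System;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// First write: the start of a JSON line, with no newline yet.
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("{\"Id\":"));

ReadResult first = await pipe.Reader.ReadAsync();
long firstLength = first.Buffer.Length;
Console.WriteLine(firstLength); // 6

// Nothing consumed (Start), everything examined (End): keep the bytes buffered
// and don't complete another read until more data has arrived.
pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

// Second write completes the line.
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("1}\n"));

ReadResult second = await pipe.Reader.ReadAsync();
long secondLength = second.Buffer.Length;
Console.WriteLine(secondLength); // 9: the 6 retained bytes plus the 3 new ones

// The whole line has now been handled, so consume everything.
pipe.Reader.AdvanceTo(second.Buffer.End);

await pipe.Writer.CompleteAsync();
await pipe.Reader.CompleteAsync();
```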
Optimized implementation
So let's take a look at writing an optimized version. Firstly we need to install the System.IO.Pipelines NuGet package (for example with dotnet add package System.IO.Pipelines) so that we can utilize PipeReader.
With optimization often comes more complicated code. For that reason the optimized implementation is split into two parts so it's a bit easier to digest. So let's start with our main method:
In the ReadFromStreamAsync function defined above we are doing the following:
- Line 12: Creates a new PipeReader from our input stream. Again, this could be from any source as long as it's a Stream compatible with PipeReader.
- Lines 17-18: We start a new while (true) loop and begin each iteration by awaiting a read of data from our pipe with var result = await reader.ReadAsync(cancellationToken);. This returns a result containing the currently buffered data from the pipe.
- Line 20: Next we attempt to read a line of data from our newly obtained buffer. The contents of TryReadLine will be detailed below, but for now you just need to know that this function attempts to read up to any newline character and returns the current line to the caller in the variable jsonData. If it can't find a newline it returns false.
- Line 21: If TryReadLine returned true, jsonData will be a ReadOnlySequence<byte> representing the data for the current line in our file as UTF-8 bytes. Luckily System.Text.Json can handle a stream of UTF-8 bytes, so from here we simply pass it on to DeserializeJsonData (implemented on lines 42-46) and in return we get our deserialized data as a dotnet object. From there it's added to our results list.
- Line 23: Next we need to tell the PipeReader how much data we have consumed and examined. To do this we call AdvanceTo with buffer.Start and buffer.End respectively. Thanks to TryReadLine the values for these are already set as required; again, that will be covered below.
- Lines 25-26: Next we check if the result is completed. This value will be true once there is no more data to read from the stream. If it isn't, we know that we still have data to process so we continue into another iteration of our while (true) loop.
- Lines 28-29: If the result was completed then we are at the end of the file. We perform a final check to see if there are any bytes left in the buffer, and if there are we check that the first byte is not a \n character. If it isn't, we have a single JSON line left to process, so again we call DeserializeJsonData to process the data for the remaining line.
- Line 34: Finally we tell the reader that we've completed reading our data and it can release any resources it's holding internally.
Above I referenced TryReadLine multiple times but didn't show its implementation, mainly just to keep the explanations in smaller consumable chunks. So now let's look at how TryReadLine works:
Taking in our buffer from the parent function we initialize a new SequenceReader<byte>(buffer); which is a helper for working with ReadOnlySequence<byte>. We then continue to loop until the reader signals that it has ended with while (!reader.End).
Then using another helper method TryReadToAny we read until we find either a \r or \n line ending character. If a line ending is not found then we jump down to line 53 and advance the reader past what we've already consumed. Otherwise, it returns our "line" of data as the variable sequence.
Finally on lines 47-50 we slice the input buffer past the data we've consumed. This updates the buffer.Start we saw above in the first section, which is what tells the pipe how much data we have "consumed". We then return true along with our line data.
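Putting the two parts together, the following is a sketch of how the pieces described above could fit, not the post's own listing. DataContainer is a hypothetical record, PipeNdJsonReader and AddIfNotNull are illustrative names, and two details are my own assumptions: every complete line in a buffer is drained before advancing, and the final unterminated line is handled before AdvanceTo is called, since the buffer shouldn't be touched after advancing.

```csharp
#nullable enable
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Usage: CRLF between records and no newline after the last one.
var ndjson = "{\"Id\":1,\"Name\":\"a\"}\r\n{\"Id\":2,\"Name\":\"b\"}";
var items = await PipeNdJsonReader.ReadFromStreamAsync(new MemoryStream(Encoding.UTF8.GetBytes(ndjson)));
Console.WriteLine(items.Count); // 2

// Hypothetical payload type standing in for the post's DataContainer.
public sealed record DataContainer(int Id, string Name);

public static class PipeNdJsonReader
{
    private static readonly byte[] NewLineBytes = { (byte)'\r', (byte)'\n' };

    public static async Task<List<DataContainer>> ReadFromStreamAsync(
        Stream stream, CancellationToken cancellationToken = default)
    {
        var results = new List<DataContainer>();
        PipeReader reader = PipeReader.Create(stream);
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(cancellationToken);
                ReadOnlySequence<byte> buffer = result.Buffer;

                // Drain every complete line that is currently buffered.
                while (TryReadLine(ref buffer, out ReadOnlySequence<byte> jsonData))
                    AddIfNotNull(results, DeserializeJsonData(jsonData));

                if (result.IsCompleted)
                {
                    // End of stream: anything left is a last line without a newline.
                    if (!buffer.IsEmpty) AddIfNotNull(results, DeserializeJsonData(buffer));
                    reader.AdvanceTo(buffer.End);
                    break;
                }

                // Consumed up to buffer.Start, examined up to buffer.End.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
        return results;
    }

    // Synchronous because SequenceReader<T> is a ref struct.
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        var reader = new SequenceReader<byte>(buffer);
        while (!reader.End)
        {
            // Reads up to the next \r or \n and steps past that single delimiter.
            if (!reader.TryReadToAny(out line, NewLineBytes, advancePastDelimiter: true)) break;
            if (line.IsEmpty) continue; // blank line, or the \n half of a \r\n pair
            buffer = buffer.Slice(reader.Position); // everything before this is consumed
            return true;
        }
        buffer = buffer.Slice(reader.Position); // skip any delimiters already passed
        line = default;
        return false;
    }

    // Synchronous because Utf8JsonReader is a ref struct.
    private static DataContainer? DeserializeJsonData(ReadOnlySequence<byte> jsonData)
    {
        var jsonReader = new Utf8JsonReader(jsonData);
        return JsonSerializer.Deserialize<DataContainer>(ref jsonReader);
    }

    private static void AddIfNotNull(List<DataContainer> results, DataContainer? item)
    {
        if (item is not null) results.Add(item);
    }
}
```

Keeping TryReadLine and DeserializeJsonData as separate synchronous methods is deliberate in this sketch: both rely on ref struct types, which is a good reason to keep them out of the async method body.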
And that's it! You've now got a function to read NdJson from a stream using a PipeReader and System.Text.Json.
I told you it was a lot more complex code than the original but as you'll see in the benchmarks below it pays dividends!
Benchmarks
To show the impact of our optimized implementation I wrote a quick benchmark using BenchmarkDotNet. We run both our original and optimized versions against 3 separate NdJson files containing 65MB, 679MB, and 6.5GB of data respectively. The results are below:
Method | Input | Mean | Ratio | Gen0 | Gen1 | Gen2 | Allocated | Alloc Ratio |
---|---|---|---|---|---|---|---|---|
Original | 100 Lines - 65MB | 130.51 ms | 1.00 | 2000 | 2000 | 2000 | 242.20 MB | 1.00 |
Optimized | 100 Lines - 65MB | 37.26 ms | 0.28 | 1533 | 1533 | 1533 | 112.72 MB | 0.47 |
Original | 1,000 Lines - 679MB | 1,365.14 ms | 1.00 | 7000 | 6000 | 6000 | 2524.87 MB | 1.00 |
Optimized | 1,000 Lines - 679MB | 350.63 ms | 0.26 | 5000 | 5000 | 5000 | 1181.42 MB | 0.47 |
Original | 10,000 Lines - 6.5GB | 12,853.32 ms | 1.00 | 33000 | 20000 | 16000 | 24697.14 MB | 1.00 |
Optimized | 10,000 Lines - 6.5GB | 3,056.79 ms | 0.24 | 10000 | 9000 | 8000 | 11536.37 MB | 0.47 |
As you can see our Optimized version is between 3.5x and 4.2x faster, with less than half the memory allocations! With larger files like our 6.5GB example, this cuts Gen1/Gen2 garbage collections by around half and reduces Gen0 GCs even further. This of course can yield a further improvement to your application as a whole, by reducing the number of garbage collections and the percentage of time .NET spends collecting garbage instead of running your business logic.
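For anyone who wants to check the headline figure, the speedup is just the Original mean divided by the Optimized mean for each input size. A quick sketch using the values from the Mean column of the table above:

```csharp
using System;
using System.Globalization;

// Mean timings in milliseconds, copied from the benchmark table.
var rows = new (string Input, double OriginalMs, double OptimizedMs)[]
{
    ("65MB", 130.51, 37.26),
    ("679MB", 1365.14, 350.63),
    ("6.5GB", 12853.32, 3056.79),
};

foreach (var row in rows)
{
    double speedup = row.OriginalMs / row.OptimizedMs;
    Console.WriteLine(string.Format(
        CultureInfo.InvariantCulture, "{0}: {1:F2}x faster", row.Input, speedup));
}
// 65MB: 3.50x faster
// 679MB: 3.89x faster
// 6.5GB: 4.20x faster
```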
Final thoughts
So should you go and replace everywhere you read a stream with a pipe? Well, that depends on your performance margins. If you're working with an application that needs the absolute best-in-class performance, then I'd thoroughly recommend investigating whether pipes and low-level JSON handling can improve your performance.
Otherwise, if you aren't as worried about your performance or don't have as strict an SLA to achieve then I'd recommend sticking with the simple approach. It's about weighing up the cost of maintenance of the much more complex code in the optimized version against the benefits you would see from increasing your performance. I'm sure you've heard of premature optimization, no sense spending time optimizing code if it's already "fast enough" for your use case.
Additionally, if you're reading smaller files with much less data then the simple version is almost definitely sufficient. But it's good to know that when you need to handle larger volumes of data dotnet has you covered!
Thanks for taking the time to read my post, I hope you enjoyed reading it! If you did I would greatly appreciate it if you shared it with your friends and colleagues.
Whether you did or you didn't I would love to hear your feedback; what works, what doesn't, did I leave anything out? Unfortunately I haven't implemented comments yet, but my socials are linked in the footer of this page if you wish to contact me.