diff --git a/Directory.Build.props b/Directory.Build.props
index b10b8ad..ac45ac0 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -16,7 +16,7 @@
-
+
diff --git a/README.md b/README.md
index ea255e9..8654375 100644
--- a/README.md
+++ b/README.md
@@ -94,28 +94,31 @@ public static void ProcessCsvFile(string csvFilePath)
### Simpler
1. Create a new instance of your visitor.
-1. Call one of the `Csv.Process*` methods, passing in whatever format your data is in along with your visitor.
+1. Use one of the `CsvSyncInput` or `CsvAsyncInput` factory methods to create an input object, then use it to describe the data to your visitor.
Examples:
```csharp
public static void ProcessCsvFile(string csvFilePath)
{
Console.WriteLine($"Started reading '{csvFilePath}'.");
- Csv.ProcessFile(csvFilePath, new MyVisitor(maxFieldLength: 1000));
+ CsvSyncInput.ForMemoryMappedFile(csvFilePath)
+ .Process(new MyVisitor(maxFieldLength: 1000));
Console.WriteLine($"Finished reading '{csvFilePath}'.");
}
public static void ProcessCsvStream(Stream csvStream)
{
- Console.WriteLine($"Started reading '{csvFilePath}'.");
- Csv.ProcessStream(csvStream, new MyVisitor(maxFieldLength: 1000));
- Console.WriteLine($"Finished reading '{csvFilePath}'.");
+ Console.WriteLine($"Started reading CSV file.");
+ CsvSyncInput.ForStream(csvStream)
+ .Process(new MyVisitor(maxFieldLength: 1000));
+ Console.WriteLine($"Finished reading CSV file.");
}
-public static async ValueTask ProcessCsvStreamAsync(Stream csvStream, IProgress<int> progress = null, CancellationToken cancellationToken = default)
+public static async Task ProcessCsvStreamAsync(Stream csvStream)
{
- Console.WriteLine($"Started reading '{csvFilePath}'.");
- await Csv.ProcessStreamAsync(csvStream, new MyVisitor(maxFieldLength: 1000), progress, cancellationToken);
- Console.WriteLine($"Finished reading '{csvFilePath}'.");
+ Console.WriteLine($"Started reading CSV file.");
+ await CsvAsyncInput.ForStream(csvStream)
+ .ProcessAsync(new MyVisitor(maxFieldLength: 1000));
+ Console.WriteLine($"Finished reading CSV file.");
}
```
diff --git a/calculate-coverage.cmd b/calculate-coverage.cmd
index 7376195..7cb4edd 100644
--- a/calculate-coverage.cmd
+++ b/calculate-coverage.cmd
@@ -9,6 +9,7 @@ tools\OpenCover.4.7.922\tools\OpenCover.Console.exe ^
"-target:%DotNetPath%" ^
"-targetArgs:test -c Release --no-build" ^
"-filter:+[Cursively]* +[Cursively.*]* -[Cursively.Tests]*" ^
+ -excludebyattribute:System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverageAttribute ^
-output:tools\raw-coverage-results.xml ^
-register:user ^
-oldstyle
diff --git a/doc/benchmark-1.2.0.md b/doc/benchmark-1.2.0.md
new file mode 100644
index 0000000..ffdeddc
--- /dev/null
+++ b/doc/benchmark-1.2.0.md
@@ -0,0 +1,77 @@
+This benchmark tests the simple act of counting how many records are in a CSV file. It's not a simple count of how many lines are in the text file: line breaks within quoted fields must be treated as data, and multiple line breaks in a row must be treated as one, since each record must have at least one field. Therefore, assuming correct implementations, this benchmark should test the raw CSV processing speed.
+
+Cursively eliminates a ton of overhead found in libraries such as CsvHelper by restricting the allowed input encodings and using the visitor pattern as its only means of output. Cursively can scan through the original bytes of the input to do its work, and it can give slices of the input data directly to the consumer without having to copy or allocate.
+
+Therefore, these benchmarks are somewhat biased in favor of Cursively, as CsvHelper relies on external code to transform the data to UTF-16. This isn't as unfair as that makes it sound: the overwhelming majority of input files are probably UTF-8 anyway (or a compatible SBCS), so this transformation is something that practically every user will experience.
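+
+For a sense of what "using the visitor pattern as its only means of output" looks like in practice, here is a minimal sketch of a record-counting visitor along the lines of what this benchmark measures. The callback names follow my reading of `CsvReaderVisitorBase`; treat this as an illustration, not the benchmark's exact code:
+
+```csharp
+using System;
+using Cursively;
+
+internal sealed class RecordCountingVisitor : CsvReaderVisitorBase
+{
+    public long RecordCount { get; private set; }
+
+    // Field data arrives as slices of the original input bytes; counting records ignores it.
+    public override void VisitPartialFieldContents(ReadOnlySpan<byte> chunk) { }
+
+    public override void VisitEndOfField(ReadOnlySpan<byte> chunk) { }
+
+    public override void VisitEndOfRecord() => RecordCount++;
+}
+```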
+
+- Input files can be found here: https://github.com/airbreather/Cursively/tree/v1.2.0/test/Cursively.Benchmark/large-csv-files.zip
+- Benchmark source code is here: https://github.com/airbreather/Cursively/tree/v1.2.0/test/Cursively.Benchmark
+
+As of version 1.2.0, these benchmarks no longer include .NET Framework targets; earlier runs showed comparable ratios there.
+
+Raw BenchmarkDotNet output is at the bottom, but here are some numbers derived from it, comparing the throughput of CsvHelper against each of five different ways of using Cursively across several kinds of files. This summary does not say anything about GC pressure:
+
+| File | Size (bytes) | CsvHelper (MB/s) | Cursively 1* (MB/s) | Cursively 2* (MB/s) | Cursively 3* (MB/s) | Cursively 4* (MB/s) | Cursively 5* (MB/s) |
+|-------------------------|-------------:|-----------------:|--------------------:|--------------------:|--------------------:|--------------------:|--------------------:|
+| 100-huge-records | 2900444 | 27.68 | 482.15 (x17.42) | 528.08 (x19.08) | 443.99 (x16.04) | 448.17 (x16.19) | 408.70 (x14.77) |
+| 100-huge-records-quoted | 4900444 | 29.40 | 304.64 (x10.36) | 325.31 (x11.07) | 295.21 (x10.04) | 293.08 (x09.97) | 285.03 (x09.70) |
+| 10k-empty-records | 10020000 | 14.59 | 311.97 (x21.38) | 311.27 (x21.33) | 283.40 (x19.42) | 297.41 (x20.38) | 268.23 (x18.38) |
+| mocked | 12731500 | 74.29 | 3871.72 (x52.11) | 3771.89 (x50.77) | 1748.01 (x23.53) | 2103.55 (x28.31) | 1240.09 (x16.69) |
+| worldcitiespop | 151492068 | 39.39 | 622.85 (x15.81) | 617.22 (x15.67) | 518.00 (x13.15) | 538.54 (x13.67) | 450.63 (x11.44) |
+
+\*Different Cursively methods are:
+1. Directly using `CsvTokenizer` (see the sketch after this list)
+1. `CsvSyncInput.ForMemory`
+1. `CsvSyncInput.ForMemoryMappedFile`
+1. `CsvSyncInput.ForStream` (using a `FileStream`)
+1. `CsvAsyncInput.ForStream` (using a `FileStream` opened in asynchronous mode)
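+
+For reference, "Directly using `CsvTokenizer`" (method 1) amounts to something like the following sketch, reusing the hypothetical `RecordCountingVisitor` from above and assuming the whole file is already in memory as `fileBytes`:
+
+```csharp
+// fileBytes: ReadOnlySpan<byte> over the entire CSV file (how it got there is up to the caller).
+var tokenizer = new CsvTokenizer();
+var visitor = new RecordCountingVisitor();
+tokenizer.ProcessNextChunk(fileBytes, visitor);
+tokenizer.ProcessEndOfStream(visitor);
+Console.WriteLine($"Records: {visitor.RecordCount}");
+```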
+
+Raw BenchmarkDotNet output:
+
+``` ini
+
+BenchmarkDotNet=v0.11.5, OS=Windows 10.0.18362
+Intel Core i7-6850K CPU 3.60GHz (Skylake), 1 CPU, 12 logical and 6 physical cores
+.NET Core SDK=3.0.100-preview6-012264
+ [Host] : .NET Core 2.2.6 (CoreCLR 4.6.27817.03, CoreFX 4.6.27818.02), 64bit RyuJIT
+ Job-UPPUKA : .NET Core 2.2.6 (CoreCLR 4.6.27817.03, CoreFX 4.6.27818.02), 64bit RyuJIT
+
+Server=True
+
+```
+| Method | csvFile | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
+|--------------------------------------------- |--------------------- |-------------:|-----------:|-----------:|------:|--------:|-----------:|---------:|------:|-------------:|
+| CountRowsUsingCursivelyRaw | 100-huge-records | 5.737 ms | 0.0037 ms | 0.0033 ms | 1.00 | 0.00 | - | - | - | 48 B |
+| CountRowsUsingCursivelyArrayInput | 100-huge-records | 5.238 ms | 0.0066 ms | 0.0062 ms | 0.91 | 0.00 | - | - | - | 96 B |
+| CountRowsUsingCursivelyMemoryMappedFileInput | 100-huge-records | 6.230 ms | 0.0159 ms | 0.0141 ms | 1.09 | 0.00 | - | - | - | 544 B |
+| CountRowsUsingCursivelyFileStreamInput | 100-huge-records | 6.172 ms | 0.0080 ms | 0.0067 ms | 1.08 | 0.00 | - | - | - | 272 B |
+| CountRowsUsingCursivelyAsyncFileStreamInput | 100-huge-records | 6.768 ms | 0.1349 ms | 0.1262 ms | 1.18 | 0.02 | - | - | - | 1360 B |
+| CountRowsUsingCsvHelper | 100-huge-records | 99.938 ms | 0.3319 ms | 0.3105 ms | 17.42 | 0.06 | 400.0000 | 200.0000 | - | 110256320 B |
+| | | | | | | | | | | |
+| CountRowsUsingCursivelyRaw | 100-h(...)uoted [23] | 15.341 ms | 0.0305 ms | 0.0255 ms | 1.00 | 0.00 | - | - | - | 48 B |
+| CountRowsUsingCursivelyArrayInput | 100-h(...)uoted [23] | 14.366 ms | 0.0167 ms | 0.0156 ms | 0.94 | 0.00 | - | - | - | 96 B |
+| CountRowsUsingCursivelyMemoryMappedFileInput | 100-h(...)uoted [23] | 15.831 ms | 0.0487 ms | 0.0455 ms | 1.03 | 0.00 | - | - | - | 544 B |
+| CountRowsUsingCursivelyFileStreamInput | 100-h(...)uoted [23] | 15.946 ms | 0.0383 ms | 0.0358 ms | 1.04 | 0.00 | - | - | - | 272 B |
+| CountRowsUsingCursivelyAsyncFileStreamInput | 100-h(...)uoted [23] | 16.396 ms | 0.2821 ms | 0.2771 ms | 1.07 | 0.02 | - | - | - | 1360 B |
+| CountRowsUsingCsvHelper | 100-h(...)uoted [23] | 158.968 ms | 0.1382 ms | 0.1154 ms | 10.36 | 0.02 | 333.3333 | - | - | 153579848 B |
+| | | | | | | | | | | |
+| CountRowsUsingCursivelyRaw | 10k-empty-records | 30.631 ms | 0.1009 ms | 0.0894 ms | 1.00 | 0.00 | - | - | - | 48 B |
+| CountRowsUsingCursivelyArrayInput | 10k-empty-records | 30.699 ms | 0.0624 ms | 0.0584 ms | 1.00 | 0.00 | - | - | - | 96 B |
+| CountRowsUsingCursivelyMemoryMappedFileInput | 10k-empty-records | 33.718 ms | 0.0873 ms | 0.0817 ms | 1.10 | 0.00 | - | - | - | 544 B |
+| CountRowsUsingCursivelyFileStreamInput | 10k-empty-records | 32.130 ms | 0.0944 ms | 0.0737 ms | 1.05 | 0.00 | - | - | - | 272 B |
+| CountRowsUsingCursivelyAsyncFileStreamInput | 10k-empty-records | 35.625 ms | 0.7018 ms | 0.7801 ms | 1.17 | 0.03 | - | - | - | 1360 B |
+| CountRowsUsingCsvHelper | 10k-empty-records | 654.743 ms | 13.0238 ms | 16.9346 ms | 21.42 | 0.51 | 2000.0000 | - | - | 420832856 B |
+| | | | | | | | | | | |
+| CountRowsUsingCursivelyRaw | mocked | 3.136 ms | 0.0038 ms | 0.0034 ms | 1.00 | 0.00 | - | - | - | 48 B |
+| CountRowsUsingCursivelyArrayInput | mocked | 3.219 ms | 0.0623 ms | 0.0741 ms | 1.02 | 0.02 | - | - | - | 96 B |
+| CountRowsUsingCursivelyMemoryMappedFileInput | mocked | 6.946 ms | 0.0553 ms | 0.0490 ms | 2.21 | 0.02 | - | - | - | 544 B |
+| CountRowsUsingCursivelyFileStreamInput | mocked | 5.772 ms | 0.0365 ms | 0.0324 ms | 1.84 | 0.01 | - | - | - | 272 B |
+| CountRowsUsingCursivelyAsyncFileStreamInput | mocked | 9.791 ms | 0.1129 ms | 0.1056 ms | 3.12 | 0.04 | - | - | - | 1360 B |
+| CountRowsUsingCsvHelper | mocked | 163.426 ms | 3.2351 ms | 3.1773 ms | 52.08 | 0.97 | 333.3333 | - | - | 115757736 B |
+| | | | | | | | | | | |
+| CountRowsUsingCursivelyRaw | worldcitiespop | 231.955 ms | 1.0755 ms | 0.9534 ms | 1.00 | 0.00 | - | - | - | 48 B |
+| CountRowsUsingCursivelyArrayInput | worldcitiespop | 234.071 ms | 1.0749 ms | 0.9529 ms | 1.01 | 0.01 | - | - | - | 96 B |
+| CountRowsUsingCursivelyMemoryMappedFileInput | worldcitiespop | 278.909 ms | 3.0866 ms | 2.8872 ms | 1.20 | 0.01 | - | - | - | 544 B |
+| CountRowsUsingCursivelyFileStreamInput | worldcitiespop | 268.271 ms | 3.4632 ms | 2.8920 ms | 1.16 | 0.02 | - | - | - | 272 B |
+| CountRowsUsingCursivelyAsyncFileStreamInput | worldcitiespop | 320.606 ms | 1.6204 ms | 1.5157 ms | 1.38 | 0.01 | - | - | - | 1360 B |
+| CountRowsUsingCsvHelper | worldcitiespop | 3,667.940 ms | 60.8394 ms | 56.9092 ms | 15.82 | 0.24 | 15000.0000 | - | - | 3096694312 B |
diff --git a/doc/release-notes.md b/doc/release-notes.md
index 1096e59..7c85199 100644
--- a/doc/release-notes.md
+++ b/doc/release-notes.md
@@ -1,5 +1,13 @@
# Cursively Release Notes
+## [1.2.0](https://github.com/airbreather/Cursively/milestone/4)
+- Added fluent helpers to replace the `Csv.ProcessFoo` methods with something that's easier to maintain without being meaningfully less convenient to use ([#15](https://github.com/airbreather/Cursively/issues/15)).
+- Deprecated the ability to ignore a leading UTF-8 byte order mark inside the header-aware visitor, per [#14](https://github.com/airbreather/Cursively/issues/14).
+ - Instead, it's up to the source of the input to skip (or not skip) sending a leading UTF-8 BOM to the tokenizer in the first place.
+  - By default, all the fluent helpers from the previous bullet point will ignore a leading UTF-8 BOM if present. This behavior may be disabled by chaining `.WithIgnoreUTF8ByteOrderMark(false)` (see the example after this list).
+- Improved how the header-aware visitor behaves when the creator requests very high limits ([#17](https://github.com/airbreather/Cursively/issues/17)).
+- Fixed a rare off-by-one issue in the header-aware visitor that would happen when a header is exactly as long as the configured maximum **and** its last byte is exactly the last byte of the input chunk that happens to contain it ([#16](https://github.com/airbreather/Cursively/issues/16)).
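+
+For example (referring to the UTF-8 BOM default above), opting out looks like the following sketch, borrowing the hypothetical `MyVisitor` from the README examples:
+
+```csharp
+CsvSyncInput.ForMemoryMappedFile("data.csv")
+    .WithIgnoreUTF8ByteOrderMark(false) // keep a leading BOM as part of the first field's data
+    .Process(new MyVisitor(maxFieldLength: 1000));
+```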
+
## [1.1.0](https://github.com/airbreather/Cursively/milestone/1)
- Several further performance optimizations. Most significantly, inlining and tuning a critical `ReadOnlySpan` extension method.
- In some cases, this increased throughput by a factor of 3.
diff --git a/doc/toc.yml b/doc/toc.yml
index c94d485..5212eae 100644
--- a/doc/toc.yml
+++ b/doc/toc.yml
@@ -3,7 +3,7 @@
- name: API Documentation
href: obj/api/
- name: Benchmark
- href: benchmark-1.1.0.md
+ href: benchmark-1.2.0.md
- name: Release Notes
href: release-notes.md
- name: NuGet Package
diff --git a/generate-docs.cmd b/generate-docs.cmd
index b4f9dff..794f57a 100644
--- a/generate-docs.cmd
+++ b/generate-docs.cmd
@@ -2,7 +2,7 @@
REM ===========================================================================
REM Regenerates the https://airbreather.github.io/Cursively content locally
REM ===========================================================================
-set DOCFX_PACKAGE_VERSION=2.42.4
+set DOCFX_PACKAGE_VERSION=2.43.1
pushd %~dp0
REM incremental / cached builds tweak things about the output, so let's do it
REM all fresh if we can help it...
diff --git a/src/Cursively/Csv.cs b/src/Cursively/Csv.cs
index eca7491..cb41eec 100644
--- a/src/Cursively/Csv.cs
+++ b/src/Cursively/Csv.cs
@@ -1,7 +1,7 @@
using System;
+using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.MemoryMappedFiles;
-using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -10,6 +10,8 @@ namespace Cursively
///
/// Contains helper methods for CSV processing.
///
+ [ExcludeFromCodeCoverage]
+ [Obsolete("ProcessFoo methods have been moved to instance methods on dedicated 'input' types for better composability.")]
public static class Csv
{
///
@@ -25,8 +27,15 @@ public static class Csv
///
/// Thrown when is .
///
- public static void ProcessStream(Stream csvStream, CsvReaderVisitorBase visitor) =>
- ProcessStream(csvStream, visitor, 81920);
+ [Obsolete("Use CsvSyncInput.ForStream(csvStream).Process(visitor).")]
+ public static void ProcessStream(Stream csvStream, CsvReaderVisitorBase visitor)
+ {
+ CsvSyncInput.ForStream(csvStream)
+ .WithMinReadBufferByteCount(81920)
+ .WithReadBufferPool(null)
+ .WithIgnoreUTF8ByteOrderMark(false)
+ .Process(visitor);
+ }
///
/// Describes the contents of a CSV stream to the given instance of the
@@ -51,32 +60,14 @@ public static void ProcessStream(Stream csvStream, CsvReaderVisitorBase visitor)
/// Thrown when does not support reading (i.e.,
/// is ).
///
+ [Obsolete("Use CsvSyncInput.ForStream(csvStream).WithMinReadBufferByteCount(bufferSize).Process(visitor).")]
public static void ProcessStream(Stream csvStream, CsvReaderVisitorBase visitor, int bufferSize)
{
- if (csvStream is null)
- {
- throw new ArgumentNullException(nameof(csvStream));
- }
-
- if (bufferSize <= 0)
- {
- throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, "Must be greater than zero.");
- }
-
- if (!csvStream.CanRead)
- {
- throw new ArgumentException("Stream does not support reading.", nameof(csvStream));
- }
-
- byte[] buffer = new byte[bufferSize];
- var tokenizer = new CsvTokenizer();
- int cnt;
- while ((cnt = csvStream.Read(buffer, 0, buffer.Length)) != 0)
- {
-                tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(buffer, 0, cnt), visitor);
- }
-
- tokenizer.ProcessEndOfStream(visitor);
+ CsvSyncInput.ForStream(csvStream)
+ .WithMinReadBufferByteCount(bufferSize)
+ .WithReadBufferPool(null)
+ .WithIgnoreUTF8ByteOrderMark(false)
+ .Process(visitor);
}
///
@@ -132,8 +123,16 @@ public static void ProcessStream(Stream csvStream, CsvReaderVisitorBase visitor,
/// object backing is disposed before the asynchronous
/// operation terminates.
///
-        public static ValueTask ProcessStreamAsync(Stream csvStream, CsvReaderVisitorBase visitor, IProgress<int> progress = null, CancellationToken cancellationToken = default) =>
- ProcessStreamAsync(csvStream, visitor, 81920, progress, cancellationToken);
+ [Obsolete("Use CsvAsyncInput.ForStream(csvStream).ProcessAsync(visitor, progress, cancellationToken).")]
+        public static async ValueTask ProcessStreamAsync(Stream csvStream, CsvReaderVisitorBase visitor, IProgress<int> progress = null, CancellationToken cancellationToken = default)
+ {
+ await CsvAsyncInput.ForStream(csvStream)
+ .WithMinReadBufferByteCount(81920)
+ .WithReadBufferPool(null)
+ .WithIgnoreUTF8ByteOrderMark(false)
+ .ProcessAsync(visitor, progress, cancellationToken)
+ .ConfigureAwait(false);
+ }
///
/// Describes the contents of a CSV stream to the given instance of the
@@ -194,38 +193,15 @@ public static ValueTask ProcessStreamAsync(Stream csvStream, CsvReaderVisitorBas
/// object backing is disposed before the asynchronous
/// operation terminates.
///
+ [Obsolete("Use CsvAsyncInput.ForStream(csvStream).WithMinReadBufferByteCount(bufferSize).ProcessAsync(visitor, progress, cancellationToken).")]
public static async ValueTask ProcessStreamAsync(Stream csvStream, CsvReaderVisitorBase visitor, int bufferSize, IProgress<int> progress = null, CancellationToken cancellationToken = default)
{
- if (csvStream is null)
- {
- throw new ArgumentNullException(nameof(csvStream));
- }
-
- if (bufferSize <= 0)
- {
- throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, "Must be greater than zero.");
- }
-
- if (!csvStream.CanRead)
- {
- throw new ArgumentException("Stream does not support reading.", nameof(csvStream));
- }
-
- byte[] buffer = new byte[bufferSize];
- var tokenizer = new CsvTokenizer();
- int cnt;
- while ((cnt = await csvStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
- {
-                tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(buffer, 0, cnt), visitor);
- progress?.Report(cnt);
-
- // not all streams support cancellation, so we might as well do this ourselves. it
- // does involve a volatile read, so don't go overboard.
- cancellationToken.ThrowIfCancellationRequested();
- }
-
- tokenizer.ProcessEndOfStream(visitor);
- progress?.Report(0);
+ await CsvAsyncInput.ForStream(csvStream)
+ .WithMinReadBufferByteCount(bufferSize)
+ .WithReadBufferPool(null)
+ .WithIgnoreUTF8ByteOrderMark(false)
+ .ProcessAsync(visitor, progress, cancellationToken)
+ .ConfigureAwait(false);
}
///
@@ -275,45 +251,12 @@ public static async ValueTask ProcessStreamAsync(Stream csvStream, CsvReaderVisi
///
/// See .
///
- public static unsafe void ProcessFile(string csvFilePath, CsvReaderVisitorBase visitor)
+ [Obsolete("Use CsvSyncInput.ForMemoryMappedFile(csvFilePath).Process(visitor).")]
+ public static void ProcessFile(string csvFilePath, CsvReaderVisitorBase visitor)
{
- using (var fl = new FileStream(csvFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan))
- {
- long length = fl.Length;
- if (length == 0)
- {
- return;
- }
-
- var tokenizer = new CsvTokenizer();
- using (var memoryMappedFile = MemoryMappedFile.CreateFromFile(fl, null, 0, MemoryMappedFileAccess.Read, HandleInheritability.None, leaveOpen: true))
- using (var accessor = memoryMappedFile.CreateViewAccessor(0, 0, MemoryMappedFileAccess.Read))
- {
- var handle = accessor.SafeMemoryMappedViewHandle;
- byte* ptr = null;
- RuntimeHelpers.PrepareConstrainedRegions();
- try
- {
- handle.AcquirePointer(ref ptr);
- while (length > int.MaxValue)
- {
-                        tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(ptr, int.MaxValue), visitor);
- length -= int.MaxValue;
- ptr += int.MaxValue;
- }
-
-                    tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(ptr, unchecked((int)length)), visitor);
- tokenizer.ProcessEndOfStream(visitor);
- }
- finally
- {
- if (ptr != null)
- {
- handle.ReleasePointer();
- }
- }
- }
- }
+ CsvSyncInput.ForMemoryMappedFile(csvFilePath)
+ .WithIgnoreUTF8ByteOrderMark(false)
+ .Process(visitor);
}
}
}
diff --git a/src/Cursively/CsvAsyncInput.cs b/src/Cursively/CsvAsyncInput.cs
new file mode 100644
index 0000000..31a3539
--- /dev/null
+++ b/src/Cursively/CsvAsyncInput.cs
@@ -0,0 +1,91 @@
+using System;
+using System.Buffers;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Cursively.Inputs;
+
+namespace Cursively
+{
+ ///
+ /// Helpers to create inputs that describe CSV data streams asynchronously.
+ ///
+ public static class CsvAsyncInput
+ {
+ ///
+ /// Creates an input that can describe the contents of a given to an
+ /// instance of , asynchronously.
+ ///
+ ///
+ /// The that contains the CSV data.
+ ///
+ ///
+ /// An instance of wrapping .
+ ///
+ ///
+ /// Thrown when is non- and its
+ /// is .
+ ///
+ [SuppressMessage("Microsoft.Design", "CA1062:ValidateArgumentsOfPublicMethods")] // Microsoft.CodeAnalysis.FxCopAnalyzers 2.9.3 has a false positive. Remove when fixed
+ public static CsvAsyncStreamInput ForStream(Stream csvStream)
+ {
+ csvStream = csvStream ?? Stream.Null;
+ if (!csvStream.CanRead)
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Stream does not support reading.", nameof(csvStream));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+            return new CsvAsyncStreamInput((byte)',', csvStream, 65536, ArrayPool<byte>.Shared, true);
+ }
+
+ ///
+ /// Creates an input that can describe the contents of a given to
+ /// an instance of , asynchronously.
+ ///
+ ///
+ /// The that contains the CSV data.
+ ///
+ ///
+ /// An instance of wrapping .
+ ///
+ public static CsvPipeReaderInput ForPipeReader(PipeReader reader)
+ {
+ reader = reader ?? NullPipeReader.Instance;
+ return new CsvPipeReaderInput((byte)',', reader, true);
+ }
+
+ private sealed class NullPipeReader : PipeReader
+ {
+ public static readonly NullPipeReader Instance = new NullPipeReader();
+
+ private NullPipeReader() { }
+
+ public override void AdvanceTo(SequencePosition consumed) { }
+
+ public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { }
+
+ public override void CancelPendingRead() { }
+
+ public override void Complete(Exception exception = null) { }
+
+            public override void OnWriterCompleted(Action<Exception, object> callback, object state) { }
+
+            public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+ {
+ TryRead(out var result);
+                return new ValueTask<ReadResult>(result);
+ }
+
+ public override bool TryRead(out ReadResult result)
+ {
+                result = new ReadResult(ReadOnlySequence<byte>.Empty, false, true);
+ return true;
+ }
+ }
+ }
+}
diff --git a/src/Cursively/CsvReaderVisitorWithUTF8HeadersBase.cs b/src/Cursively/CsvReaderVisitorWithUTF8HeadersBase.cs
index dae9ca4..edb8dc6 100644
--- a/src/Cursively/CsvReaderVisitorWithUTF8HeadersBase.cs
+++ b/src/Cursively/CsvReaderVisitorWithUTF8HeadersBase.cs
@@ -2,6 +2,7 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
using System.Text;
namespace Cursively
@@ -61,6 +62,34 @@ namespace Cursively
///
public abstract class CsvReaderVisitorWithUTF8HeadersBase : CsvReaderVisitorBase
{
+ ///
+ ///
+ /// The maximum value that's legal for the maximum header count (0x7FEFFFFF).
+ ///
+ ///
+ /// Staying within this limit does not guarantee that you will be immune to
+ /// even with enough system virtual memory (that depends
+ /// on your configuration). This is just the threshold that, if exceeded, guarantees that
+ /// you actually *will* see on mainstream frameworks if
+ /// Cursively actually tried to go that high, so this is used as a "fail-fast".
+ ///
+ ///
+ protected static readonly int MaxMaxHeaderCount = 0x7FEFFFFF;
+
+ ///
+ ///
+ /// The maximum value that's legal for the maximum header length (0x7FEFFFFF).
+ ///
+ ///
+ /// Staying within this limit does not guarantee that you will be immune to
+ /// even with enough system virtual memory (that depends
+ /// on your configuration). This is just the threshold that, if exceeded, guarantees that
+ /// you actually *will* see on mainstream frameworks if
+ /// Cursively actually tried to go that high, so this is used as a "fail-fast".
+ ///
+ ///
+ protected static readonly int MaxMaxHeaderLength = 0x7FEFFFFF;
+
///
/// The value used by to initialize the
/// maximum number of headers (1,000).
@@ -77,6 +106,7 @@ public abstract class CsvReaderVisitorWithUTF8HeadersBase : CsvReaderVisitorBase
/// The value used by to initialize the
/// value indicating whether or not to ignore a leading UTF-8 BOM (true).
///
+ [Obsolete("Always pass in 'false' instead, per airbreather/Cursively#14")]
protected static readonly bool DefaultIgnoreUTF8IdentifierOnFirstHeaderField = true;
///
@@ -87,12 +117,16 @@ public abstract class CsvReaderVisitorWithUTF8HeadersBase : CsvReaderVisitorBase
private static readonly UTF8Encoding EncodingToUse = new UTF8Encoding(false, false);
- private readonly Decoder _headerDecoder;
+ private readonly int _maxHeaderCount;
-        private readonly ImmutableArray<string>.Builder _headersBuilder;
+ private readonly int _maxHeaderLength;
+
+ private readonly Decoder _headerDecoder;
private readonly bool _ignoreUTF8IdentifierOnFirstHeaderField;
+        private ImmutableArray<string>.Builder _headersBuilder;
+
private char[] _headerBuffer;
private ImmutableArray<string> _headers;
@@ -104,10 +138,11 @@ public abstract class CsvReaderVisitorWithUTF8HeadersBase : CsvReaderVisitorBase
///
/// Initializes a new instance of the class.
///
+ [Obsolete("Use the parameterized constructor, passing in 'false' for the flag to ignore a UTF-8 identifier on the first header field; instead, remove UTF-8 identifiers on the input itself. See airbreather/Cursively#14.")]
protected CsvReaderVisitorWithUTF8HeadersBase()
: this(maxHeaderCount: DefaultMaxHeaderCount,
maxHeaderLength: DefaultMaxHeaderLength,
- ignoreUTF8IdentifierOnFirstHeaderField: DefaultIgnoreUTF8IdentifierOnFirstHeaderField,
+ ignoreUTF8IdentifierOnFirstHeaderField: true,
decoderFallback: DefaultDecoderFallback)
{
}
@@ -124,8 +159,15 @@ protected CsvReaderVisitorWithUTF8HeadersBase()
/// Default: .
///
///
+ ///
/// A value indicating whether or not to ignore a leading UTF-8 BOM.
/// Default: .
+ ///
+ ///
+ /// This parameter was a mistake (see airbreather/Cursively#14) and will be removed in 2.x.
+ /// Instead, always pass in , and remove UTF-8 identifiers directly
+ /// at the source instead of leaving it up to the visitor.
+ ///
///
///
/// The fallback logic used when the decoder encounters invalid UTF-8 bytes.
@@ -136,18 +178,23 @@ protected CsvReaderVisitorWithUTF8HeadersBase()
///
///
/// Thrown when or is
- /// less than 1.
+ /// less than 1 or greater than the maximum for that parameter
+ /// ( / ).
///
protected CsvReaderVisitorWithUTF8HeadersBase(int maxHeaderCount, int maxHeaderLength, bool ignoreUTF8IdentifierOnFirstHeaderField, DecoderFallback decoderFallback)
{
- if (maxHeaderCount < 1)
+ if (maxHeaderCount < 1 || maxHeaderCount > MaxMaxHeaderCount)
{
- throw new ArgumentOutOfRangeException(nameof(maxHeaderCount), maxHeaderCount, "Must be greater than zero.");
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentOutOfRangeException(nameof(maxHeaderCount), maxHeaderCount, "Must be greater than zero and not greater than MaxMaxHeaderCount.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
- if (maxHeaderLength < 1)
+ if (maxHeaderLength < 1 || maxHeaderLength > MaxMaxHeaderLength)
{
- throw new ArgumentOutOfRangeException(nameof(maxHeaderLength), maxHeaderLength, "Must be greater than zero.");
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentOutOfRangeException(nameof(maxHeaderLength), maxHeaderLength, "Must be greater than zero and not greater than MaxMaxHeaderLength.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
if (decoderFallback is null)
@@ -157,9 +204,11 @@ protected CsvReaderVisitorWithUTF8HeadersBase(int maxHeaderCount, int maxHeaderL
_ignoreUTF8IdentifierOnFirstHeaderField = ignoreUTF8IdentifierOnFirstHeaderField;
-            _headersBuilder = ImmutableArray.CreateBuilder<string>(maxHeaderCount);
+ _maxHeaderCount = maxHeaderCount;
+            _headersBuilder = ImmutableArray.CreateBuilder<string>();
- _headerBuffer = new char[maxHeaderLength];
+ _maxHeaderLength = maxHeaderLength;
+ _headerBuffer = new char[8];
_headerDecoder = EncodingToUse.GetDecoder();
_headerDecoder.Fallback = decoderFallback;
@@ -196,7 +245,9 @@ protected ImmutableArray Headers
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowExceptionWhenHeadersAreStillBeingBuilt() =>
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
throw new InvalidOperationException("Headers are still being built.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
///
/// Gets the zero-based index of the field that is currently being read. The value should
@@ -304,7 +355,9 @@ protected virtual void VisitMissingDataFields()
if (_headers.IsDefault)
{
// we will never do this, but a cheeky subclass might.
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
throw new InvalidOperationException("This method is only intended to be called by the base class.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
throw new CursivelyMissingDataFieldsException(_headers.Length, _currentFieldIndex);
@@ -328,7 +381,9 @@ protected virtual void VisitUnexpectedDataField()
if (_headers.IsDefault)
{
// we will never do this, but a cheeky subclass might.
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
throw new InvalidOperationException("This method is only intended to be called by the base class.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
throw new CursivelyExtraDataFieldsException(_headers.Length);
@@ -339,18 +394,12 @@ private unsafe void VisitPartialFieldContentsSlow(ReadOnlySpan chunk)
{
if (_headers.IsDefault)
{
- if (_headersBuilder.Capacity == _headersBuilder.Count)
- {
- throw new CursivelyTooManyHeadersException(_headersBuilder.Capacity);
- }
-
- if (chunk.IsEmpty)
+ if (_headersBuilder.Count == _maxHeaderCount)
{
- // the tokenizer will never do this, but an external caller might.
- return;
+ throw new CursivelyTooManyHeadersException(_maxHeaderCount);
}
- fixed (byte* b = &chunk[0])
+ fixed (byte* b = &MemoryMarshal.GetReference(chunk))
{
VisitHeaderChunk(b, chunk.Length, false);
}
@@ -368,24 +417,14 @@ private unsafe void VisitEndOfFieldSlow(ReadOnlySpan chunk)
{
if (_headers.IsDefault)
{
- if (_headersBuilder.Capacity == _headersBuilder.Count)
+ if (_headersBuilder.Count == _maxHeaderCount)
{
- throw new CursivelyTooManyHeadersException(_headersBuilder.Capacity);
+ throw new CursivelyTooManyHeadersException(_maxHeaderCount);
}
- if (chunk.IsEmpty)
+ fixed (byte* b = &MemoryMarshal.GetReference(chunk))
{
- // the tokenizer will never do this, but an external caller might. note that
- // the Decoder methods require a non-null pointer, even if the length is zero.
- byte b = 0xFF;
- VisitHeaderChunk(&b, 0, true);
- }
- else
- {
- fixed (byte* b = &chunk[0])
- {
- VisitHeaderChunk(b, chunk.Length, true);
- }
+ VisitHeaderChunk(b, chunk.Length, true);
}
int headerBufferOffset = 0;
@@ -419,17 +458,17 @@ private void VisitEndOfRecordSlow()
if (_headersBuilder.Count == 0)
{
// the tokenizer will never do this, but an external caller might.
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
throw new InvalidOperationException("No fields were present in the header record.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
- // this is almost equivalent to setting _headers = _headersBuilder.ToImmutable(),
- // but this does a better job rewarding people for setting the max field count to
- // the actual field count, which will often be the case.
_headersBuilder.Capacity = _headersBuilder.Count;
_headers = _headersBuilder.MoveToImmutable();
_currentFieldIndex = _headers.Length;
- // we're done building headers, so free up our buffer.
+ // we're done building headers, so free up our buffers.
+ _headersBuilder = null;
_headerBuffer = null;
// let the subclass know that the headers are ready, in case it wants to set up some
@@ -452,20 +491,68 @@ private void VisitEndOfRecordSlow()
private unsafe void VisitHeaderChunk(byte* b, int byteCount, bool flush)
{
+ // Decoder methods require non-null pointers, even if the lengths are zero. See
+ // dotnet/corefx#32861 for some discussion about the issue. When it starts making sense
+ // to target netstandard2.1, then we can stop with all the pointer stuff and just use
+ // spans directly. FWIW, it seems counter-intuitive, but it's actually correct to call
+ // this method unconditionally even if byteCount happens to be 0:
+ // - the tokenizer never calls VisitPartial* with an empty span, so checking before the
+ // method call in those cases would only benefit external callers of VisitPartial*.
+ // - from VisitEnd*, we need to tell the Decoder that the last chunk we sent it was
+ // actually the end of what we had so that it can trigger the fallback logic if a
+ // sequence started off as valid UTF-8 but was terminated abruptly.
+ void* garbageNonNullPointer = (void*)0xDEADBEEF;
+
+ if (byteCount == 0)
+ {
+ b = (byte*)garbageNonNullPointer;
+ }
+
int charCount = _headerDecoder.GetCharCount(b, byteCount, flush);
- if (_headerBufferConsumed + charCount <= _headerBuffer.Length)
+
+ int neededLength = _headerBufferConsumed + charCount;
+ int maxLength = _maxHeaderLength;
+ if (neededLength > maxLength)
+ {
+ throw new CursivelyHeaderIsTooLongException(_headerBuffer.Length);
+ }
+
+ EnsureHeaderBufferCapacity(neededLength);
+
+ // at this point, _headerBufferConsumed is guaranteed to be an index in _headerBuffer...
+ // ...unless charCount is 0, in which case it *might* point to one past the end (#16).
+ if (charCount == 0)
+ {
+ _headerDecoder.GetChars(b, byteCount, (char*)garbageNonNullPointer, 0, flush);
+ }
+ else
{
fixed (char* c = &_headerBuffer[_headerBufferConsumed])
{
_headerDecoder.GetChars(b, byteCount, c, charCount, flush);
}
+
+ _headerBufferConsumed += charCount;
}
- else
+ }
+
+ private void EnsureHeaderBufferCapacity(int neededLength)
+ {
+ if (neededLength > _headerBuffer.Length)
{
- throw new CursivelyHeaderIsTooLongException(_headerBuffer.Length);
- }
+ int maxLength = _maxHeaderLength;
+ int newLength = _headerBuffer.Length;
+
+ while (newLength < neededLength)
+ {
+ // double it until we reach the max length
+ newLength = maxLength - newLength > newLength
+ ? newLength + newLength
+ : maxLength;
+ }
- _headerBufferConsumed += charCount;
+ Array.Resize(ref _headerBuffer, newLength);
+ }
}
}
}
diff --git a/src/Cursively/CsvSyncInput.cs b/src/Cursively/CsvSyncInput.cs
new file mode 100644
index 0000000..ced7fc1
--- /dev/null
+++ b/src/Cursively/CsvSyncInput.cs
@@ -0,0 +1,115 @@
+using System;
+using System.Buffers;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+
+using Cursively.Inputs;
+
+namespace Cursively
+{
+ ///
+ /// Helpers to create inputs that describe CSV data streams synchronously.
+ ///
+ public static class CsvSyncInput
+ {
+ ///
+ /// Creates an input that can describe the contents of a given to an
+ /// instance of , synchronously.
+ ///
+ ///
+ /// The that contains the CSV data.
+ ///
+ ///
+ /// An instance of wrapping .
+ ///
+ ///
+ /// Thrown when is non- and its
+ /// is .
+ ///
+ [SuppressMessage("Microsoft.Design", "CA1062:ValidateArgumentsOfPublicMethods")] // Microsoft.CodeAnalysis.FxCopAnalyzers 2.9.3 has a false positive. Remove when fixed
+ public static CsvSyncStreamInput ForStream(Stream csvStream)
+ {
+ csvStream = csvStream ?? Stream.Null;
+ if (!csvStream.CanRead)
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Stream does not support reading.", nameof(csvStream));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+            return new CsvSyncStreamInput((byte)',', csvStream, 65536, ArrayPool<byte>.Shared, true);
+ }
+
+ ///
+ /// Creates an input that can describe the contents of a given file to an instance of
+ /// , synchronously using memory-mapping.
+ ///
+ ///
+ ///
+ /// The path to the file that contains the CSV data.
+ ///
+ ///
+ /// The only validation that Cursively does is .
+ ///
+ ///
+ ///
+ /// An instance of wrapping
+ /// .
+ ///
+ ///
+ /// Thrown when is .
+ ///
+ ///
+ /// Thrown when is non-, but is either
+ /// empty or whitespace-only.
+ ///
+ public static CsvMemoryMappedFileInput ForMemoryMappedFile(string csvFilePath)
+ {
+ if (csvFilePath is null)
+ {
+ throw new ArgumentNullException(nameof(csvFilePath));
+ }
+
+ if (string.IsNullOrWhiteSpace(csvFilePath))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Cannot be blank", nameof(csvFilePath));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ return new CsvMemoryMappedFileInput((byte)',', csvFilePath, true);
+ }
+
+ ///
+ /// Creates an input that can describe the contents of a given
+ /// of bytes to an instance of
+ /// , synchronously.
+ ///
+ ///
+ /// The of bytes that contains the CSV data.
+ ///
+ ///
+ /// An instance of wrapping .
+ ///
+        public static CsvReadOnlyMemoryInput ForMemory(ReadOnlyMemory<byte> memory)
+ {
+ return new CsvReadOnlyMemoryInput((byte)',', memory, true);
+ }
+
+ ///
+ /// Creates an input that can describe the contents of a given
+ /// of bytes to an instance of
+ /// , synchronously.
+ ///
+ ///
+ /// The of bytes that contains the CSV data.
+ ///
+ ///
+ /// An instance of wrapping .
+ ///
+        public static CsvReadOnlySequenceInput ForSequence(ReadOnlySequence<byte> sequence)
+ {
+ return new CsvReadOnlySequenceInput((byte)',', sequence, true);
+ }
+ }
+}
diff --git a/src/Cursively/CsvTokenizer.cs b/src/Cursively/CsvTokenizer.cs
index 0595009..ebec4ac 100644
--- a/src/Cursively/CsvTokenizer.cs
+++ b/src/Cursively/CsvTokenizer.cs
@@ -1,4 +1,5 @@
using System;
+using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
namespace Cursively
@@ -173,17 +174,14 @@ public CsvTokenizer()
///
public CsvTokenizer(byte delimiter)
{
- switch (delimiter)
+ if (!IsValidDelimiter(delimiter))
{
- case CR:
- case LF:
- case QUOTE:
- throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
-
- default:
- _delimiter = delimiter;
- break;
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
}
+
+ _delimiter = delimiter;
}
[Flags]
@@ -197,6 +195,32 @@ private enum ParserFlags : byte
CutAtPotentiallyTerminalDoubleQuote = 0b00010000,
}
+ ///
+ /// Checks if a particular byte value is legal for , i.e.,
+ /// that it is not 0x0A, 0x0D, or 0x22.
+ ///
+ ///
+ /// The single byte to expect to see between fields of the same record. This may not be an
+ /// end-of-line or double-quote character, as those have special meanings.
+ ///
+ ///
+ /// if the delimiter is legal for ,
+ /// otherwise.
+ ///
+ public static bool IsValidDelimiter(byte delimiter)
+ {
+ switch (delimiter)
+ {
+ case CR:
+ case LF:
+ case QUOTE:
+ return false;
+
+ default:
+ return true;
+ }
+ }
+
///
/// Accepts the next (or first) chunk of data in the CSV stream, and informs an instance of
/// what it contains.
@@ -211,13 +235,11 @@ private enum ParserFlags : byte
///
/// If is empty, this method will do nothing.
///
+ [SuppressMessage("Microsoft.Design", "CA1062:ValidateArgumentsOfPublicMethods")] // Microsoft.CodeAnalysis.FxCopAnalyzers 2.9.3 has a false positive. Remove when fixed
public void ProcessNextChunk(ReadOnlySpan<byte> chunk, CsvReaderVisitorBase visitor)
{
- if (visitor is null)
- {
- // "null object" pattern.
- visitor = CsvReaderVisitorBase.Null;
- }
+ // "null object" pattern.
+ visitor = visitor ?? CsvReaderVisitorBase.Null;
byte delimiter = _delimiter;
@@ -299,13 +321,11 @@ public void ProcessNextChunk(ReadOnlySpan chunk, CsvReaderVisitorBase visi
/// the last time that this method was called), then this method will do nothing.
///
///
+ [SuppressMessage("Microsoft.Design", "CA1062:ValidateArgumentsOfPublicMethods")] // Microsoft.CodeAnalysis.FxCopAnalyzers 2.9.3 has a false positive. Remove when fixed
public void ProcessEndOfStream(CsvReaderVisitorBase visitor)
{
- if (visitor is null)
- {
- visitor = CsvReaderVisitorBase.Null;
- }
-
+ // "null object" pattern.
+ visitor = visitor ?? CsvReaderVisitorBase.Null;
ProcessEndOfRecord(default, visitor);
}
@@ -375,6 +395,10 @@ private void PickUpFromLastTime(ref ReadOnlySpan readBuffer, CsvReaderVisi
// slice off the data up to the quote and the next byte that we read.
readBuffer = readBuffer.Slice(idx + 2);
}
+ else if ((_parserFlags & ParserFlags.CutAtPotentiallyTerminalDoubleQuote) != 0)
+ {
+ HandleBufferCutAtPotentiallyTerminalDoubleQuote(ref readBuffer, visitor);
+ }
else
{
// this is expected to be rare: either we were cut between field reads, or we're
@@ -382,12 +406,6 @@ private void PickUpFromLastTime(ref ReadOnlySpan readBuffer, CsvReaderVisi
// the field; by this point, we don't save enough state to remember which case we're
// in, so VisitNonstandardQuotedField **MUST** have been correctly called (or not)
// before entering this section.
- if ((_parserFlags & ParserFlags.CutAtPotentiallyTerminalDoubleQuote) != 0)
- {
- HandleBufferCutAtPotentiallyTerminalDoubleQuote(ref readBuffer, visitor);
- return;
- }
-
for (int idx = 0; idx < readBuffer.Length; idx++)
{
byte b = readBuffer[idx];
diff --git a/src/Cursively/Cursively.csproj b/src/Cursively/Cursively.csproj
index ebdabd9..f35dcf2 100644
--- a/src/Cursively/Cursively.csproj
+++ b/src/Cursively/Cursively.csproj
@@ -16,9 +16,11 @@
+
+
-
+
diff --git a/src/Cursively/Inputs/CsvAsyncInputBase.cs b/src/Cursively/Inputs/CsvAsyncInputBase.cs
new file mode 100644
index 0000000..0e1267f
--- /dev/null
+++ b/src/Cursively/Inputs/CsvAsyncInputBase.cs
@@ -0,0 +1,97 @@
+using System;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cursively.Inputs
+{
+ ///
+ /// Models a CSV source data stream that can be processed asynchronously.
+ ///
+ public abstract class CsvAsyncInputBase
+ {
+ private protected static readonly byte[] UTF8BOM = { 0xEF, 0xBB, 0xBF };
+
+ private static readonly object ProcessingHasStartedSentinel = new object();
+
+ private object _processingHasStarted;
+
+ ///
+ /// Describes the contents of this CSV data stream to a .
+ ///
+ ///
+ /// The to describe this CSV data stream to.
+ ///
+ ///
+ /// An optional instance that will receive a report of the size
+ /// of each chunk (in bytes) as processing finishes, followed by one more report with a zero
+ /// when the last chunk in the stream has been processed.
+ ///
+ ///
+ /// An optional that may be used to signal cancellation.
+ ///
+ ///
+ /// A encapsulating the operation.
+ ///
+ ///
+ /// Thrown when this stream has already been processed.
+ ///
+ ///
+ /// Thrown to acknowledge a canceled . Some subclasses
+ /// may throw an instance of a subclass, such as .
+ ///
+        public async ValueTask ProcessAsync(CsvReaderVisitorBase visitor, IProgress<int> progress = null, CancellationToken cancellationToken = default)
+ {
+ if (!(Interlocked.CompareExchange(ref _processingHasStarted, ProcessingHasStartedSentinel, null) is null))
+ {
+ ThrowProcessingHasAlreadyStartedException();
+ }
+
+ await ProcessAsyncCore(visitor, progress, cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ /// Implements the inner logic for .
+ ///
+ ///
+ /// The to describe this CSV data stream to.
+ ///
+ ///
+ /// An optional instance that will receive a report of the size
+ /// of each chunk (in bytes) as processing finishes, followed by one more report with a zero
+ /// when the last chunk in the stream has been processed.
+ ///
+ ///
+ /// An optional that may be used to signal cancellation.
+ ///
+ ///
+ /// A encapsulating the operation.
+ ///
+ ///
+ /// Thrown when this stream has already been processed.
+ ///
+ ///
+ /// Thrown to acknowledge a canceled . Some subclasses
+ /// may throw an instance of a subclass, such as .
+ ///
+        protected abstract ValueTask ProcessAsyncCore(CsvReaderVisitorBase visitor, IProgress<int> progress, CancellationToken cancellationToken);
+
+ ///
+ /// Throws if has already been called for this instance.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ protected void ThrowIfProcessingHasAlreadyStarted()
+ {
+ if (_processingHasStarted == ProcessingHasStartedSentinel)
+ {
+ ThrowProcessingHasAlreadyStartedException();
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private static void ThrowProcessingHasAlreadyStartedException() =>
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new InvalidOperationException("Processing has already been started.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+}
diff --git a/src/Cursively/Inputs/CsvAsyncStreamInput.cs b/src/Cursively/Inputs/CsvAsyncStreamInput.cs
new file mode 100644
index 0000000..38e47c7
--- /dev/null
+++ b/src/Cursively/Inputs/CsvAsyncStreamInput.cs
@@ -0,0 +1,251 @@
+using System;
+using System.Buffers;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cursively.Inputs
+{
+ ///
+ /// Implementation of backed by a .
+ ///
+ public sealed class CsvAsyncStreamInput : CsvAsyncInputBase
+ {
+ private readonly byte _delimiter;
+
+ private readonly Stream _csvStream;
+
+ private readonly int _minReadBufferByteCount;
+
+        private readonly ArrayPool<byte> _readBufferPool;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+        internal CsvAsyncStreamInput(byte delimiter, Stream csvStream, int minReadBufferByteCount, ArrayPool<byte> readBufferPool, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _csvStream = csvStream;
+ _minReadBufferByteCount = minReadBufferByteCount;
+ _readBufferPool = readBufferPool;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, with the given delimiter.
+ ///
+ ///
+ /// The delimiter to use. Use to test whether
+ /// or not a particular value is valid.
+ ///
+ ///
+ /// A new instance of the class as a copy of this one, with
+ /// the given delimiter.
+ ///
+ ///
+ /// Thrown when is one of the illegal values.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+ public CsvAsyncStreamInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvAsyncStreamInput(delimiter, _csvStream, _minReadBufferByteCount, _readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, reading in chunks of sizes that are at least the given number of bytes.
+ ///
+ ///
+ ///
+ /// The minimum size, in bytes, of chunks to read from the buffer.
+ ///
+ ///
+ /// When using an , this is the value that will be used for
+ /// , so larger chunks should be expected.
+ ///
+ ///
+ /// When not using an (i.e., on instances configured by calling
+ /// passing in ),
+ /// this is the actual size of any arrays that will be allocated on the managed heap.
+ ///
+ ///
+ ///
+ /// A new instance of the class as a copy of this one,
+ /// using the given to provide temporary buffers for the
+ /// to read into.
+ ///
+ ///
+ /// Thrown when is not greater than zero.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+ public CsvAsyncStreamInput WithMinReadBufferByteCount(int minReadBufferByteCount)
+ {
+ if (minReadBufferByteCount < 1)
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentOutOfRangeException(nameof(minReadBufferByteCount), minReadBufferByteCount, "Must be greater than zero.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvAsyncStreamInput(_delimiter, _csvStream, minReadBufferByteCount, _readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, using the given to provide temporary buffers for the
+ /// to read into.
+ ///
+ ///
+ /// The to provide temporary buffers for the
+ /// to read into, or if the temporary buffers should be allocated
+ /// directly on the managed heap.
+ ///
+ ///
+ /// A new instance of the class as a copy of this one,
+ /// using the given to provide temporary buffers for the
+ /// to read into.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+        public CsvAsyncStreamInput WithReadBufferPool(ArrayPool<byte> readBufferPool)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvAsyncStreamInput(_delimiter, _csvStream, _minReadBufferByteCount, readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+ /// present, should be omitted from the first field's data.
+ ///
+ ///
+ /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+ /// omitted from the first field's data.
+ ///
+ ///
+ /// A new instance of the class as a copy of this one, with
+ /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+ /// should be omitted from the first field's data.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+ public CsvAsyncStreamInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvAsyncStreamInput(_delimiter, _csvStream, _minReadBufferByteCount, _readBufferPool, ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+        protected override async ValueTask ProcessAsyncCore(CsvReaderVisitorBase visitor, IProgress<int> progress, CancellationToken cancellationToken)
+ {
+ // not all streams support cancellation, so we might as well do this ourselves. it
+ // does involve a volatile read, so don't go overboard.
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var tokenizer = new CsvTokenizer(_delimiter);
+ var csvStream = _csvStream;
+ int minReadBufferByteCount = _minReadBufferByteCount;
+ var readBufferPool = _readBufferPool;
+
+ byte[] readBuffer;
+ if (readBufferPool is null)
+ {
+ readBuffer = new byte[minReadBufferByteCount];
+ }
+ else
+ {
+ readBuffer = readBufferPool.Rent(minReadBufferByteCount);
+ }
+
+ try
+ {
+ if (_ignoreUTF8ByteOrderMark && await EatUTF8BOMAsync(tokenizer, visitor, csvStream, readBuffer, progress, cancellationToken).ConfigureAwait(false))
+ {
+ return;
+ }
+
+ int cnt;
+ while ((cnt = await csvStream.ReadAsync(readBuffer, 0, readBuffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
+ {
+ // not all streams support cancellation, so we might as well do this ourselves. it
+ // does involve a volatile read, so don't go overboard.
+ cancellationToken.ThrowIfCancellationRequested();
+
+                    tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(readBuffer, 0, cnt), visitor);
+ progress?.Report(cnt);
+ }
+ }
+ finally
+ {
+ readBufferPool?.Return(readBuffer, clearArray: true);
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ progress?.Report(0);
+ }
+
+        private static async ValueTask<bool> EatUTF8BOMAsync(CsvTokenizer tokenizer, CsvReaderVisitorBase visitor, Stream csvStream, byte[] readBuffer, IProgress<int> progress, CancellationToken cancellationToken)
+ {
+ if (readBuffer.Length < 3)
+ {
+ // don't bother pooling; nobody should really ever care.
+ readBuffer = new byte[3];
+ }
+
+ int byteCount = 0;
+ while (byteCount < 3)
+ {
+ int readLength = await csvStream.ReadAsync(readBuffer, byteCount, readBuffer.Length - byteCount, cancellationToken).ConfigureAwait(false);
+
+ // not all streams support cancellation, so we might as well do this ourselves. it
+ // does involve a volatile read, so don't go overboard.
+ cancellationToken.ThrowIfCancellationRequested();
+
+ if (readLength == 0)
+ {
+ if (byteCount != 0)
+ {
+                        if (!new ReadOnlySpan<byte>(readBuffer, 0, byteCount).SequenceEqual(new ReadOnlySpan<byte>(UTF8BOM, 0, byteCount)))
+                        {
+                            tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(readBuffer, 0, byteCount), visitor);
+ }
+
+ progress?.Report(byteCount);
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ progress?.Report(0);
+ return true;
+ }
+
+ byteCount += readLength;
+ }
+
+            var buf = new ReadOnlyMemory<byte>(readBuffer, 0, byteCount);
+ if (buf.Span.StartsWith(UTF8BOM))
+ {
+ buf = buf.Slice(3);
+ }
+
+ tokenizer.ProcessNextChunk(buf.Span, visitor);
+ progress?.Report(byteCount);
+
+ return false;
+ }
+ }
+}
diff --git a/src/Cursively/Inputs/CsvMemoryMappedFileInput.cs b/src/Cursively/Inputs/CsvMemoryMappedFileInput.cs
new file mode 100644
index 0000000..afe7e1b
--- /dev/null
+++ b/src/Cursively/Inputs/CsvMemoryMappedFileInput.cs
@@ -0,0 +1,137 @@
+using System;
+using System.IO;
+using System.IO.MemoryMappedFiles;
+using System.Runtime.CompilerServices;
+
+namespace Cursively.Inputs
+{
+ ///
+ /// Implementation of backed by a file from the filesystem that
+ /// will be processed by mapping it into virtual memory and then treating it like a contiguous
+ /// array of bytes.
+ ///
+ public sealed class CsvMemoryMappedFileInput : CsvSyncInputBase
+ {
+ private readonly byte _delimiter;
+
+ private readonly string _csvFilePath;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+ internal CsvMemoryMappedFileInput(byte delimiter, string csvFilePath, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _csvFilePath = csvFilePath;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, with the given delimiter.
+ ///
+ ///
+ /// The delimiter to use. Use to test whether
+ /// or not a particular value is valid.
+ ///
+ ///
+ /// A new instance of the class as a copy of this one, with
+ /// the given delimiter.
+ ///
+ ///
+ /// Thrown when is one of the illegal values.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+ public CsvMemoryMappedFileInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvMemoryMappedFileInput(delimiter, _csvFilePath, _ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+ /// Creates a new instance of the class as a copy of this
+ /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+ /// present, should be omitted from the first field's data.
+ ///
+ ///
+ /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+ /// omitted from the first field's data.
+ ///
+ ///
+ /// A new instance of the class as a copy of this one, with
+ /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+ /// should be omitted from the first field's data.
+ ///
+ ///
+ /// Thrown when has already been called.
+ ///
+ public CsvMemoryMappedFileInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvMemoryMappedFileInput(_delimiter, _csvFilePath, ignoreUTF8ByteOrderMark);
+ }
+
+ ///
+ protected override unsafe void ProcessCore(CsvReaderVisitorBase visitor)
+ {
+ var tokenizer = new CsvTokenizer(_delimiter);
+
+ using (var fl = new FileStream(_csvFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan))
+ {
+ long length = fl.Length;
+ if (length == 0)
+ {
+ tokenizer.ProcessEndOfStream(visitor);
+ return;
+ }
+
+ using (var memoryMappedFile = MemoryMappedFile.CreateFromFile(fl, null, 0, MemoryMappedFileAccess.Read, HandleInheritability.None, leaveOpen: true))
+ using (var accessor = memoryMappedFile.CreateViewAccessor(0, 0, MemoryMappedFileAccess.Read))
+ {
+ var handle = accessor.SafeMemoryMappedViewHandle;
+ byte* ptr = null;
+ RuntimeHelpers.PrepareConstrainedRegions();
+ try
+ {
+ handle.AcquirePointer(ref ptr);
+
+ if (_ignoreUTF8ByteOrderMark)
+ {
+                        var head = new ReadOnlySpan<byte>(UTF8BOM, 0, length < 3 ? unchecked((int)length) : 3);
+                        if (head.SequenceEqual(new ReadOnlySpan<byte>(ptr, head.Length)))
+ {
+ length -= head.Length;
+ ptr += head.Length;
+ }
+ }
+
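+                        // A ReadOnlySpan<byte> can cover at most int.MaxValue bytes, so mappings larger than that are handed to the tokenizer in int.MaxValue-sized chunks.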
+ while (length > int.MaxValue)
+ {
+                            tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(ptr, int.MaxValue), visitor);
+ length -= int.MaxValue;
+ ptr += int.MaxValue;
+ }
+
+                        tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(ptr, unchecked((int)length)), visitor);
+ tokenizer.ProcessEndOfStream(visitor);
+ }
+ finally
+ {
+ if (ptr != null)
+ {
+ handle.ReleasePointer();
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/Cursively/Inputs/CsvPipeReaderInput.cs b/src/Cursively/Inputs/CsvPipeReaderInput.cs
new file mode 100644
index 0000000..b108f51
--- /dev/null
+++ b/src/Cursively/Inputs/CsvPipeReaderInput.cs
@@ -0,0 +1,207 @@
+using System;
+using System.Buffers;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cursively.Inputs
+{
+    /// <summary>
+    /// Implementation of <see cref="CsvAsyncInputBase"/> backed by a <see cref="PipeReader"/>.
+    /// </summary>
+ public sealed class CsvPipeReaderInput : CsvAsyncInputBase
+ {
+ private readonly byte _delimiter;
+
+ private readonly PipeReader _reader;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+ internal CsvPipeReaderInput(byte delimiter, PipeReader reader, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _reader = reader;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvPipeReaderInput"/> class as a copy of this
+        /// one, with the given delimiter.
+        /// </summary>
+        /// <param name="delimiter">
+        /// The delimiter to use. Use <see cref="CsvTokenizer.IsValidDelimiter"/> to test whether
+        /// or not a particular value is valid.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvPipeReaderInput"/> class as a copy of this one, with
+        /// the given delimiter.
+        /// </returns>
+        /// <exception cref="ArgumentException">
+        /// Thrown when <paramref name="delimiter"/> is one of the illegal values.
+        /// </exception>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvAsyncInputBase.ProcessAsync"/> has already been called.
+        /// </exception>
+ public CsvPipeReaderInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvPipeReaderInput(delimiter, _reader, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvPipeReaderInput"/> class as a copy of this
+        /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+        /// present, should be omitted from the first field's data.
+        /// </summary>
+        /// <param name="ignoreUTF8ByteOrderMark">
+        /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+        /// omitted from the first field's data.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvPipeReaderInput"/> class as a copy of this one, with
+        /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+        /// should be omitted from the first field's data.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvAsyncInputBase.ProcessAsync"/> has already been called.
+        /// </exception>
+ public CsvPipeReaderInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvPipeReaderInput(_delimiter, _reader, ignoreUTF8ByteOrderMark);
+ }
+
+        /// <inheritdoc />
+        protected override async ValueTask ProcessAsyncCore(CsvReaderVisitorBase visitor, IProgress<int> progress, CancellationToken cancellationToken)
+ {
+ var tokenizer = new CsvTokenizer(_delimiter);
+ var reader = _reader;
+
+ if (_ignoreUTF8ByteOrderMark && await EatUTF8BOMAsync(tokenizer, visitor, progress, cancellationToken).ConfigureAwait(false))
+ {
+ return;
+ }
+
+ while (true)
+ {
+ var result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException(cancellationToken);
+ }
+
+ var buffer = result.Buffer;
+ foreach (var segment in buffer)
+ {
+ tokenizer.ProcessNextChunk(segment.Span, visitor);
+ }
+
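+                // The tokenizer never needs to revisit earlier bytes, so the entire buffer can be marked as consumed right away.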
+ reader.AdvanceTo(buffer.End);
+ if (progress != null)
+ {
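+                    // IProgress<int> can only report int-sized values, so buffers longer than int.MaxValue are reported in multiple increments.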
+ long totalLength = buffer.Length;
+ while (totalLength > int.MaxValue)
+ {
+ progress.Report(int.MaxValue);
+ totalLength -= int.MaxValue;
+ }
+
+ if (totalLength != 0)
+ {
+ progress.Report(unchecked((int)totalLength));
+ }
+ }
+
+ if (result.IsCompleted)
+ {
+ break;
+ }
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ progress?.Report(0);
+ }
+
+        private async ValueTask<bool> EatUTF8BOMAsync(CsvTokenizer tokenizer, CsvReaderVisitorBase visitor, IProgress<int> progress, CancellationToken cancellationToken)
+ {
+ var reader = _reader;
+
+            ReadOnlySequence<byte> buffer;
+
+ // keep asking for more until we've seen either 3+ bytes or the end of the data.
+ while (true)
+ {
+ var result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException(cancellationToken);
+ }
+
+ buffer = result.Buffer;
+ if (buffer.Length >= 3)
+ {
+ // we've seen 3+ bytes.
+ break;
+ }
+
+ if (result.IsCompleted)
+ {
+ // we've seen the end of the data.
+ Finish();
+ tokenizer.ProcessEndOfStream(visitor);
+ reader.AdvanceTo(buffer.End);
+ progress?.Report(0);
+ return true;
+ }
+
+ // tell the reader that we've looked at everything it had to give us, and we weren't
+ // able to consume any of it, so the next read should have everything we've seen so
+ // far, plus at least one more byte.
+ reader.AdvanceTo(buffer.Start, buffer.End);
+ }
+
+ Finish();
+ return false;
+
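+            // Copies the first (up to) 3 bytes, which may span multiple segments, skips them if they match the corresponding UTF-8 BOM prefix,
+            // otherwise replays them to the tokenizer as data, then advances the reader past them and reports progress.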
+ void Finish()
+ {
+                Span<byte> upToFirstThreeBytes = stackalloc byte[3];
+ int alreadyEaten = 0;
+ foreach (var segment in buffer)
+ {
+ int lengthToCopy = 3 - alreadyEaten;
+ if (lengthToCopy > segment.Length)
+ {
+ lengthToCopy = segment.Length;
+ }
+
+ segment.Slice(0, lengthToCopy).Span.CopyTo(upToFirstThreeBytes.Slice(alreadyEaten, lengthToCopy));
+ alreadyEaten += lengthToCopy;
+ if (alreadyEaten == 3)
+ {
+ break;
+ }
+ }
+
+ upToFirstThreeBytes = upToFirstThreeBytes.Slice(0, alreadyEaten);
+                var head = new ReadOnlySpan<byte>(UTF8BOM, 0, alreadyEaten);
+ if (!upToFirstThreeBytes.SequenceEqual(head))
+ {
+ tokenizer.ProcessNextChunk(upToFirstThreeBytes, visitor);
+ }
+
+ reader.AdvanceTo(buffer.GetPosition(alreadyEaten));
+ progress?.Report(alreadyEaten);
+ }
+ }
+ }
+}
diff --git a/src/Cursively/Inputs/CsvReadOnlyMemoryInput.cs b/src/Cursively/Inputs/CsvReadOnlyMemoryInput.cs
new file mode 100644
index 0000000..fea1ddd
--- /dev/null
+++ b/src/Cursively/Inputs/CsvReadOnlyMemoryInput.cs
@@ -0,0 +1,99 @@
+using System;
+
+namespace Cursively.Inputs
+{
+    /// <summary>
+    /// Implementation of <see cref="CsvSyncInputBase"/> backed by a <see cref="ReadOnlyMemory{T}"/>
+    /// of bytes.
+    /// </summary>
+ public sealed class CsvReadOnlyMemoryInput : CsvSyncInputBase
+ {
+ private readonly byte _delimiter;
+
+        private readonly ReadOnlyMemory<byte> _memory;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+        internal CsvReadOnlyMemoryInput(byte delimiter, ReadOnlyMemory<byte> memory, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _memory = memory;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvReadOnlyMemoryInput"/> class as a copy of this
+        /// one, with the given delimiter.
+        /// </summary>
+        /// <param name="delimiter">
+        /// The delimiter to use. Use <see cref="CsvTokenizer.IsValidDelimiter"/> to test whether
+        /// or not a particular value is valid.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvReadOnlyMemoryInput"/> class as a copy of this one, with
+        /// the given delimiter.
+        /// </returns>
+        /// <exception cref="ArgumentException">
+        /// Thrown when <paramref name="delimiter"/> is one of the illegal values.
+        /// </exception>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvReadOnlyMemoryInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvReadOnlyMemoryInput(delimiter, _memory, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvReadOnlyMemoryInput"/> class as a copy of this
+        /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+        /// present, should be omitted from the first field's data.
+        /// </summary>
+        /// <param name="ignoreUTF8ByteOrderMark">
+        /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+        /// omitted from the first field's data.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvReadOnlyMemoryInput"/> class as a copy of this one, with
+        /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+        /// should be omitted from the first field's data.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvReadOnlyMemoryInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvReadOnlyMemoryInput(_delimiter, _memory, ignoreUTF8ByteOrderMark);
+ }
+
+        /// <inheritdoc />
+ protected override void ProcessCore(CsvReaderVisitorBase visitor)
+ {
+ ProcessFullSegment(_memory.Span, _ignoreUTF8ByteOrderMark, new CsvTokenizer(_delimiter), visitor);
+ }
+
+        internal static void ProcessFullSegment(ReadOnlySpan<byte> bytes, bool ignoreUTF8ByteOrderMark, CsvTokenizer tokenizer, CsvReaderVisitorBase visitor)
+ {
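+            // When requested, strip a leading UTF-8 BOM (or, for inputs shorter than 3 bytes, whatever BOM prefix fits) before tokenizing.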
+ if (ignoreUTF8ByteOrderMark)
+ {
+                var head = new ReadOnlySpan<byte>(UTF8BOM, 0, bytes.Length < 3 ? bytes.Length : 3);
+ if (bytes.StartsWith(head))
+ {
+ bytes = bytes.Slice(head.Length);
+ }
+ }
+
+ tokenizer.ProcessNextChunk(bytes, visitor);
+ tokenizer.ProcessEndOfStream(visitor);
+ }
+ }
+}
diff --git a/src/Cursively/Inputs/CsvReadOnlySequenceInput.cs b/src/Cursively/Inputs/CsvReadOnlySequenceInput.cs
new file mode 100644
index 0000000..7faa924
--- /dev/null
+++ b/src/Cursively/Inputs/CsvReadOnlySequenceInput.cs
@@ -0,0 +1,181 @@
+using System;
+using System.Buffers;
+
+namespace Cursively.Inputs
+{
+    /// <summary>
+    /// Implementation of <see cref="CsvSyncInputBase"/> backed by a <see cref="ReadOnlySequence{T}"/>
+    /// of bytes.
+    /// </summary>
+ public sealed class CsvReadOnlySequenceInput : CsvSyncInputBase
+ {
+ private readonly byte _delimiter;
+
+        private readonly ReadOnlySequence<byte> _sequence;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+        internal CsvReadOnlySequenceInput(byte delimiter, ReadOnlySequence<byte> sequence, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _sequence = sequence;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvReadOnlySequenceInput"/> class as a copy of this
+        /// one, with the given delimiter.
+        /// </summary>
+        /// <param name="delimiter">
+        /// The delimiter to use. Use <see cref="CsvTokenizer.IsValidDelimiter"/> to test whether
+        /// or not a particular value is valid.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvReadOnlySequenceInput"/> class as a copy of this one, with
+        /// the given delimiter.
+        /// </returns>
+        /// <exception cref="ArgumentException">
+        /// Thrown when <paramref name="delimiter"/> is one of the illegal values.
+        /// </exception>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvReadOnlySequenceInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvReadOnlySequenceInput(delimiter, _sequence, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvReadOnlySequenceInput"/> class as a copy of this
+        /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+        /// present, should be omitted from the first field's data.
+        /// </summary>
+        /// <param name="ignoreUTF8ByteOrderMark">
+        /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+        /// omitted from the first field's data.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvReadOnlySequenceInput"/> class as a copy of this one, with
+        /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+        /// should be omitted from the first field's data.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvReadOnlySequenceInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvReadOnlySequenceInput(_delimiter, _sequence, ignoreUTF8ByteOrderMark);
+ }
+
+        /// <inheritdoc />
+ protected override void ProcessCore(CsvReaderVisitorBase visitor)
+ {
+ var tokenizer = new CsvTokenizer(_delimiter);
+ bool ignoreUTF8ByteOrderMark = _ignoreUTF8ByteOrderMark;
+ var bytes = _sequence;
+
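+            // A single-segment sequence is just one contiguous span, so reuse the simpler ReadOnlyMemory code path.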
+ if (bytes.IsSingleSegment)
+ {
+ CsvReadOnlyMemoryInput.ProcessFullSegment(bytes.First.Span, ignoreUTF8ByteOrderMark, tokenizer, visitor);
+ return;
+ }
+
+ var enumerator = bytes.GetEnumerator();
+ if (ignoreUTF8ByteOrderMark && EatUTF8BOM(tokenizer, visitor, ref enumerator))
+ {
+ return;
+ }
+
+ while (enumerator.MoveNext())
+ {
+ tokenizer.ProcessNextChunk(enumerator.Current.Span, visitor);
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ }
+
+        private static bool EatUTF8BOM(CsvTokenizer tokenizer, CsvReaderVisitorBase visitor, ref ReadOnlySequence<byte>.Enumerator enumerator)
+ {
+            ReadOnlyMemory<byte> segment;
+ while (true)
+ {
+ if (!enumerator.MoveNext())
+ {
+ tokenizer.ProcessEndOfStream(visitor);
+ return true;
+ }
+
+ segment = enumerator.Current;
+ if (!segment.IsEmpty)
+ {
+ break;
+ }
+ }
+
+ var span = segment.Span;
+
+            ReadOnlySpan<byte> head = UTF8BOM;
+
+ // this greed should **probably** pay off most of the time.
+ if (span.Length >= 3)
+ {
+ if (span.StartsWith(head))
+ {
+ span = span.Slice(3);
+ }
+
+ tokenizer.ProcessNextChunk(span, visitor);
+ return false;
+ }
+
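+            // The first non-empty segment is shorter than the BOM, so match it byte-by-byte across segment boundaries;
+            // if the match fails partway through, replay the matched prefix to the tokenizer as ordinary data.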
+ int alreadyEaten = 0;
+ while (true)
+ {
+ if (span[0] == head[alreadyEaten])
+ {
+ span = span.Slice(1);
+ if (++alreadyEaten == 3)
+ {
+ tokenizer.ProcessNextChunk(span, visitor);
+ return false;
+ }
+ }
+ else
+ {
+ tokenizer.ProcessNextChunk(head.Slice(0, alreadyEaten), visitor);
+ tokenizer.ProcessNextChunk(span, visitor);
+ return false;
+ }
+
+ if (span.IsEmpty)
+ {
+ while (true)
+ {
+ if (!enumerator.MoveNext())
+ {
+ tokenizer.ProcessEndOfStream(visitor);
+ return true;
+ }
+
+ segment = enumerator.Current;
+ if (!segment.IsEmpty)
+ {
+ break;
+ }
+ }
+
+ span = segment.Span;
+ }
+ }
+ }
+ }
+}
diff --git a/src/Cursively/Inputs/CsvSyncInputBase.cs b/src/Cursively/Inputs/CsvSyncInputBase.cs
new file mode 100644
index 0000000..bc21ad5
--- /dev/null
+++ b/src/Cursively/Inputs/CsvSyncInputBase.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace Cursively.Inputs
+{
+    /// <summary>
+    /// Models a CSV source data stream that can be processed synchronously.
+    /// </summary>
+ public abstract class CsvSyncInputBase
+ {
+ private protected static readonly byte[] UTF8BOM = { 0xEF, 0xBB, 0xBF };
+
+ private static readonly object ProcessingHasStartedSentinel = new object();
+
+ private object _processingHasStarted;
+
+        /// <summary>
+        /// Describes the contents of this CSV data stream to a <see cref="CsvReaderVisitorBase"/>.
+        /// </summary>
+        /// <param name="visitor">
+        /// The <see cref="CsvReaderVisitorBase"/> to describe this CSV data stream to.
+        /// </param>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when this stream has already been processed.
+        /// </exception>
+ public void Process(CsvReaderVisitorBase visitor)
+ {
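+            // Atomically swap in the sentinel so that Process can only ever run once per instance, even under concurrent calls.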
+ if (!(Interlocked.CompareExchange(ref _processingHasStarted, ProcessingHasStartedSentinel, null) is null))
+ {
+ ThrowProcessingHasAlreadyStartedException();
+ }
+
+ ProcessCore(visitor);
+ }
+
+        /// <summary>
+        /// Implements the inner logic for <see cref="Process"/>.
+        /// </summary>
+        /// <param name="visitor">
+        /// The <see cref="CsvReaderVisitorBase"/> to describe this CSV data stream to.
+        /// </param>
+        /// <remarks>
+        /// The base class will call this method at most once per instance.
+        /// </remarks>
+ protected abstract void ProcessCore(CsvReaderVisitorBase visitor);
+
+        /// <summary>
+        /// Throws <see cref="InvalidOperationException"/> if <see cref="Process"/> has already been called for this instance.
+        /// </summary>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ protected void ThrowIfProcessingHasAlreadyStarted()
+ {
+ if (_processingHasStarted == ProcessingHasStartedSentinel)
+ {
+ ThrowProcessingHasAlreadyStartedException();
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private static void ThrowProcessingHasAlreadyStartedException() =>
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new InvalidOperationException("Processing has already been started.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+}
diff --git a/src/Cursively/Inputs/CsvSyncStreamInput.cs b/src/Cursively/Inputs/CsvSyncStreamInput.cs
new file mode 100644
index 0000000..f75b10e
--- /dev/null
+++ b/src/Cursively/Inputs/CsvSyncStreamInput.cs
@@ -0,0 +1,230 @@
+using System;
+using System.Buffers;
+using System.IO;
+
+namespace Cursively.Inputs
+{
+    /// <summary>
+    /// Implementation of <see cref="CsvSyncInputBase"/> backed by a <see cref="Stream"/>.
+    /// </summary>
+ public sealed class CsvSyncStreamInput : CsvSyncInputBase
+ {
+ private readonly byte _delimiter;
+
+ private readonly Stream _csvStream;
+
+ private readonly int _minReadBufferByteCount;
+
+        private readonly ArrayPool<byte> _readBufferPool;
+
+ private readonly bool _ignoreUTF8ByteOrderMark;
+
+        internal CsvSyncStreamInput(byte delimiter, Stream csvStream, int minReadBufferByteCount, ArrayPool<byte> readBufferPool, bool ignoreUTF8ByteOrderMark)
+ {
+ _delimiter = delimiter;
+ _csvStream = csvStream;
+ _minReadBufferByteCount = minReadBufferByteCount;
+ _readBufferPool = readBufferPool;
+ _ignoreUTF8ByteOrderMark = ignoreUTF8ByteOrderMark;
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this
+        /// one, with the given delimiter.
+        /// </summary>
+        /// <param name="delimiter">
+        /// The delimiter to use. Use <see cref="CsvTokenizer.IsValidDelimiter"/> to test whether
+        /// or not a particular value is valid.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this one, with
+        /// the given delimiter.
+        /// </returns>
+        /// <exception cref="ArgumentException">
+        /// Thrown when <paramref name="delimiter"/> is one of the illegal values.
+        /// </exception>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvSyncStreamInput WithDelimiter(byte delimiter)
+ {
+ if (!CsvTokenizer.IsValidDelimiter(delimiter))
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentException("Must not be a carriage return, linefeed, or double-quote.", nameof(delimiter));
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvSyncStreamInput(delimiter, _csvStream, _minReadBufferByteCount, _readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this
+        /// one, reading in chunks of sizes that are at least the given number of bytes.
+        /// </summary>
+        /// <param name="minReadBufferByteCount">
+        /// <para>
+        /// The minimum size, in bytes, of chunks to read from the stream.
+        /// </para>
+        /// <para>
+        /// When using an <see cref="ArrayPool{T}"/>, this is the value that will be used for
+        /// <see cref="ArrayPool{T}.Rent"/>, so larger chunks should be expected.
+        /// </para>
+        /// <para>
+        /// When not using an <see cref="ArrayPool{T}"/> (i.e., on instances configured by calling
+        /// <see cref="WithReadBufferPool"/> passing in <see langword="null"/>),
+        /// this is the actual size of any arrays that will be allocated on the managed heap.
+        /// </para>
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this one,
+        /// reading in chunks of sizes that are at least the given number of bytes.
+        /// </returns>
+        /// <exception cref="ArgumentOutOfRangeException">
+        /// Thrown when <paramref name="minReadBufferByteCount"/> is not greater than zero.
+        /// </exception>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvSyncStreamInput WithMinReadBufferByteCount(int minReadBufferByteCount)
+ {
+ if (minReadBufferByteCount < 1)
+ {
+#pragma warning disable CA1303 // Do not pass literals as localized parameters
+ throw new ArgumentOutOfRangeException(nameof(minReadBufferByteCount), minReadBufferByteCount, "Must be greater than zero.");
+#pragma warning restore CA1303 // Do not pass literals as localized parameters
+ }
+
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvSyncStreamInput(_delimiter, _csvStream, minReadBufferByteCount, _readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this
+        /// one, using the given <see cref="ArrayPool{T}"/> to provide temporary buffers for the
+        /// <see cref="Stream"/> to read into.
+        /// </summary>
+        /// <param name="readBufferPool">
+        /// The <see cref="ArrayPool{T}"/> to provide temporary buffers for the
+        /// <see cref="Stream"/> to read into, or <see langword="null"/> if the temporary buffers should be allocated
+        /// directly on the managed heap.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this one,
+        /// using the given <see cref="ArrayPool{T}"/> to provide temporary buffers for the
+        /// <see cref="Stream"/> to read into.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+        public CsvSyncStreamInput WithReadBufferPool(ArrayPool<byte> readBufferPool)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvSyncStreamInput(_delimiter, _csvStream, _minReadBufferByteCount, readBufferPool, _ignoreUTF8ByteOrderMark);
+ }
+
+        /// <summary>
+        /// Creates a new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this
+        /// one, with the given flag indicating whether or not a leading UTF-8 byte order mark, if
+        /// present, should be omitted from the first field's data.
+        /// </summary>
+        /// <param name="ignoreUTF8ByteOrderMark">
+        /// A value indicating whether or not a leading UTF-8 byte order mark, if present, should be
+        /// omitted from the first field's data.
+        /// </param>
+        /// <returns>
+        /// A new instance of the <see cref="CsvSyncStreamInput"/> class as a copy of this one, with
+        /// the given flag indicating whether or not a leading UTF-8 byte order mark, if present,
+        /// should be omitted from the first field's data.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when <see cref="CsvSyncInputBase.Process"/> has already been called.
+        /// </exception>
+ public CsvSyncStreamInput WithIgnoreUTF8ByteOrderMark(bool ignoreUTF8ByteOrderMark)
+ {
+ ThrowIfProcessingHasAlreadyStarted();
+ return new CsvSyncStreamInput(_delimiter, _csvStream, _minReadBufferByteCount, _readBufferPool, ignoreUTF8ByteOrderMark);
+ }
+
+        /// <inheritdoc />
+ protected override void ProcessCore(CsvReaderVisitorBase visitor)
+ {
+ var tokenizer = new CsvTokenizer(_delimiter);
+ var csvStream = _csvStream;
+ int minReadBufferByteCount = _minReadBufferByteCount;
+ var readBufferPool = _readBufferPool;
+
+ byte[] readBuffer;
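+            // Rent the read buffer from the pool when one was configured; otherwise allocate a plain array of exactly the requested minimum size.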
+ if (readBufferPool is null)
+ {
+ readBuffer = new byte[minReadBufferByteCount];
+ }
+ else
+ {
+ readBuffer = readBufferPool.Rent(minReadBufferByteCount);
+ }
+
+ try
+ {
+ if (_ignoreUTF8ByteOrderMark && EatUTF8BOM(tokenizer, visitor, csvStream, readBuffer))
+ {
+ return;
+ }
+
+ int cnt;
+ while ((cnt = csvStream.Read(readBuffer, 0, readBuffer.Length)) != 0)
+ {
+                    tokenizer.ProcessNextChunk(new ReadOnlySpan<byte>(readBuffer, 0, cnt), visitor);
+ }
+ }
+ finally
+ {
+ readBufferPool?.Return(readBuffer, clearArray: true);
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ }
+
+ private static bool EatUTF8BOM(CsvTokenizer tokenizer, CsvReaderVisitorBase visitor, Stream csvStream, byte[] readBuffer)
+ {
+ if (readBuffer.Length < 3)
+ {
+ // don't bother pooling; nobody should really ever care.
+ readBuffer = new byte[3];
+ }
+
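+            // Keep reading until at least 3 bytes have been seen or the stream ends; a stream shorter than the BOM is
+            // skipped only if every byte matches the BOM prefix, otherwise its bytes are forwarded to the tokenizer as data.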
+ int byteCount = 0;
+ while (byteCount < 3)
+ {
+ int readLength = csvStream.Read(readBuffer, byteCount, readBuffer.Length - byteCount);
+ if (readLength == 0)
+ {
+ if (byteCount != 0)
+ {
+                        if (!new ReadOnlySpan<byte>(readBuffer, 0, byteCount).SequenceEqual(new ReadOnlySpan<byte>(UTF8BOM, 0, byteCount)))
+ {
+ tokenizer.ProcessNextChunk(new ReadOnlySpan(readBuffer, 0, byteCount), visitor);
+ }
+ }
+
+ tokenizer.ProcessEndOfStream(visitor);
+ return true;
+ }
+
+ byteCount += readLength;
+ }
+
+            var buf = new ReadOnlySpan<byte>(readBuffer, 0, byteCount);
+ if (buf.StartsWith(UTF8BOM))
+ {
+ buf = buf.Slice(3);
+ }
+
+ tokenizer.ProcessNextChunk(buf, visitor);
+
+ return false;
+ }
+ }
+}
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 4b420f8..ab4a2ac 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -22,11 +22,11 @@
-
+
-
+
diff --git a/test/Cursively.Benchmark/Program.cs b/test/Cursively.Benchmark/Program.cs
index c946b5a..154c03e 100644
--- a/test/Cursively.Benchmark/Program.cs
+++ b/test/Cursively.Benchmark/Program.cs
@@ -3,6 +3,7 @@
using System.IO.Compression;
using System.Runtime.CompilerServices;
using System.Text;
+using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
@@ -12,8 +13,6 @@
namespace Cursively.Benchmark
{
- [ClrJob]
- [CoreJob]
[GcServer(true)]
[MemoryDiagnoser]
public class Program
@@ -22,7 +21,7 @@ public class Program
[Benchmark(Baseline = true)]
[ArgumentsSource(nameof(CsvFiles))]
- public long CountRowsUsingCursively(CsvFile csvFile)
+ public long CountRowsUsingCursivelyRaw(CsvFile csvFile)
{
var visitor = new RowCountingVisitor();
var tokenizer = new CsvTokenizer();
@@ -31,6 +30,50 @@ public long CountRowsUsingCursively(CsvFile csvFile)
return visitor.RowCount;
}
+ [Benchmark]
+ [ArgumentsSource(nameof(CsvFiles))]
+ public long CountRowsUsingCursivelyArrayInput(CsvFile csvFile)
+ {
+ var visitor = new RowCountingVisitor();
+ CsvSyncInput.ForMemory(csvFile.FileData).Process(visitor);
+ return visitor.RowCount;
+ }
+
+ [Benchmark]
+ [ArgumentsSource(nameof(CsvFiles))]
+ public long CountRowsUsingCursivelyMemoryMappedFileInput(CsvFile csvFile)
+ {
+ var visitor = new RowCountingVisitor();
+ CsvSyncInput.ForMemoryMappedFile(csvFile.FullPath).Process(visitor);
+ return visitor.RowCount;
+ }
+
+ [Benchmark]
+ [ArgumentsSource(nameof(CsvFiles))]
+ public long CountRowsUsingCursivelyFileStreamInput(CsvFile csvFile)
+ {
+ var visitor = new RowCountingVisitor();
+ using (var stream = new FileStream(csvFile.FullPath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan))
+ {
+ CsvSyncInput.ForStream(stream).Process(visitor);
+ }
+
+ return visitor.RowCount;
+ }
+
+ [Benchmark]
+ [ArgumentsSource(nameof(CsvFiles))]
+    public async Task<long> CountRowsUsingCursivelyAsyncFileStreamInput(CsvFile csvFile)
+ {
+ var visitor = new RowCountingVisitor();
+ using (var stream = new FileStream(csvFile.FullPath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.Asynchronous | FileOptions.SequentialScan))
+ {
+ await CsvAsyncInput.ForStream(stream).ProcessAsync(visitor);
+ }
+
+ return visitor.RowCount;
+ }
+
[Benchmark]
[ArgumentsSource(nameof(CsvFiles))]
public long CountRowsUsingCsvHelper(CsvFile csvFile)
@@ -49,13 +92,17 @@ public long CountRowsUsingCsvHelper(CsvFile csvFile)
}
}
- private static int Main()
+    private static async Task<int> Main()
{
var prog = new Program();
foreach (var csvFile in CsvFiles)
{
- long rowCount = prog.CountRowsUsingCursively(csvFile);
- if (prog.CountRowsUsingCsvHelper(csvFile) != rowCount)
+ long rowCount = prog.CountRowsUsingCursivelyRaw(csvFile);
+ if (prog.CountRowsUsingCsvHelper(csvFile) != rowCount ||
+ prog.CountRowsUsingCursivelyArrayInput(csvFile) != rowCount ||
+ prog.CountRowsUsingCursivelyMemoryMappedFileInput(csvFile) != rowCount ||
+ prog.CountRowsUsingCursivelyFileStreamInput(csvFile) != rowCount ||
+ await prog.CountRowsUsingCursivelyAsyncFileStreamInput(csvFile) != rowCount)
{
Console.Error.WriteLine($"Failed on {csvFile}.");
return 1;
diff --git a/test/Cursively.Tests/CsvAsyncInputTestBase.cs b/test/Cursively.Tests/CsvAsyncInputTestBase.cs
new file mode 100644
index 0000000..b12d02c
--- /dev/null
+++ b/test/Cursively.Tests/CsvAsyncInputTestBase.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+using Cursively.Inputs;
+
+using Xunit;
+
+using static Cursively.Tests.TestHelpers;
+
+namespace Cursively.Tests
+{
+ public abstract class CsvAsyncInputTestBase
+ {
+        protected async Task RunTestAsync(Func<CsvAsyncInputBase> sutCreator, string filePath, bool ignoreUTF8ByteOrderMark)
+ {
+ (byte[] fileData, int originalLength) = GetExpectedCsvData(filePath, ignoreUTF8ByteOrderMark);
+ var expected = TokenizeCsvFileUsingCursively(fileData, fileData.Length, (byte)',');
+
+ // run without progress
+ {
+ var sut = sutCreator();
+ var inputVisitor = new StringBufferingVisitor();
+
+ await sut.ProcessAsync(inputVisitor).ConfigureAwait(false);
+
+ Assert.Equal(expected, inputVisitor.Records);
+
+                await Assert.ThrowsAsync<InvalidOperationException>(() => sut.ProcessAsync(null).AsTask());
+ }
+
+ // run with progress
+ {
+ var sut = sutCreator();
+ var inputVisitor = new StringBufferingVisitor();
+
+                var readSoFar = new List<int>();
+                var progress = new ImmediateProgress<int>(readSoFar.Add);
+
+ await sut.ProcessAsync(inputVisitor, progress).ConfigureAwait(false);
+
+ Assert.Equal(expected, inputVisitor.Records);
+
+ Assert.Equal(originalLength, readSoFar.Sum());
+ Assert.Equal(0, readSoFar.Last());
+
+                await Assert.ThrowsAsync<InvalidOperationException>(() => sut.ProcessAsync(null).AsTask());
+ }
+ }
+
+        // Progress<T> posts to a sync context. We can't have that.
+        private sealed class ImmediateProgress<T> : IProgress<T>
+        {
+            private readonly Action<T> _handler;
+
+            public ImmediateProgress(Action<T> handler) =>
+                _handler = handler;
+
+            public void Report(T value) => _handler(value);
+ }
+ }
+}
diff --git a/test/Cursively.Tests/CsvAsyncStreamInputTests.cs b/test/Cursively.Tests/CsvAsyncStreamInputTests.cs
new file mode 100644
index 0000000..b5131e3
--- /dev/null
+++ b/test/Cursively.Tests/CsvAsyncStreamInputTests.cs
@@ -0,0 +1,118 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading.Tasks;
+
+using Cursively.Inputs;
+
+using Xunit;
+
+using static Cursively.Tests.TestHelpers;
+
+namespace Cursively.Tests
+{
+ public sealed class CsvAsyncStreamInputTests : CsvAsyncInputTestBase
+ {
+ public static IEnumerable