Skip to content

Commit

Permalink
Hacking new serialization with Steve at techorama
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Oct 8, 2024
1 parent db36a56 commit be24c65
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader);
var requestData = new RequestData(
POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
_transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
);
await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
}
// [Benchmark(Baseline = true)]
// public async Task WriteToStreamAsync()
// {
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader);
// var requestData = new RequestData(
// POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
// _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
// );
// await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true));
var requestData = new RequestData(
POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
_transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
);
await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
}
// [Benchmark(Baseline = true)]
// public async Task WriteToStreamAsync()
// {
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true));
// var requestData = new RequestData(
// POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
// _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
// );
// await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task DynamicIndexName_WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false));
var requestData = new RequestData(
POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
_transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
);
await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
}
// [Benchmark(Baseline = true)]
// public async Task DynamicIndexName_WriteToStreamAsync()
// {
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false));
// var requestData = new RequestData(
// POST, "/_bulk", PostData.ReadOnlyMemory(bytes),
// _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData()
// );
// await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
bm.Setup();
await bm.WriteToStreamAsync();
//await bm.WriteToStreamAsync();

var length = bm.MemoryStream.Length;

Expand Down
10 changes: 5 additions & 5 deletions build/scripts/CommandLine.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ with
interface IArgParserTemplate with
member this.Usage =
match this with
| Clean _ -> "clean known output locations"
| Build _ -> "Run build"
| Test _ -> "Runs build then tests"
| Release _ -> "runs build, tests, and create and validates the packages shy of publishing them"
| Publish _ -> "Runs the full release"
| Clean -> "clean known output locations"
| Build -> "Run build"
| Test -> "Runs build then tests"
| Release -> "runs build, tests, and create and validates the packages shy of publishing them"
| Publish -> "Runs the full release"

| SingleTarget _ -> "Runs the provided sub command without running their dependencies"
| Token _ -> "Token to be used to authenticate with github"
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "6.0.302",
"version": "8.0.100",
"rollForward": "latestFeature",
"allowPrerelease": false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
_fixedHeader = new CreateOperation();
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.CreateBulkOperationHeader"/>
protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => _fixedHeader;
/// <inheritdoc cref="GetIndexOp"/>
protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.CreateNoParams;

/// <inheritdoc cref="MutateHeader"/>
protected override void MutateHeader(ref readonly BulkHeader header) { }

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
protected override string TemplateName => Options.DataStream.GetTemplateName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,67 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

#if NETSTANDARD2_1_OR_GREATER
using System;
using System.Buffers;
#else
using System.Collections.Generic;
#endif
using System;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch.Serialization;
namespace Elastic.Ingest.Elasticsearch;

/// <summary> TODO </summary>
public enum IndexOp
{
/// <summary> </summary>
Index,
/// <summary> </summary>
IndexNoParams,
/// <summary> </summary>
Create,
/// <summary> </summary>
CreateNoParams,
/// <summary> </summary>
Delete,
/// <summary> </summary>
Update,
}

/// <summary> TODO </summary>
public readonly struct BulkHeader
{

}

/// <summary>
/// Provides static factory methods from producing request data for bulk requests.
/// An abstract base class for both <see cref="DataStreamChannel{TEvent}"/> and <see cref="IndexChannel{TEvent}"/>
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
/// </summary>
public static class BulkRequestDataFactory
public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
: TransportChannelBase<TChannelOptions, TEvent, BulkResponse, BulkResponseItem>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{

#if NETSTANDARD2_1_OR_GREATER
/// <summary>
/// Get the NDJSON request body bytes for a page of <typeparamref name="TEvent"/> events.
/// </summary>
/// <typeparam name="TEvent">The type for the event being ingested.</typeparam>
/// <param name="page">A page of <typeparamref name="TEvent"/> events.</param>
/// <param name="options">The <see cref="ElasticsearchChannelOptionsBase{TEvent}"/> for the channel where the request will be written.</param>
/// <param name="createHeaderFactory">A function which takes an instance of <typeparamref name="TEvent"/> and produces the operation header containing the action and optional meta data.</param>
/// <returns>A <see cref="ReadOnlyMemory{T}"/> of <see cref="byte"/> representing the entire request body in NDJSON format.</returns>
public static ReadOnlyMemory<byte> GetBytes<TEvent>(ArraySegment<TEvent> page,
public ReadOnlyMemory<byte> GetBytes(ArraySegment<TEvent> page,
ElasticsearchChannelOptionsBase<TEvent> options, Func<TEvent, BulkOperationHeader> createHeaderFactory)
{
// ArrayBufferWriter inserts comma's when serializing multiple times
Expand Down Expand Up @@ -71,19 +102,25 @@ public static ReadOnlyMemory<byte> GetBytes<TEvent>(ArraySegment<TEvent> page,
}
#endif

/// <summary> TODO </summary>
protected abstract IndexOp GetIndexOp(TEvent @event);

/// <summary>
///
/// </summary>
/// <param name="header"></param>
protected abstract void MutateHeader(ref readonly BulkHeader header);

/// <summary>
/// Asynchronously write the NDJSON request body for a page of <typeparamref name="TEvent"/> events to <see cref="Stream"/>.
/// </summary>
/// <typeparam name="TEvent">The type for the event being ingested.</typeparam>
/// <param name="page">A page of <typeparamref name="TEvent"/> events.</param>
/// <param name="stream">The target <see cref="Stream"/> for the request.</param>
/// <param name="options">The <see cref="ElasticsearchChannelOptionsBase{TEvent}"/> for the channel where the request will be written.</param>
/// <param name="createHeaderFactory">A function which takes an instance of <typeparamref name="TEvent"/> and produces the operation header containing the action and optional meta data.</param>
/// <param name="ctx">The cancellation token to cancel operation.</param>
/// <returns></returns>
public static async Task WriteBufferToStreamAsync<TEvent>(ArraySegment<TEvent> page, Stream stream,
ElasticsearchChannelOptionsBase<TEvent> options, Func<TEvent, BulkOperationHeader> createHeaderFactory,
CancellationToken ctx = default)
public async Task WriteBufferToStreamAsync(
ArraySegment<TEvent> page, Stream stream, ElasticsearchChannelOptionsBase<TEvent> options, CancellationToken ctx = default)
{
#if NETSTANDARD2_1_OR_GREATER
var items = page;
Expand All @@ -99,12 +136,27 @@ public static async Task WriteBufferToStreamAsync<TEvent>(ArraySegment<TEvent> p
var @event = items[i];
if (@event == null) continue;

var indexHeader = createHeaderFactory(@event);
await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx)
.ConfigureAwait(false);
var op = GetIndexOp(@event);
switch (op)
{
case IndexOp.IndexNoParams:
await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case IndexOp.Index:
case IndexOp.Create:
case IndexOp.Delete:
case IndexOp.Update:
var header = new BulkHeader();
MutateHeader(ref header);
await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false);
break;


}

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (indexHeader is UpdateOperation)
if (op == IndexOp.Update)
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (options.EventWriter?.WriteToStreamAsync != null)
Expand All @@ -113,7 +165,7 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(),
await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx)
.ConfigureAwait(false);

if (indexHeader is UpdateOperation)
if (op == IndexOp.Update)
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
Expand All @@ -123,12 +175,11 @@ await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx)
/// <summary>
/// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index.
/// </summary>
/// <typeparam name="TEvent">The type for the event being ingested.</typeparam>
/// <param name="event">The <typeparamref name="TEvent"/> for which the header will be produced.</param>
/// <param name="options">The <see cref="IndexChannelOptions{TEvent}"/> for the channel.</param>
/// <param name="skipIndexName">Control whether the index name is included in the meta data for the operation.</param>
/// <returns>A <see cref="BulkOperationHeader"/> instance.</returns>
public static BulkOperationHeader CreateBulkOperationHeaderForIndex<TEvent>(TEvent @event, IndexChannelOptions<TEvent> options,
public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions<TEvent> options,
bool skipIndexName = false)
{
var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
Expand Down Expand Up @@ -161,4 +212,3 @@ public static BulkOperationHeader CreateBulkOperationHeaderForIndex<TEvent>(TEve
: skipIndexName ? new CreateOperation() : new CreateOperation { Index = index };
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch;

/// <summary>
/// An abstract base class for both <see cref="DataStreamChannel{TEvent}"/> and <see cref="IndexChannel{TEvent}"/>
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
/// </summary>
public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
: TransportChannelBase<TChannelOptions, TEvent, BulkResponse, BulkResponseItem>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) =>
throw new NotImplementedException();

private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) =>
throw new NotImplementedException();
}
21 changes: 9 additions & 12 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

namespace Elastic.Ingest.Elasticsearch;



/// <summary>
/// An abstract base class for both <see cref="DataStreamChannel{TEvent}"/> and <see cref="IndexChannel{TEvent}"/>
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
Expand Down Expand Up @@ -71,12 +73,12 @@ protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySeg
#if NETSTANDARD2_1
// Option is obsolete to prevent external users to set it.
#pragma warning disable CS0618
if (Options.UseReadOnlyMemory)
#pragma warning restore CS0618
{
var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader);
return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx);
}
// if (Options.UseReadOnlyMemory)
// #pragma warning restore CS0618
// {
// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader);
// return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx);
// }
#endif
#pragma warning disable IDE0022 // Use expression body for method
return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkUrl,
Expand All @@ -85,16 +87,11 @@ protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySeg
{
/* NOT USED */
},
async (b, stream, ctx) => { await BulkRequestDataFactory.WriteBufferToStreamAsync(b, stream, Options, CreateBulkOperationHeader, ctx).ConfigureAwait(false); })
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); })
, RequestParams, ctx);
#pragma warning restore IDE0022 // Use expression body for method
}

/// <summary>
/// Asks implementations to create a <see cref="BulkOperationHeader"/> based on the <paramref name="event"/> being exported.
/// </summary>
protected abstract BulkOperationHeader CreateBulkOperationHeader(TEvent @event);

/// <summary> </summary>
protected class HeadIndexTemplateResponse : ElasticsearchResponse { }

Expand Down
Loading

0 comments on commit be24c65

Please sign in to comment.