Skip to content

Commit

Permalink
Support manual control of the bulk operation (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon authored Jan 22, 2024
1 parent ec97f60 commit 5f560a0
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { }
/// <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-request-body</para>
/// </summary>
public Func<TEvent, string, bool>? BulkUpsertLookup { get; set; }

/// <summary>
/// Control the operation header for each bulk operation.
/// </summary>
public OperationMode OperationMode { get; set; }
}
25 changes: 25 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/Indices/OperationMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
namespace Elastic.Ingest.Elasticsearch.Indices;

/// <summary>
/// Determines the operation header for each bulk operation.
/// </summary>
public enum OperationMode
{
/// <summary>
/// The mode will be determined automatically based on default rules for the preferred mode.
/// </summary>
Auto = 0,

/// <summary>
/// Each document will be sent with an 'index' bulk operation header.
/// </summary>
Index = 1,

/// <summary>
/// Each document will be sent with a 'create' bulk operation header.
/// </summary>
Create = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ public static BulkOperationHeader CreateBulkOperationHeaderForIndex<TEvent>(TEve

var id = options.BulkOperationIdLookup?.Invoke(@event);

if (options.OperationMode == OperationMode.Index)
{
return skipIndexName
? !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Id = id } : new IndexOperation()
: !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Index = index, Id = id } : new IndexOperation { Index = index };
}

if (options.OperationMode == OperationMode.Create)
{
return skipIndexName
? !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Id = id } : new CreateOperation()
: !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Index = index, Id = id } : new CreateOperation { Index = index };
}

if (!string.IsNullOrWhiteSpace(id) && id != null && (options.BulkUpsertLookup?.Invoke(@event, id) ?? false))
return skipIndexName ? new UpdateOperation { Id = id } : new UpdateOperation { Id = id, Index = index };

Expand Down

0 comments on commit 5f560a0

Please sign in to comment.