diff --git a/src/core/CloudStreams.Core/Extensions/CloudEventExtensions.cs b/src/core/CloudStreams.Core/Extensions/CloudEventExtensions.cs index fe4fcecd..962e3aa1 100644 --- a/src/core/CloudStreams.Core/Extensions/CloudEventExtensions.cs +++ b/src/core/CloudStreams.Core/Extensions/CloudEventExtensions.cs @@ -35,6 +35,7 @@ public static class CloudEventExtensions { string str => ulong.Parse(str), ulong num => num, + Decimal num => (ulong)num, JsonElement jsonElem => Neuroglia.Serialization.Json.JsonSerializer.Default.Deserialize(jsonElem), _ => null }; diff --git a/src/dashboard/CloudStreams.Dashboard/Components/Timeline/Store.cs b/src/dashboard/CloudStreams.Dashboard/Components/Timeline/Store.cs index b80e1263..c14e1b1f 100644 --- a/src/dashboard/CloudStreams.Dashboard/Components/Timeline/Store.cs +++ b/src/dashboard/CloudStreams.Dashboard/Components/Timeline/Store.cs @@ -176,6 +176,7 @@ public async Task GatherCloudEventsAsync() else { bool fetchMore = true; + ulong length = StreamReadOptions.MaxLength; long offset = options.Offset ?? (options.Direction == StreamReadDirection.Forwards ? 0 : -1); do { @@ -184,12 +185,13 @@ public async Task GatherCloudEventsAsync() Direction = options.Direction, Partition = options.Partition, Offset = offset, - Length = StreamReadOptions.MaxLength + Length = length }; var cloudEvents = await (await cloudStreamsApi.CloudEvents.Stream.ReadStreamAsync(readOptions, this.CancellationTokenSource.Token).ConfigureAwait(false)).ToListAsync().ConfigureAwait(false); data.AddRange(cloudEvents!); - offset = (long)cloudEvents.Last()!.GetSequence()!; - fetchMore = cloudEvents.Count > 1 && (ulong)data.Count < options!.Length; + offset = (long)cloudEvents.Last()!.GetSequence()! + (options.Direction == StreamReadDirection.Forwards ? 1 : -1); + length = Math.Min(options!.Length - (ulong)data.Count, StreamReadOptions.MaxLength); + fetchMore = cloudEvents.Count > 1 && length != 0; } while(fetchMore); }