Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to streamstone v3 with latest Azure Data APIs #57

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/CloudActors.Package/CloudActors.Package.msbuildproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CloudActors.CodeAnaysis\CloudActors.CodeAnaysis.csproj" />
Expand Down
5 changes: 2 additions & 3 deletions src/CloudActors.Streamstone/CloudActors.Streamstone.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Devlooped.CloudStorageAccount" Version="1.2.0" />
<PackageReference Include="Microsoft.Orleans.Runtime" Version="7.2.1" />
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="Streamstone" Version="2.3.1" />
<PackageReference Include="Streamstone" Version="3.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.ComponentModel;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down
49 changes: 27 additions & 22 deletions src/CloudActors.Streamstone/StreamstoneStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure;
using Azure.Data.Tables;
using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using Streamstone;
using Streamstone.Utility;

namespace Devlooped.CloudActors;

public class StreamstoneStorage : IGrainStorage
{
// We cache table names to avoid running CreateIfNotExistsAsync on each access.
readonly ConcurrentDictionary<string, Task<CloudTable>> tables = new();
readonly ConcurrentDictionary<string, Task<TableClient>> tables = new();
readonly CloudStorageAccount storage;
readonly StreamstoneOptions options;

Expand All @@ -25,7 +27,7 @@ public StreamstoneStorage(CloudStorageAccount storage, StreamstoneOptions? optio
public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
var table = await GetTable(storage, stateName);
await table.ExecuteAsync(TableOperation.Delete(new TableEntity(table.Name, grainId.Key.ToString()!)));
await table.SubmitTransactionAsync([new TableTransactionAction(TableTransactionActionType.Delete, new TableEntity(table.Name, grainId.Key.ToString()!))]);
}

public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
Expand All @@ -46,9 +48,8 @@ public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainSta
if (options.AutoSnapshot)
{
// See if we can quickly load from most recent snapshot.
var result = await table.ExecuteAsync(TableOperation.Retrieve<EventEntity>(rowId, typeof(T).FullName));
if (result.HttpStatusCode == 200 &&
result.Result is EventEntity entity &&
var result = await table.GetEntityIfExistsAsync<EventEntity>(rowId, typeof(T).FullName ?? typeof(T).Name);
if (result.HasValue && result.Value is EventEntity entity &&
typeof(T).Assembly.GetName() is { } asm &&
// We only apply snapshots where major.minor matches the current version, otherwise,
// we might be losing important business logic changes.
Expand Down Expand Up @@ -76,16 +77,16 @@ entity.Data is string data &&
}
else
{
var result = await table.ExecuteAsync(TableOperation.Retrieve<EventEntity>(table.Name, rowId));
if (result.HttpStatusCode == 404 ||
result.Result is not EventEntity entity ||
var result = await table.GetEntityIfExistsAsync<EventEntity>(table.Name, rowId);
if (!result.HasValue ||
result.Value is not EventEntity entity ||
entity.Data is not string data ||
// TODO: how to deal with versioning in this case?
JsonSerializer.Deserialize<T>(data, options.JsonOptions) is not { } instance)
return;

grainState.State = instance;
grainState.ETag = result.Etag;
grainState.ETag = entity.ETag.ToString("G");
grainState.RecordExists = true;
}
}
Expand All @@ -112,18 +113,17 @@ public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainSt
try
{
var includes = options.AutoSnapshot ?
new ITableEntity[]
{
[
new EventEntity
{
PartitionKey = table.Name,
RowKey = typeof(T).FullName,
RowKey = typeof(T).FullName ?? typeof(T).Name,
Data = JsonSerializer.Serialize(grainState.State, options.JsonOptions),
DataVersion = new Version(asm.Version?.Major ?? 0, asm.Version?.Minor ?? 0).ToString(),
Type = $"{type.FullName}, {asm.Name}",
Version = stream.Version + state.Events.Count
}
} : Array.Empty<ITableEntity>();
] : Array.Empty<ITableEntity>();

await Stream.WriteAsync(partition,
int.TryParse(grainState.ETag, out var version) ? version : 0,
Expand All @@ -141,36 +141,41 @@ await Stream.WriteAsync(partition,
}
else
{
var result = await table.ExecuteAsync(TableOperation.InsertOrReplace(new EventEntity
var result = await table.SubmitTransactionAsync([new TableTransactionAction(TableTransactionActionType.UpsertReplace, new EventEntity
{
PartitionKey = table.Name,
RowKey = rowId,
ETag = grainState.ETag,
RowKey = rowId!,
ETag = new ETag(grainState.ETag),
Data = JsonSerializer.Serialize(grainState.State, options.JsonOptions),
DataVersion = new Version(asm.Version?.Major ?? 0, asm.Version?.Minor ?? 0).ToString(),
Type = $"{type.FullName}, {asm.Name}",
}));
})]);

grainState.ETag = result.Etag;
grainState.ETag = result.Value[0].Headers.ETag?.ToString();
grainState.RecordExists = true;
}
}

async Task<CloudTable> GetTable(CloudStorageAccount storage, string name)
async Task<TableClient> GetTable(CloudStorageAccount storage, string name)
{
var getTable = tables.GetOrAdd(name, async key =>
{
var client = storage.CreateCloudTableClient();
var table = client.GetTableReference(key);
var table = client.GetTableClient(key);
await table.CreateIfNotExistsAsync();
return table;
});

return await getTable;
}

class EventEntity : TableEntity
class EventEntity : ITableEntity
{
public string PartitionKey { get; set; } = "";
public string RowKey { get; set; } = "";
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string? Data { get; set; }
public string? DataVersion { get; set; }
public string? Type { get; set; }
Expand Down
1 change: 0 additions & 1 deletion src/CloudActors/CloudActors.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

<ItemGroup>
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.Orleans.Serialization.Abstractions" Version="7.2.1" />
<PackageReference Include="PolySharp" PrivateAssets="All" Version="1.13.2" />
</ItemGroup>
Expand Down
9 changes: 3 additions & 6 deletions src/Tests/Account.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.AspNetCore.Builder;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
Expand All @@ -32,14 +32,11 @@ public class TestAccounts : IAsyncDisposable
{
public TestAccounts() => CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync()
.Wait();
.DeleteTable(nameof(Account));

public async ValueTask DisposeAsync() => await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync();
.DeleteTableAsync(nameof(Account));

[Fact]
public async Task HostedGrain()
Expand Down
5 changes: 2 additions & 3 deletions src/Tests/Customer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Orleans;
using Xunit.Abstractions;
Expand All @@ -19,8 +19,7 @@ public async Task HostedGrain()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference("customer")
.DeleteIfExistsAsync();
.DeleteTableAsync("customer");

var bus = fixture.Cluster.ServiceProvider.GetRequiredService<IActorBus>();

Expand Down
9 changes: 1 addition & 8 deletions src/Tests/OrleansTest.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Orleans;
using Orleans.Core;
using Orleans.Hosting;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Storage;
using Orleans.TestingHost;
Expand Down
8 changes: 3 additions & 5 deletions src/Tests/StreamstoneTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Moq;
using Newtonsoft.Json;
using Orleans;
Expand All @@ -18,8 +18,7 @@ public async Task ReadWrite()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync();
.DeleteTableAsync(nameof(Account));

var account = new Account("1");
account.Deposit(new Deposit(100));
Expand All @@ -42,8 +41,7 @@ public async Task ReadWriteComplexObject()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference("CloudActorWallet")
.DeleteIfExistsAsync();
.DeleteTableAsync("CloudActorWallet");

var wallet = new Wallet("1");
wallet.AddFunds("USD", 100);
Expand Down
Loading