This library lets you use grain storage as the storage layer for persistent streams. Our benchmarks show a 10% speed advantage over MemoryStreams.
This is a streaming provider that uses Orleans' built-in StatefulGrain system for queuing messages. The added value of this project is the FireAndForgetDelivery option, which can be disabled in tests so that they are easier to write with TestCluster.
In production, register the provider using the extension method for ISiloBuilder. You must also add grain storage providers for the actual state and for subscriptions. The example below uses MemoryGrainStorage, which should not be used if you require the stream queues to be persistent.

    siloBuilder.AddMemoryGrainStorageAsDefault()
        .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
        .AddGrainsStreams(name: ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME,
            queueCount: 1,
            retry: TimeSpan.FromMinutes(1),
            poison: TimeSpan.FromMinutes(3));

In tests, register the provider using the AddGrainsStreamsForTests extension method for ISiloBuilder. As in production, you must also add grain storage providers for the actual state and for subscriptions. The example below uses MemoryGrainStorage, which should not be used if you require the stream queues to be persistent. With this registration, every OnNextAsync invocation can be awaited, and the call completes only after all subscribers have finished processing the message.

    siloBuilder.ConfigureServices(Configure)
        .AddMemoryGrainStorageAsDefault()
        .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
        .AddGrainsStreamsForTests(name: ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME,
            queueCount: 3,
            retry: TimeSpan.FromSeconds(1),
            poison: TimeSpan.FromSeconds(3));

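To illustrate why awaited delivery makes tests easier to write, here is a minimal conceptual sketch. It does not use this library or Orleans: `SketchStream<T>` and its `fireAndForgetDelivery` flag are hypothetical stand-ins that only mimic the two delivery modes described above, and the real provider's internals may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a stream; NOT a type from this library or Orleans.
public sealed class SketchStream<T>
{
    private readonly List<Func<T, Task>> _subscribers = new List<Func<T, Task>>();
    private readonly bool _fireAndForgetDelivery;

    public SketchStream(bool fireAndForgetDelivery)
    {
        _fireAndForgetDelivery = fireAndForgetDelivery;
    }

    public void Subscribe(Func<T, Task> handler) => _subscribers.Add(handler);

    public Task OnNextAsync(T item)
    {
        // Start delivery to every subscriber.
        Task delivery = Task.WhenAll(_subscribers.Select(s => s(item)));

        // Fire-and-forget: return at once without waiting for subscribers.
        // Otherwise: hand back the delivery task so the caller can await it.
        return _fireAndForgetDelivery ? Task.CompletedTask : delivery;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var handled = 0;
        var stream = new SketchStream<string>(fireAndForgetDelivery: false);
        stream.Subscribe(async _ =>
        {
            await Task.Delay(50); // simulate a slow subscriber
            handled++;
        });

        await stream.OnNextAsync("hello");
        Console.WriteLine(handled); // 1, because delivery was awaited
    }
}
```

With fire-and-forget delivery turned on in this sketch, the assertion would have to poll or sleep until the subscriber finishes, which is the kind of test code the awaited mode avoids.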
In high-throughput, high-volume environments, you will want to collect the TransactionItemGrain objects early so they do not exhaust the memory of your silo. Use the configuration below and adjust it to your requirements:

    siloBuilder.Configure<GrainCollectionOptions>(options =>
    {
        options.CollectionAge = TimeSpan.FromMinutes(1);
        options.CollectionQuantum = TimeSpan.FromSeconds(5);
        options.ClassSpecificCollectionAge[typeof(TransactionItemGrain<>).FullName] = options.CollectionQuantum * 2;
    });

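The class-specific entry above is keyed by the full name of the open generic type, and its value is twice the collection quantum. The following standalone sketch shows what such a key and value look like. `SketchGrain<T>` is a hypothetical placeholder for TransactionItemGrain<> (whose namespace is not shown here), and the `Entry` helper is an illustrative assumption, not part of this library or Orleans.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical placeholder for TransactionItemGrain<>; only the generic shape matters.
public class SketchGrain<T> { }

public static class CollectionAgeSketch
{
    // Builds the (type name, idle age) pair in the same way as the configuration above.
    public static KeyValuePair<string, TimeSpan> Entry(Type grainType, TimeSpan quantum) =>
        new KeyValuePair<string, TimeSpan>(grainType.FullName, quantum * 2);

    public static void Main()
    {
        var entry = Entry(typeof(SketchGrain<>), TimeSpan.FromSeconds(5));
        Console.WriteLine(entry.Key);   // SketchGrain`1
        Console.WriteLine(entry.Value); // 00:00:10
    }
}
```

The trailing `` `1 `` in the key is how .NET encodes the generic arity of an open generic type; for a type declared in a namespace, the key also includes the namespace prefix. With a 5-second quantum, the resulting class-specific age is 10 seconds.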
- Orleans.Streaming.Grains implements the Grain Stream Provider.
- Orleans.Streaming.Grains.Test tests the Grain Stream Provider in an Orleans TestingHost.
Make sure to open the project folder in VS Code with the Remote - Containers extension enabled.