A replicated journal and snapshot store implementation for Akka.Persistence backed by Apache Cassandra.
WARNING: The Akka.Persistence.Cassandra plugin is still in beta and the mechanics described below are subject to change.
To activate the journal plugin, add the following line to the actor system configuration file:
akka.persistence.journal.plugin = "cassandra-journal"
To activate the snapshot store plugin, add the following line to the actor system configuration file:
akka.persistence.snasphot-store.plugin = "cassandra-snapshot-store"
The default configuration will try to connect to a Cassandra cluster running on 127.0.0.1
for persisting messages
and snapshots. More information on the available configuration options is in the sections below.
Both the journal and the snapshot store plugins use the DataStax .NET Driver
for Cassandra to communicate with the cluster. The driver has an ISession
object which is used to execute statements
against the cluster (very similar to a DbConnection
object in ADO.NET). You can control the creation and
configuration of these session instance(s) by modifying the configuration under cassandra-sessions
. Out of the
box, both the journal and the snapshot store plugin will try to use a session called default
. You can override
the settings for that session with the following configuration keys:
cassandra-sessions.default.contact-points
: A comma-seperated list of contact points in the cluster in the format of eitherhost
orhost:port
. Default value is[ "127.0.0.1" ]
.cassandra-sessions.default.port
: Default port for contact points in the cluster, used if a contact point is not in [host:port] format. Default value is9042
.cassandra-sessions.default.credentials.username
: The username to login to Cassandra hosts. No authentication is used by default.cassandra-sessions.default.credentials.password
: The password corresponding to the username. No authentication is used by default.cassandra-sessions.default.ssl
: Boolean value indicating whether to use SSL when connecting to the cluster. No default value is set and so SSL is not used by default.cassandra-sessions.default.compression
: The type of compression to use when communicating with the cluster. No default value is set and so compression is not used by default.
If you require more advanced configuration of the ISession
object than the options provided here (for example, to
use a different session for the journal and snapshot store plugins or to configure the session via code or manage
it with an IoC container), see the Advanced Session Management section below.
- All operations of the journal plugin API are fully supported
- Uses Cassandra in a log-oriented way (i.e. data is only ever inserted but never updated)
- Uses marker records for permanent deletes to try and avoid the problem of reading many tombstones when replaying messages.
- Messages for a single persistence Id are partitioned across the cluster to avoid unbounded partition growth and support scalability by adding more nodes to the cluster.
As mentioned in the Quick Start section, you can activate the journal plugin by adding the following line to your actor system configuration file:
akka.persistence.journal.plugin = "cassandra-journal"
You can also override the journal's default settings with the following configuration keys:
cassandra-journal.class
: The Type name of the Cassandra journal plugin. Default value isAkka.Persistence.Cassandra.Journal.CassandraJournal, Akka.Persistence.Cassandra
.cassandra-journal.session-key
: The name (key) of the session to use when resolving anISession
instance. When using default session management, this points at a configuration section undercassandra-sessions
where the session's configuration is found. Default value isdefault
.cassandra-journal.use-quoted-identifiers
: Whether or not to quote the table and keyspace names when executing statements against Cassandra. Default value isfalse
.cassandra-journal.keyspace
: The keyspace to be created/used by the journal. Default value isakkanet
.cassandra-journal.keyspace-creation-options
: A string to be appended to theCREATE KEYSPACE
statement after theWITH
clause when the keyspace is automatically created. Use this to define options like the replication strategy. Default value isREPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
.cassandra-journal.keyspace-autocreate
: When true, the journal will automatically try to create the keyspace if it doesn't already exist on startup. Default value istrue
.cassandra-journal.table
: The name of the table to be created/used by the journal. Default value ismessages
.cassandra-journal.table-creation-properties
: A string to be appended to theCREATE TABLE
statement after theWITH
clause. Use this to define advanced table options likegc_grace_seconds
or one of the other many table options. Default value is an empty string.cassandra-journal.partition-size
: The approximate number of message rows to store in a single partition. Cannot be changed after table creation. Default value is5000000
.cassandra-journal.max-result-size
: The maximum number of messages to retrieve in a single request to Cassandra when replaying messages. Default value is50001
.cassandra-journal.read-consistency
: The consistency level to use for read operations. Default value isQuorum
.cassandra-journal.write-consistency
: The consistency level to use for write operations. Default value isQuorum
.
The default value for read and write consistency levels ensure that persistent actors can read their own writes.
Consider using LocalQuorum
for both reads and writes if using a Cassandra cluster with multiple datacenters.
- Snapshot IO is done in a fully asynchronous fashion, including deletes (the snapshot store plugin API only directly specifies synchronous methods for doing deletes)
As mentioned in the Quick Start section, you can activate the snapshot store plugin by adding the following line to your actor system configuration file:
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
You can also override the snapshot store's default settings with the following configuration keys:
cassandra-snapshot-store.class
: The Type name of the Cassandra snapshot store plugin. Default value isAkka.Persistence.Cassandra.Snapshot.CassandraSnapshotStore, Akka.Persistence.Cassandra
.cassandra-snapshot-store.session-key
: The name (key) of the session to use when resolving anISession
instance. When using default session management, this points at a configuration section undercassandra-sessions
where the session's configuration is found. Default value isdefault
.cassandra-snapshot-store.use-quoted-identifiers
: Whether or not to quote the table and keyspace names when executing statements against Cassandra. Default value isfalse
.cassandra-snapshot-store.keyspace
: The keyspace to be created/used by the snapshot store. Default value isakkanet
.cassandra-snapshot-store.keyspace-creation-options
: A string to be appended to theCREATE KEYSPACE
statement after theWITH
clause when the keyspace is automatically created. Use this to define options like the replication strategy. Default value isREPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
.cassandra-snapshot-store.keyspace-autocreate
: When true, the snapshot store will automatically try to create the keyspace if it doesn't already exist on startup. Default value istrue
.cassandra-snapshot-store.table
: The name of the table to be created/used by the snapshot store. Default value issnapshots
.cassandra-snapshot-store.table-creation-properties
: A string to be appended to theCREATE TABLE
statement after theWITH
clause. Use this to define advanced table options likegc_grace_seconds
or one of the other many table options. Default value is an empty string.cassandra-snapshot-store.max-metadata-result-size
: The maximum number of snapshot metadata instances to retrieve in a single request when trying to find a snapshot that matches criteria. Default value is10
.cassandra-snapshot-store.read-consistency
: The consistency level to use for read operations. Default value isOne
.cassandra-snapshot-store.write-consistency
: The consistency level to use for write operations. Default value isOne
.
Consider using LocalOne
consistency level for both reads and writes if using a Cassandra cluster with multiple
datacenters.
In some advanced scenarios, you may want to have more control over how ISession
instances are created. Some
example scenarios might include:
- to use a different session instance for the journal and snapshot store plugins (i.e. maybe you have more than one Cassandra cluster and are storing journal messages and snapshots in different clusters)
- to access more advanced configuration options for building the session instance in code using the DataStax driver's cluster builder API directly
- to use session instances that have already been registered with an IoC container and are being managed there
If you want more control over how session instances are created or managed, you have two options depending on how much control you need.
It is possible to define configuration for more than one session instance under the cassandra-sessions
section of
your actor system's configuration file. To do this, just create your own section with a unique name/key for the
sub-section. All of the same options listed above in the Connecting to the Cluster
can then be used to configure that session. For example, I might define seperate configurations for my journal and
snapshot store plugins like this:
cassandra-sessions {
my-journal-session {
contact-points = [ "10.1.1.1", "10.1.1.2" ]
port = 9042
credentials {
username = "myusername"
password = "mypassword"
}
}
my-snapshot-session {
contact-points = [ "10.2.1.1:9142", "10.2.1.2:9142" ]
}
}
I can then tell the journal and snapshot store plugins to use those sessions by overriding each plugin's session-key
configuration like this:
cassandra-journal.session-key = "my-journal-session"
cassandra-snapshot-store.session-key = "my-snapshot-session"
You can also override how sessions are created, managed and resolved with your own code. Session management is
done as its own plugin for Akka.NET and a default implementation that uses the cassandra-sessions
section is
provided out of the box. If you want to provide your own implementation for doing this (for example, to manage
sessions with an IoC container or use the DataStax driver's cluster builder API to do more advanced configuration),
here are the steps you'll need to follow:
- Create a class that implements the
IManageSessions
interface fromAkka.Persistence.Cassandra.SessionManagement
. This interface is simple and just requires that you provide a way for resolving and releasing session instances. For example:
public class MySessionManager : IManageSessions
{
public override ISession ResolveSession(string key)
{
// Do something here to get the ISession instance (pull from IoC container, etc)
}
public override ISession ReleaseSession(ISession session)
{
// Do something here to release the session instance if necessary
}
}
- Next, you'll need to create an extension id provider class by inheriting from
ExtensionIdProvider<IManageSessions>
. This class is responsible for actually providing a copy of yourIManageSessions
implementation. For example:
public class MySessionExtension : ExtensionIdProvider<IManageSessions>
{
public override IManageSessions CreateExtension(ExtendedActorSystem system)
{
// Return a copy of your implementation of IManageSessions
return new MySessionManager();
}
}
- Lastly, you'll need to register your extension with the actor system when creating it in your application. For example:
var actorSystem = ActorSystem.Create("MyApplicationActorSystem");
var extensionId = new MySessionExtension();
actorSystem.RegisterExtension(extensionId);
The journal and snapshot store plugins will now call your code when resolving or releasing sessions.
The Cassandra tests are packaged and run as part of the default "All" build task.
In order to run the tests, you must do the following things:
- Download and install DataStax Community Edition of Cassandra from http://planetcassandra.org/cassandra/
- Install Cassandra with the default settings. The default connection string will connect to a Cassandra instance running at 127.0.0.1:9042 with no authentication. Here's the full default settings for an Akka.Persistence.Cassandra connection:
cassandra-sessions {
# The "default" Cassandra session, used by both the journal and snapshot store if not changed in
# the cassandra-journal and cassandra-snapshot-store configuration sections below
default {
# Comma-seperated list of contact points in the cluster in the format of either [host] or [host:port]
contact-points = [ "127.0.0.1" ]
# Default port for contact points in the cluster, used if a contact point is not in [host:port] format
port = 9042
}
}
cassandra-journal {
# Type name of the cassandra journal plugin
class = "Akka.Persistence.Cassandra.Journal.CassandraJournal, Akka.Persistence.Cassandra"
# The name (key) of the session to use when resolving an ISession instance. When using default session management,
# this points at configuration under the "cassandra-sessions" section where the session's configuration is found.
session-key = "default"
# Whether or not to quote table and keyspace names when executing statements against Cassandra
use-quoted-identifiers = false
# The keyspace to be created/used by the journal
keyspace = "akkanet"
# A string to be appended to the CREATE KEYSPACE statement after the WITH clause when the keyspace is
# automatically created. Use this to define options like replication strategy.
keyspace-creation-options = "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
# When true the journal will automatically try to create the keyspace if it doesn't already exist on start
keyspace-autocreate = true
# Name of the table to be created/used by the journal
table = "messages"
# A string to be appended to the CREATE TABLE statement after the WITH clause. Use this to define things
# like gc_grace_seconds or one of the many other table options.
table-creation-properties = ""
# The approximate number of rows per partition to use. Cannot be changed after table creation.
partition-size = 5000000
# The maximum number of messages to retrieve in one request when replaying messages
max-result-size = 50001
# Consistency level for reads
read-consistency = "Quorum"
# Consistency level for writes
write-consistency = "Quorum"
}
cassandra-snapshot-store {
# Type name of the cassandra snapshot store plugin
class = "Akka.Persistence.Cassandra.Snapshot.CassandraSnapshotStore, Akka.Persistence.Cassandra"
# The name (key) of the session to use when resolving an ISession instance. When using default session management,
# this points at configuration under the "cassandra-sessions" section where the session's configuration is found.
session-key = "default"
# Whether or not to quote table and keyspace names when executing statements against Cassandra
use-quoted-identifiers = false
# The keyspace to be created/used by the snapshot store
keyspace = "akkanet"
# A string to be appended to the CREATE KEYSPACE statement after the WITH clause when the keyspace is
# automatically created. Use this to define options like replication strategy.
keyspace-creation-options = "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
# When true the journal will automatically try to create the keyspace if it doesn't already exist on start
keyspace-autocreate = true
# Name of the table to be created/used by the snapshot store
table = "snapshots"
# A string to be appended to the CREATE TABLE statement after the WITH clause. Use this to define things
# like gc_grace_seconds or one of the many other table options.
table-creation-properties = ""
# The maximum number of snapshot metadata instances to retrieve in a single request when trying to find a
# snapshot that matches the criteria
max-metadata-result-size = 10
# Consistency level for reads
read-consistency = "One"
# Consistency level for writes
write-consistency = "One"
}