Java API: Interface for implementing a ReplicatedData in Java.
The type parameter D is a self-recursive type to be defined by the concrete implementation, e.g. class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet>.
Akka extension for convenient configuration and use of the Replicator. Configuration settings are defined in the akka.cluster.ddata section, see reference.conf.
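A minimal sketch of using the extension, assuming a cluster-enabled ActorSystem configuration (the system and object names are assumptions made for this example):

```scala
import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.ddata.DistributedData

object ReplicatorBootstrap {
  // Assumes akka.cluster.ddata settings (and cluster remoting) are configured
  // in the application's configuration.
  val system: ActorSystem = ActorSystem("ClusterSystem")

  // The extension starts (or returns) the Replicator actor for this system.
  val replicator: ActorRef = DistributedData(system).replicator
}
```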
Implements a boolean flag CRDT that is initialized to false and can be switched to true. true wins over false in merge.
This class is immutable, i.e. "modifying" methods return a new instance.
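A minimal sketch of the merge rule, assuming the Flag companion exposes empty and switchOn as in the akka.cluster.ddata API:

```scala
import akka.cluster.ddata.Flag

object FlagMergeSketch {
  val off = Flag.empty          // enabled = false
  val on  = Flag.empty.switchOn // enabled = true

  // Merge is a logical OR: true wins over false.
  val merged = off.merge(on)    // merged.enabled == true
}
```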
Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
It is described in the paper A comprehensive study of Convergent and Commutative Replicated Data Types.
A G-Counter is an increment-only counter (inspired by vector clocks) in which only increment and merge are possible. Incrementing the counter adds 1 to the count for the current node. Divergent histories are resolved by taking the maximum count for each node (like a vector clock merge). The value of the counter is the sum of all node counts.
This class is immutable, i.e. "modifying" methods return a new instance.
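The counting rules can be illustrated with plain Maps keyed by a node identifier; this sketch only shows the semantics described above, it is not the Akka GCounter API:

```scala
object GCounterSketch {
  type NodeId = String

  // Incrementing adds 1 to the count for the current node.
  def increment(counts: Map[NodeId, Long], node: NodeId): Map[NodeId, Long] =
    counts.updated(node, counts.getOrElse(node, 0L) + 1L)

  // Divergent histories are resolved by taking the maximum count per node,
  // like a vector clock merge.
  def merge(a: Map[NodeId, Long], b: Map[NodeId, Long]): Map[NodeId, Long] =
    (a.keySet ++ b.keySet).map { n =>
      n -> math.max(a.getOrElse(n, 0L), b.getOrElse(n, 0L))
    }.toMap

  // The value of the counter is the sum of all node counts.
  def value(counts: Map[NodeId, Long]): Long = counts.values.sum
}
```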
Implements an 'Add Set' CRDT, also called a 'G-Set'. You can't remove elements from a G-Set.
It is described in the paper A comprehensive study of Convergent and Commutative Replicated Data Types.
A G-Set doesn't accumulate any garbage apart from the elements themselves.
This class is immutable, i.e. "modifying" methods return a new instance.
Key for the key-value data in Replicator. The type of the data value is defined in the key. Keys are compared equal if the id strings are equal, i.e. use unique identifiers.
Specific classes are provided for the built-in data types, e.g. ORSetKey, and you can create your own keys.
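For example (the id strings and object name below are assumptions made for illustration):

```scala
import akka.cluster.ddata.{ GCounterKey, ORSetKey }

object DataKeys {
  // The value type is carried by the key; two keys are equal when their
  // id strings are equal, so the ids should be unique per Replicator.
  val VisitCounterKey = GCounterKey("visit-counter")
  val ActiveUsersKey  = ORSetKey[String]("active-users")
}
```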
Specialized ORMap with LWWRegister values.
LWWRegister relies on synchronized clocks and should only be used when the choice of value is not important for concurrent updates occurring within the clock skew. Instead of using timestamps based on System.currentTimeMillis() it is possible to use a timestamp value based on something else, for example an increasing version number from a database record that is used for optimistic concurrency control.
For first-write-wins semantics you can use the LWWRegister#reverseClock instead of the LWWRegister#defaultClock.
This class is immutable, i.e. "modifying" methods return a new instance.
Implements a 'Last Writer Wins Register' CRDT, also called a 'LWW-Register'.
It is described in the paper A comprehensive study of Convergent and Commutative Replicated Data Types.
Merge takes the register with the highest timestamp. Note that this relies on synchronized clocks. LWWRegister should only be used when the choice of value is not important for concurrent updates occurring within the clock skew. If the timestamps are exactly the same, merge takes the register updated by the node with the lowest address (UniqueAddress is ordered).
Instead of using timestamps based on System.currentTimeMillis() it is possible to use a timestamp value based on something else, for example an increasing version number from a database record that is used for optimistic concurrency control.
For first-write-wins semantics you can use the LWWRegister#reverseClock instead of the LWWRegister#defaultClock.
This class is immutable, i.e. "modifying" methods return a new instance.
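A sketch of such a custom clock, assuming the LWWRegister.Clock trait of the Scala API; the Record type and its version field are made up for this example:

```scala
import akka.cluster.ddata.LWWRegister

// A value that carries its own monotonically increasing version number,
// e.g. from a database record used for optimistic concurrency control.
final case class Record(version: Long, name: String)

// The clock returns the timestamp to store with the register: here the
// record's version instead of System.currentTimeMillis().
object RecordVersionClock extends LWWRegister.Clock[Record] {
  override def apply(currentTimestamp: Long, value: Record): Long =
    value.version
}
// The clock is supplied when creating or updating the register; the exact
// factory/update signatures vary between Akka versions.
```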
Implements an 'Observed Remove Map' CRDT, also called an 'OR-Map'.
It has similar semantics to an ORSet, but in case of concurrent updates the values are merged, and must therefore be ReplicatedData types themselves.
This class is immutable, i.e. "modifying" methods return a new instance.
An immutable multi-map implementation.
Implements an 'Observed Remove Set' CRDT, also called an 'OR-Set'. Elements can be added and removed any number of times. Concurrent add wins over remove.
It is not implemented as described in the paper A comprehensive study of Convergent and Commutative Replicated Data Types, but as described in the paper An optimized conflict-free replicated set, which is more space efficient and doesn't accumulate garbage for removed elements. The implementation is inspired by the Riak DT riak_dt_orswot.
The ORSet has a version vector that is incremented when an element is added to the set. The node -> count pair for that increment is stored against the element as its "birth dot". Every time the element is re-added to the set, its "birth dot" is updated to that of the node -> count version vector entry resulting from the add. When an element is removed, we simply drop it, no tombstones.
When an element exists in replica A and not replica B, is it because A added it and B has not yet seen that, or because B removed it and A has not yet seen that? In this implementation we compare the dot of the present element to the version vector in the Set it is absent from. If the element's dot is not "seen" by that Set's version vector, the other set has yet to see this add, and the item is in the merged Set. If the Set's version vector dominates the dot, the other Set has removed this element already, and the item is not in the merged Set.
This class is immutable, i.e. "modifying" methods return a new instance.
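The element-presence decision can be sketched with plain Maps as version vectors; this only illustrates the rule described above and is not the Akka ORSet API:

```scala
object OrSetMergeSketch {
  type Node = String
  type VersionVector = Map[Node, Long] // "node -> count" entries
  type Dot = (Node, Long)              // an element's "birth dot"

  // A dot is "seen" by a version vector if the vector's count for that node
  // has reached the dot's count.
  def seen(dot: Dot, vv: VersionVector): Boolean =
    vv.getOrElse(dot._1, 0L) >= dot._2

  // Element present only in replica A: keep it in the merge result when B's
  // version vector has not seen its birth dot (B missed the add); drop it
  // when B's vector dominates the dot (B already removed it).
  def keepInMerge(birthDot: Dot, otherReplicaVv: VersionVector): Boolean =
    !seen(birthDot, otherReplicaVv)
}
```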
Implements an 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
It is described in the paper A comprehensive study of Convergent and Commutative Replicated Data Types.
PN-Counters allow the counter to be incremented by tracking the increments (P) separately from the decrements (N). Both P and N are represented as internal GCounters. Merge is handled by merging the internal P and N counters. The value of the counter is the value of the P counter minus the value of the N counter.
This class is immutable, i.e. "modifying" methods return a new instance.
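A minimal sketch of that structure, using plain per-node count maps (not the Akka PNCounter API):

```scala
// Two grow-only per-node count maps: P for increments, N for decrements.
final case class PNCounterSketch(p: Map[String, Long], n: Map[String, Long]) {

  // The value is the sum of increments minus the sum of decrements.
  def value: Long = p.values.sum - n.values.sum

  // Merge is handled by merging the internal P and N maps, taking the
  // maximum count per node (the G-Counter merge rule).
  def merge(that: PNCounterSketch): PNCounterSketch = {
    def mergeCounts(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
      (a.keySet ++ b.keySet).map { node =>
        node -> math.max(a.getOrElse(node, 0L), b.getOrElse(node, 0L))
      }.toMap
    PNCounterSketch(mergeCounts(p, that.p), mergeCounts(n, that.n))
  }
}
```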
Map of named counters.
ReplicatedData that has support for pruning of data belonging to a specific node may implement this interface. When a node is removed from the cluster these methods will be used by the Replicator to collapse data from the removed node into some other node in the cluster.
Interface for implementing a state based convergent replicated data type (CvRDT).
ReplicatedData types must be serializable with an Akka Serializer. It is highly recommended to implement a serializer with Protobuf or similar. The built in data types are marked with ReplicatedDataSerialization and serialized with akka.cluster.ddata.protobuf.ReplicatedDataSerializer.
Serialization of the data types is used in remote messages and also for creating message digests (SHA-1) to detect changes. It is therefore important that the serialization produces the same bytes for the same content. For example, sets and maps should be sorted deterministically in the serialization.
ReplicatedData types should be immutable, i.e. "modifying" methods should return a new instance.
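A minimal sketch of a custom type implementing the trait; the TwoPhaseSet class below is an illustration, not part of the package, and it would still need a serializer as described above:

```scala
import akka.cluster.ddata.ReplicatedData

// A two-phase set: adds and removals are kept in separate grow-only sets,
// an element can never be re-added once removed, and merge is the
// element-wise union, which is monotonic.
final case class TwoPhaseSet(
    adds: Set[String] = Set.empty,
    removals: Set[String] = Set.empty) extends ReplicatedData {

  type T = TwoPhaseSet

  def add(element: String): TwoPhaseSet = copy(adds = adds + element)
  def remove(element: String): TwoPhaseSet = copy(removals = removals + element)
  def elements: Set[String] = adds diff removals

  // "Modifying" methods (including merge) return a new, immutable instance.
  override def merge(that: TwoPhaseSet): TwoPhaseSet =
    TwoPhaseSet(adds union that.adds, removals union that.removals)
}
```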
Marker trait for ReplicatedData serialized by akka.cluster.ddata.protobuf.ReplicatedDataSerializer.
A replicated in-memory data store supporting low latency and high availability requirements.
The Replicator actor takes care of direct replication and gossip-based dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in the cluster.
The data types must be convergent CRDTs and implement ReplicatedData, i.e. they provide a monotonic merge function and the state changes always converge. You can use your own custom ReplicatedData types, and several types are provided by this package, such as the counters, sets, maps, registers and flags described above.
For a good introduction to the CRDT subject, watch The Final Causal Frontier and Eventually Consistent Data Structures talks by Sean Cribbs and the talk by Mark Shapiro, and read the excellent paper A comprehensive study of Convergent and Commutative Replicated Data Types by Mark Shapiro et al.
The Replicator actor must be started on each node in the cluster, or group of nodes tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience it can be used with the DistributedData extension.
To modify and replicate a ReplicatedData value you send a Replicator.Update message to the local Replicator.
The current data value for the key of the Update is passed as parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.
The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must for example not access the sender() reference of an enclosing actor.
Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.
You supply a write consistency level which has the following meaning:
- WriteLocal: the value will immediately only be written to the local replica, and later disseminated with gossip
- WriteTo(n): the value will immediately be written to at least n replicas, including the local replica
- WriteMajority: the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
- WriteAll: the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
As reply to the Update a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.
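A sketch of an Update with WriteMajority, assuming the Scala API of recent Akka versions (the :+ operator and SelfUniqueAddress have changed across releases); the actor and key names are assumptions made for this example:

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.Replicator._

class CounterWriter extends Actor {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
  private val CounterKey = GCounterKey("visit-counter")

  def receive: Receive = {
    case "increment" =>
      replicator ! Update(CounterKey, GCounter.empty, WriteMajority(5.seconds)) { counter =>
        counter :+ 1 // pure function of the current value; must not touch sender()
      }
    case UpdateSuccess(CounterKey, _) =>
      // replicated according to WriteMajority within the timeout
    case UpdateTimeout(CounterKey, _) =>
      // not confirmed in time, but the update may still spread via gossip
  }
}
```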
You will always see your own writes. For example if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.
In the Update message you can pass an optional request context, which the Replicator does not care about but includes in the reply messages. This is a convenient way to pass contextual information (e.g. the original sender) without having to use ask or local correlation data structures.
To retrieve the current value of a data you send a Replicator.Get message to the Replicator. You supply a consistency level which has the following meaning:
- ReadLocal: the value will only be read from the local replica
- ReadFrom(n): the value will be read and merged from n replicas, including the local replica
- ReadMajority: the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
- ReadAll: the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)
As reply to the Get a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.
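A sketch of a Get with ReadMajority and the possible replies; the actor and key names are assumptions made for this example:

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounterKey }
import akka.cluster.ddata.Replicator._

class CounterReader extends Actor {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("visit-counter")

  // Ask for the value, merged from a majority of replicas.
  replicator ! Get(CounterKey, ReadMajority(5.seconds))

  def receive: Receive = {
    case g @ GetSuccess(CounterKey, _) =>
      val current = g.get(CounterKey).value // the merged counter value
      println(s"current count: $current")
    case GetFailure(CounterKey, _) =>
      // the read consistency level could not be reached within the timeout
    case NotFound(CounterKey, _) =>
      // the key has never been written
  }
}
```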
You will always read your own writes. For example if you send an Update message followed by a Get of the same key, the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages is not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.
In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.
You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.
The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.
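A sketch of a subscriber; the actor and key names are assumptions made for this example:

```scala
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounterKey }
import akka.cluster.ddata.Replicator.{ Changed, Subscribe }

class CounterListener extends Actor {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("visit-counter")

  // Register interest; Changed messages arrive at most every
  // notify-subscribers-interval (or after an explicit FlushChanges).
  replicator ! Subscribe(CounterKey, self)

  def receive: Receive = {
    case c @ Changed(CounterKey) =>
      val current = c.get(CounterKey).value
      println(s"counter changed to $current")
  }
}
```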
A data entry can be deleted by sending a Replicator.Delete message to the local Replicator. As reply to the Delete a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure is sent. Note that ReplicationDeleteFailure does not mean that the delete completely failed or was rolled back. It may still have been replicated to some nodes, and may eventually be replicated to all nodes.
A deleted key cannot be reused again, but it is still recommended to delete unused
data entries because that reduces the replication overhead when new nodes join the cluster.
Subsequent Delete, Update and Get requests will be replied with Replicator.DataDeleted.
Subscribers will receive Replicator.DataDeleted.
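For example, sent from within an actor, with the replicator reference and key defined as in the earlier sketches:

```scala
import scala.concurrent.duration._
import akka.cluster.ddata.Replicator.{ Delete, WriteMajority }

// Sketch only: `replicator` and `CounterKey` are assumed to be in scope.
// Replies are DeleteSuccess or ReplicationDeleteFailure; any later Update,
// Get or Delete of the same key is answered with DataDeleted.
replicator ! Delete(CounterKey, WriteMajority(5.seconds))
```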
One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
For example a GCounter keeps track of one counter per node. If a GCounter has been updated from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the Replicator performs pruning of data associated with nodes that have been removed from the cluster. Data types that need pruning have to implement RemovedNodePruning. The pruning consists of several steps:
1. Pruning does not start until the maxPruningDissemination duration has elapsed. The time measurement is stopped when any replica is unreachable, so it should be configured to the worst case in a healthy cluster.
2. The leader initiates the pruning by adding a PruningInitialized marker in the data envelope. This is gossiped to all other nodes and they mark it as seen when they receive it.
3. When the leader sees that all other nodes have seen the PruningInitialized marker, the leader performs the pruning and changes the marker to PruningPerformed so that nobody else will redo the pruning. The data envelope with this pruning state is a CRDT itself. The pruning is typically performed by "moving" the part of the data associated with the removed node to the leader node. For example, a GCounter is a Map with the node as key and the counts done by that node as value. When pruning, the value of the removed node is moved to the entry owned by the leader node. See RemovedNodePruning#prune.
4. After another maxPruningDissemination duration has elapsed since pruning the last entry from the removed node, the PruningPerformed markers in the data envelope are collapsed into a single tombstone entry, for efficiency. Clients may continue to use old data, and therefore all data is always cleared from parts associated with tombstoned nodes.
Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
Reference: 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565. 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
Based on code from akka.cluster.VectorClock.
This class is immutable, i.e. "modifying" methods return a new instance.
VersionVector module with helper classes and methods.