Note: This is a writeup of a talk I gave at Code BEAM America 2021. I will update with a link to the video once it’s available. While some of the discussion of CRDTs is general, the implementation is specific to Elixir and the Erlang/OTP environment.

Conflict-free replicated data types

A conflict-free replicated data type (CRDT) is a data structure which can be replicated across multiple computers in a network, where the replicas can be updated independently and concurrently without coordination between the replicas, and where it is always mathematically possible to resolve inconsistencies that might come up.

— Marc Shapiro, Nuno Preguiça, et al.

CRDTs are an interesting way to replicate and merge state in a distributed environment. To summarize the formal definition, CRDTs have the following properties:

  • No single source of truth. Unlike a centralized DB or a consensus protocol, every node is authoritative for the entire dataset.
  • Data is eventually consistent and there are no guarantees of linearizability across multiple nodes.
  • As the name implies, there are no merge conflicts.
  • Global convergence. All nodes will process all writes (eventually) and end up with an identical view of the data.
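
To make these properties concrete, here’s a minimal sketch of one of the simplest CRDTs, a grow-only counter (the module is illustrative, not from any library). Each node increments only its own slot, and a merge takes the per-node maximum, so replicas can exchange state in any order, any number of times, and still converge:

# Grow-only counter: a map of node name => count
defmodule GCounter do
  def new, do: %{}

  # each node only ever increments its own entry
  def increment(counter, node \\ node()), do: Map.update(counter, node, 1, &(&1 + 1))

  # merge is commutative, associative, and idempotent: take the max per node
  def merge(a, b), do: Map.merge(a, b, fn _node, x, y -> max(x, y) end)

  # the counter's value is the sum of every node's contribution
  def value(counter), do: counter |> Map.values() |> Enum.sum()
end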

Types of CRDTs

A few specific ways of implementing CRDTs are in use today. All of them satisfy the properties above, but they make different tradeoffs for different use cases.

State based

With full state CRDTs, the entire dataset is sent and merged across nodes. This is the original formulation of CRDTs. Sending the full state makes convergence simple and reliable, but it is very expensive on the network. I’m not aware of any real-world systems that do this, but anything is possible.

Operational transforms

Operational transforms are a type of CRDT where only individual operations are sent to other nodes. An example of an operational transform could be “delete two characters starting at row 40, column 2”. The list of operational transforms can be replayed in order to recreate any intermediate state.
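
To make the replay idea concrete, here’s a tiny sketch with made-up operation shapes applied to a plain string:

# Hypothetical text-editing operations, replayed in order
apply_op = fn
  {:insert, pos, chars}, text ->
    {head, tail} = String.split_at(text, pos)
    head <> chars <> tail

  {:delete, pos, count}, text ->
    {head, tail} = String.split_at(text, pos)
    head <> String.slice(tail, count..-1//1)
end

ops = [{:insert, 0, "hello"}, {:insert, 5, " world"}, {:delete, 0, 1}]
Enum.reduce(ops, "", apply_op)
# => "ello world"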

Because operations are applied in sequence, their ordering matters. In addition to ensuring consistent ordering, the replication layer usually needs to guarantee reliable, exactly-once delivery within the CRDT framework, or have some way to de-duplicate messages. Each individual operation is very cheap to send, in contrast with full state CRDTs.

Operational transforms are popular in real-time collaborative editing environments, such as Google Docs. Text editing functionality is easy to bolt on - undo can just revert the user’s last operation, for example. Each transform is generally small, and state can be reasonably pruned/snapshotted as it is likely not necessary to retain the entire history indefinitely.

Delta state

Delta CRDTs are the newest CRDT structure and they have some of the best properties of both full state and operational transforms. When the state is updated, the difference between the new and old state is propagated to other nodes. This incremental state is much smaller than that of a full state CRDT, but since the delta can be merged into the existing state the replication doesn’t need to be as rigid as with operational transforms. There is more tolerance for messages sent out of order or delivered multiple times.
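
As an illustrative sketch (not any particular library), a last-write-wins map can emit a delta on every write, and the delta merges with the same logic used for full state:

# Sketch of a last-write-wins (LWW) map with delta replication
defmodule LWWMap do
  # writing produces both the new local state and a delta containing
  # only the changed entry, which is what gets sent to other nodes
  def put(map, key, value, timestamp) do
    delta = %{key => {timestamp, value}}
    {merge(map, delta), delta}
  end

  # merging a delta is the same operation as merging full state,
  # so out-of-order or duplicated deltas are harmless
  def merge(map, delta) do
    Map.merge(map, delta, fn _key, {t1, _} = a, {t2, _} = b ->
      if t2 > t1, do: b, else: a
    end)
  end
end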

Usage in the wild

To get away from the theoretical, let’s take a look at some places CRDTs are being used right now. Some databases make use of CRDTs for replication - Riak and distributed Redis are great examples. The Phoenix Presence library is particularly interesting as a more lightweight example for replicating state within the context of a web application.

The Phoenix framework uses CRDTs for presence functionality. The canonical example for Phoenix Presence is keeping track of the status of users within a chat room - the state of the room needs to be available to all servers, but individual users can join and leave from any given server.

The presence implementation creates an API in front of a pool of shards, routed by topic. CRDT state is sent along with a periodic heartbeat and used to replicate presence info across the cluster.

Phoenix Presence is built on Phoenix Tracker, which is a bit lower level and allows more user control over the structure of the data. The state for Presence is a map of IDs to lists of metadata:

# Example state
%{
  "123" => %{
    metas: [
      %{status: "online", phx_ref: ...}
    ]
  },
  "456" => %{
    metas: [
      %{status: "online", phx_ref: ...}
    ]
  }
}

The state sent between nodes is broken up into joins and leaves. Each section contains status info only for the users that changed (the deltas, not all users), which is then merged into the receiving node’s existing state.

# Example message sent as a CRDT
%{
  joins: %{
    "123" => %{
      metas: [
        %{status: "away", phx_ref: ...}
      ]
    }
  },
  leaves: %{
    "456" => %{
      metas: [
        %{status: "online", phx_ref: ...}
      ]
    }
  }
}
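
As a rough sketch of how a receiving node might fold such a message into its state (this illustrates the shape of the merge, not the actual Phoenix.Tracker internals, which also compare phx_ref values):

# Illustrative merge of a joins/leaves message into local state
merge = fn state, %{joins: joins, leaves: leaves} ->
  state
  |> Map.merge(joins, fn _id, %{metas: old}, %{metas: new} ->
    # a real merge would de-duplicate metas on phx_ref
    %{metas: new ++ old}
  end)
  # simplification: Phoenix removes only the metas referenced by the
  # leave, not necessarily the user's whole entry
  |> Map.drop(Map.keys(leaves))
end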

This is a great use case for CRDTs. In general, CRDTs are useful when you want to have:

  • A distributed cache
  • A read-heavy global configuration
  • Local events that should be replicated to the entire cluster
  • Any small, atomic changes that have a local source of truth

CRDTs are a poor choice for:

  • Data that should be reliably persisted
  • Transactions, or anything that expects read-after-write consistency
  • Relational data
  • Large and frequent writes

DeltaCrdt and Horde

DeltaCrdt is an Elixir library that implements, obviously, delta CRDTs. It uses a Merkle tree for the underlying data structure, which gets synchronized to all nodes in the cluster. The replication uses join decomposition and anti-entropy to limit the amount of network traffic; deltas are only sent where necessary to converge the cluster. Despite the tree structure, DeltaCrdt’s API provides the semantics of a key/value store.

iex> {:ok, crdt1} =
...>   DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> {:ok, crdt2} =
...>   DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> DeltaCrdt.set_neighbours(crdt1, [crdt2])
iex> DeltaCrdt.set_neighbours(crdt2, [crdt1])
iex> DeltaCrdt.to_map(crdt1)
%{}
iex> DeltaCrdt.put(crdt1, "CRDT", "is magic!")
iex> Process.sleep(10) # need to wait for propagation for the doctest
iex> DeltaCrdt.to_map(crdt2)
%{"CRDT" => "is magic!"}

The author of DeltaCrdt, Derek Kraan, used it as the basis for another fantastic library called Horde. Horde provides a distributed name registry and process supervisor on top of delta CRDTs. A GenServer process can be started at the cluster level, rather than the individual node level, and addressed with process registration :via tuples. Horde has really well-written documentation that explains how to use it better than I could.
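
For a flavor of the API, a process might be started at the cluster level and addressed through the registry something like this (MyApp.Registry, MyApp.Supervisor, and MyApp.Worker are placeholders):

# Started in the application's supervision tree
children = [
  {Horde.Registry, name: MyApp.Registry, keys: :unique},
  {Horde.DynamicSupervisor, name: MyApp.Supervisor, strategy: :one_for_one}
]

# Start a GenServer somewhere in the cluster, registered by name
name = {:via, Horde.Registry, {MyApp.Registry, "worker-1"}}
Horde.DynamicSupervisor.start_child(MyApp.Supervisor, {MyApp.Worker, name: name})

# Any node in the cluster can now address the process
GenServer.call(name, :ping)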

Building on delta CRDTs

The JumpWire engine currently runs without an external database. An autoscaling, self-discovering cluster of nodes is created, with work distributed between them. Some data needs to be available across the cluster, and since running in an ephemeral cloud environment increases the likelihood of individual nodes disappearing at any time, the data needs to be replicated. There are a few battle-tested solutions for this kind of problem. A consensus protocol would work, but it would create a centralized point for both failures and data growth; ideally we would avoid having a special class of servers and simply treat every node in the cluster as interchangeable. Another option would be a hash ring architecture that replicates each piece of data to a few nodes, which definitely has merits. Or we could use delta CRDTs to replicate the data to every node.

It is important to consider the type of data and its read/write patterns when selecting a replication model. For the JumpWire engine, there are two separate use cases to replicate. The first is event data - individual nodes generate immutable events as they perform work, and those events need to remain retrievable even if the node that produced them disappears. The second use case is storing global configuration data. This data is generated at runtime, usually by making a live configuration change on the cluster. Every node needs access to this configuration, the size of the data is fairly small, and there are far more reads than writes.

Delta CRDTs are a great fit for replicating the configuration data; for the event use case, either approach would work well. Given that events are written more frequently than they are read, a ring topology might be the better choice. However, adding another distributed replication protocol would increase complexity without adding much value, so we use delta CRDTs for both sets of data. The implementation uses the DeltaCrdt library as its base but wraps it with more specific behaviour for our use case, making it easy to reuse across modules.

Baseline module

The module Aviato.DeltaCrdt provides the core of our implementation:

defmodule Aviato.DeltaCrdt do
  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do

      cluster = opts[:cluster]
      opts = opts
      |> Keyword.put_new(:cluster_mod, {cluster || Aviato.DeltaCrdt, []})
      |> Keyword.put_new(:cluster_name, cluster || __MODULE__)
      |> Keyword.put_new(:crdt_mod, :"#{__MODULE__}.Crdt")
      |> Keyword.put_new(:crdt_opts, [])

      @crdt opts[:crdt_mod]

      defmodule CrdtSupervisor do
        use Supervisor

        @cluster_name opts[:cluster_name]
        @cluster_mod opts[:cluster_mod]
        @crdt opts[:crdt_mod]
        @crdt_opts opts[:crdt_opts]

        def start_link(init_opts) do
          Supervisor.start_link(__MODULE__, init_opts, name: __MODULE__)
        end

        @impl true
        def init(init_opts) do
          crdt_opts = [crdt: DeltaCrdt.AWLWWMap]
          |> Keyword.merge(@crdt_opts)
          |> Keyword.merge(init_opts)
          |> Keyword.put(:name, @crdt)

          {cluster_mod, cluster_opts} = @cluster_mod
          cluster_opts = [crdt: @crdt, name: @cluster_name]
          |> Keyword.merge(cluster_opts)
          |> Keyword.merge(init_opts)

          children = [
            {DeltaCrdt, crdt_opts},
            {Horde.NodeListener, @cluster_name},
            {cluster_mod, cluster_opts},
          ]
          Supervisor.init(children, strategy: :one_for_one)
        end
      end

      def child_spec(opts) do
        CrdtSupervisor.child_spec(opts)
      end
    end
  end

  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl true
  def init(opts) do
    members = Horde.NodeListener.make_members(opts[:name])
    state = opts
    |> Enum.into(%{})
    |> Map.put(:members, members)
    {:ok, state}
  end

  # Called by Horde.NodeListener whenever BEAM cluster membership
  # changes; keeps the CRDT's neighbour list in sync
  @impl true
  def handle_call({:set_members, members}, _from, state = %{crdt: crdt, name: name}) do
    neighbors = members
    |> Stream.filter(fn member -> member != {name, Node.self()} end)
    |> Enum.map(fn {_, node} -> {crdt, node} end)

    DeltaCrdt.set_neighbours(crdt, neighbors)

    {:reply, :ok, %{state | members: members}}
  end

  @impl true
  def handle_call(:members, _from, state = %{members: members}) do
    {:reply, MapSet.to_list(members), state}
  end
end

Most of this module is contained within the __using__/1 macro. Outside the macro, Aviato.DeltaCrdt is itself a GenServer with a handler for {:set_members, members}. This is called by Horde and synchronizes the overall BEAM cluster membership with this specific CRDT’s membership. Aviato.DeltaCrdt is the default :cluster_mod, but the option can be overridden if there’s a need to limit CRDT membership.

The child_spec/1 function within the __using__/1 block delegates to the CrdtSupervisor module, so when a module that calls use Aviato.DeltaCrdt is added to a supervision tree, the nested Supervisor is what actually starts. That Supervisor in turn starts three processes (a usage sketch follows the list):

  • DeltaCrdt handles state merging and replication. Replication happens across nodes that are specifically added to this CRDT group, not necessarily every node in the cluster.
  • Horde.NodeListener reports on changes to the membership of the BEAM cluster. The {:set_members, members} call is triggered to synchronize the node list with the CRDT membership.
  • cluster_mod, which is Aviato.DeltaCrdt by default, is responsible for keeping the CRDT membership updated as the BEAM nodes change.
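
As a usage sketch, a hypothetical module could pull all of this in with use Aviato.DeltaCrdt and expose simple accessors around the generated CRDT process (the put/get wrappers here are illustrative, not part of the library):

defmodule MyApp.Replicated do
  use Aviato.DeltaCrdt

  # @crdt is set by the __using__ macro to :"MyApp.Replicated.Crdt"
  def put(key, value), do: DeltaCrdt.put(@crdt, key, value)
  def get(key), do: DeltaCrdt.read(@crdt, [key]) |> Map.get(key)
end

# In the application's supervision tree:
children = [
  MyApp.Replicated
]
Supervisor.start_link(children, strategy: :one_for_one)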

Events module

Although delta CRDTs may not be perfect for the event replication pattern, the events adapter is a useful example of how to use this module. We define an adapter for event handling that specifies how to write and read data:

defmodule Aviato.Events.Adapters.DeltaCrdt do
  use Aviato.DeltaCrdt

  def store(event = %{flight_id: flight_id, id: id}) do
    DeltaCrdt.put(@crdt, {flight_id, id}, event)
    {:ok, event}
  end

  def flight_events(flight_id) do
    DeltaCrdt.to_map(@crdt)
    |> Stream.filter(fn
      {{^flight_id, _}, _} -> true
      _ -> false
    end)
    |> Stream.map(fn {_, event} -> event end)
    |> Enum.sort_by(&(&1.timestamp), DateTime)
  end

  def all_events() do
    # every value in the map is a single event, so return just the values
    DeltaCrdt.to_map(@crdt) |> Map.values()
  end
end

The store/1 function takes in an event that should be replicated to the cluster. The event contains an ID and a flight_id that can be used to group events together, and both are used as the map key. If another event with the same flight_id and id is stored, it will completely overwrite the previous event. The DeltaCrdt library operates at a single depth, so updates to a nested map are treated as if everything under the root key changed.

Looking up events for a specific flight_id is done by filtering on the keys, then returning a sorted list of just the events. All events can also be returned by taking the entire data structure and dropping the keys. Other lookup functions could easily be written - the important thing is that the CRDT is accessed as a map, so either the entire map is operated on as a whole or a specific key must be known.
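
A quick usage sketch, with made-up event data (this assumes the adapter is running in the application’s supervision tree):

# Hypothetical event; the adapter above needs :id, :flight_id,
# and :timestamp to store and sort it
event = %{
  id: "evt-1",
  flight_id: "fl-42",
  timestamp: DateTime.utc_now(),
  body: "door closed"
}

{:ok, _event} = Aviato.Events.Adapters.DeltaCrdt.store(event)

# once the delta has propagated, any node in the cluster can read it back
Aviato.Events.Adapters.DeltaCrdt.flight_events("fl-42")
# => [%{id: "evt-1", flight_id: "fl-42", ...}]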

Agent interface

To make configuration data easier to work with, we further extended the DeltaCrdt module to work like an Agent:

defmodule Aviato.Agent do
  defmacro __using__(opts) do
    quote do
      @agent __MODULE__

      use Aviato.DeltaCrdt,
        cluster_mod: {Aviato.Agent, unquote(opts)},
        cluster_name: @agent

      def get(), do: Agent.get(@agent, & &1)
      def get(fun) when is_function(fun, 1), do: Agent.get(@agent, fun)
      def get(key, default \\ %{}), do: get(fn data -> Map.get(data, key, default) end)
      def get(mod, fun, args), do: Agent.get(@agent, mod, fun, args)
      def update(fun), do: Agent.update(@agent, fun)
      def update(mod, fun, args), do: Agent.update(@agent, mod, fun, args)
      def get_and_update(fun), do: Agent.get_and_update(@agent, fun)
      def get_and_update(mod, fun, args), do: Agent.get_and_update(@agent, mod, fun, args)
      def put(key, value), do: update(fn data -> Map.put(data, key, value) end)
    end
  end

  use GenServer

  def start_link(args) do
    args = Enum.into(args, %{})
    GenServer.start_link(__MODULE__, args, name: args[:name])
  end

  def init(args = %{crdt: crdt}) do
    {data, args} = Map.pop(args, :data, %{})
    {:ok, data} = Agent.Server.init(fn -> data end)
    DeltaCrdt.put(crdt, :agent_data, data)
    Aviato.DeltaCrdt.init(args)
  end

  def handle_call({:set_members, members}, from, state) do
    Aviato.DeltaCrdt.handle_call({:set_members, members}, from, state)
  end

  def handle_call(:members, from, state) do
    Aviato.DeltaCrdt.handle_call(:members, from, state)
  end

  def handle_call(op, from, state = %{crdt: crdt}) do
    process_update(crdt, fn data ->
      {res, reply, data} = Agent.Server.handle_call(op, from, data)
      {data, {res, reply, state}}
    end)
  end

  def handle_cast(op, state = %{crdt: crdt}) do
    process_update(crdt, fn data ->
      {:noreply, data} = Agent.Server.handle_cast(op, data)
      {data, {:noreply, state}}
    end)
  end

  def code_change(old, state = %{crdt: crdt}, fun) do
    process_update(crdt, fn data ->
      {:ok, data} = Agent.Server.code_change(old, data, fun)
      {data, {:ok, state}}
    end)
  end

  # Read the current agent data from the CRDT, apply the update
  # function, then write the new data back under :agent_data
  defp process_update(crdt, fun) when is_function(fun, 1) do
    {data, result} = DeltaCrdt.read(crdt, [:agent_data]) |> Map.get(:agent_data) |> fun.()
    DeltaCrdt.put(crdt, :agent_data, data)
    result
  end
end

The Aviato.Agent module has an API matching a standard Agent, but instead of being an abstraction on top of a plain GenServer it keeps all of its state in a CRDT. Aviato.Agent runs under the CrdtSupervisor and wraps the operations normally performed by Elixir’s internal Agent.Server process, while also handling membership changes for the CRDT cluster. All updates and reads happen under the key :agent_data in the CRDT.

Using the CRDT agent is very simple. A new module gets defined for each CRDT group that should synchronize data, with optional default data:

defmodule Aviato.GlobalConfig do
  use Aviato.Agent, data: %{
    certs: %{},
    acme: %Aviato.GlobalConfig.ACME{},
    http_auth: %{},
  }
end

The new module gets put in the application’s supervision tree, and then can be used like a standard agent - Aviato.GlobalConfig.put(:certs, %{domain => cert}).
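
Reads go through the same Agent-style API, backed by the replicated CRDT state (the values here are illustrative):

# put and get behave like a normal Agent holding a map
Aviato.GlobalConfig.put(:http_auth, %{"admin" => "s3cret"})

Aviato.GlobalConfig.get(:http_auth)
# => %{"admin" => "s3cret"}

# get/0 returns the entire replicated map
Aviato.GlobalConfig.get()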

Summary

With the above two modules and the underlying DeltaCrdt library, we now have an easy-to-use interface for running delta CRDT clusters. Each module that calls use Aviato.DeltaCrdt or use Aviato.Agent creates its own set of processes and its own CRDT synchronization. While we haven’t yet tested the scalability limits of this approach, it has been performing quite well for keeping infrequently written configuration data synchronized across a cluster of ephemeral cloud instances.