Skip to main content
Version: 0.15

Entity stream

Concept

So far, we figured out that an Aggregate is the transaction and consistency boundary within the domain model.

The aggregate store (application-level abstraction) uses an event store (infrastructure) to store events in streams. Each aggregate instance has its own stream, so the event store needs to be capable to read and write events from/to the correct stream.

When appending events to a stream, the append operation for a single stream must be transactional to ensure that the stream is consistent. Eventuous handles commands using the command service, and one command handler is the unit of work. All the events generated by the aggregate instance during the unit of work are appended to the stream as the final step in the command handling process.

Stream name

As a stream is a boundary for a single entity state, its name must be unique per entity. Therefore, the stream name usually consists of an identifiable class name associated with the entity state (aggregate type name, state type name) without suffixes like Aggregate, Entity or State, combined with the entity id. Eventuous uses a convention to separate those two with a dash.

For example, for an entity called Booking where it might be represented as a Booking aggregate or BookingState state record, the stream name will start with Booking- followed by the entity id. So, a booking with id 123 will have a stream name Booking-123.

Aggregate streams

For aggregates, Eventuous uses the AggregateType.Name combined with the aggregate id as the stream name. When using an aggregate-based command service, each command handler defines how the aggregate id is resolved from a command:

On<ImportBooking>()
.InState(ExpectedState.New)
.GetId(cmd => new(cmd.BookingId))
.Act((booking, cmd) => booking.Import(cmd.RoomId, new(cmd.CheckIn, cmd.CheckOut), new(cmd.Price)));

In the example above, if the ImportBooking.BookingId property has a value 123, the service will construct a BookingId instance with value 123. Then, the service will use StreamName.For<Booking>(id) where id is of type BookingId to determine the stream name. Following the example, it will use Booking-123 as the stream name, where Booking is the aggregate type name, and 123 is the aggregate identity value as string.

However, you might want to have more fine-grained control over the stream name. For example, you might want to include the tenant id in the stream name. It's possible to override the default convention by configuring the stream name mapping. The stream map contains a mapping between the aggregate identity type (derived from Id) and the stream name generation function. Therefore, any additional property of the aggregate identity type can be used to generate the stream name.

For example, the following code registers a stream name mapping for the Booking aggregate:

BookingId.cs
public record BookingId(string Value, string Tenant) : Id(Value);

Create a StreamNameMap and register in the container:

Program.cs
var streamNameMap = new StreamNameMap();
streamNameMap.Register<BookingId>(
id => new StreamName($"Booking-{id.Tenant}:{id.Value}")
);
builder.Services.AddSingleton(streamNameMap);
builder.Services.AddCommandService<BookingService, BookingState>();

Then, use the registered StreamNameMap in the CommandService:

BookingService.cs
// Aggregate service
public class BookingService : CommandService<Booking, BookingState, BookingId> {
public BookingService(IEventStore store, StreamNameMap streamNameMap)
: base(store, streamNameMap: streamNameMap) {
// command handlers registered here
}
}

State streams

State streams and aggregate streams are essentially the same. Aggregates don't persist anything else than their state. The difference from the API perspective is in how stream names are resolved by command services.

While aggregate-based services calculate stream names using the aggregate type name and aggregate identity, functional services expect you to specify a function to retrieve the stream name from a command when you defined the command handler.

Let's have a look at the following command handler definition:

// Constructor of BookingService : CommandService<BookingState>
On<BookRoom>()
.InState(ExpectedState.New)
.GetStream(cmd => GetStream(cmd.BookingId))
.Act(BookRoom);

Here, the command service is instructed to use the default way to resolve the stream name from a command. It uses the GetStream function available in the command service base class, which calls StreamName.ForState<TState> function. By convention, it will take the TState type name, remove State word from it if it's there, and append a dash followed the entity id to the string. So, if the BookRoom.Id property has a value 123, the stream name would be resolved as Booking-123.

Reversing stream name to identity

As you don't need to include the entity or aggregate id to the events, you might be wondering how to get the identity in, for example, downstream event handlers like read model projections. For that purpose, you'd need to resolve the identity from the stream name.

Convention-based

There are several options available to determine the entity or aggregate id, which then can be used as a document id or primary key for projected models, from stream names.

First, the StreamName struct provides the GetId() function that returns the convention-based identity. It takes part of the stream name string following the dash and returns it as a string. For example, if the stream name is Booking-123, GetId() will return 123.

For example, a projecting handler for MongoDB uses it like this:

On<BookingImported>(
b => b
.InsertOne
.Document(
(stream, e) => new(stream.GetId()) {
RoomId = e.RoomId,
CheckInDate = e.CheckIn,
CheckOutDate = e.CheckOut,
BookingPrice = e.Price,
Outstanding = e.Price
}
)
);

MongoDB projections also provide a DefaultId() helper that also uses the convention:

On<BookingPaymentRegistered>(
b => b
.UpdateOne
.DefaultId()
.Update((evt, update) => update.Set(x => x.PaidAmount, evt.AmountPaid))
);

Custom stream names

Convention-based approach won't work if you use custom stream names. In that case, you'd need to write code to calculate the identity value from stream names.

For example, your projections can retrieve the Id and TenantId from the StreamName in the IMessageConsumeContext<T>:

BookingStateProjection.cs
static UpdateDefinition<BookingDocument> HandleRoomBooked(
IMessageConsumeContext<V1.RoomBooked> ctx,
UpdateDefinitionBuilder<BookingDocument> update
) {
var evt = ctx.Message;

// Get Id and TenantId
var (id, tenantId) = ctx.Stream.ExtractMultiTenantIds();

return update
.SetOnInsert(x => x.Id, id)
.SetOnInsert(x => x.TenantId, tenantId)
.Set(x => x.GuestId, evt.GuestId)
.Set(x => x.RoomId, evt.RoomId)
.Set(x => x.CheckInDate, evt.CheckInDate)
.Set(x => x.CheckOutDate, evt.CheckOutDate)
.Set(x => x.BookingPrice, evt.BookingPrice)
.Set(x => x.Outstanding, evt.OutstandingAmount);
}

The snippet above uses the following extension method to extract the Id and TenantId from the StreamName:

StreamNameExtensions.cs
/// <summary>
/// Split the StreamName into multiple parts for multi tenant stream id.
/// </summary>
/// <param name="stream">The streamname</param>
/// <param name="separator">The seperator for splitting. Default is ':'.</param>
/// <returns>A tuple with TenantId and Id property.</returns>
/// <exception cref="InvalidStreamName">When stream id can't be split in 2 sections.</exception>
public static (string TenantId, string Id) ExtractMultiTenantIds(
this StreamName stream,
char separator = ':'
) {
var streamId = stream.GetId(); // Returns "tenant:id" from "Booking-tenant:id"
var parts = streamId.Split(separator);

if (parts.Length != 2) {
throw new InvalidStreamName(streamId);
}

return (parts[0], parts[1]);
}