Elegant way of publishing domain events (part 1)
- Processes, standards and quality
- Technologies
- Others
When I design my domain entities, I like to follow Domain-Driven Design principles.
I strive to make them persistent and infrastructure-agnostic as I feel that this is the proper way to receive pure domain logic which can be easily covered by unit tests. And I can usually achieve all of these with more or less effort, but the domain event publication is the area that I find challenging because it requires a proper piece of infrastructure to dispatch events to listeners.
Let’s define a domain
I think that in many mature organisations we may face a situation when approval from a manager or a group of managers is required to proceed with a given operation – for example, be it an order, a payment or a discount that is higher than usual.
Let’s assume that in our organisation such approvals follow very simple rules. Each approval is created with one or many recipients. In order to proceed with the operation, all approvers must authorise it, and at least one rejection is sufficient to stop further processing. Proper events should be published if any of those situations occur. It could look like that:
public class Approval
{
public int ApprovalId { get; private set; }
public string Subject { get; private set; }
public string Description { get; private set; }
public ApprovalStatus Status { get; private set; }
public IEnumerable<ApprovalRequest> ApprovalRequests { get; private set; }
protected Approval() { }
public Approval(string subject, string description, List<User> requiredApprovers)
{
Subject = subject;
Description = description;
Status = ApprovalStatus.Pending;
ApprovalRequests = requiredApprovers.Select(u => ApprovalRequest.ForUser(u)).ToList();
}
public void Approve(User user)
{
ApprovalRequest request = GetApprovalRequestForUser(user);
request.Approve(user);
if (ApprovalRequests.All(x => x.HasBeenApproved))
{
Status = ApprovalStatus.Approved;
// Publish ApprovalCompleted
}
}
public void Reject(User user, string reason)
{
ApprovalRequest request = GetApprovalRequestForUser(user);
request.Reject(user, reason);
if (ApprovalRequests.Any(x => x.HasBeenRejected))
{
Status = ApprovalStatus.Rejected;
// Publish Approval Rejected
}
}
private ApprovalRequest GetApprovalRequestForUser(User user)
{
var request = ApprovalRequests.FirstOrDefault(a => a.ApproverId == user.UserId);
if (request == null)
{
throw new UserIsNotParticipantOfApprovalProcesException(this, user);
}
return request;
}
}
How to tackle that?
So a few years ago, we were facing the same problem, and our journey has begun.
Injecting message bus abstraction into our domain entities was the first solution that we considered. Unfortunately, this would have polluted our domain object interface with infrastructure concern and also it was not viable with our ORM. Another option was to pass it as a parameter in each method which could publish events, but this would have also polluted the API of our domain entity, and it felt a little cumbersome.
We also thought about returning domain events from methods that change the state of an entity. I used to believe that this isn’t a good idea as it would violate command, and query segregation principle – methods that change the state shouldn’t return anything, and query methods shouldn’t change the state. Nowadays, I think this approach may be worth a try as it avoids any side effects. You may find an example here.
Ambient Context to the rescue
So, we needed a dependency that would be available in our domain classes but without actually polluting their API. There is a pattern called Ambient Context that specifically addresses this issue. You may have seen it in the excellent book “Dependency Injection in .Net” by Mark Seemann. Udi Dahan promoted a similar solution on his blog also.
Ambient Context is very similar to the Singleton Pattern, the only difference being that it allows changing instance. The most basic implementation can look like this:
public abstract class MessageBusProvider
{
private static MessageBusProvider _current = new VoidMessageBusProvider();
public static MessageBusProvider Current
{
get
{
return _current;
}
set
{
if (value == null)
{
throw new ArgumentException("Message bus provider cannot be null");
}
_current = value;
}
}
public abstract void Publish<TEvent>(TEvent anEvent);
}
The most vital thing is that MessageBusProvider is an abstraction and it is possible to change/switch it thanks to writable property. Because of that, it is easy to write a proper unit test against classes which are using it. The concrete implementation using NServiceBus could look like:
public class NServiceBusMessageBusProvider : MessageBusProvider
{
private readonly IBus _bus;
public NServiceBusMessageBusProvider(IBus bus)
{
_bus = bus;
}
public override void Publish<TEvent>(TEvent anEvent) => _bus.Publish(anEvent);
}
Please note that this implementation has no state, it delegates all messages to NServiceBus session instance which is initialised once on program start-up. The context doesn’t change afterwards. It simplifies things a little as there is no need to think about concurrency and threads.
Its usage looks like this:
public void Approve(User user)
{
ApprovalRequest request = GetApprovalRequestForUser(user);
request.Approve(user);
if (ApprovalRequests.All(x => x.HasBeenApproved))
{
MessageBusProvider.Current.Publish(new ApprovalCompletedEvent(ApprovalId));
}
}
In need of a state
Over time, we started to notice that the aforementioned solution has several drawbacks.
Events were dispatched immediately to the queue, and state changes were saved to the database at later stages. A message could be delivered to their endpoints even as the transaction was undergoing a rollback if selected queuing solution had no transaction support. We used MSMQ as our message transport which is capable of enlisting within a pending transaction, so our events weren’t dispatched till commit. Despite that we didn’t want to rely on that feature as it could pose a problem if there would be a need to replace a queuing technology in future, we felt that more control on dispatching moment could be beneficial.
With our system growing, the need for a proper distributed tracing started to emerge. While processing a request or a message we wanted to grab current tracing headers and pass it to subsequent requests and messages. It would allow us to track business action across different services. In order to do that the message data should be available within MessageBusProvider. It was not possible due to the context was being used as a singleton.
Proper context within ambient context
In .Net environment, there is a way to hold local data in a static field. This may be achieved by using ThreadLocal or AsyncLocal classes. The former provides a separate instance for each thread while the latter does that for whole operation flow. We picked AsyncLocal as we have a lot of async methods within our codebase.
public abstract class MessageBusProvider
{
private static readonly AsyncLocal<MessageBusProvider> _current = new AsyncLocal<MessageBusProvider>();
public static MessageBusProvider Current
{
get
{
if (_current.Value == null)
{
throw new InvalidOperationException("MessageBusProvider is not initialized");
}
return _current.Value;
}
set
{
if (value == null)
{
throw new ArgumentException("Message bus provider cannot be null.");
}
if (_current.Value != null)
{
throw new ArgumentException("Provider already assigned for current operation");
}
_current.Value = value;
}
}
public abstract void Publish<TEvent>(TEvent anEvent);
public abstract Task DispatchEvents();
}
In that form Ambient Context needs to be initialised at the beginning of each API request or message handling, it can be hooked as a middleware in ASP.NET Core or somewhere into the pipeline of message bus depending on the context. Decorator pattern may be used also if the context isn’t used prior to that invocation:
public class ScopeProvidingMessageHandlerDecorator<T> : IHandleMessages<T>
{
private readonly IHandleMessages<T> _innerHandler;
public ScopeProvidingMessageHandlerDecorator(IHandleMessages<T> innerHandler)
{
_innerHandler = innerHandler;
}
public async Task Handle(T command, IMessageHandlerContext context)
{
MessageBusProvider.Current = new NServiceBusMessageBusProvider(context);
await _innerHandler.Handle(command, context);
await MessageBusProvider.Current.DispatchEvents();
}
}
This is a possible implementation of NServiceBus message handler decorator. You can see how the local operation state (IMessageHandlerContext) is passed to Ambient Context and is available for use. Published events are stored in memory and can be dispatched at a convenient time.