Elegant way of publishing domain events

Elegant way of publishing domain events (part 2)

data: 9 września, 2020
czas czytania: 6
autor: Mariusz Macheta

In the previous article, I described how we resolved our problems with event publication by using the Ambient Context pattern.

This solution have been working fine for us for quite a long time, but at the beginning we weren’t absolutely happy about it – this was rather a trade-off to allow us to publish events from within our entities. In the meantime, we have identified the following drawbacks of the solution:

  • MessageBusProvider isn’t a class name that you would expect inside your perfectly modelled core domain – it’s technical and certainly doesn’t follow Ubiquitous Language. 
  • It is very difficult to reason about the scope of the ambient context without navigating through code and investigating the implementation.
  • The ambient context needs to be initialized prior to its first use, so temporal coupling is present. There is a risk of exception cropping up at runtime which can be mitigated by a proper pipeline setup, but still, this is something that complicates the overall design.

Over the years, also Mark Seemann has changed his view on that topic. As he said: “I never use that pattern myself, so it’s clear to me that for all the situations that I typically encounter, there’s always better solutions, with no significant trade-offs” so maybe also in this case a better solution exists? In the next paragraphs, I will present the improvements implemented in this solution and how we addressed the drawbacks we found.

Do we really need to inject services into entities?

While there certainly is a need for dispatching events to their corresponding endpoints, the dispatching itself should not be immediate. All pending events form a state, and the only problem is where to store it. Ambient context holds it globally for the current operation, but does it really need to be global? Maybe we can simply float that state instead.

In order to remove all external references, the entity is the only place where we can store published events. Any published events will be a part of a given entity state. In fact, this is a common practice in event sourcing frameworks! To not repeat the code in each entity, Layer supertype may be created.

    public abstract class Entity
    {
        public IReadOnlyCollection<DomainEvent> PublishedEvents => _publishedEvents.AsReadOnly();
 
        private List<DomainEvent> _publishedEvents = new List<DomainEvent>();
 
        public void Publish(DomainEvent @event)
        {
            _publishedEvents.Add(@event);
        }
    }

And usage is really simple:

public void Approve(User user)
        {
            ApprovalRequest request = GetApprovalRequestForUser(user);
            request.Approve(user);
 
            if (ApprovalRequests.All(x => x.HasBeenApproved))
            {
                Publish(new ApprovalCompletedEvent(ApprovalId));
            }
        }

Publishing Events

In such a setup, the only thing left to do is to find every entity that was modified, grab its events and push them to correct infrastructure. If you are using Object Relational Mapper, then it is probably implementing the Unit of Work pattern under the hood so it should be pretty trivial to do those steps. 

We were using Entity Framework as our ORM. Its DbContext exposes ChangeTracker which can be queried to find all the entities that were loaded. We decided to wrap it in a custom class which may be used along with different contexts (we had several of them within our application).

public abstract class UnitOfWork<T> where T : DbContext
    {
        protected readonly T _context;
        protected readonly HashSet<object> _publishedEvents;
 
        public UnitOfWork(T context)
        {
            _context = context;
            _publishedEvents = new HashSet<object>();
        }
 
        protected async Task PerformSave(IMessageBus messageBus)
        {
            var eventsToPublish = GetEventsToPublish();
            await _context.SaveChangesAsync();
            await DispatchEvents(eventsToPublish, messageBus);
        }
 
        protected IOrderedEnumerable<DomainEvent> GetEventsToPublish()
        {
            return _context.ChangeTracker.Entries()
                .Select(e => e.Entity)
                .OfType<Entity>()
                .SelectMany(e => e.PublishedEvents)
                .OrderBy(e => e.PublicationTime);
        }
 
        protected async Task DispatchEvents(
            IOrderedEnumerable<DomainEvent> eventsToPublish,
            IMessageBus messageBus)
        {
            foreach (var @event in eventsToPublish)
            {
                if (_publishedEvents.Add(@event))
                {
                    await messageBus.Publish(@event);
                }
            }
        }
    }

We decided to use HashSet to prevent sending duplicates, as in our codebase were the situations when PerformSave was invoked multiple times on the same UnitOfWork. As an alternative, the collection could be just cleared. 

UnitOfWork class is abstract to allow creating specialised subclasses depending on the context. Some could have MessageBus injected by the constructor, and some could expect it to be provided as a method parameter.

public class ApiUnitOfWork : UnitOfWork<ApprovalContext>, IApiUnitOfWork
    {
        private readonly IMessageBus _messageBus;
 
        public ApiUnitOfWork(ApprovalContext context, IMessageBus messageBus) : base(context)
        {
            _messageBus = messageBus;
        }
 
        public Task Save() => PerformSave(_messageBus);
    }

    public class BusUnitOfWork : UnitOfWork<ApprovalContext>, IBusUnitOfWork
    {
        public BusUnitOfWork(ApprovalContext context) : base(context)
        {
        }
 
        public Task Save(IMessageBus messageBus) => PerformSave(messageBus);
    }

It is very important to decide when DispatchingEvents happens: before or after saving changes to the database. It really depends on what is used for transporting events and if it is capable of enlisting in a database transaction. It’s required to consider what will happen if an exception is thrown within any of those methods.

The last thing to do is to use it in your application layer:

public class ApproveCommandHandler : ICommandHandler<ApproveCommand>
    {
        private readonly IApprovalRepository _approvalRepository;
        private readonly IUserRepository _userRepository;
        private readonly IApiUnitOfWork _unitOfWork;
 
        public ApproveCommandHandler(
            IApprovalRepository approvalRepository,
            IUserRepository userRepository,
            IApiUnitOfWork unitOfWork)
        {
            _approvalRepository = approvalRepository;
            _userRepository = userRepository;
            _unitOfWork = unitOfWork;
        }
 
        public async Task Handle(ApproveCommand command)
        {
            Approval approval = await _approvalRepository.GetApproval(command.ApprovalId);
            User approver = await _userRepository.GetUser(command.ApproverId);
 
            approval.Approve(approver);
 
            await _unitOfWork.Save();
        }

Finally

I think that both solutions offer a fine and clean way of publishing domain events. I find the latter one much easier to understand and a bit easier to maintain. Often when new requirements appear, there is a chance to revisit the current architecture and try to improve it.

Newsletter IT leaks

Dzielimy się inspiracjami i nowinkami z branży IT. Szanujemy Twój czas - obiecujemy nie spamować i wysyłać wiadomości raz na dwa miesiące.

Subscribe to our newsletter

Administratorem Twoich danych osobowych jest Future Processing S.A. z siedzibą w Gliwicach. Twoje dane będziemy przetwarzać w celu przesyłania cyklicznego newslettera dot. branży IT. W każdej chwili możesz się wypisać lub edytować swoje dane. Więcej informacji znajdziesz w naszej polityce prywatności.

Subscribe to our newsletter

Administratorem Twoich danych osobowych jest Future Processing S.A. z siedzibą w Gliwicach. Twoje dane będziemy przetwarzać w celu przesyłania cyklicznego newslettera dot. branży IT. W każdej chwili możesz się wypisać lub edytować swoje dane. Więcej informacji znajdziesz w naszej polityce prywatności.