Powering Autonomous Capability Domains using an Enterprise Event Stream

Disclaimer – Copyright Rocket Mortgage, LLC [2022 – 2023]. The author (Nish Nishant) has been authorized to post this article under license from Rocket Mortgage. This article is the property of Rocket Mortgage, LLC and is not authorized for redistribution.

Author’s Note – I wrote this article for my employer’s public facing technology blog, but for various reasons, that blog was retired. Since I did not want to lose the content forever, I am re-publishing it here with explicit approval, and hence the disclaimer above.

Abstract

As software systems grow in number and complexity, they inadvertently become dependent on each other, which slows down delivery velocity across technology companies. Teams are forced to delay deliverables while they wait for other teams to finish functionality they rely on, and these delays also discourage teams from evolving their legacy components.

Solutions to these challenges include defining capability domains, creating boundary APIs and using an enterprise-wide event stream to achieve clean separation across capability domains. This article focuses on the core patterns required to achieve this solution and does not recommend any particular tool-stacks other than as coincidental examples.

Capability Domains

Defining and mapping a technology enterprise’s software systems into domains based on capability is a common first step in avoiding dependencies between systems. Typically, these domains match the technology enterprise’s organizational structure, though it isn’t required. Each capability domain owns the functionality to implement its specific capability, and ideally, there wouldn’t be any overlap across domains.

One of the goals of capability domains is to increase each domain’s autonomy, which means that the systems a domain owns are considered internal to that domain. As a result, the teams building those systems have the freedom to dictate their own technology roadmaps.

A common anti-pattern here is when internal systems integrate directly with other systems from another domain. Doing this is a form of negative coupling that negates the advantages of defining and mapping enterprise-wide capability domains. The preferred way to expose a domain’s functionality to another with minimal coupling is through domain boundaries.

Domain Boundaries

Domain boundaries are public interfaces on the edge of the domain. Typically, these public interfaces are implemented as public REST APIs with published service contracts. Domains within an enterprise need to set mutual expectations that they will only integrate with other domains via that domain’s boundary APIs. Doing so solves the problem of negative coupling that arises as a side effect of directly integrating with internal systems outside their domain.

The boundary APIs also need to be backwards compatible, and it’s good practice to maintain a formal API versioning system. Boundary APIs themselves lack the autonomy internal domain systems enjoy, but this is a very small price to avoid roadmap-hindering dependencies.

Enterprise Event Streams

An event stream is a continuous stream of events broken down into named event topics. An enterprise event stream is a shared event stream where all capability domains publish and receive messages.

Event streams are typically built on data streaming platforms such as Confluent Kafka, Amazon Kinesis or Azure Event Hubs. Regardless of the vendor you choose, they all offer a standard, core set of features to stream event messages and are all built for high scalability, concurrency, resiliency and performance. However, more critical than your vendor choice is standardizing event schemas, assigning capability owners and controlling access to those events.
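To make that concrete, here is a minimal sketch of what a standardized event envelope could look like, expressed in C#. Every name in it is hypothetical – the point is only that the schema, owning domain and version travel with every event.

public class EnterpriseEvent<TPayload>
{
    // All property names here are illustrative; each enterprise defines and
    // versions its own standard envelope.
    public string EventId { get; set; }          // unique id for tracing and de-duplication
    public string EventType { get; set; }        // e.g. "loan.updated" (hypothetical topic/type name)
    public string SourceDomain { get; set; }     // the capability domain that produced the event
    public DateTimeOffset OccurredAt { get; set; }
    public int SchemaVersion { get; set; }       // allows backwards-compatible evolution
    public TPayload Payload { get; set; }        // see the payload patterns later in this article
}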

Combining Domains and Event Streams

Figure 1 – Domains sitting around an Enterprise Event Stream

Once implemented together, capability domains and an enterprise event stream can help an enterprise achieve a high degree of independence across domains. Figure 1 shows multiple domains (X, Y, Z and W); none of those have direct coupling to another domain. They only integrate through the enterprise event stream. There are also multiple consumers (A, B, C and D) and those consumers either integrate with the enterprise event stream or with the domain API, but never directly with an internal domain component such as the App APIs inside domain X.

There are two fundamental tenets to this pattern:

  1. Domains directly integrate with other domains solely through domain boundary APIs (often referred to as domain APIs).
  2. Domains produce and consume enterprise events via the enterprise event stream.

The enterprise events produced by a domain and their domain API are the only public contracts exposed. They’re the only aspects of their domain that must follow enterprise standards and be backwards compatible. Everything else that powers that domain’s capability is internal to the domain, which gives the domain owners a high level of freedom concerning their choice of implementation languages, libraries, container technologies, cloud resource preferences and software standards.

For example, one domain may use .NET and C# on Microsoft Azure because it has team members proficient in those technologies. Even if the rest of the enterprise uses Java on Amazon Web Services, every domain publishes standardized events to the enterprise event stream and exposes a public REST API, both of which are agnostic to the underlying technology stack.

Event Payload Patterns

Figure 2 – Common Payload Patterns

Different enterprises choose the approaches that work best for their business requirements, but in general, three payload patterns are used for enterprise events. Figure 2 illustrates three consumers, A, B and C, each using one of the patterns: consumer A uses the Lightweight pattern, consumer B uses the Full Payload pattern, and consumer C uses the Hybrid approach.

Lightweight Events

For lightweight events, the event payload is often a single identifier or a URI (uniform resource identifier). Consumers receive the event and then make an API call to a domain API to retrieve the current state of the data associated with that event. The benefits of lightweight events are that the events are extremely small, and you do not need to worry about event ordering since consuming systems always retrieve the latest state of the data from the source domain. The disadvantage is that every consumer must make an API call to retrieve the data it needs.

Full Payloads

With full payloads, the event payload is a complete snapshot of the state of the event data at a fixed point in time. Consuming systems have all the information they need when they receive the event and save themselves a call to a domain API. The disadvantage of full payloads is that the order in which events are processed matters, though this is largely a solved problem since all of the popular vendors support guaranteed message ordering.

Hybrid Approach

In the hybrid approach, the event payload is a partial dataset of the event state. The payload includes the source identifiers and URIs, and a subset of the state of the event. This allows some consumers to determine if they need to retrieve further data from the domain API while allowing other consumers to operate off the subset state of the data in the event payload. The hybrid approach is considered the best of both worlds by many.

There is no perfect way to do this, and it’s a sign of a mature enterprise if they choose to use more than one of these approaches to fit specific requirements.
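As a rough sketch of the three shapes – using a made-up “order updated” event with hypothetical field names, serialized with Newtonsoft.Json purely for illustration:

// Lightweight: just enough for the consumer to call back into the domain API.
var lightweight = new
{
    OrderId = "ord-123",
    ResourceUri = "https://orders.example.com/api/v1/orders/ord-123"
};

// Full payload: a complete snapshot of the entity at the time of the event.
var fullPayload = new
{
    OrderId = "ord-123",
    Status = "Shipped",
    Items = new[] { new { Sku = "ABC-1", Quantity = 2 } },
    UpdatedAt = "2022-06-01T12:00:00Z"
};

// Hybrid: identifiers/URIs plus a useful subset of the state.
var hybrid = new
{
    OrderId = "ord-123",
    ResourceUri = "https://orders.example.com/api/v1/orders/ord-123",
    Status = "Shipped",
    UpdatedAt = "2022-06-01T12:00:00Z"
};

Console.WriteLine(JsonConvert.SerializeObject(hybrid, Formatting.Indented));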

Conclusion

Highly independent, fast-delivering technology companies combine an enterprise event stream with a well-cataloged set of capability domains. Couple this with domain owners who follow enterprise standards, and the company grows through the additive effect of each capability domain’s advancements.


Serializing enums as strings

Enumerations in C# are backed by ints by default, which is fine most of the time. That said, there are situations where you’d want to store and retrieve them using their string names, especially when you are persisting them to a database. For one, the string names make the data more readable. For another, re-ordering the enum (caused inadvertently by adding or removing an entry) would change the int values, which can potentially break your data when reading it back from a database. Consider the following types – Model and Seasons.

enum Seasons
{
    Spring,
    Summer,
    Fall,
    Winter
}

class Model
{
    public string Name { get; set; } = String.Empty;

    public Seasons Season { get; set; }
}

If you serialize an instance of Model, you’d see the int value for the Season property.

var model = new Model
{
    Name = "Test Model",
    Season = Seasons.Fall,
};

var json  = JsonConvert.SerializeObject(model, Formatting.Indented);
Console.WriteLine(json);

Output looks like this.

{
  "Name": "Test Model",
  "Season": 2
}

It’s fairly easy to change this behavior though. Just add a JsonConverter attribute to the Season property.

class Model
{
    public string Name { get; set; } = String.Empty;

    [JsonConverter(typeof(StringEnumConverter))]
    public Seasons Season { get; set; }
}

Output is now nice and clean.

{
  "Name": "Test Model",
  "Season": "Fall"
}

You get the same behavior both when serializing and deserializing, and the latter is important when reading data back from a database.
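For example, deserializing that JSON with the attribute in place round-trips the string back into the enum value:

var roundTripped = JsonConvert.DeserializeObject<Model>(
    "{ \"Name\": \"Test Model\", \"Season\": \"Fall\" }");

Console.WriteLine(roundTripped.Season);   // Fall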

How about if you have a collection of enums, though? That’s simple too: just replace the JsonConverter attribute with the following JsonProperty attribute.

class Model
{
    public string Name { get; set; } = String.Empty;

    [JsonProperty(ItemConverterType = typeof(StringEnumConverter))]
    public List<Seasons> Seasons { get; set; } = new List<Seasons>();
}

Output is as expected.

{
  "Name": "Test Model",
  "Seasons": [
    "Fall",
    "Winter"
  ]
}

Implementing asynchronous endpoints in REST with callbacks


This is an extension of the asynchronous endpoint pattern, where the consumer registers a callback URL that is invoked when the task completes. This only works if the consumer can expose an externally reachable REST endpoint. The advantage is that the consumer does not have to keep polling the service to see whether the task has completed.

Once the task completes, the service invokes the callback URL. A common approach is to send just a task id to the callback, and the consumer then invokes a call to the service to fetch the full payload. A less common approach (if sensitive info is not included in the payload) is to send the entire result-set to the callback endpoint.

On the service side, the implementation reacts to the task-completed event by invoking the registered callback for the completed task. There are several ways to do this. A simple approach is for the task worker itself to invoke the callback; the disadvantage is that it becomes complex to support retries if the callback endpoint is down. With AWS, a common approach is to update the data in a DynamoDB table and have a Lambda triggered off that table’s stream, with the Lambda then invoking the callback. A similar approach for Azure is to have a Function triggered off Table storage or Cosmos DB, which would invoke the callback. Both AWS and Azure have mechanisms to add retries to the function/lambda, to handle scenarios where the callback URI is down.
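For the AWS variant, a minimal sketch of such a Lambda might look like the following. It assumes the function is wired to the task table’s DynamoDB stream, and the attribute names (Status, CallbackUrl, TaskId) are entirely hypothetical.

using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Newtonsoft.Json;

public class CallbackInvoker
{
    private static readonly HttpClient Http = new HttpClient();

    // Triggered by the DynamoDB stream on the (hypothetical) task table.
    public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        foreach (var record in dynamoEvent.Records)
        {
            var image = record.Dynamodb.NewImage;

            // Only react to tasks that were just marked completed and that
            // registered a callback URL (attribute names are made up).
            if (image == null ||
                !image.TryGetValue("Status", out var status) || status.S != "Completed" ||
                !image.TryGetValue("CallbackUrl", out var callbackUrl))
            {
                continue;
            }

            var body = JsonConvert.SerializeObject(new { TaskId = image["TaskId"].S });
            var response = await Http.PostAsync(
                callbackUrl.S,
                new StringContent(body, Encoding.UTF8, "application/json"));

            context.Logger.LogLine(
                $"Callback for task {image["TaskId"].S} returned {response.StatusCode}");
        }
    }
}

Throwing from the handler on a failed callback would let the stream’s built-in retry behavior re-deliver the record, which is one way to get the retries mentioned above.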

Implementing asynchronous endpoints in REST

When you have an API endpoint that takes too long to run, consumers can face time-outs or blocking scenarios. While consumers can use background workers to make these long-running calls, it’s far more convenient if the service itself supports an asynchronous operation. A very common approach is to queue up the work on the service side and return a task-id to the consumer. The consumer can then periodically poll the API with the task-id, which returns a response indicating whether the task has completed. Once the task completes, the consumer makes a call to retrieve the completed task response. In some implementations, these two operations are combined, so the same endpoint returns both the pending status and the completed result.

On the service side, the implementation is typically done using a queue. The first call generates and persists a task-id, which is returned to the consumer, and the work is placed on a queue (such as AWS SQS or Azure Queues). A worker triggered off the queue (via AWS Lambda or Azure Functions) then performs the task and marks it as completed.
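A minimal sketch of the consumer-facing half of this pattern in ASP.NET Core with SQS might look like this – the route, queue URL, and the ITaskStore abstraction are all hypothetical placeholders, not a prescribed design.

using System;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

// Hypothetical persistence for task status/results.
public interface ITaskStore
{
    Task CreateAsync(string taskId, string status);
    Task<TaskRecord> GetAsync(string taskId);
}

public class TaskRecord
{
    public string Status { get; set; }
    public object Result { get; set; }
}

[ApiController]
[Route("api/reports")] // hypothetical long-running "report" operation
public class ReportsController : ControllerBase
{
    private readonly IAmazonSQS _sqs;
    private readonly ITaskStore _taskStore;

    public ReportsController(IAmazonSQS sqs, ITaskStore taskStore)
    {
        _sqs = sqs;
        _taskStore = taskStore;
    }

    // Accept the work and return a task-id immediately.
    [HttpPost]
    public async Task<IActionResult> StartReport([FromBody] object request)
    {
        var taskId = Guid.NewGuid().ToString();
        await _taskStore.CreateAsync(taskId, "Pending");

        await _sqs.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = "https://sqs.us-east-2.amazonaws.com/123456789012/report-tasks", // placeholder
            MessageBody = JsonConvert.SerializeObject(new { TaskId = taskId, Request = request })
        });

        return Accepted(new { TaskId = taskId });
    }

    // Consumers poll this endpoint; once the worker marks the task completed,
    // the result is returned alongside the status.
    [HttpGet("{taskId}")]
    public async Task<IActionResult> GetStatus(string taskId)
    {
        var task = await _taskStore.GetAsync(taskId);
        if (task == null) return NotFound();

        return task.Status == "Completed"
            ? Ok(new { task.Status, task.Result })
            : Ok(new { task.Status });
    }
}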

Adding a lambda target with EventBridge

Unlike Kafka or Kinesis, EventBridge doesn’t really have the concept of a persisted topic or stream from which multiple consumers can read. Instead, you set up rules and targets, so it’s more of a conscious push than a pull. Arguably, this adds a little bit of coupling in the sense that the producing team is aware of its consumers – but in many scenarios, that may actually be preferred.

Continuing with the example from the previous post, you’d need to add a new rule with the following event pattern.

{
  "source": ["demo-event"]
}

If you are wondering where that came from, here’s how the event entries were constructed in the previous post. So the pattern above is basically saying: any time an event is pushed to the bus with that source, fire the rule.

PutEventsRequestEntry requestEntry1 = new PutEventsRequestEntry
{
    Source = "demo-event",
    EventBusName = "dev-event-bus",
    DetailType = "mock.client",
    Detail = payload1
};

You can now add a target to the rule. To test things out, the simplest approach is to set a CloudWatch log group as the target; you can then use the AWS console to confirm that your events are being received by the bus and that the rule is firing. Once you’ve done that, you can test out the lambda target by creating a very simple lambda in C#.

public class Function
{        
    public async Task FunctionHandler(
        CloudWatchEvent<object> input, 
        ILambdaContext context)
    {
        context.Logger.LogLine($"Lambda: {input.Source}");
    }
}

The main thing to note is the signature of the FunctionHandler method: the events arrive as type CloudWatchEvent<object>. The one line of code just logs the event source to CloudWatch.
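If you prefer to wire the rule and target up in code instead of the console, the same SDK supports that too. Here’s a hedged sketch that would sit inside an async method alongside the producer code from the previous post – the rule name, target id and lambda ARN are placeholders.

// Uses the same AmazonEventBridgeClient ("client") plus the
// Amazon.EventBridge.Model and System.Collections.Generic namespaces.

// Create (or update) the rule on the custom bus with the event pattern shown earlier.
await client.PutRuleAsync(new PutRuleRequest
{
    Name = "demo-event-rule",                      // placeholder rule name
    EventBusName = "dev-event-bus",
    EventPattern = "{ \"source\": [\"demo-event\"] }"
});

// Point the rule at the lambda (the ARN below is a placeholder).
await client.PutTargetsAsync(new PutTargetsRequest
{
    Rule = "demo-event-rule",
    EventBusName = "dev-event-bus",
    Targets = new List<Target>
    {
        new Target
        {
            Id = "demo-lambda-target",
            Arn = "arn:aws:lambda:us-east-2:123456789012:function:demo-event-handler"
        }
    }
});

Keep in mind that EventBridge also needs permission to invoke the lambda (a resource-based policy on the function, which the console typically adds for you when you attach a lambda target there).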

Amazon EventBridge

Amazon EventBridge is not an apples-to-apples comparison with Kafka or Kinesis, but it does enable event-driven software architectures and is a serverless offering, so if you are heavily invested in AWS, it is very easy to move forward with it. It’s trivial to create an event bus (Amazon calls them custom event buses, but it’s basically just a named bus) and to produce events to it. Each event bus can have one or more rules associated with it. The rules dictate what happens when a certain event is fired – it’s very basic pattern matching against the event schema. Each rule can have one or more targets – you can use lambdas or CloudWatch log groups as targets, among others. Recently they added support for API targets, so you can point to any REST endpoint.

Producing Events

If your dev stack is .NET Core, you can use the AWSSDK.EventBridge NuGet package (official from AWS). Here’s some sample code that shows how to produce events to a specific bus. You can produce multiple events with a single call, but the bus receives them as individual events. So the example code below produces two separate events – meaning the targets receive them separately.

class Program
{
    static async Task Main()
    {
        string payload1 = JsonConvert.SerializeObject
            (
                new
                {
                    Firstname = "Joe",
                    Lastname = "Spodzter",
                    Email = "joe.s.100@yahoo.com"
                }
            );

        string payload2 = JsonConvert.SerializeObject
            (
                new
                {
                    Firstname = "Mary",
                    Lastname = "Spodzter",
                    Email = "joe.s.100@yahoo.com"
                }
            );

        PutEventsRequestEntry requestEntry1 = new PutEventsRequestEntry
        {
            Source = "demo-event",
            EventBusName = "dev-event-bus",
            DetailType = "mock.client",
            Detail = payload1
        };

        PutEventsRequestEntry requestEntry2 = new PutEventsRequestEntry
        {
            Source = "demo-event",
            EventBusName = "dev-event-bus",
            DetailType = "mock.client",
            Detail = payload2
        };

        using var client = new AmazonEventBridgeClient(RegionEndpoint.USEast2);

        var response = await client.PutEventsAsync
            (
                new PutEventsRequest()
                {
                    Entries =
                    {
                        requestEntry1, requestEntry2
                    }
                }
            );

        Console.WriteLine(response.HttpStatusCode);
    }
}

.NET Core OData client using Microsoft.OData.Client

If you are using .NET Core, you’ll have quickly found that the .NET OData packages don’t work with it. Fortunately, the most recent (as of today) version of Microsoft.OData.Client (v7.5.0) works fine with .NET Core. Once you add the package to your project, you get a T4 file generated for you (.tt extension). Edit the Configuration class in the file and put in your connection info.

public static class Configuration
{
  public const string MetadataDocumentUri = "http://localhost:58200/odata/";

  public const bool UseDataServiceCollection = true;

  public const string NamespacePrefix = "ConsoleAppClient";

  public const string TargetLanguage = "CSharp";

  public const bool EnableNamingAlias = true;

  public const bool IgnoreUnexpectedElementsAndAttributes = true;
}

Now you can use the auto-generated Container class to make OData calls against any OData service. Here’s some example usage.

var container = new Container(new Uri("http://localhost:58200/odata/"));

var response = await container.Clients.ExecuteAsync();

var query = (DataServiceQuery)container
    .Clients
    .Expand("Address")
    .Where(it => it.Id > 10);

var resp2 = await query.ExecuteAsync();
foreach (Client item in resp2)
{
    Console.WriteLine(item.Address.City);
}

foreach (Client item in response)
{
    Console.WriteLine(item.Name);
}

var c = container.Clients.ByKey(100);
var s = await c.NameLength().GetValueAsync();
Console.WriteLine(s);

If you look at your IIS logs, you’ll see corresponding OData queries in there.

GET /odata/Clients - 58200 - ::1 Microsoft.OData.Client/7.5.0 - 200 0 0 31
GET /odata/Clients $filter=Id gt 10&$expand=Address 58200 - ::1 Microsoft.OData.Client/7.5.0 - 200 0 0 32
GET /odata/Clients(100)/Default.NameLength() - 58200 - ::1 Microsoft.OData.Client/7.5.0 - 200 0 0 130

ASP.NET Core, OData, and Swashbuckle – workaround for error

If you are trying to use Swashbuckle with an ASP.NET Core project that uses OData, you are going to get an error on the swagger endpoint. The error will be something like this.

InvalidOperationException: No media types found in ‘Microsoft.AspNet.OData.Formatter.ODataOutputFormatter.SupportedMediaTypes’. Add at least one media type to the list of supported media types.

Until the Swashbuckle or OData teams fix this, here’s a hackish workaround. It won’t enable Swagger for the OData controllers, but it will stop Swagger from breaking for the other controllers.

services.AddMvc(op =>
{
    foreach (var formatter in op.OutputFormatters
        .OfType<ODataOutputFormatter>()
        .Where(it => !it.SupportedMediaTypes.Any()))
    {
        formatter.SupportedMediaTypes.Add(
            new MediaTypeHeaderValue("application/prs.mock-odata"));
    }
    foreach (var formatter in op.InputFormatters
        .OfType<ODataInputFormatter>()
        .Where(it => !it.SupportedMediaTypes.Any()))
    {
        formatter.SupportedMediaTypes.Add(
            new MediaTypeHeaderValue("application/prs.mock-odata"));
    }
});

OData with ASP.NET Core

If you are writing .NET Core REST services to expose data entities, I would recommend using OData now that ASP.NET Core support has been added. You need to add the NuGet package Microsoft.AspNetCore.OData (at the time of writing, the latest stable version is 7.0.1). One caveat is that Swagger and Swashbuckle will not work with your OData controllers. The issue has been reported on the Swashbuckle GitHub repo, and hopefully it will be resolved in the not-too-distant future. I quickly tested out a demo app and it seemed to work as expected. Here are the model classes.

public class Client
{
    [Key]
    public int Id { get; set; }

    public string Name { get; set; }

    public Address Address { get; set; }
}

public class Address
{
    [Key]
    public int Id { get; set; }

    public string Street { get; set; }

    public string City { get; set; }

    public string State { get; set; }
}

Here’s what you add to ConfigureServices. Add it before the call to AddMvc.

public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<ClientContext>(op =>
    {
        op.UseInMemoryDatabase("clientsDatabase");
    });

    services.AddOData();

    services.AddMvc();
}

And here are the changes made to the Configure method. The calls below are extension methods added by the OData package, and they essentially turn on those OData features on your service. Note that I’ve added two entity sets as well as a function on one of the entities.

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    app.UseMvc(rb => 
    {
        rb.Expand().OrderBy().Select().Filter();
        rb.MapODataServiceRoute("odata", "odata", BuildEdmModel());
    });
}

public static IEdmModel BuildEdmModel()
{
    var builder = new ODataConventionModelBuilder();
    builder.EntitySet<Client>("Clients");
    builder.EntitySet<Address>("Addresses");
    builder.EntityType<Client>()
        .Function("NameLength")
        .Returns<string>();
    return builder.GetEdmModel();
}

And this is my mocked up data context class (based off EF Core).

public class ClientContext : DbContext
{
    public ClientContext(DbContextOptions<ClientContext> options) : base(options)
    {
    }

    public DbSet<Client> Clients { get; set; }

    public DbSet<Address> Addresses { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
    }

    public void InitializeIfEmpty()
    {
        if (this.Clients.Count() == 0)
        {
            var address = new Address()
            {
                Id = 25,
                Street = "100 Main St.",
                City = "Columbus",
                State = "OH"
            };

            this.Addresses.Add(address);

            this.Clients.Add(new Client()
            {
                Id = 100,
                Name = "Sam Slick",
                Address = address
            });

            this.Clients.Add(new Client()
            {
                Id = 105,
                Name = "Janet Slick",
                Address = address
            });

            this.SaveChanges();
        }
    }
}

This is how we set up a Clients endpoint. The action methods in the snippets that follow all go inside this controller class.

public class ClientsController : ODataController
{
    private ClientContext _dbContext;

    public ClientsController(ClientContext context)
    {
        _dbContext = context;

        _dbContext.InitializeIfEmpty();
    }

Adding methods to pull all clients, a specific client, and a specific client’s address.

[EnableQuery]
public IActionResult Get()
{
    return Ok(_dbContext.Clients);
}

[EnableQuery]
public IActionResult Get(int key)
{
    Client client = _dbContext.Clients
        .Include(c => c.Address)
        .FirstOrDefault(it => it.Id == key);

    return Ok(client);
}

[EnableQuery]
public IActionResult GetAddress([FromODataUri]int key)
{
    Client client = _dbContext.Clients
        .Include(c => c.Address)
        .FirstOrDefault(it => it.Id == key);

    return base.Ok(client.Address);
}

Here’s how you would add a Post action.

[EnableQuery]
public IActionResult Post([FromBody] Client client)
{
    _dbContext.Add(client);
    _dbContext.SaveChanges();
    return Created(client);
}

And here’s how you expose the OData action.

public IActionResult NameLength([FromODataUri]int key)
{
    var client = _dbContext.Clients.FirstOrDefault(it => it.Id == key);

    return Ok($"{client.Name} -> {client.Name.Length}");
}

That’s all for the demo. Now you can make standard OData queries against this API.

http://localhost:52913/odata/clients
http://localhost:52913/odata/clients?$expand=Address
http://localhost:52913/odata/clients/105/address
http://localhost:52913/odata/clients/105/NameLength