7: Aggregates and Consistency Boundaries
In this chapter, we’d like to revisit our domain model to talk about invariants and constraints, and see how our domain objects can maintain their own internal consistency, both conceptually and in persistent storage. We’ll discuss the concept of a consistency boundary and show how making it explicit can help us to build high-performance software without compromising maintainability.
Adding the Product aggregate shows a preview of where we’re headed: we’ll introduce a new model object called Product to wrap multiple batches, and we’ll make the old allocate() domain service available as a method on Product instead. Why? Let’s find out.
Tip
The code for this chapter is in the chapter_07_aggregate branch on GitHub:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_07_aggregate
# or to code along, check out the previous chapter:
git checkout chapter_06_uow
Why Not Just Run Everything in a Spreadsheet?
What’s the point of a domain model, anyway? What’s the fundamental problem we’re trying to address?
Couldn’t we just run everything in a spreadsheet? Many of our users would be delighted by that. Business users like spreadsheets because they’re simple, familiar, and yet enormously powerful.
In fact, an enormous number of business processes do operate by manually sending spreadsheets back and forth over email. This "CSV over SMTP" architecture has low initial complexity but tends not to scale very well because it’s difficult to apply logic and maintain consistency.
Who is allowed to view this particular field? Who’s allowed to update it? What happens when we try to order –350 chairs, or 10,000,000 tables? Can an employee have a negative salary?
These are the constraints of a system. Much of the domain logic we write exists to enforce these constraints in order to maintain the invariants of the system. The invariants are the things that have to be true whenever we finish an operation.
Invariants, Constraints, and Consistency
The two words are somewhat interchangeable, but a constraint is a rule that restricts the possible states our model can get into, while an invariant is defined a little more precisely as a condition that is always true.
If we were writing a hotel-booking system, we might have the constraint that double bookings are not allowed. This supports the invariant that a room cannot have more than one booking for the same night.
Of course, sometimes we might need to temporarily bend the rules. Perhaps we need to shuffle the rooms around because of a VIP booking. While we’re moving bookings around in memory, we might be double booked, but our domain model should ensure that, when we’re finished, we end up in a final consistent state, where the invariants are met. If we can’t find a way to accommodate all our guests, we should raise an error and refuse to complete the operation.
Let’s look at a couple of concrete examples from our business requirements; we’ll start with this one:
An order line can be allocated to only one batch at a time.
This is a business rule that imposes an invariant. The invariant is that an order line is allocated to either zero or one batch, but never more than one. We need to make sure that our code never accidentally calls Batch.allocate() on two different batches for the same line, and currently, there’s nothing there to explicitly stop us from doing that.
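To see the risk concretely, here’s a minimal sketch with cut-down stand-ins for Batch and OrderLine (not the full model from earlier chapters): each batch tracks only its own allocations, so nothing prevents us from calling allocate() on two batches for the same line.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int


class Batch:
    def __init__(self, ref: str, sku: str, qty: int):
        self.reference = ref
        self.sku = sku
        self.available_quantity = qty
        self._allocations = set()

    def allocate(self, line: OrderLine):
        # Each batch knows only about its own allocations, so it cannot
        # tell that another batch already holds this line.
        self._allocations.add(line)
        self.available_quantity -= line.qty


line = OrderLine("order-1", "RED-CHAIR", 10)
batch1 = Batch("batch-1", "RED-CHAIR", 100)
batch2 = Batch("batch-2", "RED-CHAIR", 100)
batch1.allocate(line)
batch2.allocate(line)  # nothing stops this: the invariant is now broken
```

The invariant lives "between" batches, so no single Batch can enforce it on its own; that is exactly the job an aggregate takes on.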
Invariants, Concurrency, and Locks
Let’s look at another one of our business rules:
We can’t allocate to a batch if the available quantity is less than the quantity of the order line.
Here the constraint is that we can’t allocate more stock than is available to a batch, so we never oversell stock by allocating two customers to the same physical cushion, for example. Every time we update the state of the system, our code needs to ensure that we don’t break the invariant, which is that the available quantity must be greater than or equal to zero.
In a single-threaded, single-user application, it’s relatively easy for us to maintain this invariant. We can just allocate stock one line at a time, and raise an error if there’s no stock available.
This gets much harder when we introduce the idea of concurrency. Suddenly we might be allocating stock for multiple order lines simultaneously. We might even be allocating order lines at the same time as processing changes to the batches themselves.
We usually solve this problem by applying locks to our database tables. This prevents two operations from happening simultaneously on the same row or same table.
As we start to think about scaling up our app, we realize that our model of allocating lines against all available batches may not scale. If we process tens of thousands of orders per hour, and hundreds of thousands of order lines, we can’t hold a lock over the whole batches table for every single one—we’ll get deadlocks or performance problems at the very least.
What Is an Aggregate?
OK, so if we can’t lock the whole database every time we want to allocate an order line, what should we do instead? We want to protect the invariants of our system but allow for the greatest degree of concurrency. Maintaining our invariants inevitably means preventing concurrent writes; if multiple users can allocate DEADLY-SPOON at the same time, we run the risk of overallocating.

On the other hand, there’s no reason we can’t allocate DEADLY-SPOON at the same time as FLIMSY-DESK. It’s safe to allocate two products at the same time because there’s no invariant that covers them both. We don’t need them to be consistent with each other.
The Aggregate pattern is a design pattern from the DDD community that helps us to resolve this tension. An aggregate is just a domain object that contains other domain objects and lets us treat the whole collection as a single unit.
The only way to modify the objects inside the aggregate is to load the whole thing, and to call methods on the aggregate itself.
As a model gets more complex and grows more entity and value objects, referencing each other in a tangled graph, it can be hard to keep track of who can modify what. Especially when we have collections in the model as we do (our batches are a collection), it’s a good idea to nominate some entities to be the single entrypoint for modifying their related objects. It makes the system conceptually simpler and easy to reason about if you nominate some objects to be in charge of consistency for the others.
For example, if we’re building a shopping site, the Cart might make a good aggregate: it’s a collection of items that we can treat as a single unit. Importantly, we want to load the entire basket as a single blob from our data store. We don’t want two requests to modify the basket at the same time, or we run the risk of weird concurrency errors. Instead, we want each change to the basket to run in a single database transaction.
We don’t want to modify multiple baskets in a transaction, because there’s no use case for changing the baskets of several customers at the same time. Each basket is a single consistency boundary responsible for maintaining its own invariants.
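A sketch of what such a Cart aggregate might look like; the class names and the maximum-quantity rule here are invented purely for illustration:

```python
class CartItem:
    def __init__(self, sku: str, qty: int):
        self.sku = sku
        self.qty = qty


class Cart:
    """Aggregate root: all changes to items go through the Cart."""

    MAX_ITEMS = 10  # invented invariant: a cart holds at most 10 units

    def __init__(self, cart_id: str):
        self.cart_id = cart_id
        self._items: list[CartItem] = []

    @property
    def total_qty(self) -> int:
        return sum(item.qty for item in self._items)

    def add_item(self, sku: str, qty: int) -> None:
        # The aggregate checks its invariant before mutating anything,
        # so a finished operation always leaves the cart consistent.
        if self.total_qty + qty > self.MAX_ITEMS:
            raise ValueError("cart is full")
        self._items.append(CartItem(sku, qty))


cart = Cart("cart-123")
cart.add_item("RED-CHAIR", 6)
try:
    cart.add_item("TASTELESS-LAMP", 7)  # would exceed the limit of 10
except ValueError:
    pass
```

Because callers never touch the item list directly, the invariant check cannot be bypassed, and the failed operation leaves the cart unchanged.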
An AGGREGATE is a cluster of associated objects that we treat as a unit for the purpose of data changes.
Domain-Driven Design blue book
Per Evans, our aggregate has a root entity (the Cart) that encapsulates access to items. Each item has its own identity, but other parts of the system will always refer to the Cart only as an indivisible whole.
Tip
Just as we sometimes use _leading_underscores to mark methods or functions as "private," you can think of aggregates as being the "public" classes of our model, and the rest of the entities and value objects as "private."
Choosing an Aggregate
What aggregate should we use for our system? The choice is somewhat arbitrary, but it’s important. The aggregate will be the boundary where we make sure every operation ends in a consistent state. This helps us to reason about our software and prevent weird race issues. We want to draw a boundary around a small number of objects—the smaller, the better, for performance—that have to be consistent with one another, and we need to give this boundary a good name.
The object we’re manipulating under the covers is Batch. What do we call a collection of batches? How should we divide all the batches in the system into discrete islands of consistency?

We could use Shipment as our boundary. Each shipment contains several batches, and they all travel to our warehouse at the same time. Or perhaps we could use Warehouse as our boundary: each warehouse contains many batches, and counting all the stock at the same time could make sense.

Neither of these concepts really satisfies us, though. We should be able to allocate DEADLY-SPOONs or FLIMSY-DESKs in one go, even if they’re not in the same warehouse or the same shipment. These concepts have the wrong granularity.

When we allocate an order line, we’re interested only in batches that have the same SKU as the order line. Some sort of concept like GlobalSkuStock could work: a collection of all the batches for a given SKU. It’s an unwieldy name, though, so after some bikeshedding via SkuStock, Stock, ProductStock, and so on, we decided to simply call it Product; after all, that was the first concept we came across in our exploration of the domain language back in [chapter_01_domain_model].
So the plan is this: when we want to allocate an order line, instead of the approach shown in Before: allocate against all batches using the domain service, where we look up all the Batch objects in the world and pass them to the allocate() domain service…
[plantuml, apwp_0702, config=plantuml.cfg]
@startuml
scale 4
hide empty members

package "Service Layer" as services {
    class "allocate()" as allocate {
    }
    hide allocate circle
    hide allocate members
}

package "Domain Model" as domain_model {
    class Batch {
    }
    class "allocate()" as allocate_domain_service {
    }
    hide allocate_domain_service circle
    hide allocate_domain_service members
}

package Repositories {
    class BatchRepository {
        list()
    }
}

allocate -> BatchRepository: list all batches
allocate --> allocate_domain_service: allocate(orderline, batches)
@enduml
…we’ll move to the world of After: ask Product to allocate against its batches, in which there is a new Product object for the particular SKU of our order line, and it will be in charge of all the batches for that SKU, and we can call a .allocate() method on that instead.
[plantuml, apwp_0703, config=plantuml.cfg]
@startuml
scale 4
hide empty members

package "Service Layer" as services {
    class "allocate()" as allocate {
    }
}
hide allocate circle
hide allocate members

package "Domain Model" as domain_model {
    class Product {
        allocate()
    }
    class Batch {
    }
}

package Repositories {
    class ProductRepository {
        get()
    }
}

allocate -> ProductRepository: get me the product for this SKU
allocate --> Product: product.allocate(orderline)
Product o- Batch: has
@enduml
Let’s see how that looks in code form:
class Product:
def __init__(self, sku: str, batches: List[Batch]):
self.sku = sku #(1)
self.batches = batches #(2)
def allocate(self, line: OrderLine) -> str: #(3)
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
return batch.reference
except StopIteration:
raise OutOfStock(f"Out of stock for sku {line.sku}")
(1) Product's main identifier is the sku.

(2) Our Product class holds a reference to a collection of batches for that SKU.

(3) Finally, we can move the allocate() domain service to be a method on the Product aggregate.
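To see the aggregate in action, here is a self-contained sketch using cut-down stand-ins for Batch and OrderLine (the real versions, with full order-line tracking, live in the domain model from earlier chapters):

```python
from dataclasses import dataclass
from datetime import date
from typing import List, Optional


class OutOfStock(Exception):
    pass


@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int


class Batch:
    def __init__(self, ref: str, sku: str, qty: int, eta: Optional[date]):
        self.reference = ref
        self.sku = sku
        self.eta = eta
        self.available_quantity = qty

    def can_allocate(self, line: OrderLine) -> bool:
        return self.sku == line.sku and self.available_quantity >= line.qty

    def allocate(self, line: OrderLine):
        self.available_quantity -= line.qty

    def __gt__(self, other):
        # in-stock batches (eta is None) sort before shipments
        if self.eta is None:
            return False
        if other.eta is None:
            return True
        return self.eta > other.eta


class Product:
    def __init__(self, sku: str, batches: List[Batch]):
        self.sku = sku
        self.batches = batches

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            return batch.reference
        except StopIteration:
            raise OutOfStock(f"Out of stock for sku {line.sku}")


in_stock = Batch("in-stock", "RETRO-CLOCK", 10, eta=None)
shipment = Batch("shipment", "RETRO-CLOCK", 10, eta=date(2025, 1, 1))
product = Product("RETRO-CLOCK", [in_stock, shipment])

ref = product.allocate(OrderLine("o1", "RETRO-CLOCK", 10))   # prefers in-stock
ref2 = product.allocate(OrderLine("o2", "RETRO-CLOCK", 10))  # falls back to shipment
```

Note that callers never touch a Batch directly; the Product decides which batch receives the line, which is exactly the single-entrypoint role we want.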
Note
This Product might not look like what you’d expect a Product model to look like: no price, no description, no dimensions. Our allocation service doesn’t care about any of those things. This is the power of bounded contexts; the concept of a product in one app can be very different from another. See the following sidebar for more discussion.
One Aggregate = One Repository
Once we define certain entities to be aggregates, we need to apply the rule that they are the only entities that are publicly accessible to the outside world. In other words, the only repositories we are allowed should be repositories that return aggregates.
Note
The rule that repositories should only return aggregates is the main place where we enforce the convention that aggregates are the only way into our domain model. Be wary of breaking it!
In our case, we’ll switch from BatchRepository to ProductRepository:
class AbstractUnitOfWork(abc.ABC):
products: repository.AbstractProductRepository
...
class AbstractProductRepository(abc.ABC):
@abc.abstractmethod
def add(self, product):
...
@abc.abstractmethod
def get(self, sku) -> model.Product:
...
The ORM layer will need some tweaks so that the right batches automatically get loaded and associated with Product objects. The nice thing is, the Repository pattern means we don’t have to worry about that yet. We can just use our FakeRepository and then feed through the new model into our service layer to see how it looks with Product as its main entrypoint:
def add_batch(
ref: str, sku: str, qty: int, eta: Optional[date],
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get(sku=sku)
if product is None:
product = model.Product(sku, batches=[])
uow.products.add(product)
product.batches.append(model.Batch(ref, sku, qty, eta))
uow.commit()
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
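In tests, the one-aggregate-one-repository rule looks like this sketch of a fake products repository and unit of work (the Product stub and class names here are simplified stand-ins, in the spirit of the fakes from earlier chapters):

```python
class FakeProduct:
    """Minimal stand-in for the real Product aggregate."""

    def __init__(self, sku, batches):
        self.sku = sku
        self.batches = batches


class FakeProductRepository:
    def __init__(self, products):
        self._products = set(products)

    def add(self, product):
        self._products.add(product)

    def get(self, sku):
        # The repository hands back whole aggregates, never bare batches.
        return next((p for p in self._products if p.sku == sku), None)


class FakeUnitOfWork:
    def __init__(self):
        self.products = FakeProductRepository([])
        self.committed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def commit(self):
        self.committed = True


uow = FakeUnitOfWork()
with uow:
    assert uow.products.get("CRUNCHY-ARMCHAIR") is None  # unknown SKU
    uow.products.add(FakeProduct("CRUNCHY-ARMCHAIR", batches=[]))
    uow.commit()
```

Because the service layer only ever asks for products by SKU, swapping this fake for the real SQLAlchemy-backed repository requires no changes to the services themselves.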
What About Performance?
We’ve mentioned a few times that we’re modeling with aggregates because we want to have high-performance software, but here we are loading all the batches when we only need one. You might expect that to be inefficient, but there are a few reasons why we’re comfortable here.
First, we’re purposefully modeling our data so that we can make a single query to the database to read, and a single update to persist our changes. This tends to perform much better than systems that issue lots of ad hoc queries. In systems that don’t model this way, we often find that transactions slowly get longer and more complex as the software evolves.
Second, our data structures are minimal and comprise a few strings and integers per row. We can easily load tens or even hundreds of batches in a few milliseconds.
Third, we expect to have only 20 or so batches of each product at a time. Once a batch is used up, we can discount it from our calculations. This means that the amount of data we’re fetching shouldn’t get out of control over time.
If we did expect to have thousands of active batches for a product, we’d have a couple of options. For one, we could use lazy-loading for the batches in a product. From the perspective of our code, nothing would change, but in the background, SQLAlchemy would page through data for us. This would lead to more requests, each fetching a smaller number of rows. Because we need to find only a single batch with enough capacity for our order, this might work pretty well.
If all else failed, we’d just look for a different aggregate. Maybe we could split up batches by region or by warehouse. Maybe we could redesign our data access strategy around the shipment concept. The Aggregate pattern is designed to help manage some technical constraints around consistency and performance. There isn’t one correct aggregate, and we should feel comfortable changing our minds if we find our boundaries are causing performance woes.
Optimistic Concurrency with Version Numbers
We have our new aggregate, so we’ve solved the conceptual problem of choosing an object to be in charge of consistency boundaries. Let’s now spend a little time talking about how to enforce data integrity at the database level.
Note
This section has a lot of implementation details; for example, some of it is Postgres-specific. More generally, we’re showing one approach to managing concurrency issues; real requirements in this area vary a lot from project to project, and you shouldn’t expect to be able to copy and paste code from here into production.
We don’t want to hold a lock over the entire batches table, but how will we implement holding a lock over just the rows for a particular SKU?

One answer is to have a single attribute on the Product model that acts as a marker for the whole state change being complete and to use it as the single resource that concurrent workers can fight over. If two transactions read the state of the world for batches at the same time, and both want to update the allocations tables, we force both to also try to update the version_number in the products table, in such a way that only one of them can win and the world stays consistent.
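At the SQL level, this boils down to a compare-and-swap: the UPDATE succeeds only if the version we read is still the one in the table. Here is a sketch using an in-memory SQLite database (the schema is simplified, not the book's actual table layout, which is managed by SQLAlchemy):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (sku TEXT PRIMARY KEY, version_number INTEGER)")
conn.execute("INSERT INTO products VALUES ('DEADLY-SPOON', 3)")


def commit_allocation(conn, sku, read_version):
    # Only one writer can move version 3 -> 4; a writer holding a stale
    # version matches zero rows, and its commit is rejected.
    cursor = conn.execute(
        "UPDATE products SET version_number = ? "
        "WHERE sku = ? AND version_number = ?",
        (read_version + 1, sku, read_version),
    )
    if cursor.rowcount == 0:
        raise RuntimeError("concurrent update detected")


commit_allocation(conn, "DEADLY-SPOON", read_version=3)  # first writer wins
try:
    commit_allocation(conn, "DEADLY-SPOON", read_version=3)  # stale read fails
except RuntimeError:
    pass

(version,) = conn.execute(
    "SELECT version_number FROM products WHERE sku = 'DEADLY-SPOON'"
).fetchone()
```

The losing transaction can then be retried from the top, rereading the current state of the world.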
Sequence diagram: two transactions attempting a concurrent update on Product illustrates two concurrent transactions doing their read operations at the same time, so they see a Product with, for example, version=3. They both call Product.allocate() in order to modify a state. But we set up our database integrity rules such that only one of them is allowed to commit the new Product with version=4, and the other update is rejected.
Tip
Version numbers are just one way to implement optimistic locking. You could achieve the same thing by setting the Postgres transaction isolation level to SERIALIZABLE, but that often comes at a severe performance cost. Version numbers also make implicit concepts explicit.
Sequence diagram: two transactions attempting a concurrent update on Product

[plantuml, apwp_0704, config=plantuml.cfg]
@startuml
scale 4

entity Model
collections Transaction1
collections Transaction2
database Database

Transaction1 -> Database: get product
Database -> Transaction1: Product(version=3)
Transaction2 -> Database: get product
Database -> Transaction2: Product(version=3)
Transaction1 -> Model: Product.allocate()
Model -> Transaction1: Product(version=4)
Transaction2 -> Model: Product.allocate()
Model -> Transaction2: Product(version=4)
Transaction1 -> Database: commit Product(version=4)
Database -[#green]> Transaction1: OK
Transaction2 -> Database: commit Product(version=4)
Database -[#red]>x Transaction2: Error! version is already 4
@enduml
Implementation Options for Version Numbers
There are essentially three options for implementing version numbers:

1. version_number lives in the domain; we add it to the Product constructor, and Product.allocate() is responsible for incrementing it.

2. The service layer could do it! The version number isn’t strictly a domain concern, so instead our service layer could assume that the current version number is attached to Product by the repository, and the service layer will increment it before it does the commit().

3. Since it’s arguably an infrastructure concern, the UoW and repository could do it by magic. The repository has access to version numbers for any products it retrieves, and when the UoW does a commit, it can increment the version number for any products it knows about, assuming them to have changed.
Option 3 isn’t ideal, because there’s no real way of doing it without having to assume that all products have changed, so we’ll be incrementing version numbers when we don’t have to.[1]
Option 2 involves mixing the responsibility for mutating state between the service layer and the domain layer, so it’s a little messy as well.
So in the end, even though version numbers don’t have to be a domain concern, you might decide the cleanest trade-off is to put them in the domain:
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0): #(1)
self.sku = sku
self.batches = batches
self.version_number = version_number #(1)
def allocate(self, line: OrderLine) -> str:
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
self.version_number += 1 #(1)
return batch.reference
except StopIteration:
raise OutOfStock(f"Out of stock for sku {line.sku}")
(1) There it is!
Tip
If you’re scratching your head at this version number business, it might help to remember that the number isn’t important. What’s important is that the Product database row is modified whenever we make a change to the Product aggregate. The version number is a simple, human-comprehensible way to model a thing that changes on every write, but it could equally be a random UUID every time.
Testing for Our Data Integrity Rules
Now to make sure we can get the behavior we want: if we have two concurrent attempts to do allocation against the same Product, one of them should fail, because they can’t both update the version number.
First, let’s simulate a "slow" transaction using a function that does allocation and then does an explicit sleep:[2]
def try_to_allocate(orderid, sku, exceptions):
line = model.OrderLine(orderid, sku, 10)
try:
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
product = uow.products.get(sku=sku)
product.allocate(line)
time.sleep(0.2)
uow.commit()
except Exception as e:
print(traceback.format_exc())
exceptions.append(e)
Then we have our test invoke this slow allocation twice, concurrently, using threads:
def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
sku, batch = random_sku(), random_batchref()
session = postgres_session_factory()
insert_batch(session, batch, sku, 100, eta=None, product_version=1)
session.commit()
order1, order2 = random_orderid(1), random_orderid(2)
exceptions = [] # type: List[Exception]
try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)
thread1 = threading.Thread(target=try_to_allocate_order1) #(1)
thread2 = threading.Thread(target=try_to_allocate_order2) #(1)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
[[version]] = session.execute(
"SELECT version_number FROM products WHERE sku=:sku",
dict(sku=sku),
)
assert version == 2 #(2)
[exception] = exceptions
assert "could not serialize access due to concurrent update" in str(exception) #(3)
orders = session.execute(
"SELECT orderid FROM allocations"
" JOIN batches ON allocations.batch_id = batches.id"
" JOIN order_lines ON allocations.orderline_id = order_lines.id"
" WHERE order_lines.sku=:sku",
dict(sku=sku),
)
assert orders.rowcount == 1 #(4)
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
uow.session.execute("select 1")
(1) We start two threads that will reliably produce the concurrency behavior we want: read1, read2, write1, write2.

(2) We assert that the version number has been incremented only once.

(3) We can also check on the specific exception if we like.

(4) And we double-check that only one allocation has gotten through.
Enforcing Concurrency Rules by Using Database Transaction Isolation Levels
To get the test to pass as it is, we can set the transaction isolation level on our session:
DEFAULT_SESSION_FACTORY = sessionmaker(
bind=create_engine(
config.get_postgres_uri(),
isolation_level="REPEATABLE READ",
)
)
Tip
Transaction isolation levels are tricky stuff, so it’s worth spending time understanding the Postgres documentation.[3]
Pessimistic Concurrency Control Example: SELECT FOR UPDATE
There are multiple ways to approach this, but we’ll show one. SELECT FOR UPDATE produces different behavior; two concurrent transactions will not be allowed to do a read on the same rows at the same time.

SELECT FOR UPDATE is a way of picking a row or rows to use as a lock (although those rows don’t have to be the ones you update). If two transactions both try to SELECT FOR UPDATE a row at the same time, one will win, and the other will wait until the lock is released. So this is an example of pessimistic concurrency control.
Here’s how you can use the SQLAlchemy DSL to specify FOR UPDATE at query time:
def get(self, sku):
return (
self.session.query(model.Product)
.filter_by(sku=sku)
.with_for_update()
.first()
)
This will have the effect of changing the concurrency pattern from read1, read2, write1, write2(fail) to read1, write1, read2, write2(succeed). Some people refer to this as the "read-modify-write" failure mode. Read "PostgreSQL Anti-Patterns: Read-Modify-Write Cycles" for a good overview.
We don’t really have time to discuss all the trade-offs between REPEATABLE READ and SELECT FOR UPDATE, or optimistic versus pessimistic locking in general. But if you have a test like the one we’ve shown, you can specify the behavior you want and see how it changes. You can also use the test as a basis for performing some performance experiments.
Wrap-Up
Specific choices around concurrency control vary a lot based on business circumstances and storage technology choices, but we’d like to bring this chapter back to the conceptual idea of an aggregate: we explicitly model an object as being the main entrypoint to some subset of our model, and as being in charge of enforcing the invariants and business rules that apply across all of those objects.
Choosing the right aggregate is key, and it’s a decision you may revisit over time. You can read more about it in multiple DDD books. We also recommend these three online papers on effective aggregate design by Vaughn Vernon (the "red book" author).
Aggregates: the trade-offs has some thoughts on the trade-offs of implementing the Aggregate pattern.
Part I Recap
Do you remember A component diagram for our app at the end of Part I, the diagram we showed at the beginning of [part1] to preview where we were heading?
So that’s where we are at the end of Part I. What have we achieved? We’ve seen how to build a domain model that’s exercised by a set of high-level unit tests. Our tests are living documentation: they describe the behavior of our system—the rules upon which we agreed with our business stakeholders—in nice readable code. When our business requirements change, we have confidence that our tests will help us to prove the new functionality, and when new developers join the project, they can read our tests to understand how things work.
We’ve decoupled the infrastructural parts of our system, like the database and API handlers, so that we can plug them into the outside of our application. This helps us to keep our codebase well organized and stops us from building a big ball of mud.
By applying the dependency inversion principle, and by using ports-and-adapters-inspired patterns like Repository and Unit of Work, we’ve made it possible to do TDD in both high gear and low gear and to maintain a healthy test pyramid. We can test our system edge to edge, and the need for integration and end-to-end tests is kept to a minimum.
Lastly, we’ve talked about the idea of consistency boundaries. We don’t want to lock our entire system whenever we make a change, so we have to choose which parts are consistent with one another.
For a small system, this is everything you need to go and play with the ideas of domain-driven design. You now have the tools to build database-agnostic domain models that represent the shared language of your business experts. Hurrah!
Note
At the risk of laboring the point—we’ve been at pains to point out that each pattern comes at a cost. Each layer of indirection has a price in terms of complexity and duplication in our code and will be confusing to programmers who’ve never seen these patterns before. If your app is essentially a simple CRUD wrapper around a database and isn’t likely to be anything more than that in the foreseeable future, you don’t need these patterns. Go ahead and use Django, and save yourself a lot of bother.
In Part II, we’ll zoom out and talk about a bigger topic: if aggregates are our boundary, and we can update only one at a time, how do we model processes that cross consistency boundaries?