Appendix C: Swapping Out the Infrastructure: Do Everything with CSVs
This appendix is a little illustration of the benefits of the Repository, Unit of Work, and Service Layer patterns. It follows on from [chapter_06_uow].
Just as we finish building out our Flask API and getting it ready for release, the business comes to us apologetically, saying they’re not ready to use our API and asking if we could build a thing that reads just batches and orders from a couple of CSVs and outputs a third CSV with allocations.
Ordinarily this is the kind of thing that might have a team cursing and spitting and making notes for their memoirs. But not us! Oh no, we’ve ensured that our infrastructure concerns are nicely decoupled from our domain model and service layer. Switching to CSVs will be a simple matter of writing a couple of new Repository and UnitOfWork classes, and then we’ll be able to reuse all of our logic from the domain layer and the service layer.
Here’s an E2E test to show you how the CSVs flow in and out:
def test_cli_app_reads_csvs_with_batches_and_orders_and_outputs_allocations(make_csv):
    sku1, sku2 = random_ref("s1"), random_ref("s2")
    batch1, batch2, batch3 = random_ref("b1"), random_ref("b2"), random_ref("b3")
    order_ref = random_ref("o")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku1, 100, ""],
        [batch2, sku2, 100, "2011-01-01"],
        [batch3, sku2, 100, "2011-01-02"],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [order_ref, sku1, 3],
        [order_ref, sku2, 12],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [order_ref, sku1, "3", batch1],
        [order_ref, sku2, "12", batch2],
    ]
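The test leans on two helpers that are not shown here, random_ref and make_csv. As a rough sketch of what they might look like (the bodies below are assumptions, not the project's actual helpers, and make_csv_in is a hypothetical name), a factory that writes rows into a given folder and returns the path would be enough; in a pytest suite it could be wrapped in a fixture built on tmp_path:

```python
import csv
import uuid
from pathlib import Path


def random_ref(prefix):
    # Hypothetical helper: a short unique suffix so test runs never collide.
    return f"{prefix}-{uuid.uuid4().hex[:10]}"


def make_csv_in(folder):
    """Return a make_csv(filename, rows) function that writes into folder."""
    folder = Path(folder)

    def make_csv(filename, rows):
        path = folder / filename
        # newline="" is what the csv module docs recommend for files it writes.
        with path.open("w", newline="") as f:
            csv.writer(f).writerows(rows)
        return path  # the test uses path.parent to find the output folder

    return make_csv
```

Returning the path is what lets the test say orders_csv.parent to locate allocations.csv afterward.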
Diving in and implementing without thinking about repositories and all that jazz, you might start with something like this:
#!/usr/bin/env python
import csv
import sys
from datetime import datetime
from pathlib import Path

from allocation.domain import model


def load_batches(batches_path):
    batches = []
    with batches_path.open() as inf:
        reader = csv.DictReader(inf)
        for row in reader:
            # An empty eta means the batch is already in stock.
            if row["eta"]:
                eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
            else:
                eta = None
            batches.append(
                model.Batch(
                    ref=row["ref"], sku=row["sku"], qty=int(row["qty"]), eta=eta
                )
            )
    return batches


def main(folder):
    batches_path = Path(folder) / "batches.csv"
    orders_path = Path(folder) / "orders.csv"
    allocations_path = Path(folder) / "allocations.csv"

    batches = load_batches(batches_path)

    with orders_path.open() as inf, allocations_path.open("w", newline="") as outf:
        reader = csv.DictReader(inf)
        writer = csv.writer(outf)
        # The test expects a qty column in the output as well.
        writer.writerow(["orderid", "sku", "qty", "batchref"])
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            line = model.OrderLine(orderid, sku, qty)
            batchref = model.allocate(line, batches)
            writer.writerow([line.orderid, line.sku, line.qty, batchref])


if __name__ == "__main__":
    main(sys.argv[1])
It’s not looking too bad! And we’re reusing our domain model objects and our domain service.
But it’s not going to work. Existing allocations also need to be part of our permanent CSV storage. We can write a second test to force us to improve things:
def test_cli_app_also_reads_existing_allocations_and_can_append_to_them(make_csv):
    sku = random_ref("s")
    batch1, batch2 = random_ref("b1"), random_ref("b2")
    old_order, new_order = random_ref("o1"), random_ref("o2")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku, 10, "2011-01-01"],
        [batch2, sku, 10, "2011-01-02"],
    ])
    make_csv("allocations.csv", [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, 10, batch1],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [new_order, sku, 7],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, "10", batch1],
        [new_order, sku, "7", batch2],
    ]
And we could keep hacking about and adding extra lines to that load_batches function, and some sort of way of tracking and saving new allocations, but we already have a model for doing that! It’s our Repository and Unit of Work patterns.
All we need to do ("all we need to do") is reimplement those same abstractions, but with CSVs underlying them instead of a database. And as you’ll see, it really is relatively straightforward.
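As a reminder of what "those same abstractions" amount to, here is a minimal sketch of the two abstract base classes, reconstructed from how the CSV classes in the next section use them. The exact method set is an assumption on our part; the real definitions live in the repository and unit_of_work modules.

```python
import abc


class AbstractRepository(abc.ABC):
    # Assumed interface: the CSV repository below implements add and get
    # (and also offers list, which the service layer relies on).
    @abc.abstractmethod
    def add(self, batch):
        raise NotImplementedError

    @abc.abstractmethod
    def get(self, reference):
        raise NotImplementedError


class AbstractUnitOfWork(abc.ABC):
    batches: AbstractRepository

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Roll back on the way out; a no-op if commit() has already run.
        self.rollback()

    @abc.abstractmethod
    def commit(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self):
        raise NotImplementedError
```

Anything that subclasses these and fills in the abstract methods can be handed to the service layer, whatever storage sits underneath.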
Implementing a Repository and Unit of Work for CSVs
Here’s what a CSV-based repository could look like. It abstracts away all the logic for reading CSVs from disk, including the fact that it has to read two different CSVs (one for batches and one for allocations), and it gives us just the familiar .list() API, which provides the illusion of an in-memory collection of domain objects:
class CsvRepository(repository.AbstractRepository):
    def __init__(self, folder):
        self._batches_path = Path(folder) / "batches.csv"
        self._allocations_path = Path(folder) / "allocations.csv"
        self._batches = {}  # type: Dict[str, model.Batch]
        self._load()

    def get(self, reference):
        return self._batches.get(reference)

    def add(self, batch):
        self._batches[batch.reference] = batch

    def _load(self):
        with self._batches_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                ref, sku = row["ref"], row["sku"]
                qty = int(row["qty"])
                if row["eta"]:
                    eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
                else:
                    eta = None
                self._batches[ref] = model.Batch(ref=ref, sku=sku, qty=qty, eta=eta)
        # A missing allocations.csv just means nothing has been allocated yet.
        if not self._allocations_path.exists():
            return
        with self._allocations_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                batchref, orderid, sku = row["batchref"], row["orderid"], row["sku"]
                qty = int(row["qty"])
                line = model.OrderLine(orderid, sku, qty)
                batch = self._batches[batchref]
                batch._allocations.add(line)

    def list(self):
        return list(self._batches.values())
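The ETA handling in _load is the same conversion that load_batches did in the first script. If the duplication bothers you, one option is to pull it into a small helper. The name parse_eta is hypothetical, and this is only a sketch of the idea:

```python
from datetime import date, datetime
from typing import Optional


def parse_eta(raw: str) -> Optional[date]:
    """Convert the eta column of batches.csv into a date, or None if blank."""
    if not raw:
        # An empty cell means the batch has no ETA.
        return None
    return datetime.strptime(raw, "%Y-%m-%d").date()
```

With that in place, the if/else in the loader would shrink to a single eta = parse_eta(row["eta"]).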
And here’s what a UoW for CSVs would look like:
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
    def __init__(self, folder):
        self.batches = CsvRepository(folder)

    def commit(self):
        # Rewrite the whole allocations file from the in-memory batches.
        # newline="" is what the csv module recommends for files it writes.
        with self.batches._allocations_path.open("w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["orderid", "sku", "qty", "batchref"])
            for batch in self.batches.list():
                for line in batch._allocations:
                    writer.writerow(
                        [line.orderid, line.sku, line.qty, batch.reference]
                    )

    def rollback(self):
        # Nothing is written until commit(), so there is nothing to undo.
        pass
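Nothing in the service layer needs to know that this unit of work writes CSVs; it only relies on a batches attribute with a list() method, on commit(), and on the context-manager protocol. To make that contract concrete, here is a deliberately simplified, self-contained sketch. Every class in it is a stand-in we made up for illustration, and allocate is a cut-down imitation of services.allocate that ignores ETA ordering and error handling:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(frozen=True)
class OrderLine:  # stand-in for model.OrderLine
    orderid: str
    sku: str
    qty: int


@dataclass
class Batch:  # heavily simplified stand-in for model.Batch
    reference: str
    sku: str
    qty: int
    allocations: Set[OrderLine] = field(default_factory=set)

    @property
    def available(self) -> int:
        return self.qty - sum(line.qty for line in self.allocations)


class InMemoryRepository:
    def __init__(self, batches: List[Batch]):
        self._batches = {b.reference: b for b in batches}

    def list(self) -> List[Batch]:
        return list(self._batches.values())


class InMemoryUnitOfWork:
    def __init__(self, batches: List[Batch]):
        self.batches = InMemoryRepository(batches)
        self.committed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False  # do not swallow exceptions

    def commit(self):
        self.committed = True


def allocate(orderid, sku, qty, uow) -> Optional[str]:
    """Allocate to the first batch with enough stock; return its reference."""
    line = OrderLine(orderid, sku, qty)
    with uow:
        for batch in uow.batches.list():
            if batch.sku == sku and batch.available >= qty:
                batch.allocations.add(line)
                uow.commit()
                return batch.reference
    return None
```

Calling allocate("o1", "s1", 3, uow) with an InMemoryUnitOfWork works the same way it would with any other object that honors this small interface, which is the property the CSV version takes advantage of.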
And once we have that, our CLI app for reading and writing batches and allocations to CSV is pared down to what it should be—a bit of code for reading order lines, and a bit of code that invokes our existing service layer:
def main(folder):
    orders_path = Path(folder) / "orders.csv"
    uow = csv_uow.CsvUnitOfWork(folder)
    with orders_path.open() as f:
        reader = csv.DictReader(f)
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            services.allocate(orderid, sku, qty, uow)
Ta-da! Now are y’all impressed or what?
Much love,
Bob and Harry