Skip to content

Latest commit

 

History

History
328 lines (261 loc) · 11.9 KB

File metadata and controls

328 lines (261 loc) · 11.9 KB

Appendix A: Swapping Out the Infrastructure: Do Everything with CSVs

更换基础设施:用CSV完成一切

This appendix is intended as a little illustration of the benefits of the Repository, Unit of Work, and Service Layer patterns. It’s intended to follow from [chapter_06_uow].

本附录旨在稍作说明 仓储、工作单元和服务层模式的优势。它是为了衔接[chapter_06_uow]的内容。

Just as we finish building out our Flask API and getting it ready for release, the business comes to us apologetically, saying they’re not ready to use our API and asking if we could build a thing that reads just batches and orders from a couple of CSVs and outputs a third CSV with allocations.

就在我们完成 Flask API 的构建并准备发布时,业务团队带着歉意找到我们,说他们还没准备好使用我们的 API, 并询问我们是否能构建一个能够从几个 CSV 中读取批次和订单数据,并输出第三个包含分配结果的 CSV 的工具。

Ordinarily this is the kind of thing that might have a team cursing and spitting and making notes for their memoirs. But not us! Oh no, we’ve ensured that our infrastructure concerns are nicely decoupled from our domain model and service layer. Switching to CSVs will be a simple matter of writing a couple of new Repository and UnitOfWork classes, and then we’ll be able to reuse all of our logic from the domain layer and the service layer.

通常情况下,这种需求可能会让团队咒骂连连、怒气冲天,并将其记入他们的回忆录。但我们不一样!哦不, 我们已经确保我们的基础设施逻辑与领域模型和服务层完美解耦。切换到 CSV 只需要编写几个新的 仓储工作单元 类就可以了, 之后我们就能够重用领域层和服务层的 所有 逻辑。

Here’s an E2E test to show you how the CSVs flow in and out:

下面是一个端到端(E2E)测试,向你展示 CSV 数据是如何流入和流出的:

Example 1. A first CSV test (tests/e2e/test_csv.py)(第一个 CSV 测试)
def test_cli_app_reads_csvs_with_batches_and_orders_and_outputs_allocations(make_csv):
    sku1, sku2 = random_ref("s1"), random_ref("s2")
    batch1, batch2, batch3 = random_ref("b1"), random_ref("b2"), random_ref("b3")
    order_ref = random_ref("o")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku1, 100, ""],
        [batch2, sku2, 100, "2011-01-01"],
        [batch3, sku2, 100, "2011-01-02"],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [order_ref, sku1, 3],
        [order_ref, sku2, 12],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [order_ref, sku1, "3", batch1],
        [order_ref, sku2, "12", batch2],
    ]

Diving in and implementing without thinking about repositories and all that jazz, you might start with something like this:

如果不考虑 仓储 等各种模式,直接开始实现,你可能会从类似这样的代码入手:

Example 2. A first cut of our CSV reader/writer (src/bin/allocate-from-csv)(CSV 读写器的初步实现)
#!/usr/bin/env python
import csv
import sys
from datetime import datetime
from pathlib import Path

from allocation.domain import model


def load_batches(batches_path):
    batches = []
    with batches_path.open() as inf:
        reader = csv.DictReader(inf)
        for row in reader:
            if row["eta"]:
                eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
            else:
                eta = None
            batches.append(
                model.Batch(
                    ref=row["ref"], sku=row["sku"], qty=int(row["qty"]), eta=eta
                )
            )
    return batches


def main(folder):
    batches_path = Path(folder) / "batches.csv"
    orders_path = Path(folder) / "orders.csv"
    allocations_path = Path(folder) / "allocations.csv"

    batches = load_batches(batches_path)

    with orders_path.open() as inf, allocations_path.open("w") as outf:
        reader = csv.DictReader(inf)
        writer = csv.writer(outf)
        writer.writerow(["orderid", "sku", "batchref"])
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            line = model.OrderLine(orderid, sku, qty)
            batchref = model.allocate(line, batches)
            writer.writerow([line.orderid, line.sku, batchref])


if __name__ == "__main__":
    main(sys.argv[1])

It’s not looking too bad! And we’re reusing our domain model objects and our domain service.

看起来还不错!而且我们复用了领域模型对象和领域服务。

But it’s not going to work. Existing allocations need to also be part of our permanent CSV storage. We can write a second test to force us to improve things:

但这行不通。现有的分配也需要成为我们永久 CSV 存储的一部分。我们可以编写第二个测试来促使我们改进:

Example 3. And another one, with existing allocations (tests/e2e/test_csv.py)(另一个现有分配的测试)
def test_cli_app_also_reads_existing_allocations_and_can_append_to_them(make_csv):
    sku = random_ref("s")
    batch1, batch2 = random_ref("b1"), random_ref("b2")
    old_order, new_order = random_ref("o1"), random_ref("o2")
    make_csv("batches.csv", [
        ["ref", "sku", "qty", "eta"],
        [batch1, sku, 10, "2011-01-01"],
        [batch2, sku, 10, "2011-01-02"],
    ])
    make_csv("allocations.csv", [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, 10, batch1],
    ])
    orders_csv = make_csv("orders.csv", [
        ["orderid", "sku", "qty"],
        [new_order, sku, 7],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / "allocations.csv"
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ["orderid", "sku", "qty", "batchref"],
        [old_order, sku, "10", batch1],
        [new_order, sku, "7", batch2],
    ]

And we could keep hacking about and adding extra lines to that load_batches function, and some sort of way of tracking and saving new allocations—but we already have a model for doing that! It’s called our Repository and Unit of Work patterns.

我们可以继续不断折腾,在 load_batches 函数中添加额外的代码,以及某种方式来跟踪和保存新的分配——但我们已经 有一个现成的模型来处理这些问题了!这就是我们的 仓储 和工作单元模式。

All we need to do ("all we need to do") is reimplement those same abstractions, but with CSVs underlying them instead of a database. And as you’ll see, it really is relatively straightforward.

我们所需要做的(“我们所需要做的”)只是重新实现这些相同的抽象,但用 CSV 作为其底层存储,而不是数据库。 正如你将看到的,这实际上相对来说相当简单。

Implementing a Repository and Unit of Work for CSVs

为 CSV 实现一个 仓储 和工作单元

Here’s what a CSV-based repository could look like. It abstracts away all the logic for reading CSVs from disk, including the fact that it has to read two different CSVs (one for batches and one for allocations), and it gives us just the familiar .list() API, which provides the illusion of an in-memory collection of domain objects:

以下是一个基于 CSV 的 仓储 的实现示例。它抽象了从磁盘读取 CSV 的所有逻辑, 包括必须读取 两个不同的 CSV (一个用于批次,一个用于分配)的事实,并为我们提供了熟悉的 .list() API, 这营造出一个内存中领域对象集合的假象:

Example 4. A repository that uses CSV as its storage mechanism (src/allocation/service_layer/csv_uow.py)(一个使用 CSV 作为存储机制的仓储)
class CsvRepository(repository.AbstractRepository):
    def __init__(self, folder):
        self._batches_path = Path(folder) / "batches.csv"
        self._allocations_path = Path(folder) / "allocations.csv"
        self._batches = {}  # type: Dict[str, model.Batch]
        self._load()

    def get(self, reference):
        return self._batches.get(reference)

    def add(self, batch):
        self._batches[batch.reference] = batch

    def _load(self):
        with self._batches_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                ref, sku = row["ref"], row["sku"]
                qty = int(row["qty"])
                if row["eta"]:
                    eta = datetime.strptime(row["eta"], "%Y-%m-%d").date()
                else:
                    eta = None
                self._batches[ref] = model.Batch(ref=ref, sku=sku, qty=qty, eta=eta)
        if self._allocations_path.exists() is False:
            return
        with self._allocations_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                batchref, orderid, sku = row["batchref"], row["orderid"], row["sku"]
                qty = int(row["qty"])
                line = model.OrderLine(orderid, sku, qty)
                batch = self._batches[batchref]
                batch._allocations.add(line)

    def list(self):
        return list(self._batches.values())

And here’s what a UoW for CSVs would look like:

以下是基于 CSV 的工作单元 (UoW) 的实现示例:

Example 5. A UoW for CSVs: commit = csv.writer (src/allocation/service_layer/csv_uow.py)(基于 CSV 的工作单元:commit = csv.writer)
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
    def __init__(self, folder):
        self.batches = CsvRepository(folder)

    def commit(self):
        with self.batches._allocations_path.open("w") as f:
            writer = csv.writer(f)
            writer.writerow(["orderid", "sku", "qty", "batchref"])
            for batch in self.batches.list():
                for line in batch._allocations:
                    writer.writerow(
                        [line.orderid, line.sku, line.qty, batch.reference]
                    )

    def rollback(self):
        pass

And once we have that, our CLI app for reading and writing batches and allocations to CSV is pared down to what it should be—a bit of code for reading order lines, and a bit of code that invokes our existing service layer:

一旦我们实现了这些,我们的 CLI 应用程序,用于读取和写入批次和分配到 CSV,就可以被简化为它应有的样子——一些用于读取订单项的代码, 以及一些调用我们 现有 服务层的代码:

Example 6. Allocation with CSVs in nine lines (src/bin/allocate-from-csv)(九行代码实现用 CSV 进行分配)
def main(folder):
    orders_path = Path(folder) / "orders.csv"
    uow = csv_uow.CsvUnitOfWork(folder)
    with orders_path.open() as f:
        reader = csv.DictReader(f)
        for row in reader:
            orderid, sku = row["orderid"], row["sku"]
            qty = int(row["qty"])
            services.allocate(orderid, sku, qty, uow)

Ta-da! Now are y’all impressed or what?

瞧! 现在你们是不是感到惊叹了?

Much love,

满怀敬意,

Bob and Harry

Bob 和 Harry