Skip to content

Latest commit

 

History

History
1063 lines (826 loc) · 51.5 KB

File metadata and controls

1063 lines (826 loc) · 51.5 KB

Events and the Message Bus

事件与消息总线

So far we’ve spent a lot of time and energy on a simple problem that we could easily have solved with Django. You might be asking if the increased testability and expressiveness are really worth all the effort.

到目前为止,我们花费了大量时间和精力解决一个可以轻松用Django解决的简单问题。你可能会问,增加的可测试性和表达能力是否 真的 值得这些努力。

In practice, though, we find that it’s not the obvious features that make a mess of our codebases: it’s the goop around the edge. It’s reporting, and permissions, and workflows that touch a zillion objects.

然而,在实践中,我们发现并不是那些显而易见的功能让代码库变得混乱,而是边缘部分的杂乱。比如,报告、权限管理,以及涉及无数对象的工作流程。

Our example will be a typical notification requirement: when we can’t allocate an order because we’re out of stock, we should alert the buying team. They’ll go and fix the problem by buying more stock, and all will be well.

我们的示例将是一个典型的通知需求:当我们因为缺货而无法分配订单时,我们应该提醒采购团队。他们会通过采购更多的库存来解决问题,一切就迎刃而解了。

For a first version, our product owner says we can just send the alert by email.

对于第一个版本,我们的产品负责人表示可以仅通过电子邮件发送提醒。

Let’s see how our architecture holds up when we need to plug in some of the mundane stuff that makes up so much of our systems.

让我们看看当我们需要引入一些构成系统大部分的琐碎内容时,我们的架构能否经受住考验。

We’ll start by doing the simplest, most expeditious thing, and talk about why it’s exactly this kind of decision that leads us to the Big Ball of Mud.

我们将从最简单、最迅速的方法入手,并探讨为什么正是这种决定会将我们引向“大泥球”的困境。

Then we’ll show how to use the Domain Events pattern to separate side effects from our use cases, and how to use a simple Message Bus pattern for triggering behavior based on those events. We’ll show a few options for creating those events and how to pass them to the message bus, and finally we’ll show how the Unit of Work pattern can be modified to connect the two together elegantly, as previewed in Events flowing through the system(流经系统的事件).

然后,我们将展示如何使用 领域事件 模式将副作用与用例分离开,并且如何使用一个简单的 消息总线 模式基于这些事件触发行为。 我们会展示一些创建这些事件的选项,以及如何将它们传递给消息总线,最后将展示如何修改工作单元模式以优雅地将两者连接在一起, 正如在Events flowing through the system(流经系统的事件)中预览的一样。

apwp 0801
Figure 1. Events flowing through the system(流经系统的事件)
Tip

The code for this chapter is in the chapter_08_events_and_message_bus branch on GitHub:

本章的代码位于 chapter_08_events_and_message_bus 分支,https://oreil.ly/M-JuL[在GitHub上]:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_08_events_and_message_bus
# or to code along, checkout the previous chapter:
git checkout chapter_07_aggregate

Avoiding Making a Mess

避免制造混乱

So. Email alerts when we run out of stock. When we have new requirements like ones that really have nothing to do with the core domain, it’s all too easy to start dumping these things into our web controllers.

那么,当我们库存不足时发送电子邮件提醒。当我们遇到类似这样的新需求时,尤其是那些与核心领域 并没有真正关系 的需求,很容易就会开始把这些东西堆到我们的Web控制器里。

First, Let’s Avoid Making a Mess of Our Web Controllers

首先,让我们避免把我们的 Web 控制器搞得一团糟

As a one-off hack, this might be OK:

作为一个一次性的临时解决方案,这 也许 还可以接受:

Example 1. Just whack it in the endpoint—what could go wrong? (src/allocation/entrypoints/flask_app.py)(直接把它塞到端点里——能出什么问题呢?)
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    line = model.OrderLine(
        request.json["orderid"],
        request.json["sku"],
        request.json["qty"],
    )
    try:
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        batchref = services.allocate(line, uow)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            "out of stock",
            "stock_admin@made.com",
            f"{line.orderid} - {line.sku}"
        )
        return {"message": str(e)}, 400

    return {"batchref": batchref}, 201

…​but it’s easy to see how we can quickly end up in a mess by patching things up like this. Sending email isn’t the job of our HTTP layer, and we’d like to be able to unit test this new feature.

…​但不难看出,通过像这样打补丁,我们很快就可能陷入混乱。发送电子邮件并不是我们HTTP层的职责,而且我们希望能够对这个新功能进行单元测试。

And Let’s Not Make a Mess of Our Model Either

同时也不要让我们的模型陷入混乱

Assuming we don’t want to put this code into our web controllers, because we want them to be as thin as possible, we may look at putting it right at the source, in the model:

假设我们不想把这段代码放在我们的 Web 控制器中,因为我们希望它们尽可能简洁,那么我们可能会考虑直接把它放到源头——模型中:

Example 2. Email-sending code in our model isn’t lovely either (src/allocation/domain/model.py)(我们模型中的邮件发送代码同样也不够优雅)
    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            #...
        except StopIteration:
            email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
            raise OutOfStock(f"Out of stock for sku {line.sku}")

But that’s even worse! We don’t want our model to have any dependencies on infrastructure concerns like email.send_mail.

但这就更糟糕了!我们不希望我们的模型对诸如 email.send_mail 这样的基础设施问题有任何依赖。

This email-sending thing is unwelcome goop messing up the nice clean flow of our system. What we’d like is to keep our domain model focused on the rule "You can’t allocate more stuff than is actually available."

这个发送电子邮件的功能是不受欢迎的 杂乱,它破坏了我们系统的干净流畅。我们希望的是,让我们的领域模型专注于规则:“你不能分配超过实际可用的库存。”

Or the Service Layer!

或者用服务层!

The requirement "Try to allocate some stock, and send an email if it fails" is an example of workflow orchestration: it’s a set of steps that the system has to follow to achieve a goal.

需求“尝试分配一些库存,如果失败则发送一封邮件”是一个工作流编排的示例:它是一组系统必须遵循以 实现 目标的步骤。

We’ve written a service layer to manage orchestration for us, but even here the feature feels out of place:

我们已经编写了一个服务层来为我们管理编排,但即使在这里,这个功能也显得格格不入:

Example 3. And in the service layer, it’s out of place (src/allocation/service_layer/services.py)(而在服务层中,它显得格格不入)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
            raise

Catching an exception and reraising it? It could be worse, but it’s definitely making us unhappy. Why is it so hard to find a suitable home for this code?

捕获一个异常然后重新抛出?这可能还不算最糟,但它确实让我们感到不快。为什么要为这段代码找到一个合适的归宿会这么困难呢?

Single Responsibility Principle

单一职责原则

Really, this is a violation of the single responsibility principle (SRP).[1] Our use case is allocation. Our endpoint, service function, and domain methods are all called allocate, not allocate_and_send_mail_if_out_of_stock.

实际上,这是违反了单一职责原则(SRP)。脚注:[ 这个原则是 SOLID中的 S。] 我们的用例是分配。我们的端点、服务函数和领域方法都被称为 allocate,而不是`allocate_and_send_mail_if_out_of_stock`。

Tip
Rule of thumb: if you can’t describe what your function does without using words like "then" or "and," you might be violating the SRP. 经验法则:如果你在描述函数的作用时无法避免使用“然后”或“和”这样的词语,那么你可能违反了单一职责原则(SRP)。

One formulation of the SRP is that each class should have only a single reason to change. When we switch from email to SMS, we shouldn’t have to update our allocate() function, because that’s clearly a separate responsibility.

单一职责原则(SRP)的一种表述是,每个类应该只有一个导致其变化的原因。当我们从电子邮件切换到短信时, 不应该需要更新我们的`allocate()`函数,因为这显然是一个独立的职责。

To solve the problem, we’re going to split the orchestration into separate steps so that the different concerns don’t get tangled up.[2] The domain model’s job is to know that we’re out of stock, but the responsibility of sending an alert belongs elsewhere. We should be able to turn this feature on or off, or to switch to SMS notifications instead, without needing to change the rules of our domain model.

为了解决这个问题,我们准备将编排分解为独立的步骤,这样不同的关注点就不会混杂在一起。脚注:[ 我们的技术审阅者Ed Jung喜欢说,当你从命令式流程控制切换到基于事件的流程控制时,你就将 编排 转换成了 协作。] 领域模型的职责是知道我们缺货了,但发送警报的责任应该属于其他地方。我们应该能够开启或关闭此功能,或者切换到短信通知,而不需要修改领域模型的规则。

We’d also like to keep the service layer free of implementation details. We want to apply the dependency inversion principle to notifications so that our service layer depends on an abstraction, in the same way as we avoid depending on the database by using a unit of work.

我们还希望让服务层不包含实现细节。我们希望将依赖反转原则应用于通知, 这样我们的服务层就依赖于一个抽象,就像我们通过使用工作单元(unit of work)来避免依赖数据库一样。

All Aboard the Message Bus!

全员登上消息总线!

The patterns we’re going to introduce here are Domain Events and the Message Bus. We can implement them in a few ways, so we’ll show a couple before settling on the one we like most.

我们将在这里介绍的模式是 领域事件(Domain Events)消息总线(Message Bus)。它们可以通过几种方式实现, 因此我们会先展示几个实现方式,然后再确定我们最喜欢的那一个。

The Model Records Events

模型记录事件

First, rather than being concerned about emails, our model will be in charge of recording events—facts about things that have happened. We’ll use a message bus to respond to events and invoke a new operation.

首先,我们的模型不再关注电子邮件,而是负责记录 事件(events) ——即已经发生的事实。我们将使用消息总线来响应这些事件并触发新的操作。

Events Are Simple Dataclasses

事件是简单的数据类

An event is a kind of value object. Events don’t have any behavior, because they’re pure data structures. We always name events in the language of the domain, and we think of them as part of our domain model.

事件 是一种 值对象。事件没有任何行为,因为它们是纯数据结构。我们总是用领域的语言为事件命名,并将它们视为领域模型的一部分。

We could store them in model.py, but we may as well keep them in their own file (this might be a good time to consider refactoring out a directory called domain so that we have domain/model.py and domain/events.py):

我们可以将它们存储在 model.py 中,但不妨将它们放在单独的文件中(此时,可以考虑重构出一个名为 domain 的目录, 这样我们就有了 domain/model.pydomain/events.py):

Example 4. Event classes (src/allocation/domain/events.py)(事件类)
from dataclasses import dataclass


class Event:  #(1)
    pass


@dataclass
class OutOfStock(Event):  #(2)
    sku: str
  1. Once we have a number of events, we’ll find it useful to have a parent class that can store common attributes. It’s also useful for type hints in our message bus, as you’ll see shortly. 当我们有多个事件时,会发现拥有一个父类来存储通用属性是很有用的。此外,这对于在消息总线中的类型提示也很有帮助,稍后你会看到这一点。

  2. dataclasses are great for domain events too. dataclasses 对于领域事件也非常出色。

The Model Raises Events

模型触发事件

When our domain model records a fact that happened, we say it raises an event.

当我们的领域模型记录一个发生的事实时,我们称其为 触发(raise) 一个事件。

Here’s what it will look like from the outside; if we ask Product to allocate but it can’t, it should raise an event:

从外部来看,它会是这样的:如果我们请求 Product 分配库存但失败了,它应该 触发 一个事件:

Example 5. Test our aggregate to raise events (tests/unit/test_product.py)(测试我们的聚合以触发事件)
def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine("order1", "SMALL-FORK", 10))

    allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")  #(1)
    assert allocation is None
  1. Our aggregate will expose a new attribute called .events that will contain a list of facts about what has happened, in the form of Event objects. 我们的聚合将公开一个名为 .events 的新属性,该属性将以 Event 对象的形式包含一个关于已发生事实的列表。

Here’s what the model looks like on the inside:

以下是模型的内部实现:

Example 6. The model raises a domain event (src/allocation/domain/model.py)(模型触发了一个领域事件)
class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]  #(1)

    def allocate(self, line: OrderLine) -> str:
        try:
            #...
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))  #(2)
            # raise OutOfStock(f"Out of stock for sku {line.sku}")  #(3)
            return None
  1. Here’s our new .events attribute in use. 以下是我们使用新的 .events 属性的示例。

  2. Rather than invoking some email-sending code directly, we record those events at the place they occur, using only the language of the domain. 我们并没有直接调用发送电子邮件的代码,而是在事件发生的地方记录这些事件,仅使用领域的语言来描述。

  3. We’re also going to stop raising an exception for the out-of-stock case. The event will do the job the exception was doing. 我们还将停止在缺货情况下抛出异常。事件将完成之前由异常承担的任务。

Note
We’re actually addressing a code smell we had until now, which is that we were using exceptions for control flow. In general, if you’re implementing domain events, don’t raise exceptions to describe the same domain concept. As you’ll see later when we handle events in the Unit of Work pattern, it’s confusing to have to reason about events and exceptions together. 实际上,我们正在解决之前存在的一种代码异味,也就是我们 用异常来控制流程。通常来说,如果你正在实现领域事件, 不要通过抛出异常来描述相同的领域概念。正如你稍后会在处理工作单元模式中的事件时看到的那样,同时考虑事件和异常是令人困惑的。

The Message Bus Maps Events to Handlers

消息总线将事件映射到处理器

A message bus basically says, "When I see this event, I should invoke the following handler function." In other words, it’s a simple publish-subscribe system. Handlers are subscribed to receive events, which we publish to the bus. It sounds harder than it is, and we usually implement it with a dict:

消息总线的基本作用是,“当我看到这个事件时,我应该调用以下处理器函数。” 换句话说,它是一个简单的发布-订阅系统。处理器 订阅 接收事件, 而我们将事件发布到总线中。这听起来比实际要复杂,而我们通常用一个字典来实现它:

Example 7. Simple message bus (src/allocation/service_layer/messagebus.py)(简单消息总线)
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)


def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )


HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]
Note
Note that the message bus as implemented doesn’t give us concurrency because only one handler will run at a time. Our objective isn’t to support parallel threads but to separate tasks conceptually, and to keep each UoW as small as possible. This helps us to understand the codebase because the "recipe" for how to run each use case is written in a single place. See the following sidebar. 请注意,目前实现的消息总线并不支持并发,因为一次只能运行一个处理器。我们的目标并不是支持并行线程,而是从概念上分离任务, 并尽可能让每个工作单元保持小巧。这有助于我们理解代码库,因为每个用例的“运行步骤”都集中记录在一个地方。请参阅以下侧边栏。
Is This Like Celery?(这像 Celery 吗?)

Celery is a popular tool in the Python world for deferring self-contained chunks of work to an asynchronous task queue. The message bus we’re presenting here is very different, so the short answer to the above question is no; our message bus has more in common with an Express.js app, a UI event loop, or an actor framework.

CeleryPython 领域中一个流行的工具,用于将独立的工作块推送到异步任务队列中。我们在这里介绍的消息总线与它非常不同, 所以对于上面问题的简短回答是“不”;我们的消息总线更类似于 Express.js 应用程序、UI 事件循环或 actor 框架。

If you do have a requirement for moving work off the main thread, you can still use our event-based metaphors, but we suggest you use external events for that. There’s more discussion in [chapter_11_external_events_tradeoffs], but essentially, if you implement a way of persisting events to a centralized store, you can subscribe other containers or other microservices to them. Then that same concept of using events to separate responsibilities across units of work within a single process/service can be extended across multiple processes—​which may be different containers within the same service, or totally different microservices.

如果你确实有将工作从主线程移出的需求,你仍然可以使用我们基于事件的比喻,不过我们建议你为此使用 外部事件(external event)。 关于这一点,在[chapter_11_external_events_tradeoffs]中有更多讨论,但关键在于,如果你实现了一种将事件持久化到集中存储的方法, 就可以让其他容器或其他微服务订阅这些事件。然后,那种在单个进程/服务内使用事件来分离工作单元间职责的概念, 就可以扩展到多个进程中——这些进程可以是同一服务中的不同容器,也可以是完全不同的微服务。

If you follow us in this approach, your API for distributing tasks is your event classes—or a JSON representation of them. This allows you a lot of flexibility in who you distribute tasks to; they need not necessarily be Python services. Celery’s API for distributing tasks is essentially "function name plus arguments," which is more restrictive, and Python-only.

如果你按照我们的这种方法,你用于分发任务的API就是你的事件 ——或者是它们的JSON表示形式。 这为你在分发任务的对象上提供了很大的灵活性;这些对象不一定非得是 Python 服务。而 Celery 用于分发任务的API本质上是“函数名称加参数”, 这种方法更具限制性,并且仅限于 Python

Option 1: The Service Layer Takes Events from the Model and Puts Them on the Message Bus

选项 1:服务层从模型中获取事件并将其放置到消息总线上

Our domain model raises events, and our message bus will call the right handlers whenever an event happens. Now all we need is to connect the two. We need something to catch events from the model and pass them to the message bus—​the publishing step.

我们的领域模型触发事件,而我们的消息总线将在事件发生时调用相应的处理器。现在我们只需要将两者连接起来。 我们需要某种机制来捕获模型中的事件并将其传递到消息总线——这是 发布 的步骤。

The simplest way to do this is by adding some code into our service layer:

最简单的方式是在我们的服务层中添加一些代码:

Example 8. The service layer with an explicit message bus (src/allocation/service_layer/services.py)(具有显式消息总线的服务层)
from . import messagebus
...

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:  #(1)
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        finally:  #(1)
            messagebus.handle(product.events)  #(2)
  1. We keep the try/finally from our ugly earlier implementation (we haven’t gotten rid of all exceptions yet, just OutOfStock). 我们保留了之前丑陋实现中的 try/finally(我们还没有完全去掉 所有 异常,只是移除了 OutOfStock)。

  2. But now, instead of depending directly on an email infrastructure, the service layer is just in charge of passing events from the model up to the message bus. 但现在,服务层不再直接依赖于电子邮件基础设施,而只是负责将模型中的事件传递到消息总线上。

That already avoids some of the ugliness that we had in our naive implementation, and we have several systems that work like this one, in which the service layer explicitly collects events from aggregates and passes them to the message bus.

这已经避免了我们在原始实现中遇到的一些丑陋之处,而且我们有多个类似的系统,其中服务层明确地从聚合中收集事件并将它们传递到消息总线。

Option 2: The Service Layer Raises Its Own Events

选项 2:服务层触发自己的事件

Another variant on this that we’ve used is to have the service layer in charge of creating and raising events directly, rather than having them raised by the domain model:

我们使用过的另一种变体是让服务层直接负责创建和触发事件,而不是由领域模型触发事件:

Example 9. Service layer calls messagebus.handle directly (src/allocation/service_layer/services.py)(服务层直接调用 messagebus.handle)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit() #(1)

        if batchref is None:
            messagebus.handle(events.OutOfStock(line.sku))
        return batchref
  1. As before, we commit even if we fail to allocate because the code is simpler this way and it’s easier to reason about: we always commit unless something goes wrong. Committing when we haven’t changed anything is safe and keeps the code uncluttered. 和以前一样,即使分配失败我们也会提交,因为这样代码更简单且更易于理解:除非出问题,否则我们总是提交。 当没有更改任何内容时提交是安全的,同时也能保持代码简洁。

Again, we have applications in production that implement the pattern in this way. What works for you will depend on the particular trade-offs you face, but we’d like to show you what we think is the most elegant solution, in which we put the unit of work in charge of collecting and raising events.

同样,我们也有一些生产中的应用程序是以这种方式实现该模式的。对你来说,哪种方法有效取决于你所面临的具体权衡, 但我们想向你展示我们认为最优雅的解决方案,其中我们将工作单元负责收集和触发事件。

Option 3: The UoW Publishes Events to the Message Bus

选项 3:工作单元将事件发布到消息总线

The UoW already has a try/finally, and it knows about all the aggregates currently in play because it provides access to the repository. So it’s a good place to spot events and pass them to the message bus:

工作单元已经有了一个 try/finally,并且它了解当前正在使用的所有聚合,因为它提供了对仓储的访问。 因此,它是捕捉事件并将它们传递到消息总线的一个好位置:

Example 10. The UoW meets the message bus (src/allocation/service_layer/unit_of_work.py)(工作单元与消息总线相遇)
class AbstractUnitOfWork(abc.ABC):
    ...

    def commit(self):
        self._commit()  #(1)
        self.publish_events()  #(2)

    def publish_events(self):  #(2)
        for product in self.products.seen:  #(3)
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError

...

class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    ...

    def _commit(self):  #(1)
        self.session.commit()
  1. We’ll change our commit method to require a private ._commit() method from subclasses. 我们将修改提交方法,使其需要子类实现一个私有的 ._commit() 方法。

  2. After committing, we run through all the objects that our repository has seen and pass their events to the message bus. 在提交之后,我们会遍历仓储中所有被访问过的对象,并将它们的事件传递到消息总线。

  3. That relies on the repository keeping track of aggregates that have been loaded using a new attribute, .seen, as you’ll see in the next listing. 这依赖于仓储通过一个新属性 .seen 来跟踪已加载的聚合对象,正如你将在接下来的代码示例中看到的。

Note
Are you wondering what happens if one of the handlers fails? We’ll discuss error handling in detail in [chapter_10_commands]. 你是否在想,如果某个处理器失败会发生什么?我们将在 [chapter_10_commands] 中详细讨论错误处理。
Example 11. Repository tracks aggregates that pass through it (src/allocation/adapters/repository.py)(仓储跟踪通过它的聚合)
class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen = set()  # type: Set[model.Product]  #(1)

    def add(self, product: model.Product):  #(2)
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:  #(3)
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):  #(2)
        raise NotImplementedError

    @abc.abstractmethod  #(3)
    def _get(self, sku) -> model.Product:
        raise NotImplementedError


class SqlAlchemyRepository(AbstractRepository):
    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):  #(2)
        self.session.add(product)

    def _get(self, sku):  #(3)
        return self.session.query(model.Product).filter_by(sku=sku).first()
  1. For the UoW to be able to publish new events, it needs to be able to ask the repository for which Product objects have been used during this session. We use a set called .seen to store them. That means our implementations need to call super().__init__(). 为了让工作单元能够发布新的事件,它需要能够从仓储中获取出在哪个 Product 对象在本次会话中被使用过。 我们使用一个名为 .seenset 来存储这些对象。这意味着我们的实现需要调用 super().__init__()

  2. The parent add() method adds things to .seen, and now requires subclasses to implement ._add(). 父类的 add() 方法会将对象添加到 .seen 中,并且现在要求子类实现 ._add() 方法。

  3. Similarly, .get() delegates to a ._get() function, to be implemented by subclasses, in order to capture objects seen. 类似地,.get() 委托给一个 ._get() 函数,由子类实现,以便捕获被访问过的对象。

Note
The use of ._underscorey() methods and subclassing is definitely not the only way you could implement these patterns. Have a go at the "Exercise for the Reader" in this chapter and experiment with some alternatives. 使用 ._underscorey() 方法和子类化绝对不是实现这些模式的唯一方法。 试着完成本章中的 "读者练习",并尝试一些替代方案。

After the UoW and repository collaborate in this way to automatically keep track of live objects and process their events, the service layer can be totally free of event-handling concerns:

在工作单元和仓储以这种方式协作,自动跟踪活动对象并处理它们的事件之后,服务层就可以完全摆脱事件处理的事务:

Example 12. Service layer is clean again (src/allocation/service_layer/services.py)(服务层再次变得简洁)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
        return batchref

We do also have to remember to change the fakes in the service layer and make them call super() in the right places, and to implement underscorey methods, but the changes are minimal:

我们还需要记住修改服务层中的伪对象,确保在正确的位置调用 super(),并实现那些以下划线开头的方法,不过这些更改是很小的:

Example 13. Service-layer fakes need tweaking (tests/unit/test_services.py)(服务层的伪对象需要调整)
class FakeRepository(repository.AbstractRepository):
    def __init__(self, products):
        super().__init__()
        self._products = set(products)

    def _add(self, product):
        self._products.add(product)

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

...

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
    ...

    def _commit(self):
        self.committed = True
Exercise for the Reader(读者练习)

Are you finding all those ._add() and ._commit() methods "super-gross," in the words of our beloved tech reviewer Hynek? Does it "make you want to beat Harry around the head with a plushie snake"? Hey, our code listings are only meant to be examples, not the perfect solution! Why not go see if you can do better?

你是否觉得所有那些 ._add()._commit() 方法“超级恶心”?正如我们尊敬的技术审阅者 Hynek 所说的那样, 它是否“让你想拿一条软绵绵的玩具蛇去揍 Harry 一顿”?嘿,我们的代码示例仅仅是为了演示,而不是完美的解决方案!为什么不去看看你是否能做得更好呢?

One composition over inheritance way to go would be to implement a wrapper class:

一种采用 组合优于继承 的方式是实现一个包装类:

Example 14. A wrapper adds functionality and then delegates (src/adapters/repository.py)(一个包装器添加了功能后再进行委托)
class TrackingRepository:
    seen: Set[model.Product]

    def __init__(self, repo: AbstractRepository):
        self.seen = set()  # type: Set[model.Product]
        self._repo = repo

    def add(self, product: model.Product):  #(1)
        self._repo.add(product)  #(1)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._repo.get(sku)
        if product:
            self.seen.add(product)
        return product
  1. By wrapping the repository, we can call the actual .add() and .get() methods, avoiding weird underscorey methods. 通过包装仓储,我们可以调用实际的 .add().get() 方法,从而避免使用那些奇怪的以下划线开头的方法。

See if you can apply a similar pattern to our UoW class in order to get rid of those Java-y _commit() methods too. You can find the code on GitHub.

试试看能否将类似的模式应用到我们的工作单元类中,从而去掉那些有点像 Java 风格的 _commit() 方法。 你可以在 GitHub 找到对应的代码。

Switching all the ABCs to typing.Protocol is a good way to force yourself to avoid using inheritance. Let us know if you come up with something nice!

将所有的抽象基类(ABCs)切换为 typing.Protocol 是一个很好的方法,可以迫使你避免使用继承。如果你想出了一些不错的方案,请告诉我们!

You might be starting to worry that maintaining these fakes is going to be a maintenance burden. There’s no doubt that it is work, but in our experience it’s not a lot of work. Once your project is up and running, the interface for your repository and UoW abstractions really don’t change much. And if you’re using ABCs, they’ll help remind you when things get out of sync.

你可能开始担心维护这些伪对象(fakes)会成为一个维护负担。毫无疑问,这确实需要一些工作,但根据我们的经验,这并不会耗费太多精力。 一旦你的项目启动并运行起来,仓储和工作单元抽象的接口实际上变化不大。而且,如果你使用抽象基类(ABCs),它们会在接口不同步时提醒你。

Wrap-Up

总结

Domain events give us a way to handle workflows in our system. We often find, listening to our domain experts, that they express requirements in a causal or temporal way—for example, "When we try to allocate stock but there’s none available, then we should send an email to the buying team."

领域事件为我们提供了一种方式来处理系统中的工作流。我们经常发现,倾听领域专家时,他们会以因果或时间顺序的方式表达需求——例如, “当我们尝试分配库存但没有库存可用时,我们应该向采购团队发送一封电子邮件。”

The magic words "When X, then Y" often tell us about an event that we can make concrete in our system. Treating events as first-class things in our model helps us make our code more testable and observable, and it helps isolate concerns.

“当 X,然后 Y”这样的魔法词语通常暗示我们可以在系统中实现的一个事件。在模型中将事件视为一等公民有助于我们使代码更加可测试和可观察, 同时也有助于隔离关注点。

And Domain events: the trade-offs(领域事件:权衡分析) shows the trade-offs as we see them.

Domain events: the trade-offs(领域事件:权衡分析) 展示了我们所看到的权衡。

Table 1. Domain events: the trade-offs(领域事件:权衡分析)
Pros(优点) Cons(缺点)
  • A message bus gives us a nice way to separate responsibilities when we have to take multiple actions in response to a request. 当我们需要对一个请求采取多个动作时,消息总线为我们提供了一种很好的方式来分离职责。

  • Event handlers are nicely decoupled from the "core" application logic, making it easy to change their implementation later. 事件处理器与“核心”应用逻辑很好地解耦,这使得以后更改其实现变得容易。

  • Domain events are a great way to model the real world, and we can use them as part of our business language when modeling with stakeholders. 领域事件是建模现实世界的一种绝佳方式,在与利益相关者进行建模时,我们可以将它们作为业务语言的一部分使用。

  • The message bus is an additional thing to wrap your head around; the implementation in which the unit of work raises events for us is neat but also magic. It’s not obvious when we call commit that we’re also going to go and send email to people. 消息总线是一个需要额外理解的组件;让工作单元为我们触发事件的实现方式虽然很 巧妙,但也有些“魔法”感。当我们调用 commit 时, 并不直观地让人联想到我们还会去给人们发送电子邮件。

  • What’s more, that hidden event-handling code executes synchronously, meaning your service-layer function doesn’t finish until all the handlers for any events are finished. That could cause unexpected performance problems in your web endpoints (adding asynchronous processing is possible but makes things even more confusing). 此外,这些隐藏的事件处理代码是 同步 执行的,这意味着你的服务层函数在任何事件的所有处理器完成之前都不会结束。 这可能会在你的 Web 端点中引发意想不到的性能问题(添加异步处理是可能的,但会让事情变得更加 复杂)。

  • More generally, event-driven workflows can be confusing because after things are split across a chain of multiple handlers, there is no single place in the system where you can understand how a request will be fulfilled. 更普遍地说,事件驱动的工作流可能会令人困惑,因为当处理被分散到多个处理器链中后,系统中就没有一个单一的位置可以让你清楚地了解一个请求是如何被完成的。

  • You also open yourself up to the possibility of circular dependencies between your event handlers, and infinite loops. 你还可能会面临事件处理器之间出现循环依赖和无限循环的风险。

Events are useful for more than just sending email, though. In [chapter_07_aggregate] we spent a lot of time convincing you that you should define aggregates, or boundaries where we guarantee consistency. People often ask, "What should I do if I need to change multiple aggregates as part of a request?" Now we have the tools we need to answer that question.

不过,事件的用途远不限于发送电子邮件。在 [chapter_07_aggregate] 中,我们花费了大量时间来说服你应该定义聚合, 或者说定义那些我们可以保证一致性的边界。人们经常会问,“如果我需要在一个请求中修改多个聚合,我该怎么办?” 现在我们有了回答这个问题所需的工具。

If we have two things that can be transactionally isolated (e.g., an order and a product), then we can make them eventually consistent by using events. When an order is canceled, we should find the products that were allocated to it and remove the allocations.

如果我们有两个可以在事务上隔离的对象(例如,一个订单和一个 产品),那么我们可以通过使用事件使它们 最终一致。 当一个订单被取消时,我们应该找到分配给它的产品并移除这些 分配

Domain Events and the Message Bus Recap(领域事件和消息总线回顾)

Events can help with the single responsibility principle(事件可以帮助贯彻单一职责原则)

Code gets tangled up when we mix multiple concerns in one place. Events can help us to keep things tidy by separating primary use cases from secondary ones. We also use events for communicating between aggregates so that we don’t need to run long-running transactions that lock against multiple tables. 当我们将多个关注点混杂在一起时,代码就会变得复杂。事件可以通过将主要用例与次要用例分离来帮助我们保持代码简洁。 我们还使用事件在聚合之间进行通信,这样就不需要运行会锁定多个表的长时间事务。

A message bus routes messages to handlers(消息总线将消息路由到处理器)

You can think of a message bus as a dict that maps from events to their consumers. It doesn’t "know" anything about the meaning of events; it’s just a piece of dumb infrastructure for getting messages around the system. 你可以将消息总线看作一个从事件映射到其消费者的字典。它并不“了解”事件的含义;它只是一个将消息在系统中分发的简单基础设施。

Option 1: Service layer raises events and passes them to message bus(选项 1:服务层触发事件并将其传递到消息总线)

The simplest way to start using events in your system is to raise them from handlers by calling bus.handle(some_new_event) after you commit your unit of work. 在系统中开始使用事件的最简单方法是从处理器中触发它们,即在提交工作单元后调用 bus.handle(some_new_event)

Option 2: Domain model raises events, service layer passes them to message bus(选项 2:领域模型触发事件,服务层将它们传递到消息总线)

The logic about when to raise an event really should live with the model, so we can improve our system’s design and testability by raising events from the domain model. It’s easy for our handlers to collect events off the model objects after commit and pass them to the bus. 关于何时触发事件的逻辑确实应该存在于模型中,因此通过从领域模型触发事件,我们可以改进系统的设计和测试性。在 commit 之后, 处理器可以很容易地从模型对象中收集事件并将它们传递到消息总线。

Option 3: UoW collects events from aggregates and passes them to message bus(选项 3:工作单元从聚合中收集事件并将它们传递到消息总线)

Adding bus.handle(aggregate.events) to every handler is annoying, so we can tidy up by making our unit of work responsible for raising events that were raised by loaded objects. This is the most complex design and might rely on ORM magic, but it’s clean and easy to use once it’s set up. 在每个处理器中添加 bus.handle(aggregate.events) 会很繁琐,因此我们可以通过让工作单元负责触发由已加载对象触发的事件来简化流程。 虽然这是最复杂的设计,并且可能依赖于 ORM 的一些“魔法”,但一旦设置完成,它就会非常简洁且易于使用。

In [chapter_09_all_messagebus], we’ll look at this idea in more detail as we build a more complex workflow with our new message bus.

[chapter_09_all_messagebus] 中,我们将更详细地探讨这个想法,并使用我们的新消息总线构建一个更复杂的工作流。


1. This principle is the S in SOLID.
2. Our tech reviewer Ed Jung likes to say that when you change from imperative flow control to event-based flow control, you’re changing orchestration into choreography.