简介
如何避免写流水账代码
@RestController
@RequestMapping("/")
public class CheckoutController {
@Resource
private ItemService itemService;
@Resource
private InventoryService inventoryService;
@Resource
private OrderRepository orderRepository;
@PostMapping("checkout")
public Result<OrderDO> checkout(Long itemId, Integer quantity) {
// 1) Session管理
Long userId = SessionUtils.getLoggedInUserId();
if (userId <= 0) {
return Result.fail("Not Logged In");
}
// 2)参数校验
if (itemId <= 0 || quantity <= 0 || quantity >= 1000) {
return Result.fail("Invalid Args");
}
// 3)外部数据补全
ItemDO item = itemService.getItem(itemId);
if (item == null) {
return Result.fail("Item Not Found");
}
// 4)调用外部服务
boolean withholdSuccess = inventoryService.withhold(itemId, quantity);
if (!withholdSuccess) {
return Result.fail("Inventory not enough");
}
// 5)领域计算
Long cost = item.getPriceInCents() * quantity;
// 6)领域对象操作
OrderDO order = new OrderDO();
order.setItemId(itemId);
order.setBuyerId(userId);
order.setSellerId(item.getSellerId());
order.setCount(quantity);
order.setTotalCost(cost);
// 7)数据持久化
orderRepository.createOrder(order);
// 8)返回
return Result.success(order);
}
}
这段代码里混杂了业务计算、校验逻辑、基础设施、和通信协议等,在未来无论哪一部分的逻辑变更都会直接影响到这段代码,长期当后人不断的在上面叠加新的逻辑时,会造成代码复杂度增加、逻辑分支越来越多,最终造成bug或者没人敢重构的历史包袱。
接口层
接口层的接口的数量和业务间的隔离:在传统REST和RPC的接口规范中,通常一个领域的接口,无论是REST的Resource资源的GET/POST/DELETE,还是RPC的方法,是追求相对固定的,统一的,而且会追求统一个领域的方法放在一个领域的服务或Controller中。刻意去追求接口的统一通常会导致方法中的参数膨胀,或者导致方法的膨胀。假设有一个宠物卡和一个亲子卡的业务公用一个开卡服务,但是宠物需要传入宠物类型,亲子的需要传入宝宝年龄。可以发现宠物卡和亲子卡虽然看起来像是类似的需求,但并非是“同样需求”的,可以预见到在未来的某个时刻,这两个业务的需求和需要提供的接口会越走越远,所以需要将这两个接口类拆分开。
一个Interface层的类应该是“小而美”的,应该是面向“一个单一的业务”或“一类同样需求的业务”,需要尽量避免用同一个类承接不同类型业务的需求。也许会有人问,如果按照这种做法,会不会产生大量的接口类,导致代码逻辑重复?答案是不会,因为在DDD分层架构里,接口类的核心作用仅仅是协议层,每类业务的协议可以是不同的,而真实的业务逻辑会沉淀到应用层。也就是说Interface和Application的关系是多对多的。因为业务需求是快速变化的,所以接口层也要跟着快速变化,通过独立的接口层可以避免业务间相互影响,但我们希望相对稳定的是Application层的逻辑。
应用层——为什么要用CQE对象?
ApplicationService负责了业务流程的编排,是将原有业务流水账代码剥离了校验逻辑、领域计算、持久化等逻辑之后剩余的流程,是“胶水层”代码。
Command/Query/Event
通常在很多代码里,能看到接口上有多个参数
Result<OrderDO> checkout(Long itemId, Integer quantity);
如果需要在接口上增加参数,考虑到向前兼容,则需要增加一个方法:
Result<OrderDO> checkout(Long itemId, Integer quantity);
Result<OrderDO> checkout(Long itemId, Integer quantity, Integer channel);
传统的接口写法有几个问题:
- 接口膨胀:一个查询条件一个方法
- 难以扩展:每新增一个参数都有可能需要调用方升级
- 难以测试:接口一多,职责随之变得繁杂,业务场景各异,测试用例难以维护
- 这种类型的参数罗列,本身没有任何业务上的”语意“,只是一堆参数而已,无法明确的表达出来意图。
CQE和DTO有什么区别呢?
- CQE:CQE对象是ApplicationService的输入,是有明确的”意图“的,所以这个对象必须保证其”正确性“。
- 当入参改为了CQE之后,我们可以利用java标准JSR303或JSR380的Bean Validation来前置这个校验逻辑。
- 因为CQE是有“意图”和“语意”的,我们需要尽量避免CQE对象的复用,哪怕所有的参数都一样,只要他们的语意不同,尽量还是要用不同的对象。
- DTO:DTO对象只是数据容器,只是为了和外部交互,所以本身不包含任何逻辑,只是贫血对象。 因为CQE是”意图“,所以CQE对象在理论上可以有”无限“个,每个代表不同的意图;但是DTO作为模型数据容器,和模型一一对应,所以是有限的。
DDD设计实例
Domain-driven Design Example译文领域驱动设计示例 未读
源码实例:Domain-Driven Design领域驱动设计落地节选自《领域驱动设计第7章》假设我们正在为一家货运公司开发新的软件,最初的需求包括三项基本功能:
- 事先预约货物
- 跟踪客户货物的主要处理流程
- 当货物到达其处理过程中的某个位置时,自动向客户寄送发票
对应的博客 领域驱动设计DDD和CQRS落地 未读
cqrs
cqrs 全称command and Query Responsibility Segregation(隔离),也就是命令(增删改)与查询(查)职责分离。如果把Command 操作变成Event Sourcing,那么只需要记录不可修改的事件,并通过回溯事件得到数据的状态。
材料1
《软件架构设计》读写分离模型的典型特征:分别为读和写设计不同的数据结构。
材料2
阿里高级技术专家方法论:如何写复杂业务代码?一般来说实践DDD有两个过程:
- 套概念阶段:了解了一些DDD的概念,然后在代码中“使用”Aggregation Root,Bounded Context,Repository等等这些概念。更进一步,也会使用一定的分层策略。然而这种做法一般对复杂度的治理并没有多大作用。
- 融会贯通阶段:术语已经不再重要,理解DDD的本质是统一语言、边界划分和面向对象分析的方法。
你写的代码是别人的噩梦吗?从领域建模的必要性谈起软件的世界里没有银弹,是用事务脚本还是领域模型没有对错之分,关键看是否合适。就像自营和平台哪个模式更好?答案是都很好,所以亚马逊可以有三方入住,阿里也可以有自建仓嘛。实际上,CQRS就是对事务脚本和领域模型两种模式的综合,因为对于Query和报表的场景,使用领域模型往往会把简单的事情弄复杂,此时完全可以用奥卡姆剃刀把领域层剃掉,直接访问Infrastructure。
材料3
详解 CQRS 架构模式在拥有大量数据和复杂实体模型的大型应用程序中,一些实现细节随着时间推移变成了“核心”部分。有时候,这些东西是工程师在很明确的情况下完成的,但更多的是以一种隐式甚至是无意的方式发生。于是,新需求可能与现有的实现不一致,以至于根本无法很好地容纳它们。
对于一部分场景,CQRS 是一种非常有用的架构模式。在基于 CQRS 的系统中,命令 (写操作) 和查询 (读操作) 所使用的数据模型是有区别的。命令模型用于有效地执行写 / 更新操作,而查询模型用于有效地支持各种读模式。通过领域事件或其他各种机制将命令模型中的变更传播到查询模型中,让两个模型之间的数据保持同步。只用于读取的数据模式看起来就像是一个缓存。事实上,查询模型可以使用 Redis 这样的缓存技术来实现。但是,CQRS 不只是为了分离数据的写入和读取,它的根本目的是为了实现数据的多重表示,每一种表示都能够满足某些用户的需求。CQRS 可能会有多种查询模式,每个模式可能使用不同的物理实现。有些可能使用数据库,有些可能使用 Redis,等等。
什么时候不该使用 CQRS?在系统中使用 CQRS 会带来显著的认知负担和复杂性。开发人员必须面对至少两个数据模型和多种技术选择,所有这些都是不可忽略的负担。第二个问题是如何保持命令模型和查询模型的数据同步。如果选择了异步方式,那么整个系统就要承担最终一致性所带来的后果。这可能非常麻烦,特别是当用户希望系统能够立即反映出他们的操作时,即使是单个一致性要求也会危及整个系统的设计。
aggregate-framework
aggregate-framework 主要由公司大牛实现,所以通过分析它的源码来深入理解ddd和cqrs
领域驱动思想的 跟响应式编程 也有异曲同工之妙,剖析响应式编程的本质。将顺序式的命令编程 改为 事件/数据流。
示例使用
假设订单PricedOrder 有place和confirm 两个行为,其后续处理都是通过PricedOrder.apply(xxEvent)来进行的,dao类负责crud(这个没变)。service 只是负责 操作dao类以及(直接或间接)PricedOrder.apply(xxEvent) 方法。
- 订单的创建,触发了支付过程
- 支付过程
model 只是apply,剩下的交给eventHandler来做。这里有一个事件驱动的思想,即不是寻常的A==> B ;A ==> C;A ==>D,而是A 干完自己的活儿,发个事件就返回了。剩下的调用BCD,由eventbus 驱动执行。那么handler之间如何编组,如何揉和成一个统一的关系。那问题来了
- 后续处理是同步还是异步进行
- 一致性如何保证
- 业务处理如何接入
- 整个工作流程什么样
源码分析
domainObject Repository
在AggregateRoot接口里定义了一个方法apply,用来注册Domain Event。当调用Repository方法save来保存AggregateRoot时,将注册的Domain Event发布。
有三条线:
- 领域驱动
- 事务执行
- 异步执行
领域驱动
代码执行的核心是:
public interface EventBus {
public void subscribe(EventListener eventListener);
void publishInTransaction(EventMessage[] messages, LocalTransactionExecutor localTransactionExecutor);
}
从一个例子看执行流程:
public class OrderService {
@Transactional
public PricedOrder placeOrder(int productId, int price) {
PricedOrder pricedOrder = OrderFactory.buildOrder(productId, price);
return orderRepository.save(pricedOrder);
}
}
@Service
public class OrderHandler {
@EventHandler
public void handleOrderCreatedEvent(OrderPlacedEvent event) {
Payment payment = PaymentFactory.buildPayment(event.getPricedOrder().getId(),
String.format("p000%s", event.getPricedOrder().getId()), event.getPricedOrder().getTotalAmount());
paymentRepository.save(payment);
}
}
- 系统启动的时候,EventHandler 标记的类和方法会被封装为EventListener,加入到EventBus中
PricedOrder pricedOrder = OrderFactory.buildOrder(productId, price);
中执行了PricedOrder构造函数,执行了pricedOrder.apply(new OrderPlacedEvent(this));
本质上将pricedOrder 转换成了 EventMessage-
orderRepository.save(pricedOrder)
触发执行eventBus.publishInTransaction(EventMessage[] messages, LocalTransactionExecutor localTransactionExecutor);
然后各个eventhandler 就被触发执行了。当然,在spring Transaction场景下,eventBus.publishInTransaction也可以由事务调用触发。- 向threadlocal 挂一个clientSession
- 向clientSession 加入 AggregateEntry,AggregateEntry 聚合了pricedOrder 和一个全局的eventBus。同时挂一个实际的save逻辑
-
clientSession commit,主体就是执行 eventBus.publishInTransaction
- 因为eventBus是全局的,里面的EventListener太多, 所以要找到和EventMessage匹配的EventListener
- EventListener 根据 EventMessage 执行逻辑
- 执行save domain 逻辑本身
- clientSession flush、postHandle、closeClientSession 等完成后续流程
仅仅靠注解,如何知道OrderHandler.handleOrderCreatedEvent
处理的就是pricedOrder.apply(new OrderPlacedEvent(this));
?根据参数类型。这也是整个eventbus的意义所在:发布者发布事件,监听者监听事件。框架将整个过程整合在一起并处理。
“发布者发布事件,监听者监听事件”的优势在于:举一个例子, 笔者实现配置中心系统时,新增一个配置需要进行很多关联操作:
- 打掉系统中的缓存
- 新增ConfigChange数据
- 新增ChangeLog数据
并且随着业务需求的调整,新增一个配置要做的工作越来越多,并且在不断变化。新增配置如此,更改配置就更不用多说。后来,笔者提供了一个ConfigAddListener、ConfigChangeListener等。关心这些事件的人实现这个listener即可。
事务
@Transactional
public PricedOrder placeOrder(int productId, int price) {
PricedOrder pricedOrder = OrderFactory.buildOrder(productId, price);
return orderRepository.save(pricedOrder);
}
此处只是借用了spring-tx 的 Transactional 注解的调用接口做外壳,在其回调方法中塞的是compensable-transaction的事务处理逻辑。
异步执行
当eventhanlder 标记为异步任务时,将任务加入到Disruptor中。Disruptor是一个高性能队列,可以当做一个高性能的ArrayBlockingQueue 使用。然后有一个独立的Executor从Disruptor 取出任务并执行。
异步执行,带来几个问题
- 异步任务执行失败了怎么办?
- 异步离不开 队列,队列里的消息丢了(比如停电)怎么办
- 异步任务执行失败后重试,那重试好几次怎么办?
- 重试的时候,当时触发事件的 事件源本身就没有保存成功,或者状态改变怎么办?比如用户下了个订单,然后一个异步任务去发货,结果发货任务开始执行的时候用户把订单取消了。
所以在aggregate-framework 中,下单操作是以下逻辑
- 生成一个下单event
- event 保存到redis/db
- 订单数据库操作
- 发布 下单event
- 异步执行发货等任务
- 所有任务执行完了,删除redis/db 中的event
针对提出的几个问题
- 执行失败,便按照规定次数重试,若仍失败,等待手动处理(event一直在)
- 使用redis/db 等将event 持久化,并且先于订单存储操作,类似于mysql 写数据到磁盘之前先写日志
- eventhandler 必须支持幂等性
- eventhandler 执行之前,执行一个check检查函数,判断订单状态
eventhandler 不管由哪个实例产生, 可以由任意一个实例执行
缓存
聚合根 一次拉的数据太多,所以框架本身支持缓存, 也因此带来 缓存并发修改问题
小结
aggregate-framework 哪些部分是ddd,哪些是额外增强的
- eventhandler ,本身来自于事件驱动
- 事务支持,是为了一致性
- 异步执行,是为了效率
事件驱动大多数时候会用到异步,使用异步的话,一致性、异常处理、重启消息丢失等都来了。同步下不是问题的问题 都出现了,同步下是问题的,更复杂了。
和公司内大佬关于支付系统为何用ddd,以及ddd与微服务的关系(项目拆分角度)问题的讨论。
-
在做支付系统的时候,DDD提供了一个切分系统的思路,防止系统变成一个大煤球. 这个切分思想的好处是让工作经验比较浅的人做也不会出太大问题.通过事件发布机制把业务各环节串联起来,AGG提供的可靠机制保证不丢失数据
-
DDD是逻辑上的切分.微服务是实现上的切分。按DDD做模块切分,最终实现如果还是在一个应用里面,那就是单一应用程序.如果把DDD的模块分散成多个app,通过发布事件的方式建立联系协调工作,那就是微服务的实现