在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的 ACID 保障面临着一些特殊难点。本系列文章介绍了 21 种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (3)
David Dvořáček @Unsplash
在不同业务场景下,可以有不同的解决方案,常见方法有:
阻塞重试(Blocking Retry)
二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
TCC 补偿(TCC Compensation Matters)
本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
MQ 事务(MQ Transaction)
Saga 模式(Saga Pattern)
事件驱动(Event Sourcing)
命令查询职责分离(Command Query Responsibility Segregation, CQRS)
原子提交(Atomic Commitment)
并行提交(Parallel Commits)
事务复制(Transactional Replication)
一致性算法(Consensus Algorithms)
时间戳排序(Timestamp Ordering)
乐观并发控制(Optimistic Concurrency Control)
拜占庭容错(Byzantine Fault Tolerance, BFT)
分布式锁(Distributed Locking)
分片(Sharding)
多版本并发控制(Multi-Version Concurrency Control, MVCC)
分布式快照(Distributed Snapshots)
主从复制(Leader-Follower Replication)
本文将介绍 Saga、事件驱动以及 CQRS 三种模式。
7. Saga 模式(Saga Pattern)
图片来源: https://docs.aws.amazon.com/prescriptive-guidance/latest/modernization-data-persistence/saga-pattern.html
管理跨多个微服务的长时间事务。
将事务分解为一系列较小的、独立的步骤,每个步骤都由单独的微服务管理。
包含如下步骤:
协调微服务负责接收事务初始请求。
协调微服务通过向第一个负责处理事务的微服务发送消息来启动事务。
第一个微服务执行事务,并将消息发送回协调微服务,反馈其是否成功。
如果第一步成功,协调微服务将向负责事务下一步的微服务发送消息。
如果第一步失败,协调微服务发送补偿动作来撤消失败步骤的影响。
重复步骤 3-5,直到每个微服务要么完成其步骤,要么在失败时触发补偿操作(回滚)。
一旦所有步骤都成功完成,协调微服务就会发送一条消息,表明整个事务已经成功。
如果任何步骤失败并且触发了补偿操作(回滚),则协调微服务将发送一条消息,指示整个事务失败。
import pika
import json
# Define the RabbitMQ connection parameters
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
# Define the messages to be sent between services
start_order_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
payment_message = {'order_id': '12345', 'amount': 100.0}
shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the compensation messages to be sent in case of failure
cancel_payment_message = {'order_id': '12345', 'amount': 100.0}
cancel_shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the function to send messages to the RabbitMQ broker
def send_message(queue_name, message):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
connection.close()
# Define the function to handle the start of the order
def start_order():
# Send the start order message to the Order service
send_message('start_order', start_order_message)
# Define the function to handle the payment of the order
def payment():
try:
# Send the payment message to the Payment service
send_message('payment', payment_message)
except Exception as e:
# Send the cancel payment message to the Payment service in case of failure
send_message('cancel_payment', cancel_payment_message)
raise e
# Define the function to handle the shipping of the order
def shipping():
try:
# Send the shipping message to the Shipping service
send_message('shipping', shipping_message)
except Exception as e:
# Send the cancel shipping message to the Shipping service in case of failure
send_message('cancel_shipping', cancel_shipping_message)
raise e
# Define the function to handle the cancellation of the order
def cancel_order():
# Send the cancel payment message to the Payment service
send_message('cancel_payment', cancel_payment_message)
# Send the cancel shipping message to the Shipping service
send_message('cancel_shipping', cancel_shipping_message)
# Define the main function to execute the Saga
def execute_saga():
try:
# Start the order
start_order()
# Perform the payment
payment()
# Perform the shipping
shipping()
except Exception as e:
# Cancel the order in case of failure
cancel_order()
raise e
# Call the main function to execute the Saga
execute_saga()
复制代码
示例代码
用 RabbitMQ 作为简单消息代理
定义了在服务之间发送的五条消息: start_order_message
、payment_message
、shipping_message
、cancel_payment_message
以及cancel_shipping_message
。
start_order
函数将start_order_message
发送给order_service
。
在收到来自start_order
函数的消息后,order_service
创建订单,并发送回包含order_id
的确认消息。
一旦start_order
函数接收到确认消息,将发送payment_message
给payment_service
来处理订单支付。
如果支付成功,payment_service
将返回一条包含payment_id
的确认消息。
start_order
函数将shipping_message
发送给shipping_service
,以便在付款成功后发货。
如果发货成功,shipping_service
将返回一条包含shipping_id
的确认消息。
如果上述任何步骤失败,则回滚事务,分别给shipping_service
和payment_service
发送cancel_shipping_message
和cancel_payment_message
,撤销所做的更改。
通过向start_order
发送初始消息、监听确认消息以及在发生故障时处理补偿来处理整个 Saga 流程。换句话说,Saga 模式涉及一系列补偿操作,以便在发生故障时撤消事务的影响。
优点
管理跨多个微服务的长时间事务
避免服务独立运行时出现不一致或数据损坏
如果事务中的某个步骤失败,则提供补偿操作
允许服务自主、独立运行
缺点
适用场景
尝试-确认-取消(TCC)模式与 Saga 模式的相似之处
尝试-确认-取消(TCC)模式与 Saga 模式的不同之处
Saga 模式使用前向恢复法,每个服务在出现故障时启动一个补偿事务,而 TCC 模式使用后向恢复法,每个服务验证事务是否可以继续,然后才确认或取消。
Saga 模式将事务表示为事件序列,事件由相关服务之间发送的消息表示。TCC 模式将事务表示为由所涉及的每个服务执行的操作序列。
Saga 模式适用于涉及多个服务的长时间事务,而 TCC 模式适用于涉及较少服务的短时间事务。
Saga 模式的实现可能比 TCC 模式更复杂,要求每个服务能够发起补偿事务并处理潜在故障。
8. 事件驱动(Event Sourcing)
图片来源: https://eventuate.io/whyeventsourcing.html
对应用程序状态所做的所有更改作为一系列事件。
将这些事件存储在数据库或事件日志中,从而提供应用程序状态随时间变化的完整审计跟踪。
涉及如下步骤:
每当应用程序状态发生变化时,就会捕获相应事件,事件包含所有更改相关信息(例如已修改的数据和进行更改的用户)。
事件存储在事件日志中,可以用数据库或消息代理实现,每个事件都有一个唯一标识符,并带有时间戳,以确保事件有序。
通过按时间顺序重播事件日志中的事件来重构应用程序当前状态。该过程包括将应用程序的状态初始化为其初始状态,然后依次应用每个事件来更新状态。
一旦状态被重构,就可以对其进行查询,以提供有关应用程序当前状态的信息。
可以实时处理事件,触发其他动作或更新。
事件处理完成后,可以将其归档或删除以释放存储空间。
图片来源: https://eventuate.io/whyeventsourcing.html
import uuid
import json
import time
class BankAccount:
def __init__(self):
self.balance = 0
self.event_sourcing = EventSourcing()
def deposit(self, amount):
event = Event('deposit', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance += amount
def withdraw(self, amount):
if self.balance < amount:
raise ValueError('Insufficient balance')
event = Event('withdraw', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance -= amount
def get_balance(self):
return self.balance
def get_events(self):
return self.event_sourcing.get_events()
def get_event_by_id(self, event_id):
return self.event_sourcing.get_event_by_id(event_id)
def replay_events(self):
self.balance = 0
for event in self.event_sourcing.get_events():
if event.type == 'deposit':
self.balance += event.data['amount']
elif event.type == 'withdraw':
self.balance -= event.data['amount']
class Event:
def __init__(self, type, data):
self.id = uuid.uuid4()
self.timestamp = int(time.time())
self.type = type
self.data = data
class EventSourcing:
def __init__(self):
self.event_store = EventStore()
def add_event(self, event):
self.event_store.store_event(event)
def get_events(self):
return self.event_store.get_events()
def get_event_by_id(self, event_id):
return self.event_store.get_event_by_id(event_id)
class EventStore:
def __init__(self):
self.events = []
def store_event(self, event):
self.events.append(event)
def get_events(self):
return self.events
def get_event_by_id(self, event_id):
for event in self.events:
if event.id == event_id:
return event
raise ValueError('Event not found')
class Auditor:
def __init__(self, event_store):
self.event_store = event_store
def log_events(self):
for event in self.event_store.get_events():
print(json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
account = BankAccount()
auditor = Auditor(account.event_sourcing.event_store)
account.deposit(100)
account.withdraw(50)
account.deposit(75)
print('Current balance:', account.get_balance())
print('All events:')
auditor.log_events()
event_id = account.get_events()[1].id
event = account.get_event_by_id(event_id)
print('Event details:', json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
复制代码
示例代码
BankAccount
类用来演示如何使用事件来重建实体状态,包含balance
属性,支持deposit
(存款)和withdraw
(取款)两种操作。
Event
类有类型和数据属性。
EventSourcing
类定义了 BankAccount 的event_sourcing
属性
EventSourcing
类包含event_store
属性作为事件列表。
EventStore
类有两个主要方法: store_event()
(在列表中存储事件)和get_events()
(从列表中检索事件)。
add_event
方法向event_store
添加新事件。
get_events
和get_event_by_id
方法可以通过 ID 检索所有事件或特定事件。
deposit
和withdraw
方法创建具有唯一 ID、时间戳和字典(包含操作信息,在本例中为操作类型和金额)的新Event
对象,事件被添加到BankAccount
实例的event_sourcing
属性中。
每次进行deposit
和withdraw
时,都会创建相应事件,并通过EventStore
类将其存储在事件存储中。
get_balance
方法返回帐户当前余额。
replay_events()
方法从事件存储中检索所有事件,并计算当前余额。遍历事件存储中的所有事件,并根据每个事件的类型和数据更新BankAccount
的balance
属性。
Auditor
类监听存储在事件存储库中的所有事件,并在终端上输出相应 log。
以 JSON 格式打印当前余额和所有事件,通过 ID 检索特定事件并打印其详细信息。
事件源模式是创建事件来表示对系统状态的更改,将这些事件存储在事件存储中,并重播事件以重建系统当前状态。
优点
缺点
适用场景
记录交易和财务事项
记录健康事件和医疗程序
记录订单事件和付款事件
注意事项
事件设计 —— 以细粒度方式捕捉系统状态的变化。事件应该是不可变的,这意味着事件被创建后不能被修改。事件的设计应该支持简单的查询和分析。
存储需求 —— 所有对系统状态的更改都以事件序列的形式存储。存储空间明显大于传统数据库。
数据迁移 —— 提前计划数据迁移,并考虑如何将数据从旧系统迁移到新的事件源系统。
9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
图片来源: https://www.codeproject.com/Articles/555855/Introduction-to-CQRS
将读写操作分离到单独的服务或模型中
命令: 改变系统状态
查询: 返回数据
涉及如下步骤:
用户向系统发出读取或写入数据的请求
如果请求是命令(写操作),则将该命令发送给命令服务,命令服务处理请求并更新系统状态。
命令服务更新写入模型,其中包含系统的当前状态,并创建描述更改的事件。事件被添加到事件流中,事件流是系统中发生的所有事件的日志。
命令服务将事件发布到消息代理,消息代理将事件传递给感兴趣的订阅者。
如果请求是查询(读操作),则将查询发送给查询服务,查询服务从读模型中检索数据。
查询服务从读模型中检索数据并将其返回给用户。
如果用户想执行另一个写操作,则从步骤 2 开始重复该过程。
如果用户想要执行读操作,则从步骤 5 开始重复该过程。
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class Command(ABC):
pass
class CreateProductCommand(Command):
def __init__(self, name: str, price: float):
self.name = name
self.price = price
class UpdateProductCommand(Command):
def __init__(self, product_id: str, name: Optional[str] = None, price: Optional[float] = None):
self.product_id = product_id
self.name = name
self.price = price
class DeleteProductCommand(Command):
def __init__(self, product_id: str):
self.product_id = product_id
class Query(ABC):
pass
class GetProductQuery(Query):
def __init__(self, product_id: str):
self.product_id = product_id
class GetAllProductsQuery(Query):
pass
class Product:
def __init__(self, id: str, name: str, price: float):
self.id = id
self.name = name
self.price = price
class ProductRepository:
def __init__(self):
self.products = []
def create(self, name: str, price: float) -> Product:
product = Product(str(len(self.products) + 1), name, price)
self.products.append(product)
return product
def get(self, id: str) -> Optional[Product]:
for product in self.products:
if product.id == id:
return product
return None
def get_all(self) -> List[Product]:
return self.products
def update(self, id: str, name: Optional[str] = None, price: Optional[float] = None) -> Optional[Product]:
for product in self.products:
if product.id == id:
if name is not None:
product.name = name
if price is not None:
product.price = price
return product
return None
def delete(self, id: str) -> bool:
for product in self.products:
if product.id == id:
self.products.remove(product)
return True
return False
class ProductCommandHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, command: Command) -> Optional[Product]:
if isinstance(command, CreateProductCommand):
return self.repository.create(command.name, command.price)
elif isinstance(command, UpdateProductCommand):
return self.repository.update(command.product_id, command.name, command.price)
elif isinstance(command, DeleteProductCommand):
success = self.repository.delete(command.product_id)
return success
class ProductQueryHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, query: Query) -> Optional[Any]:
if isinstance(query, GetProductQuery):
return self.repository.get(query.product_id)
elif isinstance(query, GetAllProductsQuery):
return self.repository.get_all()
class ProductService:
def __init__(self, command_handler: ProductCommandHandler, query_handler: ProductQueryHandler):
self.command_handler = command_handler
self.query_handler = query_handler
def create_product(self, name: str, price: float) -> Product:
command = CreateProductCommand(name, price)
return self.command_handler.handle(command)
def get_product(self, id: str) -> Optional[Product]:
query = GetProductQuery(id)
return self.query_handler.handle(query)
def get_all_products(self) -> List[Product]:
query = GetAllProductsQuery()
return self
复制代码
示例代码
一个产品管理系统。
抽象类Command
和Query
分别由具体类实现。
3 个命令类的实现: CreateProductCommand
、UpdateProductCommand
、DeleteProductCommand
CreateProductCommand
创建产品
UpdateProductCommand
更新产品
DeleteProductCommand
删除产品
2 个查询类的实现: GetProductQuery
和GetAllProductQuery
GetProductQuery
检索关于特定产品的信息
GetAllProductQuery
检索所有产品的信息
Product
类表示一个产品,包含id
、name
和price
ProductRepository
类处理产品数据的持久性,具有创建、检索、更新和删除产品的方法
ProductCommandHandler
类处理命令并将ProductRepository
作为依赖项
ProductQueryHandler
类处理查询并将ProductRepository
作为依赖项
两个handle
方法负责接受命令或查询,并返回适当的响应
ProductService
类作为客户端与产品管理系统交互的入口,将ProductCommandHandler
和ProductQueryHandler
作为依赖项,并公开用于创建、检索和列出产品的方法,这些方法只是对适当命令或查询的包装,并将其传递给相应的处理程序。
优点
缺点
适用场景
CQRS 和事件驱动的结合
图片来源: https://stackoverflow.com/questions/56728979/event-sourcing-why-a-dedicated-event-store
参考文献
Saga pattern
Microservices Pattern: Sagas
Saga Pattern
Saga Pattern Microservices
Saga Pattern for Microservices Distributed Transactions
Microservice Design Pattern - Saga
Event Driven Saga Pattern
How to Use Saga Pattern in Microservices
Saga Orchestration for Microservices Using the Outbox Pattern
Saga Without the Headaches
Event Sourcing
Event Sourcing - why a dedicated event store?
Beginner's Guide to Event Sourcing
Microservices Pattern: Event Sourcing
Event Sourcing pattern
Event Sourcing
Event Sourcing explained
CQRS Event Sourcing JAVA
Introduction to CQRS
CQRS Pattern
bliki: CQRS
Microservices Pattern: Command Query Responsibility Segregation (CQRS)
A Beginner's Guide to CQRS
CQRS Desgin Pattern in Microservices Architecture
The Command and Query Responsibility Segregation(CQRS)
Event Driven CQRS Pattern
CQRS Pattern
CQRS Software Architecture Pattern: The Good, the Bad, and the Ugly
你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind
评论