Appearance
场景设定
假设我们有一个电子商务系统。当用户成功下单后,我们需要完成两件事情:
- 更新订单状态到数据库。
- 发布一个“订单已创建”事件,通知其他服务(比如库存服务、邮件服务)进行后续处理。
目标:确保这两个操作原子性地发生,即要么都成功,要么都失败。
第一部分:SQL 数据库表设计
我们需要两张表:
orders
表:存储订单业务数据。outbox
表:发件箱表,存储待发布的事件。
sql
-- 1. orders 表:存储业务数据
CREATE TABLE IF NOT EXISTS orders (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
amount INTEGER NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending', -- 订单状态
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建索引以优化查询,尤其对于 orders 表的 status 字段
CREATE INDEX IF NOT EXISTS idx_orders_status ON orders (status);
-- 2. outbox 表:发件箱表,存储待发送的事件
CREATE TABLE IF NOT EXISTS outbox (
id SERIAL PRIMARY KEY,
aggregate_type VARCHAR(50) NOT NULL, -- 事件关联的聚合类型,例如 'Order'
aggregate_id VARCHAR(50) NOT NULL, -- 事件关联的聚合ID,例如订单ID
event_type VARCHAR(50) NOT NULL, -- 事件类型,例如 'OrderCreated'
payload JSONB NOT NULL, -- 事件的详细数据(JSON格式)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE, -- 是否已处理/发送
processed_at TIMESTAMP NULL -- 处理/发送时间
);
-- 创建索引以优化查询,尤其对于 outbox 表的 processed 字段
CREATE INDEX IF NOT EXISTS idx_outbox_processed ON outbox (processed, created_at);
解释:
orders
表很简单,就是普通的业务表。outbox
表是发件箱模式的核心。aggregate_type
和aggregate_id
:帮助我们知道这个事件是关于哪个业务实体的(例如,关于订单ID为123的订单事件)。event_type
:描述事件的类型。payload
:事件的实际内容,使用PostgreSQL的JSONB类型,它比普通JSON类型更高效且支持索引。processed
:这是一个非常关键的字段。它指示这条事件是否已经被消息转发器成功读取并发送。默认是FALSE
(未处理)。processed_at
:记录事件被成功处理的时间。
第二部分:业务服务代码 (Python)
这段代码模拟了我们的订单服务。当创建一个新订单时,它会在同一个本地事务中完成两件事:插入订单数据和插入发件箱记录。
python
import psycopg2
import json
import datetime
import time
from psycopg2.extras import RealDictCursor
DATABASE_URL = "postgresql://username:password@localhost:5432/ecommerce"
def get_db_connection():
"""获取数据库连接"""
conn = psycopg2.connect(DATABASE_URL)
return conn
def setup_database():
"""初始化数据库表"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_status ON orders (status);")
cursor.execute("""
CREATE TABLE IF NOT EXISTS outbox (
id SERIAL PRIMARY KEY,
aggregate_type VARCHAR(50) NOT NULL,
aggregate_id VARCHAR(50) NOT NULL,
event_type VARCHAR(50) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
processed_at TIMESTAMP NULL
);
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_outbox_processed ON outbox (processed, created_at);")
conn.commit()
print("数据库表初始化完成。")
except Exception as e:
conn.rollback()
print(f"数据库初始化失败: {e}")
finally:
cursor.close()
conn.close()
def create_order(user_id, product_id, amount):
"""
创建订单并记录事件到发件箱(在同一个事务中)
"""
conn = get_db_connection()
cursor = conn.cursor()
order_id = None
try:
# 1. 插入订单业务数据
cursor.execute(
"INSERT INTO orders (user_id, product_id, amount, status) VALUES (%s, %s, %s, %s) RETURNING id",
(user_id, product_id, amount, 'completed') # 假设直接完成
)
order_id = cursor.fetchone()[0]
print(f"订单 {order_id} 业务数据插入成功。")
# 2. 准备事件数据
event_payload = {
"order_id": order_id,
"user_id": user_id,
"product_id": product_id,
"amount": amount,
"timestamp": datetime.datetime.now().isoformat()
}
# 3. 插入事件到发件箱表
cursor.execute(
"INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) VALUES (%s, %s, %s, %s)",
('Order', str(order_id), 'OrderCreated', json.dumps(event_payload))
)
print(f"事件 'OrderCreated' 已写入发件箱表,关联订单ID: {order_id}。")
# 4. 提交事务:确保业务数据和发件箱记录同时成功
conn.commit()
print(f"事务提交成功:订单 {order_id} 创建并事件记录完成。")
return order_id
except Exception as e:
# 发生错误时回滚事务:确保业务数据和发件箱记录都不被写入
conn.rollback()
print(f"创建订单失败,事务回滚: {e}")
return None
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
setup_database()
print("\n--- 模拟成功创建订单 ---")
order_id_success = create_order(user_id=101, product_id=2001, amount=99.99)
if order_id_success:
print(f"成功创建订单 ID: {order_id_success}")
print("\n--- 模拟创建订单失败(例如,数据库连接中断在提交前)---")
# 为了演示失败,我们强制抛出异常
original_execute = psycopg2.Cursor.execute
def mock_execute_fail(*args, **kwargs):
if "INSERT INTO outbox" in args[0] and create_order.fail_next_outbox_insert:
create_order.fail_next_outbox_insert = False # 只失败一次
raise psycopg2.OperationalError("模拟发件箱插入失败")
return original_execute(*args, **kwargs)
psycopg2.Cursor.execute = mock_execute_fail
create_order.fail_next_outbox_insert = True # 标记下一次 outbox 插入失败
order_id_fail = create_order(user_id=102, product_id=2002, amount=150.00)
if order_id_fail is None:
print("模拟创建订单失败,订单数据和发件箱记录均未写入,符合预期。")
psycopg2.Cursor.execute = original_execute # 恢复正常执行
# 检查数据库状态
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT id, status FROM orders")
print("\n当前 orders 表:")
for row in cursor.fetchall():
print(f" Order ID: {row['id']}, Status: {row['status']}")
cursor.execute("SELECT id, event_type, processed FROM outbox")
print("\n当前 outbox 表:")
for row in cursor.fetchall():
print(f" Outbox ID: {row['id']}, Event Type: {row['event_type']}, Processed: {row['processed']}")
cursor.close()
conn.close()
代码解释:
setup_database()
: 负责创建orders
和outbox
表。create_order()
: 这是核心的业务方法。- 它首先创建一个数据库连接并开启事务。
- 关键点:
INSERT INTO orders
和INSERT INTO outbox
语句在同一个try...except
块中,并且由conn.commit()
统一提交。 - 如果两者都成功,事务提交。
- 如果其中任何一个操作失败(例如,网络中断,数据库连接中断,或者像我们模拟的
outbox
插入失败),except
块会被触发,并调用conn.rollback()
。这意味着所有在事务中执行的更改(包括订单插入和发件箱记录插入)都会被撤销,从而保证了原子性。 json.dumps(event_payload)
将Python字典转换为JSON字符串存储。
第三部分:消息转发器 (Message Relayer) 代码 (Python)
这是一个独立的进程或服务,它会周期性地扫描 outbox
表,查找尚未发送的事件,然后将其发送到消息队列(这里用 print
模拟)。
python
import psycopg2
import json
import datetime
import time
from psycopg2.extras import RealDictCursor
DATABASE_URL = "postgresql://username:password@localhost:5432/ecommerce"
def get_db_connection():
"""获取数据库连接"""
conn = psycopg2.connect(DATABASE_URL)
return conn
def publish_to_message_broker(event_type, payload):
"""
模拟将事件发布到实际的消息队列(如 Kafka, RabbitMQ, Redis Streams等)
在真实场景中,这里会调用消息队列的客户端库。
"""
print(f" [MQ Publisher] 正在发布事件: 类型='{event_type}', 数据={json.dumps(payload, indent=2)}")
# 模拟网络延迟或发布成功/失败
# if random.random() < 0.1: # 10%几率模拟发布失败
# raise Exception("模拟消息队列发布失败")
print(f" [MQ Publisher] 事件 '{event_type}' 发布成功。")
def process_outbox_events():
"""
扫描发件箱表,处理未发送的事件
"""
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
# 查询未处理的事件
# 使用 SELECT FOR UPDATE SKIP LOCKED 来锁定行,防止其他Relayer同时处理
cursor.execute("""
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox
WHERE processed = FALSE
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 10
""")
events_to_process = cursor.fetchall()
if not events_to_process:
print("没有待处理的发件箱事件。")
return 0
processed_count = 0
for event_row in events_to_process:
event_id = event_row['id']
event_type = event_row['event_type']
payload = event_row['payload'] # PostgreSQL的JSONB类型会自动转换为Python字典
try:
print(f"尝试处理发件箱事件 ID: {event_id}, 类型: {event_type}")
# 1. 将事件发布到消息队列
publish_to_message_broker(event_type, payload)
# 2. 更新发件箱记录状态为已处理
cursor.execute(
"UPDATE outbox SET processed = %s, processed_at = %s WHERE id = %s",
(True, datetime.datetime.now(), event_id)
)
conn.commit() # 每次更新提交,保证即使处理中途崩溃,已发送的事件状态也已更新
print(f"成功处理发件箱事件 ID: {event_id}。")
processed_count += 1
except Exception as e:
# 如果发布失败,记录错误并回滚(或不更新状态,等待下次重试)
# 这里我们选择不更新状态,让它在下次轮询时重试。
print(f"处理发件箱事件 ID: {event_id} 失败: {e}。将保留以便重试。")
conn.rollback() # 回滚当前事件的处理,确保其状态未改变
# 在真实系统中,可能需要记录失败日志,或引入指数退避重试机制
break # 停止处理当前批次,等待下次轮询
return processed_count
except Exception as e:
print(f"消息转发器发生致命错误: {e}")
conn.rollback() # 确保任何未提交的事务被回滚
return -1
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
print("消息转发器启动...")
while True:
try:
count = process_outbox_events()
if count > 0:
print(f"成功处理了 {count} 个事件。")
time.sleep(5) # 每5秒轮询一次
except KeyboardInterrupt:
print("消息转发器已停止。")
break
except Exception as e:
print(f"主循环错误: {e}")
time.sleep(10) # 错误后等待更长时间再重试
代码解释:
process_outbox_events()
: 这是消息转发器的核心逻辑。- 轮询查询:它会查询
outbox
表中processed = FALSE
的记录。ORDER BY created_at ASC
确保按时间顺序处理。LIMIT 10
每次处理一批,避免单次操作处理过多导致超时或资源耗尽。 - PostgreSQL特有的行锁定:使用
FOR UPDATE SKIP LOCKED
子句,这是PostgreSQL提供的一个强大的并发控制机制。它允许我们锁定要处理的行,同时跳过已经被其他进程锁定的行。这确保了在多个消息转发器实例运行时,每个事件只会被一个实例处理。 - 发布事件:对于查询到的每条未处理事件,它会调用
publish_to_message_broker()
来模拟发布事件到实际的消息队列。 - 更新状态:如果
publish_to_message_broker()
成功,转发器会UPDATE outbox SET processed = TRUE
,并将processed_at
设置为当前时间。注意这里:每次成功发布后,会立即conn.commit()
。 这确保了即使消息转发器在处理批次中的某个事件后崩溃,之前已经成功发送并标记为processed=TRUE
的事件也不会丢失或重复发送(因为它已被标记)。 - 错误处理与重试:如果
publish_to_message_broker()
失败,try...except
会捕获异常。我们选择不更新processed
状态,并且conn.rollback()
会回滚当前事件的状态更新(确保其仍是processed=FALSE
)。这样,在下一次轮询时,该事件会再次被选中并重试发送。
- 轮询查询:它会查询
如何运行
- 将上述
setup_database()
,create_order()
,publish_to_message_broker()
,process_outbox_events()
等代码分别保存为两个文件,例如app.py
和message_relayer.py
。 - 首先运行
app.py
:bash它会初始化数据库,并模拟创建订单(一个成功,一个失败)。你会看到python app.py
orders
表中会有一条completed
状态的订单,以及outbox
表中一条processed=FALSE
的事件。 - 然后在一个新的终端窗口运行
message_relayer.py
:bash它会每5秒轮询一次。你会看到它检测到python message_relayer.py
outbox
中的未处理事件,模拟发布它,然后将其状态更新为processed=TRUE
。
总结发件箱模式在分布式系统中的作用
结合上面的代码,我们可以更清晰地理解发件箱模式的作用:
保证业务数据和消息发布的原子性(通过
app.py
演示):- 在
create_order
函数中,INSERT INTO orders
和INSERT INTO outbox
发生在同一个本地数据库事务中。 - 如果其中任何一个操作失败(无论是因为业务逻辑错误,还是数据库连接问题,或者我们模拟的
outbox
插入失败),整个事务都会rollback()
。这意味着订单数据和发件箱记录要么一起成功写入,要么一起回滚,从而避免了"订单已创建但事件未发布"或"事件已发布但订单未创建"这种不一致的状态。这是发件箱模式最核心的价值。
- 在
实现可靠的消息发布(通过
message_relayer.py
演示):message_relayer.py
是一个独立运行的服务。它持续扫描outbox
表中的processed = FALSE
记录。- 即使消息队列(例如
publish_to_message_broker
模拟的)暂时不可用,或者转发器自身崩溃,消息也安全地保存在数据库的outbox
表中。 - 当消息队列恢复或转发器重启后,它会再次尝试发送这些未处理的消息,直到成功。这样就保证了消息最终会被发布出去,避免了消息丢失。
解耦业务逻辑与消息发送细节(通过
app.py
和message_relayer.py
分离演示):app.py
中的create_order
逻辑只关心将业务数据和事件记录写入数据库,它不需要知道事件如何被发送(例如,不需要直接连接Kafka、处理序列化、错误重试等)。- 这些复杂的发送细节(如轮询、重试、错误处理)全部由
message_relayer.py
负责。这使得业务代码更简洁,关注点分离,易于维护。
避免分布式事务的复杂性:
- 我们没有使用XA事务或两阶段提交(2PC)这种重量级的分布式事务协议。发件箱模式利用了本地事务的原子性,然后通过一个异步的、可重试的机制来实现最终一致性。这大大简化了系统架构,提高了性能和可用性。
支持最终一致性:
- 通过可靠地发布事件,发件箱模式是实现分布式系统最终一致性的关键。当订单服务发布"订单已创建"事件后,其他服务(如库存服务、邮件服务)可以消费这个事件并进行自己的处理,即使不是强实时一致,也能保证系统最终达到一个一致的状态。
发件箱模式是分布式系统中实现可靠事件驱动架构和最终一致性的有效手段。它巧妙地将分布式原子性难题转化为本地事务和异步重试的组合,兼顾了可靠性、性能和可扩展性。