When designing a system that spans multiple logical service boundaries, a core challenge is guaranteeing the atomicity of operations. When those services all sit on a NoSQL database like DynamoDB, the problem gets thornier. DynamoDB offers the TransactWriteItems API, which writes up to 100 items in a single atomic operation, and that is enough for complex operations within a single service. But in real projects we need to coordinate multiple independently deployed services with separated responsibilities; they may operate on different tables and may even need confirmation from external systems. That scenario is beyond what TransactWriteItems can do, and it forces us to build a higher-level coordination mechanism at the application layer.
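For a sense of what single-service usage looks like, here is a minimal Boto3 sketch of TransactWriteItems (table names, keys, and values are illustrative); either both writes succeed or neither does:

import boto3

dynamodb = boto3.client("dynamodb")

# Debit an account and append a ledger entry atomically. If the balance
# check fails, the entire transaction is rejected.
dynamodb.transact_write_items(
    TransactItems=[
        {
            "Update": {
                "TableName": "accounts",
                "Key": {"accountId": {"S": "acct-1"}},
                "UpdateExpression": "SET balance = balance - :amt",
                "ConditionExpression": "balance >= :amt",
                "ExpressionAttributeValues": {":amt": {"N": "100"}},
            }
        },
        {
            "Put": {
                "TableName": "ledger",
                "Item": {"entryId": {"S": "entry-001"}, "amount": {"N": "100"}},
                "ConditionExpression": "attribute_not_exists(entryId)",
            }
        },
    ]
)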
Our goal is to design a pattern that not only emulates atomic cross-service commits on DynamoDB, but also broadcasts the state changes of successfully committed transactions reliably and in near real time: one copy flows into an S3 data lake for offline analysis, and another is pushed to connected clients over WebSocket.
Weighing the Options: Native Transactions vs. an Application-Level Coordinator
Option A: Rely Entirely on the TransactWriteItems API
This is the most direct approach: aggregate every DynamoDB table that needs atomic updates into one service boundary, then submit all changes in a single TransactWriteItems call.
Advantages:
- Strong consistency: AWS guarantees atomicity, consistency, isolation, and durability (ACID) for the operation. This is the most reliable option.
- Simple to implement: no custom coordination or rollback logic to write; AWS absorbs all the complexity.
Disadvantages:
- Breaks service boundaries: to use the API, services with distinct business responsibilities are forced to share tables or couple tightly, which runs against the point of a microservice architecture.
- Scaling limits: the 100-item cap can become a bottleneck in complex business scenarios, and the API cannot include non-DynamoDB resources such as a call to an external payment gateway.
- No process orchestration: it is a one-shot operation that cannot handle business flows requiring multi-step confirmation or waiting on external input.
In real projects, coupling services together just to exploit a database feature is usually an architectural smell. It accrues technical debt quickly and makes the system rigid.
Option B: Implement a Lightweight Two-Phase Commit (2PC) Coordinator
This option borrows the idea behind classic distributed transactions: implement a coordinator at the application layer that manages the transaction lifecycle of multiple participants.
Advantages:
- Service decoupling: each service (participant) cares only about its own local transaction and exposes prepare and commit/abort interfaces to the coordinator. This fits the microservice design philosophy.
- High flexibility: it can coordinate any kind of resource, whether DynamoDB, RDS, or an external API call, as long as the resource supports a "reserve" phase and a "confirm" phase.
- Customizable flow: complex business logic, such as manual approval or risk assessment, can be inserted between the two phases.
Disadvantages:
- Complex to implement: you must handle the coordinator's state, participant timeouts, network partitions, and recovery logic for every failure mode yourself.
- Performance overhead: compared with a single API call, 2PC involves multiple network round trips, so transaction latency increases noticeably.
- Coordinator as a single point of failure: the coordinator's availability is critical. If it crashes after issuing commit instructions, some participants may commit while others do not, leaving data inconsistent.
Decision: despite Option B's complexity, the flexibility it offers and its respect for service boundaries make it the only viable path for our core problem. We will build a serverless coordinator on Lambda and DynamoDB, and mitigate its inherent complexity and risk through careful design. This coordinator becomes the heart of the whole flow.
Architecture Overview and Data Model
The implementation is built entirely on AWS serverless components, which keeps operational cost low and gives us high elasticity. The end-to-end flow:
graph TD
subgraph Client
A[Client App] -- 1. Initiate Transaction --> B(API Gateway REST API)
end
subgraph Transaction Coordination
B -- 2. Trigger --> C{Coordinator Lambda};
C -- 3. Create Tx Record --> D[(Transactions Table)];
C -- 4. Invoke Prepare --> E[Participant A Lambda];
C -- 4. Invoke Prepare --> F[Participant B Lambda];
end
subgraph Participants
E -- 5. Update Item (State: PREPARING) --> G[(Service A Table)];
F -- 5. Update Item (State: PREPARING) --> H[(Service B Table)];
G -- 6. Prepare OK --> C;
H -- 6. Prepare OK --> C;
end
subgraph Commit & Propagation
C -- 7. All Prepared, Update Tx to COMMITTING --> D;
C -- 8. Invoke Commit --> E;
C -- 8. Invoke Commit --> F;
E -- 9. Finalize Item --> G;
F -- 9. Finalize Item --> H;
G -- via DynamoDB Stream --> I{Stream Processor Lambda};
H -- via DynamoDB Stream --> I;
I -- 10a. Put Record --> J[Kinesis Firehose];
I -- 10b. Post to Connection --> K(API Gateway WebSocket API);
J -- 11. Deliver --> L[(S3 Data Lake)];
K -- 12. Push Message --> M[Connected Client];
end
Data Model Design
To support this flow, we need three key DynamoDB tables.
transactions table (coordinator state table)
- transactionId (Partition Key, String): unique ID of the transaction.
- status (String): global transaction status, e.g. PREPARING, COMMITTING, COMMITTED, ABORTING, ABORTED.
- participants (List of Maps): the participant list, with each participant's identifier and service entry point.
- createdAt (Number): creation timestamp.
- updatedAt (Number): last-update timestamp.

service-a-data table (Participant A's data table)
- entityId (Partition Key, String): business entity ID.
- data (Map): business data.
- lock (Map): transaction lock info that guards against concurrent modification while a transaction is in flight:
  - transactionId (String): ID of the transaction holding the lock.
  - status (String): local transaction state, e.g. PREPARED, COMMITTED. This is the key flag: it marks whether the item has been "reserved".
  - expiresAt (Number): lock expiry time, so a coordinator failure cannot leave the item locked forever.

service-b-data table (Participant B's data table)
- Same structure as service-a-data.
The core design move here is embedding a lock field in each participant's data item. During the prepare phase we write the lock and set its status to PREPARED; only during the commit phase do we apply the real business update and remove the lock field. With DynamoDB condition expressions, the state check and the write happen as one atomic step.
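For concreteness, a service-a-data item that has passed the prepare phase might look like this (a hypothetical shape; values are illustrative):

# Illustrative item state while a transaction holds the lock. On commit,
# 'data' is overwritten with the new values and 'lock' is removed.
prepared_item = {
    "entityId": "order-42",
    "data": {"status": "PENDING", "amount": 100},
    "lock": {
        "transactionId": "tx-6b9f2c10",   # owner of the lock
        "status": "PREPARED",             # the item is reserved
        "expiresAt": 1735700000           # epoch seconds; stale locks expire
    }
}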
Core Implementation Code
We implement everything in Python with Boto3. The code below is production-oriented, with logging, error handling, and comments on the key logic.
1. Transaction Coordinator Lambda (coordinator_lambda.py)
This is the brain of the system. It starts the transaction, fans out prepare requests, collects the responses, and ultimately decides whether to commit or abort.
import os
import json
import time
import uuid
import boto3
import logging
from botocore.exceptions import ClientError
# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
TRANSACTIONS_TABLE_NAME = os.environ["TRANSACTIONS_TABLE_NAME"]
PARTICIPANT_A_FUNCTION_NAME = os.environ["PARTICIPANT_A_FUNCTION_NAME"]
PARTICIPANT_B_FUNCTION_NAME = os.environ["PARTICIPANT_B_FUNCTION_NAME"]
# --- boto3 Clients ---
dynamodb_client = boto3.client("dynamodb")
lambda_client = boto3.client("lambda")
# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
class TransactionError(Exception):
"""Custom exception for transaction failures."""
pass
def handler(event, context):
"""
Main handler for the coordinator.
Orchestrates the two-phase commit process.
"""
logger.info("Coordinator execution started.")
    # API Gateway may deliver a null body; fall back to an empty JSON object.
    request_body = json.loads(event.get("body") or "{}")
transaction_id = f"tx-{uuid.uuid4()}"
# Define participants for this transaction
participants = [
{"name": "ParticipantA", "function": PARTICIPANT_A_FUNCTION_NAME, "payload": request_body.get("participantA")},
{"name": "ParticipantB", "function": PARTICIPANT_B_FUNCTION_NAME, "payload": request_body.get("participantB")},
]
try:
# --- Phase 1: Prepare ---
initiate_transaction(transaction_id, participants)
prepare_participants(transaction_id, participants)
# --- Phase 2: Commit ---
commit_transaction(transaction_id, participants)
return {
"statusCode": 200,
"body": json.dumps({"transactionId": transaction_id, "status": "COMMITTED"})
}
except TransactionError as e:
logger.error(f"Transaction {transaction_id} failed: {e}. Initiating rollback.")
# In a real-world scenario, you would trigger an abort/rollback process here.
# For simplicity, we just mark the transaction as failed.
update_transaction_status(transaction_id, "ABORTED")
return {
"statusCode": 500,
"body": json.dumps({"transactionId": transaction_id, "status": "ABORTED", "reason": str(e)})
}
except Exception as e:
logger.exception(f"An unexpected error occurred during transaction {transaction_id}.")
update_transaction_status(transaction_id, "UNKNOWN_FAILURE")
return {
"statusCode": 500,
"body": json.dumps({"transactionId": transaction_id, "status": "UNKNOWN_FAILURE"})
}
def initiate_transaction(transaction_id, participants):
"""Creates the initial transaction record in DynamoDB."""
logger.info(f"[{transaction_id}] Initiating transaction.")
try:
dynamodb_client.put_item(
TableName=TRANSACTIONS_TABLE_NAME,
Item={
"transactionId": {"S": transaction_id},
"status": {"S": "PREPARING"},
"participants": {"L": [{"M": {"name": {"S": p["name"]}}} for p in participants]},
"createdAt": {"N": str(int(time.time()))},
"updatedAt": {"N": str(int(time.time()))},
},
ConditionExpression="attribute_not_exists(transactionId)"
)
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
logger.error(f"[{transaction_id}] Transaction ID already exists.")
raise TransactionError(f"Failed to initiate transaction record: {e}")
def prepare_participants(transaction_id, participants):
"""Invokes the prepare phase for all participants."""
logger.info(f"[{transaction_id}] Starting prepare phase for {len(participants)} participants.")
for participant in participants:
payload = {
"action": "prepare",
"transactionId": transaction_id,
"payload": participant["payload"]
}
try:
response = lambda_client.invoke(
FunctionName=participant["function"],
InvocationType="RequestResponse",
Payload=json.dumps(payload)
)
response_payload = json.loads(response["Payload"].read())
if response.get("FunctionError") or response_payload.get("status") != "PREPARED":
raise TransactionError(f"Participant {participant['name']} failed to prepare. Response: {response_payload}")
logger.info(f"[{transaction_id}] Participant {participant['name']} prepared successfully.")
except Exception as e:
raise TransactionError(f"Error invoking participant {participant['name']}: {e}")
def commit_transaction(transaction_id, participants):
"""Updates transaction status and invokes the commit phase for all participants."""
logger.info(f"[{transaction_id}] All participants prepared. Moving to commit phase.")
# First, mark the global transaction as COMMITTING.
# This is the point of no return. If this succeeds, we must ensure all participants eventually commit.
update_transaction_status(transaction_id, "COMMITTING")
for participant in participants:
payload = {
"action": "commit",
"transactionId": transaction_id,
"payload": participant["payload"]
}
try:
# For commit, we can use asynchronous invocation.
# The coordinator's job is done after sending the commit signal.
# Participants are responsible for their own commit logic and retries.
lambda_client.invoke(
FunctionName=participant["function"],
InvocationType="Event", # Fire and forget
Payload=json.dumps(payload)
)
logger.info(f"[{transaction_id}] Commit instruction sent to {participant['name']}.")
except Exception as e:
# A failure here is critical. It means a participant might not have received the commit instruction.
# A robust system would have a recovery mechanism (e.g., a sweeper process) that finds
# COMMITTING transactions and re-sends commit instructions to participants that are still PREPARED.
logger.error(f"[{transaction_id}] CRITICAL: Failed to send commit instruction to {participant['name']}: {e}")
# We don't raise TransactionError here because the transaction is already decided.
# We just log the error for monitoring and manual intervention if needed.
# Finally, mark the transaction as fully committed.
update_transaction_status(transaction_id, "COMMITTED")
def update_transaction_status(transaction_id, new_status):
"""Updates the status of a transaction in the DynamoDB table."""
logger.info(f"[{transaction_id}] Updating global status to {new_status}.")
try:
dynamodb_client.update_item(
TableName=TRANSACTIONS_TABLE_NAME,
Key={"transactionId": {"S": transaction_id}},
UpdateExpression="SET #s = :s, updatedAt = :t",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":s": {"S": new_status},
":t": {"N": str(int(time.time()))}
}
)
except ClientError as e:
logger.error(f"[{transaction_id}] Failed to update transaction status to {new_status}: {e}")
# This failure is also critical and should be monitored.
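Before moving on, this is roughly the request body the coordinator expects, inferred from how the handlers above read their payloads (entity IDs and fields are illustrative):

# Example POST body for the API Gateway endpoint that triggers the coordinator.
# Each participant entry names the entity to lock and the data that the
# commit phase will apply.
example_request_body = {
    "participantA": {
        "entityId": "order-42",
        "update_data": {"status": "CONFIRMED", "amount": 100}
    },
    "participantB": {
        "entityId": "sku-7",
        "update_data": {"reserved": 1}
    }
}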
2. Participant Lambda (participant_lambda.py)
This is a generic participant implementation; its concrete behavior is configured through environment variables.
import os
import json
import time
import boto3
import logging
from botocore.exceptions import ClientError
# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
DATA_TABLE_NAME = os.environ["DATA_TABLE_NAME"]
LOCK_TTL_SECONDS = 600 # 10 minutes
# --- boto3 Clients ---
dynamodb_resource = boto3.resource("dynamodb")
table = dynamodb_resource.Table(DATA_TABLE_NAME)
# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
def handler(event, context):
"""
Handles prepare and commit actions for a participant.
"""
action = event.get("action")
transaction_id = event.get("transactionId")
payload = event.get("payload")
entity_id = payload.get("entityId")
logger.info(f"[{transaction_id}] Participant received action '{action}' for entity '{entity_id}'.")
if action == "prepare":
return prepare(transaction_id, entity_id, payload)
elif action == "commit":
return commit(transaction_id, entity_id, payload)
else:
return {"status": "ERROR", "reason": "Invalid action"}
def prepare(transaction_id, entity_id, payload):
"""
Phase 1: Validate business rules and acquire a lock on the item.
"""
# A common mistake is not validating business logic in the prepare phase.
# The prepare phase must ensure that the commit phase is guaranteed to succeed, barring system failures.
if not payload.get("update_data"):
return {"status": "PREPARE_FAILED", "reason": "Missing update_data in payload."}
lock_info = {
"transactionId": transaction_id,
"status": "PREPARED",
"expiresAt": int(time.time()) + LOCK_TTL_SECONDS
}
try:
table.update_item(
Key={"entityId": entity_id},
UpdateExpression="SET #lock = :lock_info",
# This condition is crucial for concurrency control.
# It ensures we only lock an item if it's not already locked by another transaction.
ConditionExpression="attribute_not_exists(#lock) OR #lock.expiresAt < :now",
ExpressionAttributeNames={"#lock": "lock"},
ExpressionAttributeValues={
":lock_info": lock_info,
":now": int(time.time())
}
)
logger.info(f"[{transaction_id}] Successfully prepared and locked entity '{entity_id}'.")
return {"status": "PREPARED"}
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
logger.warning(f"[{transaction_id}] Entity '{entity_id}' is already locked by another transaction.")
return {"status": "PREPARE_FAILED", "reason": "Entity locked"}
else:
logger.exception(f"[{transaction_id}] DynamoDB error during prepare for entity '{entity_id}'.")
return {"status": "PREPARE_FAILED", "reason": "Internal DynamoDB error"}
def commit(transaction_id, entity_id, payload):
"""
Phase 2: Apply the actual data changes and release the lock.
This action MUST be idempotent.
"""
update_data = payload.get("update_data")
try:
# This update applies the business data and removes the lock in one atomic operation.
# The condition ensures we only commit if the lock belongs to the current transaction.
# This prevents a commit from a different, outdated transaction from being applied.
table.update_item(
Key={"entityId": entity_id},
UpdateExpression="SET #data = :data REMOVE #lock",
            ConditionExpression="#lock.transactionId = :tx_id AND #lock.#ls = :status",
            ExpressionAttributeNames={
                "#data": "data",
                "#lock": "lock",
                # 'status' is a DynamoDB reserved word, so it must be aliased
                # even when used as a path segment inside the lock map.
                "#ls": "status"
            },
ExpressionAttributeValues={
":data": update_data,
":tx_id": transaction_id,
":status": "PREPARED"
}
)
logger.info(f"[{transaction_id}] Successfully committed changes for entity '{entity_id}'.")
return {"status": "COMMITTED"}
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
# This might happen if the commit message is re-delivered after a successful commit.
# We can safely ignore it, as the desired state is already achieved.
logger.warning(f"[{transaction_id}] Conditional check failed on commit for '{entity_id}'. Likely already committed.")
# It's important to check the item's current state to confirm it was our transaction that succeeded.
return {"status": "ALREADY_COMMITTED"}
else:
logger.exception(f"[{transaction_id}] DynamoDB error during commit for entity '{entity_id}'.")
# This requires a retry mechanism. Because the coordinator uses "Event" invocation,
# AWS Lambda will automatically retry on failure.
raise e
3. DynamoDB Stream Processor (stream_processor_lambda.py)
This Lambda is triggered by DynamoDB Streams on the participant data tables. It fans committed changes out to the data lake and to WebSocket clients.
import os
import json
import boto3
import logging
# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
FIREHOSE_STREAM_NAME = os.environ["FIREHOSE_STREAM_NAME"]
WEBSOCKET_API_ENDPOINT = os.environ["WEBSOCKET_API_ENDPOINT"].replace("wss://", "https://")
# --- boto3 Clients ---
firehose_client = boto3.client("firehose")
apigateway_management_client = boto3.client("apigatewaymanagementapi", endpoint_url=WEBSOCKET_API_ENDPOINT)
# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
def handler(event, context):
"""
Processes DynamoDB stream records for committed transactions.
"""
logger.info(f"Processing {len(event.get('Records', []))} records from DynamoDB stream.")
records_for_datalake = []
for record in event.get("Records", []):
# We are only interested in final state modifications.
if record.get("eventName") != "MODIFY":
continue
# The key is to look at the 'new image' and check if the 'lock' attribute was removed,
# which signals a successful commit.
old_image = record.get("dynamodb", {}).get("OldImage", {})
new_image = record.get("dynamodb", {}).get("NewImage", {})
is_commit_event = "lock" in old_image and "lock" not in new_image
if is_commit_event:
transaction_id = old_image.get("lock", {}).get("M", {}).get("transactionId", {}).get("S")
entity_id = new_image.get("entityId", {}).get("S")
logger.info(f"[{transaction_id}] Detected commit for entity '{entity_id}'.")
# 1. Prepare data for the Data Lake
# We want clean, structured data in our lake.
datalake_payload = {
"transaction_id": transaction_id,
"entity_id": entity_id,
"committed_at": record.get("dynamodb", {}).get("ApproximateCreationDateTime"),
"new_data": new_image.get("data", {}).get("M", {}) # Simplified unmarshalling
}
            # Boto3 sends the Data field as raw bytes and handles base64
            # encoding on the wire; the trailing newline keeps the delivered
            # S3 objects readable as JSON Lines.
            records_for_datalake.append({
                "Data": (json.dumps(datalake_payload) + "\n").encode('utf-8')
            })
# 2. Push notification via WebSocket
# In a real app, you would look up the connectionId based on the entity or user.
# Here we assume it's passed in the transaction for simplicity.
# This is a placeholder as we don't have the connectionId.
# connection_id = lookup_connection_id(entity_id)
# if connection_id:
# send_websocket_message(connection_id, {"status": "updated", "entityId": entity_id})
# Send batch to Kinesis Firehose for efficiency
if records_for_datalake:
try:
firehose_client.put_record_batch(
DeliveryStreamName=FIREHOSE_STREAM_NAME,
Records=records_for_datalake
)
logger.info(f"Successfully sent {len(records_for_datalake)} records to Firehose.")
except Exception as e:
logger.exception("Failed to send records to Kinesis Firehose.")
# Handle partial failures and retries if necessary.
def send_websocket_message(connection_id, message):
"""Sends a message to a specific WebSocket connection."""
try:
apigateway_management_client.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(message).encode('utf-8')
)
except apigateway_management_client.exceptions.GoneException:
logger.warning(f"Connection {connection_id} is no longer available.")
# Clean up stale connection IDs from your database.
except Exception as e:
logger.exception(f"Failed to post to WebSocket connection {connection_id}.")
Limitations of the Architecture and Paths for Future Iteration
This architecture solves the core problem, but before running it in production you must confront its inherent complexity and its boundaries.
First, coordinator robustness is the linchpin of the system. With the current naive Lambda implementation, if the coordinator crashes after setting the transaction status to COMMITTING but before sending every commit instruction, the system lands in an indeterminate state: the participants never receive their commit instructions, their locks eventually expire, yet the global status still reads COMMITTING. A more robust implementation would orchestrate the flow with AWS Step Functions, whose durable state machine and built-in retry and error handling greatly simplify the coordinator's fault-tolerance design.
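As a sketch of that direction, the two phases map naturally onto an Amazon States Language definition. The dict below is an assumption of how it could be shaped, not a drop-in artifact, and the Lambda ARNs are placeholders:

# Hypothetical ASL definition for the 2PC flow, built as a Python dict.
# A failed prepare branch routes to AbortAll; Step Functions persists the
# state between steps, which removes the coordinator crash window.
definition = {
    "StartAt": "PrepareAll",
    "States": {
        "PrepareAll": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "PrepareA",
                    "States": {
                        "PrepareA": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:participant-a",
                            "Parameters": {
                                "action": "prepare",
                                "transactionId.$": "$.transactionId",
                                "payload.$": "$.participantA"
                            },
                            "End": True
                        }
                    }
                }
                # ... a second branch for participant-b, identical in shape
            ],
            "ResultPath": None,  # discard branch output, keep the original input
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "AbortAll"}],
            "Next": "CommitAll"
        }
        # "CommitAll" and "AbortAll" (omitted) mirror PrepareAll with
        # action "commit"/"abort" plus Retry blocks on each Task.
    }
}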
Second, participant idempotency is essential. The commit and abort operations must be safe to retry. Our commit implementation guarantees this with a conditional update, and the abort logic, which the code above leaves out, needs the same treatment: roll back only while the lock is still held in the PREPARED state. A minimal sketch under that rule follows.
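This sketch is written to slot into participant_lambda.py next to prepare and commit, reusing the module's table handle and ClientError import; the condition makes retries harmless:

def abort(transaction_id, entity_id):
    """Rolls back the prepare phase by releasing the lock. Idempotent:
    replays and stale aborts fail the condition and become no-ops."""
    try:
        table.update_item(
            Key={"entityId": entity_id},
            UpdateExpression="REMOVE #lock",
            # Only release a lock this transaction still holds in PREPARED;
            # 'status' is a reserved word, hence the path alias.
            ConditionExpression="#lock.transactionId = :tx_id AND #lock.#ls = :status",
            ExpressionAttributeNames={"#lock": "lock", "#ls": "status"},
            ExpressionAttributeValues={":tx_id": transaction_id, ":status": "PREPARED"}
        )
        return {"status": "ABORTED"}
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # Lock already released, expired, or owned by another transaction.
            return {"status": "ALREADY_ABORTED"}
        raise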
Third, an independent recovery/cleanup mechanism is required. A background job (for example, a Lambda fired by an hourly EventBridge rule) should scan the transactions table and the participant data tables. Its job is to find transactions that have sat in PREPARING past their timeout and actively trigger the abort flow, and to handle transactions stuck in COMMITTING by re-sending commit instructions to any participant still in PREPARED. A sketch of such a sweeper appears below.
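A minimal sketch of that sweeper, reusing the coordinator's environment variables. The timeout value is hypothetical, as is the assumption that the transaction record stores each participant's function name and payload (the coordinator above records only the name, so its initiate_transaction would need to persist those fields for re-drive to work):

import os
import json
import time
import boto3

dynamodb_client = boto3.client("dynamodb")
lambda_client = boto3.client("lambda")

TRANSACTIONS_TABLE_NAME = os.environ["TRANSACTIONS_TABLE_NAME"]
PREPARE_TIMEOUT_SECONDS = 900  # hypothetical: abort PREPARING transactions older than 15 min

def handler(event, context):
    """Periodic sweeper: aborts timed-out PREPARING transactions and
    re-drives the commit of transactions stuck in COMMITTING."""
    now = int(time.time())
    # A Scan is tolerable for a small control table; a GSI on status
    # would be the scalable choice.
    paginator = dynamodb_client.get_paginator("scan")
    for page in paginator.paginate(TableName=TRANSACTIONS_TABLE_NAME):
        for item in page.get("Items", []):
            tx_id = item["transactionId"]["S"]
            status = item["status"]["S"]
            updated_at = int(item["updatedAt"]["N"])
            if status == "PREPARING" and now - updated_at > PREPARE_TIMEOUT_SECONDS:
                redrive(tx_id, item, "abort")
            elif status == "COMMITTING":
                redrive(tx_id, item, "commit")

def redrive(transaction_id, tx_item, action):
    """Re-sends an instruction to every participant. Participants are
    idempotent, so duplicate deliveries are harmless."""
    for p in tx_item.get("participants", {}).get("L", []):
        participant = p.get("M", {})
        function_name = participant.get("function", {}).get("S")
        payload = participant.get("payload", {}).get("S")  # stored as a JSON string
        if not function_name:
            continue  # record predates the richer participant schema
        lambda_client.invoke(
            FunctionName=function_name,
            InvocationType="Event",
            Payload=json.dumps({
                "action": action,
                "transactionId": transaction_id,
                "payload": json.loads(payload) if payload else None
            })
        )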
Finally, the performance cost of this pattern is real. Every transaction involves many Lambda invocations and DynamoDB operations, so latency is far higher than a single database transaction. Reserve it for critical business flows that demand strong data consistency and are not extremely latency-sensitive; it must not be applied indiscriminately to every scenario that updates multiple items.