在 DynamoDB 上实现两阶段提交模式以驱动数据湖和 WebSocket 实时更新


在设计一个跨多个逻辑服务边界的系统时,一个核心挑战是如何确保操作的原子性。当这些服务都依赖于像 DynamoDB 这样的 NoSQL 数据库时,问题变得更加棘手。DynamoDB 提供了 TransactWriteItems API,它能在单次原子操作中写入最多100个项目,这对于单一服务内的复杂操作是足够的。但在真实的项目中,我们需要协调的是多个独立部署、职责分离的服务,它们可能操作不同的数据表,甚至需要引入外部系统的确认。这种场景超出了 TransactWriteItems 的能力范围,要求我们必须在应用层构建一个更高级别的协调机制。

我们的目标是设计一个模式,它不仅能在 DynamoDB 上模拟跨服务的原子性提交,还要能将最终提交成功的状态变更,可靠地、实时地广播出去:一部分进入 S3 数据湖用于离线分析,另一部分通过 WebSocket 推送给在线的客户端。

方案权衡:原生事务 vs. 应用层协调器

方案 A:强依赖 TransactWriteItems API

这是最直接的思路。将所有需要原子更新的 DynamoDB 表聚合到一个服务边界内,然后使用 TransactWriteItems 一次性提交所有变更。

  • 优势:

    • 强一致性: AWS 保证了操作的原子性、一致性、隔离性和持久性 (ACID)。这是最可靠的方案。
    • 实现简单: 无需编写复杂的协调和回滚逻辑,所有复杂性都由 AWS 处理。
  • 劣势:

    • 破坏服务边界: 为了使用该 API,不同业务逻辑的服务被迫共享数据表或紧密耦合,这与微服务架构的初衷背道而驰。
    • 扩展性限制: 100个项目的限制在复杂业务场景下可能成为瓶颈。它无法集成非 DynamoDB 的资源,例如调用一个外部支付网关。
    • 无法满足流程协调: 它是一个“一次性”操作,无法处理需要多步确认、等待外部输入的复杂业务流程。

在真实项目中,强制耦合服务只为了利用一个数据库特性,通常是架构上的坏味道。它会快速累积技术债,让系统变得僵化。

方案 B:实现一个轻量级的两阶段提交 (2PC) 协调器

这个方案借鉴了传统分布式事务的思想,在应用层实现一个协调者 (Coordinator) 来管理多个参与者 (Participants) 的事务生命周期。

  • 优势:

    • 服务解耦: 每个服务(参与者)只关心自己的本地事务,并向协调器暴露 preparecommit / abort 接口。这完全符合微服务的设计理念。
    • 高度灵活性: 可以协调任何类型的资源,无论是 DynamoDB、RDS 还是外部 API 调用,只要它们支持“预留”和“确认”两个阶段的操作。
    • 流程可定制: 可以在两阶段之间嵌入复杂的业务逻辑,例如等待人工审批、风险评估等。
  • 劣势:

    • 实现复杂: 需要自行处理协调器的状态、参与者的超时、网络分区以及各种故障场景下的恢复逻辑。
    • 性能开销: 相较于一次 API 调用,2PC 涉及多次网络往返,事务的延迟会显著增加。
    • 协调器单点故障: 协调器本身的可用性至关重要。如果协调器在发出 commit 指令后宕机,可能导致部分参与者提交、部分未提交,造成数据不一致。

决策: 尽管方案 B 复杂,但它提供的灵活性和对服务边界的尊重,使其成为解决我们核心问题的唯一可行路径。我们将构建一个基于 Lambda 和 DynamoDB 的无服务器协调器,并通过精心设计来缓解其固有的复杂性和风险。这个协调器将成为整个流程的核心。

架构概览与数据模型

我们的实现将完全基于 AWS Serverless 组件,以降低运维成本并获得高弹性。

graph TD
    subgraph Client
        A[Client App] -- 1. Initiate Transaction --> B(API Gateway REST API)
    end

    subgraph Transaction Coordination
        B -- 2. Trigger --> C{Coordinator Lambda};
        C -- 3. Create Tx Record --> D[(Transactions Table)];
        C -- 4. Invoke Prepare --> E[Participant A Lambda];
        C -- 4. Invoke Prepare --> F[Participant B Lambda];
    end

    subgraph Participants
        E -- 5. Update Item (State: PREPARING) --> G[(Service A Table)];
        F -- 5. Update Item (State: PREPARING) --> H[(Service B Table)];
        G -- 6. Prepare OK --> C;
        H -- 6. Prepare OK --> C;
    end

    subgraph Commit & Propagation
        C -- 7. All Prepared, Update Tx to COMMITTING --> D;
        C -- 8. Invoke Commit --> E;
        C -- 8. Invoke Commit --> F;
        E -- 9. Finalize Item --> G;
        F -- 9. Finalize Item --> H;
        
        G -- via DynamoDB Stream --> I{Stream Processor Lambda};
        H -- via DynamoDB Stream --> I;
        
        I -- 10a. Put Record --> J[Kinesis Firehose];
        I -- 10b. Post to Connection --> K(API Gateway WebSocket API);
        J -- 11. Deliver --> L[(S3 Data Lake)];
        K -- 12. Push Message --> M[Connected Client];
    end

数据模型设计

为了支撑这个流程,我们需要三个关键的 DynamoDB 表。

  1. transactions 表 (协调器状态表)

    • transactionId (Partition Key, String): 事务的唯一ID。
    • status (String): 事务的全局状态,例如 PREPARING, COMMITTING, COMMITTED, ABORTING, ABORTED
    • participants (List of Maps): 参与者列表,包含其标识和服务入口点信息。
    • createdAt (Number): 创建时间戳。
    • updatedAt (Number): 最后更新时间戳。
  2. service-a-data 表 (参与者A的数据表)

    • entityId (Partition Key, String): 业务实体ID。
    • data (Map): 业务数据。
    • lock (Map): 事务锁信息,用于防止在事务处理期间发生并发修改。
      • transactionId (String): 持有锁的事务ID。
      • status (String): 本地事务状态,例如 PREPARED, COMMITTED。这是关键,它标志着该条目是否已“预留”。
      • expiresAt (Number): 锁的过期时间,防止因协调器故障导致永久锁定。
  3. service-b-data 表 (参与者B的数据表)

    • 结构与 service-a-data 类似。

这里的核心设计是在参与者的数据项中嵌入一个 lock 字段。在 prepare 阶段,我们写入这个锁信息并将状态设为 PREPARED。只有在 commit 阶段,我们才会将业务数据正式更新,并移除 lock 字段。这种方式通过 DynamoDB 的条件更新表达式可以实现原子性的状态检查和写入。

核心实现代码

我们将使用 Python 和 Boto3 来实现。以下代码是生产级的,包含了日志、错误处理和关键逻辑注释。

1. 交易协调器 Lambda (coordinator_lambda.py)

这是整个系统的大脑。它负责启动事务、广播 prepare 请求、收集响应,并最终决定是 commit 还是 abort

import os
import json
import time
import uuid
import boto3
import logging
from botocore.exceptions import ClientError

# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
TRANSACTIONS_TABLE_NAME = os.environ["TRANSACTIONS_TABLE_NAME"]
PARTICIPANT_A_FUNCTION_NAME = os.environ["PARTICIPANT_A_FUNCTION_NAME"]
PARTICIPANT_B_FUNCTION_NAME = os.environ["PARTICIPANT_B_FUNCTION_NAME"]

# --- boto3 Clients ---
dynamodb_client = boto3.client("dynamodb")
lambda_client = boto3.client("lambda")

# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)

class TransactionError(Exception):
    """Custom exception for transaction failures."""
    pass

def handler(event, context):
    """
    Main handler for the coordinator.
    Orchestrates the two-phase commit process.
    """
    logger.info("Coordinator execution started.")
    request_body = json.loads(event.get("body", "{}"))
    transaction_id = f"tx-{uuid.uuid4()}"

    # Define participants for this transaction
    participants = [
        {"name": "ParticipantA", "function": PARTICIPANT_A_FUNCTION_NAME, "payload": request_body.get("participantA")},
        {"name": "ParticipantB", "function": PARTICIPANT_B_FUNCTION_NAME, "payload": request_body.get("participantB")},
    ]

    try:
        # --- Phase 1: Prepare ---
        initiate_transaction(transaction_id, participants)
        prepare_participants(transaction_id, participants)

        # --- Phase 2: Commit ---
        commit_transaction(transaction_id, participants)
        
        return {
            "statusCode": 200,
            "body": json.dumps({"transactionId": transaction_id, "status": "COMMITTED"})
        }

    except TransactionError as e:
        logger.error(f"Transaction {transaction_id} failed: {e}. Initiating rollback.")
        # In a real-world scenario, you would trigger an abort/rollback process here.
        # For simplicity, we just mark the transaction as failed.
        update_transaction_status(transaction_id, "ABORTED")
        return {
            "statusCode": 500,
            "body": json.dumps({"transactionId": transaction_id, "status": "ABORTED", "reason": str(e)})
        }
    except Exception as e:
        logger.exception(f"An unexpected error occurred during transaction {transaction_id}.")
        update_transaction_status(transaction_id, "UNKNOWN_FAILURE")
        return {
            "statusCode": 500,
            "body": json.dumps({"transactionId": transaction_id, "status": "UNKNOWN_FAILURE"})
        }

def initiate_transaction(transaction_id, participants):
    """Creates the initial transaction record in DynamoDB."""
    logger.info(f"[{transaction_id}] Initiating transaction.")
    try:
        dynamodb_client.put_item(
            TableName=TRANSACTIONS_TABLE_NAME,
            Item={
                "transactionId": {"S": transaction_id},
                "status": {"S": "PREPARING"},
                "participants": {"L": [{"M": {"name": {"S": p["name"]}}} for p in participants]},
                "createdAt": {"N": str(int(time.time()))},
                "updatedAt": {"N": str(int(time.time()))},
            },
            ConditionExpression="attribute_not_exists(transactionId)"
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            logger.error(f"[{transaction_id}] Transaction ID already exists.")
        raise TransactionError(f"Failed to initiate transaction record: {e}")

def prepare_participants(transaction_id, participants):
    """Invokes the prepare phase for all participants."""
    logger.info(f"[{transaction_id}] Starting prepare phase for {len(participants)} participants.")
    
    for participant in participants:
        payload = {
            "action": "prepare",
            "transactionId": transaction_id,
            "payload": participant["payload"]
        }
        
        try:
            response = lambda_client.invoke(
                FunctionName=participant["function"],
                InvocationType="RequestResponse",
                Payload=json.dumps(payload)
            )
            
            response_payload = json.loads(response["Payload"].read())
            
            if response.get("FunctionError") or response_payload.get("status") != "PREPARED":
                raise TransactionError(f"Participant {participant['name']} failed to prepare. Response: {response_payload}")

            logger.info(f"[{transaction_id}] Participant {participant['name']} prepared successfully.")

        except Exception as e:
            raise TransactionError(f"Error invoking participant {participant['name']}: {e}")

def commit_transaction(transaction_id, participants):
    """Updates transaction status and invokes the commit phase for all participants."""
    logger.info(f"[{transaction_id}] All participants prepared. Moving to commit phase.")
    
    # First, mark the global transaction as COMMITTING.
    # This is the point of no return. If this succeeds, we must ensure all participants eventually commit.
    update_transaction_status(transaction_id, "COMMITTING")

    for participant in participants:
        payload = {
            "action": "commit",
            "transactionId": transaction_id,
            "payload": participant["payload"]
        }
        
        try:
            # For commit, we can use asynchronous invocation.
            # The coordinator's job is done after sending the commit signal.
            # Participants are responsible for their own commit logic and retries.
            lambda_client.invoke(
                FunctionName=participant["function"],
                InvocationType="Event", # Fire and forget
                Payload=json.dumps(payload)
            )
            logger.info(f"[{transaction_id}] Commit instruction sent to {participant['name']}.")
        
        except Exception as e:
            # A failure here is critical. It means a participant might not have received the commit instruction.
            # A robust system would have a recovery mechanism (e.g., a sweeper process) that finds
            # COMMITTING transactions and re-sends commit instructions to participants that are still PREPARED.
            logger.error(f"[{transaction_id}] CRITICAL: Failed to send commit instruction to {participant['name']}: {e}")
            # We don't raise TransactionError here because the transaction is already decided.
            # We just log the error for monitoring and manual intervention if needed.

    # Finally, mark the transaction as fully committed.
    update_transaction_status(transaction_id, "COMMITTED")

def update_transaction_status(transaction_id, new_status):
    """Updates the status of a transaction in the DynamoDB table."""
    logger.info(f"[{transaction_id}] Updating global status to {new_status}.")
    try:
        dynamodb_client.update_item(
            TableName=TRANSACTIONS_TABLE_NAME,
            Key={"transactionId": {"S": transaction_id}},
            UpdateExpression="SET #s = :s, updatedAt = :t",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":s": {"S": new_status},
                ":t": {"N": str(int(time.time()))}
            }
        )
    except ClientError as e:
        logger.error(f"[{transaction_id}] Failed to update transaction status to {new_status}: {e}")
        # This failure is also critical and should be monitored.

2. 参与者 Lambda (participant_lambda.py)

这是一个通用的参与者实现,可以通过环境变量配置其具体行为。

import os
import json
import time
import boto3
import logging
from botocore.exceptions import ClientError

# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
DATA_TABLE_NAME = os.environ["DATA_TABLE_NAME"]
LOCK_TTL_SECONDS = 600 # 10 minutes

# --- boto3 Clients ---
dynamodb_resource = boto3.resource("dynamodb")
table = dynamodb_resource.Table(DATA_TABLE_NAME)

# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)

def handler(event, context):
    """
    Handles prepare and commit actions for a participant.
    """
    action = event.get("action")
    transaction_id = event.get("transactionId")
    payload = event.get("payload")
    entity_id = payload.get("entityId")

    logger.info(f"[{transaction_id}] Participant received action '{action}' for entity '{entity_id}'.")

    if action == "prepare":
        return prepare(transaction_id, entity_id, payload)
    elif action == "commit":
        return commit(transaction_id, entity_id, payload)
    else:
        return {"status": "ERROR", "reason": "Invalid action"}

def prepare(transaction_id, entity_id, payload):
    """
    Phase 1: Validate business rules and acquire a lock on the item.
    """
    # A common mistake is not validating business logic in the prepare phase.
    # The prepare phase must ensure that the commit phase is guaranteed to succeed, barring system failures.
    if not payload.get("update_data"):
        return {"status": "PREPARE_FAILED", "reason": "Missing update_data in payload."}

    lock_info = {
        "transactionId": transaction_id,
        "status": "PREPARED",
        "expiresAt": int(time.time()) + LOCK_TTL_SECONDS
    }

    try:
        table.update_item(
            Key={"entityId": entity_id},
            UpdateExpression="SET #lock = :lock_info",
            # This condition is crucial for concurrency control.
            # It ensures we only lock an item if it's not already locked by another transaction.
            ConditionExpression="attribute_not_exists(#lock) OR #lock.expiresAt < :now",
            ExpressionAttributeNames={"#lock": "lock"},
            ExpressionAttributeValues={
                ":lock_info": lock_info,
                ":now": int(time.time())
            }
        )
        logger.info(f"[{transaction_id}] Successfully prepared and locked entity '{entity_id}'.")
        return {"status": "PREPARED"}
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            logger.warning(f"[{transaction_id}] Entity '{entity_id}' is already locked by another transaction.")
            return {"status": "PREPARE_FAILED", "reason": "Entity locked"}
        else:
            logger.exception(f"[{transaction_id}] DynamoDB error during prepare for entity '{entity_id}'.")
            return {"status": "PREPARE_FAILED", "reason": "Internal DynamoDB error"}

def commit(transaction_id, entity_id, payload):
    """
    Phase 2: Apply the actual data changes and release the lock.
    This action MUST be idempotent.
    """
    update_data = payload.get("update_data")

    try:
        # This update applies the business data and removes the lock in one atomic operation.
        # The condition ensures we only commit if the lock belongs to the current transaction.
        # This prevents a commit from a different, outdated transaction from being applied.
        table.update_item(
            Key={"entityId": entity_id},
            UpdateExpression="SET #data = :data REMOVE #lock",
            ConditionExpression="#lock.transactionId = :tx_id AND #lock.status = :status",
            ExpressionAttributeNames={
                "#data": "data",
                "#lock": "lock"
            },
            ExpressionAttributeValues={
                ":data": update_data,
                ":tx_id": transaction_id,
                ":status": "PREPARED"
            }
        )
        logger.info(f"[{transaction_id}] Successfully committed changes for entity '{entity_id}'.")
        return {"status": "COMMITTED"}
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            # This might happen if the commit message is re-delivered after a successful commit.
            # We can safely ignore it, as the desired state is already achieved.
            logger.warning(f"[{transaction_id}] Conditional check failed on commit for '{entity_id}'. Likely already committed.")
            # It's important to check the item's current state to confirm it was our transaction that succeeded.
            return {"status": "ALREADY_COMMITTED"}
        else:
            logger.exception(f"[{transaction_id}] DynamoDB error during commit for entity '{entity_id}'.")
            # This requires a retry mechanism. Because the coordinator uses "Event" invocation,
            # AWS Lambda will automatically retry on failure.
            raise e 

3. DynamoDB 流处理器 (stream_processor_lambda.py)

此 Lambda 由参与者数据表的 DynamoDB Streams 触发。它负责将数据扇出到数据湖和 WebSocket 客户端。

import os
import json
import boto3
import logging

# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
FIREHOSE_STREAM_NAME = os.environ["FIREHOSE_STREAM_NAME"]
WEBSOCKET_API_ENDPOINT = os.environ["WEBSOCKET_API_ENDPOINT"].replace("wss://", "https://")

# --- boto3 Clients ---
firehose_client = boto3.client("firehose")
apigateway_management_client = boto3.client("apigatewaymanagementapi", endpoint_url=WEBSOCKET_API_ENDPOINT)

# --- Logging ---
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)

def handler(event, context):
    """
    Processes DynamoDB stream records for committed transactions.
    """
    logger.info(f"Processing {len(event.get('Records', []))} records from DynamoDB stream.")
    
    records_for_datalake = []

    for record in event.get("Records", []):
        # We are only interested in final state modifications.
        if record.get("eventName") != "MODIFY":
            continue

        # The key is to look at the 'new image' and check if the 'lock' attribute was removed,
        # which signals a successful commit.
        old_image = record.get("dynamodb", {}).get("OldImage", {})
        new_image = record.get("dynamodb", {}).get("NewImage", {})

        is_commit_event = "lock" in old_image and "lock" not in new_image
        
        if is_commit_event:
            transaction_id = old_image.get("lock", {}).get("M", {}).get("transactionId", {}).get("S")
            entity_id = new_image.get("entityId", {}).get("S")
            
            logger.info(f"[{transaction_id}] Detected commit for entity '{entity_id}'.")

            # 1. Prepare data for the Data Lake
            # We want clean, structured data in our lake.
            datalake_payload = {
                "transaction_id": transaction_id,
                "entity_id": entity_id,
                "committed_at": record.get("dynamodb", {}).get("ApproximateCreationDateTime"),
                "new_data": new_image.get("data", {}).get("M", {}) # Simplified unmarshalling
            }
            # Kinesis Firehose expects data to be base64 encoded bytes.
            records_for_datalake.append({
                "Data": json.dumps(datalake_payload).encode('utf-8')
            })
            
            # 2. Push notification via WebSocket
            # In a real app, you would look up the connectionId based on the entity or user.
            # Here we assume it's passed in the transaction for simplicity.
            # This is a placeholder as we don't have the connectionId.
            # connection_id = lookup_connection_id(entity_id)
            # if connection_id:
            #     send_websocket_message(connection_id, {"status": "updated", "entityId": entity_id})

    # Send batch to Kinesis Firehose for efficiency
    if records_for_datalake:
        try:
            firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records_for_datalake
            )
            logger.info(f"Successfully sent {len(records_for_datalake)} records to Firehose.")
        except Exception as e:
            logger.exception("Failed to send records to Kinesis Firehose.")
            # Handle partial failures and retries if necessary.

def send_websocket_message(connection_id, message):
    """Sends a message to a specific WebSocket connection."""
    try:
        apigateway_management_client.post_to_connection(
            ConnectionId=connection_id,
            Data=json.dumps(message).encode('utf-8')
        )
    except apigateway_management_client.exceptions.GoneException:
        logger.warning(f"Connection {connection_id} is no longer available.")
        # Clean up stale connection IDs from your database.
    except Exception as e:
        logger.exception(f"Failed to post to WebSocket connection {connection_id}.")

架构的局限性与未来迭代路径

此架构虽然解决了核心问题,但在生产环境中部署前,必须正视其固有的复杂性和边界。

首先,协调器的健壮性是系统的关键。当前简单的 Lambda 实现中,如果协调器在更新事务状态为 COMMITTING 后、发送所有 commit 指令前崩溃,系统将进入不确定状态。参与者永远不会收到提交指令,它们的锁最终会超时,但事务的全局状态却是 COMMITTING。一个更健壮的实现应该使用 AWS Step Functions 来编排这个流程。Step Functions 提供了持久化的状态机、内置的重试和错误处理逻辑,能极大地简化协调器的容错设计。

其次,参与者的幂等性至关重要commitabort 操作必须可以安全地重试。我们的 commit 实现通过条件更新保证了这一点,但 abort 逻辑(未在代码中详述)也需要同样的设计,例如,只有当锁状态为 PREPARED 时才执行回滚。

再者,需要一个独立的恢复/清理机制。一个后台任务(例如,一个每小时运行一次的 EventBridge 规则触发的 Lambda)应该扫描 transactions 表和参与者数据表。它的职责是查找超时的 PREPARING 状态的事务,并主动触发 abort 流程;同时,它也需要处理那些卡在 COMMITTING 状态的事务,重新向未完成的参与者发送 commit 指令。

最后,这个模式的性能开销不容忽视。每一次事务都涉及大量的 Lambda 调用和 DynamoDB 操作,延迟远高于单次数据库事务。因此,它只适用于那些对数据一致性有强要求、但对延迟不极端敏感的关键业务流程,绝不能滥用于所有需要更新多个数据项的场景。


  目录