基于 OpenTelemetry 实现从 Webpack 到 Celery 的 DDD 领域感知型链路追踪


日志里躺着一条冰冷的错误记录:process_shipment_task[1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d] failed: Third-party logistics API timeout. 这是我们物流限界上下文中一个核心的 Celery 异步任务。用户报告说他的订单在支付后一直停滞在“处理中”状态。这条日志是我们唯一的线索,但它几乎毫无用处。

这个任务是由哪个用户的哪个HTTP请求触发的?请求的完整参数是什么?在调用第三方API之前,系统内部耗时多久?数据库操作是否缓慢?我们一无所知。系统就像一个黑盒,特别是在同步的 Web 请求与异步的后台任务之间,存在着一道巨大的信息鸿沟。在真实项目中,这种鸿沟是定位和解决复杂问题的最大障碍。我们需要打通它。

初步构想:引入分布式追踪

我们的目标是建立一个统一的视图,将前端的用户操作、后端的 API 处理、以及 Celery 任务的执行串联成一个完整的调用链。分布式追踪是解决这个问题的标准答案。技术选型上,我们决定采用 OpenTelemetry (OTel),因为它是一个开放标准,与供应商解耦,并且在 Python 和 JavaScript生态中都有成熟的实现。

初步计划分为三个阶段:

  1. 后端打通:首先解决最棘手的问题,实现 Django Web 服务与 Celery worker 之间的追踪上下文传递。
  2. 前端接入:将追踪的起点延伸至用户浏览器,覆盖由 Webpack 打包的前端应用。
  3. 领域丰富:将追踪数据与我们的 Domain-Driven Design (DDD) 模型结合,让 Span 不仅仅包含技术细节,更能反映业务语义。

第一步:打通 Django 与 Celery 的任督二脉

在复杂的系统中,一个常见的错误是认为只要各自引入了 OTel 库就能自动连接。现实是,跨进程、特别是跨消息队列的上下文传递需要精确的配置。

1.1 基础环境与依赖

我们从一个简化的 Django + Celery 项目结构开始。

# requirements.txt
django
celery
redis
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-http

# OTel 自动埋点库
opentelemetry-instrumentation-django
opentelemetry-instrumentation-celery
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-requests

1.2 错误的开端:各自为政的初始化

很多团队一开始会这样做:在 Django 的 settings.py 里初始化 OTel,然后在 Celery 的 celery.py 里再初始化一次。

# settings.py - 错误的示范
# ...
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# DjangoDstrumentor().instrument()
# ...
# celery.py - 错误的示范
# ...
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
# ... 重复的初始化代码 ...
trace.set_tracer_provider(TracerProvider())

# CeleryInstrumentor().instrument()
# ...

这么做会导致 Django Web 进程和 Celery Worker 进程拥有各自独立的 TracerProvider 实例。它们会各自生成追踪数据,但无法关联起来。你会看到两条独立的 trace,一条是 HTTP 请求,另一条是 Celery 任务执行,它们之间没有任何父子关系。

1.3 正确的姿势:集中化配置与延迟加载

OTel 的初始化应该在项目启动时执行一次,并确保所有进程(Web 和 Worker)都能共享相同的配置。我们创建一个专门的 observability.py 模块来处理这件事。

# my_project/observability.py

import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio

# 引入所有需要自动埋点的库
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor


def setup_observability():
    """
    配置 OpenTelemetry,用于整个应用 (Django + Celery)
    """
    # 这里的 service.name 对于数据聚合至关重要
    resource = Resource(attributes={
        "service.name": "my-ddd-app",
        "service.version": "0.1.0",
    })

    # 在生产环境中,我们会使用更复杂的采样策略
    # ParentBasedTraceIdRatio 意味着如果一个请求被采样了,它的所有子 Span 都会被采样
    sampler = ParentBasedTraceIdRatio(0.1)  # 采样 10% 的根踪迹

    provider = TracerProvider(resource=resource, sampler=sampler)

    # 我们使用 OTLP Exporter 将数据发送到收集器 (e.g., Jaeger, OpenTelemetry Collector)
    # 这里的 endpoint 需要指向你的 OTel Collector 或兼容的后端地址
    # 为了演示,我们也可以用 ConsoleSpanExporter()
    otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
    
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)

    # 设置全局的 Tracer Provider
    trace.set_tracer_provider(provider)

    # 执行自动埋点
    # 这里的坑在于:必须在 Django app 和 Celery app 加载前执行 instrument()
    logging.info("Applying OpenTelemetry instrumentations...")
    DjangoInstrumentor().instrument()
    CeleryInstrumentor().instrument()
    RedisInstrumentor().instrument()
    RequestsInstrumentor().instrument() # 用于追踪对外的 HTTP 调用,比如第三方物流 API
    logging.info("OpenTelemetry instrumentations applied.")

现在,我们需要在 Django 和 Celery 的入口处调用这个函数。

manage.pymy_project/wsgi.py 的顶部加入:

# manage.py / wsgi.py
from my_project.observability import setup_observability
setup_observability() # 在任何 Django 核心代码加载前调用

my_project/celery.py 中,我们需要利用 Celery 的信号机制来确保在 worker 启动时进行初始化。

# my_project/celery.py
import os
from celery import Celery
from celery.signals import worker_process_init
from my_project.observability import setup_observability

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """
    Celery worker 进程启动时调用,确保每个 worker 都配置了 OTel
    """
    setup_observability()

app = Celery('my_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

这个改动是决定性的。CeleryInstrumentor 会自动修补 Celery 的任务发布和执行逻辑。当 Django 进程调用 .delay().apply_async() 时,它会:

  1. 从当前活跃的 Span 中提取追踪上下文 (trace id, span id)。
  2. 将上下文注入到 Celery 任务消息的 headers 中(遵循 W3C Trace Context 规范)。
  3. 当 Celery worker 从 Broker (如 Redis) 中获取到这个消息时,CeleryInstrumentor 会解析 headers
  4. 它会提取出父级的追踪上下文,并以此为基础创建一个新的子 Span,从而将调用链完美地连接起来。
sequenceDiagram
    participant Django as Django Web Process
    participant Broker as Message Broker (Redis)
    participant Celery as Celery Worker Process

    Django->>Django: 接收 HTTP 请求, OTel 创建 Root Span (Span A)
    Django->>+Broker: task.delay(), OTel 将 Span A 上下文注入消息头
    Broker-->>-Celery: 投递任务消息
    Celery->>Celery: 接收消息, OTel 从消息头提取 Span A 上下文
    Celery->>Celery: 创建子 Span (Span B), parent=Span A
    Note right of Celery: 执行任务逻辑
    Celery->>Celery: 任务完成, 关闭 Span B

第二步:将追踪起点延伸至 Webpack 构建的前端

一个完整的用户旅程始于浏览器。我们需要从那里开始追踪。

2.1 前端 OTel 配置

我们使用 React 作为例子,但原理适用于任何现代前端框架。

npm install @opentelemetry/api @opentelemetry/sdk-trace-web \
@opentelemetry/context-zone \
@opentelemetry/instrumentation-fetch \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/propagator-w3c

创建一个 tracing.js 文件来集中管理前端的 OTel 初始化。

// src/tracing.js
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { W3CTraceContextPropagator } from '@opentelemetry/propagator-w3c';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: 'my-ddd-app-frontend',
});

const provider = new WebTracerProvider({ resource });

// 使用 OTLP Exporter 将数据发送到 Collector
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces', // Collector 地址
});

provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
  // 生产环境建议调整这些参数
  maxQueueSize: 100,
  maxExportBatchSize: 10,
  scheduledDelayMillis: 500,
}));

// 为了在异步回调中维持上下文,ZoneContextManager 是必须的
provider.register({
  contextManager: new ZoneContextManager(),
  propagator: new W3CTraceContextPropagator(),
});

// 注册自动埋点
registerInstrumentations({
  instrumentations: [
    // FetchInstrumentation 会自动为所有 fetch 请求添加追踪头
    new FetchInstrumentation({
      propagateTraceHeaderCorsUrls: [
        // 必须将你的后端 API 地址加入 CORS 列表
        new RegExp('http://localhost:8000/api/.*'),
      ],
    }),
  ],
});

然后在你的应用入口文件(如 index.js)的顶端导入它。

// src/index.js
import './tracing'; // 确保这是第一个导入
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';

ReactDOM.render(<App />, document.getElementById('root'));

现在,当你的 React 组件发起一个 API 调用时:

// src/components/OrderForm.js
import React, { useState } from 'react';

function OrderForm() {
  const [status, setStatus] = useState('');

  const handleSubmit = async () => {
    setStatus('Processing...');
    try {
      const response = await fetch('/api/shipping/request_shipment/', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ orderId: 'ORD-12345' }),
      });
      if (response.ok) {
        setStatus('Shipment requested successfully!');
      } else {
        setStatus('Failed to request shipment.');
      }
    } catch (error) {
      setStatus('Network error.');
    }
  };

  return (
    <div>
      <button onClick={handleSubmit}>Request Shipment</button>
      <p>Status: {status}</p>
    </div>
  );
}

FetchInstrumentation 会拦截这次 fetch 调用,自动创建一个 Span,并将 W3C traceparenttracestate HTTP 头注入请求中。Django 端的 DjangoInstrumentor 会自动识别这些头,并将服务端的 Span 作为前端 Span 的子节点,从而实现了端到端的链路串联。

第三步:注入 DDD 领域知识,让追踪数据开口说话

到目前为止,我们已经有了一个完整的技术链路,但 Span 的名字(如 POST /api/shipping/request_shipment/)和属性(如 http.status_code)仍然是纯技术的。这对于 SRE 工程师排查网络或代码问题很有用,但对于理解业务流程的瓶颈却帮助有限。

在我们的 DDD 设计中,一个 API 请求通常对应一个应用服务(Application Service)的调用,处理一个命令(Command)。我们希望追踪数据能反映这一点。

3.1 定义一个领域感知的装饰器

我们创建一个 Python 装饰器,用于包裹应用服务的方法。这个装饰器会从当前的 Span 中获取信息,并用领域概念来丰富它。

# shipping_context/application/decorators.py
import functools
from opentelemetry import trace
from typing import Type

# 一个简单的命令基类
class Command:
    pass

def domain_trace(command_class: Type[Command]):
    """
    一个用于应用服务方法的装饰器,用于丰富 OTel Span
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 获取当前的 Span,这是由 DjangoInstrumentor 自动创建的
            tracer = trace.get_tracer(__name__)
            current_span = trace.get_current_span()

            if not current_span.is_recording():
                # 如果没有采样,直接返回
                return func(*args, **kwargs)

            # 1. 更新 Span 的名字,使其更具业务含义
            span_name = f"EXECUTE {command_class.__name__}"
            current_span.update_name(span_name)
            
            # 2. 从 self (应用服务实例) 和 command 对象中提取领域属性
            # args[0] 应该是 'self',args[1] 应该是 command 对象
            app_service = args[0]
            command = args[1]

            # 假设我们的应用服务有 `bounded_context` 属性
            bounded_context = getattr(app_service, 'bounded_context', 'unknown')
            
            # 3. 添加自定义的、具有业务价值的属性
            current_span.set_attribute("ddd.bounded_context", bounded_context)
            current_span.set_attribute("ddd.command.type", command_class.__name__)

            # 将命令的所有字段作为属性添加,注意处理敏感数据
            for key, value in command.__dict__.items():
                if "password" not in key and "token" not in key:
                    current_span.set_attribute(f"ddd.command.payload.{key}", str(value))
            
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                # OTel 默认会记录异常,但我们可以添加更多上下文
                current_span.set_attribute("ddd.execution.error", True)
                current_span.set_attribute("ddd.execution.error_type", e.__class__.__name__)
                raise
        return wrapper
    return decorator

3.2 在应用服务中使用装饰器

现在,我们将这个装饰器应用到我们的应用服务上。

# shipping_context/application/services.py
from .commands import RequestShipmentCommand
from .decorators import domain_trace
from ..domain.repositories import ShipmentRepository
from ..tasks import process_shipment_task

class ShippingApplicationService:
    bounded_context = "shipping"

    def __init__(self, repository: ShipmentRepository):
        self.repository = repository

    @domain_trace(RequestShipmentCommand)
    def request_shipment(self, command: RequestShipmentCommand):
        """
        处理请求发货的命令
        """
        # ... 业务逻辑 ...
        # 1. 创建并保存 Shipment 聚合
        shipment = self.repository.create_for_order(command.order_id)
        
        # 2. 为当前 Span 添加聚合 ID,这对于后续问题排查至关重要
        current_span = trace.get_current_span()
        current_span.set_attribute("ddd.aggregate.type", "Shipment")
        current_span.set_attribute("ddd.aggregate.id", str(shipment.id))

        # 3. 触发异步任务
        # OTel 上下文会自动传递到这个任务中
        process_shipment_task.delay(shipment_id=str(shipment.id))

        return shipment.id

3.3 Celery 任务中的领域上下文

同样地,我们也应该在 Celery 任务中添加领域上下文。

# shipping_context/tasks.py
from my_project.celery import app
from opentelemetry import trace

@app.task(name="process_shipment_task")
def process_shipment_task(shipment_id: str):
    """
    一个异步任务,用于与第三方物流 API 交互
    """
    # 这里的 Span 是由 CeleryInstrumentor 自动创建的,并且是 request_shipment Span 的子节点
    current_span = trace.get_current_span()
    current_span.set_attribute("ddd.aggregate.type", "Shipment")
    current_span.set_attribute("ddd.aggregate.id", shipment_id)
    current_span.set_attribute("ddd.bounded_context", "shipping")
    
    # ... 从数据库加载 shipment 聚合 ...
    # ... 调用第三方物流 API (这个调用会被 RequestsInstrumentor 自动追踪) ...
    # ... 更新 shipment 状态 ...

经过这些改造,我们的追踪数据变得异常强大。当排查最初那个“API 超时”问题时,我们能在一个链路视图中看到:

  1. 前端: Click: Request Shipment Button -> FETCH POST /api/shipping/request_shipment/
  2. Django: EXECUTE RequestShipmentCommand (包含 order_id 等所有载荷)
    • ddd.bounded_context: shipping
    • ddd.aggregate.id: <shipment_id>
  3. Celery Worker: process_shipment_task (作为 EXECUTE RequestShipmentCommand 的子 Span)
    • ddd.aggregate.id: <shipment_id>
    • 子Span: POST https://api.logistics.com/create (由 RequestsInstrumentor 创建)
      • 这个 Span 会显示具体的超时错误和耗时。

我们不仅能立即定位到是第三方 API 调用超时,还能精确关联到是哪个用户、哪个订单、哪个发货聚合根触发的,整个业务流程的性能瓶颈和错误点一目了然。

最终成果与局限性

我们成功地构建了一套端到端的、领域感知型的分布式追踪系统。它贯穿了由 Webpack 构建的前端、Django 后端以及 Celery 异步任务,解决了跨进程、跨异步边界的上下文传递这一核心难题。通过将 DDD 的领域概念注入追踪数据,我们让可观测性数据不再是冰冷的技术指标,而是能够直接反映业务状态的有力工具。

然而,当前的方案也存在一些局限和可优化的方向:

  • 采样策略: 我们使用了简单的基于比例的采样。在高流量系统中,这可能会丢失掉一些稀有的、但很重要的错误链路。引入尾部采样(Tail-based Sampling)会是下一步的重点,它允许我们在整个链路完成后再决定是否保留,从而能100%捕获所有包含错误的链路。
  • 指标与日志的关联: 目前我们只关注了 Traces。一个完整的可观测性平台需要将 Traces、Metrics 和 Logs 关联起来。下一步应该在结构化日志中自动包含 trace_idspan_id,并基于领域事件发出关键业务指标(例如,使用 Prometheus),从而实现三者的互查。
  • 手动埋点成本: 虽然自动埋点覆盖了大部分场景,但对于复杂的业务逻辑内部,仍需要开发者通过 domain_trace 这样的装饰器或手动创建 Span 来增加上下文。这需要在团队内部形成一种“可观测性驱动开发”的文化,将埋点视为功能开发的一部分,而非事后的补救措施。
  • 前端追踪深度: 当前前端追踪仅覆盖了 API 请求。对于复杂的单页应用,追踪用户交互、组件渲染性能等可以提供更完整的用户体验视图,这需要引入更多的前端专用埋点库。

  目录