日志里躺着一条冰冷的错误记录:process_shipment_task[1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d] failed: Third-party logistics API timeout. 这是我们物流限界上下文中一个核心的 Celery 异步任务。用户报告说他的订单在支付后一直停滞在“处理中”状态。这条日志是我们唯一的线索,但它几乎毫无用处。
这个任务是由哪个用户的哪个HTTP请求触发的?请求的完整参数是什么?在调用第三方API之前,系统内部耗时多久?数据库操作是否缓慢?我们一无所知。系统就像一个黑盒,特别是在同步的 Web 请求与异步的后台任务之间,存在着一道巨大的信息鸿沟。在真实项目中,这种鸿沟是定位和解决复杂问题的最大障碍。我们需要打通它。
初步构想:引入分布式追踪
我们的目标是建立一个统一的视图,将前端的用户操作、后端的 API 处理、以及 Celery 任务的执行串联成一个完整的调用链。分布式追踪是解决这个问题的标准答案。技术选型上,我们决定采用 OpenTelemetry (OTel),因为它是一个开放标准,与供应商解耦,并且在 Python 和 JavaScript生态中都有成熟的实现。
初步计划分为三个阶段:
- 后端打通:首先解决最棘手的问题,实现 Django Web 服务与 Celery worker 之间的追踪上下文传递。
- 前端接入:将追踪的起点延伸至用户浏览器,覆盖由 Webpack 打包的前端应用。
- 领域丰富:将追踪数据与我们的 Domain-Driven Design (DDD) 模型结合,让 Span 不仅仅包含技术细节,更能反映业务语义。
第一步:打通 Django 与 Celery 的任督二脉
在复杂的系统中,一个常见的错误是认为只要各自引入了 OTel 库就能自动连接。现实是,跨进程、特别是跨消息队列的上下文传递需要精确的配置。
1.1 基础环境与依赖
我们从一个简化的 Django + Celery 项目结构开始。
# requirements.txt
django
celery
redis
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-http
# OTel 自动埋点库
opentelemetry-instrumentation-django
opentelemetry-instrumentation-celery
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-requests
1.2 错误的开端:各自为政的初始化
很多团队一开始会这样做:在 Django 的 settings.py 里初始化 OTel,然后在 Celery 的 celery.py 里再初始化一次。
# settings.py - 错误的示范
# ...
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
# DjangoDstrumentor().instrument()
# ...
# celery.py - 错误的示范
# ...
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
# ... 重复的初始化代码 ...
trace.set_tracer_provider(TracerProvider())
# CeleryInstrumentor().instrument()
# ...
这么做会导致 Django Web 进程和 Celery Worker 进程拥有各自独立的 TracerProvider 实例。它们会各自生成追踪数据,但无法关联起来。你会看到两条独立的 trace,一条是 HTTP 请求,另一条是 Celery 任务执行,它们之间没有任何父子关系。
1.3 正确的姿势:集中化配置与延迟加载
OTel 的初始化应该在项目启动时执行一次,并确保所有进程(Web 和 Worker)都能共享相同的配置。我们创建一个专门的 observability.py 模块来处理这件事。
# my_project/observability.py
import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
# 引入所有需要自动埋点的库
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
def setup_observability():
"""
配置 OpenTelemetry,用于整个应用 (Django + Celery)
"""
# 这里的 service.name 对于数据聚合至关重要
resource = Resource(attributes={
"service.name": "my-ddd-app",
"service.version": "0.1.0",
})
# 在生产环境中,我们会使用更复杂的采样策略
# ParentBasedTraceIdRatio 意味着如果一个请求被采样了,它的所有子 Span 都会被采样
sampler = ParentBasedTraceIdRatio(0.1) # 采样 10% 的根踪迹
provider = TracerProvider(resource=resource, sampler=sampler)
# 我们使用 OTLP Exporter 将数据发送到收集器 (e.g., Jaeger, OpenTelemetry Collector)
# 这里的 endpoint 需要指向你的 OTel Collector 或兼容的后端地址
# 为了演示,我们也可以用 ConsoleSpanExporter()
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
# 设置全局的 Tracer Provider
trace.set_tracer_provider(provider)
# 执行自动埋点
# 这里的坑在于:必须在 Django app 和 Celery app 加载前执行 instrument()
logging.info("Applying OpenTelemetry instrumentations...")
DjangoInstrumentor().instrument()
CeleryInstrumentor().instrument()
RedisInstrumentor().instrument()
RequestsInstrumentor().instrument() # 用于追踪对外的 HTTP 调用,比如第三方物流 API
logging.info("OpenTelemetry instrumentations applied.")
现在,我们需要在 Django 和 Celery 的入口处调用这个函数。
在 manage.py 和 my_project/wsgi.py 的顶部加入:
# manage.py / wsgi.py
from my_project.observability import setup_observability
setup_observability() # 在任何 Django 核心代码加载前调用
在 my_project/celery.py 中,我们需要利用 Celery 的信号机制来确保在 worker 启动时进行初始化。
# my_project/celery.py
import os
from celery import Celery
from celery.signals import worker_process_init
from my_project.observability import setup_observability
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings')
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
"""
Celery worker 进程启动时调用,确保每个 worker 都配置了 OTel
"""
setup_observability()
app = Celery('my_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
这个改动是决定性的。CeleryInstrumentor 会自动修补 Celery 的任务发布和执行逻辑。当 Django 进程调用 .delay() 或 .apply_async() 时,它会:
- 从当前活跃的 Span 中提取追踪上下文 (trace id, span id)。
- 将上下文注入到 Celery 任务消息的
headers中(遵循 W3C Trace Context 规范)。 - 当 Celery worker 从 Broker (如 Redis) 中获取到这个消息时,
CeleryInstrumentor会解析headers。 - 它会提取出父级的追踪上下文,并以此为基础创建一个新的子 Span,从而将调用链完美地连接起来。
sequenceDiagram
participant Django as Django Web Process
participant Broker as Message Broker (Redis)
participant Celery as Celery Worker Process
Django->>Django: 接收 HTTP 请求, OTel 创建 Root Span (Span A)
Django->>+Broker: task.delay(), OTel 将 Span A 上下文注入消息头
Broker-->>-Celery: 投递任务消息
Celery->>Celery: 接收消息, OTel 从消息头提取 Span A 上下文
Celery->>Celery: 创建子 Span (Span B), parent=Span A
Note right of Celery: 执行任务逻辑
Celery->>Celery: 任务完成, 关闭 Span B
第二步:将追踪起点延伸至 Webpack 构建的前端
一个完整的用户旅程始于浏览器。我们需要从那里开始追踪。
2.1 前端 OTel 配置
我们使用 React 作为例子,但原理适用于任何现代前端框架。
npm install @opentelemetry/api @opentelemetry/sdk-trace-web \
@opentelemetry/context-zone \
@opentelemetry/instrumentation-fetch \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/propagator-w3c
创建一个 tracing.js 文件来集中管理前端的 OTel 初始化。
// src/tracing.js
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { W3CTraceContextPropagator } from '@opentelemetry/propagator-w3c';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
const resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'my-ddd-app-frontend',
});
const provider = new WebTracerProvider({ resource });
// 使用 OTLP Exporter 将数据发送到 Collector
const exporter = new OTLPTraceExporter({
url: 'http://localhost:4318/v1/traces', // Collector 地址
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
// 生产环境建议调整这些参数
maxQueueSize: 100,
maxExportBatchSize: 10,
scheduledDelayMillis: 500,
}));
// 为了在异步回调中维持上下文,ZoneContextManager 是必须的
provider.register({
contextManager: new ZoneContextManager(),
propagator: new W3CTraceContextPropagator(),
});
// 注册自动埋点
registerInstrumentations({
instrumentations: [
// FetchInstrumentation 会自动为所有 fetch 请求添加追踪头
new FetchInstrumentation({
propagateTraceHeaderCorsUrls: [
// 必须将你的后端 API 地址加入 CORS 列表
new RegExp('http://localhost:8000/api/.*'),
],
}),
],
});
然后在你的应用入口文件(如 index.js)的顶端导入它。
// src/index.js
import './tracing'; // 确保这是第一个导入
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';
ReactDOM.render(<App />, document.getElementById('root'));
现在,当你的 React 组件发起一个 API 调用时:
// src/components/OrderForm.js
import React, { useState } from 'react';
function OrderForm() {
const [status, setStatus] = useState('');
const handleSubmit = async () => {
setStatus('Processing...');
try {
const response = await fetch('/api/shipping/request_shipment/', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ orderId: 'ORD-12345' }),
});
if (response.ok) {
setStatus('Shipment requested successfully!');
} else {
setStatus('Failed to request shipment.');
}
} catch (error) {
setStatus('Network error.');
}
};
return (
<div>
<button onClick={handleSubmit}>Request Shipment</button>
<p>Status: {status}</p>
</div>
);
}
FetchInstrumentation 会拦截这次 fetch 调用,自动创建一个 Span,并将 W3C traceparent 和 tracestate HTTP 头注入请求中。Django 端的 DjangoInstrumentor 会自动识别这些头,并将服务端的 Span 作为前端 Span 的子节点,从而实现了端到端的链路串联。
第三步:注入 DDD 领域知识,让追踪数据开口说话
到目前为止,我们已经有了一个完整的技术链路,但 Span 的名字(如 POST /api/shipping/request_shipment/)和属性(如 http.status_code)仍然是纯技术的。这对于 SRE 工程师排查网络或代码问题很有用,但对于理解业务流程的瓶颈却帮助有限。
在我们的 DDD 设计中,一个 API 请求通常对应一个应用服务(Application Service)的调用,处理一个命令(Command)。我们希望追踪数据能反映这一点。
3.1 定义一个领域感知的装饰器
我们创建一个 Python 装饰器,用于包裹应用服务的方法。这个装饰器会从当前的 Span 中获取信息,并用领域概念来丰富它。
# shipping_context/application/decorators.py
import functools
from opentelemetry import trace
from typing import Type
# 一个简单的命令基类
class Command:
pass
def domain_trace(command_class: Type[Command]):
"""
一个用于应用服务方法的装饰器,用于丰富 OTel Span
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 获取当前的 Span,这是由 DjangoInstrumentor 自动创建的
tracer = trace.get_tracer(__name__)
current_span = trace.get_current_span()
if not current_span.is_recording():
# 如果没有采样,直接返回
return func(*args, **kwargs)
# 1. 更新 Span 的名字,使其更具业务含义
span_name = f"EXECUTE {command_class.__name__}"
current_span.update_name(span_name)
# 2. 从 self (应用服务实例) 和 command 对象中提取领域属性
# args[0] 应该是 'self',args[1] 应该是 command 对象
app_service = args[0]
command = args[1]
# 假设我们的应用服务有 `bounded_context` 属性
bounded_context = getattr(app_service, 'bounded_context', 'unknown')
# 3. 添加自定义的、具有业务价值的属性
current_span.set_attribute("ddd.bounded_context", bounded_context)
current_span.set_attribute("ddd.command.type", command_class.__name__)
# 将命令的所有字段作为属性添加,注意处理敏感数据
for key, value in command.__dict__.items():
if "password" not in key and "token" not in key:
current_span.set_attribute(f"ddd.command.payload.{key}", str(value))
try:
result = func(*args, **kwargs)
return result
except Exception as e:
# OTel 默认会记录异常,但我们可以添加更多上下文
current_span.set_attribute("ddd.execution.error", True)
current_span.set_attribute("ddd.execution.error_type", e.__class__.__name__)
raise
return wrapper
return decorator
3.2 在应用服务中使用装饰器
现在,我们将这个装饰器应用到我们的应用服务上。
# shipping_context/application/services.py
from .commands import RequestShipmentCommand
from .decorators import domain_trace
from ..domain.repositories import ShipmentRepository
from ..tasks import process_shipment_task
class ShippingApplicationService:
bounded_context = "shipping"
def __init__(self, repository: ShipmentRepository):
self.repository = repository
@domain_trace(RequestShipmentCommand)
def request_shipment(self, command: RequestShipmentCommand):
"""
处理请求发货的命令
"""
# ... 业务逻辑 ...
# 1. 创建并保存 Shipment 聚合
shipment = self.repository.create_for_order(command.order_id)
# 2. 为当前 Span 添加聚合 ID,这对于后续问题排查至关重要
current_span = trace.get_current_span()
current_span.set_attribute("ddd.aggregate.type", "Shipment")
current_span.set_attribute("ddd.aggregate.id", str(shipment.id))
# 3. 触发异步任务
# OTel 上下文会自动传递到这个任务中
process_shipment_task.delay(shipment_id=str(shipment.id))
return shipment.id
3.3 Celery 任务中的领域上下文
同样地,我们也应该在 Celery 任务中添加领域上下文。
# shipping_context/tasks.py
from my_project.celery import app
from opentelemetry import trace
@app.task(name="process_shipment_task")
def process_shipment_task(shipment_id: str):
"""
一个异步任务,用于与第三方物流 API 交互
"""
# 这里的 Span 是由 CeleryInstrumentor 自动创建的,并且是 request_shipment Span 的子节点
current_span = trace.get_current_span()
current_span.set_attribute("ddd.aggregate.type", "Shipment")
current_span.set_attribute("ddd.aggregate.id", shipment_id)
current_span.set_attribute("ddd.bounded_context", "shipping")
# ... 从数据库加载 shipment 聚合 ...
# ... 调用第三方物流 API (这个调用会被 RequestsInstrumentor 自动追踪) ...
# ... 更新 shipment 状态 ...
经过这些改造,我们的追踪数据变得异常强大。当排查最初那个“API 超时”问题时,我们能在一个链路视图中看到:
- 前端:
Click: Request Shipment Button->FETCH POST /api/shipping/request_shipment/ - Django:
EXECUTE RequestShipmentCommand(包含order_id等所有载荷)-
ddd.bounded_context: shipping -
ddd.aggregate.id: <shipment_id>
-
- Celery Worker:
process_shipment_task(作为EXECUTE RequestShipmentCommand的子 Span)-
ddd.aggregate.id: <shipment_id> - 子Span:
POST https://api.logistics.com/create(由RequestsInstrumentor创建)- 这个 Span 会显示具体的超时错误和耗时。
-
我们不仅能立即定位到是第三方 API 调用超时,还能精确关联到是哪个用户、哪个订单、哪个发货聚合根触发的,整个业务流程的性能瓶颈和错误点一目了然。
最终成果与局限性
我们成功地构建了一套端到端的、领域感知型的分布式追踪系统。它贯穿了由 Webpack 构建的前端、Django 后端以及 Celery 异步任务,解决了跨进程、跨异步边界的上下文传递这一核心难题。通过将 DDD 的领域概念注入追踪数据,我们让可观测性数据不再是冰冷的技术指标,而是能够直接反映业务状态的有力工具。
然而,当前的方案也存在一些局限和可优化的方向:
- 采样策略: 我们使用了简单的基于比例的采样。在高流量系统中,这可能会丢失掉一些稀有的、但很重要的错误链路。引入尾部采样(Tail-based Sampling)会是下一步的重点,它允许我们在整个链路完成后再决定是否保留,从而能100%捕获所有包含错误的链路。
- 指标与日志的关联: 目前我们只关注了 Traces。一个完整的可观测性平台需要将 Traces、Metrics 和 Logs 关联起来。下一步应该在结构化日志中自动包含
trace_id和span_id,并基于领域事件发出关键业务指标(例如,使用 Prometheus),从而实现三者的互查。 - 手动埋点成本: 虽然自动埋点覆盖了大部分场景,但对于复杂的业务逻辑内部,仍需要开发者通过
domain_trace这样的装饰器或手动创建 Span 来增加上下文。这需要在团队内部形成一种“可观测性驱动开发”的文化,将埋点视为功能开发的一部分,而非事后的补救措施。 - 前端追踪深度: 当前前端追踪仅覆盖了 API 请求。对于复杂的单页应用,追踪用户交互、组件渲染性能等可以提供更完整的用户体验视图,这需要引入更多的前端专用埋点库。