我们面临的困境是客户端的网络黑盒。Flutter 应用通过 gRPC 与后端十几个微服务通信,所有流量都经由一个部署在本地(或边缘)的 Envoy Sidecar 代理。当一次调用链出现延迟或错误时,排查的起点总是模糊不清:是客户端构造请求的问题?是 Envoy 的路由或重试策略出了岔子?还是某个下游服务的固有缺陷?传统的日志分析和分布式追踪在这种场景下显得笨拙,因为它无法提供客户端视角的实时网络交互快照。
最初的构想很简单:如果客户端能“看到”流经 Envoy 的原始流量,那么大部分网络层面的问题将迎刃而解。我们需要的不是事后分析的日志,而是一个实时的、与应用状态同步的网络仪表盘。Envoy 的管理接口中隐藏着一个强大的工具——Tap Filter,它能像网络嗅探器一样捕获流经代理的流量副本,并将其流式传输出来。我们的目标,就是将这个数据流引入 Dart 应用,并用 Redux 来管理和消费这些实时的网络追踪状态。
这套方案的技术选型几乎是确定的。Envoy Tap Filter 通过 gRPC 服务暴露其数据流,因此在 Dart 端,我们需要一个 gRPC 客户端。grpc-dart 库是标准选择。应用的状态管理已经基于 redux 和 flutter_redux,将新的网络可观测性状态集成进去,是保持架构一致性的必然要求。整个架构的核心,就是构建一个桥梁:一端是 Envoy 的 TapSinkService,另一端是 Dart 应用的 Redux Store。
graph TD
subgraph Flutter Application
A[UI Widgets] -- Dispatches Action --> B{Redux Middleware};
B -- Manages Lifecycle --> C[gRPC Tap Client Service];
C -- Streams Data --> B;
B -- Dispatches Data Action --> D[Redux Store];
D -- Updates State --> A;
end
subgraph "Envoy Proxy (Sidecar)"
E[Client Traffic] --> F{Envoy Listener};
F -- Tapped --> G[Tap Filter];
F -- Proxied --> H[Upstream Services];
G -- Streams Traces via gRPC --> I[Admin Interface];
end
C -- Connects to --> I;
style B fill:#f9f,stroke:#333,stroke-width:2px;
style C fill:#ccf,stroke:#333,stroke-width:2px;
第一步:配置 Envoy 启用 Tap Filter
一切始于 Envoy。我们需要修改其配置,插入 Tap Filter,并使其将捕获的流量发送到 gRPC Tap Sink。这里的关键在于配置 tap_config。我们需要捕获所有进出流量,因此将 filter 同时应用在 HTTP 连接管理器上。
一个常见的错误是直接暴露 Envoy 的管理接口(默认 9901 端口)给外部网络,这是极度危险的。在真实项目中,该接口必须严格限制在本地回环地址 (127.0.0.1) 或通过 mTLS 等机制进行保护。
以下是一份用于演示的核心 envoy.yaml 配置。它监听 10000 端口,代理所有流量到 your_backend_service,同时启用 Tap Filter。
# envoy.yaml
admin:
address:
socket_address:
address: 0.0.0.0 # 在生产中应为 127.0.0.1
port_value: 9901
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: some_service
# 开启 gRPC Web 兼容性
grpc_web_timeout: 60s
http_filters:
- name: envoy.filters.http.tap
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.tap.v3.Tap
common_config:
static_config:
match:
any_match: true # 捕获所有请求
output_config:
sinks:
- format: PROTO_BINARY # 使用二进制 protobuf 格式
grpc_service:
envoy_grpc:
cluster_name: tap_cluster
timeout: 0.25s # gRPC 超时
- name: envoy.filters.http.grpc_web
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: some_service
connect_timeout: 5s
type: LOGICAL_DNS
# 使用 HTTP/2,因为后端是 gRPC 服务
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: some_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal # 指向宿主机,根据你的环境调整
port_value: 50051
# 这个 cluster 定义了 tap gRPC 服务的地址,也就是 Envoy 自身
- name: tap_cluster
connect_timeout: 5s
type: STATIC
lb_policy: ROUND_ROBIN
# 必须是 HTTP/2
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: tap_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 9901 # 指向 Envoy 的管理接口
第二步:定义 Protobuf 并生成 Dart 代码
要与 Envoy 的 Tap gRPC 服务通信,我们需要对应的 .proto 文件。这些文件通常可以在 Envoy 的官方仓库中找到。主要涉及 envoy/service/tap/v3/tap.proto 和其依赖项。我们将它们整理好,并使用 protoc 编译器和 Dart 插件生成客户端代码。
tap.proto 核心定义如下:
// envoy/service/tap/v3/tap.proto
service TapSinkService {
// Envoy will connect and send StreamTapMessage messages forever. It does not expect any response
// to be sent as nothing would be done in the case of failure. The server should swallow any
// error occurring during processing.
rpc StreamTaps(stream StreamTapMessage) returns (StreamTapResponse) {}
}
message StreamTapMessage {
// Identifier uniquely identifies a tap. Multiple taps can be configured in Envoy. This identifier
// provides a way to suppress verbose information in the message, such as the full configuration,
// so that it may be sent only once.
uint64 identifier = 1;
// The trace data tapped.
TraceWrapper trace = 2;
}
// ... 其他定义
生成 Dart 代码的命令大致如下:
protoc --dart_out=grpc:lib/src/generated \
-I=./protos \
./protos/envoy/service/tap/v3/tap.proto \
# ... 包含所有依赖的 .proto 文件
第三步:构建 Dart gRPC Tap 客户端
这是连接 Envoy 和 Dart 应用的核心。我们创建一个服务类 EnvoyTapService,它负责建立 gRPC 连接,发起流式请求,并将接收到的数据流暴露出去。
这里的坑在于生命周期管理和错误处理。gRPC 连接可能会因为网络问题或 Envoy 重启而中断,客户端必须具备健壮的重连机制。我们使用 StreamController 将 gRPC 的 ResponseStream 转换为一个更易于管理的 Dart Stream。
// lib/services/envoy_tap_service.dart
import 'dart:async';
import 'package:grpc/grpc.dart';
import '../generated/envoy/service/tap/v3/tap.pbgrpc.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';
class EnvoyTapService {
late ClientChannel _channel;
late TapSinkServiceClient _client;
StreamSubscription? _subscription;
final _traceController = StreamController<TraceWrapper>.broadcast();
// 外部可以监听这个流来获取追踪数据
Stream<TraceWrapper> get traceStream => _traceController.stream;
final String host;
final int port;
bool _isConnecting = false;
bool _isConnected = false;
EnvoyTapService({required this.host, required this.port});
Future<void> connect() async {
if (_isConnected || _isConnecting) return;
_isConnecting = true;
print('[EnvoyTapService] Connecting to $host:$port...');
try {
_channel = ClientChannel(
host,
port: port,
options: const ChannelOptions(
credentials: ChannelCredentials.insecure(), // 本地调试,生产环境需mTLS
connectTimeout: Duration(seconds: 5),
),
);
_client = TapSinkServiceClient(_channel);
final requestStreamController = StreamController<StreamTapMessage>();
final responseStream = _client.streamTaps(requestStreamController.stream);
_isConnected = true;
_isConnecting = false;
print('[EnvoyTapService] Connected and streaming.');
// 监听来自 Envoy 的数据流
_subscription = responseStream.listen(
(StreamTapResponse response) {
// 在此版本中,Envoy 不会发送任何有意义的响应,所以我们忽略
},
onDone: () {
print('[EnvoyTapService] Stream closed by server.');
_handleDisconnect();
},
onError: (error) {
print('[EnvoyTapService] Stream error: $error');
_traceController.addError(error); // 将错误传递给下游
_handleDisconnect();
},
cancelOnError: true,
);
// 这是关键:我们需要向 Envoy 发送一个初始消息来建立流
// Envoy 的实现要求客户端流不能关闭,即使我们只发送一个消息
final initialMessage = StreamTapMessage(
identifier: 0,
trace: TraceWrapper(), // trace 字段可以是空的
);
requestStreamController.add(initialMessage);
} catch (e) {
print('[EnvoyTapService] Connection failed: $e');
_isConnecting = false;
_isConnected = false;
// 触发重连逻辑
_reconnect();
}
}
void _handleDisconnect() {
if (!_isConnected) return;
print('[EnvoyTapService] Disconnected.');
_isConnected = false;
_subscription?.cancel();
_subscription = null;
_channel.shutdown();
_reconnect();
}
void _reconnect() {
if (_isConnecting) return;
print('[EnvoyTapService] Reconnecting in 5 seconds...');
Future.delayed(const Duration(seconds: 5), () {
connect();
});
}
void dispose() {
print('[EnvoyTapService] Disposing service.');
_subscription?.cancel();
_traceController.close();
_channel.shutdown();
_isConnected = false;
}
}
// 注意: Envoy的tap filter有一个独特的行为:它会为每个worker线程创建一个到tap sink的gRPC连接。
// 因此,如果你的Envoy配置了多个worker线程,你的Dart应用会收到多个并发的gRPC连接请求。
// 我们的服务设计目前是单例模式,需要考虑如何处理或合并来自多个连接的数据流。
// 一个简单的策略是让所有流都写入同一个 `_traceController`。
第四步:Redux 状态、Action 和 Middleware 集成
现在,我们将 EnvoyTapService 与 Redux 集成。我们将使用 redux_epics 来处理这种异步流操作,它非常适合管理这种长时间运行的副作用。
1. 定义 State
// lib/redux/app_state.dart
import 'package:meta/meta.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';
class AppState {
final NetworkTraceState networkTraceState;
const AppState({required this.networkTraceState});
factory AppState.initial() => AppState(networkTraceState: NetworkTraceState.initial());
}
class NetworkTraceState {
final bool isConnected;
final List<TraceWrapper> traces;
final dynamic error;
final int maxTraces; // 限制存储的 trace 数量,防止内存溢出
const NetworkTraceState({
required this.isConnected,
required this.traces,
this.error,
this.maxTraces = 100, // 默认最多保留100条
});
factory NetworkTraceState.initial() => const NetworkTraceState(
isConnected: false,
traces: [],
);
NetworkTraceState copyWith({
bool? isConnected,
List<TraceWrapper>? traces,
dynamic error,
}) {
return NetworkTraceState(
isConnected: isConnected ?? this.isConnected,
traces: traces ?? this.traces,
error: error,
maxTraces: this.maxTraces,
);
}
}
2. 定义 Actions
// lib/redux/actions/network_trace_actions.dart
import '../../generated/envoy/data/tap/v3/wrapper.pb.dart';
class StartNetworkTraceAction {}
class StopNetworkTraceAction {}
class NetworkTraceConnectedAction {}
class NetworkTraceDisconnectedAction {}
class NetworkTraceReceivedAction {
final TraceWrapper trace;
NetworkTraceReceivedAction(this.trace);
}
class NetworkTraceErrorAction {
final dynamic error;
NetworkTraceErrorAction(this.error);
}
3. 创建 Epic (Middleware)
Epic 是处理副作用的核心。它监听 StartNetworkTraceAction,然后创建 EnvoyTapService 实例,监听其 traceStream,并将流中的数据或错误转换为新的 Action 分发给 Store。
// lib/redux/epics/network_trace_epic.dart
import 'package:redux_epics/redux_epics.dart';
import 'package:rxdart/rxdart.dart';
import '../actions/network_trace_actions.dart';
import '../app_state.dart';
import '../../services/envoy_tap_service.dart';
// 假设 EnvoyTapService 是一个单例或通过依赖注入提供
final EnvoyTapService _tapService = EnvoyTapService(host: '127.0.0.1', port: 9901);
StreamSubscription? _traceSubscription;
final networkTraceEpic = combineEpics<AppState>([
_startTraceEpic,
_stopTraceEpic,
]);
Stream<dynamic> _startTraceEpic(Stream<dynamic> actions, EpicStore<AppState> store) {
return actions
.whereType<StartNetworkTraceAction>()
.switchMap((action) {
// switchMap 确保如果多次分发 StartAction,旧的流会被取消
return Rx.concat([
// 立即连接
Stream.fromFuture(_tapService.connect()).map((_) => NetworkTraceConnectedAction()),
// 然后监听数据
_tapService.traceStream.map((trace) => NetworkTraceReceivedAction(trace)),
])
.doOnCancel(() {
// 当 epic 被取消时(例如,因为 switchMap),清理资源
_tapService.dispose();
})
.onErrorReturnWith((error, stackTrace) => NetworkTraceErrorAction(error));
});
}
Stream<dynamic> _stopTraceEpic(Stream<dynamic> actions, EpicStore<AppState> store) {
return actions
.whereType<StopNetworkTraceAction>()
.doOnData((_) {
_tapService.dispose();
})
// Stop action 不应该触发新的 action,它只是一个副作用
.flatMap((_) => Stream.value(NetworkTraceDisconnectedAction()));
}
一个实现上的细节:上述 _startTraceEpic 示例中 _tapService.connect() 返回 Future<void>,map 操作不会等待 connect 完成。一个更健壮的实现是让 connect 返回一个状态流,或者使用更复杂的 RxDart 操作符来管理连接状态和数据流的合并。但为清晰起见,这里展示了核心思路。
4. 编写 Reducer
Reducer 是纯函数,它根据 Action 更新状态。
// lib/redux/reducers/network_trace_reducer.dart
import 'package:redux/redux.dart';
import '../actions/network_trace_actions.dart';
import '../app_state.dart';
final networkTraceReducer = combineReducers<NetworkTraceState>([
TypedReducer<NetworkTraceState, NetworkTraceConnectedAction>(_onConnected),
TypedReducer<NetworkTraceState, NetworkTraceDisconnectedAction>(_onDisconnected),
TypedReducer<NetworkTraceState, NetworkTraceReceivedAction>(_onTraceReceived),
TypedReducer<NetworkTraceState, NetworkTraceErrorAction>(_onError),
]);
NetworkTraceState _onConnected(NetworkTraceState state, NetworkTraceConnectedAction action) {
return state.copyWith(isConnected: true, error: null);
}
NetworkTraceState _onDisconnected(NetworkTraceState state, NetworkTraceDisconnectedAction action) {
return state.copyWith(isConnected: false);
}
NetworkTraceState _onTraceReceived(NetworkTraceState state, NetworkTraceReceivedAction action) {
final newTraces = List<TraceWrapper>.from(state.traces)..insert(0, action.trace);
if (newTraces.length > state.maxTraces) {
newTraces.removeLast();
}
return state.copyWith(traces: newTraces);
}
NetworkTraceState _onError(NetworkTraceState state, NetworkTraceErrorAction action) {
return state.copyWith(isConnected: false, error: action.error);
}
// 主 Reducer
AppState appReducer(AppState state, dynamic action) {
return AppState(
networkTraceState: networkTraceReducer(state.networkTraceState, action),
);
}
第五步:在 Flutter UI 中消费状态
最后,在 Flutter UI 中,使用 StoreConnector 控件将 Redux store 中的状态连接到视图。当新的 TraceWrapper 到达时,UI 将自动重建。
// lib/widgets/network_trace_view.dart
import 'package:flutter/material.dart';
import 'package:flutter_redux/flutter_redux.dart';
import 'package:redux/redux.dart';
import '../redux/app_state.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';
class NetworkTraceView extends StatelessWidget {
const NetworkTraceView({super.key});
Widget build(BuildContext context) {
return StoreConnector<AppState, _ViewModel>(
converter: (Store<AppState> store) => _ViewModel.fromStore(store),
builder: (BuildContext context, _ViewModel vm) {
if (!vm.isConnected && vm.traces.isEmpty) {
return const Center(child: Text('Not connected to Envoy Tap Stream.'));
}
if (vm.error != null) {
return Center(child: Text('Error: ${vm.error}'));
}
return ListView.builder(
itemCount: vm.traces.length,
itemBuilder: (context, index) {
final trace = vm.traces[index];
// 这里只是一个简单的展示,真实项目中需要一个复杂的解析器
// 来从 TraceWrapper 中提取有用的信息,如 HTTP 请求头、gRPC 状态码等
String summary = 'Trace ID: ${trace.hashCode}';
if (trace.hasHttpBufferedTrace()) {
final httpTrace = trace.httpBufferedTrace;
final path = httpTrace.request.headers.firstWhere(
(h) => h.key == ':path',
orElse: () => HeaderValue()..value='N/A'
).value;
summary = '[HTTP] ${httpTrace.request.method} $path';
} else if (trace.hasSocketBufferedTrace()) {
summary = '[TCP] Connection event';
}
return ListTile(
title: Text(summary),
subtitle: Text('Timestamp: ${DateTime.now()}'), // 应使用 trace 里的时间
dense: true,
);
},
);
},
);
}
}
class _ViewModel {
final bool isConnected;
final List<TraceWrapper> traces;
final dynamic error;
_ViewModel({
required this.isConnected,
required this.traces,
this.error,
});
static _ViewModel fromStore(Store<AppState> store) {
return _ViewModel(
isConnected: store.state.networkTraceState.isConnected,
traces: store.state.networkTraceState.traces,
error: store.state.networkTraceState.error,
);
}
}
这套方案打通了从底层网络代理到顶层应用状态的实时数据链路。在开发和测试环境中,它成为了一个强大的诊断工具,让前端开发者能够直观地看到每次操作触发的网络交互细节,而无需再深入后端日志的泥潭。
然而,这套方案的局限性也十分明显。首先,Tap Filter 对 Envoy 自身会引入性能损耗,在生产环境中必须谨慎使用,最好通过动态配置按需开启。其次,从客户端直接连接 Envoy 的管理接口带来了安全风险,需要严格的网络策略来隔离。最后,海量的追踪数据对客户端的内存和 CPU 也是一个考验,我们在 Reducer 中做了简单的数量限制,但在高流量场景下,可能需要更精细的采样和聚合策略。
未来的一个优化方向是,将 Tap Stream 发送到一个专门的中间层服务,而不是直接发到客户端。这个服务可以对数据进行预处理、聚合和持久化,客户端再按需从这个服务查询数据。这虽然增加了架构复杂性,但解耦了客户端与 Envoy,并提供了更强大的数据处理能力。