利用 Envoy Tap Filter 与 Dart gRPC 流实现 Redux 驱动的客户端网络可观测性


我们面临的困境是客户端的网络黑盒。Flutter 应用通过 gRPC 与后端十几个微服务通信,所有流量都经由一个部署在本地(或边缘)的 Envoy Sidecar 代理。当一次调用链出现延迟或错误时,排查的起点总是模糊不清:是客户端构造请求的问题?是 Envoy 的路由或重试策略出了岔子?还是某个下游服务的固有缺陷?传统的日志分析和分布式追踪在这种场景下显得笨拙,因为它无法提供客户端视角的实时网络交互快照。

最初的构想很简单:如果客户端能“看到”流经 Envoy 的原始流量,那么大部分网络层面的问题将迎刃而解。我们需要的不是事后分析的日志,而是一个实时的、与应用状态同步的网络仪表盘。Envoy 的管理接口中隐藏着一个强大的工具——Tap Filter,它能像网络嗅探器一样捕获流经代理的流量副本,并将其流式传输出来。我们的目标,就是将这个数据流引入 Dart 应用,并用 Redux 来管理和消费这些实时的网络追踪状态。

这套方案的技术选型几乎是确定的。Envoy Tap Filter 通过 gRPC 服务暴露其数据流,因此在 Dart 端,我们需要一个 gRPC 客户端。grpc-dart 库是标准选择。应用的状态管理已经基于 reduxflutter_redux,将新的网络可观测性状态集成进去,是保持架构一致性的必然要求。整个架构的核心,就是构建一个桥梁:一端是 Envoy 的 TapSinkService,另一端是 Dart 应用的 Redux Store。

graph TD
    subgraph Flutter Application
        A[UI Widgets] -- Dispatches Action --> B{Redux Middleware};
        B -- Manages Lifecycle --> C[gRPC Tap Client Service];
        C -- Streams Data --> B;
        B -- Dispatches Data Action --> D[Redux Store];
        D -- Updates State --> A;
    end

    subgraph "Envoy Proxy (Sidecar)"
        E[Client Traffic] --> F{Envoy Listener};
        F -- Tapped --> G[Tap Filter];
        F -- Proxied --> H[Upstream Services];
        G -- Streams Traces via gRPC --> I[Admin Interface];
    end

    C -- Connects to --> I;

    style B fill:#f9f,stroke:#333,stroke-width:2px;
    style C fill:#ccf,stroke:#333,stroke-width:2px;

第一步:配置 Envoy 启用 Tap Filter

一切始于 Envoy。我们需要修改其配置,插入 Tap Filter,并使其将捕获的流量发送到 gRPC Tap Sink。这里的关键在于配置 tap_config。我们需要捕获所有进出流量,因此将 filter 同时应用在 HTTP 连接管理器上。

一个常见的错误是直接暴露 Envoy 的管理接口(默认 9901 端口)给外部网络,这是极度危险的。在真实项目中,该接口必须严格限制在本地回环地址 (127.0.0.1) 或通过 mTLS 等机制进行保护。

以下是一份用于演示的核心 envoy.yaml 配置。它监听 10000 端口,代理所有流量到 your_backend_service,同时启用 Tap Filter。

# envoy.yaml
admin:
  address:
    socket_address:
      address: 0.0.0.0 # 在生产中应为 127.0.0.1
      port_value: 9901

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 10000
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: some_service
                  # 开启 gRPC Web 兼容性
                  grpc_web_timeout: 60s
          http_filters:
          - name: envoy.filters.http.tap
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.tap.v3.Tap
              common_config:
                static_config:
                  match:
                    any_match: true # 捕获所有请求
                  output_config:
                    sinks:
                      - format: PROTO_BINARY # 使用二进制 protobuf 格式
                        grpc_service:
                          envoy_grpc:
                            cluster_name: tap_cluster
                          timeout: 0.25s # gRPC 超时
          - name: envoy.filters.http.grpc_web
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: some_service
    connect_timeout: 5s
    type: LOGICAL_DNS
    # 使用 HTTP/2,因为后端是 gRPC 服务
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}
    load_assignment:
      cluster_name: some_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: host.docker.internal # 指向宿主机,根据你的环境调整
                port_value: 50051
  # 这个 cluster 定义了 tap gRPC 服务的地址,也就是 Envoy 自身
  - name: tap_cluster
    connect_timeout: 5s
    type: STATIC
    lb_policy: ROUND_ROBIN
    # 必须是 HTTP/2
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}
    load_assignment:
      cluster_name: tap_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 9901 # 指向 Envoy 的管理接口

第二步:定义 Protobuf 并生成 Dart 代码

要与 Envoy 的 Tap gRPC 服务通信,我们需要对应的 .proto 文件。这些文件通常可以在 Envoy 的官方仓库中找到。主要涉及 envoy/service/tap/v3/tap.proto 和其依赖项。我们将它们整理好,并使用 protoc 编译器和 Dart 插件生成客户端代码。

tap.proto 核心定义如下:

// envoy/service/tap/v3/tap.proto

service TapSinkService {
  // Envoy will connect and send StreamTapMessage messages forever. It does not expect any response
  // to be sent as nothing would be done in the case of failure. The server should swallow any
  // error occurring during processing.
  rpc StreamTaps(stream StreamTapMessage) returns (StreamTapResponse) {}
}

message StreamTapMessage {
  // Identifier uniquely identifies a tap. Multiple taps can be configured in Envoy. This identifier
  // provides a way to suppress verbose information in the message, such as the full configuration,
  // so that it may be sent only once.
  uint64 identifier = 1;

  // The trace data tapped.
  TraceWrapper trace = 2;
}

// ... 其他定义

生成 Dart 代码的命令大致如下:

protoc --dart_out=grpc:lib/src/generated \
    -I=./protos \
    ./protos/envoy/service/tap/v3/tap.proto \
    # ... 包含所有依赖的 .proto 文件

第三步:构建 Dart gRPC Tap 客户端

这是连接 Envoy 和 Dart 应用的核心。我们创建一个服务类 EnvoyTapService,它负责建立 gRPC 连接,发起流式请求,并将接收到的数据流暴露出去。

这里的坑在于生命周期管理和错误处理。gRPC 连接可能会因为网络问题或 Envoy 重启而中断,客户端必须具备健壮的重连机制。我们使用 StreamController 将 gRPC 的 ResponseStream 转换为一个更易于管理的 Dart Stream

// lib/services/envoy_tap_service.dart

import 'dart:async';
import 'package:grpc/grpc.dart';
import '../generated/envoy/service/tap/v3/tap.pbgrpc.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';

class EnvoyTapService {
  late ClientChannel _channel;
  late TapSinkServiceClient _client;
  StreamSubscription? _subscription;
  final _traceController = StreamController<TraceWrapper>.broadcast();

  // 外部可以监听这个流来获取追踪数据
  Stream<TraceWrapper> get traceStream => _traceController.stream;

  final String host;
  final int port;
  bool _isConnecting = false;
  bool _isConnected = false;

  EnvoyTapService({required this.host, required this.port});

  Future<void> connect() async {
    if (_isConnected || _isConnecting) return;
    _isConnecting = true;

    print('[EnvoyTapService] Connecting to $host:$port...');

    try {
      _channel = ClientChannel(
        host,
        port: port,
        options: const ChannelOptions(
          credentials: ChannelCredentials.insecure(), // 本地调试,生产环境需mTLS
          connectTimeout: Duration(seconds: 5),
        ),
      );

      _client = TapSinkServiceClient(_channel);

      final requestStreamController = StreamController<StreamTapMessage>();
      final responseStream = _client.streamTaps(requestStreamController.stream);
      _isConnected = true;
      _isConnecting = false;
      print('[EnvoyTapService] Connected and streaming.');

      // 监听来自 Envoy 的数据流
      _subscription = responseStream.listen(
        (StreamTapResponse response) {
          // 在此版本中,Envoy 不会发送任何有意义的响应,所以我们忽略
        },
        onDone: () {
          print('[EnvoyTapService] Stream closed by server.');
          _handleDisconnect();
        },
        onError: (error) {
          print('[EnvoyTapService] Stream error: $error');
          _traceController.addError(error); // 将错误传递给下游
          _handleDisconnect();
        },
        cancelOnError: true,
      );

      // 这是关键:我们需要向 Envoy 发送一个初始消息来建立流
      // Envoy 的实现要求客户端流不能关闭,即使我们只发送一个消息
      final initialMessage = StreamTapMessage(
        identifier: 0,
        trace: TraceWrapper(), // trace 字段可以是空的
      );
      requestStreamController.add(initialMessage);

    } catch (e) {
      print('[EnvoyTapService] Connection failed: $e');
      _isConnecting = false;
      _isConnected = false;
      // 触发重连逻辑
      _reconnect();
    }
  }

  void _handleDisconnect() {
    if (!_isConnected) return;
    print('[EnvoyTapService] Disconnected.');
    _isConnected = false;
    _subscription?.cancel();
    _subscription = null;
    _channel.shutdown();
    _reconnect();
  }

  void _reconnect() {
    if (_isConnecting) return;
    print('[EnvoyTapService] Reconnecting in 5 seconds...');
    Future.delayed(const Duration(seconds: 5), () {
      connect();
    });
  }

  void dispose() {
    print('[EnvoyTapService] Disposing service.');
    _subscription?.cancel();
    _traceController.close();
    _channel.shutdown();
    _isConnected = false;
  }
}

// 注意: Envoy的tap filter有一个独特的行为:它会为每个worker线程创建一个到tap sink的gRPC连接。
// 因此,如果你的Envoy配置了多个worker线程,你的Dart应用会收到多个并发的gRPC连接请求。
// 我们的服务设计目前是单例模式,需要考虑如何处理或合并来自多个连接的数据流。
// 一个简单的策略是让所有流都写入同一个 `_traceController`。

第四步:Redux 状态、Action 和 Middleware 集成

现在,我们将 EnvoyTapService 与 Redux 集成。我们将使用 redux_epics 来处理这种异步流操作,它非常适合管理这种长时间运行的副作用。

1. 定义 State

// lib/redux/app_state.dart

import 'package:meta/meta.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';


class AppState {
  final NetworkTraceState networkTraceState;

  const AppState({required this.networkTraceState});

  factory AppState.initial() => AppState(networkTraceState: NetworkTraceState.initial());
}


class NetworkTraceState {
  final bool isConnected;
  final List<TraceWrapper> traces;
  final dynamic error;
  final int maxTraces; // 限制存储的 trace 数量,防止内存溢出

  const NetworkTraceState({
    required this.isConnected,
    required this.traces,
    this.error,
    this.maxTraces = 100, // 默认最多保留100条
  });

  factory NetworkTraceState.initial() => const NetworkTraceState(
    isConnected: false,
    traces: [],
  );

  NetworkTraceState copyWith({
    bool? isConnected,
    List<TraceWrapper>? traces,
    dynamic error,
  }) {
    return NetworkTraceState(
      isConnected: isConnected ?? this.isConnected,
      traces: traces ?? this.traces,
      error: error,
      maxTraces: this.maxTraces,
    );
  }
}

2. 定义 Actions

// lib/redux/actions/network_trace_actions.dart
import '../../generated/envoy/data/tap/v3/wrapper.pb.dart';

class StartNetworkTraceAction {}
class StopNetworkTraceAction {}

class NetworkTraceConnectedAction {}
class NetworkTraceDisconnectedAction {}

class NetworkTraceReceivedAction {
  final TraceWrapper trace;
  NetworkTraceReceivedAction(this.trace);
}

class NetworkTraceErrorAction {
  final dynamic error;
  NetworkTraceErrorAction(this.error);
}

3. 创建 Epic (Middleware)

Epic 是处理副作用的核心。它监听 StartNetworkTraceAction,然后创建 EnvoyTapService 实例,监听其 traceStream,并将流中的数据或错误转换为新的 Action 分发给 Store。

// lib/redux/epics/network_trace_epic.dart

import 'package:redux_epics/redux_epics.dart';
import 'package:rxdart/rxdart.dart';
import '../actions/network_trace_actions.dart';
import '../app_state.dart';
import '../../services/envoy_tap_service.dart';

// 假设 EnvoyTapService 是一个单例或通过依赖注入提供
final EnvoyTapService _tapService = EnvoyTapService(host: '127.0.0.1', port: 9901);
StreamSubscription? _traceSubscription;

final networkTraceEpic = combineEpics<AppState>([
  _startTraceEpic,
  _stopTraceEpic,
]);

Stream<dynamic> _startTraceEpic(Stream<dynamic> actions, EpicStore<AppState> store) {
  return actions
      .whereType<StartNetworkTraceAction>()
      .switchMap((action) {
        // switchMap 确保如果多次分发 StartAction,旧的流会被取消
        return Rx.concat([
          // 立即连接
          Stream.fromFuture(_tapService.connect()).map((_) => NetworkTraceConnectedAction()),
          // 然后监听数据
          _tapService.traceStream.map((trace) => NetworkTraceReceivedAction(trace)),
        ])
        .doOnCancel(() {
          // 当 epic 被取消时(例如,因为 switchMap),清理资源
          _tapService.dispose();
        })
        .onErrorReturnWith((error, stackTrace) => NetworkTraceErrorAction(error));
      });
}


Stream<dynamic> _stopTraceEpic(Stream<dynamic> actions, EpicStore<AppState> store) {
  return actions
      .whereType<StopNetworkTraceAction>()
      .doOnData((_) {
        _tapService.dispose();
      })
      // Stop action 不应该触发新的 action,它只是一个副作用
      .flatMap((_) => Stream.value(NetworkTraceDisconnectedAction()));
}

一个实现上的细节:上述 _startTraceEpic 示例中 _tapService.connect() 返回 Future<void>map 操作不会等待 connect 完成。一个更健壮的实现是让 connect 返回一个状态流,或者使用更复杂的 RxDart 操作符来管理连接状态和数据流的合并。但为清晰起见,这里展示了核心思路。

4. 编写 Reducer

Reducer 是纯函数,它根据 Action 更新状态。

// lib/redux/reducers/network_trace_reducer.dart

import 'package:redux/redux.dart';
import '../actions/network_trace_actions.dart';
import '../app_state.dart';

final networkTraceReducer = combineReducers<NetworkTraceState>([
  TypedReducer<NetworkTraceState, NetworkTraceConnectedAction>(_onConnected),
  TypedReducer<NetworkTraceState, NetworkTraceDisconnectedAction>(_onDisconnected),
  TypedReducer<NetworkTraceState, NetworkTraceReceivedAction>(_onTraceReceived),
  TypedReducer<NetworkTraceState, NetworkTraceErrorAction>(_onError),
]);

NetworkTraceState _onConnected(NetworkTraceState state, NetworkTraceConnectedAction action) {
  return state.copyWith(isConnected: true, error: null);
}

NetworkTraceState _onDisconnected(NetworkTraceState state, NetworkTraceDisconnectedAction action) {
  return state.copyWith(isConnected: false);
}

NetworkTraceState _onTraceReceived(NetworkTraceState state, NetworkTraceReceivedAction action) {
  final newTraces = List<TraceWrapper>.from(state.traces)..insert(0, action.trace);
  if (newTraces.length > state.maxTraces) {
    newTraces.removeLast();
  }
  return state.copyWith(traces: newTraces);
}

NetworkTraceState _onError(NetworkTraceState state, NetworkTraceErrorAction action) {
  return state.copyWith(isConnected: false, error: action.error);
}

// 主 Reducer
AppState appReducer(AppState state, dynamic action) {
  return AppState(
    networkTraceState: networkTraceReducer(state.networkTraceState, action),
  );
}

第五步:在 Flutter UI 中消费状态

最后,在 Flutter UI 中,使用 StoreConnector 控件将 Redux store 中的状态连接到视图。当新的 TraceWrapper 到达时,UI 将自动重建。

// lib/widgets/network_trace_view.dart

import 'package:flutter/material.dart';
import 'package:flutter_redux/flutter_redux.dart';
import 'package:redux/redux.dart';
import '../redux/app_state.dart';
import '../generated/envoy/data/tap/v3/wrapper.pb.dart';

class NetworkTraceView extends StatelessWidget {
  const NetworkTraceView({super.key});

  
  Widget build(BuildContext context) {
    return StoreConnector<AppState, _ViewModel>(
      converter: (Store<AppState> store) => _ViewModel.fromStore(store),
      builder: (BuildContext context, _ViewModel vm) {
        if (!vm.isConnected && vm.traces.isEmpty) {
          return const Center(child: Text('Not connected to Envoy Tap Stream.'));
        }
        if (vm.error != null) {
          return Center(child: Text('Error: ${vm.error}'));
        }
        return ListView.builder(
          itemCount: vm.traces.length,
          itemBuilder: (context, index) {
            final trace = vm.traces[index];
            // 这里只是一个简单的展示,真实项目中需要一个复杂的解析器
            // 来从 TraceWrapper 中提取有用的信息,如 HTTP 请求头、gRPC 状态码等
            String summary = 'Trace ID: ${trace.hashCode}';
            if (trace.hasHttpBufferedTrace()) {
                final httpTrace = trace.httpBufferedTrace;
                final path = httpTrace.request.headers.firstWhere(
                    (h) => h.key == ':path', 
                    orElse: () => HeaderValue()..value='N/A'
                ).value;
                summary = '[HTTP] ${httpTrace.request.method} $path';
            } else if (trace.hasSocketBufferedTrace()) {
                summary = '[TCP] Connection event';
            }

            return ListTile(
              title: Text(summary),
              subtitle: Text('Timestamp: ${DateTime.now()}'), // 应使用 trace 里的时间
              dense: true,
            );
          },
        );
      },
    );
  }
}

class _ViewModel {
  final bool isConnected;
  final List<TraceWrapper> traces;
  final dynamic error;

  _ViewModel({
    required this.isConnected,
    required this.traces,
    this.error,
  });

  static _ViewModel fromStore(Store<AppState> store) {
    return _ViewModel(
      isConnected: store.state.networkTraceState.isConnected,
      traces: store.state.networkTraceState.traces,
      error: store.state.networkTraceState.error,
    );
  }
}

这套方案打通了从底层网络代理到顶层应用状态的实时数据链路。在开发和测试环境中,它成为了一个强大的诊断工具,让前端开发者能够直观地看到每次操作触发的网络交互细节,而无需再深入后端日志的泥潭。

然而,这套方案的局限性也十分明显。首先,Tap Filter 对 Envoy 自身会引入性能损耗,在生产环境中必须谨慎使用,最好通过动态配置按需开启。其次,从客户端直接连接 Envoy 的管理接口带来了安全风险,需要严格的网络策略来隔离。最后,海量的追踪数据对客户端的内存和 CPU 也是一个考验,我们在 Reducer 中做了简单的数量限制,但在高流量场景下,可能需要更精细的采样和聚合策略。

未来的一个优化方向是,将 Tap Stream 发送到一个专门的中间层服务,而不是直接发到客户端。这个服务可以对数据进行预处理、聚合和持久化,客户端再按需从这个服务查询数据。这虽然增加了架构复杂性,但解耦了客户端与 Envoy,并提供了更强大的数据处理能力。


  目录