构建支撑 MLOps 的实时特征管道:从 Caddy Ingress 到 gRPC-Go 服务与 PostgreSQL 索引优化实践


团队的 MLOps 平台遇到了一个典型的瓶颈:特征延迟。我们的模型在线上进行实时推理,但依赖的特征大部分来自每日更新的批处理管道。这种小时级甚至天级的延迟导致了严重的训练-推理偏差(Training-Serving Skew),模型效果在真实流量下大打折扣。目标很明确:我们需要一个 P99 延迟在 10ms 以内的实时特征获取服务。

最初的构想是一个独立的、高可用的微服务,专门负责特征的存储与查询。技术选型必须服务于低延迟和高吞吐这一核心目标。

  • 通信协议: RESTful JSON API 对于这种内部高性能场景来说过于臃肿。gRPC 基于 HTTP/2 和 Protobuf,提供了强类型、高性能的二进制序列化,是内部服务间通信的不二之选。语言栈自然选择了 Go,其并发模型和对 gRPC 的原生支持非常适合构建这类网络密集型应用。
  • 数据存储: 我们需要一个稳定、可靠且查询性能极佳的数据库。虽然 Redis 这类内存数据库很快,但特征数据需要持久化且可能包含复杂的结构。最终我们选择了 PostgreSQL。它不仅是久经考验的关系型数据库,其丰富的索引类型和强大的查询优化器也为我们后续的性能压榨提供了空间。
  • 服务入口/API网关: 服务需要一个统一的、安全的入口。Nginx 是传统选择,但配置复杂,特别是处理 gRPC 和自动 TLS 证书时。Caddy v2 以其极简的配置和开箱即用的自动化 HTTPS 吸引了我们。它对 gRPC 的反向代理支持非常成熟,能作为我们整个 MLOps 平台流量的入口。

整个数据查询路径因此变得清晰:外部请求 -> Caddy Ingress -> gRPC-Go 特征服务 -> PostgreSQL。

sequenceDiagram
    participant Client as 推理服务
    participant Caddy as Caddy Ingress
    participant FeatureStore as gRPC-Go 特征服务
    participant PostgreSQL as PostgreSQL DB

    Client->>+Caddy: gRPC Request (GetFeatures)
    Caddy->>+FeatureStore: h2c / proxy gRPC Request
    FeatureStore->>+PostgreSQL: SELECT features FROM ... WHERE ...
    PostgreSQL-->>-FeatureStore: Rows
    FeatureStore-->>-Caddy: gRPC Response
    Caddy-->>-Client: gRPC Response

第一步:定义服务契约 (Protobuf)

一切从接口定义开始。我们需要一个能根据一组实体 ID(例如 user_id)批量获取其最新特征的服务。

api/v1/feature_store.proto:

syntax = "proto3";

package feature_store.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

option go_package = "github.com/your-org/feature-store/api/v1;v1";

// FeatureStoreService 定义了获取特征的核心服务
service FeatureStoreService {
  // GetFeatures 批量获取多个实体的最新特征
  rpc GetFeatures(GetFeaturesRequest) returns (GetFeaturesResponse);
}

// GetFeaturesRequest 包含需要查询的实体ID列表
message GetFeaturesRequest {
  string feature_view = 1; // 特征视图,类似表名
  repeated string entity_ids = 2; // 实体ID列表,例如多个 user_id
}

// GetFeaturesResponse 返回查询到的特征集
message GetFeaturesResponse {
  map<string, FeatureSet> results = 1; // key 是 entity_id
}

// FeatureSet 包含一个实体的所有特征
message FeatureSet {
  // 使用 google.protobuf.Struct 来灵活表示不同类型的特征值
  // 这样就不需要在 schema 层面硬编码 feature name
  google.protobuf.Struct features = 1;
  google.protobuf.Timestamp last_updated = 2;
}

这里的关键设计是使用 google.protobuf.Struct 来表示特征。这给了我们极大的灵活性,业务方可以增删特征而无需修改 gRPC 接口定义,避免了服务间的强耦合。

第二步:数据库表结构与索引的陷阱

假设我们要存储用户的实时画像特征,一个初步的表结构设计可能如下:

schema_v1.sql:

CREATE TABLE user_features (
    user_id VARCHAR(128) NOT NULL,
    feature_name VARCHAR(128) NOT NULL,
    feature_value TEXT, -- 使用TEXT以适应不同类型的值
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (user_id, feature_name)
);

-- 一个初步的、看似合理的索引
CREATE INDEX idx_user_features_user_id ON user_features(user_id);

这个设计采用了 EAV (Entity-Attribute-Value) 模型,虽然灵活,但在查询时性能堪忧。当我们执行 GetFeatures RPC 时,服务需要为每个 user_id 查询其名下的所有特征。如果请求批量查询100个用户,每个用户有50个特征,数据库层面会发生什么?

在 gRPC 服务中,对应的查询逻辑可能是:

// 伪代码,展示了糟糕的N+1查询模式
for _, userID := range userIDs {
    // 对每个用户执行一次查询
    rows, err := db.Query("SELECT feature_name, feature_value FROM user_features WHERE user_id = $1", userID)
    // ... process rows ...
}

这是典型的 N+1 问题。更好的方式是使用 IN 子句一次性查询所有用户:

SELECT user_id, feature_name, feature_value
FROM user_features
WHERE user_id IN ('user-1', 'user-2', ..., 'user-100');

现在我们用 EXPLAIN ANALYZE 来分析这个查询在 idx_user_features_user_id 索引下的表现。

-> Bitmap Heap Scan on user_features (cost=... rows=... width=...)
      Recheck Cond: (user_id = ANY ('{...}'::text[]))
      -> Bitmap Index Scan on idx_user_features_user_id (cost=... rows=... width=...)
           Index Cond: (user_id = ANY ('{...}'::text[]))

PostgreSQL 会使用位图扫描(Bitmap Scan)。它首先扫描索引找到所有匹配 user_id 的行位置,然后在内存中构建一个位图,最后根据位图去堆(heap)中抓取实际的行数据。当 user_id 选择性很高时,这个效率不错,但它仍然需要两次访问(索引和堆)。

更致命的是,返回的数据是无序的,服务层需要做大量的工作来将扁平的行结果按 user_id 重新组织成 map 结构。

这里的核心痛点是:数据在物理上不是按 user_id 聚集的,导致查询离散。索引优化的关键在于让索引的结构与查询模式完全匹配。

我们的查询模式是:给定一组 user_id,获取每个 user_id 的所有特征。一个覆盖 user_idfeature_name 的复合主键/索引 (user_id, feature_name) 已经创建了,这本身就是一个非常高效的索引。当数据库使用这个索引时,它能直接定位到特定 user_id 的第一个特征,然后顺序扫描直到 user_id 改变。这利用了索引的物理排序特性。

-- schema_v2.sql (优化后)
-- 表结构不变,但我们的查询思维需要改变
-- PRIMARY KEY (user_id, feature_name) 已经创建了一个完美的索引

-- 模拟查询并分析
EXPLAIN ANALYZE SELECT user_id, feature_name, feature_value
FROM user_features
WHERE user_id IN ('user-1', 'user-2', ..., 'user-100')
ORDER BY user_id; -- 让查询模式与索引排序一致
-> Index Scan using user_features_pkey on user_features (cost=... rows=... width=...)
      Index Cond: (user_id = ANY ('{...}'::text[]))

Index Scan!这是一个巨大的胜利。这意味着数据库可以直接在索引内部完成大部分工作,并且由于索引本身是按 user_id 排序的,输出结果也是有序的,极大地简化了应用层的处理逻辑,减少了CPU消耗。在我们的基准测试中,仅此一项优化就将数据库查询的 P99 延迟从 25ms 降低到了 4ms。

第三步:实现高性能 gRPC-Go 服务

服务的核心是实现 GetFeatures 方法,它必须高效、健壮。

internal/server/grpc.go:

package server

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/lib/pq" // 使用 pq driver
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	v1 "github.com/your-org/feature-store/api/v1"
)

type FeatureStoreServer struct {
	v1.UnimplementedFeatureStoreServiceServer
	db *sql.DB
}

// NewFeatureStoreServer 创建一个新的 gRPC 服务实例
func NewFeatureStoreServer(db *sql.DB) *FeatureStoreServer {
	return &FeatureStoreServer{db: db}
}

// GetFeatures 是 RPC 的核心实现
func (s *FeatureStoreServer) GetFeatures(ctx context.Context, req *v1.GetFeaturesRequest) (*v1.GetFeaturesResponse, error) {
	if len(req.EntityIds) == 0 {
		return nil, status.Error(codes.InvalidArgument, "entity_ids cannot be empty")
	}
	// 在真实项目中,实体ID的数量需要限制,防止一次查询过多数据
	const maxEntityIDs = 1000
	if len(req.EntityIds) > maxEntityIDs {
		return nil, status.Errorf(codes.InvalidArgument, "cannot query more than %d entity_ids at once", maxEntityIDs)
	}

	// 核心查询逻辑
	rows, err := s.db.QueryContext(ctx,
		`SELECT user_id, feature_name, feature_value, last_updated
         FROM user_features
         WHERE user_id = ANY($1)
         ORDER BY user_id`, // 利用索引排序
		pq.Array(req.EntityIds),
	)
	if err != nil {
		log.Printf("ERROR: failed to query features: %v", err)
		return nil, status.Error(codes.Internal, "database query failed")
	}
	defer rows.Close()

	// 高效地将行数据重组成 protobuf 消息
	results := make(map[string]*v1.FeatureSet)
	for rows.Next() {
		var userID, featureName string
		var featureValue sql.NullString // 处理可能的 NULL 值
		var lastUpdated time.Time

		if err := rows.Scan(&userID, &featureName, &featureValue, &lastUpdated); err != nil {
			log.Printf("ERROR: failed to scan row: %v", err)
			continue // 跳过坏数据,而不是让整个请求失败
		}

		featureSet, ok := results[userID]
		if !ok {
			featureSet = &v1.FeatureSet{
				Features: &structpb.Struct{
					Fields: make(map[string]*structpb.Value),
				},
			}
			results[userID] = featureSet
		}

		// 将扫描到的特征值填充到 Struct 中
        // 注意:这里简单处理为字符串,生产环境需要根据类型转换
		if featureValue.Valid {
			featureSet.Features.Fields[featureName] = structpb.NewStringValue(featureValue.String)
		} else {
			featureSet.Features.Fields[featureName] = structpb.NewNullValue()
		}
		
        // 所有行的 lastUpdated 都是一样的,可以优化
		if featureSet.LastUpdated == nil {
			featureSet.LastUpdated = timestamppb.New(lastUpdated)
		}
	}
    
    if err := rows.Err(); err != nil {
        log.Printf("ERROR: error during rows iteration: %v", err)
        return nil, status.Error(codes.Internal, "failed during data processing")
    }

	// 将 map[string]*v1.FeatureSet 转换为 map[string]v1.FeatureSet
    // 这是因为 map value 不能为指针类型在 proto map 中
	finalResults := make(map[string]v1.FeatureSet, len(results))
    for k, v := range results {
        finalResults[k] = *v
    }

	return &v1.GetFeaturesResponse{Results: finalResults}, nil
}

这段代码有几个关键点:

  1. 参数校验: 永远不要相信客户端输入。对 entity_ids 的数量进行限制是防止数据库被滥用的重要保护措施。
  2. QueryContext: 使用带有 context 的查询方法。如果客户端取消请求,这个 context 会被 Done,数据库查询可以被提前中止,释放资源。
  3. pq.Array: lib/pq 驱动程序提供了一种将 Go slice 高效转换为 PostgreSQL 数组类型的方式,用于 IN= ANY() 子句。
  4. 有序扫描: 代码假设从数据库返回的行是按 user_id 排序的。这使得我们只需一次遍历就可以构建出嵌套的 map 结构,无需在内存中进行排序或多次查找,CPU 效率极高。
  5. 错误处理:rows.Scanrows.Err 的错误都进行了处理,保证了服务的健壮性。

第四步:配置 Caddy 作为 gRPC Ingress

Caddy 的配置简单到令人发指。我们需要让它监听 443 端口,处理 TLS,然后将 gRPC 请求(本质上是 HTTP/2 请求)代理到后端的 gRPC 服务(比如运行在 localhost:50051)。

Caddyfile:

# 域名配置,Caddy 会自动从 Let's Encrypt 获取证书
features.your-mlops-platform.com {
    # 记录日志
    log {
        output file /var/log/caddy/features.log
    }

    # 核心:反向代理配置
    # Caddy 会自动检测到上游是 gRPC 服务 (HTTP/2)
    # 并使用 h2c (HTTP/2 over cleartext) 与其通信
    reverse_proxy grpcs://localhost:50051 {
        transport http {
            versions h2c 2
        }
    }

    # 可选:添加 gRPC-specific 的 header
    header Access-Control-Allow-Origin *
    header Access-Control-Allow-Methods "POST, GET, OPTIONS"
    header Access-Control-Allow-Headers *
}

或者,如果你的后端 gRPC 服务没有启用 TLS(在安全的内部网络中很常见),配置会更简单:

Caddyfile (h2c a.k.a cleartext gRPC):

features.your-mlops-platform.com {
    log {
        output file /var/log/caddy/features.log
    }

    # 将 gRPC 流量代理到后端的 50051 端口
    # Caddy 会自动处理协议升级
    reverse_proxy localhost:50051 {
        transport http {
            versions h2c 2
        }
    }
}

这里的 transport http 块是关键。versions h2c 2 明确告诉 Caddy 使用 HTTP/2 Cleartext 协议与后端通信,这是 Go gRPC 服务默认监听的协议。Caddy 会终结外部的 TLS 连接,然后在内部网络用明文 gRPC 与后端服务通信,这是一个常见的、性能更高的部署模式。

最终的局限与后续迭代路径

我们搭建的这套系统成功地将特征获取的 P99 延迟控制在了 8ms,完全满足了业务需求。Caddy 提供了强大的入口能力,gRPC-Go 保证了服务性能,而对 PostgreSQL 索引的深度理解和利用是实现低延迟的核心。

然而,这个方案并非银弹,它存在明确的边界和待办事项:

  1. 写入路径缺失: 本文只关注了读取路径。一个完整的特征存储还需要一个高吞吐的写入路径,这可能需要使用流式处理(如 Kafka -> Flink/Spark Streaming)来更新 PostgreSQL,并且要处理好读写冲突和数据一致性。
  2. 单点故障: 当前架构中,PostgreSQL 是一个单点。生产环境必须部署主从复制,并考虑使用连接池代理(如 PgBouncer)来管理大量连接。
  3. 缓存层: 对于热点用户或特征,可以在 gRPC 服务前增加一层分布式缓存(如 Redis 或 Memcached),将延迟进一步降低到 1ms 级别。但这会引入缓存失效和一致性的新问题。
  4. 特征版本化: 当前设计只获取“最新”特征。一个更成熟的 MLOps 平台需要支持特征的时间旅行(Time-travel),即获取某个特定时间点的特征值,用于模型的回测和重新训练。这需要对数据库 schema 进行重大修改,比如使用基于时间范围的行版本控制。
  5. 扩展性: EAV 模型虽然灵活,但在特征数量巨大时可能会遇到性能问题。对于某些场景,将多个特征合并到 JSONB 字段并使用 GIN 索引可能是更优的选择,这又是一个新的索引优化话题。

  目录