团队的计算机视觉模型迭代速度上来了,但支撑模型运行的特征工程管道却成了一笔技术债。特征散落在不同的 GCS 存储桶、BigQuery 表和一些临时文件中。同一个物体的特征向量可能被重复计算多次,特征的来源和版本混乱不堪。更致命的是,线上服务需要进行多路视频流的实时分析,特征查找的延迟直接影响了业务指标,目前的数据库查询已经达到了百毫秒级别,完全无法接受。
我们需要一个统一的、可追溯的、低延迟的特征平台。它必须能处理图像中复杂的关联关系,并为在线推理提供毫秒级的特征访问。这是我们着手构建这个系统的起点。
初步构想与架构蓝图
我们的目标是建立一个事件驱动的异步处理管道。整个系统将部署在 Google Kubernetes Engine (GKE) 上,以获得弹性和统一的运维环境。
graph TD
subgraph GCP 环境
A[GCS Bucket: raw-images] -- GCS Event --> B[Cloud Pub/Sub: image-events];
end
subgraph GKE Cluster: ml-platform
B -- Pull --> C{Ingestion Service};
C -- Creates Job --> D[K8s Job: CV Processor];
D -- GPU Node --> E[CV Model Execution];
E -- Extracted Features (JSON) --> F{Feature Ingestor};
subgraph Data Storage & Serving
F -- Write Graph Data --> G[Graph DB: Neo4j];
F -- Write Hot Keys --> H[Cache: Redis];
G <-. Periodic Sync .-> I{Cache Warmer};
H <-. Write .-> I;
end
subgraph Serving API
J[API Gateway] --> K{Feature Service};
K -- Read (Cache Hit) --> H;
K -- Read (Cache Miss) --> G;
end
subgraph Management UI
L[Frontend] -- API Calls --> K;
L -- API Calls --> G;
end
end
M[ML Model Server] -- Feature Request --> J;
这个流程的核心思路是解耦。
- 接收与调度:
Ingestion Service消费上游的图像事件,并为每个图像创建一个独立的 Kubernetes Job。这种模式允许我们根据负载动态扩展处理能力。 - 特征提取:
CV ProcessorJob 在专用的 GPU 节点上运行,执行耗时的计算任务,提取实体、关系和特征向量。 - 双重存储: 提取的特征被写入两个地方。图数据库 (Neo4j) 作为“真理之源”,存储所有实体及其复杂关系,用于深度分析和可追溯性。Redis 作为一级缓存,存储在线服务最需要的热点特征向量,确保低延迟。
- 实时服务:
Feature ServiceAPI 遵循 Cache-Aside 模式,优先查询 Redis,失败则回源到图数据库。
技术选型决策的背后
在真实项目中,技术选型从来不是只看“新”或“酷”,而是基于具体问题的权衡。
为什么是 GKE 而非 Cloud Run 或 VMs?
- 异构工作负载: 我们同时需要无状态的 API 服务、有状态的数据库和耗费资源的批处理任务(CV Job)。GKE 提供了统一的编排层来管理这一切。
- GPU 支持: CV 推理和训练严重依赖 GPU。GKE 对 GPU 节点池的良好支持和调度能力是决定性因素。我们可以创建一个按需自动伸缩的 GPU 节点池,只在有 Job 时才启动,有效控制成本。
- 生态系统: Kubernetes 成熟的生态,如 Helm、Istio、Prometheus,为后续的服务治理和可观测性建设铺平了道路。
为什么选择图数据库?
- 一个常见的错误是试图用关系型数据库或文档数据库来模拟复杂的图关系。一张图片可能包含
人、车、建筑等多个实体,这些实体之间存在驾驶、位于...旁边等关系。用 SQL 查询“所有包含张三且他旁边有一辆蓝色汽车的图片”会产生大量昂贵的 JOIN 操作。 - 图数据库将关系作为一等公民。
(Person {name: "张三"})-[:NEAR]->(Car {color: "blue"})这样的查询在图模型中极其高效。这对数据科学家的探索性分析和特征发现至关重要。
- 一个常见的错误是试图用关系型数据库或文档数据库来模拟复杂的图关系。一张图片可能包含
为什么还需要 Redis?
- 图数据库的遍历查询虽然高效,但对于“给定实体ID,获取其最新的特征向量”这类点查,其延迟通常在 10ms-100ms 范围,这取决于数据模型和集群负载。
- 对于在线模型推理,我们需要的是稳定的亚毫秒级(sub-millisecond)延迟。Redis 这种纯内存存储是实现这一目标的不二之选。它在这里的角色不是通用缓存,而是服务于在线系统的高性能“只读副本”。
步骤化实现:代码与配置
1. GKE 集群基础设施 (Terraform)
我们使用 Terraform 来管理基础设施,确保环境的可复现性。一个关键点是定义两个不同的节点池。
# main.tf
provider "google" {
project = var.gcp_project_id
region = var.gcp_region
}
resource "google_container_cluster" "ml_platform_cluster" {
name = "ml-platform-cluster"
location = var.gcp_region
# ... 其他集群配置,如网络、认证等
remove_default_node_pool = true
initial_node_count = 1
}
# 通用服务节点池
resource "google_container_node_pool" "primary_nodes" {
name = "primary-node-pool"
cluster = google_container_cluster.ml_platform_cluster.name
location = var.gcp_region
node_count = 2
node_config {
machine_type = "e2-standard-4"
oauth_scopes = [
"https://www.googleapis.com/auth/cloud-platform"
]
}
}
# GPU 计算节点池,支持自动伸缩
resource "google_container_node_pool" "gpu_nodes" {
name = "gpu-node-pool"
cluster = google_container_cluster.ml_platform_cluster.name
location = var.gcp_region
autoscaling {
min_node_count = 0
max_node_count = 5
}
management {
auto_repair = true
auto_upgrade = true
}
node_config {
machine_type = "n1-standard-8"
guest_accelerator {
type = "nvidia-tesla-t4"
count = 1
}
# GKE 需要这个 taint 来确保只有明确请求 GPU 的 Pod 才会调度到这里
taint {
key = "nvidia.com/gpu"
value = "present"
effect = "NO_SCHEDULE"
}
oauth_scopes = [
"https://www.googleapis.com/auth/cloud-platform"
]
}
}
这里的 taint 和 autoscaling (min_node_count = 0) 是成本优化的关键。当没有 CV 任务时,GPU 节点池可以缩减到 0,避免了高昂的闲置费用。
2. CV 处理 Job 与调度服务
Ingestion Service 是一个简单的 Python FastAPI 应用,监听 Pub/Sub 订阅。
# ingestion_service/main.py
import os
import base64
import json
import logging
from fastapi import FastAPI, Request, status
from google.cloud import pubsub_v1
from kubernetes import client, config
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = FastAPI()
# GKE In-Cluster 配置
try:
config.load_incluster_config()
except config.ConfigException:
# 本地开发时使用 kubeconfig
config.load_kube_config()
batch_v1 = client.BatchV1Api()
namespace = os.getenv("K8S_NAMESPACE", "default")
def create_cv_job(image_path: str, message_id: str) -> client.V1Job:
# 保证 Job 名称唯一性,使用 message_id
job_name = f"cv-processor-{message_id.lower().replace('_', '-')}"
container = client.V1Container(
name="cv-processor",
image="gcr.io/my-project/cv-processor:latest",
args=["--image-path", image_path],
resources=client.V1ResourceRequirements(
limits={"nvidia.com/gpu": "1"}, # 请求一个 GPU
requests={"nvidia.com/gpu": "1"}
),
env=[
client.V1EnvVar(name="NEO4J_URI", value=os.getenv("NEO4J_URI")),
client.V1EnvVar(name="NEO4J_USER", value=os.getenv("NEO4J_USER")),
# 密码应使用 K8s Secret 挂载
client.V1EnvVar(name="NEO4J_PASSWORD", value_from=client.V1EnvVarSource(
secret_key_ref=client.V1SecretKeySelector(name="neo4j-secrets", key="password")
)),
]
)
# Job Spec
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "cv-processor"}),
spec=client.V1PodSpec(
containers=[container],
restart_policy="Never",
# 容忍 GPU 节点的 taint
tolerations=[
client.V1Toleration(
key="nvidia.com/gpu",
operator="Exists",
effect="NoSchedule"
)
]
)
)
# Job Body
body = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name=job_name),
spec=client.V1JobSpec(
template=template,
backoff_limit=2, # 重试次数
ttl_seconds_after_finished=3600 # 1小时后自动清理已完成的 Job
)
)
return body
@app.post("/pubsub/push")
async def handle_pubsub_push(request: Request):
envelope = await request.json()
if not envelope or "message" not in envelope:
logging.error("Invalid Pub/Sub message format")
return {"status": "error"}, status.HTTP_400_BAD_REQUEST
message = envelope["message"]
# 实际项目中,需要对消息来源和格式做更严格的校验
try:
data = base64.b64decode(message["data"]).decode("utf-8")
attributes = message["attributes"]
# GCS 事件格式
if attributes.get("eventType") == "OBJECT_FINALIZE":
bucket_id = attributes["bucketId"]
object_id = attributes["objectId"]
image_gcs_path = f"gs://{bucket_id}/{object_id}"
logging.info(f"Received new image notification: {image_gcs_path}")
job_body = create_cv_job(image_gcs_path, message["messageId"])
try:
batch_v1.create_namespaced_job(body=job_body, namespace=namespace)
logging.info(f"Successfully created Job: {job_body.metadata.name}")
except client.ApiException as e:
logging.error(f"Failed to create K8s Job: {e}")
# 此处应有重试或告警逻辑
return {"status": "error"}, status.HTTP_500_INTERNAL_SERVER_ERROR
except Exception as e:
logging.error(f"Error processing message: {e}")
return {"status": "error"}, status.HTTP_500_INTERNAL_SERVER_ERROR
return {"status": "success"}, status.HTTP_204_NO_CONTENT
这个服务是连接云存储和 Kubernetes 计算集群的桥梁。tolerations 和 resources 的设置是确保 Pod 能被正确调度到 GPU 节点上的关键。
3. 图数据建模与入库
我们的图模型很简单:
- 节点:
Image,Object,Feature - 关系:
(Image)-[:CONTAINS]->(Object),(Object)-[:HAS_FEATURE]->(Feature)
cv-processor Job 的核心脚本会调用 CV 模型,然后将结果构造成图数据并写入 Neo4j。
# cv_processor/main.py
import uuid
from neo4j import GraphDatabase, exceptions
# ... (省略 CV 模型加载和推理部分) ...
class GraphIngestor:
def __init__(self, uri, user, password):
self._driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self._driver.close()
def add_image_features(self, image_path, detection_results):
# detection_results 是一个 list of dicts, e.g.,
# [{'label': 'person', 'box': [x,y,w,h], 'embedding': [0.1, ...]}, ...]
with self._driver.session() as session:
# 使用事务确保数据一致性
session.execute_write(self._create_image_and_features_tx, image_path, detection_results)
@staticmethod
def _create_image_and_features_tx(tx, image_path, results):
# 步骤 1: 创建或合并 Image 节点,确保幂等性
# MERGE 会在节点不存在时创建,存在时匹配,避免重复
query = (
"MERGE (img:Image {path: $image_path}) "
"RETURN id(img) as img_id"
)
tx.run(query, image_path=image_path)
# 步骤 2: 遍历检测到的每个物体
for res in results:
object_id = str(uuid.uuid4()) # 为每个检测到的实例生成唯一ID
# 创建 Object 节点,并与 Image 建立关系
# 同时创建 Feature 节点并关联
object_query = (
"MATCH (img:Image {path: $image_path}) "
"CREATE (obj:Object {uuid: $object_id, label: $label, bounding_box: $box}) "
"CREATE (feat:Feature {vector: $embedding}) "
"CREATE (img)-[:CONTAINS]->(obj) "
"CREATE (obj)-[:HAS_FEATURE {model_version: 'yolo-v5-resnet50'}]->(feat)"
)
tx.run(object_query,
image_path=image_path,
object_id=object_id,
label=res['label'],
box=res['box'],
embedding=res['embedding'])
# ... (主函数解析参数,调用推理,然后调用 GraphIngestor) ...
这里的核心是 MERGE 和 CREATE 的使用。MERGE 用于处理 Image 这种可以唯一标识的实体,防止重复创建。对于每次检测到的 Object 实例,我们使用 CREATE,因为它们是新发现的实体。将模型版本号作为关系属性,是实现特征版本管理的第一步。
4. 低延迟服务 API 与 Redis 缓存
Feature Service 提供在线查询。
# feature_service/main.py
import redis
from fastapi import FastAPI, HTTPException
from neo4j import GraphDatabase
# ... (省略配置和客户端初始化) ...
app = FastAPI()
redis_client = redis.Redis(host=os.getenv("REDIS_HOST"), port=6379, db=0, decode_responses=True)
neo4j_driver = GraphDatabase.driver(...)
@app.get("/features/object/{object_uuid}")
async def get_object_features(object_uuid: str):
# 步骤 1: 检查 Redis 缓存
cache_key = f"feature:object:{object_uuid}"
try:
cached_features = redis_client.get(cache_key)
if cached_features:
# 在真实项目中,这里应该记录一个 cache_hit 指标
return {"source": "cache", "features": json.loads(cached_features)}
except redis.exceptions.RedisError as e:
# 如果 Redis 故障,不能影响主流程,记录错误并继续
logging.error(f"Redis GET failed for key {cache_key}: {e}")
# 步骤 2: 缓存未命中,查询图数据库
# 在真实项目中,这里应该记录一个 cache_miss 指标
try:
with neo4j_driver.session() as session:
result = session.read_transaction(find_features_by_uuid, object_uuid)
except exceptions.ServiceUnavailable as e:
logging.error(f"Neo4j connection failed: {e}")
raise HTTPException(status_code=503, detail="Data backend unavailable")
if not result:
raise HTTPException(status_code=404, detail="Object not found")
# 步骤 3: 异步更新缓存
# 使用 setex 设置 TTL,例如 24 小时
try:
redis_client.setex(cache_key, 86400, json.dumps(result))
except redis.exceptions.RedisError as e:
# 缓存写入失败也应记录,但不阻塞返回
logging.error(f"Redis SETEX failed for key {cache_key}: {e}")
return {"source": "database", "features": result}
def find_features_by_uuid(tx, uuid):
query = (
"MATCH (obj:Object {uuid: $uuid})-[:HAS_FEATURE]->(feat:Feature) "
"RETURN feat.vector as vector"
)
records = tx.run(query, uuid=uuid)
# 可能一个物体有多个模型生成的特征,这里返回一个列表
return [record["vector"] for record in records]
这个 API 的设计体现了容错性。Redis 的失败不应该导致整个服务中断。查询数据库后对缓存的更新是“异步”的(尽管代码是同步的,但它在返回给用户之前完成,逻辑上是为了填充缓存),保证了用户能尽快拿到数据。
5. 用于探索的前端面板 (Sass/SCSS)
一个内部平台的用户体验同样重要。我们为数据科学家提供了一个简单的界面来可视化图谱。Sass/SCSS 在构建这种复杂组件时能极大地提升效率和可维护性。
假设我们有一个 NodeInspector.vue 组件,用于显示点击节点的信息。
// components/NodeInspector.scss
// 定义主题变量,便于统一修改
$primary-text-color: #e0e0e0;
$secondary-text-color: #b0b0b0;
$background-color: #2c2c2c;
$border-color: #444;
$accent-color: #4CAF50;
$font-family: 'Fira Code', 'Consolas', monospace;
// Mixin 用于可重用的样式块,例如 Flexbox 布局
@mixin flex-container($direction: row, $justify: flex-start, $align: stretch) {
display: flex;
flex-direction: $direction;
justify-content: $justify;
align-items: $align;
}
.node-inspector {
@include flex-container(column);
background-color: $background-color;
border-left: 1px solid $border-color;
color: $primary-text-color;
font-family: $font-family;
padding: 16px;
width: 350px;
height: 100vh;
&__header {
margin-bottom: 20px;
.title {
font-size: 1.2rem;
font-weight: 500;
color: $accent-color;
}
.subtitle {
font-size: 0.8rem;
color: $secondary-text-color;
}
}
&__properties {
flex-grow: 1;
overflow-y: auto;
.property {
@include flex-container(row, space-between);
margin-bottom: 8px;
font-size: 0.9rem;
// 使用 & 选择器和嵌套,结构非常清晰
&-key {
font-weight: bold;
color: $secondary-text-color;
}
&-value {
word-break: break-all;
text-align: right;
max-width: 60%;
&--long {
font-size: 0.8rem;
color: #888;
// 省略号处理长文本
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
}
}
}
&__actions {
padding-top: 16px;
border-top: 1px solid $border-color;
@include flex-container(row, flex-end);
button {
background: none;
border: 1px solid $accent-color;
color: $accent-color;
padding: 8px 16px;
cursor: pointer;
transition: all 0.2s ease-in-out;
&:hover {
background-color: rgba($accent-color, 0.1);
}
}
}
}
通过变量、Mixin 和嵌套,这份 SCSS 代码比纯 CSS 意图更明确,修改主题颜色或布局逻辑只需要改动一处。这在构建大型、多组件的前端应用时是至关重要的。
遗留问题与未来迭代
这套系统解决了我们最初的痛点,但它远非完美。
- 图数据库的扩展性: 我们的 Neo4j 目前是单点部署。随着数据量达到十亿、百亿级别,单机实例将成为瓶颈。Neo4j 的因果集群提供了读扩展,但写扩展仍然是一个挑战。未来可能需要探索 JanusGraph 这种基于分布式后端的图数据库,但这会带来更高的运维复杂性。
- 缓存一致性: 目前的缓存更新策略是读时穿透和周期性预热。这在大多数场景下够用,但在特征被频繁更新的场景下,可能会有短暂的数据不一致。一个更优的方案是引入 CDC (Change Data Capture),监听图数据库的变更流,实时地更新或失效 Redis 缓存。
- 特征版本与回滚: 当前我们只在关系上标记了模型版本。一个完整的 MLOps 平台需要支持特征集的版本化。当新模型效果不佳时,如何快速回滚到使用旧版本特征?这需要在数据模型和服务 API 层面进行更复杂的设计。
- 成本精细化控制: GKE GPU 节点的自动缩容解决了大部分成本问题,但对于 Job 的资源请求(CPU/Memory)还不够精细。引入 VPA (Vertical Pod Autoscaler) 来自动推荐和调整资源请求,可以进一步提高资源利用率,降低计算成本。