项目面临的直接挑战是推荐系统的实时性。传统的推荐模型依赖 T+1 的批量数据处理,用户在当前会话中的行为,如点击、浏览、停留时长等,无法立刻反馈到推荐结果中,导致体验割裂。我们的目标是构建一个信令摄取层,它必须能处理百万级的并发长连接,实时聚合用户行为特征,并将这些特征流式传输给下游的推荐模型服务。任何超过百毫秒的延迟都会显著削弱这套系统的价值。
初步构想是利用 WebSocket 建立客户端与服务器之间的持久化信道,实时上报用户行为。但真正的难点在于服务端如何低成本、高弹性地维护这百万级的并发连接及其关联的用户会话状态。
技术选型上,我们排除了 Node.js 和 Go。Node.js 的单线程事件循环在处理大量 CPU 密集型的聚合计算时,可能会阻塞,影响全局响应。Go 的 Goroutine 模型虽然优秀,但在需要为每个用户会话维护一个隔离、带状态、且需要容错监督的“实体”时,Elixir 基于 Erlang/OTP 的 Actor 模型(轻量级进程)提供了更符合问题域的抽象。每个用户会话可以映射为一个极度轻量(几 KB 内存)的 Elixir 进程,由 OTP 的 Supervisor 进行监管,单个会话的崩溃不会影响系统整体。
而 Nginx 在此架构中扮演的角色远不止一个简单的反向代理。它将作为我们整个 Elixir 集群的流量入口,负责处理 TLS 卸载、WebSocket 协议升级握手,并通过一致性哈希或轮询策略将海量连接负载均衡到后端的多个 Elixir 节点上。这是一个经过生产环境反复验证的、处理 C10M 问题的经典组合。
Nginx: 坚不可摧的第一道防线
在生产项目中,直接将 Elixir 的 Cowboy 服务器暴露在公网上是不可取的。Nginx 提供了必要的安全屏障、负载均衡和性能优化。以下是一份用于生产环境的、针对 WebSocket 负载均衡的 Nginx 配置。
# /etc/nginx/nginx.conf
worker_processes auto;
worker_rlimit_nofile 1000000; # 关键:提升单个 worker 进程的最大文件描述符数
events {
worker_connections 200000; # 关键:提升单个 worker 的连接数
multi_accept on;
use epoll;
}
http {
# ... (常规 http 配置,如 gzip, log_format, etc.)
# 定义 Elixir Phoenix 应用服务器集群
upstream phoenix_cluster {
# 使用 ip_hash 可以确保来自同一客户端的请求被路由到同一台后端服务器
# 这对于需要会话粘性的场景很有用。但在我们的 Elixir 分布式架构中,
# 我们将使用分布式注册表,因此简单的轮询或 least_conn 更为合适。
# least_conn;
server 10.0.1.1:4000;
server 10.0.1.2:4000;
server 10.0.1.3:4000;
# 可以添加更多后端节点
}
server {
listen 80;
listen 443 ssl http2;
server_name your.domain.com;
# TLS/SSL 配置
ssl_certificate /etc/letsencrypt/live/your.domain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/your.domain.com/privkey.pem;
# ... 其他 SSL 安全优化配置
location /socket {
# 代理到 Phoenix 集群
proxy_pass http://phoenix_cluster;
proxy_http_version 1.1;
# 关键:WebSocket 协议升级的必要头信息
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 传递真实客户端 IP
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Real-IP $remote_addr;
# 调整代理超时时间以适应长连接
proxy_connect_timeout 7d;
proxy_send_timeout 7d;
proxy_read_timeout 7d;
}
# ... (处理其他 HTTP 请求的 location 块)
}
}
这份配置的要点在于:
- 系统级调优:
worker_rlimit_nofile和worker_connections被大幅提高,这是支撑百万连接的基础。操作系统层面也需要同步修改ulimit -n的限制。 - WebSocket 握手:
proxy_set_header Upgrade $http_upgrade;和proxy_set_header Connection "upgrade";是 Nginx 代理 WebSocket 流量的“咒语”,它们将客户端的协议升级请求正确传递给后端 Phoenix 应用。 - 长连接超时:
proxy_*_timeout被设置为一个很长的时间(例如7天),以防止 Nginx 因为不活跃而主动切断长连接。 - 负载均衡:
upstream块定义了后端 Elixir 节点池。在真实项目中,这里通常会使用服务发现动态填充。
Elixir/Phoenix: 构建有状态会话的核心
我们的核心逻辑是,当一个用户通过 WebSocket 连接成功后,系统会为其启动一个专属的 GenServer 进程。这个进程将成为该用户在本次会话中的“数字分身”,负责接收、聚合其实时行为,并在特定时机将聚合结果推送到下游。
1. Phoenix Channel: 通信管道
首先定义客户端与服务器之间的通信管道。
# lib/your_app_web/channels/user_socket.ex
defmodule YourAppWeb.UserSocket do
use Phoenix.Socket
# 定义 socket 路径,与 Nginx 配置中的 /socket 对应
channel "behavior:*", YourAppWeb.BehaviorChannel
@impl true
def connect(%{"token" => token}, socket, _connect_info) do
# 在真实项目中,这里会有严格的 token 验证逻辑
case YourApp.Accounts.verify_user_token(token) do
{:ok, user_id} ->
{:ok, assign(socket, :user_id, user_id)}
{:error, _reason} ->
:error
end
end
# 如果没有 token 则拒绝连接
def connect(_params, _socket, _connect_info), do: :error
@impl true
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
# lib/your_app_web/channels/behavior_channel.ex
defmodule YourAppWeb.BehaviorChannel do
use Phoenix.Channel
alias YourApp.Tracking.SessionTracker
# 当客户端成功加入 channel 时触发
@impl true
def join("behavior:" <> user_id, _payload, socket) do
# 确保加入 channel 的 user_id 与 token 中的一致
if socket.assigns.user_id == user_id do
# 关键:为当前用户启动或查找其专属的 SessionTracker 进程
{:ok, _pid} = SessionTracker.start_session(user_id)
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
# 处理从客户端发来的 "event" 消息
@impl true
def handle_in("event", payload, socket) do
user_id = socket.assigns.user_id
# 将事件异步地发送给对应的 SessionTracker 进程处理
SessionTracker.track_event(user_id, payload)
{:noreply, socket}
end
# 客户端断开连接时,通知 SessionTracker 进程
@impl true
def terminate(_reason, socket) do
user_id = socket.assigns.user_id
SessionTracker.end_session(user_id)
:ok
end
end
这里的 BehaviorChannel 并不直接处理业务逻辑。它像一个调度员,接收到事件后,立即将其转发给对应 user_id 的 SessionTracker 进程,从而保持 Channel 本身的无状态和高响应性。
2. SessionTracker GenServer: 状态聚合的核心
这是整个架构的心脏。每个 SessionTracker 都是一个独立的进程,拥有自己的状态、邮箱,并由 Supervisor 树管理其生命周期。
# lib/your_app/tracking/session_tracker.ex
defmodule YourApp.Tracking.SessionTracker do
use GenServer
require Logger
# 30秒聚合一次并向下游发送
@flush_interval :timer.seconds(30)
# 会话如果在5分钟内无任何活动,则自动终止
@session_timeout :timer.minutes(5)
# --- Client API ---
def start_session(user_id) do
# 使用 Registry 来动态地启动和注册进程,确保每个 user_id 只有一个进程
# `via_name` 会自动处理进程的查找或启动
via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
GenServer.start_link(__MODULE__, %{user_id: user_id}, name: via_tuple)
end
def track_event(user_id, event) do
via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
# 异步发送事件,不阻塞 Channel 进程
GenServer.cast(via_tuple, {:track, event})
end
def end_session(user_id) do
via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
GenServer.cast(via_tuple, :terminate_session)
end
# --- GenServer Callbacks ---
@impl true
def init(args) do
user_id = args.user_id
Logger.info("SessionTracker started for user #{user_id}")
# 初始化状态
state = %{
user_id: user_id,
click_counts: %{}, # 按 item_id 统计点击次数
view_duration: %{}, # 按 page_id 统计浏览时长
last_event_time: DateTime.utc_now(),
# 启动一个定时器,周期性地 flush 数据
flush_timer: Process.send_after(self(), :flush, @flush_interval),
# 启动一个不活动超时定时器
timeout_timer: Process.send_after(self(), :check_timeout, @session_timeout)
}
{:ok, state}
end
# 处理 track_event/2 发来的事件
@impl true
def handle_cast({:track, %{"type" => "click", "item_id" => item_id}}, state) do
new_counts = Map.update(state.click_counts, item_id, 1, &(&1 + 1))
# 重置不活动超时定时器
Process.cancel_timer(state.timeout_timer)
new_timeout_timer = Process.send_after(self(), :check_timeout, @session_timeout)
{:noreply,
%{state |
click_counts: new_counts,
last_event_time: DateTime.utc_now(),
timeout_timer: new_timeout_timer
}}
end
# 处理其他类型的事件...
def handle_cast({:track, %{"type" => "view", "page_id" => page_id, "duration" => dur}}, state) do
# ... 类似地更新 view_duration ...
{:noreply, state}
end
# 客户端断开连接的通知
@impl true
def handle_cast(:terminate_session, state) do
Logger.info("Client disconnected for user #{state.user_id}. Flushing final state.")
# 立即 flush 一次数据,然后终止
flush_and_log(state)
{:stop, :normal, state}
end
# 定时 flush 数据
@impl true
def handle_info(:flush, state) do
flush_and_log(state)
# 重置定时器,进行下一次调度
new_timer = Process.send_after(self(), :flush, @flush_interval)
# 清空已聚合的数据,开始新的窗口期
{:noreply, %{state | click_counts: %{}, view_duration: %{}, flush_timer: new_timer}}
end
# 检查不活动超时
@impl true
def handle_info(:check_timeout, state) do
Logger.warn("Session for user #{state.user_id} timed out due to inactivity.")
# 同样在超时后 flush 一次数据
flush_and_log(state)
{:stop, :shutdown, state}
end
@impl true
def terminate(reason, state) do
Process.cancel_timer(state.flush_timer)
Process.cancel_timer(state.timeout_timer)
Logger.info("SessionTracker for user #{state.user_id} terminating. Reason: #{inspect(reason)}")
:ok
end
# --- Private Helpers ---
defp flush_and_log(state) do
# 在真实项目中,这里会将数据序列化 (e.g., Protobuf)
# 并发送到 Kafka/Kinesis 等消息队列。
# 为简化示例,我们只打印到日志。
features = %{
user_id: state.user_id,
timestamp: DateTime.utc_now(),
features: %{
clicks: state.click_counts,
views: state.view_duration
}
}
# 只有在有数据时才发送,避免空消息
if map_size(features.features.clicks) > 0 or map_size(features.features.views) > 0 do
Logger.info("Flushing features for user #{state.user_id}: #{inspect(features)}")
# YourApp.KafkaProducer.produce("realtime_features", features)
end
end
end
3. 进程注册与监督
手动管理成千上万个进程的 PID 是一个噩梦。Elixir 的 Registry 提供了完美的解决方案。它是一个高效的、分布式的键值存储,专门用于将名称映射到进程 PID。
我们需要在应用启动时,设置一个动态的 Supervisor 来监管这些 SessionTracker 进程。
# lib/your_app/application.ex
defmodule YourApp.Application do
use Application
def start(_type, _args) do
children = [
# ... other children like Repo, Endpoint
{Registry, keys: :unique, name: YourApp.SessionRegistry},
{DynamicSupervisor, name: YourApp.SessionSupervisor, strategy: :one_for_one}
]
Supervisor.start_link(children, strategy: :one_for_one, name: YourApp.Supervisor)
end
end
然后在 SessionTracker 的 start_session/1 函数中,我们不再直接调用 start_link,而是通过 DynamicSupervisor 来启动,这样所有会话进程都处于正确的监督树下。
# lib/your_app/tracking/session_tracker.ex - 修改后的 start_session
def start_session(user_id) do
# 进程规格
child_spec = {__MODULE__, %{user_id: user_id}}
# 使用 Registry 确保唯一性,如果进程已存在,则不会重复启动
Registry.register(YourApp.SessionRegistry, user_id, self())
# 通过 DynamicSupervisor 启动,并由它监管
DynamicSupervisor.start_child(YourApp.SessionSupervisor, child_spec)
end
一个常见的错误是让 Channel 进程直接 start_link GenServer。这会导致 GenServer 与 Channel 进程的生命周期绑定,如果 Channel 崩溃,GenServer 也会被终止,不符合我们的容错要求。通过 DynamicSupervisor,SessionTracker 的生命周期独立于任何单个的 WebSocket 连接。
数据流与架构概览
现在,我们可以用一个清晰的流程图来描绘整个系统的运作方式。
sequenceDiagram
participant Client
participant Nginx
participant PhoenixNode as Elixir/Phoenix Node
participant Registry
participant DynSupervisor as DynamicSupervisor
participant SessionTracker as SessionTracker (GenServer)
participant Downstream as Downstream (Kafka)
Client->>+Nginx: WebSocket Handshake /socket
Nginx->>+PhoenixNode: Proxy Request
PhoenixNode-->>-Nginx: Handshake OK
Nginx-->>-Client: Handshake OK
Note over Client, PhoenixNode: WebSocket Channel Established
Client->>PhoenixNode: channel.join("behavior:user123")
PhoenixNode->>PhoenixNode: BehaviorChannel.join/3
PhoenixNode->>Registry: Find/Register "user123"
alt Process not found
PhoenixNode->>DynSupervisor: start_child(SessionTracker, {user_id: "user123"})
DynSupervisor->>SessionTracker: start_link()
SessionTracker-->>DynSupervisor: {:ok, pid}
DynSupervisor-->>PhoenixNode: {:ok, pid}
end
loop Real-time Events
Client->>PhoenixNode: channel.push("event", {type: "click", ...})
PhoenixNode->>PhoenixNode: BehaviorChannel.handle_in/3
PhoenixNode->>SessionTracker: GenServer.cast({:track, event})
Note right of SessionTracker: State is updated internally
end
loop Periodic Flush (e.g., every 30s)
par
SessionTracker->>SessionTracker: handle_info(:flush)
and
SessionTracker->>Downstream: Produce aggregated features
end
end
Client->>PhoenixNode: Disconnects
PhoenixNode->>PhoenixNode: BehaviorChannel.terminate/2
PhoenixNode->>SessionTracker: GenServer.cast(:terminate_session)
SessionTracker->>Downstream: Produce final features
SessionTracker->>DynSupervisor: Stops
局限性与未来迭代路径
这套架构解决了实时摄取和初步聚合的核心问题,但在投入更大规模的生产环境前,必须正视其局限性:
内存状态的易失性:
SessionTracker的状态完全存在于进程内存中。如果承载该进程的 Elixir 节点宕机,所有在该节点上的用户会话聚合数据都会丢失。对于要求更高数据持久性的场景,可以考虑引入 Redis 或其他高速缓存,让SessionTracker定期将状态快照持久化。这会增加系统复杂度和延迟,是一个需要权衡的决策。集群间的进程发现:
Registry默认是单节点范围的。当 Nginx 将同一个用户的不同连接(例如用户刷新页面)路由到不同的 Elixir 节点时,会导致为同一个用户启动多个SessionTracker进程。解决方案是使用分布式的进程注册表,例如Horde.Registry。它能够在整个集群中同步进程名称和 PID,确保{:via, Horde.Registry, {YourApp.Registry, user_id}}能够全局唯一地定位到用户的SessionTracker进程,无论它在哪个节点上。下游反压处理: 当前
flush_and_log的实现是“即发即忘”的。如果下游的 Kafka 集群出现延迟或故障,SessionTracker进程会持续尝试发送,可能导致消息堆积在内存中,最终耗尽节点资源。一个更健壮的实现会采用GenStage或集成带有背压机制的 Kafka 客户端库,当检测到下游拥堵时,可以采取丢弃、缓存或降低发送频率等策略。特征聚合逻辑的灵活性:
SessionTracker内部的聚合逻辑是硬编码的。随着业务发展,产品和算法团队会需要更复杂的特征,例如序列特征、时间窗口特征等。未来可以考虑将聚合逻辑抽象化,甚至动态化,允许通过配置或脚本来定义新的特征提取规则,而无需重新部署整个应用。