使用 Elixir 与 Nginx 构建百万级实时推荐信令摄取层


项目面临的直接挑战是推荐系统的实时性。传统的推荐模型依赖 T+1 的批量数据处理,用户在当前会话中的行为,如点击、浏览、停留时长等,无法立刻反馈到推荐结果中,导致体验割裂。我们的目标是构建一个信令摄取层,它必须能处理百万级的并发长连接,实时聚合用户行为特征,并将这些特征流式传输给下游的推荐模型服务。任何超过百毫秒的延迟都会显著削弱这套系统的价值。

初步构想是利用 WebSocket 建立客户端与服务器之间的持久化信道,实时上报用户行为。但真正的难点在于服务端如何低成本、高弹性地维护这百万级的并发连接及其关联的用户会话状态。

技术选型上,我们排除了 Node.js 和 Go。Node.js 的单线程事件循环在处理大量 CPU 密集型的聚合计算时,可能会阻塞,影响全局响应。Go 的 Goroutine 模型虽然优秀,但在需要为每个用户会话维护一个隔离、带状态、且需要容错监督的“实体”时,Elixir 基于 Erlang/OTP 的 Actor 模型(轻量级进程)提供了更符合问题域的抽象。每个用户会话可以映射为一个极度轻量(几 KB 内存)的 Elixir 进程,由 OTP 的 Supervisor 进行监管,单个会话的崩溃不会影响系统整体。

而 Nginx 在此架构中扮演的角色远不止一个简单的反向代理。它将作为我们整个 Elixir 集群的流量入口,负责处理 TLS 卸载、WebSocket 协议升级握手,并通过一致性哈希或轮询策略将海量连接负载均衡到后端的多个 Elixir 节点上。这是一个经过生产环境反复验证的、处理 C10M 问题的经典组合。

Nginx: 坚不可摧的第一道防线

在生产项目中,直接将 Elixir 的 Cowboy 服务器暴露在公网上是不可取的。Nginx 提供了必要的安全屏障、负载均衡和性能优化。以下是一份用于生产环境的、针对 WebSocket 负载均衡的 Nginx 配置。

# /etc/nginx/nginx.conf

worker_processes auto;
worker_rlimit_nofile 1000000; # 关键:提升单个 worker 进程的最大文件描述符数

events {
    worker_connections 200000; # 关键:提升单个 worker 的连接数
    multi_accept on;
    use epoll;
}

http {
    # ... (常规 http 配置,如 gzip, log_format, etc.)

    # 定义 Elixir Phoenix 应用服务器集群
    upstream phoenix_cluster {
        # 使用 ip_hash 可以确保来自同一客户端的请求被路由到同一台后端服务器
        # 这对于需要会话粘性的场景很有用。但在我们的 Elixir 分布式架构中,
        # 我们将使用分布式注册表,因此简单的轮询或 least_conn 更为合适。
        # least_conn; 
        server 10.0.1.1:4000;
        server 10.0.1.2:4000;
        server 10.0.1.3:4000;
        # 可以添加更多后端节点
    }

    server {
        listen 80;
        listen 443 ssl http2;
        server_name your.domain.com;

        # TLS/SSL 配置
        ssl_certificate /etc/letsencrypt/live/your.domain.com/fullchain.pem;
        ssl_certificate_key /etc/letsencrypt/live/your.domain.com/privkey.pem;
        # ... 其他 SSL 安全优化配置

        location /socket {
            # 代理到 Phoenix 集群
            proxy_pass http://phoenix_cluster;
            proxy_http_version 1.1;

            # 关键:WebSocket 协议升级的必要头信息
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";

            # 传递真实客户端 IP
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Real-IP $remote_addr;

            # 调整代理超时时间以适应长连接
            proxy_connect_timeout 7d;
            proxy_send_timeout 7d;
            proxy_read_timeout 7d;
        }

        # ... (处理其他 HTTP 请求的 location 块)
    }
}

这份配置的要点在于:

  1. 系统级调优: worker_rlimit_nofileworker_connections 被大幅提高,这是支撑百万连接的基础。操作系统层面也需要同步修改 ulimit -n 的限制。
  2. WebSocket 握手: proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade"; 是 Nginx 代理 WebSocket 流量的“咒语”,它们将客户端的协议升级请求正确传递给后端 Phoenix 应用。
  3. 长连接超时: proxy_*_timeout 被设置为一个很长的时间(例如7天),以防止 Nginx 因为不活跃而主动切断长连接。
  4. 负载均衡: upstream 块定义了后端 Elixir 节点池。在真实项目中,这里通常会使用服务发现动态填充。

Elixir/Phoenix: 构建有状态会话的核心

我们的核心逻辑是,当一个用户通过 WebSocket 连接成功后,系统会为其启动一个专属的 GenServer 进程。这个进程将成为该用户在本次会话中的“数字分身”,负责接收、聚合其实时行为,并在特定时机将聚合结果推送到下游。

1. Phoenix Channel: 通信管道

首先定义客户端与服务器之间的通信管道。

# lib/your_app_web/channels/user_socket.ex
defmodule YourAppWeb.UserSocket do
  use Phoenix.Socket

  # 定义 socket 路径,与 Nginx 配置中的 /socket 对应
  channel "behavior:*", YourAppWeb.BehaviorChannel

  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    # 在真实项目中,这里会有严格的 token 验证逻辑
    case YourApp.Accounts.verify_user_token(token) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} ->
        :error
    end
  end

  # 如果没有 token 则拒绝连接
  def connect(_params, _socket, _connect_info), do: :error

  @impl true
  def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end

# lib/your_app_web/channels/behavior_channel.ex
defmodule YourAppWeb.BehaviorChannel do
  use Phoenix.Channel
  alias YourApp.Tracking.SessionTracker

  # 当客户端成功加入 channel 时触发
  @impl true
  def join("behavior:" <> user_id, _payload, socket) do
    # 确保加入 channel 的 user_id 与 token 中的一致
    if socket.assigns.user_id == user_id do
      # 关键:为当前用户启动或查找其专属的 SessionTracker 进程
      {:ok, _pid} = SessionTracker.start_session(user_id)
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # 处理从客户端发来的 "event" 消息
  @impl true
  def handle_in("event", payload, socket) do
    user_id = socket.assigns.user_id
    # 将事件异步地发送给对应的 SessionTracker 进程处理
    SessionTracker.track_event(user_id, payload)
    {:noreply, socket}
  end

  # 客户端断开连接时,通知 SessionTracker 进程
  @impl true
  def terminate(_reason, socket) do
    user_id = socket.assigns.user_id
    SessionTracker.end_session(user_id)
    :ok
  end
end

这里的 BehaviorChannel 并不直接处理业务逻辑。它像一个调度员,接收到事件后,立即将其转发给对应 user_idSessionTracker 进程,从而保持 Channel 本身的无状态和高响应性。

2. SessionTracker GenServer: 状态聚合的核心

这是整个架构的心脏。每个 SessionTracker 都是一个独立的进程,拥有自己的状态、邮箱,并由 Supervisor 树管理其生命周期。

# lib/your_app/tracking/session_tracker.ex
defmodule YourApp.Tracking.SessionTracker do
  use GenServer
  require Logger

  # 30秒聚合一次并向下游发送
  @flush_interval :timer.seconds(30)
  # 会话如果在5分钟内无任何活动,则自动终止
  @session_timeout :timer.minutes(5)

  # --- Client API ---

  def start_session(user_id) do
    # 使用 Registry 来动态地启动和注册进程,确保每个 user_id 只有一个进程
    # `via_name` 会自动处理进程的查找或启动
    via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
    GenServer.start_link(__MODULE__, %{user_id: user_id}, name: via_tuple)
  end

  def track_event(user_id, event) do
    via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
    # 异步发送事件,不阻塞 Channel 进程
    GenServer.cast(via_tuple, {:track, event})
  end

  def end_session(user_id) do
    via_tuple = {:via, Registry, {YourApp.SessionRegistry, user_id}}
    GenServer.cast(via_tuple, :terminate_session)
  end

  # --- GenServer Callbacks ---

  @impl true
  def init(args) do
    user_id = args.user_id
    Logger.info("SessionTracker started for user #{user_id}")
    
    # 初始化状态
    state = %{
      user_id: user_id,
      click_counts: %{}, # 按 item_id 统计点击次数
      view_duration: %{}, # 按 page_id 统计浏览时长
      last_event_time: DateTime.utc_now(),
      # 启动一个定时器,周期性地 flush 数据
      flush_timer: Process.send_after(self(), :flush, @flush_interval),
      # 启动一个不活动超时定时器
      timeout_timer: Process.send_after(self(), :check_timeout, @session_timeout)
    }
    
    {:ok, state}
  end

  # 处理 track_event/2 发来的事件
  @impl true
  def handle_cast({:track, %{"type" => "click", "item_id" => item_id}}, state) do
    new_counts = Map.update(state.click_counts, item_id, 1, &(&1 + 1))
    
    # 重置不活动超时定时器
    Process.cancel_timer(state.timeout_timer)
    new_timeout_timer = Process.send_after(self(), :check_timeout, @session_timeout)

    {:noreply,
     %{state | 
       click_counts: new_counts, 
       last_event_time: DateTime.utc_now(),
       timeout_timer: new_timeout_timer
     }}
  end
  
  # 处理其他类型的事件...
  def handle_cast({:track, %{"type" => "view", "page_id" => page_id, "duration" => dur}}, state) do
    # ... 类似地更新 view_duration ...
    {:noreply, state}
  end

  # 客户端断开连接的通知
  @impl true
  def handle_cast(:terminate_session, state) do
    Logger.info("Client disconnected for user #{state.user_id}. Flushing final state.")
    # 立即 flush 一次数据,然后终止
    flush_and_log(state)
    {:stop, :normal, state}
  end

  # 定时 flush 数据
  @impl true
  def handle_info(:flush, state) do
    flush_and_log(state)
    
    # 重置定时器,进行下一次调度
    new_timer = Process.send_after(self(), :flush, @flush_interval)
    
    # 清空已聚合的数据,开始新的窗口期
    {:noreply, %{state | click_counts: %{}, view_duration: %{}, flush_timer: new_timer}}
  end

  # 检查不活动超时
  @impl true
  def handle_info(:check_timeout, state) do
    Logger.warn("Session for user #{state.user_id} timed out due to inactivity.")
    # 同样在超时后 flush 一次数据
    flush_and_log(state)
    {:stop, :shutdown, state}
  end

  @impl true
  def terminate(reason, state) do
    Process.cancel_timer(state.flush_timer)
    Process.cancel_timer(state.timeout_timer)
    Logger.info("SessionTracker for user #{state.user_id} terminating. Reason: #{inspect(reason)}")
    :ok
  end

  # --- Private Helpers ---

  defp flush_and_log(state) do
    # 在真实项目中,这里会将数据序列化 (e.g., Protobuf) 
    # 并发送到 Kafka/Kinesis 等消息队列。
    # 为简化示例,我们只打印到日志。
    features = %{
      user_id: state.user_id,
      timestamp: DateTime.utc_now(),
      features: %{
        clicks: state.click_counts,
        views: state.view_duration
      }
    }
    
    # 只有在有数据时才发送,避免空消息
    if map_size(features.features.clicks) > 0 or map_size(features.features.views) > 0 do
      Logger.info("Flushing features for user #{state.user_id}: #{inspect(features)}")
      # YourApp.KafkaProducer.produce("realtime_features", features)
    end
  end
end

3. 进程注册与监督

手动管理成千上万个进程的 PID 是一个噩梦。Elixir 的 Registry 提供了完美的解决方案。它是一个高效的、分布式的键值存储,专门用于将名称映射到进程 PID。

我们需要在应用启动时,设置一个动态的 Supervisor 来监管这些 SessionTracker 进程。

# lib/your_app/application.ex
defmodule YourApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ... other children like Repo, Endpoint
      {Registry, keys: :unique, name: YourApp.SessionRegistry},
      {DynamicSupervisor, name: YourApp.SessionSupervisor, strategy: :one_for_one}
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: YourApp.Supervisor)
  end
end

然后在 SessionTrackerstart_session/1 函数中,我们不再直接调用 start_link,而是通过 DynamicSupervisor 来启动,这样所有会话进程都处于正确的监督树下。

# lib/your_app/tracking/session_tracker.ex - 修改后的 start_session
def start_session(user_id) do
  # 进程规格
  child_spec = {__MODULE__, %{user_id: user_id}}
  
  # 使用 Registry 确保唯一性,如果进程已存在,则不会重复启动
  Registry.register(YourApp.SessionRegistry, user_id, self())

  # 通过 DynamicSupervisor 启动,并由它监管
  DynamicSupervisor.start_child(YourApp.SessionSupervisor, child_spec)
end

一个常见的错误是让 Channel 进程直接 start_link GenServer。这会导致 GenServer 与 Channel 进程的生命周期绑定,如果 Channel 崩溃,GenServer 也会被终止,不符合我们的容错要求。通过 DynamicSupervisorSessionTracker 的生命周期独立于任何单个的 WebSocket 连接。

数据流与架构概览

现在,我们可以用一个清晰的流程图来描绘整个系统的运作方式。

sequenceDiagram
    participant Client
    participant Nginx
    participant PhoenixNode as Elixir/Phoenix Node
    participant Registry
    participant DynSupervisor as DynamicSupervisor
    participant SessionTracker as SessionTracker (GenServer)
    participant Downstream as Downstream (Kafka)

    Client->>+Nginx: WebSocket Handshake /socket
    Nginx->>+PhoenixNode: Proxy Request
    PhoenixNode-->>-Nginx: Handshake OK
    Nginx-->>-Client: Handshake OK
    Note over Client, PhoenixNode: WebSocket Channel Established

    Client->>PhoenixNode: channel.join("behavior:user123")
    PhoenixNode->>PhoenixNode: BehaviorChannel.join/3
    PhoenixNode->>Registry: Find/Register "user123"
    alt Process not found
        PhoenixNode->>DynSupervisor: start_child(SessionTracker, {user_id: "user123"})
        DynSupervisor->>SessionTracker: start_link()
        SessionTracker-->>DynSupervisor: {:ok, pid}
        DynSupervisor-->>PhoenixNode: {:ok, pid}
    end
    
    loop Real-time Events
        Client->>PhoenixNode: channel.push("event", {type: "click", ...})
        PhoenixNode->>PhoenixNode: BehaviorChannel.handle_in/3
        PhoenixNode->>SessionTracker: GenServer.cast({:track, event})
        Note right of SessionTracker: State is updated internally
    end

    loop Periodic Flush (e.g., every 30s)
        par
            SessionTracker->>SessionTracker: handle_info(:flush)
        and
            SessionTracker->>Downstream: Produce aggregated features
        end
    end

    Client->>PhoenixNode: Disconnects
    PhoenixNode->>PhoenixNode: BehaviorChannel.terminate/2
    PhoenixNode->>SessionTracker: GenServer.cast(:terminate_session)
    SessionTracker->>Downstream: Produce final features
    SessionTracker->>DynSupervisor: Stops

局限性与未来迭代路径

这套架构解决了实时摄取和初步聚合的核心问题,但在投入更大规模的生产环境前,必须正视其局限性:

  1. 内存状态的易失性: SessionTracker 的状态完全存在于进程内存中。如果承载该进程的 Elixir 节点宕机,所有在该节点上的用户会话聚合数据都会丢失。对于要求更高数据持久性的场景,可以考虑引入 Redis 或其他高速缓存,让 SessionTracker 定期将状态快照持久化。这会增加系统复杂度和延迟,是一个需要权衡的决策。

  2. 集群间的进程发现: Registry 默认是单节点范围的。当 Nginx 将同一个用户的不同连接(例如用户刷新页面)路由到不同的 Elixir 节点时,会导致为同一个用户启动多个 SessionTracker 进程。解决方案是使用分布式的进程注册表,例如 Horde.Registry。它能够在整个集群中同步进程名称和 PID,确保 {:via, Horde.Registry, {YourApp.Registry, user_id}} 能够全局唯一地定位到用户的 SessionTracker 进程,无论它在哪个节点上。

  3. 下游反压处理: 当前 flush_and_log 的实现是“即发即忘”的。如果下游的 Kafka 集群出现延迟或故障,SessionTracker 进程会持续尝试发送,可能导致消息堆积在内存中,最终耗尽节点资源。一个更健壮的实现会采用 GenStage 或集成带有背压机制的 Kafka 客户端库,当检测到下游拥堵时,可以采取丢弃、缓存或降低发送频率等策略。

  4. 特征聚合逻辑的灵活性: SessionTracker 内部的聚合逻辑是硬编码的。随着业务发展,产品和算法团队会需要更复杂的特征,例如序列特征、时间窗口特征等。未来可以考虑将聚合逻辑抽象化,甚至动态化,允许通过配置或脚本来定义新的特征提取规则,而无需重新部署整个应用。


  目录