基于 Monorepo 构建支持实时反馈的跨平台开发者工作台


团队规模扩张到五十人以上时,微服务和多仓库(Multi-Repo)带来的管理混乱开始集中爆发。CI/CD 流水线状态分散、模块间版本依赖冲突、本地环境搭建耗时数小时,这些问题严重拖慢了开发迭代的速度。我们的核心痛点非常明确:开发者缺乏一个统一、高效、实时的信息反馈渠道来了解整个项目的健康状况。单纯的 Jenkins 或 GitLab 界面信息过于分散和延迟,无法满足快速定位问题的需求。

我们构想的解决方案是一个内部的“开发者工作台”,它需要具备几个核心特质:

  1. 信息聚合: 在单一视图中展示所有关键模块的构建、测试、部署状态。
  2. 实时反馈: CI/CD 任何阶段的变化,都应在秒级内推送到工作台界面。
  3. 跨平台: 无论是开发机(macOS/Linux/Windows)还是移动设备(Android),都应该有原生的使用体验。
  4. 统一代码基: 工作台本身及其后端服务的开发维护,不能再引入新的技术孤岛和仓库。

基于这些目标,技术选型决策变得清晰起来。

  • 代码组织 - Monorepo: 这是解决问题的基石。我们选择使用 Gradle 多项目构建,将所有后端服务、共享库、客户端代码统一管理。这从根本上解决了依赖管理和原子化提交的问题。
  • 后端服务 - Spring Boot + JPA/Hibernate: 快速开发是关键。Spring Boot 的生态和 JPA 的高效率数据访问能力,让我们能快速搭建一个用于聚合CI/CD数据的API服务。
  • 实时消息 - Redis Pub/Sub: 为了实现秒级反馈,轮询 API 显然是低效且不优雅的。Redis 的发布/订阅模式轻量、快速,非常适合作为 CI/CD 事件的实时消息总线。
  • 客户端 - Jetpack Compose for Desktop/Android: 我们需要一个能覆盖桌面和移动端的UI框架。Jetpack Compose 凭借其声明式UI和强大的跨平台能力(Compose for Desktop),成为不二之选。它让我们能用同一套 Kotlin 代码库,构建出性能优异的原生UI。

整个系统的核心架构如下,CI/CD 工具(如 Jenkins/GitLab CI)在执行任务的各个阶段,通过一个简单的脚本将事件推送到后端服务,后端服务处理后发布到 Redis 的特定频道,最终由 Compose 客户端订阅并实时更新UI。

graph TD
    subgraph "CI/CD Environment"
        A[GitLab CI Runner] -- HTTP POST --> B{Developer API Service};
    end

    subgraph "Backend Infrastructure"
        B -- "JPA/Hibernate for persistence" --> C[PostgreSQL Database];
        B -- "Publish Event (e.g., build_status_update)" --> D[Redis Pub/Sub];
    end

    subgraph "Real-time Delivery"
        E[WebSocket Gateway] -- "Subscribes to Redis Channel" --> D;
    end

    subgraph "Client Side"
        F[Jetpack Compose Desktop Client] -- "WebSocket Connection" --> E;
        G[Jetpack Compose Android Client] -- "WebSocket Connection" --> E;
    end

    B -- "Writes build history" --> C;
    A -.-> |"Payload: {moduleId, status, duration}"| B;
    E -.-> |"Pushes JSON message"| F;
    E -.-> |"Pushes JSON message"| G;

第一步: Monorepo 根基搭建

我们的项目结构采用 Gradle 多项目(multi-project)构建。根目录的 settings.gradle.kts 是关键,它声明了所有子项目。

// settings.gradle.kts
rootProject.name = "developer-workbench"

include(
    ":platform:workbench-api",         // 后端 Spring Boot 服务
    ":platform:workbench-core",        // 核心领域模型和共享逻辑 (JPA Entities)
    ":clients:workbench-desktop",      // Jetpack Compose 桌面客户端
    ":clients:workbench-android",      // Jetpack Compose 安卓客户端
    ":clients:workbench-ui-shared"     // Compose 共享UI组件和ViewModel
)

这种结构的好处是显而易见的。:platform:workbench-core 模块可以被后端API和客户端共享。例如,CI/CD 流水线的状态枚举、数据模型等可以只定义一次,避免了前后端定义不一致的问题。

第二步: 实时事件处理后端 (JPA + Redis)

后端服务的核心职责是接收来自 CI/CD 的 webhook,将状态持久化,并通过 Redis 广播出去。

JPA 实体定义

首先在共享模块 :platform:workbench-core 中定义我们的核心数据模型。

// platform/workbench-core/src/main/kotlin/com/workbench/core/entity/PipelineRun.kt

package com.workbench.core.entity

import jakarta.persistence.*
import java.time.Instant
import java.util.UUID

@Entity
@Table(name = "pipeline_runs")
class PipelineRun(
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    var id: UUID? = null,

    @Column(nullable = false, unique = true)
    var correlationId: String, // CI/CD 工具提供的唯一ID

    @Column(nullable = false)
    var moduleName: String,

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    var status: PipelineStatus = PipelineStatus.PENDING,

    var durationMs: Long? = null,
    
    @Column(nullable = false, updatable = false)
    var createdAt: Instant = Instant.now(),

    var finishedAt: Instant? = null
)

enum class PipelineStatus {
    PENDING, RUNNING, SUCCESS, FAILED, CANCELLED
}

这里的 correlationId 是一个关键字段,它用于将来自 CI/CD 工具的多个事件(如 start, test_complete, deploy_failed)关联到同一次流水线运行。

Spring Boot 服务与 Redis 集成

:platform:workbench-api 模块中,我们编写 Controller 和 Service。

application.yml 配置:

# platform/workbench-api/src/main/resources/application.yml
spring:
  application:
    name: workbench-api
  datasource:
    url: jdbc:postgresql://localhost:5432/workbench
    username: user
    password: password
    driver-class-name: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update # 在开发环境中自动更新表结构
    show-sql: true
  data:
    redis:
      host: localhost
      port: 6379

server:
  port: 8080

# 自定义配置
workbench:
  redis:
    channel: "pipeline-events"

核心业务逻辑: PipelineEventService

这个 Service 负责处理业务逻辑:更新数据库状态,然后发布消息到 Redis。直接使用 RedisTemplate 是最直接的方式。

// platform/workbench-api/src/main/kotlin/com/workbench/api/service/PipelineEventService.kt
package com.workbench.api.service

import com.fasterxml.jackson.databind.ObjectMapper
import com.workbench.api.dto.PipelineEventRequest
import com.workbench.core.entity.PipelineRun
import com.workbench.core.repository.PipelineRunRepository
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Instant

@Service
class PipelineEventService(
    private val pipelineRunRepository: PipelineRunRepository,
    private val redisTemplate: RedisTemplate<String, String>,
    private val objectMapper: ObjectMapper, // Jackson for JSON serialization
    @Value("\${workbench.redis.channel}") private val redisChannel: String
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Transactional
    fun processEvent(request: PipelineEventRequest) {
        // 在真实项目中,这里应该有更健壮的错误处理和日志记录
        try {
            val pipelineRun = pipelineRunRepository.findByCorrelationId(request.correlationId)
                ?.apply {
                    // 更新已有记录
                    this.status = request.status
                    this.durationMs = request.durationMs
                    if (status.isTerminal()) {
                        this.finishedAt = Instant.now()
                    }
                }
                ?: // 创建新记录
                PipelineRun(
                    correlationId = request.correlationId,
                    moduleName = request.moduleName,
                    status = request.status
                )
            
            val savedRun = pipelineRunRepository.save(pipelineRun)
            
            // 将更新后的实体序列化为 JSON 并发布到 Redis
            val eventMessage = objectMapper.writeValueAsString(savedRun)
            redisTemplate.convertAndSend(redisChannel, eventMessage)
            
            logger.info("Successfully processed and published event for correlationId: ${request.correlationId}")

        } catch (e: Exception) {
            // 关键的错误处理:如果数据库事务失败或Redis发布失败,必须记录下来
            logger.error("Failed to process event for correlationId: ${request.correlationId}", e)
            // 这里可以加入重试机制或将失败事件推入死信队列
            throw RuntimeException("Event processing failed", e)
        }
    }
    
    // 扩展函数判断状态是否为终态
    private fun com.workbench.core.entity.PipelineStatus.isTerminal() = 
        this in listOf(
            com.workbench.core.entity.PipelineStatus.SUCCESS, 
            com.workbench.core.entity.PipelineStatus.FAILED, 
            com.workbench.core.entity.PipelineStatus.CANCELLED
        )
}

这里的关键点在于事务性。@Transactional 保证了数据库操作的原子性。如果数据库保存失败,事务回滚,Redis 消息也不会被发送。在生产环境中,一个常见的错误是忘记处理 Redis 连接失败的情况,这会导致数据不一致。上面代码中的 try-catch 块就是为了应对这类问题。

第三步: WebSocket 网关

虽然客户端可以直接连接 Redis,但这会暴露 Redis 实例并且管理大量连接会很复杂。一个更稳健的架构是引入一个轻量级的 WebSocket 网关,它负责订阅 Redis 频道,并将消息转发给所有连接的 WebSocket 客户端。这里我们使用 Ktor 或 Spring WebFlux 可以轻松实现,但为保持示例简洁,我们假设这个网关已经存在。

第四步: Jetpack Compose 实时仪表盘

现在到了最有趣的部分:构建客户端。得益于 Monorepo 和共享模块,我们在客户端可以直接使用 :platform:workbench-core 中定义的 PipelineRun 等数据类。

共享 ViewModel

我们将数据获取和状态管理的逻辑放在 :clients:workbench-ui-shared 模块中,以便桌面和安卓客户端复用。

// clients/workbench-ui-shared/src/commonMain/kotlin/com/workbench/ui/shared/viewmodel/DashboardViewModel.kt
package com.workbench.ui.shared.viewmodel

import com.workbench.core.entity.PipelineRun
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import com.workbench.ui.shared.repository.PipelineRepository // 这是一个接口,具体实现由各平台提供

// 使用一个简单的 data class 来管理UI状态
data class DashboardUiState(
    val pipelines: Map<String, PipelineRun> = emptyMap(),
    val connectionStatus: String = "Connecting..."
)

class DashboardViewModel(
    private val repository: PipelineRepository,
    private val viewModelScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
) {
    private val _uiState = MutableStateFlow(DashboardUiState())
    val uiState: StateFlow<DashboardUiState> = _uiState.asStateFlow()

    init {
        // 在ViewModel初始化时开始监听事件
        listenToPipelineEvents()
    }

    private fun listenToPipelineEvents() {
        viewModelScope.launch {
            // 这里的 repository.getRealtimeEvents() 会返回一个 Flow<PipelineRun>
            // 其具体实现将是平台相关的WebSocket客户端
            repository.getRealtimeEvents().collect { newOrUpdatedRun ->
                _uiState.update { currentState ->
                    val updatedPipelines = currentState.pipelines.toMutableMap()
                    // 使用 moduleName + correlationId 作为唯一键,或者直接用 ID
                    updatedPipelines[newOrUpdatedRun.correlationId] = newOrUpdatedRun
                    
                    // 只保留最新的50条记录,防止内存无限增长
                    val sortedAndTrimmed = updatedPipelines.values
                        .sortedByDescending { it.createdAt }
                        .take(50)
                        .associateBy { it.correlationId }

                    currentState.copy(pipelines = sortedAndTrimmed, connectionStatus = "Connected")
                }
            }
        }
    }
}

这里的 PipelineRepository 是一个抽象,其实现将由具体平台(桌面/安卓)提供,负责处理 WebSocket 连接。

Compose for Desktop 实现

现在在 :clients:workbench-desktop 中实现UI。

// clients/workbench-desktop/src/main/kotlin/Main.kt

import androidx.compose.desktop.ui.tooling.preview.Preview
import androidx.compose.foundation.background
import androidx.compose.foundation.layout.*
import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.items
import androidx.compose.material.*
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import com.workbench.core.entity.PipelineRun
import com.workbench.core.entity.PipelineStatus
import com.workbench.ui.shared.viewmodel.DashboardViewModel

// 假设 KtorWebSocketRepository 实现了 PipelineRepository 接口
val viewModel = DashboardViewModel(KtorWebSocketRepository())

fun main() = application {
    Window(onCloseRequest = ::exitApplication, title = "Developer Workbench") {
        MaterialTheme {
            DashboardScreen(viewModel)
        }
    }
}

@Composable
fun DashboardScreen(viewModel: DashboardViewModel) {
    val uiState by viewModel.uiState.collectAsState()
    val pipelineRuns = uiState.pipelines.values.sortedByDescending { it.createdAt }

    Column(modifier = Modifier.fillMaxSize()) {
        TopAppBar(title = { Text("Real-time Pipeline Status") })
        
        if (pipelineRuns.isEmpty()) {
            Box(modifier = Modifier.fillMaxSize(), contentAlignment = Alignment.Center) {
                CircularProgressIndicator()
            }
        } else {
            LazyColumn(
                modifier = Modifier.fillMaxSize(),
                contentPadding = PaddingValues(16.dp),
                verticalArrangement = Arrangement.spacedBy(8.dp)
            ) {
                items(pipelineRuns, key = { it.correlationId }) { run ->
                    PipelineCard(run)
                }
            }
        }
    }
}

@Composable
fun PipelineCard(run: PipelineRun) {
    Card(
        modifier = Modifier.fillMaxWidth(),
        elevation = 4.dp
    ) {
        Row(
            modifier = Modifier.padding(16.dp),
            verticalAlignment = Alignment.CenterVertically
        ) {
            StatusIndicator(run.status)
            Spacer(Modifier.width(16.dp))
            Column(modifier = Modifier.weight(1f)) {
                Text(run.moduleName, style = MaterialTheme.typography.h6)
                Text("ID: ${run.correlationId.substring(0, 8)}...", style = MaterialTheme.typography.caption)
            }
            run.durationMs?.let {
                Text("${it / 1000.0}s", style = MaterialTheme.typography.body2)
            }
        }
    }
}

@Composable
fun StatusIndicator(status: PipelineStatus) {
    val color = when (status) {
        PipelineStatus.SUCCESS -> Color(0xFF2E7D32) // Green
        PipelineStatus.FAILED -> Color(0xFFC62828) // Red
        PipelineStatus.RUNNING -> Color(0xFF0288D1) // Blue
        PipelineStatus.PENDING -> Color.Gray
        PipelineStatus.CANCELLED -> Color.DarkGray
    }
    Box(
        modifier = Modifier
            .size(24.dp)
            .background(color, shape = MaterialTheme.shapes.small)
    )
}

这段代码展示了 Jetpack Compose 的强大之处:

  1. collectAsState()StateFlow 无缝转换为 Compose 的 State。当 ViewModel 中的 _uiState 更新时,UI会自动、高效地重组。
  2. LazyColumn 用于高效地显示列表,只渲染屏幕上可见的项。
  3. key = { it.correlationId } 是一个重要的性能优化。它帮助 Compose 识别列表中的项目,当列表更新时,只重组发生变化的项,而不是整个列表。

这个工作台最终实现的效果是,当开发人员提交代码后,桌面应用上的对应模块状态会从 PENDING 变为 RUNNING,最终变为 SUCCESSFAILED,整个过程几乎没有延迟。这种即时反馈极大地缩短了从“提交代码”到“发现问题”的周期。

当前方案的局限性与未来展望

尽管这套系统解决了核心痛点,但它并非完美,还存在一些局限和可以迭代的方向:

  1. WebSocket 网关的健壮性: 当前架构中,WebSocket 网关是一个单点。在生产环境中,它需要被设计成高可用的集群,并考虑负载均衡和故障转移。
  2. Redis Pub/Sub 的消息可靠性: Redis 的 Pub/Sub 是“即发即弃”模式,如果客户端(WebSocket网关)在消息发布时离线,它将永久丢失这条消息。对于要求更严格的场景,可以考虑使用 Redis Streams,它提供了持久化和消费组功能,能保证消息至少被消费一次。
  3. 历史数据查询: 当前方案侧重于实时反馈。对于历史流水线数据的复杂查询和分析,直接查询 PostgreSQL 可能会变慢。后续可以引入 CQRS 模式,将写模型(PipelineRun)和读模型分离,使用 Elasticsearch 等搜索引擎构建一个更强大的查询服务。
  4. 功能扩展: 当前工作台只显示状态。未来可以集成更多功能,比如直接点击卡片跳转到 CI/CD 日志页面,甚至在工作台内提供“重试失败阶段”等操作,实现更深度的开发流程整合。

  目录