基于 Crossplane 与 Rust 构建 AWS EKS 上的动态端到端测试环境编排器


端到端(E2E)测试环境的维护是一项成本高昂且极其脆弱的工作。在大多数团队中,通常存在一个或多个长期运行的、共享的 stagingqa 环境。这些环境状态不一,数据陈旧,并且由于多人同时使用,测试之间相互干扰,导致大量的 flaky tests(不稳定测试)。一次失败的 E2E 测试运行,我们很难判断是代码缺陷,还是环境本身的问题。这种不确定性极大地消耗了研发与QA团队的精力。

核心问题在于环境的生命周期管理隔离性。理想的 E2E 测试环境应当是:

  1. 按需创建 (On-demand): 为每一次测试运行(或每一个 Pull Request)创建一套全新的、独立的环境。
  2. 状态隔离 (Stateful Isolation): 每个环境包含独立的数据库、消息队列、对象存储等基础设施,确保测试数据不会相互污染。
  3. 生产一致 (Production-like): 环境的基础设施规格和配置应尽可能接近生产环境。
  4. 自动销毁 (Ephemeral): 测试结束后,环境及其所有关联的云资源被彻底、干净地回收,避免资源浪费。

实现这样的系统,本质上是构建一个内部开发者平台(IDP)的核心组件:一个动态环境编排器。

方案 A: 主流脚本化 IaC (Terraform + Python/Go Wrapper)

一个直接的思路是使用成熟的 IaC 工具,如 Terraform,并通过脚本来驱动。

  • 架构:

    1. 一个 HTTP 服务(例如用 Python Flask 或 Go Gin 编写)接收创建环境的 API 请求。
    2. 服务内部调用 terraform apply 命令,传入变量来创建一套 AWS 资源(如 RDS, SQS, S3 bucket)。
    3. Terraform 执行完毕后,将输出(如数据库连接串)返回给 API 服务。
    4. 服务随后在 EKS 集群上创建一个 Kubernetes Job,该 Job 运行 Playwright 测试,并将基础设施连接信息作为环境变量或 Secret 注入。
    5. 测试完成后,API 服务接收到一个回调,然后调用 terraform destroy 来清理资源。
  • 优势:

    • 生态成熟: Terraform 的 AWS Provider 功能非常完善,几乎覆盖所有 AWS 服务。
    • 上手快: 对于熟悉 Terraform 的团队,学习成本较低。
    • 社区庞大: 遇到问题很容易找到解决方案。
  • 劣势:

    • 状态管理复杂: 每个动态环境都需要一个独立的 Terraform state 文件。管理成百上千个 state 文件,尤其是在并发创建销毁时,是一个巨大的挑战。通常需要依赖 Terraform Cloud 或自建基于 S3 + DynamoDB 的状态后端,但这增加了架构的复杂性。
    • 命令式“胶水层”: 整个流程由外部的 Python/Go 脚本驱动。这个服务本身成为了一个有状态的、需要维护的单点。它需要处理 Terraform 执行的失败、重试、超时,本质上是在用命令式的代码去模拟一个声明式系统应该有的调和(reconciliation)能力。
    • 漂移风险: 如果有人手动修改了 Terraform 管理的资源,或者 terraform destroy 意外失败,资源就会泄露。虽然有 terraform plan 可以检测,但在一个完全自动化的系统中,处理这种漂移需要编写大量的额外逻辑。
    • API 边界模糊: Terraform 本身不是为 API 驱动设计的。将其能力暴露为服务,需要做大量的封装,而且安全性难以保障。

方案 B: 云原生控制平面 (Crossplane + Rust Controller)

另一个思路是拥抱 Kubernetes 的控制平面模型,将基础设施也视为一种 Kubernetes API 资源。

  • 架构:

    1. Crossplane: 在 EKS 集群中安装 Crossplane。它会安装 AWS Provider,允许我们通过 Kubernetes API 来声明式地管理 AWS 资源。
    2. Composition: 我们使用 Crossplane 的 Composition 功能,定义一个名为 TestEnvironment 的抽象资源。这个抽象资源“组合”了具体的 AWS 资源,如 RDSInstance, Queue, Bucket
    3. Rust Controller: 开发一个轻量级的 Kubernetes Operator/Controller(使用 Rust 和 kube-rs 库)。这个 Controller 监听我们自定义的 TestRun 资源。
    4. 工作流:
      • 用户(或 CI/CD 流水线)在 EKS 集群中创建一个 TestRun 自定义资源(CR)。
      • Rust Controller 监听到新的 TestRun CR,它的第一个动作是在集群中创建一个 TestEnvironment 资源。
      • Crossplane 监听到新的 TestEnvironment 资源,并根据其 Composition 定义,开始向 AWS API 发出请求,创建对应的 RDS、SQS 等实例。
      • Crossplane 会将创建好的资源的连接信息写回 TestEnvironment 资源的 status 字段,并创建一个包含这些敏感信息的 Kubernetes Secret。
      • Rust Controller 持续监控 TestEnvironment 的状态。一旦 status 显示所有资源都已就绪(Ready),它就会创建一个 Kubernetes Job 来运行 Playwright 测试,并将 Crossplane 生成的 Secret 挂载到 Job 的 Pod 中。
      • 测试 Job 完成后,Rust Controller 更新 TestRun 的状态,并删除 TestEnvironment 资源。
      • Kubernetes 的垃圾回收机制确保,当 TestEnvironment 被删除时,Crossplane 会自动调用 AWS API 删除所有关联的云资源。
  • 优势:

    • 统一的声明式 API: 整个系统的入口点就是 Kubernetes API。无论是应用(Playwright Job)还是基础设施(RDS),都通过 kubectl apply 或客户端库以同样的方式管理。没有外部的“胶水”服务。
    • 内置调和循环: Crossplane 和我们的 Rust Controller 都遵循 Operator 模式。如果 AWS 资源创建失败,它们会自动重试。如果资源被意外修改,它们会自动纠正,以匹配声明的期望状态。这就是控制平面的力量。
    • 强大的抽象能力: 我们可以为开发者提供一个非常简单的 TestEnvironment API,他们无需关心底层是 RDS 还是 S3。平台团队可以通过修改 Composition 来轻松替换或升级底层实现,而对用户透明。
    • 高性能与高可靠性的控制器: 使用 Rust 编写 Controller,我们可以获得内存安全、无GC停顿以及极高的并发性能。对于一个需要处理大量并发测试请求的核心平台组件来说,这些特性至关重要。kube-rs 提供了与 Kubernetes API 交互的强大而类型安全的基础。
  • 劣势:

    • 学习曲线: Crossplane 的概念(Composition, XRD)以及编写 Kubernetes Operator 对团队来说可能有一定的学习成本。
    • 生态相对年轻: 虽然发展迅速,但 Crossplane Provider 的成熟度和覆盖范围可能在某些边缘场景下不及 Terraform。
    • Rust 开发门槛: 相比 Python/Go,Rust 的所有权和生命周期等概念需要时间掌握。

最终选择与理由

我们选择了方案 B。尽管存在学习曲线,但其带来的架构一致性和长期可维护性是决定性的。将基础设施的管理完全融入 Kubernetes 控制平面,避免了方案 A 中脆弱的、命令式的胶水代码。我们构建的不再是一个简单的“脚本”,而是一个健壮、自愈、可扩展的平台。

选择 Rust 作为 Controller 的开发语言,是出于对平台核心组件稳定性和性能的极致追求。一个需要管理大量云资源生命周期和并发测试任务的服务,其内存安全和可预测的性能至关重要。Rust 在这方面提供了无与伦比的保障。

核心实现概览

以下是系统的架构和关键代码片段。

1. 架构图 (Mermaid.js)

sequenceDiagram
    participant User/CI
    participant K8s API Server
    participant Rust Controller
    participant Crossplane
    participant AWS API
    participant Playwright Job

    User/CI->>+K8s API Server: 1. Create TestRun CR
    K8s API Server-->>+Rust Controller: 2. Watch Event: TestRun created
    Rust Controller->>+K8s API Server: 3. Create TestEnvironment CR
    K8s API Server-->>+Crossplane: 4. Watch Event: TestEnvironment created
    Crossplane->>+AWS API: 5. Provision RDS, SQS, etc.
    AWS API-->>-Crossplane: Resources Created
    Crossplane->>+K8s API Server: 6. Update TestEnvironment Status (Ready) & Create Secret
    K8s API Server-->>+Rust Controller: 7. Watch Event: TestEnvironment Ready
    Rust Controller->>+K8s API Server: 8. Create Kubernetes Job (Playwright)
    K8s API Server-->>-Playwright Job: 9. Start Pod, mount Secret
    Playwright Job->>Playwright Job: 10. Run tests using credentials from Secret
    Playwright Job->>+K8s API Server: 11. Job Completed
    K8s API Server-->>+Rust Controller: 12. Watch Event: Job Completed
    Rust Controller->>+K8s API Server: 13. Delete TestEnvironment CR
    K8s API Server-->>+Crossplane: 14. Watch Event: TestEnvironment deleted
    Crossplane->>+AWS API: 15. Deprovision RDS, SQS, etc.
    AWS API-->>-Crossplane: Resources Deleted

2. Crossplane 资源定义

首先,我们定义开发者需要关心的抽象资源 CompositeResourceDefinition (XRD)。

# xd-testenvironment.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: testenvironments.platform.example.org
spec:
  group: platform.example.org
  names:
    kind: TestEnvironment
    plural: testenvironments
  claimNames:
    kind: TestEnvironmentClaim
    plural: testenvironmentclaims
  connectionSecretKeys:
    - connectionString
    - queueUrl
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              parameters:
                type: object
                properties:
                  # 定义用户可以自定义的参数
                  storageGB:
                    type: integer
                    description: "Storage size for the database in GB."
                    default: 5
                  instanceClass:
                    type: string
                    description: "Instance class for the database."
                    default: "db.t3.micro"
                required:
                  - storageGB
                  - instanceClass
            required:
              - parameters

然后,我们创建 Composition,它将这个抽象的 TestEnvironment 映射到具体的 AWS 资源。

# composition-aws-testenvironment.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: aws.testenvironments.platform.example.org
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: platform.example.org/v1alpha1
    kind: TestEnvironment
  # 将连接信息传播到 Secret
  writeConnectionSecretsToNamespace: crossplane-system
  resources:
    # 1. 定义 RDS 数据库实例
    - name: rds-instance
      base:
        apiVersion: rds.aws.upbound.io/v1beta1
        kind: DBInstance
        spec:
          forProvider:
            region: "us-west-2"
            # 使用 Upbound 官方 provider 的配置
            dbSubnetGroupNameSelector:
              matchLabels:
                "network.example.org/purpose": "private"
            vpcSecurityGroupIDSelector:
              matchLabels:
                "network.example.org/purpose": "database"
            engine: "postgres"
            engineVersion: "14.6"
            username: "testuser"
            passwordSecretRef:
              namespace: crossplane-system
              name: db-password
              key: password
            skipFinalSnapshot: true
            publiclyAccessible: false
      patches:
        # 从 TestEnvironment 的 spec.parameters 字段 patch 值到 RDSInstance
        - fromFieldPath: "spec.parameters.storageGB"
          toFieldPath: "spec.forProvider.allocatedStorage"
        - fromFieldPath: "spec.parameters.instanceClass"
          toFieldPath: "spec.forProvider.instanceClass"
        - fromFieldPath: "metadata.uid" # 使用 UID 确保资源名唯一
          toFieldPath: "spec.forProvider.dbInstanceIdentifier"
          transforms:
            - type: string
              string:
                fmt: "e2e-db-%s"
      # 将 RDS 的连接信息暴露给 TestEnvironment 的 Connection Secret
      connectionDetails:
        - fromConnectionSecretKey: "endpoint"
          name: "dbHost"
        - fromConnectionSecretKey: "port"
          name: "dbPort"
        - fromConnectionSecretKey: "username"
        - fromConnectionSecretKey: "password"
        - type: ConnectionDetail
          name: connectionString
          value: "postgresql://$(username):$(password)@$(dbHost):$(dbPort)/postgres"

    # 2. 定义 SQS 队列
    - name: sqs-queue
      base:
        apiVersion: sqs.aws.upbound.io/v1beta1
        kind: Queue
        spec:
          forProvider:
            region: "us-west-2"
      patches:
        - fromFieldPath: "metadata.uid"
          toFieldPath: "metadata.name"
          transforms:
            - type: string
              string:
                fmt: "e2e-queue-%s"
      connectionDetails:
        - fromConnectionSecretKey: "url"
          name: "queueUrl"

3. Rust Controller 核心逻辑

我们将使用 kube-rstokio 来构建控制器。

首先是 TestRun CRD 的 Rust 结构体定义:

// src/crd.rs

use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
    group = "testing.example.org",
    version = "v1alpha1",
    kind = "TestRun",
    namespaced
)]
#[kube(status = "TestRunStatus")]
pub struct TestRunSpec {
    /// Git repository containing the Playwright tests
    pub repository: String,
    /// Git revision (commit hash, branch, or tag)
    pub revision: String,
    /// Parameters for the underlying TestEnvironment
    pub environment_params: EnvironmentParams,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct EnvironmentParams {
    #[serde(rename = "storageGB")]
    pub storage_gb: i32,
    #[serde(rename = "instanceClass")]
    pub instance_class: String,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct TestRunStatus {
    pub phase: Phase,
    #[serde(rename = "environmentRef")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub environment_ref: Option<String>,
    #[serde(rename = "jobRef")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub job_ref: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, JsonSchema)]
pub enum Phase {
    Pending,
    ProvisioningEnvironment,
    EnvironmentReady,
    RunningTests,
    TestsCompleted,
    CleaningUp,
    Failed,
    Succeeded,
}

接下来是调和循环(Reconciliation Loop)的核心逻辑。

// src/controller.rs

use kube::{
    api::{Api, Patch, PatchParams, ResourceExt},
    runtime::controller::{Action, Controller},
    Client,
};
use std::sync::Arc;
use thiserror::Error;
use tokio::time::Duration;

// ... imports for CRDs, k8s resources, etc.

#[derive(Debug, Error)]
pub enum Error {
    #[error("Kube API Error: {0}")]
    Kube(#[from] kube::Error),
    // ... other error types
}

type Result<T, E = Error> = std::result::Result<T, E>;

/// The reconciliation logic
async fn reconcile(run: Arc<TestRun>, ctx: Arc<Context>) -> Result<Action> {
    let client = &ctx.client;
    let ns = run.namespace().unwrap(); // Our CRD is namespaced
    let api_runs: Api<TestRun> = Api::namespaced(client.clone(), &ns);

    // Main state machine based on the 'phase' status field
    match run.status.as_ref().map(|s| &s.phase) {
        // Initial state: create the TestEnvironment
        Some(Phase::Pending) | None => {
            let env_name = format!("env-{}", run.name_any());
            
            // 1. Construct the TestEnvironment resource
            let test_env = serde_json::from_value(serde_json::json!({
                "apiVersion": "platform.example.org/v1alpha1",
                "kind": "TestEnvironment",
                "metadata": {
                    "name": env_name,
                    "namespace": ns,
                },
                "spec": {
                    "parameters": {
                        "storageGB": run.spec.environment_params.storage_gb,
                        "instanceClass": run.spec.environment_params.instance_class,
                    }
                }
            }))?;
            
            // 2. Create it via the dynamic API
            let api_envs: Api<kube::api::DynamicObject> = Api::namespaced(client.clone(), &ns);
            api_envs.patch(&env_name, &PatchParams::apply("testrun-controller"), &Patch::Apply(test_env)).await?;

            // 3. Update status to reflect the new state
            let new_status = Patch::Merge(serde_json::json!({
                "status": {
                    "phase": Phase::ProvisioningEnvironment,
                    "environmentRef": env_name,
                }
            }));
            api_runs.patch_status(&run.name_any(), &PatchParams::default(), &new_status).await?;
        }

        // Environment is being created, wait for it to become ready
        Some(Phase::ProvisioningEnvironment) => {
            let env_name = run.status.as_ref().unwrap().environment_ref.as_ref().unwrap();
            
            // Check the status of the Crossplane composite resource
            let api_envs: Api<kube::api::DynamicObject> = Api::namespaced(client.clone(), &ns);
            let env = api_envs.get(env_name).await?;

            let is_ready = env.data["status"]["conditions"].as_array()
                .and_then(|conds| conds.iter().find(|c| c["type"] == "Ready"))
                .map_or(false, |c| c["status"] == "True");

            if is_ready {
                let new_status = Patch::Merge(serde_json::json!({
                    "status": { "phase": Phase::EnvironmentReady }
                }));
                api_runs.patch_status(&run.name_any(), &PatchParams::default(), &new_status).await?;
            }
        }

        // Environment is ready, launch the Playwright Job
        Some(Phase::EnvironmentReady) => {
            let job_name = format!("test-job-{}", run.name_any());
            let env_name = run.status.as_ref().unwrap().environment_ref.as_ref().unwrap();
            let secret_name = env_name; // Crossplane creates a secret with the same name as the resource

            // 1. Define the Kubernetes Job
            let job = serde_json::from_value(serde_json::json!({
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": { "name": job_name },
                "spec": {
                    "template": {
                        "spec": {
                            "containers": [{
                                "name": "playwright-runner",
                                "image": "my-registry/playwright-runner:latest",
                                "envFrom": [{
                                    "secretRef": { "name": secret_name }
                                }]
                            }],
                            "restartPolicy": "Never"
                        }
                    },
                    "backoffLimit": 1
                }
            }))?;

            // 2. Create the Job
            let api_jobs: Api<kube::api::Object<_,_>> = Api::namespaced(client.clone(), &ns);
            api_jobs.patch(&job_name, &PatchParams::apply("testrun-controller"), &Patch::Apply(job)).await?;

            // 3. Update status
            let new_status = Patch::Merge(serde_json::json!({
                "status": {
                    "phase": Phase::RunningTests,
                    "jobRef": job_name,
                }
            }));
            api_runs.patch_status(&run.name_any(), &PatchParams::default(), &new_status).await?;
        }

        // ... Implement other phases: RunningTests, TestsCompleted, CleaningUp ...
        
        Some(Phase::CleaningUp) => {
            // Delete the TestEnvironment, Crossplane will handle resource cleanup
            let env_name = run.status.as_ref().unwrap().environment_ref.as_ref().unwrap();
            let api_envs: Api<kube::api::DynamicObject> = Api::namespaced(client.clone(), &ns);
            
            match api_envs.delete(env_name, &Default::default()).await {
                Ok(_) => {
                     // ... transition to Succeeded or Failed state
                }
                Err(kube::Error::Api(e)) if e.code == 404 => {
                    // Already deleted, we are done
                }
                Err(e) => return Err(e.into())
            };
        }
        
        // Final states, do nothing
        Some(Phase::Succeeded) | Some(Phase::Failed) => {}
    }

    Ok(Action::requeue(Duration::from_secs(15)))
}

这是一个简化的调和器,但在真实项目中,它需要更完善的错误处理、事件记录以及对 Job 成功或失败状态的精细判断。

架构的扩展性与局限性

当前架构的局限性主要体现在两个方面。首先,云资源的创建并非瞬时完成,一个 RDS 实例的启动可能需要几分钟。这意味着 E2E 测试的启动存在延迟。对于需要快速反馈的 PR 检查场景,这可能无法接受。优化方向可以是预热资源池,或者使用更轻量的、启动更快的替代方案(如容器化的 Postgres)。

其次,系统的复杂性不容忽视。我们引入了 Crossplane 和一个自定义的 Controller,这增加了团队的运维负担。对 Kubernetes、Crossplane 和 Rust 的深入理解是维护这个系统的必要条件。在真实项目中,必须投入资源建设完善的可观测性体系,包括 Controller 的日志、指标以及 TestRun CR 状态的追踪,以确保在出现问题时能快速定位。

然而,这个架构的扩展性非常出色。想要支持新的云厂商?只需要安装对应的 Crossplane Provider 并创建新的 Composition。想为测试环境增加缓存(如 ElastiCache)?只需在 Composition 中添加一个 CacheCluster 资源,并更新 Rust Controller 以处理新的连接信息。整个平台的核心逻辑——基于 Kubernetes 控制平面的声明式调和——保持不变。这正是投资于一个健壮平台所带来的长期回报。


  目录