将单体NumPy计算任务改造为基于Docker和Redis的分布式处理架构


项目的计算模块最近遇到了一个典型的性能瓶颈。最初,它只是一个用于处理传感器数据的Python脚本,核心是几个密集的NumPy操作,比如对高维矩阵进行傅里叶变换和应用一系列复杂的滤波器。在开发阶段,处理单个数据文件耗时几秒钟,完全可以接受。但随着业务上线,我们需要每天处理数万个这样的文件,整个流程变成了串行执行,耗时超过10小时,成了整个数据管道中最脆弱的一环。

这是最初的核心计算逻辑的简化版本,它模拟了CPU密集型的操作:

# core_computation.py
import numpy as np
import time
import logging
import os

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def complex_matrix_operation(data_matrix: np.ndarray) -> np.ndarray:
    """
    一个模拟CPU密集型计算的函数。
    在真实场景中,这可能是一系列滤波器、变换或统计计算。
    """
    try:
        # 1. 傅里叶变换
        fft_data = np.fft.fft2(data_matrix)
        
        # 2. 应用一个复杂的相位滤波器 (计算密集)
        rows, cols = data_matrix.shape
        y, x = np.ogrid[:rows, :cols]
        center_y, center_x = rows / 2, cols / 2
        # 创建一个径向频率滤波器,这部分计算量很大
        filter_kernel = np.sin(np.sqrt((x - center_x)**2 + (y - center_y)**2) * 0.1)
        filtered_fft = fft_data * filter_kernel
        
        # 3. 逆变换
        result = np.fft.ifft2(filtered_fft)
        
        # 4. 更多的数值计算,例如求标准差和归一化
        std_dev = np.std(result.real)
        if std_dev == 0:
            return result.real # 避免除以零
        normalized_result = (result.real - np.mean(result.real)) / std_dev

        # 模拟更多的计算耗时
        time.sleep(1) 
        
        return normalized_result
    except Exception as e:
        logging.error(f"Matrix operation failed: {e}")
        # 在生产环境中,这里应该有更完善的异常处理
        raise

def process_single_file(input_path: str, output_path: str):
    """
    处理单个数据文件。
    """
    if not os.path.exists(input_path):
        logging.error(f"Input file not found: {input_path}")
        return

    try:
        # 从.npy文件加载数据
        raw_data = np.load(input_path)
        logging.info(f"Processing file {input_path} with shape {raw_data.shape}...")
        
        start_time = time.time()
        processed_data = complex_matrix_operation(raw_data)
        end_time = time.time()
        
        # 保存处理结果
        np.save(output_path, processed_data)
        logging.info(f"Finished processing {input_path} in {end_time - start_time:.2f} seconds. Output saved to {output_path}")
    except Exception as e:
        logging.error(f"Failed to process file {input_path}. Error: {e}")

if __name__ == '__main__':
    # 这是一个单体、串行的执行方式
    # 在生产环境中,我们有一个目录充满了待处理的文件
    # python process_runner.py data/raw data/processed
    import sys
    
    if len(sys.argv) != 3:
        print("Usage: python core_computation.py <input_dir> <output_dir>")
        sys.exit(1)

    input_dir = sys.argv[1]
    output_dir = sys.argv[2]
    
    os.makedirs(output_dir, exist_ok=True)
    
    for filename in os.listdir(input_dir):
        if filename.endswith(".npy"):
            file_input_path = os.path.join(input_dir, filename)
            file_output_path = os.path.join(output_dir, filename)
            process_single_file(file_input_path, file_output_path)

问题很明确:这是一个典型的尴尬并行(Embarrassingly Parallel)任务。每个文件的处理都是独立的,但我们的代码却在一个进程里顺序执行。初步的想法是使用Python的multiprocessing库。这在单机上确实能利用多核,但它有几个致命的缺点:首先,它难以跨多台机器扩展;其次,任务分发、结果收集和错误处理的逻辑会变得非常复杂,容易出错;最后,部署和依赖管理会成为一个新问题,确保每台机器上的环境完全一致本身就是一个挑战。

我们需要一个更健壮、可扩展的架构。方案是将其重构为一个基于容器的分布式工作者(Worker)模式。

技术选型决策如下:

  1. NumPy: 核心计算库,保持不变。我们的目标是重构架构,而不是重写已经验证过的数值计算逻辑。
  2. Docker: 将计算逻辑和其依赖项打包成一个标准化的、可移植的Worker镜像。这解决了环境一致性问题,并且让水平扩展变得极其简单。
  3. Redis: 作为任务队列和结果存储。它足够轻量、快速,非常适合作为Manager和Workers之间的通信桥梁。Manager将任务(待处理的文件路径)推入一个列表,Workers从列表中原子性地弹出任务进行处理,并将结果信息存回另一个列表。

架构图如下:

graph TD
    subgraph Manager Node
        A[Manager Script] -- 1. Pushes task messages --> B{Redis};
    end
    
    subgraph Worker Nodes
        C1[Worker 1] -- 2. BLPOP task --> B;
        C2[Worker 2] -- 2. BLPOP task --> B;
        C3[Worker N] -- 2. BLPOP task --> B;
    end

    subgraph Shared Storage
        D[(Shared Volume)];
    end
    
    A -- Reads from --> D;
    C1 -- Reads/Writes to --> D;
    C2 -- Reads/Writes to --> D;
    C3 -- Reads/Writes to --> D;
    
    C1 -- 4. Pushes result message --> B;
    C2 -- 4. Pushes result message --> B;
    C3 -- 4. Pushes result message --> B;

    B -- 3. Provides task --> C1;
    B -- 3. Provides task --> C2;
    B -- 3. Provides task --> C3;
    
    A -- 5. Monitors results --> B;

这里的核心在于,我们不直接通过Redis传递庞大的NumPy矩阵数据。这样做会因为序列化/反序列化开销和网络带宽占用而严重影响性能。取而代之的是,我们使用一个由Docker管理的共享卷(Shared Volume)。Manager和所有Workers都挂载这个卷。任务消息只包含文件的元数据,例如输入路径和输出路径。Worker根据路径从共享卷中读取原始数据,并将处理结果写回共享卷。

步骤一:构建可运行的Worker镜像

首先,我们需要将核心计算逻辑封装成一个可独立运行的Worker脚本。这个脚本将连接到Redis,在一个无限循环中等待任务。

worker/worker.py

import redis
import os
import json
import logging
import numpy as np
import time

# --- 和 core_computation.py 中相同的计算函数 ---
def complex_matrix_operation(data_matrix: np.ndarray) -> np.ndarray:
    # ... (此处省略与上面完全相同的函数体)
    try:
        fft_data = np.fft.fft2(data_matrix)
        rows, cols = data_matrix.shape
        y, x = np.ogrid[:rows, :cols]
        center_y, center_x = rows / 2, cols / 2
        filter_kernel = np.sin(np.sqrt((x - center_x)**2 + (y - center_y)**2) * 0.1)
        filtered_fft = fft_data * filter_kernel
        result = np.fft.ifft2(filtered_fft)
        std_dev = np.std(result.real)
        if std_dev == 0:
            return result.real
        normalized_result = (result.real - np.mean(result.real)) / std_dev
        time.sleep(1)
        return normalized_result
    except Exception as e:
        logging.error(f"Matrix operation failed: {e}")
        raise
# ------------------------------------------------

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - WORKER - %(levelname)s - %(message)s')

# 从环境变量获取配置,这是容器化应用的最佳实践
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
TASK_QUEUE_NAME = 'numpy_task_queue'
RESULT_QUEUE_NAME = 'numpy_result_queue'
FAILED_QUEUE_NAME = 'numpy_failed_queue'
# 数据卷在容器内的挂载点
DATA_DIR = "/data" 

def main():
    """
    Worker主循环。
    """
    # 在真实项目中,这里应该有更健壮的重连逻辑
    try:
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
        redis_client.ping()
        logging.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
    except redis.exceptions.ConnectionError as e:
        logging.error(f"Could not connect to Redis: {e}")
        # 在无法连接到Redis时,worker应该退出,让容器编排系统重启它
        exit(1)

    worker_id = os.getenv('HOSTNAME', 'unknown_worker') # HOSTNAME是Docker自动注入的环境变量
    logging.info(f"Worker {worker_id} started. Waiting for tasks from '{TASK_QUEUE_NAME}'...")

    while True:
        try:
            # BLPOP 是阻塞式列表弹出原语,如果列表中没有元素,它会阻塞连接直到有新元素添加或超时
            # '0' 表示永不超时
            _, task_message = redis_client.blpop(TASK_QUEUE_NAME, 0)
            
            task_data = json.loads(task_message)
            task_id = task_data.get('task_id')
            input_filename = task_data.get('input_filename')
            output_filename = task_data.get('output_filename')

            if not all([task_id, input_filename, output_filename]):
                logging.error(f"Invalid task message received: {task_message}")
                continue

            logging.info(f"[{worker_id}] Received task {task_id}: Process {input_filename}")

            input_path = os.path.join(DATA_DIR, 'raw', input_filename)
            output_path = os.path.join(DATA_DIR, 'processed', output_filename)
            
            # 确保输出目录存在
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            start_time = time.time()
            
            # --- 核心计算逻辑调用 ---
            raw_data = np.load(input_path)
            processed_data = complex_matrix_operation(raw_data)
            np.save(output_path, processed_data)
            # -------------------------

            duration = time.time() - start_time
            logging.info(f"[{worker_id}] Completed task {task_id} in {duration:.2f}s. Output: {output_filename}")

            # 将结果报告给结果队列
            result_message = json.dumps({
                'task_id': task_id,
                'status': 'SUCCESS',
                'worker_id': worker_id,
                'duration': duration,
                'output_filename': output_filename
            })
            redis_client.rpush(RESULT_QUEUE_NAME, result_message)

        except FileNotFoundError as e:
            logging.error(f"[{worker_id}] File not found for task {task_id}: {e}")
            error_message = json.dumps({'task_id': task_id, 'error': str(e), 'original_task': task_message})
            redis_client.rpush(FAILED_QUEUE_NAME, error_message)
        except Exception as e:
            logging.error(f"[{worker_id}] An unexpected error occurred while processing task {task_id}: {e}")
            # 这里的错误处理很关键。我们将失败的任务放入一个专门的队列,以便后续分析或重试
            error_message = json.dumps({'task_id': task_id, 'error': str(e), 'original_task': task_message})
            redis_client.rpush(FAILED_QUEUE_NAME, error_message)

if __name__ == '__main__':
    main()

Worker的代码遵循了几个生产实践:

  1. 配置外部化:Redis的地址等配置通过环境变量注入,而不是硬编码。
  2. 健壮的循环:使用while Truetry...except来确保单个任务的失败不会导致整个Worker进程崩溃。
  3. 明确的错误处理:将失败的任务信息推送到一个failed_queue,而不是简单地丢弃,这对于问题排查至关重要。
  4. 阻塞式获取任务BLPOP是实现任务队列的理想选择,它在没有任务时不会空转CPU。

接下来是Dockerfile和依赖文件。

worker/Dockerfile

# 使用一个包含Python和基础构建工具的精简镜像
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
# --no-cache-dir 减小镜像体积
# aarch64架构的机器上安装numpy可能会需要额外的系统依赖,这里为了通用性暂不添加
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY worker.py .

# 设置容器启动时执行的命令
# python -u 表示无缓冲输出,确保日志能实时被Docker收集
CMD ["python", "-u", "worker.py"]

worker/requirements.txt

numpy==1.24.3
redis==4.5.5

步骤二:使用Docker Compose编排服务

现在我们有了Worker镜像的定义,下一步是使用docker-compose来定义和运行整个多容器应用。这包括Redis服务、多个Worker服务实例,以及一个用于分发任务的Manager服务。

docker-compose.yml

version: '3.8'

services:
  redis:
    image: "redis:7-alpine"
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  worker:
    build:
      context: ./worker
    # 我们不直接运行worker,而是通过 `docker-compose up --scale worker=N` 来启动
    # deploy 块用于swarm模式,但scale命令在compose v2中也可用
    deploy:
      replicas: 4 # 默认启动4个worker实例
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis
    volumes:
      - shared_data:/data # 挂载共享数据卷

  manager:
    build:
      context: ./manager
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis
    volumes:
      - shared_data:/data # 同样挂载共享数据卷

volumes:
  redis_data:
  shared_data: # 定义一个命名的卷,用于在容器间共享数据

这份docker-compose.yml定义了我们的架构:

  • 一个redis服务。
  • 一个worker服务,其镜像是从worker/目录构建的。注意replicas: 4,这表示我们希望启动4个Worker容器实例。depends_on确保Worker在Redis启动之后再启动。
  • 一个manager服务,它会运行一个一次性脚本来填充任务队列。
  • 最重要的部分是volumesshared_data卷被同时挂载到manager和所有worker容器的/data目录下,这是实现数据共享的关键。

步骤三:实现Manager任务分发器

Manager是一个简单的脚本,它的职责是扫描输入目录,为每个文件生成一个任务,并将其推送到Redis队列中。之后,它会监听结果队列,直到所有任务完成。

manager/manager.py

import redis
import os
import json
import logging
import time
import uuid
import numpy as np

# 配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - MANAGER - %(levelname)s - %(message)s')
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
TASK_QUEUE_NAME = 'numpy_task_queue'
RESULT_QUEUE_NAME = 'numpy_result_queue'
DATA_DIR = "/data"
RAW_DIR = os.path.join(DATA_DIR, 'raw')
PROCESSED_DIR = os.path.join(DATA_DIR, 'processed')
NUM_SAMPLES = 100 # 生成100个样本文件

def prepare_sample_data():
    """
    生成一些模拟的.npy文件用于测试。
    在真实场景中,数据是预先存在的。
    """
    logging.info(f"Generating {NUM_SAMPLES} sample data files in {RAW_DIR}...")
    os.makedirs(RAW_DIR, exist_ok=True)
    os.makedirs(PROCESSED_DIR, exist_ok=True)
    for i in range(NUM_SAMPLES):
        # 创建一个 1024x1024 的随机矩阵
        sample_matrix = np.random.rand(1024, 1024)
        np.save(os.path.join(RAW_DIR, f"sample_{i}.npy"), sample_matrix)
    logging.info("Sample data generation complete.")


def main():
    try:
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
        redis_client.ping()
        logging.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
    except redis.exceptions.ConnectionError as e:
        logging.error(f"Could not connect to Redis: {e}")
        exit(1)

    # 1. 清理旧的队列,确保从一个干净的状态开始
    redis_client.delete(TASK_QUEUE_NAME, RESULT_QUEUE_NAME)
    
    # 2. 准备数据 (仅用于演示)
    prepare_sample_data()

    # 3. 扫描目录并分发任务
    files_to_process = [f for f in os.listdir(RAW_DIR) if f.endswith('.npy')]
    total_tasks = len(files_to_process)
    if total_tasks == 0:
        logging.warning("No .npy files found in raw directory. Exiting.")
        return
        
    logging.info(f"Found {total_tasks} files to process. Pushing tasks to queue '{TASK_QUEUE_NAME}'...")
    for filename in files_to_process:
        task_id = str(uuid.uuid4())
        task_data = {
            'task_id': task_id,
            'input_filename': filename,
            'output_filename': filename # 使用相同的文件名
        }
        redis_client.rpush(TASK_QUEUE_NAME, json.dumps(task_data))
    
    logging.info("All tasks have been pushed.")

    # 4. 等待并收集结果
    processed_count = 0
    start_time = time.time()
    while processed_count < total_tasks:
        try:
            # 使用带超时的blpop,避免在没有结果时永久阻塞
            # 同时也给一个机会打印进度
            result_message = redis_client.blpop(RESULT_QUEUE_NAME, timeout=10)
            if result_message is None:
                logging.info(f"Waiting for results... ({processed_count}/{total_tasks} done)")
                continue

            result_data = json.loads(result_message[1])
            if result_data.get('status') == 'SUCCESS':
                processed_count += 1
                logging.info(f"Task {result_data['task_id']} completed by {result_data['worker_id']}. "
                             f"Progress: {processed_count}/{total_tasks}")
            else:
                # 理论上我们应该处理失败状态,但在此简化模型中,失败任务由worker直接放入失败队列
                pass
        except KeyboardInterrupt:
            logging.warning("Interrupted by user. Exiting.")
            break
            
    end_time = time.time()
    total_duration = end_time - start_time
    logging.info(f"All {total_tasks} tasks processed in {total_duration:.2f} seconds.")
    # 计算吞吐量
    if total_duration > 0:
        throughput = total_tasks / total_duration
        logging.info(f"Throughput: {throughput:.2f} tasks/second.")

if __name__ == '__main__':
    main()

Manager的Dockerfilerequirements.txt与Worker的非常类似,只是Python脚本不同。

运行与观察

项目目录结构如下:

.
├── docker-compose.yml
├── manager
│   ├── Dockerfile
│   ├── manager.py
│   └── requirements.txt
└── worker
    ├── Dockerfile
    ├── requirements.txt
    └── worker.py

执行以下命令来构建并启动整个系统:

# 构建镜像并以后台模式启动服务,同时将worker扩展到8个实例
docker-compose up --build -d --scale worker=8

然后,运行Manager来分发任务:

# Manager服务在执行完任务后会自动退出
docker-compose run --rm manager

你会看到Manager的日志,它会先生成100个样本文件,然后将100个任务推送到Redis。之后,它会开始等待结果。

同时,我们可以通过docker-compose logs -f worker来实时观察所有8个Worker的日志输出。你会看到类似下面的交错日志,清晰地展示了任务被不同Worker并行处理的过程:

worker_1  | 2023-10-27 02:45:10 - WORKER - INFO - [worker_1] Received task 2a3b...: Process sample_10.npy
worker_5  | 2023-10-27 02:45:10 - WORKER - INFO - [worker_5] Received task 8c1d...: Process sample_25.npy
worker_3  | 2023-10-27 02:45:11 - WORKER - INFO - [worker_3] Received task f4e5...: Process sample_5.npy
...
worker_1  | 2023-10-27 02:45:13 - WORKER - INFO - [worker_1] Completed task 2a3b... in 2.88s. Output: sample_10.npy
worker_5  | 2023-10-27 02:45:13 - WORKER - INFO - [worker_5] Completed task 8c1d... in 2.91s. Output: sample_25.npy
...

最终,Manager会报告总耗时。在我的8核机器上,处理100个文件(每个文件计算耗时约3秒)的总时间大约是 100 * 3 / 8 再加上一些开销,大概在40秒左右。而单体脚本则需要 100 * 3 = 300 秒。性能提升是线性的,这正是我们想要的。

方案的局限性与未来迭代路径

这个架构虽然解决了最初的性能瓶颈,但在生产环境中它依然存在一些局限性:

  1. 任务队列的健壮性:Redis作为一个简单的任务队列工作得很好,但它缺乏高级消息队列的一些特性。例如,如果一个Worker在处理任务中途崩溃,这个任务就会丢失。一个更健壮的系统会使用RabbitMQ或Kafka,它们支持消息确认(ACK)机制,确保任务至少被处理一次。
  2. 数据共享的瓶颈:当数据量和Worker数量急剧增加时,本地的共享卷可能会成为IO瓶颈。下一步的演进可能是使用一个分布式文件系统(如GlusterFS、Ceph)或对象存储(如MinIO、S3),让存储层本身具备高可用和可扩展性。
  3. 服务编排docker-compose非常适合单机部署和开发环境。要将此架构扩展到多台机器的集群上,就需要迁移到更强大的容器编排平台,如Docker Swarm或Kubernetes。Kubernetes尤其擅长管理有状态的服务、存储和复杂的网络策略,是这类计算密集型工作负载的理想选择。
  4. 动态伸缩:当前的Worker数量是固定的。一个更先进的系统应该能根据任务队列的长度动态地增加或减少Worker实例,以实现成本和性能的最佳平衡。Kubernetes的Horizontal Pod Autoscaler (HPA) 和 KEDA (Kubernetes-based Event Driven Autoscaling) 正是为此而生。

尽管存在这些局限,但从一个单体脚本演进到这个基于Docker和Redis的分布式模型,是架构上一个巨大的飞跃。它将计算逻辑与调度、通信逻辑解耦,为未来的大规模扩展奠定了坚实的基础。


  目录