构建基于Git Hooks、Laravel与spaCy的文档语义变更分析管道


git diff 在追踪代码变更时无可替代,但对于追踪技术文档、产品需求文档(PRD)或法律合同的 意义 变更,它几乎是无用的。一个简单的段落重组,git diff 会将其标记为大规模的删除和添加,而一个词汇的细微改动——比如将“建议”改为“必须”——在视觉上微不足道,其语义影响却是巨大的。这种信息鸿沟在我们团队管理复杂产品文档时,成了一个持续的痛点。我们需要一个能够理解“含义”的 diff 工具。

这个需求催生了一个内部项目:构建一个自动化的语义变更分析管道。目标很明确:当任何受监控的文档被提交到我们的 Git 仓库时,系统能自动分析新旧版本之间的语义差异,并将结构化的分析报告存入数据库,最终在我们的内部 Laravel 平台中展示。

架构选择与权衡

我们的技术栈是固定的:版本控制用 Git,核心业务平台是 Laravel (PHP),而最强大的开源 NLP 工具集几乎都在 Python 生态,我们选择了 spaCy,因为它在性能和易用性之间取得了很好的平衡。问题变成了如何将这三个完全异构的技术栈粘合在一起,形成一个稳定、可维护的生产级管道。

通信方案的抉择

PHP (Laravel) 如何与 Python (spaCy) 通信是第一个要解决的核心问题。

  1. shell_exec 调用: 这是最直接的想法。Laravel 作业可以直接执行一个 Python 脚本。在原型验证阶段可以这么做,但在生产环境中,这是一种脆弱且难以维护的方式。错误处理、依赖管理、环境隔离、进程管理都会成为噩梦。我们直接否决了这个方案。
  2. HTTP API (Flask/FastAPI): 为 spaCy 服务封装一个轻量级的 RESTful API。这是一个标准且成熟的方案。Laravel 通过 Guzzle 发送 HTTP 请求。优点是解耦、技术栈无关。缺点是存在网络开销,且需要维护一套 API 的版本、认证和文档。对于纯内部、高吞吐的场景,这显得有些重。
  3. 消息队列 (Redis/RabbitMQ): 这是我们最终选择的方案。Laravel 作为生产者,将分析任务(包含新旧文件内容)推送到队列中。Python 服务作为消费者,监听队列,处理任务,并将结果写回另一个队列或数据库。这个模式的优势是显而易见的:
    • 异步解耦: Laravel 无需同步等待 Python 处理完成,避免了长时间的请求阻塞,提升了系统的响应能力。
    • 削峰填谷: 面对短时间内大量 commits,队列可以作为缓冲区,保护下游的 NLP 服务不被冲垮。
    • 韧性: 如果 Python 服务暂时宕机,任务会保留在队列中,待服务恢复后继续处理。
    • 水平扩展: 当处理能力不足时,可以简单地增加更多的 Python 消费者实例。

我们选择了 Redis Streams,因为它轻量、性能极高,并且 Laravel 生态对其支持良好。

流程概览

整个处理流程被设计为一套自动化的事件触发链。

graph TD
    A[开发者/PM push commits] --> B{GitLab/GitHub Server};
    B -- Git post-receive hook --> C[触发 Webhook];
    C -- HTTP POST --> D[Laravel Endpoint];
    D -- dispatch job --> E{Redis Queue: jobs};
    E -- job payload --> F[Laravel Queue Worker];
    F -- 1. 从 Git 仓库拉取文件内容 --> F;
    F -- 2. 推送分析任务 --> G{Redis Stream: nlp_tasks};
    H[Python spaCy Worker] -- 3. 阻塞监听 --> G;
    H -- 4. 执行 NLP 分析 --> H;
    H -- 5. 将结果推送到 --> I{Redis Stream: nlp_results};
    F -- 6. 等待并拉取结果 --> I;
    F -- 7. 结果存入 --> J[PostgreSQL 数据库];
    K[Laravel Dashboard] -- 8. 查询并展示 --> J;

第一步:配置 Git 服务端 Hook

我们在 GitLab 服务器的裸仓库 (.git 目录) 中配置了一个 post-receive 钩子。这个钩子在 push 操作成功完成后被 Git 自动执行。它的核心任务是识别出哪些分支被更新了,并通知 Laravel 应用。

这里的坑在于:post-receive 脚本的运行环境非常纯净,并且以 Git 用户的身份运行。必须确保脚本本身是可执行的 (chmod +x post-receive),并且所有路径都是绝对路径。

path/to/your/repo.git/hooks/post-receive

#!/bin/bash

# ==============================================================================
# Git Post-Receive Hook for Semantic Diff Pipeline
#
# 该脚本在 Git 仓库接收到 push 操作后执行。
# 它读取 stdin 获取变更的引用信息,并向 Laravel 应用发送一个 webhook。
# ==============================================================================

# 读取 stdin
# 格式为: <old-rev> <new-rev> <ref-name>
while read oldrev newrev refname
do
  # 我们只关心主分支的变更
  if [[ $refname = "refs/heads/main" ]]; then
    
    # 定义 Laravel Webhook 的 URL 和一个共享密钥
    WEBHOOK_URL="https://your-laravel-app.com/api/hooks/git-commit"
    SECRET_TOKEN="a-very-strong-secret-token"

    # 获取仓库名称,通常是目录名
    REPO_NAME=$(basename "$PWD" .git)

    # 构造 JSON payload
    payload=$(cat << EOF
{
  "repository": "$REPO_NAME",
  "ref": "$refname",
  "before": "$oldrev",
  "after": "$newrev"
}
EOF
)

    # 计算签名,用于 webhook 的安全验证
    # Laravel 端会用同样的密钥计算签名并比对,防止恶意调用
    signature=$(echo -n "$payload" | openssl dgst -sha256 -hmac "$SECRET_TOKEN" | sed 's/^.* //')

    echo "Sending webhook for commit $newrev in $REPO_NAME..."

    # 使用 curl 发送 POST 请求
    # -s: silent mode
    # -H: 设置 header
    # -d: 设置 request body
    # --connect-timeout 5: 5秒连接超时
    # --max-time 10: 10秒总超时
    # 生产环境中,应该添加重试逻辑或将失败事件记录到日志系统中
    curl -s \
      -H "Content-Type: application/json" \
      -H "X-Git-Signature: sha256=$signature" \
      -d "$payload" \
      --connect-timeout 5 \
      --max-time 10 \
      "$WEBHOOK_URL"

    # 检查 curl 的退出码
    if [ $? -ne 0 ]; then
      # 写入标准错误输出,这通常会被 Git 服务记录下来
      echo "Error: Webhook request failed for commit $newrev." >&2
      # 在生产环境中,这里可能需要触发告警
    fi
  fi
done

exit 0

第二步:Laravel 编排层

Laravel 负责接收 Webhook、验证请求、分发后台任务,并最终处理分析结果。

Webhook 接收与验证

我们创建了一个 API 路由和对应的控制器来处理来自 Git Hook 的请求。安全性是首要考虑,我们通过 X-Git-Signature 头来验证请求的合法性。

routes/api.php

<?php

use Illuminate\Support\Facades\Route;
use App\Http\Controllers\Api\GitHookController;

Route::post('/hooks/git-commit', [GitHookController::class, 'handle'])
    ->middleware('verify.git.signature');

app/Http/Middleware/VerifyGitSignature.php

<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;

class VerifyGitSignature
{
    public function handle(Request $request, Closure $next)
    {
        $signature = $request->header('X-Git-Signature');
        $secret = config('services.git_hook.secret');

        if (!$signature) {
            abort(403, 'Signature header not set.');
        }

        $parts = explode('=', $signature, 2);
        if (count($parts) !== 2) {
            abort(403, 'Invalid signature format.');
        }

        [$algo, $hash] = $parts;

        // 在真实项目中,应该支持多种哈希算法
        if ($algo !== 'sha256') {
             abort(403, 'Unsupported hash algorithm.');
        }

        $payload = $request->getContent();
        $expectedHash = hash_hmac('sha256', $payload, $secret);

        if (!hash_equals($expectedHash, $hash)) {
            abort(403, 'Invalid signature.');
        }

        return $next($request);
    }
}

app/Http/Controllers/Api/GitHookController.php

<?php

namespace App\Http\Controllers\Api;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessSemanticDiffJob;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;

class GitHookController extends Controller
{
    public function handle(Request $request)
    {
        $payload = $request->json()->all();

        Log::info('Received git hook for repository', [
            'repository' => $payload['repository'],
            'after' => $payload['after']
        ]);

        // 将耗时任务分发到队列中,立即返回响应
        ProcessSemanticDiffJob::dispatch(
            $payload['repository'],
            $payload['before'],
            $payload['after']
        );

        return response()->json(['status' => 'ok', 'message' => 'Job dispatched.']);
    }
}

核心处理 Job

这个 Job 是整个 Laravel 端的逻辑核心。它负责与 Git 仓库和 Redis Streams 交互。

app/Jobs/ProcessSemanticDiffJob.php

<?php

namespace App\Jobs;

// ... (use statements)
use Illuminate\Support\Facades\Process;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;
use Throwable;

class ProcessSemanticDiffJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // 作业失败前可以尝试的最大次数
    public int $tries = 3;
    // 作业在超时前可以运行的秒数
    public int $timeout = 300; // 5 minutes

    protected string $repoPath;

    public function __construct(
        protected string $repoName,
        protected string $oldCommit,
        protected string $newCommit
    ) {
        // 关键点:Laravel Worker 需要能访问到 Git 裸仓库
        // 这个路径应该来自配置文件,并且确保 Worker 进程有读取权限
        $this->repoPath = config('services.git_hook.repo_base_path') . '/' . $this->repoName . '.git';
    }

    public function handle(): void
    {
        // 1. 找出发生变更的文档文件 (例如 .md 文件)
        $changedFiles = $this->getChangedMarkdownFiles();

        if (empty($changedFiles)) {
            Log::info('No markdown files changed in commit.', ['commit' => $this->newCommit]);
            return;
        }

        foreach ($changedFiles as $file) {
            try {
                // 2. 获取文件的旧版本和新版本内容
                $oldContent = $this->getFileContentAtCommit($this->oldCommit, $file);
                $newContent = $this->getFileContentAtCommit($this->newCommit, $file);
                
                // 如果文件是新增的,旧内容为空字符串
                if ($oldContent === $newContent) {
                    continue; // 内容无变化,跳过
                }

                // 3. 将任务推送到 Redis Stream
                $taskId = 'task:' . Str::uuid();
                $taskPayload = [
                    'task_id' => $taskId,
                    'file_path' => $file,
                    'old_content' => $oldContent,
                    'new_content' => $newContent,
                ];

                Redis::xadd('nlp_tasks', '*', $taskPayload);

                // 4. 等待 Python 服务的处理结果
                $result = $this->waitForNlpResult($taskId);

                // 5. 将结果持久化到数据库
                $this->storeResult($file, $result);

            } catch (Throwable $e) {
                Log::error('Failed to process semantic diff for file.', [
                    'file' => $file,
                    'commit' => $this->newCommit,
                    'exception' => $e->getMessage()
                ]);
                // 可以选择让作业失败并重试
                $this->fail($e);
            }
        }
    }

    private function getChangedMarkdownFiles(): array
    {
        // 使用 git diff-tree 来获取两个 commit 之间的文件列表
        // --no-commit-id: 不显示 commit ID
        // --name-only: 只显示文件名
        // -r: 递归
        // *.md: 只关心 markdown 文件
        $command = "git --git-dir={$this->repoPath} diff-tree --no-commit-id --name-only -r {$this->oldCommit} {$this->newCommit} -- '*.md'";
        $result = Process::run($command);

        if (!$result->successful()) {
            throw new \RuntimeException('Failed to get changed files: ' . $result->errorOutput());
        }

        return array_filter(explode("\n", $result->output()));
    }

    private function getFileContentAtCommit(string $commit, string $path): string
    {
        // 使用 git show 获取特定 commit 中特定文件的内容
        // 对于一个 "zero hash" 的 commit (代表文件创建),git show 会失败,这是预期行为
        if ($commit === '0000000000000000000000000000000000000000') {
            return '';
        }
        $command = "git --git-dir={$this->repoPath} show {$commit}:{$path}";
        $result = Process::run($command);
        
        // 如果文件在旧 commit 不存在,git show 会返回非 0 code,这很正常
        if (!$result->successful()) {
            return '';
        }
        return $result->output();
    }
    
    private function waitForNlpResult(string $taskId, int $timeout = 60): array
    {
        // 这是一个简化的阻塞式等待。
        // 在真实的高并发应用中,应该使用一个独立的 Listener 来处理结果,
        // 或者采用 WebSockets/Polling 等方式通知前端。
        // XREAD a non-blocking command, but with BLOCK it becomes blocking.
        // COUNT 1: we only want one message
        // BLOCK 60000: block for up to 60 seconds (60000 ms)
        // STREAMS nlp_results:$taskId 0: read from the beginning of the specific result stream
        $streamName = "nlp_results:{$taskId}";
        $result = Redis::xread(['COUNT' => 1, 'BLOCK' => $timeout * 1000], [$streamName => '0-0']);
        
        if (empty($result) || !isset($result[$streamName])) {
            throw new \RuntimeException("Timeout waiting for NLP result for task: {$taskId}");
        }

        // Redis::xread 返回一个包含 stream name 的数组
        $message = current($result[$streamName]);
        $data = json_decode($message['data'], true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new \RuntimeException("Failed to decode NLP result JSON for task: {$taskId}");
        }

        // 收到结果后,可以从 Redis 中清理这个临时 Stream
        Redis::del($streamName);

        return $data;
    }

    private function storeResult(string $filePath, array $resultData): void
    {
        // 使用 Eloquent 模型存储结果
        \App\Models\SemanticDiffReport::create([
            'commit_hash' => $this->newCommit,
            'file_path' => $filePath,
            'similarity_score' => $resultData['similarity_score'] ?? 0.0,
            'analysis_data' => $resultData, // Store the full JSON response
        ]);
        Log::info('Successfully stored semantic diff report.', ['file' => $filePath]);
    }
}

第三步:Python spaCy 消费者服务

这个 Python 脚本是独立的、长期运行的服务。它唯一的职责就是从 Redis 中取出任务,调用 spaCy 进行分析,然后将结果写回 Redis。

requirements.txt

spacy==3.5.0
redis==4.5.4
# 推荐下载更强大的模型
# python -m spacy download en_core_web_lg

nlp_worker.py

import redis
import spacy
import json
import time
import logging
import os
import signal

# ==============================================================================
# spaCy NLP Worker
#
# 该服务连接到 Redis,监听 nlp_tasks Stream,执行 NLP 分析,
# 并将结果写回到一个特定于任务的 nlp_results Stream 中。
# ==============================================================================

# --- 配置 ---
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
SPACY_MODEL = os.getenv('SPACY_MODEL', 'en_core_web_lg') # 使用包含词向量的大模型

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 全局变量 ---
# 将 NLP 模型和 Redis 连接加载到全局,避免在每次循环中重新初始化
# 这是一个重要的性能优化点
try:
    logging.info(f"Loading spaCy model: {SPACY_MODEL}...")
    nlp = spacy.load(SPACY_MODEL)
    logging.info("spaCy model loaded successfully.")
    
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    redis_client.ping() # 测试连接
    logging.info("Connected to Redis successfully.")
except Exception as e:
    logging.error(f"Initialization failed: {e}", exc_info=True)
    exit(1)

# 用于优雅退出的标志
shutdown_flag = False

def signal_handler(signum, frame):
    """处理 SIGINT 和 SIGTERM 信号,实现优雅退出"""
    global shutdown_flag
    logging.info("Shutdown signal received, preparing to exit...")
    shutdown_flag = True


def analyze_text_diff(old_text: str, new_text: str) -> dict:
    """
    使用 spaCy 对比两个文本的语义差异。
    """
    # 这里的坑:如果文本过长,nlp() 会消耗大量内存。
    # 在生产环境中,需要对输入大小进行限制,或采用分块处理的策略。
    if len(old_text.strip()) == 0: # 新增文件
        doc_new = nlp(new_text)
        new_entities = [{"text": ent.text, "label": ent.label_} for ent in doc_new.ents]
        return {
            "status": "created",
            "similarity_score": 0.0,
            "added_entities": new_entities,
            "removed_entities": [],
            "common_entities": [],
        }

    doc_old = nlp(old_text)
    doc_new = nlp(new_text)

    # 1. 计算整体语义相似度
    # 这需要模型包含词向量 (如 _md 或 _lg 模型)
    similarity = doc_old.similarity(doc_new)

    # 2. 对比命名实体 (Named Entities)
    entities_old = {(ent.text.lower(), ent.label_) for ent in doc_old.ents}
    entities_new = {(ent.text.lower(), ent.label_) for ent in doc_new.ents}

    added_entities = [{"text": text, "label": label} for text, label in entities_new - entities_old]
    removed_entities = [{"text": text, "label": label} for text, label in entities_old - entities_new]
    common_entities = [{"text": text, "label": label} for text, label in entities_old & entities_new]

    return {
        "status": "modified",
        "similarity_score": round(similarity, 4),
        "added_entities": added_entities,
        "removed_entities": removed_entities,
        "common_entities": common_entities,
    }

def process_task(task_data: dict):
    """处理单个任务"""
    task_id = task_data.get('task_id')
    if not task_id:
        logging.warning("Task received without task_id, skipping.")
        return

    try:
        logging.info(f"Processing task {task_id} for file {task_data.get('file_path')}")
        
        old_content = task_data.get('old_content', '')
        new_content = task_data.get('new_content', '')

        analysis_result = analyze_text_diff(old_content, new_content)
        analysis_result['task_id'] = task_id

        result_payload = json.dumps(analysis_result)
        
        # 将结果写回到一个专属于该任务的 stream
        # 这是一种简单的请求-响应模式实现
        result_stream = f"nlp_results:{task_id}"
        redis_client.xadd(result_stream, {'data': result_payload})
        # 设置一个过期时间,防止 Laravel worker 崩溃导致垃圾 stream 残留
        redis_client.expire(result_stream, 300) 
        
        logging.info(f"Finished processing task {task_id}, result sent.")

    except Exception as e:
        logging.error(f"Error processing task {task_id}: {e}", exc_info=True)
        # 可以在这里实现错误通知逻辑,比如写到一个专用的 error stream
        error_payload = json.dumps({"task_id": task_id, "error": str(e)})
        result_stream = f"nlp_results:{task_id}"
        redis_client.xadd(result_stream, {'error': error_payload})
        redis_client.expire(result_stream, 300)

def main_loop():
    """主消费循环"""
    group_name = "nlp_processor_group"
    consumer_name = f"consumer-{os.getpid()}"
    stream_name = "nlp_tasks"

    try:
        # 创建消费者组,如果已存在会报错,所以需要 try-except
        redis_client.xgroup_create(stream_name, group_name, id='0', mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP Consumer Group name already exists" in str(e):
            logging.info(f"Consumer group '{group_name}' already exists.")
        else:
            raise

    logging.info(f"Worker '{consumer_name}' started, listening to stream '{stream_name}'...")
    while not shutdown_flag:
        try:
            # XREADGROUP is designed for reliable messaging with multiple consumers.
            # BLOCK 2000: block for up to 2 seconds
            # COUNT 1: process one message at a time
            # >: get new messages that have not been delivered to any consumer in this group
            messages = redis_client.xreadgroup(
                group_name, consumer_name, {stream_name: '>'}, count=1, block=2000
            )

            if not messages:
                continue # Timeout, loop again

            for stream, message_list in messages:
                for message_id, data in message_list:
                    process_task(data)
                    # 确认消息已被处理,从 pending entries list 中移除
                    redis_client.xack(stream_name, group_name, message_id)

        except redis.exceptions.ConnectionError:
            logging.warning("Redis connection lost. Reconnecting...")
            time.sleep(5)
        except Exception as e:
            logging.error(f"An unexpected error occurred in the main loop: {e}", exc_info=True)
            time.sleep(5) # 避免在持续错误状态下快速循环消耗 CPU
            
    logging.info("Worker shutting down.")


if __name__ == '__main__':
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    main_loop()

这个 Python 脚本应该使用 systemd 或类似的主管进程(Supervisor)来管理,以确保它能作为后台服务稳定运行并在失败时自动重启。

方案局限性与未来迭代方向

这套管道成功地解决了最初的问题,但它远非完美。在真实项目中,我们很快就发现了一些局限性和可以改进的地方:

  1. 同步等待的瓶颈: Laravel Job 中 waitForNlpResult 的阻塞式等待是一个简化设计。当 NLP 处理时间较长或任务量巨大时,这会长时间占用队列 worker,降低整个系统的吞吐量。一个更优的架构是,Laravel Job 在推送任务后立即结束。结果由一个独立的、专门监听 nlp_results 流的 Laravel 命令或服务来处理,实现完全的异步化。

  2. NLP 分析的深度: 目前的分析还比较初级,仅限于相似度和实体增删。可以引入更复杂的分析,例如:

    • 句子级 Diff: 对比新旧版本中每个句子的向量,找出被修改、新增、删除的句子。
    • 依存关系分析: 分析句子结构的变化,例如主动语态变为被动语态。
    • 情感/语气分析: 对于 PRD 或用户反馈文档,分析文本的情感倾向是否发生变化。
  3. 对非文本文件的处理: 当前方案只处理 .md 文件。对于包含图表、代码块的复杂文档,需要先进行内容提取和清洗。例如,从 README.md 中剥离出代码块,只对自然语言部分进行分析。

  4. 性能与成本: 加载 en_core_web_lg 这样的大模型会占用数百兆内存。如果并发任务量很大,Python worker 会成为资源瓶颈。可以考虑使用更轻量的模型,或者使用像 ONNX Runtime 这样的工具对模型进行优化和加速,甚至考虑将 NLP 服务部署到专用的 GPU 实例上。

  5. 配置化与可扩展性: 当前的逻辑硬编码了对 main 分支和 .md 文件的处理。一个更健壮的系统应该允许通过配置文件来定义需要监控的仓库、分支和文件类型,以及针对不同文件类型应用的不同分析策略。


  目录