结合 Haskell 与 JavaScript 构建面向 Qdrant 的实时向量化数据管道


我们面临一个具体的工程挑战:一个高频写入的 PostgreSQL 数据库,其 products 表存储着不断更新的商品描述。业务需求是提供一个低延迟的语义搜索接口,允许用户通过自然语言描述来查找商品。直接在 PostgreSQL 上使用 pg_vector 并执行全文搜索或向量搜索,在高并发写入和复杂查询场景下,会对主业务数据库造成不可接受的性能压力。因此,将读(搜索)和写(业务逻辑)的负载分离是架构上的必然选择。

核心问题是,如何构建一个健壮、实时的数据管道,将 products 表的变更(INSERTUPDATE)实时捕获,将其文本内容向量化,并存储到专用的向量数据库 Qdrant 中,最终通过一个高性能的 API 服务暴露搜索能力。

方案 A:单一技术栈 - Node.js 全链路实现

一个显而易见的方案是使用单一技术栈,例如 Node.js (TypeScript),来构建整个管道。

  • 数据捕获: 使用 Debezium 作为 CDC (Change Data Capture) 工具监控 PostgreSQL 的 WAL (Write-Ahead Log),将变更事件推送到 Kafka。
  • 处理节点: 一个 Node.js 服务消费 Kafka 中的消息。
  • 向量化: 在 Node.js 服务中,调用外部的 Embedding API (如 OpenAI, Cohere 等)。
  • 数据索引: 将获取到的向量和元数据写入 Qdrant。
  • 查询服务: 另一个 Node.js 服务(或同一个服务)提供 REST/GraphQL API,接收查询请求,将其向量化后查询 Qdrant。

优势分析:

  1. 开发效率: 团队成员只需掌握 JavaScript/TypeScript,降低了认知负荷和招聘难度。
  2. 生态系统: Node.js 拥有庞大的生态系统,kafka-nodekafkajs@qdrant/js-clientaxios 等库都非常成熟。
  3. IO 密集型优势: Node.js 的异步非阻塞 IO 模型非常适合处理这种以网络调用为主(消费 Kafka、调用 Embedding API、写入 Qdrant)的工作流。

劣势分析:

  1. 类型安全: 尽管有 TypeScript,但在处理复杂、嵌套的数据转换(如解析 Debezium 复杂的 JSON 结构)时,其类型系统提供的编译时保证远不如 Haskell 或 Rust。数据管道的正确性至关重要,一个微小的解析错误可能导致数据污染或管道中断,而这些错误在运行时才可能暴露。
  2. 错误处理: 在 JavaScript 中,复杂的异步流中的错误处理(Promise 链、async/await)虽然可行,但很容易写出难以推理的代码。确保每条消息都得到原子性的“至少一次”或“精确一次”处理,需要非常严谨的错误处理和重试逻辑,这在 Haskell 的 Monad 结构下表达得更为清晰和安全。
  3. 计算密集部分: 虽然大部分是 IO 操作,但如果未来需要在管道中加入一些 CPU 密集的预处理步骤(例如,复杂的文本清洗、规则引擎),Node.js 的单线程模型可能会成为瓶颈。

方案 B:混合技术栈 - Haskell 数据处理 + JavaScript API 服务

这个方案将数据管道中最关键、最需要保证正确性的部分交给 Haskell,而将与前端交互、快速迭代的 API 服务部分交给 JavaScript。

  • 数据捕获: 同样使用 Debezium + Kafka。
  • 处理节点 (Haskell): 一个 Haskell 服务消费 Kafka 消息。它负责解析 Debezium 消息、调用 Embedding API、处理所有可能的转换逻辑,并写入 Qdrant。
  • 查询服务 (JavaScript/Node.js): 与方案 A 相同,一个轻量级的 Node.js 服务负责暴露查询接口。

优势分析:

  1. 极致的类型安全: Haskell 的静态类型系统可以在编译时捕获大量潜在的数据格式错误、空值问题和逻辑谬误。对于一个生产级的数据管道,这种确定性是巨大的资产。我们可以为 Debezium 的消息格式定义精确的 ADT (Algebraic Data Types),任何不匹配的变更都会导致编译失败,而不是运行时异常。
  2. 健壮的并发与错误处理: Haskell 的 exceptionsmtlrio 等库提供了非常强大的抽象来处理并发和错误。使用 STM (Software Transactional Memory) 或 async 库可以轻松构建高吞吐、无死锁的并发消费者。错误可以被建模为类型的一部分(如 EitherExceptT),强制开发者处理所有失败路径。
  3. 代码即文档: Haskell 代码,特别是其类型签名,具有很强的自文档性。processMessage :: DebeziumPayload -> IO (Either PipelineError QdrantSuccess) 这样的签名清晰地表达了函数的意图和所有可能的结果。

劣势分析:

  1. 技术栈复杂度: 引入 Haskell 增加了团队的技术栈广度,对 CI/CD、监控和运维提出了更高要求。
  2. 生态与人才: Haskell 的生态系统虽健全但不如 Node.js 庞大,尤其是在与各种 Web 服务集成时,可能需要自己编写一些绑定。寻找经验丰富的 Haskell 开发者也更具挑战。

最终决策与理由

在真实项目中,数据管道的稳定性和数据正确性是最高优先级。一个默默产生脏数据或频繁崩溃的管道是不可接受的。因此,我们选择方案 B

决策的核心理由是:用最合适的工具解决最关键的问题。数据转换和处理是这个管道的心脏,它的健壮性决定了整个系统的可靠性。Haskell 在这方面提供的编译时保证和强大的抽象能力,足以抵消其带来的额外复杂性。而 API 服务层,更注重快速迭代和与前端生态的集成,Node.js 是此处的最佳选择。通过 Kafka 这个强大的中间件,两个服务得以完美解耦。

核心实现概览

以下是整个系统的架构图和核心代码实现。

graph TD
    A[PostgreSQL] -- WAL --> B(Debezium Connector)
    B -- JSON over Kafka --> C{Kafka Topic: db.products.events}
    C -- Consume --> D[Haskell Vectorizer Service]
    D -- Call for Embedding --> E[Embedding Model API]
    D -- Upsert Vector --> F[Qdrant]
    G[User] -- Search Query --> H[Node.js API Service]
    H -- Vectorize Query --> E
    H -- Query by Vector --> F
    F -- Search Results --> H
    H -- API Response --> G

1. 基础设施编排 (Docker Compose)

为了本地开发和测试,我们使用 docker-compose 启动所有依赖。

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  postgres:
    image: debezium/postgres:14
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: products_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password

  qdrant:
    image: qdrant/qdrant:v1.6.0
    container_name: qdrant
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - ./qdrant_storage:/qdrant/storage

  debezium-connector:
    image: debezium/connect:2.1
    container_name: debezium-connector
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

# ... Haskell and Node.js service definitions would go here

启动后,需要手动向 Debezium 提交 Connector 配置。

register-postgres-connector.sh

#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<'EOF'
{
  "name": "products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "products_db",
    "database.server.name": "dbserver1",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput"
  }
}
EOF

2. Haskell 向量化服务

这是系统的核心。我们使用 hw-kafka-client 消费 Kafka,aeson 解析 JSON,wreq 调用 Embedding API,以及一个自定义的 Qdrant 客户端。

项目结构 (Stack):
stack.yaml, package.yaml, src/Main.hs, src/Types.hs

package.yaml (部分):

dependencies:
- base >= 4.7 && < 5
- aeson
- bytestring
- text
- hw-kafka-client
- http-client
- http-client-tls
- wreq
- vector
- unordered-containers
- scientific

src/Types.hs - 定义强类型的数据结构:
这是 Haskell 发挥关键作用的地方。我们为 Debezium 的消息定义了精确的类型。

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Types where

import Data.Aeson
import Data.Text (Text)
import GHC.Generics

-- 对应 Debezium 消息体中的 'after' 或 'before' 字段
data ProductPayload = ProductPayload
  { id :: Int
  , name :: Text
  , description :: Text
  } deriving (Show, Generic)

instance FromJSON ProductPayload
instance ToJSON ProductPayload

-- Debezium 消息的 schema
data DebeziumPayload = DebeziumPayload
  { after :: Maybe ProductPayload
  , op :: Text -- 'c' for create, 'u' for update, 'd' for delete
  -- ... 其他 Debezium 字段可以按需添加
  } deriving (Show, Generic)

instance FromJSON DebeziumPayload

-- Kafka 消息的顶层结构
data KafkaMessage = KafkaMessage
  { payload :: Maybe DebeziumPayload
  } deriving (Show, Generic)

instance FromJSON KafkaMessage

-- Qdrant Point 结构
data QdrantPoint = QdrantPoint
  { pointId :: Int
  , vector :: [Float]
  , payload :: ProductPayload
  } deriving (Show, Generic)

-- 自定义 ToJSON 以匹配 Qdrant API
instance ToJSON QdrantPoint where
  toJSON p = object
    [ "id" .= pointId p
    , "vector" .= vector p
    , "payload" .= payload p
    ]

data QdrantUpsertRequest = QdrantUpsertRequest
  { points :: [QdrantPoint]
  } deriving (Show, Generic)

instance ToJSON QdrantUpsertRequest where
    toJSON r = object ["points" .= points r]

-- 定义管道中可能发生的错误
data PipelineError = KafkaDecodeError String
                   | EmbeddingError String
                   | QdrantError String
                   | MissingPayloadError
                   deriving (Show)

src/Main.hs - 主消费逻辑:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Exception (bracket)
import Control.Monad (forever, void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Except (ExceptT(..), runExceptT, throwE)
import Data.Aeson (decode, encode)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Kafka.Consumer
import Lens.Micro ((^.))
import Network.Wreq
import Types -- 导入我们定义的类型

-- 配置信息
kafkaBroker :: BrokerAddress
kafkaBroker = BrokerAddress "localhost:9092"

kafkaTopic :: TopicName
kafkaTopic = TopicName "dbserver1.public.products"

kafkaGroupId :: ConsumerGroupId
kafkaGroupId = ConsumerGroupId "haskell-vectorizer"

qdrantUrl :: String
qdrantUrl = "http://localhost:6333/collections/products/points"

embeddingApiUrl :: String
embeddingApiUrl = "http://your-embedding-service/embed" -- 替换为真实的 Embedding 服务

main :: IO ()
main = do
  putStrLn "Starting Haskell Vectorizer Service..."
  -- 初始化 Qdrant Collection (生产环境中应更健壮)
  void $ runExceptT initializeQdrantCollection

  let props = brokersList [kafkaBroker] <> groupId kafkaGroupId <> autoCommit
  err <- runConsumer props sub (processMessages `catchError` handler)
  print err
  where
    sub = subscription [kafkaTopic]
    handler e = liftIO $ print e

-- 主消息处理循环
processMessages :: KafkaConsumer -> IO ()
processMessages consumer = forever $ do
  records <- pollRecords 1000 100 consumer -- Poll 100ms, max 10 records
  mapM_ (processRecord . crValue) records

-- 处理单条 Kafka 消息
processRecord :: Maybe LBS.ByteString -> IO ()
processRecord (Just bs) = do
    result <- runExceptT $ do
      -- 1. 解码 Debezium 消息
      kafkaMsg <- case decode bs of
        Just msg -> return msg
        Nothing  -> throwE $ KafkaDecodeError "Failed to decode top-level Kafka message"
      
      debeziumPayload <- case payload kafkaMsg of
        Just p -> return p
        Nothing -> throwE $ MissingPayloadError

      -- 2. 只处理 create 和 update 事件中的 'after' 状态
      case (op debeziumPayload, after debeziumPayload) of
        ((`elem` ["c", "u"]), Just productData) -> do
            liftIO $ putStrLn $ "Processing product ID: " ++ show (Types.id productData)
            -- 3. 获取向量
            vec <- getEmbedding (description productData)
            -- 4. 构建 Qdrant Point
            let qdrantPoint = QdrantPoint (Types.id productData) vec productData
            -- 5. 写入 Qdrant
            upsertToQdrant qdrantPoint
        _ -> liftIO $ putStrLn "Skipping delete event or message with no 'after' payload."

    case result of
      Left err -> putStrLn $ "Pipeline error: " ++ show err
      Right _  -> putStrLn "Successfully processed and indexed record."

processRecord Nothing = return () -- 空消息,忽略

-- 调用外部服务获取向量
getEmbedding :: T.Text -> ExceptT PipelineError IO [Float]
getEmbedding textToEmbed = do
  liftIO $ putStrLn $ "Getting embedding for: " ++ T.unpack (T.take 30 textToEmbed) ++ "..."
  let opts = defaults & header "Content-Type" .~ ["application/json"]
  let body = object ["text" .= textToEmbed]
  -- 这是一个模拟实现,真实项目中需要替换
  -- r <- liftIO $ postWith opts embeddingApiUrl (encode body)
  -- case (r ^. responseStatus . statusCode, decode (r ^. responseBody)) of
  --   (200, Just (Object o)) -> ... 解析响应 ...
  --   (code, _) -> throwE $ EmbeddingError ("API returned status " ++ show code)
  liftIO $ return $ take 384 $ cycle [0.1, -0.2, 0.3] -- MOCK: 返回一个固定的假向量

-- Upsert 数据到 Qdrant
upsertToQdrant :: QdrantPoint -> ExceptT PipelineError IO ()
upsertToQdrant point = do
  liftIO $ putStrLn $ "Upserting point " ++ show (pointId point) ++ " to Qdrant."
  let opts = defaults & header "Content-Type" .~ ["application/json"]
  let body = QdrantUpsertRequest { points = [point] }
  r <- liftIO $ putWith opts qdrantUrl (encode body)
  case r ^. responseStatus . statusCode of
    200 -> return ()
    code -> throwE $ QdrantError ("Qdrant returned status " ++ show code)

-- 初始化 Qdrant Collection
initializeQdrantCollection :: ExceptT PipelineError IO ()
initializeQdrantCollection = do
    liftIO $ putStrLn "Initializing Qdrant collection 'products'..."
    let url = "http://localhost:6333/collections/products"
    let body = object
            [ "vectors" .= object
                [ "size" .= (384 :: Int) -- 向量维度,需与模型匹配
                , "distance" .= ("Cosine" :: T.Text)
                ]
            ]
    r <- liftIO $ put url (encode body)
    case r ^. responseStatus . statusCode of
        200 -> liftIO $ putStrLn "Collection 'products' created or already exists."
        code -> throwE $ QdrantError $ "Failed to create collection, status: " ++ show code

这段 Haskell 代码展示了其核心优势:

  • 类型驱动: processRecord 的逻辑由 KafkaMessageDebeziumPayload 的结构驱动。
  • Monad Transformer: 使用 ExceptT IO 组合了 IO 操作和错误处理。整个处理流程是一系列连贯的 do 块,任何一步失败都会自动短路,进入错误处理逻辑。这比在 Node.js 中层层嵌套 try...catch.catch() 要清晰得多。

3. Node.js 查询 API 服务

这个服务使用 Fastify 框架,因为它以高性能和低开销著称。

项目结构:
package.json, server.js

package.json (部分):

{
  "dependencies": {
    "@qdrant/js-client-rest": "^1.6.0",
    "fastify": "^4.24.3",
    "pino-pretty": "^10.2.3",
    "axios": "^1.6.0" 
  }
}

server.js:

const fastify = require('fastify')({
  logger: {
    transport: {
      target: 'pino-pretty'
    }
  }
});
const { QdrantClient } = require('@qdrant/js-client-rest');

// --- Configuration ---
const QDRANT_HOST = process.env.QDRANT_HOST || 'localhost';
const QDRANT_PORT = process.env.QDRANT_PORT || 6333;
const EMBEDDING_API_URL = process.env.EMBEDDING_API_URL || 'http://your-embedding-service/embed';
const COLLECTION_NAME = 'products';
const VECTOR_DIM = 384; // Must match Haskell service and model

// --- Clients ---
const qdrantClient = new QdrantClient({ host: QDRANT_HOST, port: QDRANT_PORT });
const axios = require('axios').default;

// --- Helper Functions ---
/**
 * Gets vector embedding for a given text query.
 * In a real application, this should have robust error handling and retries.
 * @param {string} queryText
 * @returns {Promise<number[]>}
 */
async function getQueryVector(queryText) {
  fastify.log.info(`Getting vector for query: "${queryText}"`);
  // MOCK: In a real implementation, you would call the embedding API.
  // const response = await axios.post(EMBEDDING_API_URL, { text: queryText });
  // if (response.status !== 200 || !response.data.vector) {
  //   throw new Error('Failed to get embedding for query.');
  // }
  // return response.data.vector;
  
  // Return a consistent mock vector for demonstration
  return Array(VECTOR_DIM).fill(0).map((_, i) => Math.sin(i * 0.1));
}

// --- API Routes ---
fastify.post('/search', {
  schema: {
    body: {
      type: 'object',
      required: ['query'],
      properties: {
        query: { type: 'string' },
        limit: { type: 'number', default: 5 }
      }
    }
  }
}, async (request, reply) => {
  const { query, limit } = request.body;
  
  try {
    const queryVector = await getQueryVector(query);
    
    fastify.log.info(`Searching Qdrant collection "${COLLECTION_NAME}" with limit ${limit}`);
    
    const searchResult = await qdrantClient.search(COLLECTION_NAME, {
      vector: queryVector,
      limit: limit,
      with_payload: true, // Include the original product data
      with_vector: false, // Don't need to return the vector itself
    });

    return reply.code(200).send(searchResult);

  } catch (error) {
    fastify.log.error(error, `Search failed for query: "${query}"`);
    // A common error is the collection not existing. Provide a helpful message.
    if (error.response && error.response.status === 404) {
        return reply.code(500).send({
            message: `Qdrant collection '${COLLECTION_NAME}' not found. Ensure the Haskell service has run and initialized it.`
        });
    }
    return reply.code(500).send({ message: 'An internal server error occurred.' });
  }
});


// --- Server Start ---
const start = async () => {
  try {
    await fastify.listen({ port: 3000, host: '0.0.0.0' });
    fastify.log.info(`Node.js API server listening on port 3000`);
  } catch (err) {
    fastify.log.error(err);
    process.exit(1);
  }
};

start();

这个 Node.js 服务非常轻量,它的职责单一:验证输入、向量化查询、查询 Qdrant 并返回结果。这种职责分离使得两个服务都可以独立地进行扩展和维护。

架构的局限性与未来迭代

当前方案虽然健壮,但并非没有缺点。引入 polyglot 架构增加了运维的复杂性,需要为 Haskell 和 Node.js 分别建立构建、测试和部署流水线。同时,系统的整体吞吐量受限于最慢的环节,通常是外部 Embedding API 的响应时间和 Qdrant 的索引性能。

未来的优化路径可以包括:

  1. 引入 Schema Registry: 使用如 Avro 和 Confluent Schema Registry 来管理 Kafka 消息的 schema,可以为 Haskell 和其他潜在的消费者提供更强的 schema 保证,实现安全的 schema 演进。
  2. 批处理与背压: Haskell 消费者可以实现更复杂的批处理逻辑,将多个消息的向量化请求和 Qdrant 写入操作合并,以提高效率。同时,需要监控 Kafka 的消费延迟,实现背压机制,防止下游服务过载。
  3. 死信队列 (DLQ): 对于无法被 Haskell 服务处理的“毒丸”消息(例如,格式永久性错误),应将其转发到死信队列中进行人工干预,而不是无限重试阻塞整个分区。
  4. 本地 Embedding 模型: 为了降低延迟和成本,可以考虑在 Haskell 服务内部或旁路部署一个本地的 Embedding 模型(例如,使用 ONNX runtime 或 TorchScript),避免网络调用带来的不确定性。

  目录