今天我想分享一个结合Motoko和Node.js构建自定义嵌入存储检索系统的实战经验。这个系统能够高效存储、检索和管理嵌入向量(embeddings)——那些在机器学习和AI应用中广泛使用的数值表示形式。
作为一名长期从事分布式系统开发的工程师,我发现将Motoko的智能合约特性与Node.js的灵活性相结合,能够创造出既安全又易于扩展的解决方案。这个系统特别适合需要处理语义相似性搜索、推荐系统或自然语言处理任务的场景。
系统采用三层架构设计:
Motoko作为Internet Computer区块链的智能合约语言,提供了去中心化、不可篡改的存储能力。而Node.js则充当了区块链世界与传统Web应用之间的桥梁。
选择Motoko而非传统数据库主要基于三个考虑:
Node.js的选择则是因为:
首先需要安装DFINITY SDK:
bash复制sh -ci "$(curl -fsSL https://smartcontracts.org/install.sh)"
验证安装:
bash复制dfx --version
创建新项目:
bash复制dfx new embedding-store
cd embedding-store
在src/embedding-store/main.mo中定义数据结构和方法:
motoko复制import Array "mo:base/Array";
import Time "mo:base/Time";
actor EmbeddingStore {
type Embedding = {
text: Text;
embedding: [Float];
createdAt: Int;
};
stable var embeddings: [Embedding] = [];
public shared func storeEmbedding(text: Text, embedding: [Float]) : async () {
let timestamp = Time.now();
embeddings := Array.append(embeddings, [{
text = text;
embedding = embedding;
createdAt = timestamp;
}]);
};
public query func getEmbeddings() : async [Embedding] {
return embeddings;
};
}
注意:Motoko中的
stable关键字确保变量在合约升级时保持持久化
启动本地网络:
bash复制dfx start --background
部署合约:
bash复制dfx deploy
测试存储功能:
bash复制dfx canister call embedding-store storeEmbedding '("Sample Text", vec {1.0; 0.5; 0.25})'
查询数据:
bash复制dfx canister call embedding-store getEmbeddings
bash复制mkdir embedding-api
cd embedding-api
npm init -y
npm install express @dfinity/agent dotenv cors
index.js主要内容:
javascript复制const express = require('express');
const { HttpAgent, Actor } = require('@dfinity/agent');
require('dotenv').config();
const app = express();
const port = 3000;
app.use(express.json());
const canisterId = process.env.CANISTER_ID;
const host = process.env.HOST;
const agent = new HttpAgent({ host });
agent.fetchRootKey(); // 开发环境需要
const idlFactory = ({ IDL }) => {
return IDL.Service({
storeEmbedding: IDL.Func(
[IDL.Text, IDL.Vec(IDL.Float64)],
[],
['oneway']
),
getEmbeddings: IDL.Func(
[],
[IDL.Vec(IDL.Record({
text: IDL.Text,
embedding: IDL.Vec(IDL.Float64),
createdAt: IDL.Int
}))],
['query']
)
});
};
const embeddingStore = Actor.createActor(idlFactory, {
agent,
canisterId
});
app.post('/store', async (req, res) => {
try {
const { text, embedding } = req.body;
await embeddingStore.storeEmbedding(text, embedding);
res.status(200).json({ success: true });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.get('/retrieve', async (req, res) => {
try {
const embeddings = await embeddingStore.getEmbeddings();
res.status(200).json(embeddings);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.listen(port, () => {
console.log(`API服务运行在 http://localhost:${port}`);
});
.env文件示例:
code复制CANISTER_ID=rrkah-fqaaa-aaaaa-aaaaq-cai
HOST=http://localhost:8000
使用cURL测试API:
存储嵌入向量:
bash复制curl -X POST http://localhost:3000/store \
-H "Content-Type: application/json" \
-d '{"text":"自然语言处理","embedding":[0.1,0.5,0.8]}'
检索数据:
bash复制curl http://localhost:3000/retrieve
motoko复制public shared func storeEmbeddings(newEmbeddings: [Embedding]) : async () {
embeddings := Array.append(embeddings, newEmbeddings);
}
motoko复制public query func getEmbeddingsPage(page: Nat, size: Nat) : async [Embedding] {
let start = page * size;
let end = start + size;
if (end > embeddings.size()) {
return Array.subArray(embeddings, start, embeddings.size() - start);
} else {
return Array.subArray(embeddings, start, size);
}
}
javascript复制const jwt = require('jsonwebtoken');
const authenticate = (req, res, next) => {
const token = req.headers['authorization'];
if (!token) return res.sendStatus(401);
jwt.verify(token, process.env.SECRET_KEY, (err, user) => {
if (err) return res.sendStatus(403);
req.user = user;
next();
});
};
app.post('/store', authenticate, async (req, res) => {
// ...原有逻辑
});
javascript复制const validateEmbedding = (embedding) => {
if (!Array.isArray(embedding)) return false;
if (embedding.length > 1024) return false; // 限制维度大小
return embedding.every(num => typeof num === 'number' && isFinite(num));
};
添加Prometheus监控:
javascript复制const promBundle = require("express-prom-bundle");
const metricsMiddleware = promBundle({ includeMethod: true });
app.use(metricsMiddleware);
日志记录建议使用Winston:
javascript复制const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
// 在路由中使用
app.post('/store', async (req, res) => {
logger.info('存储请求', { body: req.body });
// ...原有逻辑
});
motoko复制// 在Motoko中维护多个canister引用
stable var shards: [Principal] = [];
javascript复制const redis = require('redis');
const client = redis.createClient();
app.get('/retrieve', async (req, res) => {
const cacheKey = 'all_embeddings';
client.get(cacheKey, async (err, cached) => {
if (cached) return res.json(JSON.parse(cached));
const embeddings = await embeddingStore.getEmbeddings();
client.setex(cacheKey, 60, JSON.stringify(embeddings));
res.json(embeddings);
});
});
| 错误现象 | 可能原因 | 解决方案 |
|---|---|---|
| 无法连接到canister | 网络配置错误 | 检查.env中的HOST和CANISTER_ID |
| 存储失败 | 数据类型不匹配 | 确保嵌入向量是Float64数组 |
| 查询超时 | canister过载 | 实现分页查询或增加超时时间 |
| 认证失败 | JWT配置错误 | 验证密钥和令牌有效期 |
motoko复制Debug.print("Current embeddings count: " # debug_show(embeddings.size()));
bash复制DEBUG=dfinity:* node index.js
bash复制dfx replica --verbose
扩展Motoko合约支持相似性搜索:
motoko复制import Float "mo:base/Float";
public query func findSimilar(target: [Float], threshold: Float) : async [Embedding] {
Array.filter(embeddings, func (e: Embedding) {
let similarity = cosineSimilarity(e.embedding, target);
similarity >= threshold
})
};
func cosineSimilarity(a: [Float], b: [Float]) : Float {
var dot = 0.0;
var normA = 0.0;
var normB = 0.0;
for (i in a.keys()) {
dot += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
};
dot / (Float.sqrt(normA) * Float.sqrt(normB))
};
在Node.js中实现混合推荐逻辑:
javascript复制app.get('/recommend', async (req, res) => {
const { userId } = req.query;
// 1. 获取用户嵌入向量
const userEmbedding = await getUserEmbedding(userId);
// 2. 获取候选物品
const candidates = await getCandidates();
// 3. 计算相似度并排序
const recommendations = candidates.map(item => ({
...item,
score: cosineSimilarity(userEmbedding, item.embedding)
})).sort((a, b) => b.score - a.score);
res.json(recommendations.slice(0, 10));
});
| 操作类型 | 平均延迟 | 吞吐量(QPS) |
|---|---|---|
| 单次存储 | 120ms | 80 |
| 批量存储(100条) | 900ms | 110 |
| 全量查询 | 450ms | 20 |
| 相似性搜索 | 680ms | 15 |
| 资源类型 | 估算成本 |
|---|---|
| 存储(1GB) | ~$5/月 |
| 计算(100万次调用) | ~$0.5 |
| 带宽(1TB) | ~$10 |
| 方案 | 月成本(估算) | 可靠性 | 扩展性 |
|---|---|---|---|
| Motoko+IC | $20-50 | 高 | 中 |
| MongoDB Atlas | $60-100 | 中 | 高 |
| AWS DynamoDB | $80-150 | 高 | 高 |
在实际部署这个系统时,我总结了几个关键经验:
数据预处理很重要:在存储前规范化嵌入向量可以显著提高查询性能
监控不可忽视:特别是canister的循环(cycle)余额,避免因耗尽而停机
版本控制策略:Motoko合约升级时需要特别注意stable变量的兼容性
测试覆盖全面:特别要测试边界情况,如空数组、极大数值等
对于想要进一步扩展这个系统的开发者,我建议:
这个项目最让我惊喜的是Motoko的表现力——虽然语法需要适应,但它的actor模型非常适合这种高并发的存储场景。Node.js的快速原型能力则让前端集成变得异常简单。