🔗 OpenClaw 数据源集成指南

RSS / API / Webhook 全面接入 — 让你的 Agent 实时获取最新数据,告别"信息茧房"

📡 RSS 订阅 🔌 REST API 🔔 Webhook 接收 🗄️ 数据持久化 ⚡ 实时同步

📋 目录

🤔 为什么 Agent 需要数据源集成?

默认情况下,OpenClaw Agent 的世界停留在训练数据截止日期。想让它知道今天发生了什么?需要接入实时数据源。

三种核心接入方式,覆盖 99% 的数据获取场景:

📡

RSS 订阅

最适合内容聚合。多 feed、定时抓取、增量更新,零成本。

🔌

REST API

接入任意服务。认证、分页、缓存、速率限制全覆盖。

🔔

Webhook 接收

事件驱动。创建 endpoint、验签、去重,实时响应。

💡 妙趣洞察: 数据源集成不是目的,让 Agent 做出更好的决策才是。一个接入了实时股价 + 新闻情绪 + 财报数据的金融 Agent,远比只依赖训练数据的大语言模型更及时、更可靠。
数据源 → 采集器 → 清洗/转换 → 持久化 → Agent 查询

📡 RSS 订阅源集成

基础 RSS 采集器

import feedparser, sqlite3
from datetime import datetime
from typing import List, Dict, Optional

class RSSCollector:
    """Multi-feed RSS collector with incremental (deduplicated) updates.

    Every fetched entry is stored in the SQLite table ``rss_cache``; the
    ``UNIQUE(feed_url, item_guid)`` constraint guarantees an entry is only
    reported as "new" the first time it is seen.
    """

    def __init__(self, db_path: str = "rss_cache.db"):
        """Create the collector and make sure the cache table exists.

        Args:
            db_path: Path of the SQLite database file used as the cache.
        """
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the ``rss_cache`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""CREATE TABLE IF NOT EXISTS rss_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                feed_url TEXT NOT NULL, item_guid TEXT NOT NULL,
                title TEXT, link TEXT, pub_date TIMESTAMP,
                summary TEXT, fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(feed_url, item_guid))""")
            conn.commit()
        finally:
            conn.close()

    def fetch_feed(self, feed_url: str, max_items: int = 20) -> List[Dict]:
        """Fetch one feed and return only the entries not cached before.

        Args:
            feed_url: URL of the RSS/Atom feed.
            max_items: Maximum number of entries to examine from the feed.

        Returns:
            One dict per newly inserted entry with keys ``title``, ``link``,
            ``pub_date`` (ISO string or None), ``summary`` (first 500 chars)
            and ``source``. Returns ``[]`` if parsing failed or the feed has
            no entries.
        """
        print(f"\U0001f50d 抓取 {feed_url}")
        try:
            feed = feedparser.parse(feed_url)
        except Exception as e:
            print(f"\u274c 失败: {e}")
            return []
        if not feed.entries:
            return []
        new_items = []
        conn = sqlite3.connect(self.db_path)
        try:
            for entry in feed.entries[:max_items]:
                # Prefer the entry id (guid); fall back to the link.
                guid = entry.get("id") or entry.get("link", "")
                title = entry.get("title", "")
                link = entry.get("link", "")
                summary = entry.get("summary", "")[:500]
                parsed = getattr(entry, "published_parsed", None)
                pub_date = datetime(*parsed[:6]) if parsed else None
                # Store the timestamp as text explicitly, in the same
                # "YYYY-MM-DD HH:MM:SS" form the implicit sqlite3 datetime
                # adapter produced; that adapter is deprecated since 3.12.
                pub_text = pub_date.isoformat(" ") if pub_date else None
                cur = conn.execute(
                    "INSERT OR IGNORE INTO rss_cache "
                    "(feed_url, item_guid, title, link, pub_date, summary, fetched_at) "
                    "VALUES (?,?,?,?,?,?,datetime('now'))",
                    (feed_url, guid, title, link, pub_text, summary))
                # rowcount is 0 when the UNIQUE constraint suppressed the
                # insert, i.e. the entry was already cached.
                if cur.rowcount != 1:
                    continue
                new_items.append({"title": title, "link": link,
                    "pub_date": pub_date.isoformat() if pub_date else None,
                    "summary": summary, "source": feed_url})
            conn.commit()
        finally:
            conn.close()
        print(f"\u2705 新增 {len(new_items)} 条")
        return new_items

    def fetch_all(self, feed_urls: List[str]) -> List[Dict]:
        """Fetch every feed and return all new entries, newest first.

        Entries without a publication date sort last.
        """
        items = []
        for url in feed_urls:
            items.extend(self.fetch_feed(url))
        return sorted(items, key=lambda x: x.get("pub_date") or "", reverse=True)

    def search_cache(self, keyword: str, limit: int = 10) -> List[Dict]:
        """Return cached entries whose title or summary contains ``keyword``.

        The keyword is matched literally (case-insensitive for ASCII, as with
        SQLite LIKE): percent, underscore and backslash are escaped, so they
        are not treated as LIKE wildcards.

        Args:
            keyword: Substring to look for.
            limit: Maximum number of rows to return.

        Returns:
            Dicts with ``title``, ``link``, ``pub_date``, ``summary`` and
            ``source``, ordered by ``pub_date`` descending.
        """
        escaped = keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped}%"
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                "SELECT title,link,pub_date,summary,feed_url FROM rss_cache "
                "WHERE title LIKE ? ESCAPE '\\' OR summary LIKE ? ESCAPE '\\' "
                "ORDER BY pub_date DESC LIMIT ?",
                (pattern, pattern, limit)).fetchall()
        finally:
            conn.close()
        return [{"title": r[0], "link": r[1], "pub_date": r[2], "summary": r[3], "source": r[4]}
                for r in rows]

if __name__ == "__main__":
    # Demo: pull two feeds into the local cache, then search the cache.
    c = RSSCollector()
    feeds = ["https://openclaw.ai/blog", "https://hnrss.org/frontpage"]
    items = c.fetch_all(feeds)
    # Real newline before the summary line (not a literal backslash-n).
    print(f"\n\U0001f4f0 新增 {len(items)} 条")
    for r in c.search_cache("MCP", 5):
        print(f"  \u2022 {r['title']}")

定时抓取配置

# cron:每 30 分钟
*/30 * * * * /usr/bin/python3 /path/to/rss_collector.py >> /var/log/rss.log 2>&1

# OpenClaw 调度器
{"schedules": [{"name":"rss-fetch","schedule":"*/30 * * * *","action":"exec","command":"python3 rss_collector.py"}]}
💡 增量更新:以 guid(缺失时退回 link)作为唯一标识,配合 SQLite UNIQUE 约束去重。

RSS 最佳实践

| 实践 | 为什么 | 怎么做 |
|---|---|---|
| 增量更新 | 避免重复 | SQLite 缓存 guid |
| 频率控制 | 避免封 IP | 同一 feed 间隔 ≥15 分钟 |
| 错误处理 | feed 可能失效 | 捕获异常,记录日志 |
| 内容截断 | 避免膨胀 | summary[:500] |
| 自定义 UA | 部分站点会校验 User-Agent | 设置 User-Agent |

🔌 REST API 数据源集成

通用 API 客户端

import requests, time, hashlib, json, sqlite3
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta

class APIClient:
    """Generic REST client: auth headers, SQLite GET cache, pagination and
    a client-side sliding-window rate limit."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None,
                 auth_header: str = "Authorization", auth_prefix: str = "Bearer",
                 cache_db: str = "api_cache.db", rate_limit_per_min: int = 60):
        """Configure the client and create the cache table if needed.

        Args:
            base_url: Service root; a trailing ``/`` is stripped.
            auth_token: Credential sent with every request, or None.
            auth_header: Header that carries the credential.
            auth_prefix: Scheme prefix (e.g. ``Bearer``), only applied when
                ``auth_header`` is ``Authorization``.
            cache_db: SQLite file used to cache GET responses.
            rate_limit_per_min: Max requests sent in any 60-second window.
        """
        self.base_url = base_url.rstrip("/")
        self.auth_token = auth_token
        self.auth_header = auth_header
        self.auth_prefix = auth_prefix
        self.rate_limit = rate_limit_per_min
        self.cache_db = cache_db
        self._ts: List[float] = []  # send times of recent requests (for rate limiting)
        self._init_cache()

    def _init_cache(self):
        """Create the ``api_cache`` table if it does not exist yet."""
        conn = sqlite3.connect(self.cache_db)
        try:
            conn.execute("""CREATE TABLE IF NOT EXISTS api_cache (
                cache_key TEXT PRIMARY KEY, response_json TEXT,
                cached_at TIMESTAMP, expires_at TIMESTAMP)""")
            conn.commit()
        finally:
            conn.close()

    def _headers(self) -> Dict[str, str]:
        """Build request headers, including the credential if configured."""
        h = {"Content-Type": "application/json"}
        if self.auth_token:
            prefix = f"{self.auth_prefix} " if self.auth_header == "Authorization" else ""
            h[self.auth_header] = f"{prefix}{self.auth_token}"
        return h

    def _enforce_rate_limit(self):
        """Block until another request fits in the 60 s window, then record it."""
        now = time.time()
        self._ts = [t for t in self._ts if now - t < 60]
        if len(self._ts) >= self.rate_limit:
            s = 60 - (now - self._ts[0])
            if s > 0:
                print(f"\u23f3 等待 {s:.1f}s")
                time.sleep(s)
        self._ts.append(time.time())

    @staticmethod
    def _retry_after(value: Optional[str], default: int) -> int:
        """Seconds to wait according to a ``Retry-After`` header value.

        Only the integer delay-seconds form is understood; a missing,
        HTTP-date or otherwise non-integer value yields ``default``.
        Negative delays are clamped to 0.
        """
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            return default

    def request(self, method: str, endpoint: str, params: dict = None,
                data: dict = None, use_cache: bool = True, cache_ttl: int = 300):
        """Send an HTTP request and return the decoded JSON body.

        GET responses are read from / written to the SQLite cache (lifetime
        ``cache_ttl`` seconds) when ``use_cache`` is true. Up to 3 attempts
        are made. HTTP 429 waits for ``Retry-After``, or an exponential
        fallback when that header is unusable. Transport errors and 5xx back
        off exponentially. Any other 4xx fails at once, because repeating it
        cannot succeed.

        Args:
            method: HTTP verb (case-insensitive).
            endpoint: Path relative to ``base_url``.
            params: Query parameters (sent for GET only).
            data: JSON body (sent for non-GET only).
            use_cache: Whether GET responses use the cache.
            cache_ttl: Cache lifetime in seconds.

        Returns:
            The parsed JSON response, or None when every attempt failed.
        """
        verb = method.upper()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        params = params or {}
        cacheable = verb == "GET" and use_cache
        ck = None
        if cacheable:
            # Keyed on the caller's ``method`` spelling, as before, so entries
            # in existing cache files stay valid.
            ck = hashlib.sha256(
                f"{method}:{url}:{json.dumps(params, sort_keys=True)}".encode()
            ).hexdigest()[:16]
            conn = sqlite3.connect(self.cache_db)
            try:
                row = conn.execute(
                    "SELECT response_json,expires_at FROM api_cache WHERE cache_key=?",
                    (ck,)).fetchone()
            finally:
                conn.close()
            if row and row[1] and datetime.now() < datetime.fromisoformat(row[1]):
                print(f"\U0001f4be 缓存: {endpoint}")
                return json.loads(row[0])
        headers = self._headers()
        for attempt in range(3):
            # Every attempt, retries included, is a real request and must
            # count against the rate limit.
            self._enforce_rate_limit()
            try:
                r = requests.request(verb, url, headers=headers,
                    params=params if verb == "GET" else None,
                    json=data if verb != "GET" else None, timeout=30)
                if r.status_code == 429:
                    if attempt == 2:
                        # No attempts left: don't sleep for nothing.
                        print("\u274c 失败: HTTP 429")
                        return None
                    after = self._retry_after(r.headers.get("Retry-After"), 2 ** attempt)
                    print(f"\u26a0\ufe0f 限流!等 {after}s")
                    time.sleep(after)
                    continue
                if 400 <= r.status_code < 500:
                    print(f"\u274c 失败: HTTP {r.status_code}")
                    return None
                r.raise_for_status()
                result = r.json()
                if cacheable:
                    expires = datetime.now() + timedelta(seconds=cache_ttl)
                    conn = sqlite3.connect(self.cache_db)
                    try:
                        conn.execute("INSERT OR REPLACE INTO api_cache VALUES (?,?,datetime('now'),?)",
                            (ck, json.dumps(result), expires.isoformat()))
                        conn.commit()
                    finally:
                        conn.close()
                return result
            except requests.RequestException as e:
                if attempt < 2:
                    print(f"\u274c 重试: {e}")
                    time.sleep(2 ** attempt)
                else:
                    print(f"\u274c 失败: {e}")
                    return None
        return None

    def get(self, *a, **kw):
        """Shorthand for ``request("GET", ...)``."""
        return self.request("GET", *a, **kw)

    def post(self, *a, **kw):
        """Shorthand for ``request("POST", ...)``."""
        return self.request("POST", *a, **kw)

    def paginate(self, endpoint: str, params: dict = None,
                 page_key: str = "page", per_page: int = 100, max_pages: int = 10):
        """Collect items across numbered pages of a GET endpoint.

        Stops on an empty/failed page, when an envelope reports
        ``has_more: false``, on a short page, or after ``max_pages`` pages.
        The caller's ``params`` dict is never modified.

        Returns:
            The concatenated list of items from all fetched pages.
        """
        items = []
        base_params = dict(params or {})
        for page in range(1, max_pages + 1):
            page_params = {**base_params, page_key: page, "per_page": per_page}
            result = self.get(endpoint, params=page_params)
            if not result:
                break
            if isinstance(result, dict):
                # Envelope responses carry the page under "items"; a dict
                # without that key is one record, not a list to splice in.
                batch = result.get("items", [result])
            else:
                batch = result
            if not batch:
                break
            items.extend(batch)
            print(f"  \U0001f4c4 第{page}页: {len(batch)}条(累计{len(items)})")
            if isinstance(result, dict) and not result.get("has_more", True):
                break
            if len(batch) < per_page:
                break
        return items

# 示例:GitHub API
if __name__ == "__main__":
    import os
    # Read the token from the environment instead of hard-coding it. With no
    # GITHUB_TOKEN set, the request goes out unauthenticated rather than with
    # a placeholder credential that would fail authentication.
    gh = APIClient("https://api.github.com", os.environ.get("GITHUB_TOKEN"), rate_limit_per_min=60)
    commits = gh.get("repos/openclaw/openclaw/commits", {"per_page": 5}, cache_ttl=600)
    for c in (commits or []):
        print(f"  \u2022 {c['commit']['message'][:60]} ({c['sha'][:7]})")

API 认证方式速查

| 方式 | 请求头 / 参数 | 示例服务 |
|---|---|---|
| Bearer Token | Authorization: Bearer xxx | GitHub, OpenAI |
| API Key (Header) | X-API-Key: xxx | 传统 SaaS |
| API Key (Query) | ?api_key=xxx | 简单服务 |
| Basic Auth | Authorization: Basic base64== | 内网服务 |
| OAuth 2.0 | Authorization: Bearer token | 需用户授权 |

🔔 Webhook 接收 — 事件驱动

Webhook = "反向 API":数据变化时,服务端主动推送。实时性最好,无需轮询。

from flask import Flask, request, jsonify
import hmac, hashlib

import os

app = Flask(__name__)
# Shared HMAC secret for webhook signatures. Load it from the environment so
# it never has to live in source control; the placeholder is only a
# local-development fallback.
SECRET = os.environ.get("WEBHOOK_SECRET", "your-webhook-secret")

def verify(payload: bytes, sig: str, secret: str) -> bool:
    """Check an HMAC-SHA256 webhook signature to reject forged requests.

    Accepts both the GitHub style ``sha256=<hexdigest>`` and a bare hex
    digest.

    Args:
        payload: Raw request body exactly as received.
        sig: Signature header value; empty means "unsigned" and fails.
        secret: Shared webhook secret.

    Returns:
        True only if ``sig`` matches the HMAC-SHA256 of ``payload``.
    """
    if not sig:
        return False
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    expected = "sha256=" + digest if sig.startswith("sha256=") else digest
    # Compare as bytes: compare_digest raises TypeError for str input that
    # contains non-ASCII characters, and the header is attacker-controlled.
    return hmac.compare_digest(sig.encode("utf-8"), expected.encode("utf-8"))

@app.route("/webhook/github", methods=["POST"])
def github_webhook():
    """Receive GitHub webhook deliveries.

    Responds 401 when ``X-Hub-Signature-256`` does not verify against
    ``SECRET``. Otherwise logs the event type and repository, and for
    ``push`` events the first three commit messages.
    """
    sig = request.headers.get("X-Hub-Signature-256", "")
    if not verify(request.get_data(), sig, SECRET):
        return jsonify({"error": "bad sig"}), 401
    # silent=True: a non-JSON body yields None instead of raising; fall back
    # to an empty dict so the handler still acknowledges the delivery.
    data = request.get_json(silent=True) or {}
    event = request.headers.get("X-GitHub-Event", "?")
    repo = (data.get("repository") or {}).get("full_name")
    print(f"\U0001f4ec GitHub [{event}]: 仓库={repo}")
    if event == "push":
        for commit in (data.get("commits") or [])[:3]:
            print(f"   \u2022 {(commit.get('message') or '')[:60]}")
    return jsonify({"ok": True}), 200

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook():
    """Feishu (Lark) event callback endpoint.

    Echoes ``challenge`` back for URL-verification requests. Otherwise logs
    ``header.event_type`` and acknowledges.
    """
    # NOTE(review): unlike the GitHub route, nothing here authenticates the
    # sender. Feishu event subscriptions appear to offer a verification token
    # and an encrypt key; confirm against the Feishu docs and check one
    # before exposing this endpoint publicly.
    data = request.get_json(silent=True) or {}
    challenge = data.get("challenge")
    if challenge:
        return jsonify({"challenge": challenge})
    event_type = (data.get("header") or {}).get("event_type")
    print(f"\U0001f4ec 飞书事件: {event_type}")
    return jsonify({"ok": True}), 200

if __name__ == "__main__":
    # Start the Flask app, listening on all interfaces at port 8080.
    server_opts = {"host": "0.0.0.0", "port": 8080}
    app.run(**server_opts)
⚠️ 安全: 生产环境务必验证签名!否则任何人都可以伪造事件触发你的 Agent。

🔧 数据清洗与转换管道

import re
from typing import Dict, List, Tuple

class DataPipeline:
    """Clean -> transform -> enrich feed items before handing them to an Agent."""

    # Hoisted patterns: an HTML tag, and a "word" of 4+ ASCII letters.
    _TAG_RE = re.compile(r'<[^>]+>')
    _WORD_RE = re.compile(r'\b[a-zA-Z]{4,}\b')

    @staticmethod
    def clean_html(text: str) -> str:
        """Strip HTML tags from ``text`` and decode entities (``&amp;`` -> ``&``).

        Tags are removed before unescaping, so escaped markup such as
        ``&lt;b&gt;`` survives as literal text instead of being stripped.
        """
        import html
        return html.unescape(DataPipeline._TAG_RE.sub('', text))

    @staticmethod
    def truncate(text: str, max_len: int = 500) -> str:
        """Cut ``text`` to ``max_len`` chars, appending ``...`` when shortened."""
        return text[:max_len] + "..." if len(text) > max_len else text

    @staticmethod
    def extract_keywords(text: str, top_n: int = 5) -> List[Tuple[str, int]]:
        """Return the ``top_n`` most frequent lower-cased words of 4+ letters.

        Each result is a ``(word, count)`` pair; ties keep first-occurrence
        order.
        """
        from collections import Counter
        words = DataPipeline._WORD_RE.findall(text.lower())
        return Counter(words).most_common()[:top_n]

    def process_rss_item(self, item: Dict) -> Dict:
        """Normalise one raw feed item.

        Returns a dict with the cleaned ``title``, the cleaned ``summary``
        truncated to 300 chars, ``link``, ``pub_date`` passed through, and
        ``keywords`` computed from the cleaned title plus full cleaned summary.
        """
        title = self.clean_html(item.get("title") or "")
        summary = self.clean_html(item.get("summary") or "")
        return {"title": title,
                "summary": self.truncate(summary, 300),
                "link": item.get("link", ""),
                "pub_date": item.get("pub_date"),
                # Use the cleaned text: raw HTML would contribute tag and
                # attribute names such as "href" or "https" as keywords.
                "keywords": self.extract_keywords(title + " " + summary)}

    def batch_process(self, items: List[Dict]) -> List[Dict]:
        """Apply ``process_rss_item`` to every item."""
        return [self.process_rss_item(i) for i in items]

    def to_context_string(self, items: List[Dict], max_items: int = 10) -> str:
        """Render up to ``max_items`` items as a numbered, newline-separated
        text block for an LLM prompt.

        A missing or None date renders as ``?``; summaries are cut to 150 chars.
        """
        lines = []
        for idx, item in enumerate(items[:max_items], start=1):
            # pub_date may be present but None (RSSCollector emits None for
            # undated entries), so .get(..., '') alone is not enough.
            date = (item.get('pub_date') or '')[:10] or '?'
            lines.append(f"[{idx}] {item.get('title')} ({date})")
            lines.append(f"    摘要: {(item.get('summary') or '')[:150]}")
        return "\n".join(lines)

🗄️ 数据持久化 — SQLite / 文件系统

| 方式 | 适合 | 缺点 | 代码量 |
|---|---|---|---|
| SQLite | 结构化数据、搜索 | 并发写入受限 | 中等 |
| 文件系统 (JSON) | 简单持久化 | 无搜索能力 | 少 |
| 内存 (dict) | 临时缓存 | 重启丢失 | 极少 |

文件系统持久化

import json, os
from datetime import datetime

class FileStore:
    """Minimal JSON file store: one file per (key, day) under ``dir_path``."""

    def __init__(self, dir_path: str = "data"):
        """Create the store, making ``dir_path`` (and parents) if missing."""
        self.dir_path = dir_path
        os.makedirs(dir_path, exist_ok=True)

    def _path(self, key: str, date: str) -> str:
        """Path of the file holding ``key`` for ``date`` (``YYYYMMDD``)."""
        return os.path.join(self.dir_path, f"{key}_{date}.json")

    def save(self, key: str, data: list) -> str:
        """Write ``data`` as pretty-printed UTF-8 JSON to today's file for ``key``.

        The JSON goes to a temporary sibling file first and is then moved
        into place with ``os.replace``. Because that rename is atomic on the
        same filesystem, a crash mid-write cannot leave a truncated file for
        ``load`` to choke on.

        Returns:
            The path of the written file.
        """
        path = self._path(key, datetime.now().strftime("%Y%m%d"))
        tmp_path = path + ".tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, path)
        except BaseException:
            # Don't leave a half-written temp file behind; re-raise the error.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        return path

    def load(self, key: str, date: str = None) -> list:
        """Load what was saved for ``key`` on ``date`` (``YYYYMMDD``, default today).

        Returns ``[]`` when no file exists for that day.
        """
        if date is None:
            date = datetime.now().strftime("%Y%m%d")
        path = self._path(key, date)
        if not os.path.exists(path):
            return []
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

store = FileStore("news_cache")
sample_items = [{"title": "GPT-5.6 released", "points": 1085}]
store.save("hn_frontpage", sample_items)
loaded = store.load("hn_frontpage")
cached_count = len(loaded)
print(f"\U0001f4c2 {cached_count} 条缓存")

🔗 相关资源

🚀 开始构建你的数据 Agent

从 RSS 采集到 MCP 包装,三种数据源集成方式任选!

📡 查看 RSS 聚合 🧰 更多教程