RSS / API / Webhook 全面接入 — 让你的 Agent 实时获取最新数据,告别"信息茧房"
默认情况下,OpenClaw Agent 的世界停留在训练数据截止日期。想让它知道今天发生了什么?需要接入实时数据源。
三种核心接入方式,覆盖 99% 的数据获取场景:
RSS:最适合内容聚合。多 feed、定时抓取、增量更新,零成本。
API:接入任意服务。认证、分页、缓存、速率限制全覆盖。
Webhook:事件驱动。创建 endpoint、验签、去重,实时响应。
import feedparser, sqlite3
from datetime import datetime
from typing import List, Dict, Optional
class RSSCollector:
    """Collect items from several RSS/Atom feeds with incremental updates.

    Every item that has been seen is recorded in a SQLite table keyed by
    ``(feed_url, item_guid)``, so repeated runs only report new entries.
    """

    def __init__(self, db_path: str = "rss_cache.db"):
        """Store the cache location and make sure the cache table exists.

        Args:
            db_path: Path of the SQLite database file used as the cache.
        """
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the ``rss_cache`` table when it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""CREATE TABLE IF NOT EXISTS rss_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                feed_url TEXT NOT NULL, item_guid TEXT NOT NULL,
                title TEXT, link TEXT, pub_date TIMESTAMP,
                summary TEXT, fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(feed_url, item_guid))""")
            conn.commit()
        finally:
            # Close even when the DDL fails so the file handle is not leaked.
            conn.close()

    def fetch_feed(self, feed_url: str, max_items: int = 20) -> List[Dict]:
        """Fetch one feed and return only the entries not cached before.

        Args:
            feed_url: URL of the RSS/Atom feed.
            max_items: Maximum number of leading entries to inspect.

        Returns:
            A list of dicts with ``title``, ``link``, ``pub_date`` (ISO 8601
            string or ``None``), ``summary`` and ``source`` keys. Empty when
            the feed could not be read or has nothing new.
        """
        print(f"\U0001f50d 抓取 {feed_url}")
        try:
            feed = feedparser.parse(feed_url)
        except Exception as e:
            print(f"\u274c 失败: {e}")
            return []
        if not feed.entries:
            # feedparser usually reports network/parse errors through the
            # ``bozo`` flag instead of raising, so surface them here.
            if getattr(feed, "bozo", False):
                print(f"\u274c 失败: {getattr(feed, 'bozo_exception', None)}")
            return []

        new_items = []
        conn = sqlite3.connect(self.db_path)
        try:
            for entry in feed.entries[:max_items]:
                guid = entry.get("id") or entry.get("link", "")
                title = entry.get("title", "")
                link = entry.get("link", "")
                summary = entry.get("summary", "")[:500]
                published = getattr(entry, "published_parsed", None)
                pub_date = datetime(*published[:6]) if published else None
                # Bind the timestamp as text: the implicit datetime adapter of
                # sqlite3 is deprecated since Python 3.12. ``isoformat(" ")``
                # is the exact format that adapter used to write.
                stored_date = pub_date.isoformat(" ") if pub_date else None
                cur = conn.execute(
                    "INSERT OR IGNORE INTO rss_cache "
                    "(feed_url, item_guid, title, link, pub_date, summary) "
                    "VALUES (?,?,?,?,?,?)",
                    (feed_url, guid, title, link, stored_date, summary))
                # rowcount is 0 when the UNIQUE constraint ignored the row,
                # i.e. the item was already cached.
                if cur.rowcount == 0:
                    continue
                new_items.append({"title": title, "link": link,
                                  "pub_date": pub_date.isoformat() if pub_date else None,
                                  "summary": summary, "source": feed_url})
            conn.commit()
        finally:
            conn.close()
        print(f"\u2705 新增 {len(new_items)} 条")
        return new_items

    def fetch_all(self, feed_urls: List[str]) -> List[Dict]:
        """Fetch every feed and return all new items, newest first.

        Items without a publication date sort last.
        """
        items = []
        for url in feed_urls:
            items.extend(self.fetch_feed(url))
        return sorted(items, key=lambda x: x.get("pub_date") or "", reverse=True)

    def search_cache(self, keyword: str, limit: int = 10) -> List[Dict]:
        """Search cached titles and summaries for a literal keyword.

        Args:
            keyword: Text to look for; ``%`` and ``_`` are matched literally.
            limit: Maximum number of rows to return.

        Returns:
            Matching items ordered by publication date, newest first.
        """
        # Escape LIKE wildcards so a keyword such as "100%" or "rate_limit"
        # is treated as plain text instead of a pattern.
        escaped = (keyword.replace("\\", "\\\\")
                          .replace("%", "\\%")
                          .replace("_", "\\_"))
        pattern = f"%{escaped}%"
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                """SELECT title,link,pub_date,summary,feed_url
                FROM rss_cache
                WHERE title LIKE ? ESCAPE '\\' OR summary LIKE ? ESCAPE '\\'
                ORDER BY pub_date DESC LIMIT ?""",
                (pattern, pattern, limit)).fetchall()
        finally:
            conn.close()
        return [{"title": r[0], "link": r[1], "pub_date": r[2],
                 "summary": r[3], "source": r[4]} for r in rows]
if __name__ == "__main__":
    # Demo: pull two feeds, report how many items are new, then search the
    # local cache for a keyword.
    c = RSSCollector()
    feeds = ["https://openclaw.ai/blog", "https://hnrss.org/frontpage"]
    items = c.fetch_all(feeds)
    # "\n" (not "\\n") so a real blank line separates the summary from the
    # per-feed progress output.
    print(f"\n\U0001f4f0 新增 {len(items)} 条")
    for r in c.search_cache("MCP", 5):
        print(f" \u2022 {r['title']}")
# cron:每 30 分钟
*/30 * * * * /usr/bin/python3 /path/to/rss_collector.py >> /var/log/rss.log 2>&1
# OpenClaw 调度器
{"schedules": [{"name":"rss-fetch","schedule":"*/30 * * * *","action":"exec","command":"python3 rss_collector.py"}]}
用 guid 或 link 做唯一标识,配合 SQLite UNIQUE 约束去重。

| 实践 | 为什么 | 怎么做 |
|---|---|---|
| 增量更新 | 避免重复 | SQLite 缓存 guid |
| 频率控制 | 避免封 IP | 同一 feed 间隔 ≥15 分钟 |
| 错误处理 | feed 可能失效 | 捕获异常,记录日志 |
| 内容截断 | 避免膨胀 | summary[:500] |
| 自定义 UA | 站点验证 | 设置 User-Agent 头 |
import requests, time, hashlib, json, sqlite3
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
class APIClient:
    """Generic JSON API client: auth header, SQLite cache, paging, rate limit.

    GET responses can be cached in a SQLite table with a per-call TTL, and a
    sliding 60-second window limits how many requests are sent per minute.
    """

    def __init__(self, base_url: str, auth_token: Optional[str] = None,
                 auth_header: str = "Authorization", auth_prefix: str = "Bearer",
                 cache_db: str = "api_cache.db", rate_limit_per_min: int = 60):
        """Configure the client and create the cache table if needed.

        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            auth_token: Credential sent with every request, if any.
            auth_header: Name of the header carrying the credential.
            auth_prefix: Scheme prefix, used only with ``Authorization``.
            cache_db: Path of the SQLite file used as the response cache.
            rate_limit_per_min: Maximum requests per rolling 60 seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.auth_token = auth_token
        self.auth_header = auth_header
        self.auth_prefix = auth_prefix
        self.rate_limit = rate_limit_per_min
        self.cache_db = cache_db
        self._ts: List[float] = []  # send times of recent requests
        self._init_cache()

    def _init_cache(self):
        """Create the ``api_cache`` table when it does not exist yet."""
        conn = sqlite3.connect(self.cache_db)
        try:
            conn.execute("""CREATE TABLE IF NOT EXISTS api_cache (
                cache_key TEXT PRIMARY KEY, response_json TEXT,
                cached_at TIMESTAMP, expires_at TIMESTAMP)""")
            conn.commit()
        finally:
            conn.close()

    def _headers(self) -> Dict[str, str]:
        """Build the request headers, including the credential if configured."""
        h = {"Content-Type": "application/json"}
        if self.auth_token:
            # Only the standard Authorization header takes a scheme prefix;
            # custom headers such as X-API-Key carry the bare token.
            prefix = f"{self.auth_prefix} " if self.auth_header == "Authorization" else ""
            h[self.auth_header] = f"{prefix}{self.auth_token}"
        return h

    def _enforce_rate_limit(self):
        """Sleep as needed so the per-minute request budget is respected."""
        now = time.time()
        self._ts = [t for t in self._ts if now - t < 60]
        # ``self._ts`` must be non-empty before indexing; this also keeps a
        # non-positive limit from raising IndexError.
        if self._ts and len(self._ts) >= self.rate_limit:
            s = 60 - (now - self._ts[0])
            if s > 0:
                print(f"\u23f3 等待 {s:.1f}s")
                time.sleep(s)
        self._ts.append(time.time())

    @staticmethod
    def _retry_after_seconds(header_value, attempt: int) -> int:
        """Turn a Retry-After header into a delay, with exponential fallback.

        The header may be missing or hold an HTTP date instead of a number of
        seconds; in both cases ``2 ** attempt`` is used.
        """
        try:
            return max(0, int(header_value))
        except (TypeError, ValueError):
            return 2 ** attempt

    def _cache_lookup(self, cache_key: str):
        """Return the cached payload for ``cache_key`` or None if absent/expired."""
        conn = sqlite3.connect(self.cache_db)
        try:
            row = conn.execute(
                "SELECT response_json,expires_at FROM api_cache WHERE cache_key=?",
                (cache_key,)).fetchone()
        finally:
            conn.close()
        if row and row[1] and datetime.now() < datetime.fromisoformat(row[1]):
            return json.loads(row[0])
        return None

    def _cache_store(self, cache_key: str, payload, ttl: int):
        """Persist ``payload`` under ``cache_key`` for ``ttl`` seconds."""
        expires_at = (datetime.now() + timedelta(seconds=ttl)).isoformat()
        conn = sqlite3.connect(self.cache_db)
        try:
            conn.execute("INSERT OR REPLACE INTO api_cache VALUES (?,?,datetime('now'),?)",
                         (cache_key, json.dumps(payload), expires_at))
            conn.commit()
        finally:
            conn.close()

    def request(self, method: str, endpoint: str, params: dict = None,
                data: dict = None, use_cache: bool = True, cache_ttl: int = 300):
        """Send a request with caching, rate limiting and up to 3 attempts.

        Args:
            method: HTTP verb, case-insensitive.
            endpoint: Path relative to ``base_url``.
            params: Query parameters (sent for GET only).
            data: JSON body (sent for non-GET only).
            use_cache: Serve/store GET responses from the SQLite cache.
            cache_ttl: Cache lifetime in seconds.

        Returns:
            The decoded JSON response, or None when every attempt failed.
        """
        verb = method.upper()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        params = params or {}
        cacheable = verb == "GET" and use_cache
        ck = None
        if cacheable:
            fingerprint = f"{verb}:{url}:{json.dumps(params, sort_keys=True)}"
            ck = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
            cached = self._cache_lookup(ck)
            if cached is not None:
                print(f"\U0001f4be 缓存: {endpoint}")
                return cached

        self._enforce_rate_limit()
        headers = self._headers()
        for attempt in range(3):
            try:
                r = requests.request(verb, url, headers=headers,
                                     params=params if verb == "GET" else None,
                                     json=data if verb != "GET" else None, timeout=30)
                if r.status_code == 429:
                    after = self._retry_after_seconds(r.headers.get("Retry-After"), attempt)
                    print(f"\u26a0\ufe0f 限流!等 {after}s")
                    time.sleep(after)
                    continue
                r.raise_for_status()
                result = r.json()
                if cacheable:
                    self._cache_store(ck, result, cache_ttl)
                return result
            except requests.RequestException as e:
                if attempt < 2:
                    print(f"\u274c 重试: {e}")
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    print(f"\u274c 失败: {e}")
                    return None
        # Reached only when every attempt was answered with HTTP 429.
        return None

    def get(self, *a, **kw):
        """Shortcut for ``request("GET", ...)``."""
        return self.request("GET", *a, **kw)

    def post(self, *a, **kw):
        """Shortcut for ``request("POST", ...)``."""
        return self.request("POST", *a, **kw)

    def paginate(self, endpoint: str, params: dict = None,
                 page_key: str = "page", per_page: int = 100, max_pages: int = 10):
        """Collect items across numbered pages.

        Stops at ``max_pages``, on an empty or failed page, when a dict
        response reports ``has_more`` false, or when a page is shorter than
        ``per_page``.

        Returns:
            A flat list of all collected items.
        """
        items = []
        # Copy so the caller's dict is never modified by the paging keys.
        base_params = dict(params or {})
        for page in range(1, max_pages + 1):
            page_params = {**base_params, page_key: page, "per_page": per_page}
            result = self.get(endpoint, params=page_params)
            if not result:
                break
            batch = result.get("items", result) if isinstance(result, dict) else result
            # A dict without an "items" list is not a page of records;
            # extending with it would append its keys.
            if not batch or not isinstance(batch, list):
                break
            items.extend(batch)
            print(f" \U0001f4c4 第{page}页: {len(batch)}条(累计{len(items)})")
            if isinstance(result, dict) and not result.get("has_more", True):
                break
            if len(batch) < per_page:
                break
        return items
# Example: list the latest commits of a repository through the GitHub API.
if __name__ == "__main__":
    github = APIClient("https://api.github.com", "ghp_TOKEN", rate_limit_per_min=60)
    # Cache the listing for ten minutes; a failed call yields None.
    recent = github.get("repos/openclaw/openclaw/commits", {"per_page": 5}, cache_ttl=600)
    if not recent:
        recent = []
    for c in recent:
        print(f" \u2022 {c['commit']['message'][:60]} ({c['sha'][:7]})")
| 方式 | 请求头 | 示例服务 |
|---|---|---|
| Bearer Token | Authorization: Bearer xxx | GitHub, OpenAI |
| API Key (Header) | X-API-Key: xxx | 传统 SaaS |
| API Key (Query) | ?api_key=xxx | 简单服务 |
| Basic Auth | Authorization: Basic base64== | 内网服务 |
| OAuth 2.0 | Authorization: Bearer token | 需用户授权 |
Webhook = "反向 API":数据变化时,服务端主动推送。实时性最好,无需轮询。
from flask import Flask, request, jsonify
import hmac, hashlib
import os

app = Flask(__name__)
# Shared secret used to check webhook signatures. Prefer supplying it via the
# WEBHOOK_SECRET environment variable so the real value never lives in source
# control; the literal below is only a placeholder fallback.
SECRET = os.environ.get("WEBHOOK_SECRET", "your-webhook-secret")
def verify(payload: bytes, sig: str, secret: str) -> bool:
    """Check an HMAC-SHA256 webhook signature to reject forged requests.

    Args:
        payload: Raw request body exactly as received.
        sig: Signature header value, either ``sha256=<hex>`` or bare ``<hex>``.
        secret: Shared secret configured for the webhook.

    Returns:
        True only when ``sig`` matches the digest of ``payload``.
    """
    if not sig:
        return False
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    provided = sig[len("sha256="):] if sig.startswith("sha256=") else sig
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # with non-ASCII characters, which a crafted header could trigger.
    return hmac.compare_digest(provided.encode("utf-8"), digest.encode("ascii"))
@app.route("/webhook/github", methods=["POST"])
def github_webhook():
    """Receive a GitHub webhook: check the signature, then log the event.

    Returns:
        ``401`` with an error body when the signature is invalid, otherwise
        ``200`` with ``{"ok": true}``.
    """
    sig = request.headers.get("X-Hub-Signature-256", "")
    if not verify(request.get_data(), sig, SECRET):
        return jsonify({"error": "bad sig"}), 401
    # silent=True gives None for a missing or malformed JSON body instead of
    # raising; fall back to an empty dict so the lookups below cannot crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    event = request.headers.get("X-GitHub-Event", "?")
    repo_name = (data.get("repository") or {}).get("full_name")
    print(f"\U0001f4ec GitHub [{event}]: 仓库={repo_name}")
    if event == "push":
        # Log at most the first three commit messages of the push.
        for c in (data.get("commits") or [])[:3]:
            print(f" \u2022 {(c.get('message') or '')[:60]}")
    return jsonify({"ok": True}), 200
@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook():
    """Handle a Feishu event callback.

    When the body carries a ``challenge`` value it is echoed back; any other
    body is logged by its ``header.event_type`` and acknowledged.
    """
    # silent=True gives None for a missing or malformed JSON body instead of
    # raising; fall back to an empty dict so the lookups below cannot crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    challenge = data.get("challenge")
    if challenge:
        return jsonify({"challenge": challenge})
    event_type = (data.get("header") or {}).get("event_type")
    print(f"\U0001f4ec 飞书事件: {event_type}")
    return jsonify({"ok": True}), 200
if __name__ == "__main__":
    # Start the webhook server, bound to every network interface on port 8080.
    listen_on = {"host": "0.0.0.0", "port": 8080}
    app.run(**listen_on)
import re
from typing import Dict, List, Tuple
class DataPipeline:
    """Clean -> transform -> enrich feed items before they reach the agent."""

    @staticmethod
    def clean_html(text: str) -> str:
        """Remove anything that looks like an HTML tag from ``text``."""
        return re.sub(r'<[^>]+>', '', text)

    @staticmethod
    def truncate(text: str, max_len: int = 500) -> str:
        """Cut ``text`` to ``max_len`` characters, appending "..." if cut."""
        return text[:max_len] + "..." if len(text) > max_len else text

    @staticmethod
    def extract_keywords(text: str, top_n: int = 5) -> List[Tuple[str, int]]:
        """Return the ``top_n`` most frequent ASCII words of 4+ letters.

        Returns:
            ``(word, count)`` pairs, most frequent first; ties keep the order
            in which the words first appear.
        """
        words = re.findall(r'\b[a-zA-Z]{4,}\b', text.lower())
        freq = {}
        for w in words:
            freq[w] = freq.get(w, 0) + 1
        return sorted(freq.items(), key=lambda x: x[1], reverse=True)[:top_n]

    def process_rss_item(self, item: Dict) -> Dict:
        """Normalize one feed item: strip HTML, shorten, extract keywords.

        Missing or ``None`` title/summary values are treated as empty text.
        """
        title = self.clean_html(item.get("title") or "")
        summary = self.clean_html(item.get("summary") or "")
        return {"title": title,
                "summary": self.truncate(summary, 300),
                "link": item.get("link", ""),
                "pub_date": item.get("pub_date"),
                # Keywords come from the cleaned text so tag and attribute
                # names (e.g. "href", "strong") are not counted as words.
                "keywords": self.extract_keywords(title + " " + summary)}

    def batch_process(self, items: List[Dict]) -> List[Dict]:
        """Apply :meth:`process_rss_item` to every item."""
        return [self.process_rss_item(i) for i in items]

    def to_context_string(self, items: List[Dict], max_items: int = 10) -> str:
        """Render items as a numbered, newline-separated text block.

        Each item produces a header line with its title and date (``?`` when
        the date is missing) followed by a line with up to 150 characters of
        its summary.
        """
        lines = []
        for i, item in enumerate(items[:max_items]):
            # pub_date is None for feed entries without a publish time.
            day = str(item.get("pub_date") or "")[:10] or "?"
            summary = str(item.get("summary") or "")[:150]
            lines.append(f"[{i+1}] {item.get('title')} ({day})")
            lines.append(f" 摘要: {summary}")
        # A real newline, not the two characters backslash + "n".
        return "\n".join(lines)
| 方式 | 适合 | 缺点 | 代码量 |
|---|---|---|---|
| SQLite | 结构化数据、搜索 | 并发写入受限 | 中等 |
| 文件系统 (JSON) | 简单持久化 | 无搜索能力 | 少 |
| 内存 (dict) | 临时缓存 | 重启丢失 | 极少 |
import json, os
from datetime import datetime
class FileStore:
    """Tiny JSON-file store that keeps one file per key per calendar day."""

    def __init__(self, dir_path: str = "data"):
        """Remember the target directory and create it if it is missing."""
        self.dir_path = dir_path
        os.makedirs(dir_path, exist_ok=True)

    def _path_for(self, key: str, date: str) -> str:
        """Build the file path for ``key`` on the ``YYYYMMDD`` day ``date``."""
        return os.path.join(self.dir_path, f"{key}_{date}.json")

    def save(self, key: str, data: list) -> str:
        """Write ``data`` as pretty-printed UTF-8 JSON into today's file.

        Returns:
            The path of the file that was written.
        """
        today = datetime.now().strftime('%Y%m%d')
        target = self._path_for(key, today)
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
        return target

    def load(self, key: str, date: str = None) -> list:
        """Read the data saved for ``key`` on ``date`` (today when omitted).

        Returns:
            The decoded JSON content, or an empty list if no file exists.
        """
        day = date if date is not None else datetime.now().strftime("%Y%m%d")
        source = self._path_for(key, day)
        if os.path.exists(source):
            with open(source, "r", encoding="utf-8") as handle:
                return json.load(handle)
        return []
# Usage example: persist one sample record, read today's file back, and
# report how many entries it holds.
store = FileStore("news_cache")
sample_entries = [{"title": "GPT-5.6 released", "points": 1085}]
store.save("hn_frontpage", sample_entries)
loaded = store.load("hn_frontpage")
print(f"\U0001f4c2 {len(loaded)} 条缓存")