示例中心
本页汇总接入主路径中最常用的示例命令与脚本组织方式,目标是减少手抄文档导致的细节错误。
Polling Echo Bot
适合第一次联调。你只需要 `FLY_BASE_URL` 和 `FLY_BOT_TOKEN`。以下命令假设你已将示例代码保存为 `hello_bot_polling.py`。
pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python hello_bot_polling.py
- 示例做了最小的 update 消费循环。
- 收到文本消息后,会回一条 `echo: ...`。
- 通过 `client_msg_id=f"echo_{update_id}"` 演示发送幂等键。
- 如果你准备把它变成正式服务,请把 `offset` 持久化。
from __future__ import annotations
import os
import time
from typing import Any, Dict, Optional
import requests
def _env(name: str) -> str:
    """Return environment variable *name* with surrounding whitespace removed.

    An unset variable is reported as the empty string.
    """
    raw_value = os.getenv(name, "")
    return str(raw_value).strip()
# Connection settings are read from the environment. Trailing slashes are
# dropped from the base URL because request paths are appended to it directly.
BASE_URL = _env("FLY_BASE_URL").rstrip("/")
TOKEN = _env("FLY_BOT_TOKEN")
if not (BASE_URL and TOKEN):
    # Stop before any request is attempted when either setting is missing.
    raise SystemExit("Missing env: FLY_BASE_URL / FLY_BOT_TOKEN")
def bot_post(path: str, payload: Dict[str, Any], *, timeout: int = 35) -> Dict[str, Any]:
    """POST *payload* as JSON to ``BASE_URL + path`` with the bot token.

    Args:
        path: API path appended to ``BASE_URL``.
        payload: JSON-serializable request body.
        timeout: Timeout passed to ``requests.post``.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: If the response status signals an error.
        RuntimeError: If the decoded body is not a JSON object.
    """
    url = BASE_URL + path
    auth_headers = {"Authorization": f"Bearer {TOKEN}"}
    resp = requests.post(url, headers=auth_headers, json=payload, timeout=timeout)
    resp.raise_for_status()
    body = resp.json()
    if isinstance(body, dict):
        return body
    raise RuntimeError("unexpected response json")
def _get_updates(offset: int) -> Dict[str, Any]:
    """Request up to 50 updates starting at *offset* and return the raw response.

    The HTTP timeout (40) is deliberately larger than the ``timeout`` value sent
    in the request body (30) — presumably the server-side wait; confirm against
    the API reference.
    """
    request_body = {"offset": offset, "limit": 50, "timeout": 30}
    return bot_post("/api/v1/s/bot/updates", request_body, timeout=40)
def _send_echo(*, to_user_id: Optional[str], group_id: Optional[str], content: str, client_msg_id: str) -> None:
    """Send a text message to a group when *group_id* is truthy, else to a user.

    Args:
        to_user_id: Recipient user id, used only when *group_id* is falsy.
        group_id: Recipient group id; takes precedence when truthy.
        content: Text to send.
        client_msg_id: Value sent as ``client_msg_id`` (the idempotency key).
    """
    # Exactly one addressing field is added, after the common fields.
    target: Dict[str, Any] = {"group_id": group_id} if group_id else {"to_user_id": to_user_id}
    payload: Dict[str, Any] = {
        "msg_type": "text",
        "data": {"content": content},
        "client_msg_id": client_msg_id,
        **target,
    }
    bot_post("/api/v1/s/bot/send", payload)
def main() -> None:
    """Poll for updates forever and echo every message update back to its chat.

    The offset lives only in memory and restarts at 0 on every run.
    """
    next_offset = 0
    while True:
        response = _get_updates(next_offset)
        envelope = response.get("data")
        batch = envelope.get("updates", []) if isinstance(envelope, dict) else []
        if not isinstance(batch, list):
            batch = []

        for update in batch:
            if not isinstance(update, dict):
                continue

            # Advance the offset before any filtering so skipped updates are
            # not fetched again.
            update_id = int(update.get("update_id") or 0)
            if update_id > 0:
                next_offset = max(next_offset, update_id + 1)

            if update.get("type") != "message":
                continue
            message = update.get("message") or {}
            if not isinstance(message, dict):
                continue
            chat = message.get("chat") or {}
            if not isinstance(chat, dict):
                continue

            chat_type = str(chat.get("type") or "")
            chat_id = str(chat.get("id") or "")
            if not (chat_type and chat_id):
                continue

            # Non-text messages are echoed with an empty text part.
            text = ""
            if message.get("msgType") == "text":
                body = message.get("data") or {}
                if isinstance(body, dict):
                    text = str(body.get("content") or "")

            if chat_type == "group":
                target = {"to_user_id": None, "group_id": chat_id}
            else:
                target = {"to_user_id": chat_id, "group_id": None}
            _send_echo(content=f"echo: {text}", client_msg_id=f"echo_{update_id}", **target)

        # Short pause between polling rounds.
        time.sleep(0.2)


if __name__ == "__main__":
    main()
Polling Echo Bot(Node.js)
如果你不用 Python,可以直接参考下面这份最小 Node 示例。依赖 Node 18+ 自带 fetch。
// Connection settings come from the environment. Values are trimmed and every
// trailing slash is removed from the base URL, mirroring the Python example's
// strip() / rstrip("/") handling, so "https://host//" and stray whitespace or
// newlines in the variables do not produce malformed URLs or tokens.
const BASE_URL = (process.env.FLY_BASE_URL || "https://api.flycrytp.com").trim().replace(/\/+$/, "");
const TOKEN = (process.env.FLY_BOT_TOKEN || "").trim();
if (!BASE_URL || !TOKEN) {
  throw new Error("Missing FLY_BASE_URL or FLY_BOT_TOKEN");
}

// Offset sent with the next updates request; kept in memory only.
let offset = 0;
/**
 * POST a JSON payload to the bot API and resolve with the parsed JSON body.
 *
 * @param {string} path - API path appended to BASE_URL.
 * @param {object} payload - Request body, serialized with JSON.stringify.
 * @param {number} [timeoutMs=35000] - Milliseconds before the request is aborted.
 * @returns {Promise<any>} Parsed JSON response.
 * @throws {Error} `HTTP <status>` when the response is not ok; fetch rejects
 *   when the abort signal fires.
 */
async function botPost(path, payload, timeoutMs = 35000) {
  const aborter = new AbortController();
  const timeoutHandle = setTimeout(() => aborter.abort(), timeoutMs);
  try {
    const headers = {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${TOKEN}`,
    };
    const resp = await fetch(BASE_URL + path, {
      method: "POST",
      headers,
      body: JSON.stringify(payload),
      signal: aborter.signal,
    });
    if (resp.ok) {
      return await resp.json();
    }
    throw new Error(`HTTP ${resp.status}`);
  } finally {
    // Always cancel the timer so it cannot fire after the request settles.
    clearTimeout(timeoutHandle);
  }
}
/**
 * Poll for updates forever and echo every message update back to its chat.
 *
 * Malformed input is skipped instead of crashing the loop or corrupting the
 * offset, matching the defensive checks of the Python example:
 *  - `updates` that is not an array is treated as empty;
 *  - entries that are not objects are ignored;
 *  - the offset only advances for a finite, positive `update_id` (a missing id
 *    previously bumped it to 1 and a non-numeric id turned it into NaN);
 *  - messages without a chat id are not sent (they would have no target).
 */
async function main() {
  while (true) {
    const data = await botPost("/api/v1/s/bot/updates", {
      offset,
      limit: 50,
      timeout: 30,
    }, 40000);
    const rawUpdates = ((data || {}).data || {}).updates;
    const updates = Array.isArray(rawUpdates) ? rawUpdates : [];

    for (const upd of updates) {
      if (!upd || typeof upd !== "object") continue;

      const updateId = Number(upd.update_id);
      if (Number.isFinite(updateId) && updateId > 0) {
        offset = Math.max(offset, updateId + 1);
      }

      if (upd.type !== "message") continue;
      const message = upd.message || {};
      const chat = message.chat || {};
      if (!chat.id) continue;

      // Non-text messages are echoed with an empty text part.
      const text = message.msgType === "text"
        ? (((message.data || {}).content) || "")
        : "";
      const payload = {
        msg_type: "text",
        data: { content: `echo: ${text}` },
        client_msg_id: `echo_${upd.update_id}`,
      };
      if (chat.type === "group") {
        payload.group_id = chat.id;
      } else {
        payload.to_user_id = chat.id;
      }
      await botPost("/api/v1/s/bot/send", payload);
    }

    // Short pause between polling rounds.
    await new Promise((resolve) => setTimeout(resolve, 200));
  }
}

// Any error escaping the loop is logged and ends the process with status 1.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
Webhook Echo Server
适合你已经有公网 HTTPS 或 HTTPS 隧道的场景。以下命令假设你已将示例代码保存为 `hello_bot_webhook_server.py`。
pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python hello_bot_webhook_server.py
- 示例自带最小 HTTP Server。
- 默认用 `sha256(bot_token)` 推导 `webhook_secret`。
- 示例中已包含签名验证、回调转发和最小 echo 逻辑。
- 本地开发时应配合 HTTPS 隧道使用,不能直接暴露 HTTP 本地地址作为生产口径。
from __future__ import annotations
import hashlib
import hmac
import json
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict
import requests
def _env(name: str) -> str:
    """Return environment variable *name*, stripped of surrounding whitespace.

    An unset variable yields the empty string.
    """
    value = os.getenv(name, "")
    return str(value).strip()
def canonical_json(obj: Any) -> bytes:
    """Serialize *obj* as compact, key-sorted JSON and return it as UTF-8 bytes.

    Non-ASCII characters are written literally rather than as ``\\u`` escapes.
    """
    text = json.dumps(
        obj,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("utf-8")
def sha256_hex(data: bytes) -> str:
    """Return the SHA-256 digest of *data* as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def build_signing_string(*, update_id: int, update_type: str, bot_id: str, timestamp: int, nonce: str, payload_sha256: str) -> str:
    """Join the six signed fields with newlines, in their fixed order.

    Falsy string fields are rendered as empty strings; the two integer fields
    are rendered with ``str``.
    """
    fields = (
        str(update_id),
        update_type or "",
        bot_id or "",
        str(timestamp),
        nonce or "",
        payload_sha256 or "",
    )
    return "\n".join(fields)
def verify_signed_update_v1(*, signed_update: Dict[str, Any], secret: str, now_ts: int, replay_window_sec: int = 300) -> str:
    """Verify the timestamp window and HMAC-SHA256 signature of a signed update.

    The signature is recomputed over the newline-joined fields
    ``update_id, type, bot.id, timestamp, nonce, sha256(canonical payload)``,
    where the payload is ``message`` or, failing that, ``callback_query``.

    Because the update is untrusted input, every malformed shape is reported as
    ``ValueError`` instead of leaking ``TypeError`` / ``AttributeError``.

    Args:
        signed_update: Decoded webhook body.
        secret: Shared webhook secret used as the HMAC key.
        now_ts: Current Unix time in seconds.
        replay_window_sec: Maximum allowed distance between *now_ts* and the
            update's ``timestamp``.

    Returns:
        The string ``"OK"`` when verification succeeds.

    Raises:
        ValueError: ``"MALFORMED_UPDATE"`` for a non-dict update or non-integer
            ``timestamp`` / ``update_id``; ``"REPLAY_WINDOW_EXCEEDED"`` when the
            timestamp is outside the window; ``"SIGNATURE_MISMATCH"`` when the
            signature does not match.
    """
    if not isinstance(signed_update, dict):
        raise ValueError("MALFORMED_UPDATE")

    def _int_field(key: str) -> int:
        # int() raises TypeError for lists/dicts; normalize to ValueError.
        try:
            return int(signed_update.get(key) or 0)
        except (TypeError, ValueError) as exc:
            raise ValueError("MALFORMED_UPDATE") from exc

    sig = str(signed_update.get("signature") or "")
    timestamp = _int_field("timestamp")
    if abs(int(now_ts) - timestamp) > int(replay_window_sec):
        raise ValueError("REPLAY_WINDOW_EXCEEDED")

    payload = signed_update.get("message") or signed_update.get("callback_query") or {}
    payload_sha = sha256_hex(canonical_json(payload))

    # A non-dict "bot" value is treated like a missing one.
    bot = signed_update.get("bot")
    bot_id = str(bot.get("id") or "") if isinstance(bot, dict) else ""

    signing_string = build_signing_string(
        update_id=_int_field("update_id"),
        update_type=str(signed_update.get("type") or ""),
        bot_id=bot_id,
        timestamp=timestamp,
        nonce=str(signed_update.get("nonce") or ""),
        payload_sha256=payload_sha,
    )
    expected = hmac.new(secret.encode("utf-8"), signing_string.encode("utf-8"), hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, which a caller-supplied signature may have.
    if not hmac.compare_digest(expected.encode("utf-8"), sig.encode("utf-8", "replace")):
        raise ValueError("SIGNATURE_MISMATCH")
    return "OK"
# Connection settings are read from the environment. Trailing slashes are
# dropped from the base URL because request paths are appended to it directly.
BASE_URL = _env("FLY_BASE_URL").rstrip("/")
TOKEN = _env("FLY_BOT_TOKEN")
if not BASE_URL or not TOKEN:
    # Fail fast, like the polling example does. With an empty token the default
    # secret below would be the SHA-256 of an empty string, which anyone can
    # compute, so the server must not start in that state.
    raise SystemExit("Missing env: FLY_BASE_URL / FLY_BOT_TOKEN")
PORT = int(_env("FLY_WEBHOOK_PORT") or "8080")
PATH = _env("FLY_WEBHOOK_PATH") or "/webhook"
# An explicit FLY_WEBHOOK_SECRET wins; otherwise the secret is sha256(token).
WEBHOOK_SECRET = _env("FLY_WEBHOOK_SECRET") or sha256_hex(TOKEN.encode("utf-8"))
def bot_post(path: str, payload: Dict[str, Any], *, timeout: int = 35) -> Dict[str, Any]:
    """POST *payload* as JSON to ``BASE_URL + path`` with the bot token.

    Args:
        path: API path appended to ``BASE_URL``.
        payload: JSON-serializable request body.
        timeout: Timeout passed to ``requests.post``.

    Returns:
        Whatever ``resp.json()`` decodes; the shape is not validated here.

    Raises:
        requests.HTTPError: If the response status signals an error.
    """
    url = BASE_URL + path
    auth_headers = {"Authorization": f"Bearer {TOKEN}"}
    resp = requests.post(url, headers=auth_headers, json=payload, timeout=timeout)
    resp.raise_for_status()
    decoded = resp.json()
    return decoded
class WebhookHandler(BaseHTTPRequestHandler):
    """Minimal webhook receiver: verify each signed update, then echo text.

    Only ``POST`` requests whose path equals ``PATH`` are processed. Every
    outcome produces an explicit HTTP response:

    * 404 - path does not match ``PATH``;
    * 400 - body is unreadable, not valid UTF-8 JSON, or not a JSON object;
    * 401 - signature or replay-window verification failed;
    * 500 - the echo could not be sent to the bot API;
    * 200 - update accepted.
    """

    def _reply(self, status: int, body: bytes = b"") -> None:
        """Send a complete plain-text response with an explicit length."""
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    def _read_signed_update(self) -> Dict[str, Any]:
        """Read and decode the request body; raise ``ValueError`` if malformed.

        ``int()``, ``bytes.decode`` and ``json.loads`` all signal bad input
        with ``ValueError`` subclasses, so callers need only one except clause.
        """
        raw_len = int(self.headers.get("Content-Length", "0") or "0")
        if raw_len < 0:
            # rfile.read() with a negative size would block until EOF.
            raise ValueError("negative Content-Length")
        body = self.rfile.read(raw_len)
        parsed = json.loads(body.decode("utf-8"))
        if not isinstance(parsed, dict):
            raise ValueError("update must be a JSON object")
        return parsed

    def _echo_message(self, signed_update: Dict[str, Any]) -> None:
        """Send ``echo: <text>`` back to the chat a message update came from."""
        msg = signed_update.get("message")
        if not isinstance(msg, dict):
            return
        chat = msg.get("chat")
        if not isinstance(chat, dict) or not chat.get("id"):
            # Without a chat id there is no target to address.
            return

        # Non-text messages are echoed with an empty text part.
        text = ""
        if msg.get("msgType") == "text":
            data = msg.get("data")
            if isinstance(data, dict):
                text = str(data.get("content") or "")

        send_payload: Dict[str, Any] = {
            "msg_type": "text",
            "data": {"content": f"echo: {text}"},
            "client_msg_id": f"echo_{signed_update.get('update_id')}",
        }
        if chat.get("type") == "group":
            send_payload["group_id"] = chat.get("id")
        else:
            send_payload["to_user_id"] = chat.get("id")
        bot_post("/api/v1/s/bot/send", send_payload)

    def do_POST(self) -> None:  # noqa: N802
        """Handle one webhook delivery."""
        if self.path != PATH:
            self._reply(404)
            return

        try:
            signed_update = self._read_signed_update()
        except ValueError as exc:
            self.log_error("rejecting malformed webhook body: %s", exc)
            self._reply(400, b"bad request")
            return

        try:
            verify_signed_update_v1(signed_update=signed_update, secret=WEBHOOK_SECRET, now_ts=int(time.time()))
        except (ValueError, TypeError, AttributeError) as exc:
            # The reason is logged locally only; the caller gets a fixed body.
            self.log_error("webhook verification failed: %s", exc)
            self._reply(401, b"invalid signature")
            return

        if signed_update.get("type") == "message":
            try:
                self._echo_message(signed_update)
            except (requests.RequestException, ValueError) as exc:
                # NOTE(review): 500 assumes the platform retries failed
                # deliveries; client_msg_id keeps a repeated send idempotent.
                # Confirm against the webhook protocol page.
                self.log_error("echo send failed: %s", exc)
                self._reply(500, b"send failed")
                return

        self._reply(200, b"ok")
if __name__ == "__main__":
    # Listen on every interface at the configured port and serve until the
    # process is stopped.
    server = HTTPServer(("0.0.0.0", PORT), WebhookHandler)
    server.serve_forever()
离线验签脚本
适合调试 Webhook 签名失败问题。以下命令假设你已将验签脚本保存为 `verify_webhook_signature_v1.py`。
export FLY_WEBHOOK_SECRET="<secret>"
cat signed_update.json | python verify_webhook_signature_v1.py
- 可以脱离业务服务,单独验证某个回调样本是否符合 v1 签名规则。
- 适合排查 canonical JSON、payload sha256、时间戳窗口等问题。
from __future__ import annotations
import hashlib
import hmac
import json
import os
import sys
import time
from typing import Any
def canonical_json(obj: Any) -> bytes:
    """Return *obj* as compact JSON with sorted keys, encoded as UTF-8.

    Non-ASCII characters are emitted as-is instead of ``\\u`` escapes.
    """
    compact_separators = (",", ":")
    rendered = json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=compact_separators)
    return rendered.encode("utf-8")
def sha256_hex(data: bytes) -> str:
    """Hash *data* with SHA-256 and return the lowercase hex digest."""
    digest = hashlib.sha256(data)
    return digest.hexdigest()
def build_signing_string(*, update_id: int, update_type: str, bot_id: str, timestamp: int, nonce: str, payload_sha256: str) -> str:
    """Build the newline-separated string that the signature is computed over.

    Field order is fixed: update id, update type, bot id, timestamp, nonce,
    payload hash. Falsy string fields become empty strings.
    """
    parts = [str(update_id)]
    parts.append(update_type or "")
    parts.append(bot_id or "")
    parts.append(str(timestamp))
    parts.append(nonce or "")
    parts.append(payload_sha256 or "")
    return "\n".join(parts)
def main() -> None:
    """Verify one signed update read from stdin against ``FLY_WEBHOOK_SECRET``.

    Environment:
        FLY_WEBHOOK_SECRET: Required HMAC key.
        FLY_NOW_TS: Optional Unix time to use instead of the current time, so
            an old sample can be checked inside its 300-second window.

    Prints ``Verify OK`` on success. Every failure exits through ``SystemExit``
    with a ``Verify failed: ...`` message instead of a traceback; on a
    signature mismatch the intermediate values (payload hash, signing string,
    expected signature) are written to stderr to help locate the difference.
    """
    secret = str(os.getenv("FLY_WEBHOOK_SECRET", "")).strip()
    if not secret:
        raise SystemExit("Missing env: FLY_WEBHOOK_SECRET")

    try:
        now_ts = int(str(os.getenv("FLY_NOW_TS", "")).strip() or int(time.time()))
    except ValueError:
        raise SystemExit("Verify failed: FLY_NOW_TS must be an integer")

    try:
        signed_update = json.loads(sys.stdin.read())
    except ValueError as exc:
        raise SystemExit(f"Verify failed: INVALID_JSON ({exc})")
    if not isinstance(signed_update, dict):
        raise SystemExit("Verify failed: INVALID_JSON (top-level value must be an object)")

    payload = signed_update.get("message") or signed_update.get("callback_query") or {}
    payload_sha = sha256_hex(canonical_json(payload))

    # A non-dict "bot" value is treated like a missing one.
    bot = signed_update.get("bot")
    bot_id = str(bot.get("id") or "") if isinstance(bot, dict) else ""

    try:
        update_id = int(signed_update.get("update_id") or 0)
        timestamp = int(signed_update.get("timestamp") or 0)
    except (TypeError, ValueError):
        raise SystemExit("Verify failed: MALFORMED_UPDATE (update_id / timestamp must be integers)")

    signing_string = build_signing_string(
        update_id=update_id,
        update_type=str(signed_update.get("type") or ""),
        bot_id=bot_id,
        timestamp=timestamp,
        nonce=str(signed_update.get("nonce") or ""),
        payload_sha256=payload_sha,
    )
    expected = hmac.new(secret.encode("utf-8"), signing_string.encode("utf-8"), hashlib.sha256).hexdigest()

    if abs(now_ts - timestamp) > 300:
        raise SystemExit("Verify failed: REPLAY_WINDOW_EXCEEDED")

    provided = str(signed_update.get("signature") or "")
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters.
    if not hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8", "replace")):
        print(f"payload_sha256: {payload_sha}", file=sys.stderr)
        print(f"signing_string: {signing_string!r}", file=sys.stderr)
        print(f"expected_signature: {expected}", file=sys.stderr)
        raise SystemExit("Verify failed: SIGNATURE_MISMATCH")
    print("Verify OK")


if __name__ == "__main__":
    main()
媒体收发 Roundtrip(Polling + batch_access)
适合需要处理图片、音频、视频或文件消息的 Bot。这个脚本演示“拉到媒体 Update -> 换取访问 URL -> 下载到本地临时目录 -> 再把原媒体消息回发到原会话”的首轮媒体链路。
pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python media_roundtrip_polling_batch_access.py
- 示例覆盖 `s/bot/files/batch_access` 的典型使用方式。
- 如果你要做图片识别、OCR 或音视频处理,这个脚本比单纯 echo 更接近真实接入链路。
- 生产环境里不要把换出来的短期访问 URL 长期缓存。
from __future__ import annotations
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Tuple
import requests
# Connection settings are read from the environment. Trailing slashes are
# dropped from the base URL because request paths are appended to it directly.
BASE_URL = os.environ.get("FLY_BASE_URL", "").rstrip("/")
TOKEN = os.environ.get("FLY_BOT_TOKEN", "")
if not (BASE_URL and TOKEN):
    # Stop before any request is attempted when either setting is missing.
    raise SystemExit("Missing FLY_BASE_URL or FLY_BOT_TOKEN")
def bot_post(path: str, payload: dict, *, timeout: int = 35) -> dict:
    """POST *payload* as JSON to ``BASE_URL + path`` with the bot token.

    Args:
        path: API path appended to ``BASE_URL``.
        payload: JSON-serializable request body.
        timeout: Timeout passed to ``requests.post``.

    Returns:
        Whatever ``resp.json()`` decodes; the shape is not validated here.

    Raises:
        requests.HTTPError: If the response status signals an error.
    """
    endpoint = BASE_URL + path
    bearer = {"Authorization": f"Bearer {TOKEN}"}
    resp = requests.post(endpoint, headers=bearer, json=payload, timeout=timeout)
    resp.raise_for_status()
    decoded = resp.json()
    return decoded
def extract_media_urls(msg_type: str, data: dict) -> List[Tuple[str, str]]:
    """Collect the non-empty URL fields that belong to a media message.

    Args:
        msg_type: Message type; ``image``, ``voice``, ``file`` and ``video``
            are recognized, anything else yields no URLs.
        data: The message's ``data`` object. It comes straight from the
            update, so a value that is not a dict is tolerated and treated as
            having no URLs instead of raising ``AttributeError``.

    Returns:
        ``(field_path, url)`` pairs such as ``("data.originalUrl", "...")`` in
        the declared key order, with URLs stripped of surrounding whitespace.
        Fields that are missing, blank, or not strings are skipped.
    """
    url_keys_by_type = {
        "image": ("originalUrl", "thumbnailUrl"),
        "voice": ("url",),
        "file": ("url",),
        "video": ("videoUrl", "thumbnailUrl"),
    }
    if not isinstance(data, dict) or not isinstance(msg_type, str):
        return []

    out: List[Tuple[str, str]] = []
    for key in url_keys_by_type.get(msg_type, ()):
        value = data.get(key)
        if isinstance(value, str) and value.strip():
            out.append((f"data.{key}", value.strip()))
    return out
def download_to_tmp(access_url: str, *, prefix: str) -> str:
    """Stream *access_url* into a new file under ``/tmp/fly_bot_media``.

    The file name starts with ``<prefix>_<unix seconds>_`` and ends in
    ``.bin``. A unique middle part is added by ``tempfile`` so two downloads
    with the same prefix in the same second can no longer overwrite each other.

    Args:
        access_url: URL to download.
        prefix: Leading part of the file name; characters other than letters,
            digits, ``-`` and ``_`` are replaced with ``_`` so it cannot point
            outside the target directory.

    Returns:
        Path of the written file, as a string.

    Raises:
        requests.HTTPError: If the response status signals an error. The
            status is checked before the file is created, so a failed request
            leaves no empty file behind.
    """
    import tempfile  # local import: not among this script's top-level imports

    tmp_dir = Path("/tmp/fly_bot_media")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    safe_prefix = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in prefix)

    with requests.get(access_url, stream=True, timeout=35) as resp:
        resp.raise_for_status()
        with tempfile.NamedTemporaryFile(
            mode="wb",
            delete=False,
            dir=tmp_dir,
            prefix=f"{safe_prefix}_{int(time.time())}_",
            suffix=".bin",
        ) as file_obj:
            for chunk in resp.iter_content(chunk_size=1024 * 128):
                if chunk:
                    file_obj.write(chunk)
            out_path = file_obj.name
    return str(out_path)
def main() -> None:
    """Poll forever; for each media message, download it and send it back.

    For every ``image`` / ``voice`` / ``video`` / ``file`` message the loop
    exchanges the message's URLs for access URLs, downloads the first one,
    prints the local path, and re-sends the original ``msg_type`` and ``data``
    to the chat the message came from. Updates that yield no usable access URL
    are skipped. The offset lives only in memory.
    """
    media_types = ("image", "voice", "video", "file")
    next_offset = 0
    while True:
        poll_body = {"offset": next_offset, "limit": 50, "timeout": 30}
        upd_resp = bot_post("/api/v1/s/bot/updates", poll_body, timeout=40)
        batch = (upd_resp.get("data") or {}).get("updates") or []

        for update in batch:
            next_offset = max(next_offset, int(update.get("update_id") or 0) + 1)
            if update.get("type") != "message":
                continue

            message = update.get("message") or {}
            chat = message.get("chat") or {}
            msg_type = str(message.get("msgType") or "").strip()
            data = message.get("data") or {}
            if msg_type not in media_types:
                continue

            urls = extract_media_urls(msg_type, data)
            if not urls:
                continue

            # Exchange every stored URL for an access URL valid for 600 seconds.
            access_request = {
                "files": [{"url": stored_url} for _, stored_url in urls],
                "expire_seconds": 600,
            }
            access_resp = bot_post("/api/v1/s/bot/files/batch_access", access_request)
            if isinstance(access_resp, dict):
                url_map = (access_resp.get("data") or {}).get("files") or {}
            else:
                url_map = {}

            # Only the first URL of the message is downloaded.
            access_url = url_map.get(urls[0][1])
            if not (isinstance(access_url, str) and access_url.strip()):
                continue
            print(download_to_tmp(access_url.strip(), prefix=msg_type))

            send_payload: Dict[str, Any] = {
                "msg_type": msg_type,
                "data": data,
                "client_msg_id": f"fwd_{update.get('update_id')}",
            }
            target_key = "group_id" if chat.get("type") == "group" else "to_user_id"
            send_payload[target_key] = chat.get("id")
            bot_post("/api/v1/s/bot/send", send_payload)

        # Short pause between polling rounds.
        time.sleep(0.2)


if __name__ == "__main__":
    main()
下一步你通常会自己加什么
- 把配置读取、日志和错误处理抽成单独模块。
- 给 Polling 或 Webhook 增加去重存储。
- 把文本 echo 换成你的 LLM 或业务逻辑。
- 在发送侧补上更稳定的 `client_msg_id` 生成规则。
建议的使用顺序
- 先跑 Polling Echo Bot,确保 token、事件和发送链路都通。
- 再跑 Webhook Echo Server,确认公网回调、签名和重放窗口都正常。
- 如果 Webhook 失败,先用离线验签脚本定位签名问题,再回到服务端代码。
使用这些示例前先确认
- 示例命令中的文件名只是推荐的本地组织方式,不依赖仓库内部目录结构。
- Polling 参考代码已在快速开始页给出完整脚本;Webhook 与验签请结合协议页的签名规则实现。
- 如果你准备将示例纳入正式服务,请补充持久化 offset、去重存储、重试和结构化日志。