示例中心

本页汇总接入主路径中最常用的示例命令与脚本组织方式,目标是减少手抄文档导致的细节错误。

Polling Echo Bot

适合第一次联调。你只需要 `FLY_BASE_URL` 和 `FLY_BOT_TOKEN`。以下命令假设你已将示例代码保存为 `hello_bot_polling.py`。

pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python hello_bot_polling.py
  • 示例做了最小的 update 消费循环。
  • 收到文本消息后,会回一条 echo: ...
  • 通过 client_msg_id=f"echo_{update_id}" 演示发送幂等键。
  • 如果你准备把它变成正式服务,请把 offset 持久化。
from __future__ import annotations

import os
import time
from typing import Any, Dict, Optional

import requests


def _env(name: str) -> str:
    return str(os.getenv(name, "")).strip()


BASE_URL = _env("FLY_BASE_URL").rstrip("/")
TOKEN = _env("FLY_BOT_TOKEN")

if not BASE_URL or not TOKEN:
    raise SystemExit("Missing env: FLY_BASE_URL / FLY_BOT_TOKEN")


def bot_post(path: str, payload: Dict[str, Any], *, timeout: int = 35) -> Dict[str, Any]:
    resp = requests.post(
        BASE_URL + path,
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=payload,
        timeout=timeout,
    )
    resp.raise_for_status()
    out = resp.json()
    if not isinstance(out, dict):
        raise RuntimeError("unexpected response json")
    return out


def _get_updates(offset: int) -> Dict[str, Any]:
    return bot_post("/api/v1/s/bot/updates", {"offset": offset, "limit": 50, "timeout": 30}, timeout=40)


def _send_echo(*, to_user_id: Optional[str], group_id: Optional[str], content: str, client_msg_id: str) -> None:
    payload: Dict[str, Any] = {
        "msg_type": "text",
        "data": {"content": content},
        "client_msg_id": client_msg_id,
    }
    if group_id:
        payload["group_id"] = group_id
    else:
        payload["to_user_id"] = to_user_id
    bot_post("/api/v1/s/bot/send", payload)


def main() -> None:
    offset = 0
    while True:
        data = _get_updates(offset)
        updates = ((data.get("data") or {}) if isinstance(data.get("data"), dict) else {}).get("updates", [])
        if not isinstance(updates, list):
            updates = []

        for upd in updates:
            if not isinstance(upd, dict):
                continue

            update_id = int(upd.get("update_id") or 0)
            if update_id > 0:
                offset = max(offset, update_id + 1)

            if upd.get("type") != "message":
                continue

            msg = upd.get("message") or {}
            if not isinstance(msg, dict):
                continue

            chat = msg.get("chat") or {}
            if not isinstance(chat, dict):
                continue

            chat_type = str(chat.get("type") or "")
            chat_id = str(chat.get("id") or "")
            if not chat_type or not chat_id:
                continue

            text = ""
            if msg.get("msgType") == "text":
                msg_data = msg.get("data") or {}
                if isinstance(msg_data, dict):
                    text = str(msg_data.get("content") or "")

            echo = f"echo: {text}"
            if chat_type == "group":
                _send_echo(to_user_id=None, group_id=chat_id, content=echo, client_msg_id=f"echo_{update_id}")
            else:
                _send_echo(to_user_id=chat_id, group_id=None, content=echo, client_msg_id=f"echo_{update_id}")

        time.sleep(0.2)


if __name__ == "__main__":
    main()

Polling Echo Bot(Node.js)

如果你不用 Python,可以直接参考下面这份最小 Node 示例。依赖 Node 18+ 自带 fetch

const BASE_URL = (process.env.FLY_BASE_URL || "https://api.flycrytp.com").replace(/\/$/, "");
const TOKEN = process.env.FLY_BOT_TOKEN || "";

if (!BASE_URL || !TOKEN) {
  throw new Error("Missing FLY_BASE_URL or FLY_BOT_TOKEN");
}

let offset = 0;

async function botPost(path, payload, timeoutMs = 35000) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const resp = await fetch(BASE_URL + path, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "Authorization": `Bearer ${TOKEN}`,
      },
      body: JSON.stringify(payload),
      signal: controller.signal,
    });
    if (!resp.ok) {
      throw new Error(`HTTP ${resp.status}`);
    }
    return await resp.json();
  } finally {
    clearTimeout(timer);
  }
}

async function main() {
  while (true) {
    const data = await botPost("/api/v1/s/bot/updates", {
      offset,
      limit: 50,
      timeout: 30,
    }, 40000);

    const updates = (((data || {}).data || {}).updates || []);
    for (const upd of updates) {
      offset = Math.max(offset, Number(upd.update_id || 0) + 1);
      if (upd.type !== "message") continue;

      const message = upd.message || {};
      const chat = message.chat || {};
      const text = message.msgType === "text"
        ? (((message.data || {}).content) || "")
        : "";

      const payload = {
        msg_type: "text",
        data: { content: `echo: ${text}` },
        client_msg_id: `echo_${upd.update_id}`,
      };

      if (chat.type === "group") {
        payload.group_id = chat.id;
      } else {
        payload.to_user_id = chat.id;
      }

      await botPost("/api/v1/s/bot/send", payload);
    }

    await new Promise((resolve) => setTimeout(resolve, 200));
  }
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Webhook Echo Server

适合你已经有公网 HTTPS 或 HTTPS 隧道的场景。以下命令假设你已将示例代码保存为 `hello_bot_webhook_server.py`。

pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python hello_bot_webhook_server.py
  • 示例自带最小 HTTP Server。
  • 默认用 sha256(bot_token) 推导 webhook_secret
  • 示例中已包含签名验证、回调转发和最小 echo 逻辑。
  • 本地开发时应配合 HTTPS 隧道使用,不能直接暴露 HTTP 本地地址作为生产口径。
from __future__ import annotations

import hashlib
import hmac
import json
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict

import requests


def _env(name: str) -> str:
    return str(os.getenv(name, "")).strip()


def canonical_json(obj: Any) -> bytes:
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def build_signing_string(*, update_id: int, update_type: str, bot_id: str, timestamp: int, nonce: str, payload_sha256: str) -> str:
    return "\n".join([str(update_id), update_type or "", bot_id or "", str(timestamp), nonce or "", payload_sha256 or ""])


def verify_signed_update_v1(*, signed_update: Dict[str, Any], secret: str, now_ts: int, replay_window_sec: int = 300) -> str:
    sig = str(signed_update.get("signature") or "")
    timestamp = int(signed_update.get("timestamp") or 0)
    if abs(int(now_ts) - int(timestamp)) > int(replay_window_sec):
        raise ValueError("REPLAY_WINDOW_EXCEEDED")

    payload = signed_update.get("message") or signed_update.get("callback_query") or {}
    payload_sha = sha256_hex(canonical_json(payload))
    bot_id = str((signed_update.get("bot") or {}).get("id") or "")
    signing_string = build_signing_string(
        update_id=int(signed_update.get("update_id") or 0),
        update_type=str(signed_update.get("type") or ""),
        bot_id=bot_id,
        timestamp=timestamp,
        nonce=str(signed_update.get("nonce") or ""),
        payload_sha256=payload_sha,
    )
    expected = hmac.new(secret.encode("utf-8"), signing_string.encode("utf-8"), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, sig):
        raise ValueError("SIGNATURE_MISMATCH")
    return "OK"


BASE_URL = _env("FLY_BASE_URL").rstrip("/")
TOKEN = _env("FLY_BOT_TOKEN")
PORT = int(_env("FLY_WEBHOOK_PORT") or "8080")
PATH = _env("FLY_WEBHOOK_PATH") or "/webhook"
WEBHOOK_SECRET = _env("FLY_WEBHOOK_SECRET") or sha256_hex(TOKEN.encode("utf-8"))


def bot_post(path: str, payload: Dict[str, Any], *, timeout: int = 35) -> Dict[str, Any]:
    resp = requests.post(
        BASE_URL + path,
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=payload,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:  # noqa: N802
        if self.path != PATH:
            self.send_response(404)
            self.end_headers()
            return

        raw_len = int(self.headers.get("Content-Length", "0") or "0")
        body = self.rfile.read(raw_len)
        signed_update = json.loads(body.decode("utf-8"))
        verify_signed_update_v1(signed_update=signed_update, secret=WEBHOOK_SECRET, now_ts=int(time.time()))

        if signed_update.get("type") == "message":
            msg = signed_update.get("message") or {}
            chat = msg.get("chat") or {}
            text = str(((msg.get("data") or {}).get("content")) or "") if msg.get("msgType") == "text" else ""
            send_payload = {
                "msg_type": "text",
                "data": {"content": f"echo: {text}"},
                "client_msg_id": f"echo_{signed_update.get('update_id')}",
            }
            if chat.get("type") == "group":
                send_payload["group_id"] = chat.get("id")
            else:
                send_payload["to_user_id"] = chat.get("id")
            bot_post("/api/v1/s/bot/send", send_payload)

        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")


if __name__ == "__main__":
    HTTPServer(("0.0.0.0", PORT), WebhookHandler).serve_forever()

离线验签脚本

适合调试 Webhook 签名失败问题。以下命令假设你已将验签脚本保存为 `verify_webhook_signature_v1.py`。

export FLY_WEBHOOK_SECRET="<secret>"
cat signed_update.json | python verify_webhook_signature_v1.py
  • 可以脱离业务服务,单独验证某个回调样本是否符合 v1 签名规则。
  • 适合排查 canonical JSON、payload sha256、时间戳窗口等问题。
from __future__ import annotations

import hashlib
import hmac
import json
import os
import sys
import time
from typing import Any


def canonical_json(obj: Any) -> bytes:
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def build_signing_string(*, update_id: int, update_type: str, bot_id: str, timestamp: int, nonce: str, payload_sha256: str) -> str:
    return "\n".join([str(update_id), update_type or "", bot_id or "", str(timestamp), nonce or "", payload_sha256 or ""])


def main() -> None:
    secret = str(os.getenv("FLY_WEBHOOK_SECRET", "")).strip()
    if not secret:
        raise SystemExit("Missing env: FLY_WEBHOOK_SECRET")

    now_ts = int(str(os.getenv("FLY_NOW_TS", "")).strip() or int(time.time()))
    signed_update = json.loads(sys.stdin.read())

    payload = signed_update.get("message") or signed_update.get("callback_query") or {}
    payload_sha = sha256_hex(canonical_json(payload))
    signing_string = build_signing_string(
        update_id=int(signed_update.get("update_id") or 0),
        update_type=str(signed_update.get("type") or ""),
        bot_id=str((signed_update.get("bot") or {}).get("id") or ""),
        timestamp=int(signed_update.get("timestamp") or 0),
        nonce=str(signed_update.get("nonce") or ""),
        payload_sha256=payload_sha,
    )
    expected = hmac.new(secret.encode("utf-8"), signing_string.encode("utf-8"), hashlib.sha256).hexdigest()
    if abs(now_ts - int(signed_update.get("timestamp") or 0)) > 300:
        raise SystemExit("Verify failed: REPLAY_WINDOW_EXCEEDED")
    if not hmac.compare_digest(expected, str(signed_update.get("signature") or "")):
        raise SystemExit("Verify failed: SIGNATURE_MISMATCH")
    print("Verify OK")


if __name__ == "__main__":
    main()

媒体收发 Roundtrip(Polling + batch_access)

适合需要处理图片、音频或文件消息的 Bot。这个脚本演示“拉到媒体 Update -> 换取访问 URL -> 再回一条文本确认”的首轮媒体链路。

pip install requests
export FLY_BASE_URL="https://api.flycrytp.com"
export FLY_BOT_TOKEN="<bot_token>"
python media_roundtrip_polling_batch_access.py
  • 示例覆盖 s/bot/files/batch_access 的典型使用方式。
  • 如果你要做图片识别、OCR 或音视频处理,这个脚本比单纯 echo 更接近真实接入链路。
  • 生产环境里不要把换出来的短期访问 URL 长期缓存。
from __future__ import annotations

import os
import time
from pathlib import Path
from typing import Any, Dict, List, Tuple

import requests


BASE_URL = os.getenv("FLY_BASE_URL", "").rstrip("/")
TOKEN = os.getenv("FLY_BOT_TOKEN", "")

if not BASE_URL or not TOKEN:
    raise SystemExit("Missing FLY_BASE_URL or FLY_BOT_TOKEN")


def bot_post(path: str, payload: dict, *, timeout: int = 35) -> dict:
    resp = requests.post(
        BASE_URL + path,
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=payload,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()


def extract_media_urls(msg_type: str, data: dict) -> List[Tuple[str, str]]:
    out: List[Tuple[str, str]] = []
    if msg_type == "image":
        for key in ("originalUrl", "thumbnailUrl"):
            value = data.get(key)
            if isinstance(value, str) and value.strip():
                out.append((f"data.{key}", value.strip()))
    elif msg_type in ("voice", "file"):
        value = data.get("url")
        if isinstance(value, str) and value.strip():
            out.append(("data.url", value.strip()))
    elif msg_type == "video":
        for key in ("videoUrl", "thumbnailUrl"):
            value = data.get(key)
            if isinstance(value, str) and value.strip():
                out.append((f"data.{key}", value.strip()))
    return out


def download_to_tmp(access_url: str, *, prefix: str) -> str:
    tmp_dir = Path("/tmp/fly_bot_media")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    out_path = tmp_dir / f"{prefix}_{int(time.time())}.bin"
    with requests.get(access_url, stream=True, timeout=35) as resp:
        resp.raise_for_status()
        with out_path.open("wb") as file_obj:
            for chunk in resp.iter_content(chunk_size=1024 * 128):
                if chunk:
                    file_obj.write(chunk)
    return str(out_path)


def main() -> None:
    offset = 0
    while True:
        upd_resp = bot_post("/api/v1/s/bot/updates", {"offset": offset, "limit": 50, "timeout": 30}, timeout=40)
        updates = (upd_resp.get("data") or {}).get("updates") or []

        for upd in updates:
            offset = max(offset, int(upd.get("update_id") or 0) + 1)
            if upd.get("type") != "message":
                continue

            msg = upd.get("message") or {}
            chat = msg.get("chat") or {}
            msg_type = str(msg.get("msgType") or "").strip()
            data = msg.get("data") or {}
            if msg_type not in ("image", "voice", "video", "file"):
                continue

            urls = extract_media_urls(msg_type, data)
            if not urls:
                continue

            access_resp = bot_post(
                "/api/v1/s/bot/files/batch_access",
                {"files": [{"url": value} for _, value in urls], "expire_seconds": 600},
            )
            url_map = ((access_resp.get("data") or {}).get("files") or {}) if isinstance(access_resp, dict) else {}
            first_file_url = urls[0][1]
            access_url = url_map.get(first_file_url)
            if isinstance(access_url, str) and access_url.strip():
                print(download_to_tmp(access_url.strip(), prefix=msg_type))
            else:
                continue

            send_payload: Dict[str, Any] = {"msg_type": msg_type, "data": data, "client_msg_id": f"fwd_{upd.get('update_id')}"}
            if chat.get("type") == "group":
                send_payload["group_id"] = chat.get("id")
            else:
                send_payload["to_user_id"] = chat.get("id")
            bot_post("/api/v1/s/bot/send", send_payload)

        time.sleep(0.2)


if __name__ == "__main__":
    main()

下一步你通常会自己加什么

  1. 把配置读取、日志和错误处理抽成单独模块。
  2. 给 Polling 或 Webhook 增加去重存储。
  3. 把文本 echo 换成你的 LLM 或业务逻辑。
  4. 在发送侧补上更稳定的 client_msg_id 生成规则。

建议的使用顺序

  1. 先跑 Polling Echo Bot,确保 token、事件和发送链路都通。
  2. 再跑 Webhook Echo Server,确认公网回调、签名和重放窗口都正常。
  3. 如果 Webhook 失败,先用离线验签脚本定位签名问题,再回到服务端代码。

使用这些示例前先确认

  • 示例命令中的文件名只是推荐的本地组织方式,不依赖仓库内部目录结构。
  • Polling 参考代码已在快速开始页给出完整脚本;Webhook 与验签请结合协议页的签名规则实现。
  • 如果你准备将示例纳入正式服务,请补充持久化 offset、去重存储、重试和结构化日志。