#!/usr/bin/env python3
# 巡检并恢复“OpenAI 账号实际可用、但后台残留 403 错误状态”的产线脚本。
#
# 设计原则：
# 1. 默认只做 dry-run，避免误操作；
# 2. 默认从当前工作目录下的 .env（在仓库根目录执行时即根目录 .env；可用 --env-file 指定其他路径）读取 SUBLB_PROD_BASE_URL / SUBLB_PROD_ADMIN_API_KEY；
# 3. 候选账号只锁定为 openai + status=error + schedulable=true + error_message 以 Access forbidden (403): 开头；
#    默认还要求 last_used_at 在最近 168 小时内（--last-used-within-hours 0 可关闭该筛选）；
# 4. 真正恢复时不直接 clear-error，而是调用真实 admin test，只有 test 成功才让系统自动清理 error；
# 5. 支持按 account_id 定点恢复，便于人工处理单个账号。

from __future__ import annotations

import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable

# Page size for the admin account listing (main() clamps --page-size to 1..100).
DEFAULT_PAGE_SIZE = 100
# Model id sent as "model_id" in the admin test request body.
DEFAULT_MODEL_ID = "gpt-5.4"
# Default last-used window for candidate filtering (168 h = 7 days); the CLI
# option --last-used-within-hours 0 disables the filter.
DEFAULT_LAST_USED_WITHIN_HOURS = 168
# Timeout (seconds) handed to urlopen for every admin API request.
DEFAULT_TIMEOUT_SECONDS = 60
# Prompt sent in the admin test request body.
DEFAULT_TEST_PROMPT = "Reply with OK only."
# After a successful test: how many times, and how often (seconds), to re-read
# the account detail while waiting for status=active with no error_message.
DEFAULT_RECOVER_POLL_ATTEMPTS = 6
DEFAULT_RECOVER_POLL_INTERVAL_SECONDS = 1.0
# Only accounts whose error_message starts with this prefix are candidates.
ERROR_PREFIX = "Access forbidden (403):"


@dataclass()
class RecoverResult:
    """Outcome of processing a single candidate account during an --apply run."""

    # Admin account id that was processed.
    account_id: int
    # Account display name (empty string when the API returned none).
    name: str
    # Stage that produced this result: "test" (admin test failed) or "recover".
    action: str
    # True only when the account was verified active with no error message.
    success: bool
    # Human-readable explanation printed in the final summary.
    reason: str


def read_env_file(env_path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` dotenv file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored. Keys and
    values are whitespace-stripped. An optional leading ``export `` on the key is
    dropped. One pair of matching surrounding single or double quotes is removed
    from the value (``KEY="value"`` -> ``value``). Inline ``# comments`` after a
    value are NOT stripped, so values may legitimately contain ``#``. When a key
    appears more than once, the last occurrence wins.

    Args:
        env_path: Path of the env file. A missing path, or one that is not a
            regular file, yields an empty dict.

    Returns:
        Mapping of variable names to string values.
    """
    env_map: dict[str, str] = {}
    if not env_path.is_file():
        return env_map

    # "utf-8-sig" decodes like utf-8 but also drops a leading BOM that some
    # editors write; otherwise the BOM would be glued onto the first key.
    for raw in env_path.read_text(encoding="utf-8-sig").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            # Shell-style "export KEY=value" lines are common in .env files.
            key = key[len("export "):].strip()
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            # Quoted values would otherwise keep their quotes, e.g. producing a
            # base URL of '"https://..."' that breaks every request.
            value = value[1:-1]
        env_map[key] = value
    return env_map


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Arguments to parse. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, so existing ``parse_args()`` callers are unaffected;
            passing an explicit list allows programmatic use and testing.

    Returns:
        The parsed namespace; ``account_id`` is a (possibly empty) list of ints.
    """
    parser = argparse.ArgumentParser(
        description="巡检并恢复产线 OpenAI 403 stale error 账号（默认 dry-run）。"
    )
    parser.add_argument("--base-url", help="管理员接口 base URL，默认读 .env 的 SUBLB_PROD_BASE_URL")
    parser.add_argument("--admin-key", help="管理员 API Key，默认读 .env 的 SUBLB_PROD_ADMIN_API_KEY")
    parser.add_argument("--env-file", default=".env", help="环境变量文件路径，默认 .env")
    # Help texts derive their defaults from the module constants so they cannot drift.
    parser.add_argument(
        "--page-size",
        type=int,
        default=DEFAULT_PAGE_SIZE,
        help=f"分页大小，默认 {DEFAULT_PAGE_SIZE}",
    )
    parser.add_argument("--model-id", default=DEFAULT_MODEL_ID, help=f"测试模型，默认 {DEFAULT_MODEL_ID}")
    parser.add_argument(
        "--last-used-within-hours",
        type=int,
        default=DEFAULT_LAST_USED_WITHIN_HOURS,
        help=f"仅处理最近使用时间在多少小时内的账号；传 0 表示不按最近使用筛选，默认 {DEFAULT_LAST_USED_WITHIN_HOURS}",
    )
    parser.add_argument("--limit", type=int, default=0, help="最多处理多少个候选账号，0 表示不限制")
    parser.add_argument(
        "--account-id",
        action="append",
        type=int,
        default=[],
        help="只处理指定账号 ID；可重复传多次",
    )
    parser.add_argument("--apply", action="store_true", help="真正执行恢复；默认仅 dry-run")
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.0,
        help="每个账号之间额外 sleep 秒数，默认 0",
    )
    parser.add_argument(
        "--timeout-seconds",
        type=int,
        default=DEFAULT_TIMEOUT_SECONDS,
        help=f"HTTP 超时时间，默认 {DEFAULT_TIMEOUT_SECONDS} 秒",
    )
    parser.add_argument(
        "--prompt",
        default=DEFAULT_TEST_PROMPT,
        help="admin test 的测试提示词，默认要求只回复 OK",
    )
    return parser.parse_args(argv)


def build_headers(admin_key: str) -> dict[str, str]:
    """Return the base headers shared by every admin API request.

    The admin key travels in ``x-api-key``; JSON responses are requested by default.
    """
    header_pairs = (
        ("x-api-key", admin_key),
        ("accept", "application/json"),
        ("user-agent", "sublb-stale-openai-403-recover/20260418"),
    )
    return dict(header_pairs)


def request_json(url: str, *, headers: dict[str, str], timeout: int) -> dict[str, Any]:
    """GET ``url`` with ``headers`` and return the response body decoded as JSON.

    urllib errors (HTTP status, network, timeout) and JSON decode errors propagate
    to the caller unchanged.
    """
    request = urllib.request.Request(url, headers=headers)
    response = urllib.request.urlopen(request, timeout=timeout)
    with response:
        decoded = json.load(response)
    return decoded


def post_stream(url: str, *, headers: dict[str, str], body: dict[str, Any], timeout: int) -> Iterable[dict[str, Any]]:
    """POST ``body`` as JSON and yield the JSON payload of each SSE ``data`` line.

    Each ``data:`` line is decoded independently; multi-line SSE data fields are
    not joined, and other SSE fields (``event:``, ``id:``, comments) are ignored.

    Yields:
        Each payload that decodes to a JSON object. Payloads that are not valid
        JSON, or that decode to a non-object (list, number, ...), are yielded as
        ``{"type": "parse_error", "raw": <text>}`` so callers can always use ``.get``.

    Raises:
        urllib errors from ``urlopen`` (raised on the first iteration, since this
        is a generator).
    """
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            **headers,
            "content-type": "application/json",
            "accept": "text/event-stream",
        },
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        for raw in resp:
            line = raw.decode("utf-8", errors="replace").strip()
            # Per the SSE spec the space after "data:" is optional ("data:{...}" is valid).
            if not line.startswith("data:"):
                continue
            data = line[len("data:"):].strip()
            if not data:
                continue
            try:
                event = json.loads(data)
            except json.JSONDecodeError:
                yield {"type": "parse_error", "raw": data}
                continue
            if isinstance(event, dict):
                yield event
            else:
                yield {"type": "parse_error", "raw": data}


def parse_api_items(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return a fresh list copy of ``payload["data"]["items"]`` ([] when absent or empty)."""
    section = payload.get("data") or {}
    found = section.get("items")
    return list(found) if found else []


def parse_api_total(payload: dict[str, Any]) -> int:
    """Return ``payload["data"]["total"]`` as an int, or 0 when it is absent or falsy."""
    section = payload.get("data") or {}
    raw_total = section.get("total")
    return int(raw_total) if raw_total else 0


def parse_iso_datetime(value: str | None) -> datetime | None:
    """Parse an ISO-8601 / RFC 3339 timestamp, returning ``None`` when unusable.

    A trailing ``Z`` or ``z`` is treated as UTC. Any fractional-seconds part is
    normalized to exactly six digits (truncated or zero-padded) before parsing.
    ``datetime.fromisoformat`` before Python 3.11 only accepts 3 or 6 fractional
    digits. RFC 3339 producers may emit any count, e.g. nanoseconds, or fewer
    digits with trailing zeros trimmed.

    Args:
        value: Timestamp text; ``None``, empty or blank strings yield ``None``.

    Returns:
        The parsed datetime. It is timezone-aware when the text carries an offset
        or ``Z``, otherwise naive. Returns ``None`` when the text cannot be parsed.
    """
    import re

    if not value:
        return None
    text = value.strip()
    if not text:
        return None
    if text[-1] in "Zz":
        text = text[:-1] + "+00:00"
    match = re.match(r"^(.*[T ]\d{2}:\d{2}:\d{2})[.,](\d+)(.*)$", text)
    if match:
        head, fraction, tail = match.groups()
        text = f"{head}.{fraction[:6].ljust(6, '0')}{tail}"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def list_accounts(*, base_url: str, admin_key: str, page_size: int, timeout: int) -> list[dict[str, Any]]:
    """Fetch every account the admin API lists with ``status=error``.

    Pages through ``GET /api/v1/admin/accounts`` and concatenates each page's
    ``items``, printing one progress line per page.

    Stopping rules:
      * an empty page always ends the listing;
      * when the server reports a positive ``total``, keep paging until that many
        items were collected. A short page alone is not trusted, because the server
        may cap ``page_size`` below the requested value;
      * without a usable ``total``, a page shorter than ``page_size`` is the last;
      * a page identical to the previous one ends the listing, which guards
        against servers that ignore the ``page`` parameter.

    Raises:
        Whatever ``request_json`` raises (urllib/network errors, JSON decode errors).
    """
    headers = build_headers(admin_key)
    endpoint = f"{base_url.rstrip('/')}/api/v1/admin/accounts"
    items_all: list[dict[str, Any]] = []
    previous_items: list[dict[str, Any]] | None = None
    page = 1

    while True:
        query = urllib.parse.urlencode({"page": page, "page_size": page_size, "status": "error"})
        payload = request_json(f"{endpoint}?{query}", headers=headers, timeout=timeout)
        items = parse_api_items(payload)
        total = parse_api_total(payload)
        print(f"[信息] 已拉取账号页 page={page} items={len(items)} total={total}")

        if not items:
            break
        if items == previous_items:
            print("[警告] 分页返回了与上一页相同的数据，停止翻页")
            break
        items_all.extend(items)

        if total > 0:
            if len(items_all) >= total:
                break
        elif len(items) < page_size:
            break
        previous_items = items
        page += 1

    return items_all


def should_consider_account(account: dict[str, Any], *, allowed_ids: set[int], last_used_cutoff: datetime | None) -> tuple[bool, str]:
    """Decide whether ``account`` is a stale-403 recovery candidate.

    Filters run in this order, and the first one that rejects wins:
      1. explicit id allow-list (only when ``allowed_ids`` is non-empty);
      2. platform is ``openai``;
      3. status is ``error``;
      4. ``schedulable`` is exactly ``True``;
      5. ``error_message`` starts with ``ERROR_PREFIX``;
      6. only when ``last_used_cutoff`` is set: ``last_used_at`` parses and is not
         older than the cutoff. Naive timestamps are treated as UTC.

    Returns:
        ``(accepted, reason)`` where ``reason`` explains the decision.
    """
    acct_id = int(account.get("id") or 0)

    def normalized(field: str) -> str:
        return str(account.get(field) or "").strip().lower()

    rejections = (
        (lambda: bool(allowed_ids) and acct_id not in allowed_ids, "不在指定 account_id 范围"),
        (lambda: normalized("platform") != "openai", "不是 openai 平台"),
        (lambda: normalized("status") != "error", "状态不是 error"),
        (lambda: account.get("schedulable") is not True, "schedulable 不是 true"),
        (
            lambda: not str(account.get("error_message") or "").startswith(ERROR_PREFIX),
            "error_message 不是 403 stale error 形态",
        ),
    )
    for rejected, why in rejections:
        if rejected():
            return False, why

    if last_used_cutoff is None:
        return True, "命中候选条件"

    used_at = parse_iso_datetime(str(account.get("last_used_at") or ""))
    if used_at is None:
        return False, "没有最近使用时间，且当前启用了最近使用筛选"
    if used_at.tzinfo is None:
        used_at = used_at.replace(tzinfo=timezone.utc)
    if used_at < last_used_cutoff:
        return False, "最近使用时间超出筛选窗口"
    return True, "命中候选条件"


def fetch_account_detail(*, base_url: str, admin_key: str, account_id: int, timeout: int) -> dict[str, Any]:
    """GET one account from the admin API and return its ``data`` object.

    Returns ``{}`` when the response has no (or an empty) ``data`` field; request
    and decode errors from ``request_json`` propagate.
    """
    detail_url = "{}/api/v1/admin/accounts/{}".format(base_url.rstrip("/"), account_id)
    response = request_json(detail_url, headers=build_headers(admin_key), timeout=timeout)
    return response.get("data") or {}


def wait_for_account_recovered(
    *,
    base_url: str,
    admin_key: str,
    account_id: int,
    timeout: int,
    attempts: int = DEFAULT_RECOVER_POLL_ATTEMPTS,
    interval_seconds: float = DEFAULT_RECOVER_POLL_INTERVAL_SECONDS,
) -> dict[str, Any]:
    """Poll the account detail until it reports ``active`` with no error message.

    Polls up to ``max(1, attempts)`` times, sleeping ``interval_seconds`` between
    polls (not after the last one). A poll that fails with a network/HTTP error
    (``OSError``, which includes ``urllib.error.URLError``) or a decode error
    (``ValueError``) is treated as "not recovered yet", so one transient failure
    does not abort the caller's whole batch.

    Returns:
        The first detail that looks recovered. Otherwise, the last detail fetched
        successfully. If every poll failed, a dict whose ``error_message``
        describes the last failure (``status`` is absent).
    """
    total_attempts = max(1, attempts)
    last_detail: dict[str, Any] = {}
    fetched_any = False
    last_failure: Exception | None = None
    for attempt in range(1, total_attempts + 1):
        try:
            detail = fetch_account_detail(
                base_url=base_url,
                admin_key=admin_key,
                account_id=account_id,
                timeout=timeout,
            )
        except (OSError, ValueError) as exc:
            last_failure = exc
        else:
            fetched_any = True
            last_detail = detail
            status = str(detail.get("status") or "").strip().lower()
            error_message = str(detail.get("error_message") or "").strip()
            if status == "active" and not error_message:
                return detail
        if attempt < total_attempts:
            time.sleep(interval_seconds)
    if not fetched_any and last_failure is not None:
        # Surface the failure through the field callers already print.
        return {"error_message": f"查询账号详情失败: {last_failure}"}
    return last_detail


def run_account_test(
    *,
    base_url: str,
    admin_key: str,
    account_id: int,
    model_id: str,
    prompt: str,
    timeout: int,
) -> tuple[bool, str, list[dict[str, Any]]]:
    """Run the admin ``/test`` endpoint for one account and report the outcome.

    Streams events from ``POST /api/v1/admin/accounts/{id}/test`` until a
    ``test_complete`` event arrives.

    Returns:
        ``(success, reason, events)``:
          * ``events`` holds the dict events received so far.
          * With a ``test_complete`` event, ``success`` is ``bool(event["success"])``.
          * Without one, ``success`` is ``False`` and ``reason`` comes from the
            last ``error`` event, or is a default note.
          * HTTP errors and other request exceptions are returned as
            ``(False, message, events)`` instead of being raised.
    """
    from contextlib import closing

    headers = build_headers(admin_key)
    url = f"{base_url.rstrip('/')}/api/v1/admin/accounts/{account_id}/test"
    events: list[dict[str, Any]] = []
    last_reason = "未收到 test_complete 事件"

    try:
        stream = post_stream(
            url,
            headers=headers,
            body={"model_id": model_id, "prompt": prompt},
            timeout=timeout,
        )
        # Close the generator explicitly so the HTTP response is released as soon
        # as we return on test_complete, not whenever the generator is collected.
        with closing(stream):
            for event in stream:
                if not isinstance(event, dict):
                    # Skip non-object payloads instead of failing on .get().
                    continue
                events.append(event)
                event_type = str(event.get("type") or "")
                if event_type == "test_complete":
                    success = bool(event.get("success"))
                    reason = str(event.get("error") or ("test 成功" if success else "test_complete=false"))
                    return success, reason, events
                if event_type == "error":
                    last_reason = str(event.get("error") or "收到 error 事件")
        return False, last_reason, events
    except urllib.error.HTTPError as exc:
        try:
            body = exc.read().decode("utf-8", errors="replace")
        except Exception:
            body = ""
        message = f"HTTP {exc.code}"
        if body:
            message = f"{message}: {body[:300]}"
        return False, message, events
    except Exception as exc:
        # Boundary catch: any other failure becomes a failed test for this account.
        return False, f"请求异常: {exc}", events


def print_candidate_table(candidates: list[dict[str, Any]]) -> None:
    """Print one summary line per candidate account, or a notice when there are none.

    The error message is truncated to its first 160 characters.
    """
    if candidates:
        print("[信息] 候选账号如下：")
        for acct in candidates:
            err = (acct.get("error_message") or "")[:160]
            print(
                f"- id={acct.get('id')} name={acct.get('name') or ''} "
                f"status={acct.get('status') or ''} schedulable={acct.get('schedulable')} "
                f"last_used_at={acct.get('last_used_at') or ''} error={err}"
            )
        return
    print("[信息] 当前没有命中候选账号。")


def _pause_between_accounts(seconds: float) -> None:
    """Sleep for ``seconds`` when a positive inter-account delay was requested."""
    if seconds > 0:
        time.sleep(seconds)


def _recover_account(
    *,
    base_url: str,
    admin_key: str,
    account_id: int,
    name: str,
    args: argparse.Namespace,
) -> RecoverResult:
    """Run the admin test for one account, verify it ends up active, print and return the outcome."""
    success, reason, _events = run_account_test(
        base_url=base_url,
        admin_key=admin_key,
        account_id=account_id,
        model_id=args.model_id,
        prompt=args.prompt,
        timeout=args.timeout_seconds,
    )
    if not success:
        print(f"[失败] id={account_id} name={name} reason={reason}")
        return RecoverResult(account_id=account_id, name=name, action="test", success=False, reason=reason)

    try:
        detail = wait_for_account_recovered(
            base_url=base_url,
            admin_key=admin_key,
            account_id=account_id,
            timeout=args.timeout_seconds,
        )
    except (OSError, ValueError) as exc:
        # urllib/network/timeout errors are OSError subclasses and JSON decode errors
        # are ValueError. Record them for this account instead of aborting the batch
        # (which would also lose the final summary).
        result_reason = f"test 成功，但查询账号状态失败：{exc}"
        print(f"[警告] id={account_id} name={name} {result_reason}")
        return RecoverResult(account_id=account_id, name=name, action="recover", success=False, reason=result_reason)

    status = str(detail.get("status") or "")
    error_message = str(detail.get("error_message") or "")
    if status.lower() == "active" and not error_message.strip():
        result_reason = "test 成功，账号已恢复 active"
        print(f"[成功] id={account_id} name={name} {result_reason}")
        return RecoverResult(account_id=account_id, name=name, action="recover", success=True, reason=result_reason)

    result_reason = f"test 成功，但状态未完全恢复：status={status} error_message={error_message[:160]}"
    print(f"[警告] id={account_id} name={name} {result_reason}")
    return RecoverResult(account_id=account_id, name=name, action="recover", success=False, reason=result_reason)


def main() -> int:
    """List stale-403 OpenAI accounts and, with --apply, try to recover them.

    Returns:
        0 when everything succeeded (always 0 for dry-run), 1 when any account
        failed to recover, and 2 on missing configuration or when the account
        list cannot be fetched.
    """
    args = parse_args()
    env_map = read_env_file(Path(args.env_file))
    base_url = args.base_url or env_map.get("SUBLB_PROD_BASE_URL", "").strip()
    admin_key = args.admin_key or env_map.get("SUBLB_PROD_ADMIN_API_KEY", "").strip()

    if not base_url:
        print("[错误] 缺少 base_url，请传 --base-url 或在 .env 中设置 SUBLB_PROD_BASE_URL", file=sys.stderr)
        return 2
    if not admin_key:
        print("[错误] 缺少 admin_key，请传 --admin-key 或在 .env 中设置 SUBLB_PROD_ADMIN_API_KEY", file=sys.stderr)
        return 2

    print(f"[信息] 运行模式：{'APPLY' if args.apply else 'DRY-RUN'}")
    print(f"[信息] 目标 base_url：{base_url}")
    print(f"[信息] 测试模型：{args.model_id}")

    if args.last_used_within_hours > 0:
        last_used_cutoff = datetime.now(timezone.utc) - timedelta(hours=args.last_used_within_hours)
        print(f"[信息] 最近使用时间筛选：仅处理 {args.last_used_within_hours} 小时内用过的账号")
    else:
        last_used_cutoff = None
        print("[信息] 最近使用时间筛选：已关闭")

    try:
        accounts = list_accounts(
            base_url=base_url,
            admin_key=admin_key,
            page_size=max(1, min(args.page_size, 100)),
            timeout=args.timeout_seconds,
        )
    except (OSError, ValueError) as exc:
        # Report network/HTTP/JSON failures cleanly instead of dumping a traceback.
        print(f"[错误] 拉取账号列表失败：{exc}", file=sys.stderr)
        return 2

    allowed_ids = {int(v) for v in args.account_id if int(v) > 0}
    candidates: list[dict[str, Any]] = []
    skipped = 0
    for account in accounts:
        ok, _reason = should_consider_account(
            account,
            allowed_ids=allowed_ids,
            last_used_cutoff=last_used_cutoff,
        )
        if not ok:
            skipped += 1
            continue
        candidates.append(account)
        if args.limit > 0 and len(candidates) >= args.limit:
            break

    print(f"[信息] 总账号数：{len(accounts)}，跳过：{skipped}，命中候选：{len(candidates)}")
    if allowed_ids:
        # Explicitly requested ids that the server did not list as status=error would
        # otherwise be skipped without any feedback.
        listed_ids = {int(account.get("id") or 0) for account in accounts}
        missing_ids = sorted(allowed_ids - listed_ids)
        if missing_ids:
            print(f"[警告] 以下指定的 account_id 不在 status=error 的账号列表中，已跳过：{missing_ids}")
    print_candidate_table(candidates)

    if not args.apply:
        print("[信息] 当前为 dry-run，未执行任何恢复动作。若要真正恢复，请追加 --apply。")
        return 0

    results: list[RecoverResult] = []
    for index, account in enumerate(candidates, start=1):
        account_id = int(account.get("id") or 0)
        name = str(account.get("name") or "")
        print(f"[执行] ({index}/{len(candidates)}) 开始 test 账号 id={account_id} name={name}")
        results.append(
            _recover_account(
                base_url=base_url,
                admin_key=admin_key,
                account_id=account_id,
                name=name,
                args=args,
            )
        )
        _pause_between_accounts(args.sleep_seconds)

    ok_count = sum(1 for item in results if item.success)
    fail_count = len(results) - ok_count
    print("[汇总] 恢复完成：")
    print(f"- 成功：{ok_count}")
    print(f"- 失败：{fail_count}")
    for item in results:
        print(
            f"  - id={item.account_id} name={item.name} success={item.success} action={item.action} reason={item.reason}"
        )

    return 0 if fail_count == 0 else 1


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
