#!/usr/bin/env bash

# request_audit_logs 低影响轮转备份脚本。
# 用途：
# 1. 避免直接 TRUNCATE 持续写入的 request_audit_logs，降低对产线写入的影响
# 2. 先把 live 表切到新空表，再导出旧归档表
# 3. 压缩、校验、生成摘要后删除旧归档表，真正释放 PostgreSQL 空间
# 4. 适合通过 cron / systemd timer 每 5 天执行一次
#
# 依赖：
# - psql
# - python3（用于按 CSV 标准解析真实记录数，避免多行字段导致误计数）
# - zstd 或 gzip（二选一；默认优先 zstd）
# - sha256sum 或 shasum
#
# 连接方式：
# - 通过标准 PostgreSQL 环境变量连接：PGHOST / PGPORT / PGUSER / PGDATABASE / PGPASSWORD
# - 或依赖 .pgpass / 当前 shell 已注入的连接信息

set -Eeuo pipefail

umask 077

SCRIPT_NAME="$(basename "$0")"
SCHEMA_NAME="${SCHEMA_NAME:-public}"
TABLE_NAME="${TABLE_NAME:-request_audit_logs}"
BACKUP_ROOT="${BACKUP_ROOT:-/home/admin/backups/sub2api/request_audit_logs}"
LOCK_FILE="${LOCK_FILE:-/tmp/request_audit_logs_rotate_backup.lock}"
LOCK_TIMEOUT="${LOCK_TIMEOUT:-2s}"
COMPRESSOR="${COMPRESSOR:-auto}"          # auto | zstd | gzip
ZSTD_LEVEL="${ZSTD_LEVEL:-19}"
HOST_LABEL="${HOST_LABEL:-$(hostname -s 2>/dev/null || hostname)}"
SERVICE_LABEL="${SERVICE_LABEL:-sub2api}"
MIN_INTERVAL_DAYS="${MIN_INTERVAL_DAYS:-5}"
ENABLE_EMAIL_NOTIFY="${ENABLE_EMAIL_NOTIFY:-1}"
NOTIFY_EMAIL_TO="${NOTIFY_EMAIL_TO:-zhangke_2021@126.com}"
NOTIFY_EMAIL_FROM="${NOTIFY_EMAIL_FROM:-noreply@mail.aihang365.com}"
NOTIFY_EMAIL_API_URL="${NOTIFY_EMAIL_API_URL:-https://api.resend.com/emails}"
NOTIFY_EMAIL_SUBJECT_PREFIX="${NOTIFY_EMAIL_SUBJECT_PREFIX:-[SubLB Backup]}"
DRY_RUN=0
FORCE_RUN=0

ARCHIVE_TABLE=""
BACKUP_FILE=""
SUMMARY_FILE=""
STATE_DIR=""
STATE_FILE=""
ARTIFACTS_FINALIZED=0

log() {
  printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S %z')" "$*"
}

fail() {
  log "失败：$*"
  exit 1
}

usage() {
  cat <<'EOF'
用法：
  ops/request_audit_logs_rotate_backup.sh [--dry-run] [--force] [--help]

说明：
  该脚本用于对持续写入的 request_audit_logs 做低影响轮转备份：
  1. 把当前 live 表改名为归档表
  2. 立即创建新的空 live 表接管写入
  3. 导出旧归档表到压缩文件
  4. 校验压缩包与真实记录数
  5. 删除旧归档表，释放数据库空间

关键环境变量：
  SCHEMA_NAME     默认 public
  TABLE_NAME      默认 request_audit_logs
  BACKUP_ROOT     默认 /home/admin/backups/sub2api/request_audit_logs
  LOCK_TIMEOUT    默认 2s
  COMPRESSOR      auto | zstd | gzip，默认 auto（优先 zstd）
  ZSTD_LEVEL      默认 19
  HOST_LABEL      摘要里记录的机器名，默认当前 hostname
  SERVICE_LABEL   摘要里记录的服务名，默认 sub2api
  MIN_INTERVAL_DAYS 默认 5。配合“每天 12 点定时触发”时，可确保至少间隔 5 天才真正轮转一次
  ENABLE_EMAIL_NOTIFY 默认 1。备份成功后发邮件
  NOTIFY_EMAIL_TO 默认 zhangke_2021@126.com
  NOTIFY_EMAIL_FROM 默认 noreply@mail.aihang365.com
  RESEND_API_KEY  当 ENABLE_EMAIL_NOTIFY=1 时必填

数据库连接：
  通过 PGHOST / PGPORT / PGUSER / PGDATABASE / PGPASSWORD 或 .pgpass 提供。

示例：
  export PGHOST=127.0.0.1
  export PGPORT=5432
  export PGUSER=sub2api
  export PGDATABASE=sub2api
  export PGPASSWORD='***'
  ops/request_audit_logs_rotate_backup.sh

只看计划不落库：
  ops/request_audit_logs_rotate_backup.sh --dry-run

忽略“五天间隔”强制执行一次：
  ops/request_audit_logs_rotate_backup.sh --force
EOF
}

on_error() {
  local line_no="$1"
  log "脚本在第 ${line_no} 行失败。"

  if [ "${ARTIFACTS_FINALIZED}" != "1" ] && [ -n "${BACKUP_FILE:-}" ] && [ -f "${BACKUP_FILE:-}" ]; then
    local trash_dir="${BACKUP_ROOT}/.trash"
    mkdir -p "$trash_dir"
    local failed_name
    failed_name="$(basename "$BACKUP_FILE").failed_$(date +%Y%m%d_%H%M%S)"
    mv "$BACKUP_FILE" "${trash_dir}/${failed_name}"
    log "已将失败的备份文件移到：${trash_dir}/${failed_name}"
  fi

  if [ "${ARTIFACTS_FINALIZED}" != "1" ] && [ -n "${SUMMARY_FILE:-}" ] && [ -f "${SUMMARY_FILE:-}" ]; then
    local trash_dir="${BACKUP_ROOT}/.trash"
    mkdir -p "$trash_dir"
    local failed_summary
    failed_summary="$(basename "$SUMMARY_FILE").failed_$(date +%Y%m%d_%H%M%S)"
    mv "$SUMMARY_FILE" "${trash_dir}/${failed_summary}"
    log "已将失败的摘要文件移到：${trash_dir}/${failed_summary}"
  fi
}

trap 'on_error $LINENO' ERR

while [ "$#" -gt 0 ]; do
  case "$1" in
    --dry-run)
      DRY_RUN=1
      shift
      ;;
    --help|-h)
      usage
      exit 0
      ;;
    --force)
      FORCE_RUN=1
      shift
      ;;
    *)
      fail "未知参数：$1。可用参数：--dry-run, --force, --help"
      ;;
  esac
done

validate_identifier() {
  local value="$1"
  local label="$2"
  if [[ ! "$value" =~ ^[a-zA-Z_][a-zA-Z0-9_]*$ ]]; then
    fail "${label} 非法：${value}"
  fi
}

require_cmd() {
  local cmd="$1"
  command -v "$cmd" >/dev/null 2>&1 || fail "缺少依赖命令：${cmd}"
}

pg_scalar() {
  local sql="$1"
  psql -X -v ON_ERROR_STOP=1 -qAt -c "$sql"
}

pg_exec() {
  local sql="$1"
  psql -X -v ON_ERROR_STOP=1 -q -c "$sql"
}

csv_row_count_from_file() {
  local file_path="$1"
  local format="$2"

  case "$format" in
    zst)
      zstd -dc -- "$file_path" | python3 - <<'PY'
import csv
import sys

reader = csv.reader(sys.stdin)
next(reader, None)
rows = 0
for _ in reader:
    rows += 1
print(rows)
PY
      ;;
    gz)
      gzip -dc -- "$file_path" | python3 - <<'PY'
import csv
import sys

reader = csv.reader(sys.stdin)
next(reader, None)
rows = 0
for _ in reader:
    rows += 1
print(rows)
PY
      ;;
    *)
      fail "不支持的压缩格式：${format}"
      ;;
  esac
}

sha256_file() {
  local file_path="$1"
  if command -v sha256sum >/dev/null 2>&1; then
    sha256sum "$file_path" | awk '{print $1}'
    return 0
  fi

  if command -v shasum >/dev/null 2>&1; then
    shasum -a 256 "$file_path" | awk '{print $1}'
    return 0
  fi

  fail "找不到 sha256sum 或 shasum"
}

file_size_bytes() {
  local file_path="$1"
  wc -c < "$file_path" | tr -d ' '
}

acquire_lock() {
  if ! command -v flock >/dev/null 2>&1; then
    log "警告：当前机器没有 flock，将不做进程级互斥。"
    return 0
  fi

  exec 9>"$LOCK_FILE"
  if ! flock -n 9; then
    fail "已有另一个 ${SCRIPT_NAME} 在运行，锁文件：${LOCK_FILE}"
  fi
}

check_min_interval_gate() {
  local now_epoch interval_seconds last_epoch elapsed

  if [ "$FORCE_RUN" -eq 1 ]; then
    log "已指定 --force，跳过最小间隔限制。"
    return 0
  fi

  if [ "${MIN_INTERVAL_DAYS}" = "0" ]; then
    return 0
  fi

  if [[ ! "${MIN_INTERVAL_DAYS}" =~ ^[0-9]+$ ]]; then
    fail "MIN_INTERVAL_DAYS 必须是非负整数，当前：${MIN_INTERVAL_DAYS}"
  fi

  if [ ! -f "$STATE_FILE" ]; then
    log "当前没有成功执行状态文件，允许本次执行。"
    return 0
  fi

  last_epoch="$(tr -d '[:space:]' < "$STATE_FILE")"
  if [[ ! "$last_epoch" =~ ^[0-9]+$ ]]; then
    fail "状态文件格式非法：${STATE_FILE}"
  fi

  now_epoch="$(date +%s)"
  interval_seconds=$(( MIN_INTERVAL_DAYS * 24 * 60 * 60 ))
  elapsed=$(( now_epoch - last_epoch ))

  if [ "$elapsed" -lt "$interval_seconds" ]; then
    log "距离上次成功轮转仅过去 ${elapsed} 秒，未达到 ${MIN_INTERVAL_DAYS} 天；本次跳过。"
    exit 0
  fi
}

choose_compressor() {
  case "$COMPRESSOR" in
    auto)
      if command -v zstd >/dev/null 2>&1; then
        printf 'zst'
      elif command -v gzip >/dev/null 2>&1; then
        printf 'gz'
      else
        fail "当前机器既没有 zstd 也没有 gzip"
      fi
      ;;
    zstd)
      require_cmd zstd
      printf 'zst'
      ;;
    gzip)
      require_cmd gzip
      printf 'gz'
      ;;
    *)
      fail "COMPRESSOR 只允许 auto / zstd / gzip，当前：${COMPRESSOR}"
      ;;
  esac
}

send_success_email() {
  local subject body mail_response

  [ "${ENABLE_EMAIL_NOTIFY}" = "1" ] || {
    log "已关闭邮件通知，跳过发信。"
    return 0
  }

  require_cmd curl
  [ -n "${RESEND_API_KEY:-}" ] || fail "ENABLE_EMAIL_NOTIFY=1 时必须提供 RESEND_API_KEY"

  subject="${NOTIFY_EMAIL_SUBJECT_PREFIX} ${HOST_LABEL}/${SERVICE_LABEL} request_audit_logs 轮转成功 ${TIMESTAMP}"
  body="$(cat <<EOF
request_audit_logs 轮转备份已完成。

机器：${HOST_LABEL}
服务：${SERVICE_LABEL}
执行时间：$(date '+%Y-%m-%d %H:%M:%S %z')
归档表：${ARCHIVE_TABLE}
备份文件：${BACKUP_FILE}
压缩格式：${COMPRESS_FORMAT}
备份大小（bytes）：${BACKUP_SIZE_BYTES}
sha256：${BACKUP_SHA256}
归档真实行数：${ARCHIVE_ROWS}
文件真实行数：${FILE_ROWS}
轮转前 live 行数：${LIVE_ROWS_BEFORE}
删除归档后 live 行数：${LIVE_ROWS_AFTER}
删除归档后 live 最新时间：${LIVE_LATEST_CREATED_AT_AFTER}
摘要文件：${SUMMARY_FILE}
EOF
)"

  mail_response="$(
    NOTIFY_EMAIL_TO="${NOTIFY_EMAIL_TO}" \
    NOTIFY_EMAIL_FROM="${NOTIFY_EMAIL_FROM}" \
    MAIL_SUBJECT="${subject}" \
    MAIL_TEXT="${body}" \
    python3 - <<'PY' | curl -fsS \
      -X POST "${NOTIFY_EMAIL_API_URL}" \
      -H "Authorization: Bearer ${RESEND_API_KEY}" \
      -H "Content-Type: application/json" \
      --data-binary @-
import json
import os

payload = {
    "from": os.environ["NOTIFY_EMAIL_FROM"],
    "to": [os.environ["NOTIFY_EMAIL_TO"]],
    "subject": os.environ["MAIL_SUBJECT"],
    "text": os.environ["MAIL_TEXT"],
}
print(json.dumps(payload, ensure_ascii=False))
PY
  )"

  {
    printf 'notify_email_to=%s\n' "$NOTIFY_EMAIL_TO"
    printf 'notify_email_from=%s\n' "$NOTIFY_EMAIL_FROM"
    printf 'notify_email_status=sent\n'
    printf 'notify_email_response=%s\n' "$mail_response"
  } >> "$SUMMARY_FILE"

  log "邮件通知已发送：to=${NOTIFY_EMAIL_TO}"
}

validate_identifier "$SCHEMA_NAME" "SCHEMA_NAME"
validate_identifier "$TABLE_NAME" "TABLE_NAME"
require_cmd psql
require_cmd python3
acquire_lock

COMPRESS_FORMAT="$(choose_compressor)"
TIMESTAMP="$(date +%Y%m%d_%H%M%S)"
ARCHIVE_TABLE="${TABLE_NAME}_archived_${TIMESTAMP}"
mkdir -p "$BACKUP_ROOT"
STATE_DIR="${BACKUP_ROOT}/.state"
STATE_FILE="${STATE_DIR}/${TABLE_NAME}_last_success_epoch"
mkdir -p "$STATE_DIR"

if [ "$COMPRESS_FORMAT" = "zst" ]; then
  BACKUP_FILE="${BACKUP_ROOT}/${ARCHIVE_TABLE}.csv.zst"
else
  BACKUP_FILE="${BACKUP_ROOT}/${ARCHIVE_TABLE}.csv.gz"
fi
SUMMARY_FILE="${BACKUP_ROOT}/${ARCHIVE_TABLE}.summary.txt"

log "开始执行 ${TABLE_NAME} 低影响轮转备份。"
log "机器=${HOST_LABEL} 服务=${SERVICE_LABEL} schema=${SCHEMA_NAME} table=${TABLE_NAME}"
log "备份目录=${BACKUP_ROOT} 压缩格式=${COMPRESS_FORMAT}"
log "最小执行间隔=${MIN_INTERVAL_DAYS} 天 邮件通知=${ENABLE_EMAIL_NOTIFY}"

pg_exec "SELECT 1;"
check_min_interval_gate

LIVE_EXISTS="$(pg_scalar "SELECT to_regclass('${SCHEMA_NAME}.${TABLE_NAME}') IS NOT NULL;")"
[ "$LIVE_EXISTS" = "t" ] || fail "目标表不存在：${SCHEMA_NAME}.${TABLE_NAME}"

ARCHIVE_EXISTS="$(pg_scalar "SELECT to_regclass('${SCHEMA_NAME}.${ARCHIVE_TABLE}') IS NOT NULL;")"
[ "$ARCHIVE_EXISTS" = "f" ] || fail "归档表名已存在，拒绝覆盖：${SCHEMA_NAME}.${ARCHIVE_TABLE}"

LIVE_ROWS_BEFORE="$(pg_scalar "SELECT COUNT(*) FROM ${SCHEMA_NAME}.${TABLE_NAME};")"
LIVE_SIZE_BEFORE="$(pg_scalar "SELECT pg_size_pretty(pg_total_relation_size('${SCHEMA_NAME}.${TABLE_NAME}'::regclass));")"
LIVE_MIN_CREATED_AT="$(pg_scalar "SELECT COALESCE(to_char(MIN(created_at), 'YYYY-MM-DD HH24:MI:SS TZ'), 'NULL') FROM ${SCHEMA_NAME}.${TABLE_NAME};")"
LIVE_MAX_CREATED_AT="$(pg_scalar "SELECT COALESCE(to_char(MAX(created_at), 'YYYY-MM-DD HH24:MI:SS TZ'), 'NULL') FROM ${SCHEMA_NAME}.${TABLE_NAME};")"

log "轮转前：rows=${LIVE_ROWS_BEFORE} total_size=${LIVE_SIZE_BEFORE} min_created_at=${LIVE_MIN_CREATED_AT} max_created_at=${LIVE_MAX_CREATED_AT}"

if [ "$LIVE_ROWS_BEFORE" = "0" ]; then
  log "当前 live 表没有任何数据，跳过轮转。"
  exit 0
fi

if [ "$DRY_RUN" -eq 1 ]; then
  log "dry-run：不会真正执行轮转。"
  log "计划归档表：${SCHEMA_NAME}.${ARCHIVE_TABLE}"
  log "计划备份文件：${BACKUP_FILE}"
  exit 0
fi

ROTATE_SQL="$(cat <<SQL
BEGIN;
SET LOCAL lock_timeout = '${LOCK_TIMEOUT}';

ALTER TABLE ${SCHEMA_NAME}.${TABLE_NAME}
  RENAME TO ${ARCHIVE_TABLE};

CREATE TABLE ${SCHEMA_NAME}.${TABLE_NAME}
  (LIKE ${SCHEMA_NAME}.${ARCHIVE_TABLE} INCLUDING ALL);

COMMIT;
SQL
)"

log "开始轮转 live 表。"
pg_exec "$ROTATE_SQL"

SERIAL_SEQUENCE="$(pg_scalar "SELECT pg_get_serial_sequence('${SCHEMA_NAME}.${TABLE_NAME}', 'id');")"
if [ -n "${SERIAL_SEQUENCE}" ]; then
  log "修正序列归属：${SERIAL_SEQUENCE}"
  pg_exec "ALTER SEQUENCE ${SERIAL_SEQUENCE} OWNED BY ${SCHEMA_NAME}.${TABLE_NAME}.id;"
fi

ARCHIVE_ROWS="$(pg_scalar "SELECT COUNT(*) FROM ${SCHEMA_NAME}.${ARCHIVE_TABLE};")"
ARCHIVE_SIZE="$(pg_scalar "SELECT pg_size_pretty(pg_total_relation_size('${SCHEMA_NAME}.${ARCHIVE_TABLE}'::regclass));")"
ARCHIVE_MIN_CREATED_AT="$(pg_scalar "SELECT COALESCE(to_char(MIN(created_at), 'YYYY-MM-DD HH24:MI:SS TZ'), 'NULL') FROM ${SCHEMA_NAME}.${ARCHIVE_TABLE};")"
ARCHIVE_MAX_CREATED_AT="$(pg_scalar "SELECT COALESCE(to_char(MAX(created_at), 'YYYY-MM-DD HH24:MI:SS TZ'), 'NULL') FROM ${SCHEMA_NAME}.${ARCHIVE_TABLE};")"

log "轮转完成：archive_rows=${ARCHIVE_ROWS} archive_size=${ARCHIVE_SIZE}"

COPY_SQL="COPY (SELECT * FROM ${SCHEMA_NAME}.${ARCHIVE_TABLE} ORDER BY id ASC) TO STDOUT WITH CSV HEADER"
log "开始导出归档表到压缩文件：${BACKUP_FILE}"

if [ "$COMPRESS_FORMAT" = "zst" ]; then
  psql -X -v ON_ERROR_STOP=1 -q -c "$COPY_SQL" | zstd -"${ZSTD_LEVEL}" -T0 -o "$BACKUP_FILE"
  zstd -tq "$BACKUP_FILE"
else
  psql -X -v ON_ERROR_STOP=1 -q -c "$COPY_SQL" | gzip -c > "$BACKUP_FILE"
  gzip -t "$BACKUP_FILE"
fi

BACKUP_SHA256="$(sha256_file "$BACKUP_FILE")"
BACKUP_SIZE_BYTES="$(file_size_bytes "$BACKUP_FILE")"
FILE_ROWS="$(csv_row_count_from_file "$BACKUP_FILE" "$COMPRESS_FORMAT")"

log "备份文件已生成：bytes=${BACKUP_SIZE_BYTES} sha256=${BACKUP_SHA256}"
log "记录数校验：archive_rows=${ARCHIVE_ROWS} file_rows=${FILE_ROWS}"

[ "$ARCHIVE_ROWS" = "$FILE_ROWS" ] || fail "记录数校验失败：archive_rows=${ARCHIVE_ROWS} file_rows=${FILE_ROWS}"

cat > "$SUMMARY_FILE" <<EOF
脚本名=${SCRIPT_NAME}
执行时间=$(date '+%Y-%m-%d %H:%M:%S %z')
机器=${HOST_LABEL}
服务=${SERVICE_LABEL}
schema=${SCHEMA_NAME}
live_table=${TABLE_NAME}
archive_table=${ARCHIVE_TABLE}
compress_format=${COMPRESS_FORMAT}
backup_file=${BACKUP_FILE}
backup_size_bytes=${BACKUP_SIZE_BYTES}
backup_sha256=${BACKUP_SHA256}
live_rows_before=${LIVE_ROWS_BEFORE}
live_size_before=${LIVE_SIZE_BEFORE}
live_min_created_at=${LIVE_MIN_CREATED_AT}
live_max_created_at=${LIVE_MAX_CREATED_AT}
archive_rows=${ARCHIVE_ROWS}
archive_size=${ARCHIVE_SIZE}
archive_min_created_at=${ARCHIVE_MIN_CREATED_AT}
archive_max_created_at=${ARCHIVE_MAX_CREATED_AT}
file_rows=${FILE_ROWS}
lock_timeout=${LOCK_TIMEOUT}
EOF

log "开始删除归档表：${SCHEMA_NAME}.${ARCHIVE_TABLE}"
pg_exec "DROP TABLE ${SCHEMA_NAME}.${ARCHIVE_TABLE};"

ARCHIVE_EXISTS_AFTER_DROP="$(pg_scalar "SELECT to_regclass('${SCHEMA_NAME}.${ARCHIVE_TABLE}') IS NOT NULL;")"
[ "$ARCHIVE_EXISTS_AFTER_DROP" = "f" ] || fail "归档表删除后仍存在：${SCHEMA_NAME}.${ARCHIVE_TABLE}"

LIVE_ROWS_AFTER="$(pg_scalar "SELECT COUNT(*) FROM ${SCHEMA_NAME}.${TABLE_NAME};")"
LIVE_SIZE_AFTER="$(pg_scalar "SELECT pg_size_pretty(pg_total_relation_size('${SCHEMA_NAME}.${TABLE_NAME}'::regclass));")"
LIVE_LATEST_CREATED_AT_AFTER="$(pg_scalar "SELECT COALESCE(to_char(MAX(created_at), 'YYYY-MM-DD HH24:MI:SS TZ'), 'NULL') FROM ${SCHEMA_NAME}.${TABLE_NAME};")"

{
  printf 'archive_dropped=yes\n'
  printf 'live_rows_after=%s\n' "$LIVE_ROWS_AFTER"
  printf 'live_size_after=%s\n' "$LIVE_SIZE_AFTER"
  printf 'live_latest_created_at_after=%s\n' "$LIVE_LATEST_CREATED_AT_AFTER"
} >> "$SUMMARY_FILE"

printf '%s\n' "$(date +%s)" > "${STATE_FILE}.tmp"
mv "${STATE_FILE}.tmp" "$STATE_FILE"
ARTIFACTS_FINALIZED=1

log "完成：archive 已删，live 表继续存在。"
log "摘要文件：${SUMMARY_FILE}"
log "备份文件：${BACKUP_FILE}"
send_success_email
