🚀 Hetzner 流量监控 + 自动删机重建 + DDNS 全自动化保姆级教程


📝 前言

Hetzner 的云服务器性价比极高,但流量超标后的账单往往让人肉疼。为了彻底解决这个问题,我利用 Docker + Python 搭建了一套 **“全自动流量监控系统”**。

它不仅仅是一个报警器,更是一套全自动化的运维闭环

  1. 多机监控:同时监控多台 Hetzner 服务器(QB 下载机、Emby 服等)。

  2. 自动避险:流量达到设定阈值(如 18TB),自动删机,绝不多花一分钱。

  3. 原地复活:自动利用“快照”重建服务器,保留所有软件配置和探针。

  4. 自动寻路 (DDNS):重建后 IP 变动?脚本自动调用 Cloudflare API 更新域名解析,让 Vertex/QB 等应用通过域名无感连接。

  5. Telegram 遥控:随时查看所有服务器状态,或发送指令强制一键重开。


🛠️ 准备工作

1. 核心资源

  • Hetzner 账号

    • 获取 API Token ( 权限选 Read & Write)。

    • 准备好服务器的 快照 ID (Snapshot ID)(建议在快照里装好哪吒探针、QB 等,实现开机即用)。

  • Cloudflare

    • 域名托管在 CF。

    • 获取 Zone IDAPI Token ( 权限模板:Edit Zone DNS)。

    • 提前添加好 A 记录(如 hz1.yourdomain.com),关闭小云朵(仅 DNS 模式)

  • Telegram

    • Bot Token ( 找 @BotFather)。

    • Chat ID ( 找 @userinfobot)。

2. 监控机 (宿主机)

  • 建议使用一台稳定的 VPS(如 OVH、甲骨文、Racknerd 等)。

  • 系统需安装好 Docker


📦 部署流程

我们将代码部署在监控机上,通过 Docker 运行,确保 7x24 小时稳定在线。

第一步:创建项目目录

Bash

mkdir -p /root/hetzner_monitor
cd /root/hetzner_monitor

第二步:编写配置文件 (main.py)

这是核心逻辑,支持无限添加服务器

创建文件:

Bash

nano main.py

👇 复制以下代码(注意修改顶部的配置区域):

Python

# -*- coding: utf-8 -*-
import time
import threading
import telebot
import requests
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.locations.domain import Location

# ================= ⚠️ 全局配置区域 =================

# 1. 账号级配置 (所有服务器共用)
HETZNER_TOKEN = "你的_HETZNER_API_TOKEN"
TG_BOT_TOKEN = "你的_TG_BOT_TOKEN"
TG_CHAT_ID = "你的_TG_CHAT_ID"

CF_ENABLE = True  # 是否开启 Cloudflare DDNS
CF_API_TOKEN = "你的_CF_API_TOKEN"

# 2. 提醒分级 (到达这些百分比时推送通知)
NOTIFY_LEVELS = [10, 20, 30, 40, 50, 60, 70, 80, 90]
CHECK_INTERVAL = 300  # 5分钟检查一次

# ================= ⚠️ 服务器列表 (在这里添加多台) =================

SERVERS = [
    # --- 第 1 台服务器 ---
    {
        "name": "HZ-QB-01",           # 必须与 Hetzner 后台名字一致
        "snapshot_id": 12345678,      # 对应的快照 ID
        "location": "nbg1",           # 机房: nbg1, fsn1, hel1, ash
        "type": "cx22",               # 机型
        "limit_tb": 18.0,             # 流量阈值
        "cf_zone_id": "你的_ZONE_ID",        # Cloudflare Zone ID
        "cf_domain": "hz1.yourdomain.com"    # 对应的域名
    },
    
    # --- 第 2 台服务器 (可选,不需要可删除) ---
    {
        "name": "HZ-Emby-02",
        "snapshot_id": 87654321,
        "location": "fsn1",
        "type": "cpx11",
        "limit_tb": 18.0,
        "cf_zone_id": "你的_ZONE_ID",
        "cf_domain": "hz2.yourdomain.com"
    },
]

# ================================================================

client = Client(token=HETZNER_TOKEN)
bot = telebot.TeleBot(TG_BOT_TOKEN)

# 存储各服务器状态
server_states = {} 
for s in SERVERS:
    server_states[s['name']] = { "lock": threading.Lock(), "notify_level": 0 }

def update_cloudflare(conf, new_ip):
    """更新 Cloudflare DNS"""
    if not CF_ENABLE: return "DNS未开启"
    headers = { "Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json" }
    try:
        # 获取记录ID
        list_url = f"https://api.cloudflare.com/client/v4/zones/{conf['cf_zone_id']}/dns_records?name={conf['cf_domain']}"
        resp = requests.get(list_url, headers=headers).json()
        if not resp.get('success') or not resp['result']: return f"❌ CF记录不存在: {conf['cf_domain']}"
        
        record_id = resp['result'][0]['id']
        # 更新记录
        update_url = f"https://api.cloudflare.com/client/v4/zones/{conf['cf_zone_id']}/dns_records/{record_id}"
        data = { "type": "A", "name": conf['cf_domain'], "content": new_ip, "ttl": 60, "proxied": False }
        requests.put(update_url, headers=headers, json=data)
        return f"✅ DNS已更新: {conf['cf_domain']} -> {new_ip}"
    except Exception as e: return f"❌ DNS异常: {str(e)}"

def get_usage(conf):
    """获取流量数据"""
    try:
        server = client.servers.get_by_name(conf['name'])
        if server is None: return None, "服务器不存在"
        limit_bytes = conf['limit_tb'] * 1024**4
        current = server.outgoing_traffic
        percent = (current / limit_bytes) * 100
        return current, {
            "name": conf['name'], "tb": current / 1024**4,
            "percent": percent, "ip": server.public_net.ipv4.ip
        }
    except Exception as e: return None, str(e)

def perform_rebuild(conf, source="自动监控"):
    """执行重建流程"""
    state = server_states[conf['name']]
    if not state["lock"].acquire(blocking=False): return

    try:
        bot.send_message(TG_CHAT_ID, f"🔄 **[{conf['name']}]** 正在重建 ({source})...", parse_mode="Markdown")
        try:
            server = client.servers.get_by_name(conf['name'])
            if server: server.delete(); time.sleep(15)
        except: pass

        new_server = client.servers.create(
            name=conf['name'], server_type=ServerType(name=conf['type']),
            image=Image(id=conf['snapshot_id']), location=Location(name=conf['location'])
        )
        new_ip = new_server.server.public_net.ipv4.ip
        state["notify_level"] = 0 
        
        dns_msg = update_cloudflare(conf, new_ip)
        bot.send_message(TG_CHAT_ID, f"✅ **{conf['name']} 重建完成**\nIP: `{new_ip}`\n{dns_msg}", parse_mode="Markdown")
        time.sleep(60)
    except Exception as e:
        bot.send_message(TG_CHAT_ID, f"❌ {conf['name']} 重建失败: {e}")
    finally:
        state["lock"].release()

# --- 独立监控线程 ---
def server_monitor_thread(conf):
    print(f"🚀 启动监控: {conf['name']}")
    state = server_states[conf['name']]
    # 开机自检 DNS
    u, d = get_usage(conf)
    if u is not None:
        print(update_cloudflare(conf, d['ip']))
        for l in NOTIFY_LEVELS:
            if d['percent'] >= l: state["notify_level"] = l

    while True:
        try:
            u, d = get_usage(conf)
            if u is not None:
                limit_bytes = conf['limit_tb'] * 1024**4
                print(f"[{conf['name']}] {d['percent']:.2f}%")
                # 分级提醒
                for l in NOTIFY_LEVELS:
                    if d['percent'] >= l and l > state["notify_level"]:
                        bot.send_message(TG_CHAT_ID, f"⚠️ **{conf['name']}** 流量提醒: {l}% ({d['tb']:.2f} TB)", parse_mode="Markdown")
                        state["notify_level"] = l
                # 超标重建
                if u > limit_bytes: perform_rebuild(conf, "流量超标")
        except Exception as e: print(f"{conf['name']} Error: {e}")
        time.sleep(CHECK_INTERVAL)

# --- Telegram 命令 ---
@bot.message_handler(commands=['start'])
def h(m): bot.reply_to(m, "🤖 多服务器监控运行中\n/ll - 查看状态\n/rebuild 服务器名 - 强制重建")

@bot.message_handler(commands=['ll', 'status'])
def s(m):
    if str(m.chat.id) != TG_CHAT_ID: return
    msg = "📊 **服务器状态列表**\n"
    for conf in SERVERS:
        u, d = get_usage(conf)
        if u is not None: msg += f"━━━━━━━━━━\n🖥️ `{d['name']}`\n📉 {d['tb']:.4f} TB ({d['percent']:.2f}%)\n🌐 `{d['ip']}`\n"
        else: msg += f"━━━━━━━━━━\n🖥️ `{conf['name']}`\n❌ 获取失败\n"
    bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")

@bot.message_handler(commands=['rebuild'])
def r(m):
    if str(m.chat.id) != TG_CHAT_ID: return
    try:
        target_name = m.text.split()[1]
        target_conf = next((s for s in SERVERS if s['name'] == target_name), None)
        if target_conf:
            threading.Thread(target=perform_rebuild, args=(target_conf, "手动指令")).start()
            bot.reply_to(m, f"收到,正在重建 {target_name}...")
        else: bot.reply_to(m, "❌ 找不到该服务器")
    except: bot.reply_to(m, "⚠️ 用法: `/rebuild 服务器名`", parse_mode="Markdown")

if __name__ == "__main__":
    for server_conf in SERVERS:
        threading.Thread(target=server_monitor_thread, args=(server_conf,), daemon=True).start()
    print("🤖 Bot 启动...")
    bot.infinity_polling()

第三步:依赖与 Docker 环境

  1. 创建 requirements.txt

    Plaintext

    hcloud
    requests
    pyTelegramBotAPI
    
  2. 创建 Dockerfile

    Dockerfile

    FROM python:3.9-slim
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
    COPY main.py .
    CMD ["python", "-u", "main.py"]
    

第四步:构建并运行

Bash

# 1. 构建镜像
docker build -t hetzner-bot .

# 2. 启动容器 (自动重启模式)
docker run -d --name hetzner-monitor --restart always hetzner-bot

# 3. 检查日志
docker logs -f hetzner-monitor

🔗 最后一步:闭环配置 (Vertex/QB)

这步非常关键!为了让 IP 变动不影响你的刷流业务:

  1. 进入 Vertex 或其他下载器管理面板。

  2. 将下载器的连接地址 (Host) 从 IP 改为你在代码里配置的域名(如 hz1.yourdomain.com)。

  3. 原理:每次重建,脚本会自动把新 IP 更新到 Cloudflare,Vertex 通过域名就能永远找到最新的服务器。


🤖 怎么使用?

拿起手机,给你的 Bot 发送命令:

  • 查看状态/ll

    • 机器人会列出所有服务器的流量百分比、已用量和当前 IP。

  • 强制重开/rebuild 服务器名

    • 例如:/rebuild HZ-QB-01

    • 不管流量有没有超标,强制炸掉当前机器,换个新 IP 重新开始。


💡 维护小贴士

  1. 快照更新:如果你更新了 QB 版本,记得去 Hetzner 重新拍个快照,并更新脚本里的 snapshot_id

  2. 端口访问:浏览器访问域名时记得加端口(如 http://域名:8080),因为我们使用的是 CF DNS 模式。

  3. 享受生活:现在,你可以忘掉服务器管理,安心睡觉了!🥂

消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息