🚀 Hetzner 流量监控 + 自动删机重建 + DDNS 全自动化保姆级教程
📝 前言
Hetzner 的云服务器性价比极高,但流量超标后的账单往往让人肉疼。为了彻底解决这个问题,我利用 Docker + Python 搭建了一套 **“全自动流量监控系统”**。
它不仅仅是一个报警器,更是一套全自动化的运维闭环:
多机监控:同时监控多台 Hetzner 服务器(QB 下载机、Emby 服等)。
自动避险:流量达到设定阈值(如 18TB),自动删机,绝不多花一分钱。
原地复活:自动利用“快照”重建服务器,保留所有软件配置和探针。
自动寻路 (DDNS):重建后 IP 变动?脚本自动调用 Cloudflare API 更新域名解析,让 Vertex/QB 等应用通过域名无感连接。
Telegram 遥控:随时查看所有服务器状态,或发送指令强制一键重开。
🛠️ 准备工作
1. 核心资源
Hetzner 账号:
获取 API Token ( 权限选
Read & Write)。准备好服务器的 快照 ID (Snapshot ID)(建议在快照里装好哪吒探针、QB 等,实现开机即用)。
Cloudflare:
域名托管在 CF。
获取 Zone ID 和 API Token ( 权限模板:
Edit Zone DNS)。提前添加好 A 记录(如
hz1.yourdomain.com),关闭小云朵(仅 DNS 模式)。
Telegram:
Bot Token ( 找
@BotFather)。Chat ID ( 找
@userinfobot)。
2. 监控机 (宿主机)
建议使用一台稳定的 VPS(如 OVH、甲骨文、Racknerd 等)。
系统需安装好 Docker。
📦 部署流程
我们将代码部署在监控机上,通过 Docker 运行,确保 7x24 小时稳定在线。
第一步:创建项目目录
Bash
mkdir -p /root/hetzner_monitor
cd /root/hetzner_monitor
第二步:编写配置文件 (main.py)
这是核心逻辑,支持无限添加服务器。
创建文件:
Bash
nano main.py
👇 复制以下代码(注意修改顶部的配置区域):
Python
# -*- coding: utf-8 -*-
import time
import threading
import telebot
import requests
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.locations.domain import Location
# ================= ⚠️ 全局配置区域 =================
# 1. 账号级配置 (所有服务器共用)
HETZNER_TOKEN = "你的_HETZNER_API_TOKEN"
TG_BOT_TOKEN = "你的_TG_BOT_TOKEN"
TG_CHAT_ID = "你的_TG_CHAT_ID"
CF_ENABLE = True # 是否开启 Cloudflare DDNS
CF_API_TOKEN = "你的_CF_API_TOKEN"
# 2. 提醒分级 (到达这些百分比时推送通知)
NOTIFY_LEVELS = [10, 20, 30, 40, 50, 60, 70, 80, 90]
CHECK_INTERVAL = 300 # 5分钟检查一次
# ================= ⚠️ 服务器列表 (在这里添加多台) =================
SERVERS = [
# --- 第 1 台服务器 ---
{
"name": "HZ-QB-01", # 必须与 Hetzner 后台名字一致
"snapshot_id": 12345678, # 对应的快照 ID
"location": "nbg1", # 机房: nbg1, fsn1, hel1, ash
"type": "cx22", # 机型
"limit_tb": 18.0, # 流量阈值
"cf_zone_id": "你的_ZONE_ID", # Cloudflare Zone ID
"cf_domain": "hz1.yourdomain.com" # 对应的域名
},
# --- 第 2 台服务器 (可选,不需要可删除) ---
{
"name": "HZ-Emby-02",
"snapshot_id": 87654321,
"location": "fsn1",
"type": "cpx11",
"limit_tb": 18.0,
"cf_zone_id": "你的_ZONE_ID",
"cf_domain": "hz2.yourdomain.com"
},
]
# ================================================================
client = Client(token=HETZNER_TOKEN)
bot = telebot.TeleBot(TG_BOT_TOKEN)
# 存储各服务器状态
server_states = {}
for s in SERVERS:
server_states[s['name']] = { "lock": threading.Lock(), "notify_level": 0 }
def update_cloudflare(conf, new_ip):
"""更新 Cloudflare DNS"""
if not CF_ENABLE: return "DNS未开启"
headers = { "Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json" }
try:
# 获取记录ID
list_url = f"https://api.cloudflare.com/client/v4/zones/{conf['cf_zone_id']}/dns_records?name={conf['cf_domain']}"
resp = requests.get(list_url, headers=headers).json()
if not resp.get('success') or not resp['result']: return f"❌ CF记录不存在: {conf['cf_domain']}"
record_id = resp['result'][0]['id']
# 更新记录
update_url = f"https://api.cloudflare.com/client/v4/zones/{conf['cf_zone_id']}/dns_records/{record_id}"
data = { "type": "A", "name": conf['cf_domain'], "content": new_ip, "ttl": 60, "proxied": False }
requests.put(update_url, headers=headers, json=data)
return f"✅ DNS已更新: {conf['cf_domain']} -> {new_ip}"
except Exception as e: return f"❌ DNS异常: {str(e)}"
def get_usage(conf):
"""获取流量数据"""
try:
server = client.servers.get_by_name(conf['name'])
if server is None: return None, "服务器不存在"
limit_bytes = conf['limit_tb'] * 1024**4
current = server.outgoing_traffic
percent = (current / limit_bytes) * 100
return current, {
"name": conf['name'], "tb": current / 1024**4,
"percent": percent, "ip": server.public_net.ipv4.ip
}
except Exception as e: return None, str(e)
def perform_rebuild(conf, source="自动监控"):
"""执行重建流程"""
state = server_states[conf['name']]
if not state["lock"].acquire(blocking=False): return
try:
bot.send_message(TG_CHAT_ID, f"🔄 **[{conf['name']}]** 正在重建 ({source})...", parse_mode="Markdown")
try:
server = client.servers.get_by_name(conf['name'])
if server: server.delete(); time.sleep(15)
except: pass
new_server = client.servers.create(
name=conf['name'], server_type=ServerType(name=conf['type']),
image=Image(id=conf['snapshot_id']), location=Location(name=conf['location'])
)
new_ip = new_server.server.public_net.ipv4.ip
state["notify_level"] = 0
dns_msg = update_cloudflare(conf, new_ip)
bot.send_message(TG_CHAT_ID, f"✅ **{conf['name']} 重建完成**\nIP: `{new_ip}`\n{dns_msg}", parse_mode="Markdown")
time.sleep(60)
except Exception as e:
bot.send_message(TG_CHAT_ID, f"❌ {conf['name']} 重建失败: {e}")
finally:
state["lock"].release()
# --- 独立监控线程 ---
def server_monitor_thread(conf):
print(f"🚀 启动监控: {conf['name']}")
state = server_states[conf['name']]
# 开机自检 DNS
u, d = get_usage(conf)
if u is not None:
print(update_cloudflare(conf, d['ip']))
for l in NOTIFY_LEVELS:
if d['percent'] >= l: state["notify_level"] = l
while True:
try:
u, d = get_usage(conf)
if u is not None:
limit_bytes = conf['limit_tb'] * 1024**4
print(f"[{conf['name']}] {d['percent']:.2f}%")
# 分级提醒
for l in NOTIFY_LEVELS:
if d['percent'] >= l and l > state["notify_level"]:
bot.send_message(TG_CHAT_ID, f"⚠️ **{conf['name']}** 流量提醒: {l}% ({d['tb']:.2f} TB)", parse_mode="Markdown")
state["notify_level"] = l
# 超标重建
if u > limit_bytes: perform_rebuild(conf, "流量超标")
except Exception as e: print(f"{conf['name']} Error: {e}")
time.sleep(CHECK_INTERVAL)
# --- Telegram 命令 ---
@bot.message_handler(commands=['start'])
def h(m): bot.reply_to(m, "🤖 多服务器监控运行中\n/ll - 查看状态\n/rebuild 服务器名 - 强制重建")
@bot.message_handler(commands=['ll', 'status'])
def s(m):
if str(m.chat.id) != TG_CHAT_ID: return
msg = "📊 **服务器状态列表**\n"
for conf in SERVERS:
u, d = get_usage(conf)
if u is not None: msg += f"━━━━━━━━━━\n🖥️ `{d['name']}`\n📉 {d['tb']:.4f} TB ({d['percent']:.2f}%)\n🌐 `{d['ip']}`\n"
else: msg += f"━━━━━━━━━━\n🖥️ `{conf['name']}`\n❌ 获取失败\n"
bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
@bot.message_handler(commands=['rebuild'])
def r(m):
if str(m.chat.id) != TG_CHAT_ID: return
try:
target_name = m.text.split()[1]
target_conf = next((s for s in SERVERS if s['name'] == target_name), None)
if target_conf:
threading.Thread(target=perform_rebuild, args=(target_conf, "手动指令")).start()
bot.reply_to(m, f"收到,正在重建 {target_name}...")
else: bot.reply_to(m, "❌ 找不到该服务器")
except: bot.reply_to(m, "⚠️ 用法: `/rebuild 服务器名`", parse_mode="Markdown")
if __name__ == "__main__":
for server_conf in SERVERS:
threading.Thread(target=server_monitor_thread, args=(server_conf,), daemon=True).start()
print("🤖 Bot 启动...")
bot.infinity_polling()
第三步:依赖与 Docker 环境
创建
requirements.txt:Plaintext
hcloud requests pyTelegramBotAPI创建
Dockerfile:Dockerfile
FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime COPY main.py . CMD ["python", "-u", "main.py"]
第四步:构建并运行
Bash
# 1. 构建镜像
docker build -t hetzner-bot .
# 2. 启动容器 (自动重启模式)
docker run -d --name hetzner-monitor --restart always hetzner-bot
# 3. 检查日志
docker logs -f hetzner-monitor
🔗 最后一步:闭环配置 (Vertex/QB)
这步非常关键!为了让 IP 变动不影响你的刷流业务:
进入 Vertex 或其他下载器管理面板。
将下载器的连接地址 (Host) 从 IP 改为你在代码里配置的域名(如
hz1.yourdomain.com)。原理:每次重建,脚本会自动把新 IP 更新到 Cloudflare,Vertex 通过域名就能永远找到最新的服务器。
🤖 怎么使用?
拿起手机,给你的 Bot 发送命令:
查看状态:
/ll机器人会列出所有服务器的流量百分比、已用量和当前 IP。
强制重开:
/rebuild 服务器名例如:
/rebuild HZ-QB-01不管流量有没有超标,强制炸掉当前机器,换个新 IP 重新开始。
💡 维护小贴士
快照更新:如果你更新了 QB 版本,记得去 Hetzner 重新拍个快照,并更新脚本里的
snapshot_id。端口访问:浏览器访问域名时记得加端口(如
http://域名:8080),因为我们使用的是 CF DNS 模式。享受生活:现在,你可以忘掉服务器管理,安心睡觉了!🥂




