【再次升级】Hetzner 流量监控自动重建
Q:我还想再增加一个就是流量使用 10%、20%、30%、40%、50%、60%、70%、80%、90% 会自动提醒。
A:这个功能非常实用,可以让你对流量的使用节奏心里有数,而不是等到最后时刻才“突然死亡”。
为了实现这个功能,我们需要在代码里增加一个 **“记忆变量”**,用来记录上一次提醒到了百分之几(比如记录上次通知了 20%),防止每次检测(每 5 分钟)都重复给你发 10%、20% 的通知。
以下是添加了阶段性流量提醒功能的完整代码。
第一步:修改 main.py
请登录 OVH 服务器,编辑文件:
Bash
nano ~/hetzner_monitor/main.py
清空旧代码,直接粘贴下面的完整新代码:
Python
# -*- coding: utf-8 -*-
import time
import threading
import telebot
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.locations.domain import Location
# ================= ⚠️ 配置区域 (请修改这里) =================
# 1. Hetzner API Token
HETZNER_TOKEN = "你的_HETZNER_API_TOKEN"
# 2. Telegram 通知配置
TG_BOT_TOKEN = "你的_TG_BOT_TOKEN"
TG_CHAT_ID = "你的_TG_CHAT_ID"
# 3. 服务器配置
SERVER_NAME = "My-Server" # 必须与 Hetzner 后台名称一致
SNAPSHOT_ID = 12345678 # 你的快照 ID (带探针的那个)
LOCATION = "nbg1"
SERVER_TYPE = "cx22"
# 4. 流量阈值
TRAFFIC_LIMIT_TB = 18.0
# 5. 自动检测频率 (秒)
CHECK_INTERVAL = 300
# 6. 提醒百分比节点 (不用动,或者你可以自己增减)
NOTIFY_LEVELS = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# ==========================================================
LIMIT_BYTES = TRAFFIC_LIMIT_TB * 1024 * 1024 * 1024 * 1024
client = Client(token=HETZNER_TOKEN)
bot = telebot.TeleBot(TG_BOT_TOKEN)
# 全局变量
rebuild_lock = threading.Lock()
current_notify_level = 0 # 用于记录当前已通知到的百分比层级
def get_server_usage():
"""获取服务器流量"""
try:
server = client.servers.get_by_name(SERVER_NAME)
if server is None:
return None, "服务器不存在"
current_usage = server.outgoing_traffic
usage_gb = current_usage / (1024 ** 3)
usage_tb = current_usage / (1024 ** 4)
usage_percent = (current_usage / LIMIT_BYTES) * 100
return current_usage, {
"gb": usage_gb,
"tb": usage_tb,
"percent": usage_percent,
"ip": server.public_net.ipv4.ip
}
except Exception as e:
return None, str(e)
def perform_rebuild(source="自动监控"):
"""执行重建的核心逻辑"""
global current_notify_level
if not rebuild_lock.acquire(blocking=False):
return False, "重建正在进行中,请勿重复操作。"
try:
print(f"[{source}] 开始执行重建流程...")
bot.send_message(TG_CHAT_ID, f"🔄 **[{source}]** 收到指令,开始重置服务器...", parse_mode="Markdown")
# 1. 删除旧服务器
try:
server = client.servers.get_by_name(SERVER_NAME)
if server:
server.delete()
print("旧服务器已删除,等待 API 刷新 (15s)...")
time.sleep(15)
except Exception as e:
print(f"删除失败 (可能已不存在): {e}")
# 2. 创建新服务器
print(f"正在通过快照 {SNAPSHOT_ID} 创建新服务器...")
new_server = client.servers.create(
name=SERVER_NAME,
server_type=ServerType(name=SERVER_TYPE),
image=Image(id=SNAPSHOT_ID),
location=Location(name=LOCATION)
)
new_ip = new_server.server.public_net.ipv4.ip
# 3. 重置通知等级计数器 (关键!)
current_notify_level = 0
msg = (
f"✅ **重建成功!**\n"
f"━━━━━━━━━━━━━━━━\n"
f"来源: {source}\n"
f"新 IP: `{new_ip}`\n"
f"通知计数器: 已重置"
)
bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
time.sleep(60)
return True, "成功"
except Exception as e:
err_msg = f"❌ 重建失败: {str(e)}"
print(err_msg)
bot.send_message(TG_CHAT_ID, err_msg)
return False, str(e)
finally:
rebuild_lock.release()
# --- 🤖 Telegram 命令处理 ---
@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
if str(message.chat.id) != TG_CHAT_ID: return
help_text = (
"🤖 **Hetzner 监控增强版**\n\n"
"/liuliang - 查看流量状态\n"
"/rebuild - ⚠️ 强制删机并重建\n"
)
bot.reply_to(message, help_text, parse_mode="Markdown")
@bot.message_handler(commands=['liuliang', 'status', 'll'])
def check_traffic(message):
if str(message.chat.id) != TG_CHAT_ID: return
bot.reply_to(message, "🔍 查询中...")
usage_bytes, data = get_server_usage()
if usage_bytes is None:
bot.send_message(TG_CHAT_ID, f"❌ 查询失败: {data}")
else:
reply = (
f"📊 **当前状态**\n"
f"IP: `{data['ip']}`\n"
f"流量: **{data['tb']:.4f} TB** / {TRAFFIC_LIMIT_TB} TB\n"
f"进度: {data['percent']:.2f}%\n"
f"下次提醒: {next((x for x in NOTIFY_LEVELS if x > data['percent']), '100')}%"
)
bot.send_message(TG_CHAT_ID, reply, parse_mode="Markdown")
@bot.message_handler(commands=['rebuild', 'reset'])
def manual_rebuild(message):
if str(message.chat.id) != TG_CHAT_ID: return
bot.reply_to(message, "⚠️ **警告**:正在请求强制重建...")
threading.Thread(target=perform_rebuild, args=("手动指令",)).start()
# --- 🔄 自动监控线程 ---
def monitor_loop():
global current_notify_level
print("🚀 自动监控线程启动...")
# 初始化:首次启动时,同步当前通知等级,避免重复刷屏
# 如果刚启动发现已经用了 35%,直接把等级设为 30,防止它把 10,20,30 全发一遍
first_run = True
while True:
try:
usage_bytes, data = get_server_usage()
if usage_bytes is not None:
percent = data['percent']
tb_used = data['tb']
print(f"[{time.strftime('%H:%M:%S')}] 流量: {data['gb']:.2f} GB ({percent:.2f}%)")
# 1. 首次运行时的初始化校准
if first_run:
for level in NOTIFY_LEVELS:
if percent >= level:
current_notify_level = level
first_run = False
print(f"初始化完成,当前通知等级设定为: {current_notify_level}%")
# 2. 检查是否达到新的百分比层级
# 逻辑:如果当前百分比超过了某个线,且那个线比我们上次通知的线要大
for level in NOTIFY_LEVELS:
if percent >= level and level > current_notify_level:
msg = (
f"⚠️ **流量消耗提醒**\n"
f"━━━━━━━━━━━━━━━━\n"
f"已使用: **{level}%**\n"
f"数值: `{tb_used:.4f} TB`\n"
f"IP: `{data['ip']}`"
)
bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
current_notify_level = level # 更新记忆变量
print(f"已发送 {level}% 提醒")
# 3. 检查是否达到 100% 阈值
if usage_bytes > LIMIT_BYTES:
perform_rebuild(source="流量超标自动触发")
except Exception as e:
print(f"监控报错: {e}")
time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
threading.Thread(target=monitor_loop, daemon=True).start()
print("🤖 Bot 启动中...")
bot.send_message(TG_CHAT_ID, "✅ 监控已更新:开启分级提醒功能 (10%-90%)")
bot.infinity_polling()
第二步:更新 Docker
因为代码变了,我们需要更新容器。
Bash
cd ~/hetzner_monitor
# 1. 重新构建镜像
docker build -t hetzner-bot .
# 2. 重启容器
docker rm -f hetzner-monitor
docker run -d --name hetzner-monitor --restart always hetzner-bot
功能说明
现在的逻辑变得更加智能了:
分级提醒:
当流量达到 10% 时,你会收到一条通知。
流量继续跑到 20% 时,你会收到下一条通知。
以此类推,直到 90%。
智能记忆:
脚本会记住“我已经通知过 30% 了”,所以下个 5 分钟检查时,如果流量还是 30.5%,它不会重复发消息打扰你。
只有当流量冲破 40% 的线时,它才会再次说话。
自动重置:
当你手动
/rebuild或者流量超标自动重建成功后,脚本会自动把记忆归零。新服务器从 0% 开始跑,脚本也会重新从 10% 开始提醒。
重启不刷屏:
如果 Docker 容器重启了,而你的服务器此时已经用了 55% 的流量。脚本启动时会智能识别,直接把进度条拉到 50%,不会把 10%、20%、30%、40% 的消息一股脑发给你轰炸。




