【再次升级】Hetzner 流量监控自动重建

Q:我还想再增加一个就是流量使用 10%、20%、30%、40%、50%、60%、70%、80%、90% 会自动提醒。

A:这个功能非常实用,可以让你对流量的使用节奏心里有数,而不是等到最后时刻才“突然死亡”。

为了实现这个功能,我们需要在代码里增加一个 **“记忆变量”**,用来记录上一次提醒到了百分之几(比如记录上次通知了 20%),防止每次检测(每 5 分钟)都重复给你发 10%、20% 的通知。

以下是添加了阶段性流量提醒功能的完整代码。


第一步:修改 main.py

请登录 OVH 服务器,编辑文件:

Bash

nano ~/hetzner_monitor/main.py

清空旧代码,直接粘贴下面的完整新代码:

Python

# -*- coding: utf-8 -*-
import time
import threading
import telebot
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.locations.domain import Location

# ================= ⚠️ 配置区域 (请修改这里) =================

# 1. Hetzner API Token
HETZNER_TOKEN = "你的_HETZNER_API_TOKEN"

# 2. Telegram 通知配置
TG_BOT_TOKEN = "你的_TG_BOT_TOKEN"
TG_CHAT_ID = "你的_TG_CHAT_ID"

# 3. 服务器配置
SERVER_NAME = "My-Server"      # 必须与 Hetzner 后台名称一致
SNAPSHOT_ID = 12345678         # 你的快照 ID (带探针的那个)
LOCATION = "nbg1"
SERVER_TYPE = "cx22"

# 4. 流量阈值
TRAFFIC_LIMIT_TB = 18.0

# 5. 自动检测频率 (秒)
CHECK_INTERVAL = 300

# 6. 提醒百分比节点 (不用动,或者你可以自己增减)
NOTIFY_LEVELS = [10, 20, 30, 40, 50, 60, 70, 80, 90]

# ==========================================================

LIMIT_BYTES = TRAFFIC_LIMIT_TB * 1024 * 1024 * 1024 * 1024
client = Client(token=HETZNER_TOKEN)
bot = telebot.TeleBot(TG_BOT_TOKEN)

# 全局变量
rebuild_lock = threading.Lock()
current_notify_level = 0  # 用于记录当前已通知到的百分比层级

def get_server_usage():
    """获取服务器流量"""
    try:
        server = client.servers.get_by_name(SERVER_NAME)
        if server is None:
            return None, "服务器不存在"
        
        current_usage = server.outgoing_traffic
        usage_gb = current_usage / (1024 ** 3)
        usage_tb = current_usage / (1024 ** 4)
        usage_percent = (current_usage / LIMIT_BYTES) * 100
        
        return current_usage, {
            "gb": usage_gb,
            "tb": usage_tb,
            "percent": usage_percent,
            "ip": server.public_net.ipv4.ip
        }
    except Exception as e:
        return None, str(e)

def perform_rebuild(source="自动监控"):
    """执行重建的核心逻辑"""
    global current_notify_level
    
    if not rebuild_lock.acquire(blocking=False):
        return False, "重建正在进行中,请勿重复操作。"

    try:
        print(f"[{source}] 开始执行重建流程...")
        bot.send_message(TG_CHAT_ID, f"🔄 **[{source}]** 收到指令,开始重置服务器...", parse_mode="Markdown")

        # 1. 删除旧服务器
        try:
            server = client.servers.get_by_name(SERVER_NAME)
            if server:
                server.delete()
                print("旧服务器已删除,等待 API 刷新 (15s)...")
                time.sleep(15)
        except Exception as e:
            print(f"删除失败 (可能已不存在): {e}")

        # 2. 创建新服务器
        print(f"正在通过快照 {SNAPSHOT_ID} 创建新服务器...")
        new_server = client.servers.create(
            name=SERVER_NAME,
            server_type=ServerType(name=SERVER_TYPE),
            image=Image(id=SNAPSHOT_ID),
            location=Location(name=LOCATION)
        )
        
        new_ip = new_server.server.public_net.ipv4.ip
        
        # 3. 重置通知等级计数器 (关键!)
        current_notify_level = 0
        
        msg = (
            f"✅ **重建成功!**\n"
            f"━━━━━━━━━━━━━━━━\n"
            f"来源: {source}\n"
            f"新 IP: `{new_ip}`\n"
            f"通知计数器: 已重置"
        )
        bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
        
        time.sleep(60)
        return True, "成功"

    except Exception as e:
        err_msg = f"❌ 重建失败: {str(e)}"
        print(err_msg)
        bot.send_message(TG_CHAT_ID, err_msg)
        return False, str(e)
    
    finally:
        rebuild_lock.release()

# --- 🤖 Telegram 命令处理 ---

@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    if str(message.chat.id) != TG_CHAT_ID: return
    help_text = (
        "🤖 **Hetzner 监控增强版**\n\n"
        "/liuliang - 查看流量状态\n"
        "/rebuild - ⚠️ 强制删机并重建\n"
    )
    bot.reply_to(message, help_text, parse_mode="Markdown")

@bot.message_handler(commands=['liuliang', 'status', 'll'])
def check_traffic(message):
    if str(message.chat.id) != TG_CHAT_ID: return
    bot.reply_to(message, "🔍 查询中...")
    
    usage_bytes, data = get_server_usage()
    if usage_bytes is None:
        bot.send_message(TG_CHAT_ID, f"❌ 查询失败: {data}")
    else:
        reply = (
            f"📊 **当前状态**\n"
            f"IP: `{data['ip']}`\n"
            f"流量: **{data['tb']:.4f} TB** / {TRAFFIC_LIMIT_TB} TB\n"
            f"进度: {data['percent']:.2f}%\n"
            f"下次提醒: {next((x for x in NOTIFY_LEVELS if x > data['percent']), '100')}%"
        )
        bot.send_message(TG_CHAT_ID, reply, parse_mode="Markdown")

@bot.message_handler(commands=['rebuild', 'reset'])
def manual_rebuild(message):
    if str(message.chat.id) != TG_CHAT_ID: return
    bot.reply_to(message, "⚠️ **警告**:正在请求强制重建...")
    threading.Thread(target=perform_rebuild, args=("手动指令",)).start()

# --- 🔄 自动监控线程 ---

def monitor_loop():
    global current_notify_level
    print("🚀 自动监控线程启动...")
    
    # 初始化:首次启动时,同步当前通知等级,避免重复刷屏
    # 如果刚启动发现已经用了 35%,直接把等级设为 30,防止它把 10,20,30 全发一遍
    first_run = True

    while True:
        try:
            usage_bytes, data = get_server_usage()
            
            if usage_bytes is not None:
                percent = data['percent']
                tb_used = data['tb']
                
                print(f"[{time.strftime('%H:%M:%S')}] 流量: {data['gb']:.2f} GB ({percent:.2f}%)")

                # 1. 首次运行时的初始化校准
                if first_run:
                    for level in NOTIFY_LEVELS:
                        if percent >= level:
                            current_notify_level = level
                    first_run = False
                    print(f"初始化完成,当前通知等级设定为: {current_notify_level}%")

                # 2. 检查是否达到新的百分比层级
                # 逻辑:如果当前百分比超过了某个线,且那个线比我们上次通知的线要大
                for level in NOTIFY_LEVELS:
                    if percent >= level and level > current_notify_level:
                        msg = (
                            f"⚠️ **流量消耗提醒**\n"
                            f"━━━━━━━━━━━━━━━━\n"
                            f"已使用: **{level}%**\n"
                            f"数值: `{tb_used:.4f} TB`\n"
                            f"IP: `{data['ip']}`"
                        )
                        bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
                        current_notify_level = level # 更新记忆变量
                        print(f"已发送 {level}% 提醒")

                # 3. 检查是否达到 100% 阈值
                if usage_bytes > LIMIT_BYTES:
                    perform_rebuild(source="流量超标自动触发")

        except Exception as e:
            print(f"监控报错: {e}")
            
        time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    threading.Thread(target=monitor_loop, daemon=True).start()
    print("🤖 Bot 启动中...")
    bot.send_message(TG_CHAT_ID, "✅ 监控已更新:开启分级提醒功能 (10%-90%)")
    bot.infinity_polling()

第二步:更新 Docker

因为代码变了,我们需要更新容器。

Bash

cd ~/hetzner_monitor

# 1. 重新构建镜像
docker build -t hetzner-bot .

# 2. 重启容器
docker rm -f hetzner-monitor
docker run -d --name hetzner-monitor --restart always hetzner-bot

功能说明

现在的逻辑变得更加智能了:

  1. 分级提醒

    • 当流量达到 10% 时,你会收到一条通知。

    • 流量继续跑到 20% 时,你会收到下一条通知。

    • 以此类推,直到 90%。

  2. 智能记忆

    • 脚本会记住“我已经通知过 30% 了”,所以下个 5 分钟检查时,如果流量还是 30.5%,它不会重复发消息打扰你。

    • 只有当流量冲破 40% 的线时,它才会再次说话。

  3. 自动重置

    • 当你手动 /rebuild 或者流量超标自动重建成功后,脚本会自动把记忆归零。

    • 新服务器从 0% 开始跑,脚本也会重新从 10% 开始提醒。

  4. 重启不刷屏

    • 如果 Docker 容器重启了,而你的服务器此时已经用了 55% 的流量。脚本启动时会智能识别,直接把进度条拉到 50%,不会把 10%、20%、30%、40% 的消息一股脑发给你轰炸。

消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息