【升级】Hetzner 流量监控自动重建

前面的脚本是“单向通知”(脚本 -> 你)。

要实现“双向交互”(你输入命令 -> 脚本回复),我们需要引入一个专门处理 Telegram 消息的库(pyTelegramBotAPI),并让脚本开启“多线程”:一条线负责每 5 分钟的自动监控,另一条线负责随时监听你的命令

以下是升级改造的详细步骤。


第一步:修改 requirements.txt

我们需要增加一个库来方便地处理 Telegram 命令。

  1. 登录 OVH 服务器。

  2. 进入目录:cd ~/hetzner_monitor

  3. 编辑文件:nano requirements.txt

  4. 将内容修改为(增加了一行):

    Plaintext

    hcloud
    requests
    pyTelegramBotAPI
    
  5. 保存退出 (Ctrl+O -> Enter -> Ctrl+X)。


第二步:重写 main.py

这次的代码逻辑升级了:引入了 threading (多线程) 和 telebot

  1. 编辑文件:nano main.py

  2. 清空原内容,粘贴下面的新代码(记得修改配置区域):

Python

# -*- coding: utf-8 -*-
import time
import threading
import telebot
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.locations.domain import Location

# ================= ⚠️ 配置区域 (请修改这里) =================

# 1. Hetzner API Token
HETZNER_TOKEN = "你的_HETZNER_API_TOKEN"

# 2. Telegram 通知配置
TG_BOT_TOKEN = "你的_TG_BOT_TOKEN"
TG_CHAT_ID = "你的_TG_CHAT_ID" # 必须是数字字符串,例如 "12345678"

# 3. 服务器配置
SERVER_NAME = "My-Server"      # 必须与 Hetzner 后台名称一致
SNAPSHOT_ID = 12345678         # 你的新快照 ID
LOCATION = "nbg1"
SERVER_TYPE = "cpx32"          #机型类型,如cx22,cpx32...等机型

# 4. 流量阈值
TRAFFIC_LIMIT_TB = 18.0        # 单位: TB

# 5. 自动检测频率 (秒)
CHECK_INTERVAL = 300           # 300秒 = 5分钟

# ==========================================================

LIMIT_BYTES = TRAFFIC_LIMIT_TB * 1024 * 1024 * 1024 * 1024
client = Client(token=HETZNER_TOKEN)
bot = telebot.TeleBot(TG_BOT_TOKEN)

def get_server_usage():
    """获取服务器流量信息的通用函数"""
    try:
        server = client.servers.get_by_name(SERVER_NAME)
        if server is None:
            return None, "服务器不存在 (可能正在重建)"
        
        current_usage = server.outgoing_traffic
        usage_gb = current_usage / (1024 ** 3)
        usage_tb = current_usage / (1024 ** 4)
        usage_percent = (current_usage / LIMIT_BYTES) * 100
        
        return current_usage, {
            "gb": usage_gb,
            "tb": usage_tb,
            "percent": usage_percent,
            "ip": server.public_net.ipv4.ip
        }
    except Exception as e:
        return None, str(e)

# --- 🤖 Telegram 命令处理区域 ---

@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    # 安全检查:只允许你的 Chat ID 操作
    if str(message.chat.id) != TG_CHAT_ID: return
    bot.reply_to(message, "👋 我是 Hetzner 流量监控机器人。\n发送 /liuliang 或 /status 查看当前流量。")

@bot.message_handler(commands=['liuliang', 'status', 'll'])
def check_traffic_command(message):
    # 安全检查
    if str(message.chat.id) != TG_CHAT_ID: return
    
    bot.reply_to(message, "🔍 正在查询 Hetzner API,请稍候...")
    
    usage_bytes, data = get_server_usage()
    
    if usage_bytes is None:
        bot.send_message(TG_CHAT_ID, f"❌ 查询失败: {data}")
    else:
        # 组装漂亮的回复
        reply = (
            f"📊 **当前服务器流量状态**\n"
            f"━━━━━━━━━━━━━━━━\n"
            f"🖥️ 名称: `{SERVER_NAME}`\n"
            f"🌐 IP: `{data['ip']}`\n"
            f"━━━━━━━━━━━━━━━━\n"
            f"📉 已用流量: **{data['tb']:.4f} TB**\n"
            f"⚠️ 报警阈值: {TRAFFIC_LIMIT_TB} TB\n"
            f"📊 使用百分比: {data['percent']:.2f}%\n"
            f"━━━━━━━━━━━━━━━━\n"
            f"✅ 状态: 正常运行中"
        )
        bot.send_message(TG_CHAT_ID, reply, parse_mode="Markdown")

# --- 🔄 自动监控线程 ---

def monitor_loop():
    print("🚀 自动监控线程已启动...")
    while True:
        try:
            usage_bytes, data = get_server_usage()
            
            if usage_bytes is not None:
                print(f"[{time.strftime('%H:%M:%S')}] 自动检查: {data['gb']:.2f} GB")
                
                # 判断是否超标
                if usage_bytes > LIMIT_BYTES:
                    msg = (
                        f"🚨 **流量超标警告!**\n"
                        f"当前: `{data['tb']:.4f} TB` > 阈值: `{TRAFFIC_LIMIT_TB} TB`\n"
                        f"⚠️ **正在执行自动销毁程序...**"
                    )
                    bot.send_message(TG_CHAT_ID, msg, parse_mode="Markdown")
                    
                    # 执行重建逻辑
                    print("执行删除操作...")
                    server = client.servers.get_by_name(SERVER_NAME)
                    server.delete()
                    time.sleep(15)
                    
                    print(f"执行重建操作 (Snapshot: {SNAPSHOT_ID})...")
                    client.servers.create(
                        name=SERVER_NAME,
                        server_type=ServerType(name=SERVER_TYPE),
                        image=Image(id=SNAPSHOT_ID),
                        location=Location(name=LOCATION)
                    )
                    
                    bot.send_message(TG_CHAT_ID, f"✅ **重建成功!**\n服务器已恢复。", parse_mode="Markdown")
                    time.sleep(60) # 冷却时间
            
        except Exception as e:
            print(f"监控循环出错: {e}")
            
        time.sleep(CHECK_INTERVAL)

# --- 🚀 主程序入口 ---

if __name__ == "__main__":
    # 1. 启动自动监控线程 (在后台跑)
    t = threading.Thread(target=monitor_loop)
    t.start()
    
    # 2. 启动 Bot 监听 (主线程阻塞在这里等待命令)
    print("🤖 Telegram Bot 监听中...")
    bot.send_message(TG_CHAT_ID, "✅ 监控升级完毕!\n您可以发送 `/liuliang` 手动查看状态了。")
    
    # 无限轮询 Telegram 服务器
    bot.infinity_polling()
  1. 保存退出。


第三步:重建并重启 (必须做)

因为我们修改了 requirements.txt(加了新库),必须重新构建 Docker 镜像,否则代码会报错找不到 telebot

执行以下命令:

Bash

# 1. 停止旧容器
docker rm -f hetzner-monitor

# 2. 重新构建镜像 (这一步会下载新加的 pyTelegramBotAPI 库)
docker build -t hetzner-bot .

# 3. 启动新容器
docker run -d --name hetzner-monitor --restart always hetzner-bot

🟢 最终效果

现在,拿起你的手机,打开你的 Telegram 机器人,发送命令:

  • 发送:/liuliang ( 或者 /status, /ll)

  • 机器人会回复

    📊 当前服务器流量状态

    ━━━━━━━━━━━━━━━━

    🖥️ 名称: My-Server

    🌐 IP: 123.123.123.123

    ━━━━━━━━━━━━━━━━

    📉 已用流量: 0.1234 TB

    ⚠️ 报警阈值: 18.0 TB

    📊 使用百分比: 0.68%

    ━━━━━━━━━━━━━━━━

    ✅ 状态: 正常运行中

同时,后台的自动删机监控依然在默默工作,两者互不干扰。现在这套系统既能自动防御,又能手动查岗,完美!

消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息