Hetzner PT 刷流防爆终极指南 (舰队版):传家宝IP + 快照秒建 + TG全景中控

对于热衷于 PT 刷流的玩家来说,Hetzner 的高带宽大盘鸡(如 cpx62)无疑是跑流量的极品炮机。但横在大家面前的有一道死命令:单月 20TB 出站流量限额。一旦超量,随之而来的就是让人肉痛的超额账单。

为了实现“极限榨干每一滴免费流量,且绝不被反薅”的终极目标,本文将带你从零部署一套全自动的监控与防爆多机中控系统。本教程已支持 ** 多台机器(舰队模式)** 并发管理!

💡 本套终极方案的核心特性:

  1. 传家宝 IP:每次触发删机重建,自动挂载原生的 IPv4 和 IPv6,彻底告别 IP 变动导致的 PT 站连环警告与封号。

  2. 私有快照直推:摒弃繁琐的开机安装脚本!直接使用你调教好的完美快照(带 qBittorrent、探针、内核优化)进行克隆,开机 10 秒满血复活。

  3. 极限防卡死保底:流量达 18T 自动销毁。即便 qB 在极高负载下 API 卡死无响应,系统也会触发“终极保底”强行物理销毁,死死守住钱包。

  4. TG 双向交互中控:不仅每逢 10% 自动在 Telegram 播报进度,更提供 24 小时常驻的接线员机器人。随时发送 /status 全景查水表,发送 /rebuild 精准定点核爆。


⚙️ 架构思路与准备工作

⚠️ 核心警告:千万不要把本脚本部署在你的 Hetzner 刷流机上! 否则触发阈值时,脚本会把机器连同自己一起“同归于尽”。

我们需要两端分离:

  1. 中控管理机:一台长期稳定在线的服务器(推荐甲骨文 ARM 免费实例等),用于跑定时任务和交互机器人。

  2. Hetzner 刷流舰队:被监控的目标机器(如 hz1, hz2... hz5),生命周期完全由中控机接管。

你需要提前准备:

  1. Hetzner API Token:登录 Hetzner 后台 -> Project -> Security -> API Tokens -> Generate API Token。务必选择 Read & Write 权限

  2. 专属完美快照 (Snapshot):在 Hetzner 后台为你配置好 qB 和探针的机器打一个快照,并记下该快照的 ID(如 360055174)。注意:大硬盘打出的快照,重建时机型不能缩水。

  3. Telegram Bot Token 及你本人的纯数字 Chat ID


步骤一:中控机安装 hcloud CLI 并授权

hcloud 是 Hetzner 的官方命令行工具,也是我们这套系统的“手”。必须安装在管理机上。

登录中控机 SSH 终端,执行以下一键适配 ARM/AMD 架构的安装脚本:

Bash

# 1. 判断架构并下载
ARCH=$(uname -m)
if [ "$ARCH" = "aarch64" ]; then
    wget -O hcloud.tar.gz https://github.com/hetznercloud/cli/releases/latest/download/hcloud-linux-arm64.tar.gz
else
    wget -O hcloud.tar.gz https://github.com/hetznercloud/cli/releases/latest/download/hcloud-linux-amd64.tar.gz
fi

# 2. 解压并配置环境变量
tar -xvf hcloud.tar.gz hcloud
mv hcloud /usr/local/bin/hcloud
chmod +x /usr/local/bin/hcloud
rm hcloud.tar.gz

# 3. 验证安装
hcloud version

绑定 API Token:

Bash

hcloud context create pt-bot

屏幕提示 Token: 时,将你的 Read & Write 权限 Token 粘贴并回车(粘贴时屏幕无显示)。


步骤二:锁定 Hetzner 舰队的“传家宝” IP

为了防止删机时 IP 被回收,我们需要把当前所有刷流机的 IP 转化为“保留 IP”,并按规范重命名。假设你的 5 台机器分别叫 hz1hz5

1. 找出当前的 IP ID:

Bash

hcloud primary-ip list

2. 依次重命名并彻底关闭“自动删除”属性(以 hz1 为例,其他机器同理):

Bash

# 重命名为脚本强制识别的格式 (机器名-V4 和 机器名-V6)
hcloud primary-ip update --name hz1-V4 <hz1的IPv4_ID>
hcloud primary-ip update --name hz1-V6 <hz1的IPv6_ID>

# 关闭自动删除 (赋予 IP 不死之身)
hcloud primary-ip update --auto-delete=false hz1-V4
hcloud primary-ip update --auto-delete=false hz1-V6

全部设置完毕后,再次执行 list 命令,确保所有机器的 IP 名字格式正确,且 AUTO DELETE 列全部为 no


步骤三:中控机 Python 环境与空壳配置

在中控机上安装 Python 环境,使用虚拟环境隔离依赖:

Bash

apt update && apt install -y python3 python3-pip python3-venv wget curl
mkdir -p /opt/hcloud-qbt-bot
cd /opt/hcloud-qbt-bot

# 创建并激活虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装依赖包
pip install --upgrade pip
pip install qbittorrent-api requests python-telegram-bot

创建空壳装机脚本:

由于使用快照直推,无需执行初始化命令,但 API 强制要求提供 user-data,所以创建一个空壳文件:

Bash

cat << 'EOF' > /opt/hcloud-qbt-bot/user-data.sh
#!/bin/bash
# 机器已通过私人快照恢复,环境已就绪,无需重新安装任何软件!
exit 0
EOF

步骤四:部署静默巡检核心 (hzc.py)

这是整个防爆系统的“刺客大脑”,负责每 5 分钟在后台静默算账。

创建 /opt/hcloud-qbt-bot/hzc.py 文件并填入代码(请按需修改顶部的字典配置):

Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess
import json
import time
import re
from datetime import datetime, timezone
from typing import Tuple, Optional, List, Dict
import qbittorrentapi
import requests

# ================= 基础配置区 =================
TELEGRAM_TOKEN = '你的TG_Token'
TELEGRAM_CHAT_ID = '你的TG_ChatID'
HCLOUD_BIN = '/usr/local/bin/hcloud'

# 🚀 舰队配置字典:在这里增删你的机器阵列
MANAGED_SERVERS = {
    'hz1': {'type': 'cpx62', 'image': '360055174'},
    'hz2': {'type': 'cpx62', 'image': '360055174'},
    'hz3': {'type': 'cpx62', 'image': '360055174'},
    'hz4': {'type': 'cpx62', 'image': '360055174'},
    'hz5': {'type': 'cpx62', 'image': '360055174'}
}

DEFAULT_LOCATION = 'nbg1'
USER_DATA_FILE = '/opt/hcloud-qbt-bot/user-data.sh'
# ==============================================

# ================= 核心流量与控制 =================
TOTAL_TRAFFIC_LIMIT_TIB = 20.0     
ALERT_TIB_THRESHOLD = 18.0         # 18 TiB 强制删机警戒线
CREATE_TIMEOUT_S = 600             # 大盘快照克隆耗时久,给足 10 分钟
DELETE_TIMEOUT_S = 120
LIST_TIMEOUT_S = 30
PROTECTED_CATEGORY = 'KEEP'
FRDS_GUARD_UPLOAD_BPS = 20 * 1024 * 1024  

QBT_USERNAME = 'admin'
QBT_PASSWORD = '你的qB密码'
QBT_WEB_PORT = 9090                
# ==============================================

TRAFFIC_TRACK_FILE = '/opt/hcloud-qbt-bot/traffic_track.json'
error_logs: List[str] = []
threshold_logs: List[str] = []

def _post_telegram(message: str):
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID: return
    url = f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage'
    try:
        requests.post(url, data={'chat_id': TELEGRAM_CHAT_ID, 'text': message, 'parse_mode': 'Markdown'}, timeout=(5, 10)).raise_for_status()
    except Exception as e:
        error_logs.append(f'TG 发送失败: {e}')

def send_error_digest_if_any():
    if error_logs: _post_telegram('❌ *脚本错误*\n' + '\n'.join([f'- {l}' for l in error_logs]))

def send_threshold_digest_if_any():
    if threshold_logs: _post_telegram('⚠️ *舰队删机告警*\n\n' + '\n\n'.join(threshold_logs))

def _load_traffic_track() -> Dict:
    try:
        with open(TRAFFIC_TRACK_FILE, 'r') as f: return json.load(f)
    except Exception: return {}

def _save_traffic_track(data: Dict):
    try:
        with open(TRAFFIC_TRACK_FILE, 'w') as f: json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception: pass

def check_and_notify_milestone(server_name: str, current_tib: float):
    if current_tib <= 0: return
    pct = (current_tib / TOTAL_TRAFFIC_LIMIT_TIB) * 100
    milestones = [10, 20, 30, 40, 50, 60, 70, 80, 90, 95]
    reached = max([m for m in milestones if pct >= m] + [0])
            
    if reached > 0:
        data = _load_traffic_track()
        if reached > data.get(server_name, 0):
            _post_telegram(f"📊 *流量播报* (`{server_name}`)\n- 当前消耗: `{current_tib:.2f} TiB`\n- 进度: **{reached}%** (限额 {TOTAL_TRAFFIC_LIMIT_TIB} TiB)")
            data[server_name] = reached
            _save_traffic_track(data)

def build_create_args(server_name: str, conf: dict) -> List[str]:
    return ['server', 'create', '--name', server_name, '--type', conf['type'], '--location', DEFAULT_LOCATION, '--image', str(conf['image']), '--user-data-from-file', USER_DATA_FILE, '--primary-ipv4', f'{server_name}-V4', '--primary-ipv6', f'{server_name}-V6']

def get_server_list() -> List[Dict]:
    try:
        output = subprocess.check_output([HCLOUD_BIN, 'server', 'list', '--output', 'json'], stderr=subprocess.STDOUT, timeout=LIST_TIMEOUT_S).decode()
        servers = json.loads(output)
        for s in servers:
            if s.get('name') not in MANAGED_SERVERS: continue
            desc = subprocess.check_output([HCLOUD_BIN, 'server', 'describe', str(s.get('id'))], stderr=subprocess.STDOUT, timeout=LIST_TIMEOUT_S).decode()
            m = re.search('Outgoing:\\s+([\\d.]+)\\s+([A-Za-z]+)', desc)
            s['outgoing_traffic'] = float(m.group(1)) if m and m.group(2).upper().startswith('TI') else 0.0
        return servers
    except Exception as e:
        error_logs.append(f'获取服务器列表失败: {e}')
        return []

def get_qbittorrent_upload_tib(server_ip: str) -> Optional[float]:
    client = qbittorrentapi.Client(host=f'http://{server_ip}:{QBT_WEB_PORT}', username=QBT_USERNAME, password=QBT_PASSWORD, REQUESTS_ARGS={'timeout': (5, 10)})
    try:
        client.auth_log_in()
        return client.transfer_info().up_info_data / 1024 ** 4
    except Exception:
        return None

def should_skip_delete_due_to_frds(server_ip: Optional[str]) -> Tuple[bool, str]:
    if not server_ip: return (False, '无法获取 IP')
    try:
        client = qbittorrentapi.Client(host=f'http://{server_ip}:{QBT_WEB_PORT}', username=QBT_USERNAME, password=QBT_PASSWORD, REQUESTS_ARGS={'timeout': (5, 10)})
        client.auth_log_in()
        for t in client.torrents_info(category=PROTECTED_CATEGORY):
            if (getattr(t, 'progress', 0.0) or 0.0) < 1.0 and (getattr(t, 'upspeed', 0) or 0) >= FRDS_GUARD_UPLOAD_BPS:
                return (True, f'命中保种策略')
        return (False, '')
    except Exception:
        # 🚨 终极防卡死保底:API 无响应时无视护盾强制销毁保钱包
        return (False, '检查保种规则异常,忽略保种')

def main():
    servers = get_server_list()
    for s in servers:
        name = s.get('name')
        if name not in MANAGED_SERVERS: continue

        ip_v4 = (s.get('public_net') or {}).get('ipv4', {}).get('ip')
        out_api = float(s.get('outgoing_traffic', 0.0))
        out_qb = get_qbittorrent_upload_tib(ip_v4) if ip_v4 else None
        
        current_max_traffic = max(out_api, out_qb or 0.0)
        check_and_notify_milestone(name, current_max_traffic)

        if (out_qb and out_qb > ALERT_TIB_THRESHOLD) or out_api > ALERT_TIB_THRESHOLD:
            skip, why = should_skip_delete_due_to_frds(ip_v4)
            if skip: continue

            data = _load_traffic_track()
            if name in data:
                del data[name]
                _save_traffic_track(data)
            
            try:
                subprocess.run([HCLOUD_BIN, 'server', 'delete', name], check=True, timeout=DELETE_TIMEOUT_S)
                time.sleep(20)
                subprocess.run([HCLOUD_BIN, *build_create_args(name, MANAGED_SERVERS[name])], check=True, timeout=CREATE_TIMEOUT_S)
                threshold_logs.append(f"🤖 节点 `{name}`: 已触发超额重建")
            except Exception as e:
                error_logs.append(f'节点 {name} 重建异常: {e}')

    send_error_digest_if_any()
    send_threshold_digest_if_any()

if __name__ == '__main__':
    main()

配置定时任务:

Bash

crontab -e
# 添加以下内容,让它每 5 分钟醒来查一次舰队水表
*/5 * * * * /opt/hcloud-qbt-bot/venv/bin/python /opt/hcloud-qbt-bot/hzc.py >> /var/log/hcloud_qbt.log 2>&1

步骤五:部署 TG 舰队中控机器人 (tg_bot.py)

为了能随时随地查流量和精准执行快照重建,我们需要部署一个 24 小时常驻的接线员。

创建 /opt/hcloud-qbt-bot/tg_bot.py同样修改顶部的配置字典及认证信息:

Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import asyncio
import subprocess
import json
import re
import qbittorrentapi
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, CommandHandler, CallbackQueryHandler, ContextTypes

# ================= 基础配置区 =================
TELEGRAM_TOKEN = '你的TG_Token'
TELEGRAM_CHAT_ID = 你的纯数字ChatID 
HCLOUD_BIN = '/usr/local/bin/hcloud'

# 🚀 舰队配置字典 (必须与 hzc.py 保持完全一致)
MANAGED_SERVERS = {
    'hz1': {'type': 'cpx62', 'image': '360055174'},
    'hz2': {'type': 'cpx62', 'image': '360055174'},
    'hz3': {'type': 'cpx62', 'image': '360055174'},
    'hz4': {'type': 'cpx62', 'image': '360055174'},
    'hz5': {'type': 'cpx62', 'image': '360055174'}
}

LOCATION = 'nbg1'
USER_DATA_FILE = '/opt/hcloud-qbt-bot/user-data.sh'
QBT_USERNAME = 'admin'
QBT_PASSWORD = '你的qB密码'
QBT_WEB_PORT = 9090                
TOTAL_LIMIT = 20.0                 
# ==============================================

def restricted(func):
    async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
        user_id = update.effective_user.id
        if str(user_id) != str(TELEGRAM_CHAT_ID):
            await update.message.reply_text("⛔ 警告:未授权的用户!")
            return
        return await func(update, context, *args, **kwargs)
    return wrapper

def get_all_servers_status():
    status_dict = {}
    try:
        output = subprocess.check_output([HCLOUD_BIN, 'server', 'list', '--output', 'json'], stderr=subprocess.STDOUT, timeout=10).decode()
        servers = json.loads(output)
        for s in servers:
            name = s.get('name')
            if name in MANAGED_SERVERS:
                ip_v4 = (s.get('public_net') or {}).get('ipv4', {}).get('ip')
                desc = subprocess.check_output([HCLOUD_BIN, 'server', 'describe', str(s.get('id'))], stderr=subprocess.STDOUT, timeout=10).decode()
                m = re.search('Outgoing:\\s+([\\d.]+)\\s+([A-Za-z]+)', desc)
                out_api = float(m.group(1)) if m and m.group(2).upper().startswith('TI') else 0.0
                status_dict[name] = {'ip': ip_v4, 'api': out_api}
    except Exception:
        pass
    return status_dict

@restricted
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    msg = await update.message.reply_text("⏳ 正在请求舰队实时状态...")
    statuses = get_all_servers_status()
    
    if not statuses:
        await msg.edit_text("❌ 舰队全军覆没,未获取到任何机器状态。")
        return

    reply_text = "🚢 *舰队实时战报*\n" + "="*20 + "\n"
    for name, s_data in statuses.items():
        ip_v4 = s_data['ip']
        out_api = s_data['api']
        out_qb = 0.0
        if ip_v4:
            try:
                client = qbittorrentapi.Client(host=f'http://{ip_v4}:{QBT_WEB_PORT}', username=QBT_USERNAME, password=QBT_PASSWORD, REQUESTS_ARGS={'timeout': (5, 10)})
                client.auth_log_in()
                out_qb = client.transfer_info().up_info_data / 1024 ** 4
            except Exception:
                pass 
        current_max = max(out_api, out_qb)
        pct = (current_max / TOTAL_LIMIT) * 100
        reply_text += f"🔹 **节点 `{name}`**\n- qB: `{out_qb:.2f}T` | API: `{out_api:.2f}T`\n- 进度: **{pct:.1f}%** | IP: `{ip_v4}`\n\n"

    await msg.edit_text(reply_text, parse_mode='Markdown')

@restricted
async def rebuild_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    keyboard = []
    for name in MANAGED_SERVERS.keys():
        keyboard.append([InlineKeyboardButton(f"🚨 销毁并重建 {name}", callback_data=f'rebuild_{name}')])
    keyboard.append([InlineKeyboardButton("取消", callback_data='cancel')])
    
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text("⚠️ *舰队核爆控制台*\n请选择要执行快照重建的节点:", reply_markup=reply_markup, parse_mode='Markdown')

async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()

    if query.data == 'cancel':
        await query.edit_message_text(text="✅ 已取消操作。")
        return

    if query.data.startswith('rebuild_'):
        target_name = query.data.split('_')[1]
        await query.edit_message_text(text=f"🔄 *指令下发!*\n正在对节点 `{target_name}` 执行物理摧毁...", parse_mode='Markdown')
        asyncio.create_task(perform_rebuild_task(query.message.chat_id, context, target_name))

async def perform_rebuild_task(chat_id, context: ContextTypes.DEFAULT_TYPE, server_name: str):
    try:
        subprocess.run([HCLOUD_BIN, 'server', 'delete', server_name], check=False)
        await asyncio.sleep(20) 
        
        await context.bot.send_message(chat_id=chat_id, text=f"⏳ 节点 `{server_name}` 旧机已毁,正在克隆大盘快照 (预计 6-8 分钟)...")
        
        conf = MANAGED_SERVERS[server_name]
        create_cmd = [
            HCLOUD_BIN, 'server', 'create', '--name', server_name, '--type', conf['type'], 
            '--location', LOCATION, '--image', conf['image'], '--user-data-from-file', USER_DATA_FILE, 
            '--primary-ipv4', f'{server_name}-V4', '--primary-ipv6', f'{server_name}-V6'
        ]
        
        process = await asyncio.create_subprocess_exec(*create_cmd)
        await process.communicate()
        
        if process.returncode == 0:
            await context.bot.send_message(chat_id=chat_id, text=f"🎉 *节点 `{server_name}` 重建竣工!*\n探针和 qB 已上线!", parse_mode='Markdown')
        else:
            await context.bot.send_message(chat_id=chat_id, text=f"⛔ 节点 `{server_name}` 重建异常。")
            
    except Exception as e:
        await context.bot.send_message(chat_id=chat_id, text=f"❌ 错误: {e}")

if __name__ == '__main__':
    app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
    app.add_handler(CommandHandler("status", status_command))
    app.add_handler(CommandHandler("rebuild", rebuild_command))
    app.add_handler(CallbackQueryHandler(button_handler))
    app.run_polling()

注入灵魂:配置 Systemd 守护进程 (开机自启)

Bash

cat << 'EOF' > /etc/systemd/system/hcloud-tg-bot.service
[Unit]
Description=Hetzner TG Bot Daemon
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/hcloud-qbt-bot
ExecStart=/opt/hcloud-qbt-bot/venv/bin/python /opt/hcloud-qbt-bot/tg_bot.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# 启动并设置开机自启
systemctl daemon-reload
systemctl enable hcloud-tg-bot.service
systemctl start hcloud-tg-bot.service

🎉 享受终极形态的赛博永动机

现在,打开你的 Telegram 机器人:

  • 发送 /status:瞬间获取整个舰队的全景战报。

  • 发送 /rebuild:弹出专属操作台,定点指挥任意节点涅槃重生。

至此,一套包含了底层大盘快照克隆、传家宝固定 IP 群、定时静默巡检、防卡死强制绝杀、以及全天候全景中控的 Hetzner 终极防爆矩阵,完美落成!

尽情享受全自动挂机刷流的乐趣吧!


消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息