Telegram 双向联系机器人完整部署教程
本教程记录于 2026年1月12日,基于实际部署经验编写。支持文本、图片、文件传输,智能自动回复,SQLite 数据库存储,完整消息统计等功能。
📋 目录
功能特性
前置准备
部署步骤
1. 创建 Telegram Bot
2. 服务器环境配置
3. 部署机器人代码
4. 配置系统服务
5. 测试验证
使用指南
维护管理
常见问题
功能特性
核心功能
✅ 双向通信 - 用户与管理员实时对话
✅ 多媒体支持 - 文本、图片、文件全支持
✅ 智能回复 - 关键词触发自动回复
✅ 数据持久化 - SQLite 数据库存储
✅ 消息统计 - 完整的使用数据分析
✅ 聊天记录 - 完整的对话历史记录
✅ 多管理员 - 支持团队协作
✅ 后台运行 - systemd 服务管理
技术栈
Python 3.13+
python-telegram-bot 22.5
SQLite 3.46+
Systemd
前置准备
所需资源
OVH 服务器(或任何 Debian/Ubuntu 服务器)
Root 访问权限
Telegram 账号
预计时间
首次部署:20-25 分钟
后续维护:5 分钟
部署步骤
1. 创建 Telegram Bot
1.1 与 BotFather 对话
在 Telegram 搜索
@BotFather发送
/newbot按提示设置机器人名称和用户名
示例对话:
你: /newbot
BotFather: Alright, a new bot. How are we going to call it?
你: 我的客服机器人
BotFather: Good. Now let's choose a username for your bot. It must end in `bot`.
你: my_service_bot
BotFather: Done! Use this token to access the HTTP API:
1234567890:ABCdefGHIjklMNOpqrsTUVwxyz1.2 保存 Bot Token
重要: 复制并保存你的 Bot Token,格式如:
1234567890:ABCdefGHIjklMNOpqrsTUVwxyz1.3 获取你的 User ID
在 Telegram 搜索
@userinfobot发送
/start保存返回的 User ID(纯数字)
2. 服务器环境配置
2.1 连接服务器
ssh root@your-server-ip2.2 更新系统并安装依赖
# 更新软件包列表
apt update
# 升级系统
apt upgrade -y
# 安装必要软件
apt install python3 python3-venv python3-pip sqlite3 -y2.3 验证安装
# 检查 Python 版本
python3 --version
# 输出:Python 3.13.5 (或其他版本)
# 检查 pip
pip3 --version
# 检查 SQLite
sqlite3 --version3. 部署机器人代码
3.1 创建项目目录
# 创建目录
mkdir -p /opt/telegram-bot
cd /opt/telegram-bot
# 验证路径
pwd
# 输出:/opt/telegram-bot3.2 创建虚拟环境
# 创建虚拟环境
python3 -m venv venv
# 激活虚拟环境
source venv/bin/activate
# 提示符会变成:(venv) root@server:/opt/telegram-bot#3.3 安装 Python 依赖
# 安装依赖包
pip install python-telegram-bot pillow
# 验证安装
pip list | grep telegram
# 输出:python-telegram-bot 22.53.4 创建配置文件
nano config.json粘贴以下内容(记得替换为你的实际值):
{
"bot_token": "你的Bot Token",
"admin_ids": [你的User ID],
"auto_reply": {
"enabled": true,
"rules": [
{
"keywords": ["价格", "多少钱", "费用"],
"reply": "感谢咨询!我们的价格根据具体需求而定,请稍候,管理员会尽快回复您详细信息。"
},
{
"keywords": ["营业时间", "工作时间"],
"reply": "我们的营业时间是:周一至周五 9:00-18:00"
},
{
"keywords": ["地址", "位置"],
"reply": "我们的地址是:XXX市XXX区XXX路XXX号"
},
{
"keywords": ["你好", "hi", "hello"],
"reply": "你好!👋 有什么可以帮助你的吗?"
}
]
},
"statistics": {
"enabled": true
}
}配置说明:
bot_token: 替换为你从 BotFather 获得的 Tokenadmin_ids: 替换为你的 User ID(数字,不要引号)auto_reply.enabled: 是否启用自动回复auto_reply.rules: 自动回复规则列表statistics.enabled: 是否启用统计功能
保存文件:Ctrl + X → Y → Enter
3.5 创建机器人主程序
nano bot.py粘贴以下完整代码:
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
import json
import os
from datetime import datetime
import sqlite3
from typing import Optional, List, Dict
# ==================== 配置日志 ====================
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# ==================== 文件路径 ====================
CONFIG_FILE = 'config.json'
DATABASE_FILE = 'bot_database.db'
# ==================== 配置加载 ====================
def load_config():
"""加载配置文件"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {
'admin_ids': [],
'bot_token': '',
'auto_reply': {'enabled': False, 'rules': []},
'statistics': {'enabled': True}
}
# ==================== 数据库管理类 ====================
class DatabaseManager:
"""数据库管理类"""
def __init__(self, db_file=DATABASE_FILE):
self.db_file = db_file
self.init_database()
def get_connection(self):
return sqlite3.connect(self.db_file)
def init_database(self):
"""初始化数据库表结构"""
conn = self.get_connection()
cursor = conn.cursor()
# 用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 消息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
message_type TEXT,
message_text TEXT,
file_id TEXT,
file_name TEXT,
is_from_user BOOLEAN,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
''')
# 会话表
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
admin_id INTEGER,
active_user_id INTEGER,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (admin_id)
)
''')
# 统计表
cursor.execute('''
CREATE TABLE IF NOT EXISTS statistics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stat_date DATE,
total_messages INTEGER DEFAULT 0,
total_users INTEGER DEFAULT 0,
active_users INTEGER DEFAULT 0,
photos_sent INTEGER DEFAULT 0,
files_sent INTEGER DEFAULT 0,
auto_replies INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
def save_user(self, user_id: int, username: str, first_name: str, last_name: str = ''):
"""保存或更新用户信息"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (user_id, username, first_name, last_name, last_active)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(user_id) DO UPDATE SET
username = excluded.username,
first_name = excluded.first_name,
last_name = excluded.last_name,
last_active = CURRENT_TIMESTAMP
''', (user_id, username, first_name, last_name))
conn.commit()
conn.close()
def save_message(self, user_id: int, message_type: str, message_text: str = '',
file_id: str = '', file_name: str = '', is_from_user: bool = True):
"""保存消息记录"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO messages (user_id, message_type, message_text, file_id, file_name, is_from_user)
VALUES (?, ?, ?, ?, ?, ?)
''', (user_id, message_type, message_text, file_id, file_name, is_from_user))
conn.commit()
conn.close()
def set_active_chat(self, admin_id: int, user_id: int):
"""设置管理员活跃对话"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO sessions (admin_id, active_user_id, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(admin_id) DO UPDATE SET
active_user_id = excluded.active_user_id,
updated_at = CURRENT_TIMESTAMP
''', (admin_id, user_id))
conn.commit()
conn.close()
def get_active_chat(self, admin_id: int) -> Optional[int]:
"""获取管理员活跃对话"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT active_user_id FROM sessions WHERE admin_id = ?', (admin_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def clear_active_chat(self, admin_id: int):
"""清除管理员活跃对话"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('DELETE FROM sessions WHERE admin_id = ?', (admin_id,))
conn.commit()
conn.close()
def get_user_list(self, limit: int = 10) -> List[Dict]:
"""获取最近活跃用户列表"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT user_id, username, first_name, last_name, last_active
FROM users
ORDER BY last_active DESC
LIMIT ?
''', (limit,))
users = []
for row in cursor.fetchall():
users.append({
'user_id': row[0],
'username': row[1],
'first_name': row[2],
'last_name': row[3],
'last_active': row[4]
})
conn.close()
return users
def get_user_info(self, user_id: int) -> Optional[Dict]:
"""获取用户详细信息"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT user_id, username, first_name, last_name, created_at, last_active
FROM users
WHERE user_id = ?
''', (user_id,))
row = cursor.fetchone()
conn.close()
if row:
return {
'user_id': row[0],
'username': row[1],
'first_name': row[2],
'last_name': row[3],
'created_at': row[4],
'last_active': row[5]
}
return None
def update_statistics(self, stat_type: str):
"""更新统计数据"""
conn = self.get_connection()
cursor = conn.cursor()
today = datetime.now().date()
cursor.execute('SELECT id FROM statistics WHERE stat_date = ?', (today,))
if not cursor.fetchone():
cursor.execute('INSERT INTO statistics (stat_date) VALUES (?)', (today,))
if stat_type == 'message':
cursor.execute('UPDATE statistics SET total_messages = total_messages + 1 WHERE stat_date = ?', (today,))
elif stat_type == 'photo':
cursor.execute('UPDATE statistics SET photos_sent = photos_sent + 1 WHERE stat_date = ?', (today,))
elif stat_type == 'file':
cursor.execute('UPDATE statistics SET files_sent = files_sent + 1 WHERE stat_date = ?', (today,))
elif stat_type == 'auto_reply':
cursor.execute('UPDATE statistics SET auto_replies = auto_replies + 1 WHERE stat_date = ?', (today,))
conn.commit()
conn.close()
def get_statistics(self, days: int = 7) -> List[Dict]:
"""获取统计数据"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT stat_date, total_messages, total_users, active_users,
photos_sent, files_sent, auto_replies
FROM statistics
ORDER BY stat_date DESC
LIMIT ?
''', (days,))
stats = []
for row in cursor.fetchall():
stats.append({
'date': row[0],
'total_messages': row[1],
'total_users': row[2],
'active_users': row[3],
'photos_sent': row[4],
'files_sent': row[5],
'auto_replies': row[6]
})
conn.close()
return stats
def get_message_history(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户消息历史"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT message_type, message_text, file_name, is_from_user, created_at
FROM messages
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ?
''', (user_id, limit))
messages = []
for row in cursor.fetchall():
messages.append({
'type': row[0],
'text': row[1],
'file_name': row[2],
'from_user': row[3],
'time': row[4]
})
conn.close()
return messages
# ==================== 自动回复管理类 ====================
class AutoReplyManager:
"""自动回复管理类"""
def __init__(self, config):
self.enabled = config.get('auto_reply', {}).get('enabled', False)
self.rules = config.get('auto_reply', {}).get('rules', [])
def check_and_reply(self, message_text: str) -> Optional[str]:
"""检查消息并返回自动回复"""
if not self.enabled or not message_text:
return None
message_lower = message_text.lower()
for rule in self.rules:
keywords = rule.get('keywords', [])
for keyword in keywords:
if keyword.lower() in message_lower:
return rule.get('reply', '')
return None
# ==================== 初始化 ====================
config = load_config()
ADMIN_IDS = config.get('admin_ids', [])
db_manager = DatabaseManager()
auto_reply_manager = AutoReplyManager(config)
statistics_enabled = config.get('statistics', {}).get('enabled', True)
# ==================== 命令处理函数 ====================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /start 命令"""
user = update.effective_user
user_id = user.id
db_manager.save_user(user_id, user.username or '', user.first_name, user.last_name or '')
if user_id in ADMIN_IDS:
await update.message.reply_html(
f"👨💼 <b>管理员模式</b>\n\n"
f"📋 <b>基础命令:</b>\n"
f"• /list - 查看最近联系的用户\n"
f"• /chat <用户ID> - 开始与用户对话\n"
f"• /end - 结束当前对话\n"
f"• /status - 查看当前对话状态\n"
f"• /send <用户ID> <消息> - 发送单条消息\n\n"
f"📊 <b>高级命令:</b>\n"
f"• /stats - 查看消息统计\n"
f"• /history <用户ID> - 查看聊天记录\n"
f"• /setadmin <用户ID> - 添加管理员\n\n"
f"💡 支持文本、图片、文件\n"
f"🤖 自动回复:{'✅ 已启用' if auto_reply_manager.enabled else '❌ 未启用'}"
)
else:
await update.message.reply_html(
f"👋 你好 {user.mention_html()}!\n\n"
f"我是智能客服机器人。\n\n"
f"💬 支持发送:文本、图片、文件\n"
f"有任何问题请直接发送,管理员会尽快回复。\n\n"
f"📝 你的ID: <code>{user.id}</code>"
)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理文本消息"""
user = update.effective_user
user_id = user.id
message = update.message
db_manager.save_user(user_id, user.username or '', user.first_name, user.last_name or '')
if user_id in ADMIN_IDS:
active_user_id = db_manager.get_active_chat(user_id)
if active_user_id:
try:
await context.bot.send_message(chat_id=active_user_id, text=message.text)
db_manager.save_message(active_user_id, 'text', message.text, is_from_user=False)
if statistics_enabled:
db_manager.update_statistics('message')
await message.reply_text(
f"✅ 消息已发送给用户 {active_user_id}",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 结束对话", callback_data="end_chat")
]])
)
except Exception as e:
await message.reply_text(f"❌ 发送失败: {str(e)}")
else:
await message.reply_text(
"💡 <b>当前没有活跃对话</b>\n\n"
"使用 /list 查看用户列表\n"
"使用 /chat <用户ID> 开始对话",
parse_mode='HTML'
)
else:
db_manager.save_message(user_id, 'text', message.text, is_from_user=True)
if statistics_enabled:
db_manager.update_statistics('message')
auto_reply = auto_reply_manager.check_and_reply(message.text)
if auto_reply:
await message.reply_text(f"🤖 {auto_reply}")
if statistics_enabled:
db_manager.update_statistics('auto_reply')
if ADMIN_IDS:
keyboard = [
[InlineKeyboardButton("💬 回复此用户", callback_data=f"chat_{user_id}")],
[InlineKeyboardButton("📋 查看历史", callback_data=f"history_{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
for admin_id in ADMIN_IDS:
try:
await context.bot.send_message(
chat_id=admin_id,
text=f"📨 <b>新消息</b>\n\n"
f"👤 用户: {user.full_name}\n"
f"🆔 ID: <code>{user_id}</code>\n"
f"📝 Username: @{user.username or 'N/A'}\n\n"
f"💬 消息内容:\n{message.text}",
parse_mode='HTML',
reply_markup=reply_markup
)
except Exception as e:
logger.error(f"转发失败: {e}")
if not auto_reply:
await message.reply_text("✅ 你的消息已发送给管理员,请稍候回复...")
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理图片消息"""
user = update.effective_user
user_id = user.id
message = update.message
db_manager.save_user(user_id, user.username or '', user.first_name, user.last_name or '')
photo = message.photo[-1]
caption = message.caption or ''
if user_id in ADMIN_IDS:
active_user_id = db_manager.get_active_chat(user_id)
if active_user_id:
try:
await context.bot.send_photo(chat_id=active_user_id, photo=photo.file_id, caption=caption)
db_manager.save_message(active_user_id, 'photo', caption, photo.file_id, is_from_user=False)
if statistics_enabled:
db_manager.update_statistics('photo')
await message.reply_text(f"✅ 图片已发送给用户 {active_user_id}")
except Exception as e:
await message.reply_text(f"❌ 发送失败: {str(e)}")
else:
await message.reply_text("💡 请先使用 /chat 开始对话")
else:
db_manager.save_message(user_id, 'photo', caption, photo.file_id, is_from_user=True)
if statistics_enabled:
db_manager.update_statistics('photo')
if ADMIN_IDS:
keyboard = [[InlineKeyboardButton("💬 回复此用户", callback_data=f"chat_{user_id}")]]
for admin_id in ADMIN_IDS:
try:
await context.bot.send_photo(
chat_id=admin_id,
photo=photo.file_id,
caption=f"📨 <b>新图片</b>\n\n"
f"👤 用户: {user.full_name}\n"
f"🆔 ID: <code>{user_id}</code>\n\n"
f"{f'💬 说明: {caption}' if caption else ''}",
parse_mode='HTML',
reply_markup=InlineKeyboardMarkup(keyboard)
)
except Exception as e:
logger.error(f"转发图片失败: {e}")
await message.reply_text("✅ 你的图片已发送给管理员")
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理文件消息"""
user = update.effective_user
user_id = user.id
message = update.message
document = message.document
db_manager.save_user(user_id, user.username or '', user.first_name, user.last_name or '')
file_name = document.file_name
caption = message.caption or ''
if user_id in ADMIN_IDS:
active_user_id = db_manager.get_active_chat(user_id)
if active_user_id:
try:
await context.bot.send_document(
chat_id=active_user_id,
document=document.file_id,
caption=caption,
filename=file_name
)
db_manager.save_message(active_user_id, 'document', caption, document.file_id, file_name, is_from_user=False)
if statistics_enabled:
db_manager.update_statistics('file')
await message.reply_text(f"✅ 文件已发送给用户 {active_user_id}")
except Exception as e:
await message.reply_text(f"❌ 发送失败: {str(e)}")
else:
await message.reply_text("💡 请先使用 /chat 开始对话")
else:
db_manager.save_message(user_id, 'document', caption, document.file_id, file_name, is_from_user=True)
if statistics_enabled:
db_manager.update_statistics('file')
if ADMIN_IDS:
keyboard = [[InlineKeyboardButton("💬 回复此用户", callback_data=f"chat_{user_id}")]]
for admin_id in ADMIN_IDS:
try:
await context.bot.send_document(
chat_id=admin_id,
document=document.file_id,
caption=f"📨 <b>新文件</b>\n\n"
f"👤 用户: {user.full_name}\n"
f"🆔 ID: <code>{user_id}</code>\n"
f"📎 文件名: {file_name}\n\n"
f"{f'💬 说明: {caption}' if caption else ''}",
parse_mode='HTML',
reply_markup=InlineKeyboardMarkup(keyboard),
filename=file_name
)
except Exception as e:
logger.error(f"转发文件失败: {e}")
await message.reply_text("✅ 你的文件已发送给管理员")
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理按钮回调"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
data = query.data
if user_id not in ADMIN_IDS:
await query.edit_message_text("❌ 仅管理员可用")
return
if data.startswith("chat_"):
target_user_id = int(data.split("_")[1])
db_manager.set_active_chat(user_id, target_user_id)
user_info = db_manager.get_user_info(target_user_id)
user_name = user_info['first_name'] if user_info else f'用户 {target_user_id}'
await query.edit_message_text(
f"✅ <b>已进入对话模式</b>\n\n"
f"👤 对话用户: {user_name}\n"
f"🆔 用户ID: <code>{target_user_id}</code>\n\n"
f"💡 现在直接发送消息即可回复该用户\n"
f"支持发送文本、图片、文件\n\n"
f"使用 /end 结束对话",
parse_mode='HTML'
)
elif data.startswith("history_"):
target_user_id = int(data.split("_")[1])
messages = db_manager.get_message_history(target_user_id, 10)
if not messages:
await query.edit_message_text("📋 暂无聊天记录")
return
history_text = f"📋 <b>聊天记录(最近10条)</b>\n\n"
history_text += f"🆔 用户ID: <code>{target_user_id}</code>\n\n"
for msg in reversed(messages):
sender = "👤 用户" if msg['from_user'] else "👨💼 管理员"
time = msg['time'][:16]
if msg['type'] == 'text':
history_text += f"{sender} [{time}]:\n{msg['text']}\n\n"
elif msg['type'] == 'photo':
history_text += f"{sender} [{time}]:\n📷 [图片] {msg['text']}\n\n"
elif msg['type'] == 'document':
history_text += f"{sender} [{time}]:\n📎 [文件] {msg['file_name']}\n\n"
keyboard = [[InlineKeyboardButton("💬 开始对话", callback_data=f"chat_{target_user_id}")]]
await query.edit_message_text(history_text, parse_mode='HTML', reply_markup=InlineKeyboardMarkup(keyboard))
elif data == "end_chat":
db_manager.clear_active_chat(user_id)
await query.edit_message_text("✅ 已结束当前对话")
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /chat 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
if not context.args:
await update.message.reply_text("用法: /chat <用户ID>", parse_mode='HTML')
return
try:
target_user_id = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 用户ID必须是数字")
return
db_manager.set_active_chat(user_id, target_user_id)
user_info = db_manager.get_user_info(target_user_id)
user_name = user_info['first_name'] if user_info else f'用户 {target_user_id}'
await update.message.reply_text(
f"✅ <b>已进入对话模式</b>\n\n"
f"👤 对话用户: {user_name}\n"
f"🆔 用户ID: <code>{target_user_id}</code>\n\n"
f"💡 现在直接发送消息即可回复该用户",
parse_mode='HTML',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 结束对话", callback_data="end_chat"),
InlineKeyboardButton("📋 查看历史", callback_data=f"history_{target_user_id}")
]])
)
async def end_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /end 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
active_user_id = db_manager.get_active_chat(user_id)
if active_user_id:
db_manager.clear_active_chat(user_id)
await update.message.reply_text(f"✅ 已结束与用户 {active_user_id} 的对话")
else:
await update.message.reply_text("💡 当前没有活跃对话")
async def list_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /list 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
users = db_manager.get_user_list(10)
if not users:
await update.message.reply_text("📋 暂无用户记录")
return
keyboard = []
message_text = "📋 <b>最近联系的用户</b>\n\n"
for idx, user in enumerate(users, 1):
uid = user['user_id']
name = user['first_name']
username = user['username']
last_active = user['last_active'][:16]
message_text += f"{idx}. <b>{name}</b>\n"
message_text += f" 📝 @{username if username else 'N/A'}\n"
message_text += f" 🆔 <code>{uid}</code>\n"
message_text += f" 🕒 {last_active}\n\n"
keyboard.append([
InlineKeyboardButton(f"💬 {name}", callback_data=f"chat_{uid}"),
InlineKeyboardButton("📋 历史", callback_data=f"history_{uid}")
])
await update.message.reply_text(message_text, parse_mode='HTML', reply_markup=InlineKeyboardMarkup(keyboard))
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /status 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
active_user_id = db_manager.get_active_chat(user_id)
if active_user_id:
user_info = db_manager.get_user_info(active_user_id)
user_name = user_info['first_name'] if user_info else f'用户 {active_user_id}'
await update.message.reply_text(
f"📊 <b>当前状态</b>\n\n"
f"💬 <b>活跃对话:</b> {user_name}\n"
f"🆔 <b>用户ID:</b> <code>{active_user_id}</code>",
parse_mode='HTML',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 结束对话", callback_data="end_chat"),
InlineKeyboardButton("📋 查看历史", callback_data=f"history_{active_user_id}")
]])
)
else:
await update.message.reply_text("📊 <b>当前状态</b>\n\n💬 无活跃对话", parse_mode='HTML')
async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /stats 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
if not statistics_enabled:
await update.message.reply_text("❌ 统计功能未启用")
return
stats = db_manager.get_statistics(7)
if not stats:
await update.message.reply_text("📊 暂无统计数据")
return
message_text = "📊 <b>消息统计(最近7天)</b>\n\n"
total_msg = sum(s['total_messages'] for s in stats)
total_photos = sum(s['photos_sent'] for s in stats)
total_files = sum(s['files_sent'] for s in stats)
total_auto_replies = sum(s['auto_replies'] for s in stats)
message_text += f"📈 <b>总计:</b>\n"
message_text += f"• 消息总数: {total_msg}\n"
message_text += f"• 图片: {total_photos}\n"
message_text += f"• 文件: {total_files}\n"
message_text += f"• 自动回复: {total_auto_replies}\n\n"
message_text += f"📅 <b>每日详情:</b>\n"
for stat in stats:
message_text += f"\n{stat['date']}:\n"
message_text += f" 💬 消息: {stat['total_messages']}\n"
message_text += f" 📷 图片: {stat['photos_sent']}\n"
message_text += f" 📎 文件: {stat['files_sent']}\n"
if stat['auto_replies'] > 0:
message_text += f" 🤖 自动回复: {stat['auto_replies']}\n"
await update.message.reply_text(message_text, parse_mode='HTML')
async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /history 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
if not context.args:
await update.message.reply_text("用法: /history <用户ID>", parse_mode='HTML')
return
try:
target_user_id = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 用户ID必须是数字")
return
messages = db_manager.get_message_history(target_user_id, 20)
if not messages:
await update.message.reply_text("📋 暂无聊天记录")
return
user_info = db_manager.get_user_info(target_user_id)
user_name = user_info['first_name'] if user_info else f'用户 {target_user_id}'
history_text = f"📋 <b>聊天记录</b>\n\n"
history_text += f"👤 用户: {user_name}\n"
history_text += f"🆔 ID: <code>{target_user_id}</code>\n\n"
history_text += f"━━━━━━━━━━━━━━━━\n\n"
for msg in reversed(messages[-20:]):
sender = "👤 用户" if msg['from_user'] else "👨💼 管理员"
time = msg['time'][5:16]
if msg['type'] == 'text':
history_text += f"{sender} [{time}]:\n{msg['text']}\n\n"
elif msg['type'] == 'photo':
caption = msg['text'] if msg['text'] else ''
history_text += f"{sender} [{time}]:\n📷 [图片] {caption}\n\n"
elif msg['type'] == 'document':
history_text += f"{sender} [{time}]:\n📎 [文件] {msg['file_name']}\n\n"
await update.message.reply_text(history_text, parse_mode='HTML')
async def send_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /send 命令"""
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
if len(context.args) < 2:
await update.message.reply_text("用法: /send <用户ID> <消息内容>", parse_mode='HTML')
return
try:
target_id = int(context.args[0])
message_text = ' '.join(context.args[1:])
await context.bot.send_message(chat_id=target_id, text=message_text)
db_manager.save_message(target_id, 'text', message_text, is_from_user=False)
if statistics_enabled:
db_manager.update_statistics('message')
await update.message.reply_text(f"✅ 消息已发送给用户 {target_id}")
except ValueError:
await update.message.reply_text("❌ 用户ID必须是数字")
except Exception as e:
await update.message.reply_text(f"❌ 发送失败: {str(e)}")
async def set_admin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /setadmin 命令"""
user_id = update.effective_user.id
if ADMIN_IDS and user_id not in ADMIN_IDS:
await update.message.reply_text("❌ 此命令仅限管理员使用")
return
if not context.args:
await update.message.reply_text("用法: /setadmin <用户ID>", parse_mode='HTML')
return
try:
new_admin_id = int(context.args[0])
if new_admin_id in ADMIN_IDS:
await update.message.reply_text(f"💡 用户 {new_admin_id} 已经是管理员")
return
ADMIN_IDS.append(new_admin_id)
config['admin_ids'] = ADMIN_IDS
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
await update.message.reply_text(
f"✅ 成功添加管理员\n\n"
f"🆔 用户ID: <code>{new_admin_id}</code>",
parse_mode='HTML'
)
except ValueError:
await update.message.reply_text("❌ 用户ID必须是数字")
# ==================== 主函数 ====================
def main() -> None:
"""主函数 - 启动机器人"""
if not config.get('bot_token'):
print("❌ 错误: 请在 config.json 中设置 bot_token")
return
if not ADMIN_IDS:
print("⚠️ 警告: 未设置管理员")
application = (
Application.builder()
.token(config['bot_token'])
.concurrent_updates(True)
.build()
)
# 注册命令处理器
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("chat", chat_command))
application.add_handler(CommandHandler("end", end_command))
application.add_handler(CommandHandler("list", list_command))
application.add_handler(CommandHandler("status", status_command))
application.add_handler(CommandHandler("stats", stats_command))
application.add_handler(CommandHandler("history", history_command))
application.add_handler(CommandHandler("send", send_command))
application.add_handler(CommandHandler("setadmin", set_admin))
# 注册回调和消息处理器
application.add_handler(CallbackQueryHandler(button_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(MessageHandler(filters.PHOTO, handle_photo))
application.add_handler(MessageHandler(filters.Document.ALL, handle_document))
logger.info("🚀 Telegram 机器人启动中...")
logger.info(f"📝 已配置 {len(ADMIN_IDS)} 个管理员")
logger.info(f"🤖 自动回复: {'✅ 已启用' if auto_reply_manager.enabled else '❌ 未启用'}")
logger.info(f"📊 统计功能: {'✅ 已启用' if statistics_enabled else '❌ 未启用'}")
logger.info(f"💾 数据库: {DATABASE_FILE}")
application.run_polling(allowed_updates=Update.ALL_TYPES, drop_pending_updates=True)
if __name__ == '__main__':
main()保存文件:Ctrl + X → Y → Enter
3.6 测试运行
# 运行机器人
python bot.py看到以下输出表示成功:
2026-01-11 xx:xx:xx - __main__ - INFO - 数据库初始化完成
2026-01-11 xx:xx:xx - __main__ - INFO - 🚀 Telegram 机器人启动中...
2026-01-11 xx:xx:xx - __main__ - INFO - 📝 已配置 1 个管理员
2026-01-11 xx:xx:xx - __main__ - INFO - 🤖 自动回复: ✅ 已启用
2026-01-11 xx:xx:xx - __main__ - INFO - 📊 统计功能: ✅ 已启用测试: 在 Telegram 向机器人发送 /start,应该看到欢迎信息。
确认无误后按 Ctrl + C 停止。
4. 配置系统服务
4.1 创建 systemd 服务文件
nano /etc/systemd/system/telegram-bot.service4.2 粘贴服务配置
[Unit]
Description=Telegram Contact Bot Enhanced
After=network.target
Documentation=https://github.com/python-telegram-bot/python-telegram-bot
[Service]
Type=simple
User=root
WorkingDirectory=/opt/telegram-bot
Environment="PATH=/opt/telegram-bot/venv/bin"
ExecStart=/opt/telegram-bot/venv/bin/python /opt/telegram-bot/bot.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target保存:Ctrl + X → Y → Enter
4.3 启动服务
# 重新加载配置
systemctl daemon-reload
# 启动服务
systemctl start telegram-bot
# 查看状态
systemctl status telegram-bot
# 设置开机自启
systemctl enable telegram-bot4.4 查看日志
# 实时查看日志
journalctl -u telegram-bot -f
# 查看最近 50 条
journalctl -u telegram-bot -n 505. 测试验证
5.1 基础功能测试
管理员测试:
发送
/start- 查看管理员界面发送
/status- 查看当前状态发送
/list- 查看用户列表
用户测试:
用另一个账号向机器人发消息
管理员应该收到通知
点击 "💬 回复此用户"
发送消息测试
5.2 自动回复测试
用户发送:
价格应该收到:
🤖 感谢咨询!我们的价格根据具体需求而定,请稍候,管理员会尽快回复您详细信息。5.3 图片和文件测试
用户发送图片 → 管理员收到
管理员回复图片 → 用户收到
用户发送文件 → 管理员收到
管理员回复文件 → 用户收到
5.4 统计功能测试
# 管理员发送
/stats应该看到消息统计数据。
使用指南
👤 普通用户
发送消息:
直接发送文本、图片或文件
机器人会转发给管理员
某些关键词会触发自动回复
自动回复关键词:
价格、多少钱、费用
营业时间、工作时间
地址、位置
你好、hi、hello
👨💼 管理员
基础命令
高级命令
快速回复流程
收到用户消息通知
点击「💬 回复此用户」
直接发送消息(支持文本 / 图片 / 文件)
点击「❌ 结束对话」或发送
/end
维护管理
日志查看
# 实时日志
journalctl -u telegram-bot -f
# 最近 100 条
journalctl -u telegram-bot -n 100
# 今天的日志
journalctl -u telegram-bot --since today
# 错误日志
journalctl -u telegram-bot -p err服务管理
# 启动
systemctl start telegram-bot
# 停止
systemctl stop telegram-bot
# 重启
systemctl restart telegram-bot
# 状态
systemctl status telegram-bot数据备份
# 手动备份
mkdir -p /root/backups
tar -czf /root/backups/telegram-bot-$(date +%Y%m%d).tar.gz \
-C /opt telegram-bot/config.json \
telegram-bot/bot_database.db定时备份脚本
# 创建备份脚本
nano /root/backup-bot.sh#!/bin/bash
BACKUP_DIR="/root/backups"
mkdir -p $BACKUP_DIR
tar -czf $BACKUP_DIR/telegram-bot-$(date +%Y%m%d-%H%M%S).tar.gz \
-C /opt telegram-bot/config.json telegram-bot/bot_database.db
find $BACKUP_DIR -name "telegram-bot-*.tar.gz" -mtime +30 -delete
echo "[$(date)] 备份完成"# 设置权限
chmod +x /root/backup-bot.sh
# 添加定时任务(每天凌晨3点)
crontab -e
# 添加: 0 3 * * * /root/backup-bot.sh >> /var/log/bot-backup.log 2>&1常见问题
❓ 机器人无响应
# 检查服务
systemctl status telegram-bot
# 查看日志
journalctl -u telegram-bot -n 50❓ 自动回复不生效
检查 config.json 中 auto_reply.enabled 是否为 true
❓ 数据库过大
# 查看大小
du -h /opt/telegram-bot/bot_database.db
# 清理旧数据(30天前)
sqlite3 /opt/telegram-bot/bot_database.db \
"DELETE FROM messages WHERE created_at < datetime('now', '-30 days');"
# 优化数据库
sqlite3 /opt/telegram-bot/bot_database.db "VACUUM;"❓ 更换服务器
旧服务器:
tar -czf ~/bot-migration.tar.gz /opt/telegram-bot
scp ~/bot-migration.tar.gz root@new-server:/root/新服务器:
# 解压
tar -xzf ~/bot-migration.tar.gz -C /
# 安装依赖
cd /opt/telegram-bot
python3 -m venv venv
source venv/bin/activate
pip install python-telegram-bot pillow
# 配置服务(参考第4步)
# 启动
systemctl start telegram-bot项目结构
/opt/telegram-bot/
├── venv/ # Python 虚拟环境
├── bot.py # 机器人主程序
├── config.json # 配置文件
└── bot_database.db # SQLite 数据库
/etc/systemd/system/
└── telegram-bot.service # 系统服务
/root/backups/ # 备份目录
└── telegram-bot-*.tar.gz # 备份文件性能指标
💾 内存占用:40-80 MB
📊 数据库大小:约 1MB / 1000 条消息
🚀 响应速度:< 1 秒
👥 并发支持:多管理员同时工作
总结
本教程实现了一个功能完整的 Telegram 双向联系机器人,具备:
✅ 文本、图片、文件传输
✅ 智能自动回复
✅ SQLite 数据库存储
✅ 完整消息统计
✅ 聊天记录查询
✅ 多管理员支持
✅ Systemd 后台运行
适合用作客服系统、咨询机器人、技术支持等场景。
编写日期: 2026 年 1 月 12 日
版本: 2.0 增强版
作者: 基于实际部署经验整理
技术栈:
Python 3.13+
python-telegram-bot 22.5
SQLite 3.46+
Debian/Ubuntu Linux
祝你使用愉快!晚安! 🌙✨




