52Pokemon 逆向分析:多账户订阅聚合系统开发记录
一个解决多账户机场订阅管理痛点的全栈项目,涵盖订阅合并、自动续费、分布式代理拉取等功能
写在前面
作为一个重度科学上网用户,我手上有十几个机场账户。为什么要这么多?原因很简单:
- 薅羊毛
但问题也随之而来——管理成本爆炸。
每次想用代理,我得:
- 找账户信息
- 登录对应机场网站
- 复制订阅链接
- 导入 Clash
- 重复以上步骤 N 次...
更痛苦的是,每个账户的到期时间、剩余流量都不一样,经常忘记续费导致账户失效。
于是,我决定写一个系统来解决这些问题。
一、需求分析:我到底要什么?
核心痛点
| 痛点 | 描述 | 期望 |
|---|---|---|
| 账户分散 | 十几个账户信息散落各处 | 统一管理所有账户 |
| 订阅繁琐 | 每次都要手动复制多个订阅 | 一个链接搞定所有节点 |
| 容易遗忘 | 忘记续费、流量用完才发现 | 自动提醒 + 自动续费 |
| 状态不明 | 不知道哪个账户还能用 | 实时同步账户状态 |
功能规划
核心功能
├── 账户管理(增删改查)
├── 订阅合并(多合一)
├── 状态同步(自动拉取账户信息)
├── 自动续费(优惠码自动兑换)
└── 到期提醒(即将过期/流量不足)
进阶功能
├── 分布式拉取(多 IP 分散请求)
├── 并行处理(加速订阅获取)
└── 缓存机制(减少重复请求)
二、逆向分析:机场 API 是怎么工作的?
要实现自动同步和续费,首先得搞清楚机场的 API 是怎么工作的。
2.1 抓包分析
打开 Charles,登录机场网站,观察请求:
POST /api/v1/passport/auth/login
Content-Type: application/x-www-form-urlencoded
email=test@example.com&password=123456
返回的却是一坨乱码:
bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdH...
看起来像 Base64,但解码后还是乱码。说明有自定义加密。
2.2 提取前端 JS 源码
使用浏览器开发者工具或者网页资源下载器,把机场网站的 JS 文件全部保存下来。
在 CTmysGc_-1765755971162.js 文件中,找到了 API 请求和响应处理的核心代码:
// 原始混淆源码(已格式化)
const s = e.create({
baseURL: (localStorage.getItem("api_base_url") || "https://api123.136470.xyz") + "/api/v1",
timeout: 1e4,
validateStatus: function(e) { return !0 }
});
// 字符映射函数 - 关键!
function r(e) {
const a = atob("bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdHk5alNZSV03Uk01YkhpVWFmLGN9S3VQR3BOaFpMdkY=");
const t = atob("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NTY3ODksW117fTo=");
return e.split("").map((e => {
const s = a.indexOf(e);
return -1 !== s ? t[s] : e
})).join("")
}
// 响应拦截器 - 解密逻辑
s.interceptors.response.use((e => {
// ... 状态码处理 ...
const s = function(e) {
let a = atob(e); // 1. Base64 解码
for (let s = 0; s < 10; s++) // 2. 循环映射 10 次
a = r(a);
try {
return JSON.parse(a) // 3. 解析 JSON
} catch (t) {
return null
}
}(e.data);
if (s) return {...e, data: s};
throw new Error("Failed to decode and parse response")
}));
2.3 解密算法分析
从源码中可以看出加密方案:
Step 1 - 提取映射表
两个 Base64 字符串解码后是:
MAP_FROM = "nsz{gAWrkXlx08J6Eq:V4[deO1DQTCwm2oB3ty9jSYI]7RM5bHiUaf,c}KuPGpNhZLvF"
MAP_TO = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789,[]{}:"
Step 2 - 映射规则
每个字符在 MAP_FROM 中查找位置,然后替换为 MAP_TO 中相同位置的字符。
例如:
n在 MAP_FROM 的位置是 0,替换为 MAP_TO[0] =as在 MAP_FROM 的位置是 1,替换为 MAP_TO[1] =b- 不在映射表中的字符(如空格、引号)保持不变
Step 3 - 迭代 10 次
关键在于 for (let s = 0; s < 10; s++),需要循环映射 10 次才能还原。
为什么是 10 次?这是开发者故意设置的 "魔数",增加逆向难度。只替换 1 次是解不出来的。
2.4 Python 实现解密
import base64
import json
class AirportClient:
# 解密用的字符映射表(从 JS 中提取,这里用 Base64 存储)
MAP_FROM = base64.b64decode(
"bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdHk5alNZSV03Uk01YkhpVWFmLGN9S3VQR3BOaFpMdkY="
).decode('utf-8')
MAP_TO = base64.b64decode(
"YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NTY3ODksW117fTo="
).decode('utf-8')
def _char_map(self, text: str) -> str:
"""单次字符映射"""
result = []
for char in text:
idx = self.MAP_FROM.find(char)
if idx != -1:
result.append(self.MAP_TO[idx])
else:
result.append(char)
return ''.join(result)
def _decrypt_response(self, encrypted_text: str) -> dict:
"""解密响应数据"""
# 1. Base64 解码
decoded = base64.b64decode(encrypted_text).decode('utf-8')
# 2. 循环字符映射 10 次
for _ in range(10):
decoded = self._char_map(decoded)
# 3. 解析 JSON
return json.loads(decoded)
2.5 完整的登录流程
1. POST /passport/auth/login (email, password)
↓ 返回加密字符串
2. Base64 解码 + 10次字符映射 = JSON
↓ 提取 auth_data
3. 后续请求 Header 携带 Authorization: {auth_data}
↓
4. 每个响应都需要同样的解密流程
登录实现:
def login(self, email: str, password: str) -> dict:
"""登录获取认证信息"""
response = requests.post(
f"{API_BASE}/passport/auth/login",
data={"email": email, "password": password},
headers=self.headers
)
# 解密响应
result = self._decrypt_response(response.text)
if "data" in result and "auth_data" in result["data"]:
self.auth_data = result["data"]["auth_data"]
return {"success": True, "auth_data": self.auth_data}
return {"success": False, "error": result.get("message", "登录失败")}
2.6 获取用户信息
登录成功后,带上 auth_data 就可以请求其他接口了:
def get_subscribe(self) -> dict:
"""获取订阅信息(流量、到期时间等)"""
headers = self.headers.copy()
headers["Authorization"] = self.auth_data
response = requests.get(f"{API_BASE}/user/getSubscribe", headers=headers)
result = self._decrypt_response(response.text)
if "data" in result:
data = result["data"]
return {
"success": True,
"plan_name": data.get("plan", {}).get("name"),
"transfer_enable": data.get("transfer_enable"), # 总流量
"upload": data.get("u"), # 已用上传
"download": data.get("d"), # 已用下载
"expired_at": data.get("expired_at"), # 到期时间戳
"subscribe_url": data.get("subscribe_url")
}
return {"success": False, "error": "获取失败"}
2.7 逆向小结
这个机场的加密方案挺有意思:
- 不是标准加密算法(AES/RSA),而是自定义的字符映射
- 多次迭代增加破解难度,不知道次数就解不出来
- 映射表藏在混淆的 JS 里,需要仔细分析
破解关键:找到映射表 + 确定迭代次数(10 次)
三、技术选型
考虑到快速开发和部署便利性,选择了以下技术栈:
| 层级 | 技术 | 理由 |
|---|---|---|
| 后端 | Flask | 轻量、快速、够用 |
| 数据库 | MySQL | 稳定可靠,连接池支持好 |
| 定时任务 | APScheduler | Python 原生,集成简单 |
| 前端 | 原生 HTML/CSS/JS | 不想引入前端框架,保持简单 |
四、核心功能实现
4.1 订阅合并:一个链接搞定所有节点
这是整个系统最核心的功能。原理很简单:
- 遍历所有有效账户
- 拉取每个账户的订阅(Clash YAML 格式)
- 提取所有节点(proxies)
- 合并去重
- 生成新的 Clash 配置
关键代码:
@subscribe_bp.route('/sub/<token>')
def merged_subscribe(token):
"""合并订阅"""
user = UserDB.get_by_token(token)
accounts = AccountDB.get_all(user_id=user['id'])
all_proxies = []
for acc in accounts:
# 跳过已过期、流量耗尽的账户
if is_expired(acc) or is_traffic_exhausted(acc):
continue
# 拉取订阅并提取节点
proxies = fetch_clash_proxies(acc['subscribe_url'])
# 添加账户标识前缀,方便区分
for proxy in proxies:
proxy['name'] = f"[{acc['remark']}] {proxy['name']}"
all_proxies.extend(proxies)
# 生成完整的 Clash 配置
config = generate_clash_config(all_proxies)
return Response(config, content_type='text/yaml')
效果:
以前:管理 10 个订阅链接,每次更新都要手动操作 现在:一个链接 https://your-domain/sub/your-token,自动聚合所有节点
4.2 自动同步:实时掌握账户状态
机场的账户信息(流量、到期时间等)是动态变化的,需要定期同步。
同步流程:
登录机场 API
↓
获取用户信息(plan_name, transfer_enable, expired_at...)
↓
获取订阅链接
↓
更新数据库
↓
记录同步日志
API 调用示例:
def sync_account(email: str, password: str) -> dict:
"""同步单个账户"""
client = AirportClient()
# 1. 登录获取 token
login_result = client.login(email, password)
if not login_result['success']:
return {'success': False, 'error': '登录失败'}
# 2. 获取用户信息
user_info = client.get_user_info()
# 3. 获取订阅链接
subscribe_url = client.get_subscribe_url()
return {
'success': True,
'plan_name': user_info['plan_name'],
'transfer_enable': user_info['transfer_enable'],
'expired_at': user_info['expired_at'],
'subscribe_url': subscribe_url
}
4.3 自动续费:再也不怕忘记
该机场支持优惠码兑换免费套餐。系统可以自动使用优惠码续费即将到期的账户。
定时任务配置:
# 每天下午 3 点执行
scheduler.add_job(
scheduled_auto_purchase,
'cron',
hour=15,
minute=0,
id='auto_renew'
)
续费逻辑:
def scheduled_auto_purchase():
"""定时自动续费"""
# 获取即将到期的账户(7天内)
expiring_accounts = AccountDB.get_expiring_soon(days=7)
# 获取可用的优惠码
coupon = CouponDB.get_active()
if not coupon:
return
for acc in expiring_accounts:
result = auto_purchase(acc, coupon['code'])
RenewLogDB.add(acc['id'], coupon['id'], result)
五、订阅二次加工:不只是合并
机场原始订阅只包含节点信息,直接使用会有几个问题:
- 没有分流规则,所有流量都走代理
- 国内网站也走代理,速度慢且浪费流量
- DNS 配置不合理,可能泄露隐私
所以我们需要对订阅进行二次加工。
5.1 添加完整的代理分组
根据使用场景,创建多个代理组:
# 代理组定义
proxy_groups = [
# 主选择器
{'name': '🚀 节点选择', 'type': 'select', 'proxies': ['⚡ 自动选择', 'DIRECT'] + proxy_names},
{'name': '⚡ 自动选择', 'type': 'url-test', 'proxies': proxy_names, 'interval': 300},
# 按用途分组
{'name': '🤖 AI 服务', 'type': 'select', ...}, # ChatGPT、Claude 等
{'name': '📹 油管视频', 'type': 'select', ...}, # YouTube
{'name': '🔍 谷歌服务', 'type': 'select', ...}, # Google 全家桶
{'name': '📲 电报消息', 'type': 'select', ...}, # Telegram
{'name': '🎬 奈飞', 'type': 'select', ...}, # Netflix
{'name': '🎮 Steam', 'type': 'select', ...}, # Steam 游戏
# 特殊处理
{'name': '🛑 广告拦截', 'type': 'select', 'proxies': ['REJECT', 'DIRECT']},
{'name': '🔒 国内服务', 'type': 'select', 'proxies': ['DIRECT', '🚀 节点选择']},
]
5.2 配置智能 DNS
为了防止 DNS 污染和泄露,配置了完整的 DNS 方案:
'dns': {
'enable': True,
'enhanced-mode': 'fake-ip', # 使用 Fake-IP 模式
'fake-ip-range': '198.18.0.1/16',
# 国内 DNS(用于解析国内域名)
'nameserver': [
'180.76.76.76', # 百度 DNS
'119.29.29.29', # 腾讯 DNSPod
'223.5.5.5', # 阿里 DNS
'https://dns.alidns.com/dns-query',
],
# 国外 DNS(fallback,用于被污染的域名)
'fallback': [
'https://cloudflare-dns.com/dns-query',
'https://dns.google/dns-query',
'tls://8.8.4.4',
'tls://1.0.0.1:853',
],
# Fallback 过滤规则
'fallback-filter': {
'geoip': True,
'domain': ['+.google.com', '+.youtube.com', '+.twitter.com', '+.facebook.com']
},
# Fake-IP 过滤(这些域名使用真实 IP)
'fake-ip-filter': [
'*.lan', 'time.*.com', 'ntp.*.com', # 局域网和时间服务
'*.music.163.com', '*.y.qq.com', # 音乐软件
'*.xboxlive.com', 'stun.*.*' # 游戏和语音通话
]
}
5.3 引入规则集(Rule Providers)
使用 MetaCubeX 维护的规则集,自动更新:
'rule-providers': {
# 广告拦截
'category-ads-all': {
'type': 'http',
'behavior': 'domain',
'url': 'https://github.com/MetaCubeX/meta-rules-dat/raw/meta/geo/geosite/category-ads-all.mrs',
'interval': 86400 # 每天更新
},
# AI 服务
'openai': {'url': '.../geosite/openai.mrs', ...},
'anthropic': {'url': '.../geosite/anthropic.mrs', ...},
# 流媒体
'youtube': {'url': '.../geosite/youtube.mrs', ...},
'netflix': {'url': '.../geosite/netflix.mrs', ...},
# 社交媒体
'telegram': {'url': '.../geosite/telegram.mrs', ...},
'twitter': {'url': '.../geosite/twitter.mrs', ...},
# 地理位置
'geolocation-cn': {'url': '.../geosite/geolocation-cn.mrs', ...},
'geolocation-!cn': {'url': '.../geosite/geolocation-!cn.mrs', ...},
'cn-ip': {'url': '.../geoip/cn.mrs', 'behavior': 'ipcidr', ...},
}
5.4 智能分流规则
根据规则集,自动分流到对应代理组:
'rules': [
# 广告拦截
'RULE-SET,category-ads-all,🛑 广告拦截',
# AI 服务走专用代理
'RULE-SET,openai,🤖 AI 服务',
'RULE-SET,anthropic,🤖 AI 服务',
# 流媒体
'RULE-SET,youtube,📹 油管视频',
'RULE-SET,netflix,🎬 奈飞',
# 社交媒体
'RULE-SET,telegram,📲 电报消息',
'RULE-SET,twitter,🐦 推特/X',
# 国内直连
'RULE-SET,geolocation-cn,🔒 国内服务',
'RULE-SET,cn-ip,🔒 国内服务',
# 国外走代理
'RULE-SET,geolocation-!cn,🌍 非中国',
# 兜底规则
'MATCH,🐟 漏网之鱼'
]
5.5 效果对比
| 特性 | 原始订阅 | 二次加工后 |
|---|---|---|
| 节点信息 | ✅ | ✅ |
| 分流规则 | ❌ | ✅ 30+ 分组 |
| DNS 配置 | ❌ | ✅ Fake-IP + DoH |
| 广告拦截 | ❌ | ✅ |
| 规则自动更新 | ❌ | ✅ 每日更新 |
| 国内直连 | ❌ | ✅ |
六、进阶优化:分布式代理拉取
6.1 问题:单 IP 请求过多会被限制
当账户数量增加到几十个时,新的问题出现了:
- 短时间内从同一个 IP 发起大量请求
- 机场服务器可能会限制或封禁
- 订阅拉取失败率上升
6.2 解决方案:多 VPS 分散请求
我手上有几台 VPS,可以利用它们来分散请求:
主系统 (管理中心)
│
├── 请求代理 ──→ VPS1 (IP1) ──→ 机场 API
├── 请求代理 ──→ VPS2 (IP2) ──→ 机场 API
├── 请求代理 ──→ VPS3 (IP3) ──→ 机场 API
└── 直接请求 ──────────────────→ 机场 API
6.3 代理服务实现
在每台 VPS 上部署一个轻量的代理服务(proxy-worker):
# proxy-worker/app.py
@app.route('/fetch')
def fetch_url():
"""代理拉取订阅"""
url = request.args.get('url')
# 用本地 IP 请求目标 URL
resp = requests.get(url, headers={'User-Agent': 'ClashForAndroid/2.5.12'})
return Response(resp.content, headers={
'Content-Type': resp.headers.get('Content-Type'),
'Subscription-Userinfo': resp.headers.get('Subscription-Userinfo', '')
})
6.4 账户绑定代理
数据库中添加 proxy_index 字段,每个账户固定使用一个代理:
| proxy_index | 出口 |
|---|---|
| 0 | 本地直连 |
| 1 | 代理服务器 1 |
| 2 | 代理服务器 2 |
| 3 | 代理服务器 3 |
| 4 | 代理服务器 4 |
均匀分配:
-- 5 个出口,按 ID 取模分配
UPDATE accounts SET proxy_index = (id % 5);
拉取时选择对应代理:
def fetch_clash_proxies(url: str, proxy_index: int = 0):
"""拉取订阅(支持代理)"""
if proxy_index > 0 and PROXY_WORKERS:
# 使用指定的代理服务器
proxy_url = PROXY_WORKERS[proxy_index - 1]
resp = requests.get(f"{proxy_url}/fetch", params={'url': url})
content = resp.text
else:
# 直接请求
content = requests.get(url).text
return parse_clash_proxies(content)
七、性能优化:并行拉取
7.1 问题:串行拉取太慢
假设有 20 个账户,每个拉取需要 3 秒:
- 串行:20 × 3 = 60 秒
- 用户等待 1 分钟才能拿到订阅,体验极差
7.2 解决方案:线程池并行
使用 concurrent.futures.ThreadPoolExecutor 并行处理:
from concurrent.futures import ThreadPoolExecutor, as_completed
def merged_subscribe(token):
accounts = get_valid_accounts(token)
# 分类:有缓存的 vs 需要拉取的
cached_accounts = []
fetch_accounts = []
for acc in accounts:
if has_valid_cache(acc):
cached_accounts.append(acc)
else:
fetch_accounts.append(acc)
all_proxies = []
# 有缓存的直接读取
for acc in cached_accounts:
proxies = load_from_cache(acc)
all_proxies.extend(proxies)
# 需要拉取的并行处理
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(fetch_account_proxies, acc): acc
for acc in fetch_accounts}
for future in as_completed(futures):
proxies = future.result()
all_proxies.extend(proxies)
return generate_config(all_proxies)
优化效果:
| 场景 | 串行耗时 | 并行耗时 | 提升 |
|---|---|---|---|
| 20 个账户,无缓存 | ~60s | ~6s | 10x |
| 20 个账户,50% 缓存 | ~30s | ~3s | 10x |
| 20 个账户,全缓存 | ~0.5s | ~0.5s | - |
八、缓存策略
为了减少对机场服务器的请求,实现了数据库缓存:
8.1 数据库缓存
class AccountDB:
@staticmethod
def update_proxies_cache(account_id: int, proxies_json: str):
"""更新节点缓存"""
cursor.execute("""
UPDATE accounts
SET proxies_cache = %s, proxies_cache_at = NOW()
WHERE id = %s
""", (proxies_json, account_id))
@staticmethod
def get_proxies_cache(account_id: int, max_age_hours: int = 5):
"""获取节点缓存(5小时内有效)"""
cursor.execute("""
SELECT proxies_cache FROM accounts
WHERE id = %s
AND proxies_cache_at > DATE_SUB(NOW(), INTERVAL %s HOUR)
""", (account_id, max_age_hours))
return cursor.fetchone()
8.2 缓存更新时机
- 同步账户时:主动拉取最新订阅并更新缓存
- 缓存过期时:下次请求时自动刷新
- 手动清除:管理员可手动清除缓存
九、项目结构
最终的项目结构:
fuckbkm/
├── app.py # 入口
├── core/ # 核心模块
│ ├── config.py # 配置
│ ├── database.py # 数据库
│ ├── airport_client.py # 机场 API
│ └── scheduler.py # 定时任务
├── routes/ # 路由
│ ├── accounts.py # 账户管理
│ ├── subscribe.py # 订阅合并
│ └── ...
├── utils/ # 工具
│ └── clash_config.py # Clash 配置生成
├── proxy-worker/ # 代理服务(部署到 VPS)
│ └── app.py
└── templates/ # 前端页面
十、总结
解决的问题
| 问题 | 解决方案 |
|---|---|
| 账户分散难管理 | 统一的 Web 管理界面 |
| 订阅链接太多 | 一键合并为单个订阅 |
| 忘记续费 | 自动检测 + 自动续费 |
| 状态不清楚 | 定时同步 + 实时展示 |
| 单 IP 被限制 | 分布式代理拉取 |
| 拉取速度慢 | 并行处理 + 缓存 |
希望这篇文章对你有帮助