52Pokemon 逆向分析:多账户订阅聚合系统开发记录

Published on
30 8.5~10.9 min 3824

一个解决多账户机场订阅管理痛点的全栈项目,涵盖订阅合并、自动续费、分布式代理拉取等功能

写在前面

作为一个重度科学上网用户,我手上有十几个机场账户。为什么要这么多?原因很简单:

  1. 薅羊毛

但问题也随之而来——管理成本爆炸

每次想用代理,我得:

  1. 找账户信息
  2. 登录对应机场网站
  3. 复制订阅链接
  4. 导入 Clash
  5. 重复以上步骤 N 次...

更痛苦的是,每个账户的到期时间、剩余流量都不一样,经常忘记续费导致账户失效。

于是,我决定写一个系统来解决这些问题。


一、需求分析:我到底要什么?

核心痛点

痛点 描述 期望
账户分散 十几个账户信息散落各处 统一管理所有账户
订阅繁琐 每次都要手动复制多个订阅 一个链接搞定所有节点
容易遗忘 忘记续费、流量用完才发现 自动提醒 + 自动续费
状态不明 不知道哪个账户还能用 实时同步账户状态

功能规划

核心功能
├── 账户管理(增删改查)
├── 订阅合并(多合一)
├── 状态同步(自动拉取账户信息)
├── 自动续费(优惠码自动兑换)
└── 到期提醒(即将过期/流量不足)

进阶功能
├── 分布式拉取(多 IP 分散请求)
├── 并行处理(加速订阅获取)
└── 缓存机制(减少重复请求)

二、逆向分析:机场 API 是怎么工作的?

要实现自动同步和续费,首先得搞清楚机场的 API 是怎么工作的。

2.1 抓包分析

打开 Charles,登录机场网站,观察请求:

POST /api/v1/passport/auth/login
Content-Type: application/x-www-form-urlencoded

email=test@example.com&password=123456

返回的却是一坨乱码:

bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdH...

看起来像 Base64,但解码后还是乱码。说明有自定义加密

2.2 提取前端 JS 源码

使用浏览器开发者工具或者网页资源下载器,把机场网站的 JS 文件全部保存下来。

CTmysGc_-1765755971162.js 文件中,找到了 API 请求和响应处理的核心代码:

// 原始混淆源码(已格式化)
const s = e.create({
    baseURL: (localStorage.getItem("api_base_url") || "https://api123.136470.xyz") + "/api/v1",
    timeout: 1e4,
    validateStatus: function(e) { return !0 }
});

// 字符映射函数 - 关键!
function r(e) {
    const a = atob("bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdHk5alNZSV03Uk01YkhpVWFmLGN9S3VQR3BOaFpMdkY=");
    const t = atob("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NTY3ODksW117fTo=");
    return e.split("").map((e => {
        const s = a.indexOf(e);
        return -1 !== s ? t[s] : e
    })).join("")
}

// 响应拦截器 - 解密逻辑
s.interceptors.response.use((e => {
    // ... 状态码处理 ...
    
    const s = function(e) {
        let a = atob(e);              // 1. Base64 解码
        for (let s = 0; s < 10; s++)  // 2. 循环映射 10 次
            a = r(a);
        try {
            return JSON.parse(a)       // 3. 解析 JSON
        } catch (t) {
            return null
        }
    }(e.data);
    
    if (s) return {...e, data: s};
    throw new Error("Failed to decode and parse response")
}));

2.3 解密算法分析

从源码中可以看出加密方案:

Step 1 - 提取映射表

两个 Base64 字符串解码后是:

MAP_FROM = "nsz{gAWrkXlx08J6Eq:V4[deO1DQTCwm2oB3ty9jSYI]7RM5bHiUaf,c}KuPGpNhZLvF"
MAP_TO   = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789,[]{}:"

Step 2 - 映射规则

每个字符在 MAP_FROM 中查找位置,然后替换为 MAP_TO 中相同位置的字符。

例如:

  • n 在 MAP_FROM 的位置是 0,替换为 MAP_TO[0] = a
  • s 在 MAP_FROM 的位置是 1,替换为 MAP_TO[1] = b
  • 不在映射表中的字符(如空格、引号)保持不变

Step 3 - 迭代 10 次

关键在于 for (let s = 0; s < 10; s++),需要循环映射 10 次才能还原。

为什么是 10 次?这是开发者故意设置的 "魔数",增加逆向难度。只替换 1 次是解不出来的。

2.4 Python 实现解密

import base64
import json

class AirportClient:
    # 解密用的字符映射表(从 JS 中提取,这里用 Base64 存储)
    MAP_FROM = base64.b64decode(
        "bnN6e2dBV3JrWGx4MDhKNkVxOlY0W2RlTzFEUVRDd20yb0IzdHk5alNZSV03Uk01YkhpVWFmLGN9S3VQR3BOaFpMdkY="
    ).decode('utf-8')
    MAP_TO = base64.b64decode(
        "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NTY3ODksW117fTo="
    ).decode('utf-8')
    
    def _char_map(self, text: str) -> str:
        """单次字符映射"""
        result = []
        for char in text:
            idx = self.MAP_FROM.find(char)
            if idx != -1:
                result.append(self.MAP_TO[idx])
            else:
                result.append(char)
        return ''.join(result)
    
    def _decrypt_response(self, encrypted_text: str) -> dict:
        """解密响应数据"""
        # 1. Base64 解码
        decoded = base64.b64decode(encrypted_text).decode('utf-8')
        
        # 2. 循环字符映射 10 次
        for _ in range(10):
            decoded = self._char_map(decoded)
        
        # 3. 解析 JSON
        return json.loads(decoded)

2.5 完整的登录流程

1. POST /passport/auth/login (email, password)
   ↓ 返回加密字符串
2. Base64 解码 + 10次字符映射 = JSON
   ↓ 提取 auth_data
3. 后续请求 Header 携带 Authorization: {auth_data}
   ↓
4. 每个响应都需要同样的解密流程

登录实现:

def login(self, email: str, password: str) -> dict:
    """登录获取认证信息"""
    response = requests.post(
        f"{API_BASE}/passport/auth/login",
        data={"email": email, "password": password},
        headers=self.headers
    )
    
    # 解密响应
    result = self._decrypt_response(response.text)
    
    if "data" in result and "auth_data" in result["data"]:
        self.auth_data = result["data"]["auth_data"]
        return {"success": True, "auth_data": self.auth_data}
    
    return {"success": False, "error": result.get("message", "登录失败")}

2.6 获取用户信息

登录成功后,带上 auth_data 就可以请求其他接口了:

def get_subscribe(self) -> dict:
    """获取订阅信息(流量、到期时间等)"""
    headers = self.headers.copy()
    headers["Authorization"] = self.auth_data
    
    response = requests.get(f"{API_BASE}/user/getSubscribe", headers=headers)
    result = self._decrypt_response(response.text)
    
    if "data" in result:
        data = result["data"]
        return {
            "success": True,
            "plan_name": data.get("plan", {}).get("name"),
            "transfer_enable": data.get("transfer_enable"),  # 总流量
            "upload": data.get("u"),           # 已用上传
            "download": data.get("d"),         # 已用下载
            "expired_at": data.get("expired_at"),  # 到期时间戳
            "subscribe_url": data.get("subscribe_url")
        }
    return {"success": False, "error": "获取失败"}

2.7 逆向小结

这个机场的加密方案挺有意思:

  • 不是标准加密算法(AES/RSA),而是自定义的字符映射
  • 多次迭代增加破解难度,不知道次数就解不出来
  • 映射表藏在混淆的 JS 里,需要仔细分析

破解关键:找到映射表 + 确定迭代次数(10 次)


三、技术选型

考虑到快速开发和部署便利性,选择了以下技术栈:

层级 技术 理由
后端 Flask 轻量、快速、够用
数据库 MySQL 稳定可靠,连接池支持好
定时任务 APScheduler Python 原生,集成简单
前端 原生 HTML/CSS/JS 不想引入前端框架,保持简单

四、核心功能实现

4.1 订阅合并:一个链接搞定所有节点

这是整个系统最核心的功能。原理很简单:

  1. 遍历所有有效账户
  2. 拉取每个账户的订阅(Clash YAML 格式)
  3. 提取所有节点(proxies)
  4. 合并去重
  5. 生成新的 Clash 配置

关键代码:

@subscribe_bp.route('/sub/<token>')
def merged_subscribe(token):
    """合并订阅"""
    user = UserDB.get_by_token(token)
    accounts = AccountDB.get_all(user_id=user['id'])
    
    all_proxies = []
    for acc in accounts:
        # 跳过已过期、流量耗尽的账户
        if is_expired(acc) or is_traffic_exhausted(acc):
            continue
        
        # 拉取订阅并提取节点
        proxies = fetch_clash_proxies(acc['subscribe_url'])
        
        # 添加账户标识前缀,方便区分
        for proxy in proxies:
            proxy['name'] = f"[{acc['remark']}] {proxy['name']}"
        
        all_proxies.extend(proxies)
    
    # 生成完整的 Clash 配置
    config = generate_clash_config(all_proxies)
    return Response(config, content_type='text/yaml')

效果:

以前:管理 10 个订阅链接,每次更新都要手动操作 现在:一个链接 https://your-domain/sub/your-token,自动聚合所有节点

4.2 自动同步:实时掌握账户状态

机场的账户信息(流量、到期时间等)是动态变化的,需要定期同步。

同步流程:

登录机场 API
    ↓
获取用户信息(plan_name, transfer_enable, expired_at...)
    ↓
获取订阅链接
    ↓
更新数据库
    ↓
记录同步日志

API 调用示例:

def sync_account(email: str, password: str) -> dict:
    """同步单个账户"""
    client = AirportClient()
    
    # 1. 登录获取 token
    login_result = client.login(email, password)
    if not login_result['success']:
        return {'success': False, 'error': '登录失败'}
    
    # 2. 获取用户信息
    user_info = client.get_user_info()
    
    # 3. 获取订阅链接
    subscribe_url = client.get_subscribe_url()
    
    return {
        'success': True,
        'plan_name': user_info['plan_name'],
        'transfer_enable': user_info['transfer_enable'],
        'expired_at': user_info['expired_at'],
        'subscribe_url': subscribe_url
    }

4.3 自动续费:再也不怕忘记

该机场支持优惠码兑换免费套餐。系统可以自动使用优惠码续费即将到期的账户。

定时任务配置:

# 每天下午 3 点执行
scheduler.add_job(
    scheduled_auto_purchase,
    'cron',
    hour=15,
    minute=0,
    id='auto_renew'
)

续费逻辑:

def scheduled_auto_purchase():
    """定时自动续费"""
    # 获取即将到期的账户(7天内)
    expiring_accounts = AccountDB.get_expiring_soon(days=7)
    
    # 获取可用的优惠码
    coupon = CouponDB.get_active()
    if not coupon:
        return
    
    for acc in expiring_accounts:
        result = auto_purchase(acc, coupon['code'])
        RenewLogDB.add(acc['id'], coupon['id'], result)

五、订阅二次加工:不只是合并

机场原始订阅只包含节点信息,直接使用会有几个问题:

  • 没有分流规则,所有流量都走代理
  • 国内网站也走代理,速度慢且浪费流量
  • DNS 配置不合理,可能泄露隐私

所以我们需要对订阅进行二次加工

5.1 添加完整的代理分组

根据使用场景,创建多个代理组:

# 代理组定义
proxy_groups = [
    # 主选择器
    {'name': '🚀 节点选择', 'type': 'select', 'proxies': ['⚡ 自动选择', 'DIRECT'] + proxy_names},
    {'name': '⚡ 自动选择', 'type': 'url-test', 'proxies': proxy_names, 'interval': 300},
    
    # 按用途分组
    {'name': '🤖 AI 服务', 'type': 'select', ...},      # ChatGPT、Claude 等
    {'name': '📹 油管视频', 'type': 'select', ...},     # YouTube
    {'name': '🔍 谷歌服务', 'type': 'select', ...},     # Google 全家桶
    {'name': '📲 电报消息', 'type': 'select', ...},     # Telegram
    {'name': '🎬 奈飞', 'type': 'select', ...},         # Netflix
    {'name': '🎮 Steam', 'type': 'select', ...},        # Steam 游戏
    
    # 特殊处理
    {'name': '🛑 广告拦截', 'type': 'select', 'proxies': ['REJECT', 'DIRECT']},
    {'name': '🔒 国内服务', 'type': 'select', 'proxies': ['DIRECT', '🚀 节点选择']},
]

5.2 配置智能 DNS

为了防止 DNS 污染和泄露,配置了完整的 DNS 方案:

'dns': {
    'enable': True,
    'enhanced-mode': 'fake-ip',        # 使用 Fake-IP 模式
    'fake-ip-range': '198.18.0.1/16',
    
    # 国内 DNS(用于解析国内域名)
    'nameserver': [
        '180.76.76.76',                 # 百度 DNS
        '119.29.29.29',                 # 腾讯 DNSPod
        '223.5.5.5',                    # 阿里 DNS
        'https://dns.alidns.com/dns-query',
    ],
    
    # 国外 DNS(fallback,用于被污染的域名)
    'fallback': [
        'https://cloudflare-dns.com/dns-query',
        'https://dns.google/dns-query',
        'tls://8.8.4.4',
        'tls://1.0.0.1:853',
    ],
    
    # Fallback 过滤规则
    'fallback-filter': {
        'geoip': True,
        'domain': ['+.google.com', '+.youtube.com', '+.twitter.com', '+.facebook.com']
    },
    
    # Fake-IP 过滤(这些域名使用真实 IP)
    'fake-ip-filter': [
        '*.lan', 'time.*.com', 'ntp.*.com',     # 局域网和时间服务
        '*.music.163.com', '*.y.qq.com',        # 音乐软件
        '*.xboxlive.com', 'stun.*.*'            # 游戏和语音通话
    ]
}

5.3 引入规则集(Rule Providers)

使用 MetaCubeX 维护的规则集,自动更新:

'rule-providers': {
    # 广告拦截
    'category-ads-all': {
        'type': 'http',
        'behavior': 'domain',
        'url': 'https://github.com/MetaCubeX/meta-rules-dat/raw/meta/geo/geosite/category-ads-all.mrs',
        'interval': 86400  # 每天更新
    },
    
    # AI 服务
    'openai': {'url': '.../geosite/openai.mrs', ...},
    'anthropic': {'url': '.../geosite/anthropic.mrs', ...},
    
    # 流媒体
    'youtube': {'url': '.../geosite/youtube.mrs', ...},
    'netflix': {'url': '.../geosite/netflix.mrs', ...},
    
    # 社交媒体
    'telegram': {'url': '.../geosite/telegram.mrs', ...},
    'twitter': {'url': '.../geosite/twitter.mrs', ...},
    
    # 地理位置
    'geolocation-cn': {'url': '.../geosite/geolocation-cn.mrs', ...},
    'geolocation-!cn': {'url': '.../geosite/geolocation-!cn.mrs', ...},
    'cn-ip': {'url': '.../geoip/cn.mrs', 'behavior': 'ipcidr', ...},
}

5.4 智能分流规则

根据规则集,自动分流到对应代理组:

'rules': [
    # 广告拦截
    'RULE-SET,category-ads-all,🛑 广告拦截',
    
    # AI 服务走专用代理
    'RULE-SET,openai,🤖 AI 服务',
    'RULE-SET,anthropic,🤖 AI 服务',
    
    # 流媒体
    'RULE-SET,youtube,📹 油管视频',
    'RULE-SET,netflix,🎬 奈飞',
    
    # 社交媒体
    'RULE-SET,telegram,📲 电报消息',
    'RULE-SET,twitter,🐦 推特/X',
    
    # 国内直连
    'RULE-SET,geolocation-cn,🔒 国内服务',
    'RULE-SET,cn-ip,🔒 国内服务',
    
    # 国外走代理
    'RULE-SET,geolocation-!cn,🌍 非中国',
    
    # 兜底规则
    'MATCH,🐟 漏网之鱼'
]

5.5 效果对比

特性 原始订阅 二次加工后
节点信息
分流规则 ✅ 30+ 分组
DNS 配置 ✅ Fake-IP + DoH
广告拦截
规则自动更新 ✅ 每日更新
国内直连

六、进阶优化:分布式代理拉取

6.1 问题:单 IP 请求过多会被限制

当账户数量增加到几十个时,新的问题出现了:

  • 短时间内从同一个 IP 发起大量请求
  • 机场服务器可能会限制或封禁
  • 订阅拉取失败率上升

6.2 解决方案:多 VPS 分散请求

我手上有几台 VPS,可以利用它们来分散请求:

主系统 (管理中心)
       │
       ├── 请求代理 ──→ VPS1 (IP1) ──→ 机场 API
       ├── 请求代理 ──→ VPS2 (IP2) ──→ 机场 API
       ├── 请求代理 ──→ VPS3 (IP3) ──→ 机场 API
       └── 直接请求 ──────────────────→ 机场 API

6.3 代理服务实现

在每台 VPS 上部署一个轻量的代理服务(proxy-worker):

# proxy-worker/app.py
@app.route('/fetch')
def fetch_url():
    """代理拉取订阅"""
    url = request.args.get('url')
    
    # 用本地 IP 请求目标 URL
    resp = requests.get(url, headers={'User-Agent': 'ClashForAndroid/2.5.12'})
    
    return Response(resp.content, headers={
        'Content-Type': resp.headers.get('Content-Type'),
        'Subscription-Userinfo': resp.headers.get('Subscription-Userinfo', '')
    })

6.4 账户绑定代理

数据库中添加 proxy_index 字段,每个账户固定使用一个代理:

proxy_index 出口
0 本地直连
1 代理服务器 1
2 代理服务器 2
3 代理服务器 3
4 代理服务器 4

均匀分配:

-- 5 个出口,按 ID 取模分配
UPDATE accounts SET proxy_index = (id % 5);

拉取时选择对应代理:

def fetch_clash_proxies(url: str, proxy_index: int = 0):
    """拉取订阅(支持代理)"""
    if proxy_index > 0 and PROXY_WORKERS:
        # 使用指定的代理服务器
        proxy_url = PROXY_WORKERS[proxy_index - 1]
        resp = requests.get(f"{proxy_url}/fetch", params={'url': url})
        content = resp.text
    else:
        # 直接请求
        content = requests.get(url).text
    
    return parse_clash_proxies(content)

七、性能优化:并行拉取

7.1 问题:串行拉取太慢

假设有 20 个账户,每个拉取需要 3 秒:

  • 串行:20 × 3 = 60 秒
  • 用户等待 1 分钟才能拿到订阅,体验极差

7.2 解决方案:线程池并行

使用 concurrent.futures.ThreadPoolExecutor 并行处理:

from concurrent.futures import ThreadPoolExecutor, as_completed

def merged_subscribe(token):
    accounts = get_valid_accounts(token)
    
    # 分类:有缓存的 vs 需要拉取的
    cached_accounts = []
    fetch_accounts = []
    
    for acc in accounts:
        if has_valid_cache(acc):
            cached_accounts.append(acc)
        else:
            fetch_accounts.append(acc)
    
    all_proxies = []
    
    # 有缓存的直接读取
    for acc in cached_accounts:
        proxies = load_from_cache(acc)
        all_proxies.extend(proxies)
    
    # 需要拉取的并行处理
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(fetch_account_proxies, acc): acc 
                   for acc in fetch_accounts}
        
        for future in as_completed(futures):
            proxies = future.result()
            all_proxies.extend(proxies)
    
    return generate_config(all_proxies)

优化效果:

场景 串行耗时 并行耗时 提升
20 个账户,无缓存 ~60s ~6s 10x
20 个账户,50% 缓存 ~30s ~3s 10x
20 个账户,全缓存 ~0.5s ~0.5s -

八、缓存策略

为了减少对机场服务器的请求,实现了数据库缓存:

8.1 数据库缓存

class AccountDB:
    @staticmethod
    def update_proxies_cache(account_id: int, proxies_json: str):
        """更新节点缓存"""
        cursor.execute("""
            UPDATE accounts 
            SET proxies_cache = %s, proxies_cache_at = NOW() 
            WHERE id = %s
        """, (proxies_json, account_id))
    
    @staticmethod
    def get_proxies_cache(account_id: int, max_age_hours: int = 5):
        """获取节点缓存(5小时内有效)"""
        cursor.execute("""
            SELECT proxies_cache FROM accounts 
            WHERE id = %s 
            AND proxies_cache_at > DATE_SUB(NOW(), INTERVAL %s HOUR)
        """, (account_id, max_age_hours))
        return cursor.fetchone()

8.2 缓存更新时机

  • 同步账户时:主动拉取最新订阅并更新缓存
  • 缓存过期时:下次请求时自动刷新
  • 手动清除:管理员可手动清除缓存

九、项目结构

最终的项目结构:

fuckbkm/
├── app.py                 # 入口
├── core/                  # 核心模块
│   ├── config.py          # 配置
│   ├── database.py        # 数据库
│   ├── airport_client.py  # 机场 API
│   └── scheduler.py       # 定时任务
├── routes/                # 路由
│   ├── accounts.py        # 账户管理
│   ├── subscribe.py       # 订阅合并
│   └── ...
├── utils/                 # 工具
│   └── clash_config.py    # Clash 配置生成
├── proxy-worker/          # 代理服务(部署到 VPS)
│   └── app.py
└── templates/             # 前端页面

十、总结

解决的问题

问题 解决方案
账户分散难管理 统一的 Web 管理界面
订阅链接太多 一键合并为单个订阅
忘记续费 自动检测 + 自动续费
状态不清楚 定时同步 + 实时展示
单 IP 被限制 分布式代理拉取
拉取速度慢 并行处理 + 缓存

希望这篇文章对你有帮助


Prev Post FuckTsn Web - 体适能跑步模拟平台使用指南
Next Post Motrix WebExtension:让浏览器下载自动走 Motrix