防薅羊毛:礼包码的安全防护

本文是游戏运营系统技术分享系列第四篇章的第五篇,探讨礼包码系统的安全防护设计。


前言

在游戏运营中,礼包码是一把双刃剑。用得好,它是拉新促活的利器;用不好,它就是被薅羊毛的重灾区。

我们曾经在春节活动中发放了一批高价值礼包码,本意是回馈老玩家。结果呢?上线两小时,库存就被清空了。后台数据显示,超过60%的兑换请求来自同一批设备指纹。

这件事让我们意识到:没有安全防护的礼包码系统,就是给别人准备的免费午餐。

今天这篇文章,我们就来聊聊如何设计一套完整的礼包码安全防护体系。不讲虚的,我们直接从技术细节、攻防案例、风控设计到异常检测算法,一次性讲透。


一、礼包码被薅的常见手段

知己知彼,才能有的放矢。在设计防护方案之前,我们先来看看薅羊毛的常见套路。

1.1 批量注册小号

这是最基础也是最常见的手法。黑产团伙会利用自动化工具批量注册游戏账号,然后集中兑换礼包码。

黑产通常使用"养号农场"模式。一套自动化脚本可以同时控制成百上千个账号,流程大致如下:

注册流程自动化:
1. 获取手机号(接码平台,0.1-0.5元/个)
2. 自动填写注册信息(随机生成昵称、头像)
3. 通过新手引导(脚本自动完成)
4. 等待礼包码下发
5. 批量兑换
6. 转移资产到主号
7. 弃用小号

一个熟练的黑产工作室,一天可以"养"出上万个可用账号。每个账号的成本可能只有几分钱,但兑换到的礼包价值可能是几块甚至几十块。

2024年某MMO手游周年庆,发放了价值200万的礼包码。活动结束后数据分析发现:

这次活动实际被黑产"吃掉"的损失约60万元。

1.2 设备指纹伪造

更高级一点的手段,会尝试伪造设备信息来绕过设备维度的限制。

设备指纹是识别设备唯一性的核心技术。常见的指纹维度包括:

维度 Android iOS 伪造难度
硬件ID IMEI、Android ID IDFA、IDFV
网络特征 IP、MAC地址 IP、WiFi BSSID
设备参数 型号、系统版本、分辨率 同左
行为特征 传感器数据、触摸模式 同左
环境特征 已安装应用列表、系统配置 受限

黑产常用的"指纹清洗"工具,可以批量修改这些参数:

# 伪代码:黑产工具的指纹伪造逻辑
def fake_device():
    return {
        "imei": generate_random_imei(),      # 随机生成IMEI
        "android_id": random_hex(16),         # 随机Android ID
        "mac": random_mac(),                  # 随机MAC地址
        "model": random_choice(popular_phones), # 随机机型
        "resolution": random_choice(common_resolutions),
        "ip": get_proxy_ip(),                 # 代理IP
    }

我们采用的是"复合指纹"方案,不依赖单一标识符:

// 伪代码:复合指纹生成
public String generateFingerprint(DeviceInfo info) {
    StringBuilder sb = new StringBuilder();
    
    // 硬件层(权重40%)
    sb.append(hash(info.getCpuInfo()));
    sb.append(hash(info.getMemoryInfo()));
    sb.append(hash(info.getSensorList()));  // 传感器列表很难伪造
    
    // 系统层(权重30%)
    sb.append(hash(info.getSystemBuildProps()));
    sb.append(hash(info.getInstalledApps())); // 已安装应用特征
    
    // 行为层(权重30%)
    sb.append(hash(info.getTouchPattern()));  // 触摸行为模式
    sb.append(hash(info.getBatteryUsage()));  // 电池使用模式
    
    return SHA256(sb.toString());
}

关键思路:伪造单一维度容易,伪造所有维度且保持一致性很难。

1.3 礼包码暴力枚举

如果礼包码的生成算法不够安全,攻击者可能通过暴力枚举的方式猜出有效码。

假设礼包码是8位纯数字(如 12345678),可能的组合是 10^8 = 1亿种。一个简单的脚本每秒可以尝试1000次,理论上约28小时可以遍历完所有可能。如果有100个并发,只需17分钟。

实际案例中,我们见过更糟糕的设计:

错误示例:有规律的码
活动码1:GIFT20240001
活动码2:GIFT20240002
...
活动码N:GIFT2024NNNN

这种设计等于把密码写在脸上。攻击者只要猜到规律,就可以批量生成有效码。

// 使用密码学安全的随机数生成器
func GenerateGiftCode(length int) string {
    const charset = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" // 去掉易混淆字符
    b := make([]byte, length)
    
    // 关键:使用 crypto/rand 而不是 math/rand
    for i := range b {
        randomByte := make([]byte, 1)
        rand.Read(randomByte) // 密码学安全随机数
        b[i] = charset[int(randomByte[0]) % len(charset)]
    }
    
    return string(b)
}

// 加入校验位,防止输入错误
func AddCheckDigit(code string) string {
    // 使用 Luhn 算法变种
    sum := 0
    for i, c := range code {
        val := int(c)
        if i % 2 == 0 {
            val *= 2
            if val > 100 {
                val -= 100
            }
        }
        sum += val
    }
    checkDigit := (100 - (sum % 100)) % 100
    return code + fmt.Sprintf("%02d", checkDigit)
}

1.4 内部泄露

有时候问题出在内部。礼包码被提前泄露到黑产群,或者被内部人员倒卖。

某游戏与直播平台合作,给主播发放专属礼包码用于粉丝福利。结果活动还没开始,礼包码就已经在黑产群里叫卖了。

追查发现:直播平台的运营人员把码截图发给了朋友,朋友转手就卖给了黑产。整个链条:

游戏公司 → 直播平台运营 → 运营的朋友 → 黑产中间商 → 羊毛党
    1000元           500元           200元           50元
  1. 水印追溯:每个渠道的码加上隐蔽标记,泄露后可以定位
// 在码中嵌入渠道标识
func GenerateChannelCode(channelID int) string {
    // 渠道ID编码到码的特定位
    // 比如第4-6位是渠道编码(看起来像随机字符)
    code := GenerateRandomCode(9)
    channelPart := encodeChannel(channelID) // 3位
    return code[:3] + channelPart + code[6:]
}
  1. 分批发放:不要一次性把所有码给渠道,分批给、用完再申请
  1. 有效期控制:码的有效期不要太长,减少泄露后的损失窗口
  1. 审计日志:记录每次码的查询、导出操作,事后可追溯

1.5 重放攻击

同一礼包码被多次提交兑换请求,如果系统缺乏幂等性保护,可能导致重复发放。

正常用户点击"兑换"按钮,可能因为网络延迟连续点击多次。如果没有幂等性保护,后端可能收到多个请求并重复处理。

更恶意的情况:攻击者截获一个有效的兑换请求,然后重放这个请求来重复兑换。

// 使用幂等性键(Idempotency Key)
func RedeemGiftCode(req *RedeemRequest) (*RedeemResponse, error) {
    // 1. 生成幂等性键
    idempotencyKey := fmt.Sprintf("redeem:%s:%s", 
        req.UserID, req.GiftCode)
    
    // 2. 尝试加锁(Redis SETNX)
    locked, err := redis.SetNX(idempotencyKey, "1", 30*time.Second)
    if err != nil {
        return nil, err
    }
    if !locked {
        // 已经有相同的请求在处理中
        return nil, ErrDuplicateRequest
    }
    defer redis.Del(idempotencyKey)
    
    // 3. 检查是否已兑换(数据库层面)
    alreadyRedeemed, err := db.CheckRedeemed(req.UserID, req.GiftCode)
    if alreadyRedeemed {
        return nil, ErrAlreadyRedeemed
    }
    
    // 4. 执行兑换逻辑(事务)
    return executeRedeem(req)
}

关键点:在请求层面和数据库层面都要做幂等性保护。


二、安全防护的多层设计

针对上述威胁,我们需要建立纵深防御体系。单一防护手段总有被绕过的可能,多层防护才能形成真正的安全屏障。

打个比方:你家的门锁(第一层)可能被撬,但如果还有防盗门(第二层)、监控摄像头(第三层)、小区保安(第四层),小偷的成本就大大增加了。薅羊毛的人也是要算账的,成本高了自然就换一家了。

2.1 第一层:码本身的安全性

礼包码本身要足够"硬",让攻击者难以破解。

  1. 足够的熵值
- 码的长度和字符集决定了可能的组合数

- 一般建议至少64位随机性(约12-16位码) - 避免使用有规律的生成方式

  1. 不可预测性
- 使用密码学安全的随机数生成器

- 不要基于时间戳、序号等可预测信息生成 - 每个码独立生成,不要有数学关联

  1. 校验机制
- 内置校验位,快速识别无效码

- 防止输入错误被误判为攻击

格式:XXXX-XXXX-XXXX-XXXX(16位,分4组)

结构:
- 前12位:随机生成的主码
- 后4位:校验码(基于前12位计算)

示例:A7XK-3MNP-9QWR-BT2E
      └──主码──┘ └校验┘

当码的数量很大时(比如百万级),如何快速验证码的有效性?

// 方案一:布隆过滤器(快速排除无效码)
type CodeValidator struct {
    bloom    *bloom.BloomFilter  // 快速判断码不存在
    redis    *redis.Client       // 存储有效码集合
    db       *sql.DB             // 持久化存储
}

func (v *CodeValidator) Validate(code string) (bool, error) {
    // 第一层:布隆过滤器(内存级速度)
    if !v.bloom.Test([]byte(code)) {
        return false, nil // 一定不存在
    }
    
    // 第二层:Redis缓存
    exists, err := v.redis.SIsMember("valid_codes", code).Result()
    if err == nil {
        return exists, nil
    }
    
    // 第三层:数据库(兜底)
    return v.db.QueryBool(
        "SELECT EXISTS(SELECT 1 FROM gift_codes WHERE code = ?)", 
        code)
}

布隆过滤器的妙处:它可以100%准确判断一个码"不存在",只有"可能存在"才需要进一步查。对于大量的无效请求(攻击者猜测的码),第一层就被过滤掉了。

2.2 第二层:使用限制

即使码被泄露,也要让它在攻击者手中变得"不好用"。

  1. 次数限制
- 全局兑换上限(总量控制)

- 单用户兑换上限(防刷) - 单设备兑换上限(防小号)

  1. 时间限制
- 有效期限制(过期作废)

- 兑换时间窗口(比如只在活动期间可用)

  1. 资格限制
- 新用户专享/老用户专享

- 等级门槛 - 实名认证要求

// 兑换限制检查器
type RedeemLimiter struct {
    redis *redis.Client
}

func (l *RedeemLimiter) CheckLimit(req *RedeemRequest) error {
    ctx := context.Background()
    
    // 1. 检查全局兑换次数
    globalKey := fmt.Sprintf("gift:%s:redeemed", req.GiftCodeID)
    globalCount, _ := l.redis.Get(ctx, globalKey).Int()
    if globalCount >= req.GlobalLimit {
        return ErrGlobalLimitExceeded
    }
    
    // 2. 检查用户兑换次数
    userKey := fmt.Sprintf("gift:%s:user:%s", req.GiftCodeID, req.UserID)
    userCount, _ := l.redis.Get(ctx, userKey).Int()
    if userCount >= req.UserLimit {
        return ErrUserLimitExceeded
    }
    
    // 3. 检查设备兑换次数
    deviceKey := fmt.Sprintf("gift:%s:device:%s", req.GiftCodeID, req.DeviceID)
    deviceCount, _ := l.redis.Get(ctx, deviceKey).Int()
    if deviceCount >= req.DeviceLimit {
        return ErrDeviceLimitExceeded
    }
    
    // 4. 检查时间窗口
    if time.Now().Before(req.StartTime) || time.Now().After(req.EndTime) {
        return ErrOutOfTimeWindow
    }
    
    // 5. 检查用户资格
    if err := l.checkEligibility(req); err != nil {
        return err
    }
    
    return nil
}

在高并发场景下,"检查-兑换"必须是一个原子操作:

// 使用 Redis 事务
func (l *RedeemLimiter) RedeemWithLimit(req *RedeemRequest) error {
    return l.redis.Watch(ctx, func(tx *redis.Tx) error {
        // 检查限制
        if err := l.CheckLimit(req); err != nil {
            return err
        }
        
        // 原子性执行兑换
        _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            // 增加计数
            pipe.Incr(ctx, fmt.Sprintf("gift:%s:redeemed", req.GiftCodeID))
            pipe.Incr(ctx, fmt.Sprintf("gift:%s:user:%s", req.GiftCodeID, req.UserID))
            pipe.Incr(ctx, fmt.Sprintf("gift:%s:device:%s", req.GiftCodeID, req.DeviceID))
            
            // 执行兑换逻辑
            return l.executeRedeem(pipe, req)
        })
        return err
    })
}

2.3 第三层:渠道管控

从源头控制礼包码的分发,减少泄露风险。

  1. 渠道隔离
- 不同渠道使用不同批次的码

- 出问题可以精准定位和止损

  1. 动态下发
- 用户主动领取时才生成或下发

- 避免提前批量生成大量码

  1. 使用追踪
- 记录每个码的兑换情况

- 异常兑换可追溯

-- 礼包码批次表
CREATE TABLE gift_code_batches (
    id BIGINT PRIMARY KEY,
    batch_code VARCHAR(32) UNIQUE,        -- 批次编码
    channel_id INT,                        -- 渠道ID
    total_count INT,                       -- 总数量
    redeemed_count INT DEFAULT 0,          -- 已兑换数量
    status TINYINT,                        -- 状态:1-正常 2-暂停 3-封禁
    created_at TIMESTAMP,
    expired_at TIMESTAMP,
    
    INDEX idx_channel (channel_id),
    INDEX idx_status (status)
);

-- 当某渠道出现异常时,一键暂停
UPDATE gift_code_batches 
SET status = 2 
WHERE channel_id = ? AND status = 1;
传统模式:
游戏方 → 生成10000个码 → 给渠道 → 渠道分发 → 用户兑换
                     ↑
                  泄露风险点

动态下发模式:
用户点击领取 → 请求渠道服务器 → 渠道请求游戏方 → 生成码 → 返回给用户
                                              ↑
                                         生成即使用,无泄露窗口

2.4 第四层:接口保护

保护兑换接口本身,防止被直接攻击。

  1. 身份验证
- 必须登录才能兑换

- 必要时要求二次验证

  1. 请求限流
- 单用户频率限制

- 单IP频率限制 - 全局QPS保护

  1. 请求签名
- 客户端请求携带签名

- 服务端校验签名有效性 - 防止请求被篡改或重放

// 滑动窗口限流器
type SlidingWindowLimiter struct {
    redis   *redis.Client
    limit   int           // 限制次数
    window  time.Duration // 时间窗口
}

func (l *SlidingWindowLimiter) Allow(key string) bool {
    ctx := context.Background()
    now := time.Now().UnixMilli()
    windowStart := now - l.window.Milliseconds()
    
    // 使用 Redis ZSET 实现滑动窗口
    pipe := l.redis.Pipeline()
    
    // 移除窗口外的记录
    pipe.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", windowStart))
    
    // 统计当前窗口内的请求数
    countCmd := pipe.ZCard(ctx, key)
    
    // 添加当前请求
    pipe.ZAdd(ctx, key, redis.Z{Score: float64(now), Member: now})
    
    // 设置过期时间
    pipe.Expire(ctx, key, l.window)
    
    pipe.Exec(ctx)
    
    return countCmd.Val() < int64(l.limit)
}

// 使用示例:每分钟最多5次兑换请求
limiter := &SlidingWindowLimiter{
    redis:  rdb,
    limit:  5,
    window: time.Minute,
}

if !limiter.Allow(fmt.Sprintf("redeem:user:%s", userID)) {
    return ErrRateLimitExceeded
}
签名流程:
1. 客户端组装请求参数
2. 按参数名排序,拼接成字符串
3. 加上时间戳和密钥
4. 计算 HMAC-SHA256 签名
5. 请求带上签名和时间戳

服务端验证:
1. 检查时间戳是否在允许范围内(防重放)
2. 用同样的算法计算签名
3. 比对签名是否一致
func VerifySignature(req *http.Request, secret string) bool {
    // 获取时间戳
    timestamp := req.Header.Get("X-Timestamp")
    if timestamp == "" {
        return false
    }
    
    // 检查时间戳(允许5分钟误差)
    ts, _ := strconv.ParseInt(timestamp, 10, 64)
    if math.Abs(float64(time.Now().Unix()-ts)) > 300 {
        return false
    }
    
    // 获取签名
    signature := req.Header.Get("X-Signature")
    if signature == "" {
        return false
    }
    
    // 计算期望签名
    params := sortParams(req.URL.Query())
    payload := strings.Join(params, "&") + timestamp + secret
    expected := hmacSHA256(payload, secret)
    
    // 比对签名
    return hmac.Equal([]byte(signature), []byte(expected))
}

三、风控规则设计

有了多层防护,还需要一套灵活的风控规则来实时判断每次兑换请求的风险等级。

3.1 规则引擎架构

我们采用规则引擎的方式来管理风控规则,而不是硬编码在代码里。

  1. 规则定义层
- 用配置化的方式定义规则

- 支持热更新,不需要发版

  1. 事实收集层
- 收集请求相关的各类信息

- 用户信息、设备信息、行为信息等

  1. 规则计算层
- 根据规则和事实计算风险分数

- 支持规则的组合和权重

  1. 决策执行层
- 根据风险分数决定放行、拒绝或人工审核
┌─────────────────────────────────────────────────────────────┐
│                       兑换请求                               │
└─────────────────┬───────────────────────────────────────────┘
                  ▼
┌─────────────────────────────────────────────────────────────┐
│  事实收集层 (Fact Collector)                                │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │
│  │ 用户画像 │ │ 设备指纹 │ │ 行为特征 │ │ 环境信息 │       │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │
└─────────────────┬───────────────────────────────────────────┘
                  ▼
┌─────────────────────────────────────────────────────────────┐
│  规则计算层 (Rule Engine)                                   │
│  ┌────────────────────────────────────────────────────┐    │
│  │  规则1: 新设备 + 高价值礼包 → 风险+30              │    │
│  │  规则2: 同IP短时间多次请求 → 风险+50              │    │
│  │  规则3: 账号无游戏行为 → 风险+40                  │    │
│  │  ...                                               │    │
│  └────────────────────────────────────────────────────┘    │
│                      ↓ 风险分数: 75                         │
└─────────────────┬───────────────────────────────────────────┘
                  ▼
┌─────────────────────────────────────────────────────────────┐
│  决策执行层 (Decision Engine)                               │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │
│  │ 0-30 放行│ │31-70 验证│ │71-100 拒绝│                    │
│  └──────────┘ └──────────┘ └──────────┘                    │
└─────────────────────────────────────────────────────────────┘

3.2 典型风控规则

基于单一条件的硬性规则,命中即拒绝。

示例:

多个条件组合判断,更精准但也更复杂。

示例:

基于用户行为模式判断。

示例:

{
  "rules": [
    {
      "id": "R001",
      "name": "新设备高价值兑换",
      "type": "composite",
      "conditions": [
        {"field": "device.age_hours", "op": "<", "value": 24},
        {"field": "gift.value", "op": ">", "value": 100}
      ],
      "logic": "AND",
      "score": 40,
      "action": "challenge"
    },
    {
      "id": "R002", 
      "name": "IP短时间高频请求",
      "type": "statistical",
      "conditions": [
        {"field": "ip.request_count_1h", "op": ">", "value": 20}
      ],
      "score": 50,
      "action": "reject"
    },
    {
      "id": "R003",
      "name": "疑似脚本行为",
      "type": "behavior",
      "conditions": [
        {"field": "user.has_gameplay", "op": "==", "value": false},
        {"field": "user.register_to_redeem_minutes", "op": "<", "value": 30}
      ],
      "logic": "AND",
      "score": 60,
      "action": "reject"
    }
  ],
  "decision_thresholds": {
    "pass": 30,
    "challenge": 70,
    "reject": 100
  }
}

3.3 规则引擎实现

// 规则引擎核心结构
type RuleEngine struct {
    rules    []Rule
    facts    map[string]interface{}
    scorer   *RiskScorer
}

type Rule struct {
    ID         string
    Name       string
    Type       string // "static", "composite", "statistical", "behavior"
    Conditions []Condition
    Logic      string // "AND", "OR"
    Score      int
    Action     string // "pass", "challenge", "reject"
}

// 执行规则评估
func (e *RuleEngine) Evaluate() *RiskResult {
    result := &RiskResult{
        TotalScore: 0,
        HitRules:   []string{},
        Action:     "pass",
    }
    
    for _, rule := range e.rules {
        if e.matchRule(rule) {
            result.TotalScore += rule.Score
            result.HitRules = append(result.HitRules, rule.ID)
            
            // 更新决策
            if rule.Action == "reject" && rule.Score >= 50 {
                result.Action = "reject"
            } else if result.Action == "pass" && rule.Score > 30 {
                result.Action = "challenge"
            }
        }
    }
    
    return result
}

// 匹配单条规则
func (e *RuleEngine) matchRule(rule Rule) bool {
    if rule.Logic == "AND" {
        for _, cond := range rule.Conditions {
            if !e.evaluateCondition(cond) {
                return false
            }
        }
        return true
    } else { // OR
        for _, cond := range rule.Conditions {
            if e.evaluateCondition(cond) {
                return true
            }
        }
        return false
    }
}

// 评估单个条件
func (e *RuleEngine) evaluateCondition(cond Condition) bool {
    value := e.getFactValue(cond.Field)
    
    switch cond.Op {
    case "<":
        return toFloat(value) < toFloat(cond.Value)
    case ">":
        return toFloat(value) > toFloat(cond.Value)
    case "==":
        return value == cond.Value
    case "!=":
        return value != cond.Value
    default:
        return false
    }
}

3.4 规则的灰度与迭代

风控规则不是一成不变的,需要持续优化。

  1. 新规则灰度上线
- 先观察模式,记录命中但不拦截

- 分析误伤率和漏过率 - 确认有效后再正式启用

  1. 规则效果监控
- 跟踪每条规则的命中率和准确率

- 定期清理无效规则

  1. A/B测试
- 对比不同规则组合的效果

- 用数据驱动规则优化

新规则上线:
Day 1-3:   影子模式(Shadow Mode)
           - 规则执行但不生效
           - 收集命中数据
           - 分析误伤情况

Day 4-7:   灰度模式(Canary Mode)
           - 对10%流量生效
           - 监控投诉和反馈
           - 调整规则参数

Day 8+:    全量模式(Full Mode)
           - 对所有流量生效
           - 持续监控效果
           - 定期优化迭代
-- 规则效果统计
SELECT 
    rule_id,
    COUNT(*) as hit_count,
    SUM(CASE WHEN action = 'reject' THEN 1 ELSE 0 END) as reject_count,
    SUM(CASE WHEN user_complaint = 1 THEN 1 ELSE 0 END) as complaint_count,
    AVG(CASE WHEN confirmed_fraud = 1 THEN 1 ELSE 0 END) as precision_rate
FROM rule_execution_log
WHERE created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY rule_id;

四、异常行为检测算法

除了预设的风控规则,我们还需要有发现未知威胁的能力。风控规则是"已知已知"(知道有哪些攻击方式),异常检测是发现"未知未知"(发现新的攻击模式)。

4.1 统计学方法

基于历史数据建立正常行为的统计模型,偏离模型的行为即视为异常。

import numpy as np

class ZScoreDetector:
    """基于Z-Score的异常检测"""
    
    def __init__(self, threshold=3.0):
        self.threshold = threshold
        self.mean = None
        self.std = None
    
    def fit(self, historical_data):
        """用历史数据训练模型"""
        self.mean = np.mean(historical_data)
        self.std = np.std(historical_data)
    
    def detect(self, value):
        """检测单个值是否异常"""
        if self.std == 0:
            return False
        
        z_score = (value - self.mean) / self.std
        return abs(z_score) > self.threshold

# 使用示例:检测某IP的请求频率异常
detector = ZScoreDetector(threshold=3.0)
historical_requests = [10, 12, 8, 15, 11, 9, 13, ...]  # 历史每小时请求数
detector.fit(historical_requests)

current_requests = 50  # 当前小时的请求数
is_anomaly = detector.detect(current_requests)  # True,异常!
class MovingAverageDetector:
    """基于移动平均的异常检测"""
    
    def __init__(self, window_size=24, confidence=2.0):
        self.window_size = window_size
        self.confidence = confidence
        self.history = []
    
    def detect(self, value):
        """实时检测"""
        if len(self.history) < self.window_size:
            self.history.append(value)
            return False
        
        # 计算移动平均和标准差
        window = self.history[-self.window_size:]
        mean = np.mean(window)
        std = np.std(window)
        
        # 更新历史
        self.history.append(value)
        
        # 判断是否超出置信区间
        lower_bound = mean - self.confidence * std
        upper_bound = mean + self.confidence * std
        
        return value < lower_bound or value > upper_bound

# 使用示例:监控每分钟兑换请求量
detector = MovingAverageDetector(window_size=60, confidence=2.5)
for minute_requests in realtime_stream:
    if detector.detect(minute_requests):
        alert(f"请求量异常: {minute_requests}")

4.2 行为序列分析

不只看单次请求,而是分析行为序列。正常用户和黑产的行为路径有明显差异。

from hmmlearn import hmm
import numpy as np

class BehaviorSequenceDetector:
    """基于HMM的行为序列异常检测"""
    
    def __init__(self, n_states=5):
        self.model = hmm.GaussianHMM(
            n_components=n_states,
            covariance_type="diag",
            n_iter=100
        )
        self.is_fitted = False
    
    def extract_features(self, events):
        """从用户行为序列提取特征"""
        features = []
        for i, event in enumerate(events):
            feature = [
                event['time_since_last'],      # 距上次操作时间
                event['operation_type'],        # 操作类型编码
                event['page_depth'],            # 页面深度
                event['session_duration'],      # 会话时长
            ]
            features.append(feature)
        return np.array(features)
    
    def fit(self, normal_sequences):
        """用正常用户行为序列训练"""
        all_features = []
        for seq in normal_sequences:
            features = self.extract_features(seq)
            all_features.append(features)
        
        # 合并所有序列
        X = np.vstack(all_features)
        self.model.fit(X)
        self.is_fitted = True
    
    def detect(self, sequence):
        """检测行为序列是否异常"""
        if not self.is_fitted:
            return False
        
        features = self.extract_features(sequence)
        log_prob = self.model.score(features)
        
        # 如果概率低于阈值,认为是异常序列
        threshold = -100  # 需要根据实际数据调优
        return log_prob < threshold

# 正常用户行为序列示例
normal_user = [
    {'event': 'register', 'time_since_last': 0, 'operation_type': 1, ...},
    {'event': 'tutorial', 'time_since_last': 120, 'operation_type': 2, ...},
    {'event': 'play_game', 'time_since_last': 300, 'operation_type': 3, ...},
    {'event': 'redeem_code', 'time_since_last': 600, 'operation_type': 4, ...},
]

# 黑产行为序列示例
fraud_user = [
    {'event': 'register', 'time_since_last': 0, 'operation_type': 1, ...},
    {'event': 'redeem_code', 'time_since_last': 5, 'operation_type': 4, ...},  # 注册5秒就兑换
    {'event': 'transfer_items', 'time_since_last': 10, 'operation_type': 5, ...},
    {'event': 'logout', 'time_since_last': 15, 'operation_type': 6, ...},
]

4.3 图算法检测团伙

黑产往往是有组织的,多个账号之间存在关联。图算法可以发现这些隐藏的团伙。

节点类型:
- 账号(User)
- 设备(Device)  
- IP地址(IP)
- 礼包码(Code)

边类型:
- USES:账号使用设备
- FROM:请求来自IP
- REDEEMS:账号兑换礼包码
- SAME_AS:设备/IP相似度关联
import networkx as nx
from community import community_louvain

class FraudRingDetector:
    """基于图算法的黑产团伙检测"""
    
    def __init__(self):
        self.graph = nx.Graph()
    
    def add_relation(self, user_id, device_id, ip_addr, code_id):
        """添加关联关系"""
        # 添加节点
        self.graph.add_node(f"user_{user_id}", type="user")
        self.graph.add_node(f"device_{device_id}", type="device")
        self.graph.add_node(f"ip_{ip_addr}", type="ip")
        self.graph.add_node(f"code_{code_id}", type="code")
        
        # 添加边
        self.graph.add_edge(f"user_{user_id}", f"device_{device_id}")
        self.graph.add_edge(f"user_{user_id}", f"ip_{ip_addr}")
        self.graph.add_edge(f"user_{user_id}", f"code_{code_id}")
    
    def detect_communities(self):
        """使用Louvain算法发现社区"""
        communities = community_louvain.best_partition(self.graph)
        
        # 统计每个社区的特征
        community_stats = {}
        for node, community_id in communities.items():
            if community_id not in community_stats:
                community_stats[community_id] = {
                    'users': 0,
                    'devices': 0,
                    'ips': 0,
                    'codes': 0,
                }
            
            node_type = self.graph.nodes[node]['type']
            community_stats[community_id][f"{node_type}s"] += 1
        
        # 识别可疑社区:用户多但设备/IP少(共用设备/IP)
        suspicious_communities = []
        for cid, stats in community_stats.items():
            if stats['users'] > 10 and stats['devices'] < stats['users'] * 0.5:
                suspicious_communities.append({
                    'community_id': cid,
                    'stats': stats,
                    'reason': f"{stats['users']}个用户共用{stats['devices']}个设备"
                })
        
        return suspicious_communities

# 使用示例
detector = FraudRingDetector()

# 添加兑换记录(这些账号在共用设备)
for record in redeem_logs:
    detector.add_relation(
        record['user_id'],
        record['device_id'],
        record['ip'],
        record['code_id']
    )

# 检测团伙
fraud_rings = detector.detect_communities()
for ring in fraud_rings:
    alert(f"发现可疑团伙: {ring['reason']}")

某次活动后,我们用图算法分析兑换记录,发现了一个30人的"羊群":

团伙特征:
- 30个账号
- 只用了5台设备(平均每台设备6个号)
- 集中在3个IP段
- 所有账号都在活动当天注册
- 兑换后全部向同一个主号转移物品

处理:
- 封禁30个小号 + 1个主号
- 追回已转移的物品
- 把设备指纹加入黑名单

4.4 机器学习方法

对于更复杂的场景,可以使用监督学习或无监督学习方法。

from sklearn.ensemble import IsolationForest
import pandas as pd

class AnomalyDetector:
    """基于Isolation Forest的异常检测"""
    
    def __init__(self, contamination=0.1):
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42,
            n_estimators=100
        )
        self.feature_columns = None
    
    def extract_features(self, user_behavior):
        """提取用户行为特征"""
        return {
            'register_to_redeem_minutes': user_behavior['redeem_time'] - user_behavior['register_time'],
            'total_gameplay_minutes': user_behavior['gameplay_time'],
            'session_count': user_behavior['sessions'],
            'avg_session_duration': user_behavior['total_time'] / max(user_behavior['sessions'], 1),
            'distinct_pages_visited': len(user_behavior['pages']),
            'friends_count': user_behavior['friends'],
            'guild_joined': 1 if user_behavior['guild'] else 0,
            'items_transferred': user_behavior['transfer_count'],
            'device_switch_count': user_behavior['device_changes'],
        }
    
    def fit(self, user_behaviors):
        """训练模型"""
        features = [self.extract_features(b) for b in user_behaviors]
        df = pd.DataFrame(features)
        self.feature_columns = df.columns.tolist()
        
        self.model.fit(df)
    
    def predict(self, user_behavior):
        """预测是否异常"""
        features = self.extract_features(user_behavior)
        df = pd.DataFrame([features])
        
        prediction = self.model.predict(df[self.feature_columns])
        # -1 表示异常,1 表示正常
        return prediction[0] == -1

# 使用示例
detector = AnomalyDetector(contamination=0.05)  # 假设5%是异常

# 用历史数据训练
detector.fit(historical_user_behaviors)

# 检测新用户
if detector.predict(current_user_behavior):
    trigger_additional_verification()

4.5 实时 vs 离线检测

在请求到达时即时判断,用于拦截高风险请求。

// 实时检测流水线
func RealtimeDetection(req *RedeemRequest) *DetectionResult {
    result := &DetectionResult{RiskScore: 0}
    
    // 1. 快速规则检测(<10ms)
    result.Merge(ruleEngine.Evaluate(req))
    
    // 2. 统计特征检测(<50ms)
    result.Merge(statisticalDetector.Check(req))
    
    // 3. 设备指纹检测(<30ms)
    result.Merge(deviceFingerprintAnalyzer.Check(req))
    
    // 注意:实时检测不做复杂的模型推理
    // 复杂检测放在离线处理
    
    return result
}

事后批量分析,用于发现问题和完善规则。

# 离线分析任务(每天执行)
def offline_analysis():
    # 1. 拉取昨天的兑换数据
    yesterday_redeems = fetch_redeems(date=yesterday())
    
    # 2. 构建关联图谱
    graph = build_relationship_graph(yesterday_redeems)
    
    # 3. 社区发现
    communities = detect_communities(graph)
    
    # 4. 识别可疑团伙
    for community in communities:
        if is_suspicious(community):
            # 5. 生成新规则
            new_rule = generate_rule_from_community(community)
            
            # 6. 添加到规则引擎(灰度模式)
            add_rule_shadow_mode(new_rule)
            
            # 7. 发送告警
            alert_security_team(community, new_rule)

五、应急响应机制

再完善的防护也可能被突破,关键是出现问题后能否快速响应。

5.1 监控告警

# 核心监控指标
metrics:
  - name: redeem_success_rate
    description: 兑换成功率
    alert_when: 下降超过20%
    
  - name: redeem_request_qps
    description: 兑换请求QPS
    alert_when: 上涨超过5倍
    
  - name: single_device_redeem_count
    description: 单设备兑换次数
    alert_when: 单设备1小时内超过10次
    
  - name: new_user_redeem_ratio
    description: 新用户兑换占比
    alert_when: 超过50%
    
  - name: rule_hit_rate
    description: 风控规则命中率
    alert_when: 单条规则命中率突增或突降
P0(紧急):
- 兑换接口被攻击导致服务不可用
- 大规模薅羊毛正在进行
→ 立即电话通知,5分钟内响应

P1(严重):
- 某批次礼包码异常兑换量激增
- 发现新的攻击模式
→ 飞书/短信通知,15分钟内响应

P2(一般):
- 单一渠道兑换异常
- 风控规则误伤率上升
→ 飞书通知,1小时内响应

5.2 快速止损

发现问题后,要能快速切断损失。

// 止损操作API
type EmergencyController struct {
    db    *sql.DB
    redis *redis.Client
}

// 暂停指定批次
func (c *EmergencyController) PauseBatch(batchID string) error {
    // 1. 更新数据库状态
    _, err := c.db.Exec(
        "UPDATE gift_code_batches SET status = 2 WHERE id = ?", 
        batchID)
    if err != nil {
        return err
    }
    
    // 2. 刷新缓存
    c.redis.Del(ctx, fmt.Sprintf("batch:%s:info", batchID))
    
    // 3. 记录操作日志
    logEmergencyOperation("pause_batch", batchID)
    
    return nil
}

// 暂停指定渠道
func (c *EmergencyController) PauseChannel(channelID int) error {
    return c.db.Exec(
        "UPDATE gift_code_batches SET status = 2 WHERE channel_id = ?",
        channelID)
}

// 批量封禁账号
func (c *EmergencyController) BanUsers(userIDs []string, reason string) error {
    for _, uid := range userIDs {
        c.redis.Set(ctx, fmt.Sprintf("banned:user:%s", uid), reason, 0)
        c.db.Exec("UPDATE users SET status = 'banned' WHERE id = ?", uid)
    }
    return nil
}

// 熔断保护
func (c *EmergencyController) CheckCircuitBreaker(batchID string) bool {
    key := fmt.Sprintf("cb:%s:failures", batchID)
    failures, _ := c.redis.Get(ctx, key).Int()
    return failures > 100 // 失败超过100次触发熔断
}
circuit_breaker:
  global:
    failure_threshold: 1000    # 全局失败阈值
    time_window: 60s           # 时间窗口
    
  per_batch:
    failure_threshold: 100     # 单批次失败阈值
    time_window: 60s
    
  per_channel:
    failure_threshold: 200     # 单渠道失败阈值  
    time_window: 60s
    
  actions:
    - notify: security_team
    - auto_pause: true         # 自动暂停
    - throttle: 0.1            # 限流到10%

5.3 事后分析

止损之后,要搞清楚发生了什么。

# 安全事件分析报告

## 基本信息
- 事件时间:2024-XX-XX 14:30 - 16:45
- 发现时间:2024-XX-XX 15:12
- 影响范围:春节活动礼包码批次A001-A003
- 发现方式:监控告警(单设备兑换次数异常)

## 攻击特征
- 攻击类型:批量小号薅羊毛
- 涉及账号:3,247个
- 涉及设备:约400台(指纹分析)
- 攻击手法:模拟器多开 + 代理IP

## 损失评估
- 异常兑换数量:3,247次
- 涉及礼包价值:约64,940元
- 已追回:约45,000元(通过封禁账号)
- 实际损失:约19,940元

## 防护失效分析
- 被绕过的规则:R003(IP频率限制,攻击者使用了代理池)
- 未覆盖的场景:新设备首次兑换高价值礼包(当时没有这条规则)

## 改进措施
1. 新增规则:新设备24小时内兑换高价值礼包需二次验证
2. 优化指纹:增加传感器特征维度,提高模拟器识别率
3. 加强监控:新增"单批次兑换设备集中度"指标

5.4 改进闭环

每次事件都要沉淀为系统改进。

// 安全事件追踪系统
type SecurityIncidentTracker struct {
    db *sql.DB
}

type Incident struct {
    ID           string
    Type         string
    StartTime    time.Time
    EndTime      time.Time
    AttackMethod string
    Loss         float64
    Recovered    float64
    RootCause    string
    Improvements []Improvement
}

type Improvement struct {
    Type        string // "rule", "monitor", "process"
    Description string
    Status      string // "pending", "in_progress", "done"
    Owner       string
    DueDate     time.Time
}

// 创建改进项
func (t *SecurityIncidentTracker) CreateImprovement(incidentID string, imp Improvement) {
    t.db.Exec(`
        INSERT INTO security_improvements 
        (incident_id, type, description, status, owner, due_date)
        VALUES (?, ?, ?, 'pending', ?, ?)
    `, incidentID, imp.Type, imp.Description, imp.Owner, imp.DueDate)
}

// 定期检查改进进度
func (t *SecurityIncidentTracker) CheckOverdueImprovements() {
    rows, _ := t.db.Query(`
        SELECT id, description, owner, due_date 
        FROM security_improvements 
        WHERE status != 'done' AND due_date < NOW()
    `)
    
    for rows.Next() {
        var id, desc, owner string
        var dueDate time.Time
        rows.Scan(&id, &desc, &owner, &dueDate)
        
        // 发送提醒
        notify(owner, fmt.Sprintf("安全改进项超期: %s", desc))
    }
}

六、成本与效果的平衡

安全防护不是越强越好,要在安全性和用户体验之间找到平衡点。

6.1 误伤率控制

过严格的风控会把正常用户也拦在外面,造成用户投诉和流失。

-- 每日误伤率统计
SELECT 
    DATE(created_at) as date,
    COUNT(*) as total_blocked,
    SUM(CASE WHEN appeal_result = 'approved' THEN 1 ELSE 0 END) as false_positive,
    SUM(CASE WHEN appeal_result = 'approved' THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as false_positive_rate
FROM block_logs
WHERE created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY DATE(created_at)
ORDER BY date;
场景 可接受误伤率 说明
低价值礼包(<10元) <0.1% 影响小,用户不会太在意
中价值礼包(10-100元) <0.01% 需要谨慎
高价值礼包(>100元) <0.001% 宁可漏过,不可误伤

6.2 分层验证策略

不是所有请求都需要最严格的验证,根据风险等级采用不同策略:

func ChooseVerificationMethod(riskScore int, giftValue float64) string {
    switch {
    case riskScore < 30:
        return "none" // 直接通过
        
    case riskScore < 50 && giftValue < 50:
        return "none" // 低风险+低价值,放行
        
    case riskScore < 70:
        return "captcha" // 图形验证码
        
    case riskScore < 90 && giftValue < 100:
        return "captcha" // 中风险+中价值
        
    default:
        return "sms" // 高风险或高价值,短信验证
    }
}

6.3 用户申诉通道

被误伤的用户需要有渠道申诉:

申诉流程:
1. 用户收到拦截提示
2. 点击"申诉"按钮
3. 填写申诉理由(可选上传截图)
4. 系统自动初审(基于历史行为)
5. 人工复审(如果自动初审不通过)
6. 申诉结果通知
7. 如果申诉通过,自动放行并记录为误伤

七、总结

礼包码安全防护是一个系统工程,没有银弹。我们需要:

  1. 纵深防御 - 多层防护,层层把关
  2. 灵活风控 - 规则引擎,持续优化
  3. 智能识别 - 异常检测,发现未知威胁
  4. 快速响应 - 监控告警,应急止损
  5. 平衡体验 - 控制误伤,提供申诉

最后说一句:没有绝对安全的系统,只有让攻击成本高于收益的系统。 我们的目标不是让系统"不可能被攻破",而是让攻击者觉得"不值得"。

打个比方:你不可能把家建成银行金库,但你可以让小偷觉得——隔壁那家更好下手。


附录:技术选型参考

场景 推荐方案 说明
礼包码存储 Redis Set + MySQL Redis快速校验,MySQL持久化
限流器 Redis + Lua脚本 原子性保证,性能好
规则引擎 自研或Drools 简单场景自研,复杂场景用Drools
异常检测 scikit-learn Isolation Forest等算法
图计算 NetworkX(小规模)/ Spark GraphX(大规模) 团伙检测
监控告警 Prometheus + Grafana 指标采集和可视化
日志分析 ELK Stack 日志收集、存储、分析


- 第1篇:从0到1:礼包码系统的设计思路

- 第2篇:高并发下的礼包码兑换 - 第3篇:礼包码生成算法揭秘 - 第4篇:礼包码的过期与清理 - 第5篇:防薅羊毛:礼包码的安全防护(本文) - 第6篇:礼包码系统演进与展望(待续)

💬 评论 (0)

0/500
排序: