生成算法:亿级码量的唯一性保证

这是"游戏礼包码系统"系列文章的第二篇。上一篇我们聊了礼包码的整体架构设计,今天深入探讨一个核心问题:如何保证海量礼包码的唯一性。

引言

想象这样一个场景:你的游戏要搞一次大规模营销活动,需要生成一亿个礼包码。每个码都要绝对唯一,不能重复。一旦出现重复,意味着两个玩家可能兑换同一个礼包——这是严重的运营事故。

更棘手的是,这一亿个码不是一次性生成的,而是分批次、分活动持续产生的。如何在分布式环境下、高并发场景中,保证每个礼包码全局唯一?

这就是我们今天要聊的:唯一性问题的本质,以及工程化的解决方案


一、唯一性问题的本质

1.1 什么叫"唯一"?

唯一性看起来简单,但实际上有三层含义:

1.2 为什么难?


二、常见生成算法对比

业界有几种成熟的唯一 ID 生成方案,我们来对比一下。

2.1 UUID

UUID(Universally Unique Identifier)是最常见的方案之一,标准的 UUID 是 36 个字符(含 4 个连字符)。

2.2 雪花算法(Snowflake)

Snowflake 是 Twitter 开源的分布式 ID 生成算法,生成的是 64 位整数。

2.3 自增 + 随机

这是很多礼包码系统的实际选择。核心思路是:用自增序列保证唯一性,用随机化保证不可猜测性

  1. 维护一个全局自增序号
  2. 将序号通过特定算法转换成礼包码
  3. 加入随机成分,防止被枚举

三、核心算法:自增序号到礼包码的转换

这一节是整篇文章的技术核心。我们来看看,如何把一个简单的数字序号,转换成一个安全、友好、唯一的礼包码。

3.1 为什么不直接用数字?

直接用自增序号做礼包码?想多了:

序号 1 → 礼包码 "1"
序号 2 → 礼包码 "2"
...
序号 1000000 → 礼包码 "1000000"

问题一大堆:

所以,我们需要一个双向转换算法

3.2 Base31 编码:数字变字符

核心思想:把我们选定的 31 个字符当作"31 进制"的数字系统。

// 31个安全字符:8个数字 + 23个字母(排除 0,O,1,I,L)
const charset = "23456789ABCDEFGHJKMNPQRSTUVWXYZ"
var charsetMap = make(map[rune]int)

func init() {
    for i, c := range charset {
        charsetMap[c] = i
    }
}
// EncodeNumber 将数字序号编码为Base31字符串
func EncodeNumber(num int64) string {
    if num < 0 {
        panic("序号不能为负数")
    }
    if num == 0 {
        return string(charset[0])
    }
    
    var result []byte
    for num > 0 {
        remainder := num % 31
        result = append([]byte{charset[remainder]}, result...)
        num = num / 31
    }
    return string(result)
}
// DecodeString 将Base31字符串解码为数字序号
func DecodeString(s string) (int64, error) {
    var result int64
    for _, c := range s {
        idx, ok := charsetMap[c]
        if !ok {
            return 0, fmt.Errorf("非法字符: %c", c)
        }
        result = result*31 + int64(idx)
    }
    return result, nil
}
fmt.Println(EncodeNumber(1))      // 输出: 3
fmt.Println(EncodeNumber(31))     // 输出: 33
fmt.Println(EncodeNumber(1000000)) // 输出: 2B9MG

num, _ := DecodeString("2B9MG")
fmt.Println(num) // 输出: 1000000

看到没?1000000 变成了 2B9MG,看起来完全不一样了!

3.3 混淆算法:打乱规律

Base31 编码虽然把数字变成了字符,但还是有规律可循。连续的序号会产生相似的编码:

序号 1000000 → 2B9MG
序号 1000001 → 3B9MG  // 只是第一位变了
序号 1000002 → 4B9MG  // 还是第一位

聪明的用户可能发现规律,然后枚举。我们需要一个混淆函数,把序号打乱,但保持一一对应。

// 混淆参数(必须选择合适的乘数和增量)
// 乘数 m 必须与 2^n 互质,增量 c 必须是奇数
const (
    lcmMultiplier int64 = 6364136223846793005  // 大质数
    lcmIncrement  int64 = 1442695040888963407  // 另一个大质数
)

// Obfuscate 混淆序号,使其看起来随机
func Obfuscate(n int64, bits int) int64 {
    // 使用线性同余生成器的特性进行混淆
    // 只保留低 bits 位
    mask := int64((1 << bits) - 1)
    return (n * lcmMultiplier + lcmIncrement) & mask
}

// Deobfuscate 反向解混淆
// 注意:这需要求乘法逆元,实现较复杂
// 实际工程中,如果不需要反向解码,可以简化
func Deobfuscate(obfuscated int64, bits int) int64 {
    // 计算乘数在模 2^bits 下的乘法逆元
    mask := int64((1 << bits) - 1)
    inverse := modInverse(lcmMultiplier, int64(1<<bits))
    return ((obfuscated - lcmIncrement) & mask * inverse) & mask
}

// 模逆元计算(扩展欧几里得算法)
func modInverse(a, m int64) int64 {
    // 实现略,可用扩展欧几里得算法
    // 或预计算
}
原始序号 1000000 → 混淆后 7293847102 → 编码 "XK4NP8"
原始序号 1000001 → 混淆后 4829103847 → 编码 "M3HB2K"
原始序号 1000002 → 混淆后 1938471029 → 编码 "9R7X4M"

看,现在连续的序号产生的编码完全不同了!

3.4 完整的生成流程

把上面的组件串起来:

type GiftCodeGenerator struct {
    prefix      string  // 业务前缀
    codeLength  int     // 有效部分长度(不含前缀和校验位)
    obfuscBits  int     // 混淆位数
    checkDigits int     // 校验位数
}

func (g *GiftCodeGenerator) Generate(sequenceNum int64) string {
    // 1. 混淆序号
    obfuscated := Obfuscate(sequenceNum, g.obfuscBits)
    
    // 2. Base31 编码
    encoded := EncodeNumber(obfuscated)
    
    // 3. 补齐长度(前面填充随机字符)
    for len(encoded) < g.codeLength {
        randomChar := charset[rand.Intn(31)]
        encoded = string(randomChar) + encoded
    }
    
    // 4. 添加校验位
    checksum := g.calculateChecksum(encoded)
    
    // 5. 拼接前缀
    return g.prefix + encoded + checksum
}

// 校验位计算(简化的 Luhn 算法变体)
func (g *GiftCodeGenerator) calculateChecksum(code string) string {
    sum := 0
    for i, c := range code {
        idx := charsetMap[c]
        if i%2 == 0 {
            sum += idx
        } else {
            sum += idx * 2
            if idx*2 >= 31 {
                sum -= 31
            }
        }
    }
    return string(charset[sum%31])
}
序号 1 → NY-AB7K2X9M3
序号 2 → NY-M3HB2K7P5
序号 3 → NY-9R7X4M2N8

每个码都看起来随机,但实际上是一一对应的。


四、礼包码的格式设计

4.1 字符集选择

不是所有字符都适合做礼包码。

这样,6 位码就有 31^6 ≈ 8.87 亿种组合,8 位码有 31^8 ≈ 8528 亿种组合。

4.2 长度权衡

长度 组合数 适用场景
6 位 约 8.9 亿 小规模活动,限量礼包
8 位 约 8528 亿 中等规模,通用场景
10 位 约 8.2 × 10^14 大规模,长期运营
12 位 约 8 × 10^17 超大规模,几乎不会用完

4.3 结构设计

一个好的礼包码通常包含以下几部分:

[前缀][序号编码][随机成分][校验位]

4.4 一个示例

假设我们要生成一个 10 位礼包码:

NY-AB7K2X9M3

用户输入时可以带或不带连字符,系统自动处理。


五、唯一性保证的三层防护

唯一性不是靠一个算法就能保证的,需要多层防护。就像城堡防御,要有护城河、城墙、还有最后的守卫。

5.1 第一层:算法保证

序号 1 → 码 A
序号 2 → 码 B
序号 3 → 码 C
...

只要序号不重复,且编码函数是确定的,生成的码就一定不重复。

所以,我们需要第二层防护。

5.2 第二层:内存快速检测(布隆过滤器)

对于海量数据的碰撞检测,布隆过滤器是利器。

import "github.com/bits-and-blooms/bloom/v3"

type CodeChecker struct {
    filter *bloom.BloomFilter
}

func NewCodeChecker(expectedItems uint, falsePositiveRate float64) *CodeChecker {
    return &CodeChecker{
        filter: bloom.NewWithEstimates(expectedItems, falsePositiveRate),
    }
}

// Check 检查码是否可能已存在
// 返回 false = 一定不存在
// 返回 true = 可能存在,需要进一步确认
func (c *CodeChecker) Check(code string) bool {
    return c.filter.Test([]byte(code))
}

// Add 添加新码到过滤器
func (c *CodeChecker) Add(code string) {
    c.filter.Add([]byte(code))
}
func (c *CodeChecker) LoadFromDB(db *sql.DB) error {
    rows, err := db.Query("SELECT code FROM gift_codes")
    if err != nil {
        return err
    }
    defer rows.Close()
    
    count := 0
    for rows.Next() {
        var code string
        if err := rows.Scan(&code); err != nil {
            return err
        }
        c.filter.Add([]byte(code))
        count++
        if count%100000 == 0 {
            log.Printf("已加载 %d 个码到布隆过滤器", count)
        }
    }
    log.Printf("布隆过滤器初始化完成,共加载 %d 个码", count)
    return nil
}

5.3 第三层:数据库唯一索引

这是最可靠的防线,永远不会失效。

CREATE TABLE gift_codes (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    code VARCHAR(16) NOT NULL,
    batch_id INT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    -- 核心防护:唯一索引
    UNIQUE KEY uk_code (code),
    
    -- 辅助索引
    KEY idx_batch (batch_id),
    KEY idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
func InsertCode(db *sql.DB, code string, batchID int) error {
    _, err := db.Exec(
        "INSERT INTO gift_codes (code, batch_id) VALUES (?, ?)",
        code, batchID,
    )
    
    if err != nil {
        // MySQL 唯一键冲突的错误码是 1062
        if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1062 {
            return ErrDuplicateCode
        }
        return err
    }
    return nil
}

5.4 三层防护的协作流程

                    生成新码
                       ↓
          ┌────────────────────────┐
          │ 第一层:算法保证        │
          │ 自增序号 + 确定性编码   │
          └────────────────────────┘
                       ↓
          ┌────────────────────────┐
          │ 第二层:布隆过滤器      │
          │ 快速排除 99% 的情况    │
          └────────────────────────┘
                       ↓
              可能存在?(假阳性)
                ↙         ↘
             是              否
              ↓               ↓
    ┌─────────────────┐   直接插入
    │ 查数据库确认     │      ↓
    │ 真的存在?      │   ┌─────────────────┐
    └─────────────────┘   │ 第三层:唯一索引 │
         ↙    ↘           │ 最后的兜底      │
       是      否          └─────────────────┘
        ↓       ↓                ↓
    重新生成  插入            插入成功
                 ↓                ↓
           ┌─────────────────────────────┐
           │ 数据库唯一索引会拦截        │
           │ 极端情况的重复(如配置错误)│
           └─────────────────────────────┘

六、批量生成的性能优化

6.1 预分配序号段

type SequenceAllocator struct {
    db         *sql.DB
    nodeID     int
    current    int64
    end        int64
    segmentSize int64
    mutex      sync.Mutex
}

// AllocateSegment 从数据库预分配一个序号段
func (s *SequenceAllocator) AllocateSegment() error {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    // 如果当前段还没用完,不需要重新分配
    if s.current < s.end {
        return nil
    }
    
    // 使用数据库事务分配
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 获取当前最大值
    var maxSeq int64
    err = tx.QueryRow(
        "SELECT max_sequence FROM sequence_allocators WHERE node_id = ? FOR UPDATE",
        s.nodeID,
    ).Scan(&maxSeq)
    if err != nil {
        return err
    }
    
    // 分配新段
    newMax := maxSeq + s.segmentSize
    _, err = tx.Exec(
        "UPDATE sequence_allocators SET max_sequence = ? WHERE node_id = ?",
        newMax, s.nodeID,
    )
    if err != nil {
        return err
    }
    
    if err := tx.Commit(); err != nil {
        return err
    }
    
    s.current = maxSeq + 1
    s.end = newMax
    log.Printf("节点 %d 分配序号段: %d - %d", s.nodeID, s.current, s.end)
    
    return nil
}

// Next 获取下一个序号
func (s *SequenceAllocator) Next() (int64, error) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    if s.current >= s.end {
        // 当前段用完,需要重新分配
        if err := s.AllocateSegment(); err != nil {
            return 0, err
        }
    }
    
    seq := s.current
    s.current++
    return seq, nil
}
服务器 A:预分配 1-1000000
服务器 B:预分配 1000001-2000000
服务器 C:预分配 2000001-3000000

每台服务器在本地内存中维护自己的序号段,用完了再去申请新的段。

6.2 并行生成

单线程生成有上限,多核服务器应该并行生成。

func BatchGenerate(generator *GiftCodeGenerator, allocator *SequenceAllocator, count int) ([]string, error) {
    // 根据CPU核心数决定并发数
    workers := runtime.NumCPU()
    if workers > 8 {
        workers = 8 // 最多8个协程
    }
    
    // 每个worker分配的任务数
    batchSize := (count + workers - 1) / workers
    
    results := make([]string, count)
    errors := make([]error, workers)
    
    var wg sync.WaitGroup
    
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            start := workerID * batchSize
            end := start + batchSize
            if end > count {
                end = count
            }
            
            for i := start; i < end; i++ {
                seq, err := allocator.Next()
                if err != nil {
                    errors[workerID] = err
                    return
                }
                results[i] = generator.Generate(seq)
            }
        }(w)
    }
    
    wg.Wait()
    
    // 检查是否有错误
    for _, err := range errors {
        if err != nil {
            return nil, err
        }
    }
    
    return results, nil
}

6.3 批量入库

逐条插入数据库效率太低,应该批量插入。

func BatchInsertCodes(db *sql.DB, codes []string, batchID int) error {
    // 每批插入 5000 条
    batchSize := 5000
    
    for i := 0; i < len(codes); i += batchSize {
        end := i + batchSize
        if end > len(codes) {
            end = len(codes)
        }
        batch := codes[i:end]
        
        // 构建批量插入语句
        valueStrings := make([]string, 0, len(batch))
        valueArgs := make([]interface{}, 0, len(batch)*2)
        
        for _, code := range batch {
            valueStrings = append(valueStrings, "(?, ?)")
            valueArgs = append(valueArgs, code, batchID)
        }
        
        query := fmt.Sprintf(
            "INSERT INTO gift_codes (code, batch_id) VALUES %s",
            strings.Join(valueStrings, ","),
        )
        
        _, err := db.Exec(query, valueArgs...)
        if err != nil {
            return fmt.Errorf("批量插入失败 (offset %d): %w", i, err)
        }
    }
    
    return nil
}

6.4 异步化流水线

生成和存储分离,使用流水线模式。

type GiftCodePipeline struct {
    generateQueue chan int       // 生成任务队列
    storageQueue  chan []string  // 存储队列
    generator     *GiftCodeGenerator
    allocator     *SequenceAllocator
    db            *sql.DB
}

func NewPipeline(generator *GiftCodeGenerator, allocator *SequenceAllocator, db *sql.DB) *GiftCodePipeline {
    return &GiftCodePipeline{
        generateQueue: make(chan int, 100),
        storageQueue:  make(chan []string, 10),
        generator:     generator,
        allocator:     allocator,
        db:            db,
    }
}

func (p *GiftCodePipeline) Start(workers int) {
    // 启动多个生成协程
    for i := 0; i < workers; i++ {
        go p.generateWorker()
    }
    
    // 启动存储协程
    go p.storageWorker()
}

func (p *GiftCodePipeline) generateWorker() {
    buffer := make([]string, 0, 5000)
    
    for count := range p.generateQueue {
        for i := 0; i < count; i++ {
            seq, err := p.allocator.Next()
            if err != nil {
                log.Printf("获取序号失败: %v", err)
                continue
            }
            buffer = append(buffer, p.generator.Generate(seq))
            
            // 攒够 5000 条就发送到存储队列
            if len(buffer) >= 5000 {
                p.storageQueue <- buffer
                buffer = make([]string, 0, 5000)
            }
        }
    }
}

func (p *GiftCodePipeline) storageWorker() {
    for batch := range p.storageQueue {
        if err := BatchInsertCodes(p.db, batch, 0); err != nil {
            log.Printf("存储失败: %v", err)
        }
    }
}

func (p *GiftCodePipeline) Generate(count int) {
    // 分批发送到生成队列
    batchSize := 10000
    for i := 0; i < count; i += batchSize {
        size := batchSize
        if i+batchSize > count {
            size = count - i
        }
        p.generateQueue <- size
    }
}

七、工程化考量

7.1 可追溯性

每个礼包码应该能追溯到:

CREATE TABLE gift_code_batches (
    id INT PRIMARY KEY AUTO_INCREMENT,
    activity_id INT NOT NULL,
    count INT NOT NULL,
    prefix VARCHAR(4) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(64) NOT NULL,
    
    KEY idx_activity (activity_id),
    KEY idx_created (created_at)
);

CREATE TABLE gift_codes (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    code VARCHAR(16) NOT NULL,
    batch_id INT NOT NULL,
    sequence_num BIGINT NOT NULL,
    status TINYINT DEFAULT 0 COMMENT '0-未使用, 1-已使用, 2-已作废',
    used_at TIMESTAMP NULL,
    used_by VARCHAR(64) NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_code (code),
    KEY idx_batch (batch_id),
    KEY idx_status (status),
    KEY idx_sequence (sequence_num)
);

7.2 可撤销性

万一发现问题,需要能够批量作废某个批次的所有码。

func RevokeBatch(db *sql.DB, batchID int) error {
    result, err := db.Exec(
        "UPDATE gift_codes SET status = 2 WHERE batch_id = ? AND status = 0",
        batchID,
    )
    if err != nil {
        return err
    }
    
    affected, _ := result.RowsAffected()
    log.Printf("批次 %d 已作废,影响 %d 个码", batchID, affected)
    
    return nil
}

7.3 监控告警

type GiftCodeMetrics struct {
    // 生成相关
    GeneratedTotal    prometheus.Counter
    GeneratedPerBatch prometheus.Histogram
    GenerationLatency prometheus.Histogram
    
    // 碰撞相关
    BloomFilterHits   prometheus.Counter
    DBConflicts       prometheus.Counter
    
    // 序号段相关
    SegmentRemaining  prometheus.Gauge
    SegmentAllocTotal prometheus.Counter
    
    // 存储相关
    StorageQueueSize  prometheus.Gauge
    StorageLatency    prometheus.Histogram
    StorageErrors     prometheus.Counter
}

func (m *GiftCodeMetrics) RecordGeneration(count int, latencyMs float64) {
    m.GeneratedTotal.Add(float64(count))
    m.GeneratedPerBatch.Observe(float64(count))
    m.GenerationLatency.Observe(latencyMs)
}
# Prometheus 告警规则示例
groups:
  - name: gift_code_alerts
    rules:
      - alert: HighCollisionRate
        expr: rate(gift_code_db_conflicts[5m]) / rate(gift_code_generated[5m]) > 0.001
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "礼包码碰撞率过高"
          description: "碰撞率 {{ $value }} 超过阈值 0.1%"
      
      - alert: SegmentExhausting
        expr: gift_code_segment_remaining < 100000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "序号段即将耗尽"
          description: "剩余序号 {{ $value }} 少于 10 万"
      
      - alert: StorageQueueBacklog
        expr: gift_code_storage_queue_size > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "存储队列积压"
          description: "队列中有 {{ $value }} 条待存储"

八、总结

保证亿级礼包码的唯一性,核心在于:

  1. 选对算法:自增序号 + Base31 编码 + 混淆,是经过实践检验的黄金组合
  2. 设计好格式:平衡唯一性、可读性、用户体验
  3. 多层防护:算法保证 → 布隆过滤器 → 数据库唯一索引
  4. 性能优化:预分配序号段、并行生成、批量入库、异步流水线
  5. 工程保障:可追溯、可撤销、可监控
唯一性 = 好算法 + 好设计 + 多层检测 + 工程保障

技术方案只是手段,业务目标才是核心。在追求技术完美的同时,要时刻问自己:

有时候,一个简单但可靠的方案,比一个完美但复杂的方案更有价值。


💬 评论 (0)

0/500
排序: