活动引擎:让运营不再"求开发"

本文是游戏运营系统技术分享系列第六篇(运营工具篇第2篇),将带你了解活动引擎的设计思路,以及它如何让运营团队摆脱对开发的依赖。


一、一个真实的故事

周五下午 4 点,运营同事火急火燎地找到开发:

"能不能帮我加个活动?周末要做个限时双倍金币,周日 24 点结束,只有 level 10 以上的玩家能参加,最好还能有个排行榜,前 100 名有额外奖励……"

开发看了看排期:"下周二才能排上,这周在赶版本。"

运营无奈:"可是竞品明天就上活动了……"

这样的场景,在很多游戏公司每天都在上演。

活动引擎的诞生,就是为了解决这个问题。


二、为什么需要活动引擎

2.1 传统活动开发的困境

在没有活动引擎之前,每个活动都是"定制开发":

2.2 活动引擎的价值

活动引擎的本质,是把"活动能力"从代码中抽离出来,变成可配置的能力。

2.3 活动引擎能做什么?

活动引擎可以支撑的活动类型非常广泛:

核心思路是:把活动的"骨架"固化,让运营只需要填充"血肉"


三、活动引擎的核心设计

3.1 活动的本质是什么?

如果我们要把所有活动抽象成一个通用模型,会发现它们都有这些要素:

这五要素,就是活动引擎的核心抽象。

3.2 活动生命周期管理

一个活动从创建到结束,经历这些阶段:

活动引擎要能处理这些阶段的自动流转,以及在阶段边界上的各种边界情况(比如活动开始前一秒配置还能改吗?活动结束后还能补发奖励吗?)。

3.3 活动状态机

每个玩家对每个活动的参与状态,也是一个状态机:

未参与 → 进行中 → 已完成 → 已领奖
         ↓
       已过期(活动结束时未完成)

状态机的管理要处理:


四、活动引擎的架构设计

4.1 分层架构

一个成熟的活动引擎通常采用分层架构,就像盖房子一样,从地基到屋顶层层分明:

┌─────────────────────────────────────────────┐
│              运营管理后台(Web)               │
├─────────────────────────────────────────────┤
│              API 服务层(Gateway)            │
├─────────────────────────────────────────────┤
│  活动管理 │ 规则引擎 │ 奖励中心 │ 数据分析      │
├─────────────────────────────────────────────┤
│              活动运行时(Runtime)            │
├─────────────────────────────────────────────┤
│  Redis 缓存 │ MySQL 持久化 │ MQ 消息队列      │
└─────────────────────────────────────────────┘

4.2 核心数据模型

活动引擎的数据模型设计,直接决定了系统的灵活性和扩展性。以下是核心实体关系:

-- 活动主表
CREATE TABLE activity (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100) NOT NULL COMMENT '活动名称',
    type VARCHAR(50) NOT NULL COMMENT '活动类型:sign_in, recharge, rank...',
    template_id BIGINT COMMENT '使用的模板ID',
    status TINYINT COMMENT '状态:draft, pending, running, finished, archived',
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    config JSON COMMENT '活动配置(规则、奖励等)',
    created_by VARCHAR(50) COMMENT '创建人',
    created_at DATETIME,
    updated_at DATETIME
);

-- 玩家活动参与记录
CREATE TABLE activity_participation (
    id BIGINT PRIMARY KEY,
    activity_id BIGINT NOT NULL,
    player_id BIGINT NOT NULL,
    status VARCHAR(20) COMMENT '参与状态',
    progress JSON COMMENT '任务进度详情',
    claimed_at DATETIME COMMENT '领奖时间',
    created_at DATETIME,
    updated_at DATETIME,
    UNIQUE KEY uk_activity_player (activity_id, player_id)
);

-- 奖励发放记录
CREATE TABLE reward_log (
    id BIGINT PRIMARY KEY,
    activity_id BIGINT NOT NULL,
    player_id BIGINT NOT NULL,
    reward_content JSON COMMENT '奖励内容',
    status VARCHAR(20) COMMENT '发放状态',
    issued_at DATETIME
);

有人可能会问:为什么不把所有字段都拆成独立列?原因是活动的配置差异很大。签到活动需要"连续签到天数"配置,排行榜活动需要"排名规则"配置。用 JSON 可以灵活存储不同类型活动的特有配置,同时保持表结构简洁。

当然,JSON 字段也要有规范,否则后期维护会变成噩梦。我们建议:

  1. 定义清晰的 JSON Schema,并做好版本管理
  2. 对高频查询字段,可以提取为独立列建索引
  3. 配置变更时保留历史版本,方便问题追溯

4.3 活动配置的 JSON 结构示例

一个完整的活动配置可能是这样的:

{
  "activityId": 10001,
  "name": "新春签到活动",
  "type": "sign_in",
  "timeConfig": {
    "startTime": "2026-01-25 00:00:00",
    "endTime": "2026-02-10 23:59:59",
    "timezone": "Asia/Shanghai"
  },
  "conditions": [
    {
      "type": "player_level",
      "operator": ">=",
      "value": 10
    },
    {
      "type": "register_days",
      "operator": ">=",
      "value": 3
    }
  ],
  "tasks": [
    {
      "taskId": "daily_login",
      "type": "login",
      "target": 1,
      "period": "daily",
      "maxProgress": 1
    }
  ],
  "rewards": [
    {
      "condition": "cumulative_days >= 3",
      "items": [
        {"itemId": "gold", "count": 1000},
        {"itemId": "diamond", "count": 50}
      ]
    },
    {
      "condition": "cumulative_days >= 7",
      "items": [
        {"itemId": "gold", "count": 5000},
        {"itemId": "hero_fragment", "count": 10}
      ]
    }
  ],
  "limits": {
    "dailyClaimLimit": 1,
    "totalClaimLimit": 7
  }
}

这个配置清晰描述了:什么时候的活动、谁能参加、要做什么、能得什么、有什么限制。运营人员通过可视化界面编辑这些配置,系统负责解析和执行。


五、规则引擎设计

5.1 为什么需要规则引擎?

活动引擎的核心挑战之一是:参与条件和任务目标的多样化

参与条件可能是:

任务目标可能是:

如果用硬编码实现,每增加一种条件就要改代码。规则引擎的作用,就是把这些条件变成可配置的"表达式"。

5.2 规则引擎的架构

规则引擎的设计可以分为三个层次:

┌────────────────────────────────────────────┐
│           规则可视化编辑器(UI)              │
├────────────────────────────────────────────┤
│           规则解析器(Parser)               │
├────────────────────────────────────────────┤
│           规则执行引擎(Engine)             │
├────────────────────────────────────────────┤
│  变量上下文 │ 函数库 │ 运算符库              │
└────────────────────────────────────────────┘

5.3 规则表达式的 DSL 设计

我们设计了一套简洁的规则表达式 DSL(Domain Specific Language):

// 简单条件:等级 >= 10 且 VIP >= 3
player.level >= 10 && player.vipLevel >= 3

// 复杂条件:注册超过7天且从未充值
daysSince(player.registerTime) > 7 && totalRecharge() == 0

// 时间范围条件:活动期间累计充值满100
totalRecharge(activity.startTime, activity.endTime) >= 100

// 组合条件:(等级>=20 或 VIP>=5) 且 首次充值
(player.level >= 20 || player.vipLevel >= 5) && isFirstRecharge()

5.4 规则解析与执行

规则解析的核心是将表达式转换为 AST,然后递归执行。以下是简化版的实现:

// 规则节点接口
type RuleNode interface {
    Evaluate(ctx *RuleContext) (interface{}, error)
}

// 二元运算节点(如 a > b)
type BinaryOpNode struct {
    Left     RuleNode
    Operator string // "==", ">=", "&&", etc.
    Right    RuleNode
}

func (n *BinaryOpNode) Evaluate(ctx *RuleContext) (interface{}, error) {
    left, err := n.Left.Evaluate(ctx)
    if err != nil {
        return nil, err
    }
    right, err := n.Right.Evaluate(ctx)
    if err != nil {
        return nil, err
    }
    
    switch n.Operator {
    case ">=":
        return compareNumbers(left, right) >= 0, nil
    case "&&":
        return toBool(left) && toBool(right), nil
    case "||":
        return toBool(left) || toBool(right), nil
    // ... 其他运算符
    }
    return nil, fmt.Errorf("unknown operator: %s", n.Operator)
}

// 变量引用节点(如 player.level)
type VariableNode struct {
    Path string // "player.level"
}

func (n *VariableNode) Evaluate(ctx *RuleContext) (interface{}, error) {
    return ctx.GetVariable(n.Path)
}

// 函数调用节点(如 totalRecharge())
type FunctionCallNode struct {
    Name      string
    Arguments []RuleNode
}

func (n *FunctionCallNode) Evaluate(ctx *RuleContext) (interface{}, error) {
    args := make([]interface{}, len(n.Arguments))
    for i, arg := range n.Arguments {
        val, err := arg.Evaluate(ctx)
        if err != nil {
            return nil, err
        }
        args[i] = val
    }
    return ctx.CallFunction(n.Name, args...)
}

5.5 规则执行的性能优化

规则表达式的执行效率是关键。如果每次判断条件都要解析表达式,性能会成为瓶颈。优化方案包括:

// 活动配置时预编译规则
type Activity struct {
    ConditionExpr string
    compiledRule  RuleNode // 预编译的 AST
}

func (a *Activity) Compile() error {
    parser := NewRuleParser()
    node, err := parser.Parse(a.ConditionExpr)
    if err != nil {
        return err
    }
    a.compiledRule = node
    return nil
}

// 运行时直接执行预编译的规则
func (a *Activity) CheckCondition(ctx *RuleContext) (bool, error) {
    if a.compiledRule == nil {
        return false, errors.New("rule not compiled")
    }
    result, err := a.compiledRule.Evaluate(ctx)
    return toBool(result), err
}
type RuleCache struct {
    cache map[string]cacheEntry // key: playerId_activityId
    ttl   time.Duration
}

type cacheEntry struct {
    value     bool
    timestamp time.Time
}

func (c *RuleCache) Get(playerId, activityId int64) (bool, bool) {
    key := fmt.Sprintf("%d_%d", playerId, activityId)
    entry, ok := c.cache[key]
    if !ok || time.Since(entry.timestamp) > c.ttl {
        return false, false
    }
    return entry.value, true
}
func (n *BinaryOpNode) Evaluate(ctx *RuleContext) (interface{}, error) {
    switch n.Operator {
    case "&&":
        left, err := n.Left.Evaluate(ctx)
        if err != nil {
            return nil, err
        }
        if !toBool(left) {
            return false, nil // 短路:左边为 false,直接返回
        }
        right, err := n.Right.Evaluate(ctx)
        return toBool(right), err
        
    case "||":
        left, err := n.Left.Evaluate(ctx)
        if err != nil {
            return nil, err
        }
        if toBool(left) {
            return true, nil // 短路:左边为 true,直接返回
        }
        right, err := n.Right.Evaluate(ctx)
        return toBool(right), err
    }
    // ... 其他运算符
}

5.6 规则的可视化编辑

规则表达式虽然灵活,但对于非技术背景的运营来说,写表达式还是有门槛。优秀的活动引擎会提供可视化规则编辑器

┌─────────────────────────────────────────────────────────┐
│  参与条件配置                                             │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐                                        │
│  │ 等级条件     │  [≥] [10]    [删除]                    │
│  └─────────────┘                                        │
│           [且]                                          │
│  ┌─────────────┐                                        │
│  │ VIP条件      │  [≥] [3]     [删除]                    │
│  └─────────────┘                                        │
│                                                         │
│  [+ 添加条件]  [预览符合条件玩家数]                        │
├─────────────────────────────────────────────────────────┤
│  预览:当前有 12,345 名玩家符合条件                       │
└─────────────────────────────────────────────────────────┘

可视化编辑器的核心功能:

目标是让运营人员不用写一行代码,就能配置出复杂的活动规则。


六、活动模板系统

6.1 为什么要模板化?

即使有了活动引擎,如果每个活动都要从零开始配置,运营的工作量依然很大。

模板化的思路是:把常见活动类型的"通用部分"固化下来,运营只需要填写"差异部分"

就像做饭一样,模板是"菜谱",告诉你需要哪些食材、按什么步骤做。运营只需要按菜谱准备食材(配置参数),就能快速做出一道菜(上线活动)。

6.2 模板的层次结构

活动模板可以分为三个层次:

6.3 模板的数据结构

每个模板由三部分组成:元数据、参数定义、默认配置。

{
  "templateId": "sign_in_v1",
  "name": "签到活动模板",
  "category": "periodic",
  "version": "1.0.0",
  "description": "适用于周期性签到活动,支持连续签到奖励和补签功能",
  
  "parameters": [
    {
      "name": "activityName",
      "type": "string",
      "required": true,
      "label": "活动名称",
      "placeholder": "请输入活动名称"
    },
    {
      "name": "duration",
      "type": "number",
      "required": true,
      "label": "活动天数",
      "default": 7,
      "validation": {"min": 1, "max": 30}
    },
    {
      "name": "enableMakeup",
      "type": "boolean",
      "label": "允许补签",
      "default": false
    },
    {
      "name": "rewards",
      "type": "reward_list",
      "required": true,
      "label": "签到奖励配置"
    }
  ],
  
  "defaultConfig": {
    "conditions": [
      {"type": "player_level", "operator": ">=", "value": 1}
    ],
    "tasks": [
      {"taskId": "daily_login", "type": "login", "target": 1, "period": "daily"}
    ],
    "limits": {
      "dailyClaimLimit": 1
    }
  }
}

6.4 模板的使用流程

运营使用模板创建活动的流程:

  1. 选择模板:从模板库中选择合适的活动模板
  2. 填写参数:根据模板定义的参数,填写活动名称、时间、奖励等
  3. 预览确认:系统生成完整的活动配置,运营预览确认
  4. 提交审核:(可选)提交给上级审核
  5. 上线发布:审核通过后,活动自动上线

6.5 模板的继承与扩展

好的模板系统支持继承和扩展,就像面向对象编程中的类继承一样:

{
  "templateId": "sign_in_continuous_v1",
  "parentTemplate": "sign_in_v1",
  "name": "连续签到活动模板",
  "overrideParameters": [
    {
      "name": "continuousReward",
      "type": "reward_list",
      "label": "连续签到额外奖励"
    }
  ],
  "extendConfig": {
    "continuousRewardEnabled": true
  }
}

比如,一个"签到模板"可以预留扩展点,让运营选择是否开启"连续签到额外奖励"、"是否允许补签"等功能。

6.6 模板的版本管理

模板也需要版本管理,这是一个容易被忽视但非常重要的问题:

  1. 活动创建时快照模板:活动创建时,将模板的完整配置保存为活动的一部分,而不是引用模板。这样模板的后续修改不会影响已创建的活动。
type Activity struct {
    TemplateId    string
    TemplateVer   string          // 模板版本
    TemplateSnap  json.RawMessage // 模板快照(创建时的完整配置)
    // ... 其他字段
}
  1. 模板变更日志:记录每次模板修改的内容、原因、操作人,方便追溯。
  1. 灰度发布:新版本模板可以先在小范围活动上验证,确认无误后再全量发布。

七、活动数据分析

7.1 活动数据的价值

活动数据是运营决策的重要依据:

活动引擎要提供完整的数据采集和分析能力。

7.2 数据采集维度

活动数据采集至少要覆盖这些维度:

7.3 埋点设计

数据采集的基础是埋点。活动引擎需要设计一套完整的埋点规范:

// 活动埋点事件类型
const (
    EventActivityExpose   = "activity_expose"   // 活动曝光
    EventActivityClick    = "activity_click"    // 活动点击
    EventActivityJoin     = "activity_join"     // 活动参与
    EventTaskProgress     = "task_progress"     // 任务进度更新
    EventTaskComplete     = "task_complete"     // 任务完成
    EventRewardClaim      = "reward_claim"      // 奖励领取
    EventActivityFinish   = "activity_finish"   // 活动完成
)

// 埋点数据结构
type ActivityEvent struct {
    EventType   string                 `json:"eventType"`
    ActivityId  int64                  `json:"activityId"`
    PlayerId    int64                  `json:"playerId"`
    Timestamp   int64                  `json:"timestamp"`
    Properties  map[string]interface{} `json:"properties"` // 事件属性
}

// 示例:玩家领取奖励的埋点
func LogRewardClaim(activityId, playerId int64, rewards []Reward) {
    event := ActivityEvent{
        EventType:  EventRewardClaim,
        ActivityId: activityId,
        PlayerId:   playerId,
        Timestamp:  time.Now().Unix(),
        Properties: map[string]interface{}{
            "rewards":     rewards,
            "rewardValue": calculateTotalValue(rewards),
        },
    }
    eventLogger.Log(event)
}

7.4 实时 vs 离线分析

活动数据分析有两种模式:

实时分析通常使用流式计算(如 Flink、Kafka Streams)或时序数据库(如 InfluxDB、Prometheus)。

离线分析通常使用数据仓库(如 Hive、ClickHouse)和 BI 工具(如 Superset、Metabase)。

活动引擎要同时支持这两种模式。

7.5 数据可视化

数据采集只是第一步,更重要的是让运营能"看懂"数据。

好的活动数据分析系统应该提供:

┌─────────────────────────────────────────────────────┐
│  新春签到活动 - 实时数据                               │
├─────────────────────────────────────────────────────┤
│  参与人数        完成人数        完成率        人均参与   │
│  12,345         8,567          69.4%        5.2次    │
├─────────────────────────────────────────────────────┤
│  今日参与趋势                                         │
│  [折线图:展示今日每小时参与人数变化]                    │
├─────────────────────────────────────────────────────┤
│  转化漏斗                                             │
│  曝光 50,000 → 点击 25,000 → 参与 12,345 → 完成 8,567  │
│  转化率:50% → 49.4% → 69.4%                          │
└─────────────────────────────────────────────────────┘

八、奖励系统设计

8.1 奖励的复杂性

活动奖励看似简单,实际上涉及很多复杂场景:

8.2 奖励中心架构

为了解决这些问题,我们设计了统一的"奖励中心":

┌─────────────────────────────────────────────────────┐
│                   奖励中心                           │
├─────────────────────────────────────────────────────┤
│  ┌─────────┐  ┌─────────┐  ┌─────────┐            │
│  │ 奖励定义  │  │ 发放队列  │  │ 发放日志  │            │
│  └─────────┘  └─────────┘  └─────────┘            │
├─────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────┐   │
│  │              奖励发放器(按类型)              │   │
│  │  金币发放器 │ 道具发放器 │ 皮肤发放器 │ ...   │   │
│  └─────────────────────────────────────────────┘   │
├─────────────────────────────────────────────────────┤
│              奖励去重(Redis + DB)                  │
└─────────────────────────────────────────────────────┘
type Reward struct {
    ItemId   string `json:"itemId"`   // 物品ID
    Count    int64  `json:"count"`    // 数量
    ExpireAt int64  `json:"expireAt"` // 过期时间(可选)
}

type RewardPackage struct {
    Rewards   []Reward `json:"rewards"`
    Source    string   `json:"source"`    // 来源:activity_sign_in
    SourceId  int64    `json:"sourceId"`  // 来源ID:活动ID
    Reason    string   `json:"reason"`    // 原因描述
    UniqueId  string   `json:"uniqueId"`  // 去重ID
}
// 发放奖励(异步)
func (c *RewardCenter) IssueAsync(playerId int64, pkg *RewardPackage) error {
    // 1. 生成唯一ID(用于去重)
    if pkg.UniqueId == "" {
        pkg.UniqueId = generateUniqueId(playerId, pkg.Source, pkg.SourceId)
    }
    
    // 2. 检查是否已发放
    if c.isIssued(pkg.UniqueId) {
        return ErrAlreadyIssued
    }
    
    // 3. 写入发放队列
    msg := RewardMessage{
        PlayerId: playerId,
        Package:  pkg,
    }
    return c.queue.Publish("reward_issue", msg)
}

// 发放奖励(同步,直接返回结果)
func (c *RewardCenter) IssueSync(playerId int64, pkg *RewardPackage) (*IssueResult, error) {
    if pkg.UniqueId == "" {
        pkg.UniqueId = generateUniqueId(playerId, pkg.Source, pkg.SourceId)
    }
    
    // Redis 分布式锁 + 去重检查
    lockKey := fmt.Sprintf("reward_lock:%s", pkg.UniqueId)
    if !c.redis.SetNX(lockKey, "1", 30*time.Second) {
        return nil, ErrAlreadyIssued
    }
    defer c.redis.Del(lockKey)
    
    // 执行发放
    result := &IssueResult{}
    for _, reward := range pkg.Rewards {
        issuer := c.getIssuer(reward.ItemId)
        if issuer == nil {
            result.Failed = append(result.Failed, reward)
            continue
        }
        if err := issuer.Issue(playerId, reward); err != nil {
            result.Failed = append(result.Failed, reward)
        } else {
            result.Success = append(result.Success, reward)
        }
    }
    
    // 记录发放日志
    c.logIssue(playerId, pkg, result)
    
    return result, nil
}

8.3 奖励发放器

每种奖励类型有对应的发放器,实现统一的接口:

type RewardIssuer interface {
    // 发放奖励
    Issue(playerId int64, reward Reward) error
    // 检查是否可发放(背包满、上限等)
    CanIssue(playerId int64, reward Reward) error
    // 回滚奖励
    Rollback(playerId int64, reward Reward) error
}

// 金币发放器
type GoldIssuer struct {
    playerRepo PlayerRepository
}

func (i *GoldIssuer) Issue(playerId int64, reward Reward) error {
    return i.playerRepo.AddGold(playerId, reward.Count)
}

// 道具发放器
type ItemIssuer struct {
    inventoryRepo InventoryRepository
}

func (i *ItemIssuer) Issue(playerId int64, reward Reward) error {
    // 检查背包容量
    if !i.inventoryRepo.HasSpace(playerId, 1) {
        return ErrInventoryFull
    }
    return i.inventoryRepo.AddItem(playerId, reward.ItemId, reward.Count, reward.ExpireAt)
}

8.4 奖励去重策略

奖励去重是防止重复发放的关键。我们采用"Redis + 数据库"双重保障:

func (c *RewardCenter) isIssued(uniqueId string) bool {
    // 1. 先查 Redis(快速)
    if c.redis.Exists("reward_issued:" + uniqueId) {
        return true
    }
    
    // 2. 再查数据库(兜底)
    count, _ := c.db.Table("reward_log").
        Where("unique_id = ?", uniqueId).
        Count()
    return count > 0
}

func (c *RewardCenter) markIssued(uniqueId string) {
    // Redis 标记(7天过期,足够覆盖大部分活动周期)
    c.redis.Set("reward_issued:"+uniqueId, "1", 7*24*time.Hour)
}

九、排行榜活动实现

9.1 排行榜的技术挑战

排行榜活动是活动引擎中最复杂的类型之一,面临以下挑战:

9.2 排行榜数据结构

传统的关系数据库很难高效支持实时排行榜。我们采用 Redis 的 Sorted Set(有序集合):

type RankBoard struct {
    ActivityId int64
    Dimension  string       // 排名维度:recharge, kill, score...
    RedisKey   string       // Redis Key
    TTL        time.Duration // 过期时间
}

// 更新玩家分数
func (r *RankBoard) UpdateScore(playerId int64, score float64) error {
    return redis.ZAdd(r.RedisKey, &redis.Z{
        Score:  score,
        Member: playerId,
    })
}

// 增加玩家分数(原子操作)
func (r *RankBoard) IncrScore(playerId int64, delta float64) (float64, error) {
    return redis.ZIncrBy(r.RedisKey, delta, strconv.FormatInt(playerId, 10))
}

// 获取玩家排名(从高到低,第1名返回0)
func (r *RankBoard) GetRank(playerId int64) (int64, error) {
    rank, err := redis.ZRevRank(r.RedisKey, strconv.FormatInt(playerId, 10))
    if err == redis.Nil {
        return -1, nil // 玩家不在榜上
    }
    return rank, err
}

// 获取玩家分数
func (r *RankBoard) GetScore(playerId int64) (float64, error) {
    score, err := redis.ZScore(r.RedisKey, strconv.FormatInt(playerId, 10))
    if err == redis.Nil {
        return 0, nil
    }
    return score, err
}

// 获取 Top N
func (r *RankBoard) GetTopN(n int64) ([]RankEntry, error) {
    members, err := redis.ZRevRangeWithScores(r.RedisKey, 0, n-1)
    if err != nil {
        return nil, err
    }
    
    result := make([]RankEntry, len(members))
    for i, m := range members {
        playerId, _ := strconv.ParseInt(m.Member.(string), 10, 64)
        result[i] = RankEntry{
            Rank:    int64(i + 1),
            PlayerId: playerId,
            Score:   m.Score,
        }
    }
    return result, nil
}

// 获取玩家周围排名(用于展示"我周围的对手")
func (r *RankBoard) GetAroundPlayer(playerId int64, rangeSize int64) ([]RankEntry, error) {
    rank, err := r.GetRank(playerId)
    if err != nil || rank < 0 {
        return nil, err
    }
    
    start := rank - rangeSize
    if start < 0 {
        start = 0
    }
    end := rank + rangeSize
    
    members, err := redis.ZRevRangeWithScores(r.RedisKey, start, end)
    if err != nil {
        return nil, err
    }
    
    result := make([]RankEntry, len(members))
    for i, m := range members {
        pid, _ := strconv.ParseInt(m.Member.(string), 10, 64)
        result[i] = RankEntry{
            Rank:     int64(start) + int64(i) + 1,
            PlayerId: pid,
            Score:    m.Score,
        }
    }
    return result, nil
}

9.3 排行榜活动配置示例

{
  "activityId": 30001,
  "name": "充值冲榜活动",
  "type": "ranking",
  "timeConfig": {
    "startTime": "2026-03-01 00:00:00",
    "endTime": "2026-03-07 23:59:59"
  },
  "rankingConfig": {
    "dimension": "recharge",
    "updateMode": "incremental",  // 累加模式
    "refreshInterval": 60         // 前端刷新间隔(秒)
  },
  "rewards": [
    {
      "rankRange": [1, 1],
      "description": "第1名",
      "items": [
        {"itemId": "legendary_hero", "count": 1},
        {"itemId": "diamond", "count": 10000}
      ]
    },
    {
      "rankRange": [2, 10],
      "description": "第2-10名",
      "items": [
        {"itemId": "epic_hero", "count": 1},
        {"itemId": "diamond", "count": 5000}
      ]
    },
    {
      "rankRange": [11, 100],
      "description": "第11-100名",
      "items": [
        {"itemId": "diamond", "count": 1000}
      ]
    }
  ],
  "conditions": [
    {"type": "player_level", "operator": ">=", "value": 10}
  ]
}

9.4 排行榜结算

活动结束时的排行榜结算是一个关键流程:

func (s *RankingService) Settle(activity *Activity) error {
    // 1. 获取最终排名
    board := s.getRankBoard(activity.Id)
    topPlayers, err := board.GetTopN(activity.GetMaxRewardRank())
    if err != nil {
        return err
    }
    
    // 2. 按排名发放奖励
    for _, entry := range topPlayers {
        reward := activity.GetRewardByRank(entry.Rank)
        if reward == nil {
            continue
        }
        
        pkg := &RewardPackage{
            Rewards:  reward.Items,
            Source:   "ranking_activity",
            SourceId: activity.Id,
            Reason:   fmt.Sprintf("充值冲榜活动第%d名奖励", entry.Rank),
            UniqueId: fmt.Sprintf("rank_%d_%d_%d", activity.Id, entry.PlayerId, entry.Rank),
        }
        
        // 异步发放,避免阻塞
        s.rewardCenter.IssueAsync(entry.PlayerId, pkg)
        
        // 记录排名
        s.db.Table("ranking_record").Insert(map[string]interface{}{
            "activity_id": activity.Id,
            "player_id":   entry.PlayerId,
            "rank":        entry.Rank,
            "score":       entry.Score,
            "settled_at":  time.Now(),
        })
    }
    
    // 3. 更新活动状态
    activity.Status = "settled"
    s.activityRepo.Save(activity)
    
    return nil
}

十、高并发场景处理

10.1 活动开始瞬间的流量洪峰

活动开始瞬间,可能有大量玩家同时请求活动列表,造成流量洪峰。处理策略:

func (m *ActivityManager) PreloadActivity(activityId int64) error {
    activity := m.activityRepo.Get(activityId)
    if activity == nil {
        return ErrActivityNotFound
    }
    
    // 预热到本地缓存
    m.localCache.Set(fmt.Sprintf("activity:%d", activityId), activity, 10*time.Minute)
    
    // 预热到 Redis
    m.redis.Set(fmt.Sprintf("activity:config:%d", activityId), activity.Config, 30*time.Minute)
    
    return nil
}
func (m *ActivityManager) GetActivity(activityId int64) (*Activity, error) {
    cacheKey := fmt.Sprintf("activity:%d", activityId)
    
    // 1. 查本地缓存(最快)
    if val, ok := m.localCache.Get(cacheKey); ok {
        return val.(*Activity), nil
    }
    
    // 2. 查 Redis(次快)
    val, err := m.redis.Get(cacheKey)
    if err == nil {
        var activity Activity
        json.Unmarshal([]byte(val), &activity)
        m.localCache.Set(cacheKey, &activity, 5*time.Minute)
        return &activity, nil
    }
    
    // 3. 查数据库(兜底)
    activity := m.activityRepo.Get(activityId)
    if activity != nil {
        data, _ := json.Marshal(activity)
        m.redis.Set(cacheKey, data, 30*time.Minute)
        m.localCache.Set(cacheKey, activity, 5*time.Minute)
    }
    
    return activity, nil
}

10.2 签到/领奖的并发控制

热门活动可能出现大量玩家同时签到的情况。需要做好并发控制:

UPDATE activity_participation 
SET sign_count = sign_count + 1, 
    last_sign_time = ?, 
    version = version + 1
WHERE activity_id = ? 
  AND player_id = ? 
  AND version = ?
func (s *SignInService) DailySign(playerId, activityId int64) error {
    lockKey := fmt.Sprintf("sign_lock:%d:%d", playerId, activityId)
    
    // 获取分布式锁
    locked, err := s.redis.SetNX(lockKey, "1", 5*time.Second)
    if err != nil || !locked {
        return ErrSignInProgress
    }
    defer s.redis.Del(lockKey)
    
    // 执行签到逻辑...
    return s.doSign(playerId, activityId)
}
type ProgressBatcher struct {
    buffer    map[string]int64 // key: playerId_activityId, value: delta
    bufferMu  sync.Mutex
    flushInterval time.Duration
}

func (b *ProgressBatcher) Add(playerId, activityId int64, delta int64) {
    b.bufferMu.Lock()
    key := fmt.Sprintf("%d_%d", playerId, activityId)
    b.buffer[key] += delta
    b.bufferMu.Unlock()
}

func (b *ProgressBatcher) Run() {
    ticker := time.NewTicker(b.flushInterval)
    for range ticker.C {
        b.flush()
    }
}

func (b *ProgressBatcher) flush() {
    b.bufferMu.Lock()
    data := b.buffer
    b.buffer = make(map[string]int64)
    b.bufferMu.Unlock()
    
    // 批量更新数据库
    for key, delta := range data {
        parts := strings.Split(key, "_")
        playerId, _ := strconv.ParseInt(parts[0], 10, 64)
        activityId, _ := strconv.ParseInt(parts[1], 10, 64)
        b.updateDB(playerId, activityId, delta)
    }
}

10.3 削峰填谷

对于奖励发放等非实时性要求高的操作,走消息队列异步处理:

// 客户端请求签到
func (s *SignInService) SignIn(req *SignInRequest) (*SignInResponse, error) {
    // 1. 快速校验
    if err := s.validateRequest(req); err != nil {
        return nil, err
    }
    
    // 2. 更新进度(同步,快速)
    participation, err := s.updateProgress(req.PlayerId, req.ActivityId)
    if err != nil {
        return nil, err
    }
    
    // 3. 发放奖励(异步)
    s.rewardQueue.Publish(RewardMessage{
        PlayerId:   req.PlayerId,
        ActivityId: req.ActivityId,
        Type:       "daily_sign",
    })
    
    // 4. 立即返回结果
    return &SignInResponse{
        Success:    true,
        SignCount:  participation.SignCount,
        Message:    "签到成功,奖励将在稍后发放",
    }, nil
}

十一、实战案例:七天签到活动

为了让大家更直观地理解活动引擎的工作方式,我们来看一个完整的实战案例:七天签到活动。

8.1 活动需求

运营需求如下:

- 每日签到:金币 × 100

- 累计3天:钻石 × 50 - 累计5天:高级英雄碎片 × 5 - 累计7天:限定皮肤 × 1

8.2 活动配置

运营在后台配置活动,系统生成的完整配置如下:

{
  "activityId": 20001,
  "name": "三月签到活动",
  "type": "sign_in",
  "templateId": "sign_in_continuous_v1",
  "status": "pending",
  
  "timeConfig": {
    "startTime": "2026-03-01 00:00:00",
    "endTime": "2026-03-07 23:59:59",
    "timezone": "Asia/Shanghai"
  },
  
  "conditions": [
    {
      "type": "player_level",
      "operator": ">=",
      "value": 5
    }
  ],
  
  "tasks": [
    {
      "taskId": "daily_sign",
      "type": "login",
      "description": "每日登录签到",
      "target": 1,
      "period": "daily",
      "reward": {
        "items": [
          {"itemId": "gold", "count": 100}
        ]
      }
    }
  ],
  
  "milestoneRewards": [
    {
      "milestone": 3,
      "description": "累计签到3天",
      "items": [
        {"itemId": "diamond", "count": 50}
      ]
    },
    {
      "milestone": 5,
      "description": "累计签到5天",
      "items": [
        {"itemId": "hero_fragment_advanced", "count": 5}
      ]
    },
    {
      "milestone": 7,
      "description": "累计签到7天",
      "items": [
        {"itemId": "skin_limited_001", "count": 1}
      ]
    }
  ],
  
  "limits": {
    "dailySignLimit": 1
  }
}

8.3 玩家参与流程

当玩家登录游戏时,活动引擎的处理流程:

  1. 获取活动列表:客户端请求活动列表,返回当前进行中的活动
// API: GET /api/activities
func GetActivities(playerId int64) ([]ActivityInfo, error) {
    activities := activityManager.GetRunningActivities()
    result := make([]ActivityInfo, 0)
    
    for _, act := range activities {
        // 检查玩家是否满足参与条件
        if act.CheckCondition(playerId) {
            // 获取玩家参与状态
            participation := participationRepo.Get(playerId, act.Id)
            result = append(result, ActivityInfo{
                Activity: act,
                Status:   participation,
            })
        }
    }
    return result, nil
}
  1. 签到操作:玩家点击签到按钮
// API: POST /api/activity/sign
func DailySign(playerId, activityId int64) error {
    // 1. 获取活动配置
    activity := activityManager.GetActivity(activityId)
    if activity == nil {
        return errors.New("activity not found")
    }
    
    // 2. 检查活动状态
    if activity.Status != "running" {
        return errors.New("activity not running")
    }
    
    // 3. 检查参与条件
    if !activity.CheckCondition(playerId) {
        return errors.New("condition not met")
    }
    
    // 4. 获取或创建参与记录
    participation := participationRepo.GetOrCreate(playerId, activityId)
    
    // 5. 检查今日是否已签到
    if participation.HasSignedToday() {
        return errors.New("already signed today")
    }
    
    // 6. 更新签到进度
    participation.SignCount++
    participation.LastSignTime = time.Now()
    participationRepo.Save(participation)
    
    // 7. 发放每日签到奖励
    dailyReward := activity.GetDailyReward()
    rewardService.Issue(playerId, dailyReward, "daily_sign")
    
    // 8. 检查里程碑奖励
    for _, milestone := range activity.MilestoneRewards {
        if participation.SignCount >= milestone.Milestone && 
           !participation.HasClaimedMilestone(milestone.Milestone) {
            participation.ClaimedMilestones = append(
                participation.ClaimedMilestones, 
                milestone.Milestone,
            )
            rewardService.Issue(playerId, milestone.Items, "milestone_reward")
        }
    }
    
    // 9. 记录埋点
    eventLogger.Log(ActivityEvent{
        EventType:  "daily_sign",
        ActivityId: activityId,
        PlayerId:   playerId,
        Properties: map[string]interface{}{
            "signCount": participation.SignCount,
        },
    })
    
    return nil
}

8.4 数据统计

活动结束后,系统自动生成数据报告:

三月签到活动 - 数据报告
========================
活动周期:2026-03-01 ~ 2026-03-07(7天)

参与概况
--------
总参与人数:15,234 人
日均参与:2,176 人
完成率(签到7天):68.3%

里程碑达成
----------
签到3天:14,567 人(95.6%)
签到5天:12,345 人(81.0%)
签到7天:10,405 人(68.3%)

奖励发放
--------
金币:1,523,400(15,234 × 100)
钻石:728,350(14,567 × 50)
英雄碎片:61,725(12,345 × 5)
限定皮肤:10,405

ROI 分析
--------
奖励总价值:¥152,340
参与玩家 ARPU 提升:+12.5%
次日留存率提升:+5.2%

十二、活动测试与灰度发布

12.1 活动配置的校验

活动配置错误可能导致严重的线上事故。因此,在活动上线前必须进行严格校验:

type ActivityValidator interface {
    Validate(activity *Activity) []ValidationError
}

type BaseValidator struct{}

func (v *BaseValidator) Validate(activity *Activity) []ValidationError {
    var errors []ValidationError
    
    if activity.Name == "" {
        errors = append(errors, ValidationError{
            Field:   "name",
            Message: "活动名称不能为空",
        })
    }
    
    if activity.StartTime.After(activity.EndTime) {
        errors = append(errors, ValidationError{
            Field:   "timeConfig",
            Message: "开始时间必须早于结束时间",
        })
    }
    
    if len(activity.Rewards) == 0 {
        errors = append(errors, ValidationError{
            Field:   "rewards",
            Message: "奖励配置不能为空",
        })
    }
    
    return errors
}

type EconomyValidator struct {
    budgetService BudgetService
}

func (v *EconomyValidator) Validate(activity *Activity) []ValidationError {
    var errors []ValidationError
    
    // 估算参与人数
    estimatedParticipants := v.estimateParticipants(activity)
    
    // 计算总奖励价值
    totalValue := v.calculateTotalValue(activity)
    
    // 检查是否超出预算
    budget := v.budgetService.GetBudget(activity.TimeConfig.StartTime, activity.TimeConfig.EndTime)
    if totalValue > budget.Remaining {
        errors = append(errors, ValidationError{
            Field:   "rewards",
            Message: fmt.Sprintf("奖励总价值 %.2f 超出剩余预算 %.2f", totalValue, budget.Remaining),
            Level:   "warning", // 警告级别,不阻止上线
        })
    }
    
    return errors
}

12.2 活动预览与测试

在活动正式上线前,运营人员应该能够预览活动效果:

func (s *ActivityService) Preview(activity *Activity, previewTime time.Time) (*ActivityPreview, error) {
    // 创建活动副本
    previewActivity := activity.Clone()
    
    // 模拟时间
    previewActivity.CurrentTime = previewTime
    
    // 计算当前阶段
    previewActivity.Phase = s.calculatePhase(previewActivity, previewTime)
    
    // 模拟参与数据
    previewActivity.SimulatedParticipants = s.simulateParticipants(previewActivity)
    
    return &ActivityPreview{
        Activity: previewActivity,
        Phase:    previewActivity.Phase,
        Stats:    s.calculateSimulatedStats(previewActivity),
    }, nil
}
type WhitelistTest struct {
    ActivityId int64
    PlayerIds  []int64
    StartTime  time.Time
    EndTime    time.Time
}

func (s *ActivityService) CreateWhitelistTest(test *WhitelistTest) error {
    // 创建测试活动
    activity := s.activityRepo.Get(test.ActivityId)
    if activity == nil {
        return ErrActivityNotFound
    }
    
    // 设置白名单
    activity.Whitelist = test.PlayerIds
    activity.TestMode = true
    activity.StartTime = test.StartTime
    activity.EndTime = test.EndTime
    
    return s.activityRepo.Save(activity)
}

func (s *ActivityService) CheckWhitelist(activity *Activity, playerId int64) bool {
    if !activity.TestMode {
        return true // 非测试模式,所有人可见
    }
    
    for _, id := range activity.Whitelist {
        if id == playerId {
            return true
        }
    }
    return false
}

12.3 灰度发布

对于大型活动,应该采用灰度发布策略,逐步放开参与人数:

type GrayscaleConfig struct {
    ActivityId   int64
    Strategy     string  // "percentage" | "region" | "player_id"
    Percentage   int     // 百分比策略:初始开放比例
    Regions      []string // 区域策略:开放的区域列表
    PlayerIdMod  int     // 玩家ID取模策略
}

func (s *ActivityService) CheckGrayscale(activity *Activity, playerId int64, region string) bool {
    config := activity.GrayscaleConfig
    if config == nil {
        return true // 未配置灰度,全量开放
    }
    
    switch config.Strategy {
    case "percentage":
        // 按比例开放
        hash := crc32.ChecksumIEEE([]byte(strconv.FormatInt(playerId, 10)))
        return hash%100 < uint32(config.Percentage)
        
    case "region":
        // 按区域开放
        for _, r := range config.Regions {
            if r == region {
                return true
            }
        }
        return false
        
    case "player_id_mod":
        // 按玩家ID取模开放
        return playerId%int64(config.PlayerIdMod) == 0
    }
    
    return true
}

// 动态调整灰度比例
func (s *ActivityService) AdjustGrayscale(activityId int64, newPercentage int) error {
    activity := s.activityRepo.Get(activityId)
    if activity == nil {
        return ErrActivityNotFound
    }
    
    if activity.GrayscaleConfig == nil {
        activity.GrayscaleConfig = &GrayscaleConfig{}
    }
    
    activity.GrayscaleConfig.Strategy = "percentage"
    activity.GrayscaleConfig.Percentage = newPercentage
    
    return s.activityRepo.Save(activity)
}

十三、容灾与回滚设计

13.1 活动配置的版本管理

活动配置应该有完整的版本历史,支持回滚:

CREATE TABLE activity_version (
    id BIGINT PRIMARY KEY,
    activity_id BIGINT NOT NULL,
    version INT NOT NULL,
    config JSON COMMENT '活动配置快照',
    change_reason VARCHAR(500) COMMENT '变更原因',
    changed_by VARCHAR(50) COMMENT '操作人',
    created_at DATETIME,
    UNIQUE KEY uk_activity_version (activity_id, version)
);
func (s *ActivityService) SaveVersion(activity *Activity, reason string, operator string) error {
    // 获取当前最大版本号
    maxVersion := s.versionRepo.GetMaxVersion(activity.Id)
    
    // 保存新版本
    version := &ActivityVersion{
        ActivityId:   activity.Id,
        Version:      maxVersion + 1,
        Config:       activity.Config,
        ChangeReason: reason,
        ChangedBy:    operator,
        CreatedAt:    time.Now(),
    }
    
    return s.versionRepo.Save(version)
}

func (s *ActivityService) Rollback(activityId int64, targetVersion int) error {
    // 获取目标版本配置
    version := s.versionRepo.Get(activityId, targetVersion)
    if version == nil {
        return ErrVersionNotFound
    }
    
    // 获取当前活动
    activity := s.activityRepo.Get(activityId)
    if activity == nil {
        return ErrActivityNotFound
    }
    
    // 保存当前版本(便于再次回滚)
    s.SaveVersion(activity, "回滚前备份", "system")
    
    // 恢复目标版本配置
    activity.Config = version.Config
    return s.activityRepo.Save(activity)
}

13.2 异常活动的自动熔断(理论方案,我们暂未实现)

当活动出现异常时(如参与人数异常激增、奖励发放失败率过高),应该自动熔断。以下是熔断器的设计思路:

type CircuitBreaker struct {
    activityId        int64
    failureCount      int
    failureThreshold  int
    lastFailureTime   time.Time
    state             string // "closed", "open", "half_open"
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.failureCount++
    cb.lastFailureTime = time.Now()

    if cb.failureCount >= cb.failureThreshold {
        cb.state = "open"
        // 触发告警
        cb.alert()
    }
}

func (cb *CircuitBreaker) IsOpen() bool {
    if cb.state == "open" {
        // 检查是否可以进入半开状态
        if time.Since(cb.lastFailureTime) > 5*time.Minute {
            cb.state = "half_open"
            return false
        }
        return true
    }
    return false
}

func (s *ActivityService) CheckCircuitBreaker(activityId int64) bool {
    cb := s.getCircuitBreaker(activityId)
    if cb.IsOpen() {
        // 熔断状态,拒绝参与
        return false
    }
    return true
}

13.3 奖励补发机制

当奖励发放失败时,需要有补发机制:

type RewardRetryTask struct {
    Id         int64
    PlayerId   int64
    RewardPkg  *RewardPackage
    RetryCount int
    MaxRetry   int
    NextRetry  time.Time
    Status     string // "pending", "success", "failed"
    Error      string
}

func (s *RewardService) ProcessRetryTasks() {
    tasks := s.retryRepo.GetPendingTasks(time.Now(), 100)
    for _, task := range tasks {
        result, err := s.IssueSync(task.PlayerId, task.RewardPkg)
        
        if err == nil && len(result.Failed) == 0 {
            task.Status = "success"
        } else {
            task.RetryCount++
            task.Error = err.Error()
            
            if task.RetryCount >= task.MaxRetry {
                task.Status = "failed"
                // 告警,人工介入
                s.alertFailedTask(task)
            } else {
                // 指数退避
                task.NextRetry = time.Now().Add(time.Duration(1<<task.RetryCount) * time.Minute)
            }
        }
        
        s.retryRepo.Save(task)
    }
}

十四、总结

活动引擎的本质,是把"活动运营"从技术问题变成业务问题。

核心设计要点回顾

一个优秀的活动引擎,需要解决好以下核心问题:

建设路径建议

活动引擎的建设是一个渐进的过程,建议按以下路径推进:

最重要的是:不要为了技术而技术,要始终围绕运营的真实需求。活动引擎的价值,最终要体现在运营效率的提升和活动效果的改善上。


💬 评论 (0)

0/500
排序: