消息推送:触达用户的"最后一公里"

这是"游戏运营工具"系列的第四篇。在移动互联网时代,推送消息已经成为连接产品与用户的"最后一公里"。一条好的推送,能唤醒沉睡用户、提升活跃度、促进转化;但一条糟糕的推送,则可能让用户厌烦甚至卸载应用。今天我们来聊聊消息推送这门"精细的艺术"。


引言

"打开率 3%,点击率 0.5%。"

这是某游戏项目组的周报数据。运营同事很困惑:明明推送内容精心打磨了,发送时间也选在用户活跃高峰,为什么效果这么差?

问题可能不在单条推送本身,而在整个推送体系的策略设计技术支撑

推送不是"发出去"就完事,它是一个从用户洞察到内容创作、从时机选择到效果追踪的完整闭环。今天我们就从运营和技术的双重视角,拆解这个"最后一公里"的技术系统。


一、消息推送的重要性

1.1 为什么推送如此重要?

在竞争激烈的应用市场中,推送消息是重新连接用户的最直接渠道

数据显示,超过 60% 的用户在安装应用后的 7 天内会流失。而合理的推送策略,可以将这个比例降低 15-20%。对于游戏而言,一次有效的活动推送,可能带来的是数百万的流水增量。

推送的核心价值在于:

1.2 推送的"双刃剑"效应

推送虽好,但滥用会带来严重后果:

因此,推送系统的核心挑战在于:如何在"有效触达"和"避免打扰"之间找到平衡点。


二、推送技术原理深度解析

要真正理解推送系统,我们需要深入到底层,看看消息是如何从服务器到达用户设备的。这就像理解快递系统——不能只知道"寄出去-收得到",还要知道中间经过哪些中转站、有哪些环节可能出问题。

2.1 推送的本质:长连接与心跳

推送的核心问题是:服务器如何主动向客户端发送消息?

在传统的 HTTP 请求中,总是客户端主动发起请求,服务器被动响应。但推送需要反过来——服务器要有能力"主动联系"客户端。

解决方案有两种:

客户端每隔一段时间主动问服务器:"有消息给我吗?"

客户端: 有新消息吗?
服务器: 没有
(30秒后)
客户端: 有新消息吗?
服务器: 没有
(30秒后)
客户端: 有新消息吗?
服务器: 有!这是你要的推送内容

这就像你每隔半小时就给快递员打一次电话问"我的快递到了吗"。显然,这种方式效率极低:

客户端与服务器建立一条持久的 TCP 连接,双方可以随时互相发送数据。

客户端: 我要建立连接
服务器: 好的,连接已建立
(连接保持打开状态...)
服务器: 有新消息,推给你!
客户端: 收到了
(连接继续保持...)
服务器: 又有新消息!
客户端: 收到了

这就像你给快递员留了个对讲机,他随时可以呼叫你。这就是现代推送系统的基础。

但这里有个问题:移动网络不稳定。客户端可能切换 WiFi、进入电梯信号变差、系统休眠断开连接……

为了保持连接"活着",双方需要定期发送心跳包(Heartbeat):

客户端: (心跳) 我还活着
服务器: (心跳响应) 我也在
(60秒后)
客户端: (心跳) 我还活着
服务器: (心跳响应) 我也在

心跳的频率很关键:太频繁浪费电,太稀疏连接可能被中间设备(如 NAT 路由器)判定为超时断开。通常移动端心跳间隔在 1-5 分钟。

2.2 iOS 推送机制:APNs

苹果的推送服务(Apple Push Notification service,简称 APNs)是全球最成熟的推送系统之一。它的工作原理非常巧妙:

[你的服务器] --> [APNs 服务器] --> [iPhone 设备] --> [你的 App]
     ^                                          |
<table>
<thead><tr>
</tr></thead><tbody>
</tbody></table>
   (反馈服务) <------------------------------(用户操作)
  1. 苹果作为中转站:你的服务器不直接连用户设备,而是通过苹果的 APNs 服务器中转。这样做的好处是:
- 苹果统一管理所有 App 的推送,效率更高

- 即使你的 App 没运行,苹果的系统进程也在监听推送 - 安全性高,每条推送都要用苹果颁发的证书签名

  1. 设备令牌(Device Token):每个设备上的每个 App,都有一个唯一的 Device Token。你的服务器需要先获取这个 Token,才能向该设备推送。
// iOS 客户端注册推送
func registerForPushNotifications() {
    UNUserNotificationCenter.current().requestAuthorization(
        options: [.alert, .sound, .badge]
    ) { granted, error in
        if granted {
            DispatchQueue.main.async {
                UIApplication.shared.registerForRemoteNotifications()
            }
        }
    }
}

// 获取 Device Token
func application(_ application: UIApplication, 
                 didRegisterForRemoteNotificationsWithDeviceToken deviceToken: Data) {
    let token = deviceToken.map { String(format: "%02.2hhx", $0) }.joined()
    // 把这个 token 发送给你的服务器保存
    sendTokenToServer(token: token)
}
  1. 证书与签名:APNs 要求每条推送都必须用有效的证书签名。这防止了恶意服务器冒充你发送推送。
// 服务器端发送 APNs 推送(Go 示例)
func SendAPNSPush(deviceToken string, title string, body string) error {
    // 加载证书
    cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
    if err != nil {
        return err
    }
    
    // 构建 JSON Payload
    payload := map[string]interface{}{
        "aps": map[string]interface{}{
            "alert": map[string]string{
                "title": title,
                "body":  body,
            },
            "badge": 1,
            "sound": "default",
        },
    }
    payloadBytes, _ := json.Marshal(payload)
    
    // 发送到 APNs(生产环境)
    url := fmt.Sprintf("https://api.push.apple.com/3/device/%s", deviceToken)
    req, _ := http.NewRequest("POST", url, bytes.NewReader(payloadBytes))
    req.Header.Set("content-type", "application/json")
    
    // 使用证书发送
    tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}
    client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConfig}}
    _, err = client.Do(req)
    
    return err
}
  1. 连接限制:APNs 对连接数有严格限制。你不能为每条推送都新建连接,必须维护一个连接池,复用已有连接。苹果官方建议每个服务器最多建立约 20 条并发连接。

2.3 Android 推送机制:碎片化的挑战

Android 的推送生态要复杂得多,因为:

  1. Google FCM 在国内不可用:Google 服务框架在国内大部分手机上不存在
  2. 各厂商有自己的推送服务:华为、小米、OPPO、vivo、魅族等都有自己的推送通道
  3. 系统级进程保活受限:Android 8.0 后,后台进程被严格限制
通道 覆盖范围 特点 到达率
Google FCM 海外 Android Google 官方,系统级支持 90%+(海外)
华为 HMS Push 华为设备 系统级推送,无需 App 运行 95%+
小米 MiPush 小米设备 支持所有 Android,但小米设备效果最好 90%+
OPPO Push OPPO 设备 系统级推送 85%+
vivo Push vivo 设备 系统级推送 85%+
自建长连接 所有设备 依赖 App 进程存活 30-70%(不稳定)
// 推送服务统一接口
type PushService interface {
    Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}

// 多通道推送管理器
type MultiChannelPushManager struct {
    apns     *APNSService      // iOS
    fcm      *FCMService       // Google
    huawei   *HuaweiPushService
    xiaomi   *XiaomiPushService
    oppo     *OPPOPushService
    vivo     *VIVOPushService
}

// 智能路由:根据设备信息选择最佳通道
func (m *MultiChannelPushManager) Push(ctx context.Context, req *PushRequest) (*PushResponse, error) {
    // 根据设备信息选择通道
    channel := m.selectChannel(req.DeviceInfo)
    
    var err error
    var resp *PushResponse
    
    switch channel {
    case "apns":
        resp, err = m.apns.Push(ctx, req)
    case "fcm":
        resp, err = m.fcm.Push(ctx, req)
    case "huawei":
        resp, err = m.huawei.Push(ctx, req)
    case "xiaomi":
        resp, err = m.xiaomi.Push(ctx, req)
    case "oppo":
        resp, err = m.oppo.Push(ctx, req)
    case "vivo":
        resp, err = m.vivo.Push(ctx, req)
    default:
        // 降级到自建长连接(注:我们暂未实现自建长连接通道)
        resp, err = m.selfConnection.Push(ctx, req)
    }
    
    // 如果主通道失败,尝试备用通道
    if err != nil {
        for _, backupChannel := range m.getBackupChannels(channel) {
            resp, err = m.pushByChannel(ctx, backupChannel, req)
            if err == nil {
                break
            }
        }
    }
    
    return resp, err
}

// 设备注册:收集各通道的推送令牌
type DevicePushInfo struct {
    DeviceID      string
    Platform      string // "ios" / "android"
    APNSToken     string // iOS 设备令牌
    FCMMToken     string // Google FCM 令牌
    HuaweiToken   string // 华为推送令牌
    XiaomiRegID   string // 小米推送 RegID
    OPPORegisterID string
    VIVORegisterID string
}
// 小米推送示例
func (s *XiaomiPushService) Push(ctx context.Context, req *PushRequest) error {
    // 小米推送 API
    url := "https://api.xmpush.xiaomi.com/v3/message/regid"
    
    // 构建消息
    message := map[string]interface{}{
        "registration_id": req.XiaomiRegID,
        "title":           req.Title,
        "description":     req.Body,
        "notify_type":     1, // DEFAULT_ALL
        "time_to_live":    86400000, // 24小时
        "extra": map[string]string{
            "callback": "https://your-server.com/push/callback",
        },
    }
    
    body, _ := json.Marshal(message)
    httpReq, _ := http.NewRequest("POST", url, bytes.NewReader(body))
    httpReq.Header.Set("Authorization", "key="+s.AppSecret)
    httpReq.Header.Set("Content-Type", "application/json")
    
    resp, err := http.DefaultClient.Do(httpReq)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    // 解析响应
    var result XiaomiPushResult
    json.NewDecoder(resp.Body).Decode(&result)
    
    if result.Code != 0 {
        return fmt.Errorf("xiaomi push failed: %s", result.Reason)
    }
    
    return nil
}

2.4 短信推送:最传统但最可靠的通道

短信推送看似"落后",但在某些场景下仍然是不可替代的:

// 短信推送服务(对接阿里云短信)
type AliyunSMSService struct {
    AccessKeyID     string
    AccessKeySecret string
    SignName        string // 短信签名,如"XX游戏"
}

func (s *AliyunSMSService) SendSMS(phone string, templateCode string, params map[string]string) error {
    // 构建请求参数
    args := map[string]string{
        "PhoneNumbers":  phone,
        "SignName":      s.SignName,
        "TemplateCode":  templateCode,
        "TemplateParam": toJSON(params),
        "OutId":         generateUUID(), // 用于追踪
    }
    
    // 签名
    signature := s.signRequest(args)
    
    // 发送请求
    url := "https://dysmsapi.aliyuncs.com/?Action=SendSms&" + toQueryString(args) + "&Signature=" + signature
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    // 解析响应
    var result SMSResponse
    json.NewDecoder(resp.Body).Decode(&result)
    
    if result.Code != "OK" {
        return fmt.Errorf("SMS send failed: %s", result.Message)
    }
    
    return nil
}

// 使用示例
func SendRecallSMS(phone string, userName string, days int) error {
    sms := &AliyunSMSService{...}
    
    // 使用模板发送
    // 模板内容:"【XX游戏】亲爱的${username},您已经${days}天没来玩了,小精灵们都在想您呢!点击回来领取专属礼包:xxx"
    return sms.SendSMS(phone, "SMS_123456", map[string]string{
        "username": userName,
        "days":     strconv.Itoa(days),
    })
}
  1. 内容审核:短信内容必须提前在运营商平台审核,敏感词汇会被拦截
  2. 频率限制:同一用户每天最多 3-5 条,避免被投诉
  3. 发送时间:避开深夜(22:00-08:00),这个时段发送的短信打开率极低且容易引起反感
  4. 智能降级:优先使用 App 推送,失败后再用短信

三、推送渠道对比

现代应用通常有多种推送渠道,各有优劣,需要根据场景选择。

3.1 站内信(应用内消息)

站内信是用户打开应用后才能看到的消息,通常以弹窗、通知栏、红点等形式呈现。

3.2 短信推送

短信是最传统的推送方式,但至今仍然有效。

3.3 App 推送(系统级通知)

App 推送是智能手机系统提供的消息推送能力,可以在应用未运行时触达用户。

3.4 渠道选择策略

不同场景下,应该选择不同的推送渠道:

场景 推荐渠道 原因
日常活动通知 App 推送 成本低、触达广
VIP 用户召回 短信 + App 推送 多重触达、体现重视
系统维护公告 站内信 + App 推送 确保用户知晓
账号安全警报 短信 + 站内信 紧急重要、确保触达
流失用户召回 短信 可能已卸载应用

最佳实践是多渠道协同:先用 App 推送触达活跃用户,再用短信触达无响应的高价值用户,站内信作为应用内信息的补充。


四、推送策略设计

推送不是技术问题,首先是策略问题。以下是一些核心的推送策略原则。

4.1 用户分层与精准推送

"千人千面"是推送的黄金法则。不同用户需要不同的推送内容:

精准推送的目的是:在正确的时间,把正确的内容,推给正确的人。

// 用户画像服务
type UserProfileService struct {
    db *sql.DB
    cache *redis.Client
}

// 用户标签
type UserTags struct {
    Lifecycle string // "new", "active", "at_risk", "churned"
    Value     string // "high", "medium", "low"
    Preference []string // ["social", "competitive", "collector"]
    LastActiveTime time.Time
    TotalPayment  float64
}

// 获取用户标签
func (s *UserProfileService) GetUserTags(userID string) (*UserTags, error) {
    // 先从缓存获取
    cacheKey := "user:tags:" + userID
    cached, err := s.cache.Get(cacheKey).Result()
    if err == nil {
        var tags UserTags
        json.Unmarshal([]byte(cached), &tags)
        return &tags, nil
    }
    
    // 从数据库查询
    var tags UserTags
    err = s.db.QueryRow(`
        SELECT lifecycle, value_tier, preferences, last_active, total_payment
        FROM user_profiles WHERE user_id = ?
    `, userID).Scan(&tags.Lifecycle, &tags.Value, &tags.Preference, &tags.LastActiveTime, &tags.TotalPayment)
    
    if err != nil {
        return nil, err
    }
    
    // 缓存结果
    tagsJSON, _ := json.Marshal(tags)
    s.cache.Set(cacheKey, tagsJSON, time.Hour)
    
    return &tags, nil
}

// 根据标签选择推送模板
func SelectPushTemplate(tags *UserTags, eventType string) string {
    // 根据生命周期选择
    switch tags.Lifecycle {
    case "new":
        return "newbie_" + eventType
    case "active":
        return "active_" + eventType
    case "at_risk":
        return "recall_" + eventType
    case "churned":
        return "churn_" + eventType
    }
    
    // 根据偏好选择
    for _, pref := range tags.Preference {
        if pref == "social" {
            return "social_" + eventType
        }
    }
    
    return "default_" + eventType
}

4.2 发送时机优化

推送时机对打开率影响巨大:

// 推送时机优化服务
type TimingOptimizer struct {
    userActivityDB *UserActivityDB
}

// 用户活跃时段分析
type UserActiveHours struct {
    Hours [24]int // 每个小时的活跃次数
    Days  [7]int  // 每周每天的活跃次数
}

// 找出最佳推送时间
func (t *TimingOptimizer) GetBestPushTime(userID string) time.Time {
    // 获取用户历史活跃数据
    activeHours := t.userActivityDB.GetUserActiveHours(userID)
    
    // 找出最活跃的时段
    bestHour := 0
    maxActivity := 0
    for hour, count := range activeHours.Hours {
        if count > maxActivity {
            maxActivity = count
            bestHour = hour
        }
    }
    
    // 找出最活跃的日期
    bestDay := 0
    maxDayActivity := 0
    for day, count := range activeHours.Days {
        if count > maxDayActivity {
            maxDayActivity = count
            bestDay = day
        }
    }
    
    // 计算下次最佳时间
    now := time.Now()
    targetTime := time.Date(now.Year(), now.Month(), now.Day(), bestHour, 0, 0, 0, now.Location())
    
    // 调整到最近的该星期几
    daysUntilBest := (bestDay - int(now.Weekday()) + 7) % 7
    if daysUntilBest == 0 && now.Hour() >= bestHour {
        daysUntilBest = 7 // 推到下周
    }
    targetTime = targetTime.AddDate(0, 0, daysUntilBest)
    
    return targetTime
}

4.3 频率控制

推送频率是影响用户体验的关键因素:

// 推送频率控制器
type FrequencyController struct {
    redis *redis.Client
}

// 检查是否可以发送推送
func (f *FrequencyController) CanSendPush(userID string, pushType string) (bool, time.Duration) {
    ctx := context.Background()
    
    // 检查每日上限
    dailyKey := fmt.Sprintf("push:daily:%s:%s", userID, time.Now().Format("2006-01-02"))
    dailyCount, _ := f.redis.Get(ctx, dailyKey).Int()
    if dailyCount >= 3 { // 每天最多3条
        return false, time.Until(time.Now().AddDate(0, 0, 1).Truncate(24 * time.Hour))
    }
    
    // 检查同类消息间隔
    typeKey := fmt.Sprintf("push:type:%s:%s", userID, pushType)
    lastSent, _ := f.redis.Get(ctx, typeKey).Int64()
    if lastSent > 0 {
        elapsed := time.Since(time.Unix(lastSent, 0))
        if elapsed < 4 * time.Hour { // 同类消息至少间隔4小时
            return false, 4*time.Hour - elapsed
        }
    }
    
    // 检查用户是否长期不响应
    responseKey := fmt.Sprintf("push:response:%s", userID)
    noResponseDays, _ := f.redis.Get(ctx, responseKey).Int()
    if noResponseDays >= 7 { // 7天不响应,降频到每3天1条
        lastAnyPush, _ := f.redis.Get(ctx, "push:last:"+userID).Int64()
        if time.Since(time.Unix(lastAnyPush, 0)) < 3*24*time.Hour {
            return false, 3*24*time.Hour - time.Since(time.Unix(lastAnyPush, 0))
        }
    }
    
    return true, 0
}

// 记录推送发送
func (f *FrequencyController) RecordPush(userID string, pushType string) {
    ctx := context.Background()
    now := time.Now()
    
    // 更新每日计数
    dailyKey := fmt.Sprintf("push:daily:%s:%s", userID, now.Format("2006-01-02"))
    f.redis.Incr(ctx, dailyKey)
    f.redis.Expire(ctx, dailyKey, 24*time.Hour)
    
    // 更新同类消息时间
    typeKey := fmt.Sprintf("push:type:%s:%s", userID, pushType)
    f.redis.Set(ctx, typeKey, now.Unix(), 24*time.Hour)
    
    // 更新最后推送时间
    f.redis.Set(ctx, "push:last:"+userID, now.Unix(), 30*24*time.Hour)
}

// 记录用户响应
func (f *FrequencyController) RecordResponse(userID string) {
    ctx := context.Background()
    // 用户响应了,重置不响应计数
    f.redis.Del(ctx, "push:response:"+userID)
}

// 记录用户不响应
func (f *FrequencyController) RecordNoResponse(userID string) {
    ctx := context.Background()
    f.redis.Incr(ctx, "push:response:"+userID)
    f.redis.Expire(ctx, "push:response:"+userID, 30*24*time.Hour)
}

4.4 A/B 测试与迭代优化

推送策略不是一成不变的,需要持续测试和优化:

通过持续的 A/B 测试,逐步积累推送的"最佳实践",形成自己的推送策略知识库。

// A/B 测试服务
type ABTestService struct {
    db *sql.DB
}

// 实验配置
type Experiment struct {
    ID          string
    Name        string
    Variants    []Variant
    StartTime   time.Time
    EndTime     time.Time
    Metrics     []string // 关注的指标
}

type Variant struct {
    ID       string
    Name     string
    Traffic  float64 // 流量占比,如 0.5 表示 50%
    Config   map[string]interface{} // 该变体的配置
}

// 分配用户到实验组
func (a *ABTestService) AssignVariant(userID string, experimentID string) string {
    // 获取实验配置
    var exp Experiment
    // ... 从数据库加载
    
    // 使用一致性哈希确保同一用户总是分配到同一组
    hash := md5.Sum([]byte(userID + experimentID))
    hashValue := float64(hash[0]) / 255.0
    
    // 根据流量占比分配
    cumulative := 0.0
    for _, variant := range exp.Variants {
        cumulative += variant.Traffic
        if hashValue < cumulative {
            return variant.ID
        }
    }
    
    return exp.Variants[0].ID
}

// 记录实验指标
func (a *ABTestService) RecordMetric(experimentID string, variantID string, userID string, metric string, value float64) {
    a.db.Exec(`
        INSERT INTO experiment_metrics (experiment_id, variant_id, user_id, metric, value, created_at)
        VALUES (?, ?, ?, ?, ?, ?)
    `, experimentID, variantID, userID, metric, value, time.Now())
}

// 分析实验结果
func (a *ABTestService) AnalyzeExperiment(experimentID string) *ExperimentResult {
    // 查询各组指标
    rows, _ := a.db.Query(`
        SELECT variant_id, metric, AVG(value), COUNT(*)
        FROM experiment_metrics
        WHERE experiment_id = ?
        GROUP BY variant_id, metric
    `, experimentID)
    
    results := make(map[string]map[string]MetricStats)
    for rows.Next() {
        var variantID, metric string
        var avgValue float64
        var count int
        rows.Scan(&variantID, &metric, &avgValue, &count)
        
        if results[variantID] == nil {
            results[variantID] = make(map[string]MetricStats)
        }
        results[variantID][metric] = MetricStats{
            Average: avgValue,
            Count:   count,
        }
    }
    
    // 计算显著性差异
    // ... 使用 t-test 或 chi-square test
    
    return &ExperimentResult{
        ExperimentID: experimentID,
        Variants:     results,
    }
}

五、技术架构

推送系统看起来简单,背后却需要一个复杂的技术架构支撑。

5.1 整体架构设计

一个完整的推送系统通常包含以下模块:

┌─────────────────────────────────────────────────────────────────┐
│                         业务系统                                 │
│  (活动系统、用户系统、社交系统、运营后台等)                        │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       推送 API 网关                              │
│  - 统一接口                                                      │
│  - 权限验证                                                      │
│  - 流量控制                                                      │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       消息队列 (Kafka)                           │
│  - 推送请求缓冲                                                  │
│  - 削峰填谷                                                      │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                    推送处理服务集群                               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │  用户筛选    │  │  规则匹配    │  │  内容渲染    │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                    多通道推送网关                                │
│  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐        │
│  │  APNs  │ │  FCM   │ │ 华为   │ │ 小米   │ │  短信  │        │
│  └────────┘ └────────┘ └────────┘ └────────┘ └────────┘        │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
                    ┌────────────────┐
                    │   用户设备      │
                    └────────────────┘

5.2 核心代码实现

// 推送服务主流程
type PushService struct {
    queue         *kafka.Consumer
    userManager   *UserProfileService
    ruleEngine    *RuleEngine
    templateSvc   *TemplateService
    channelRouter *MultiChannelPushManager
    tracker       *TrackingService
}

// 推送请求
type PushRequest struct {
    RequestID   string
    EventType   string      // 事件类型:activity_start, friend_invite, etc.
    TargetUsers []string    // 目标用户列表(或规则)
    Content     PushContent // 推送内容
    Schedule    *Schedule   // 定时发送
    Priority    int         // 优先级
    Channels    []string    // 指定通道(可选)
}

// 处理推送请求
func (s *PushService) ProcessPushRequest(ctx context.Context, req *PushRequest) error {
    // 1. 如果是定时推送,加入调度队列
    if req.Schedule != nil && req.Schedule.SendAt.After(time.Now()) {
        return s.schedulePush(req)
    }
    
    // 2. 用户筛选(如果是规则筛选)
    targetUsers := req.TargetUsers
    if len(req.TargetRules) > 0 {
        users, err := s.ruleEngine.MatchUsers(req.TargetRules)
        if err != nil {
            return err
        }
        targetUsers = users
    }
    
    // 3. 并发处理每个用户
    var wg sync.WaitGroup
    sem := make(chan struct{}, 100) // 并发控制
    
    for _, userID := range targetUsers {
        wg.Add(1)
        go func(uid string) {
            defer wg.Done()
            sem <- struct{}{}
            defer func() { <-sem }()
            
            // 获取用户信息
            userTags, _ := s.userManager.GetUserTags(uid)
            deviceInfo, _ := s.userManager.GetDeviceInfo(uid)
            
            // 频率控制检查
            if canSend, _ := s.frequencyCtrl.CanSendPush(uid, req.EventType); !canSend {
                s.tracker.RecordSkip(uid, req.RequestID, "frequency_limit")
                return
            }
            
            // 渲染推送内容(模板 + 变量)
            content := s.templateSvc.Render(req.Content.TemplateID, map[string]interface{}{
                "username": uid,
                "tags":     userTags,
            })
            
            // 选择推送通道
            channel := s.channelRouter.SelectChannel(deviceInfo, req.Channels)
            
            // 发送推送
            resp, err := s.channelRouter.Push(ctx, channel, &ChannelPushRequest{
                DeviceInfo: deviceInfo,
                Content:    content,
            })
            
            if err != nil {
                s.tracker.RecordFailure(uid, req.RequestID, err.Error())
            } else {
                s.tracker.RecordSuccess(uid, req.RequestID, resp.MessageID)
                s.frequencyCtrl.RecordPush(uid, req.EventType)
            }
        }(userID)
    }
    
    wg.Wait()
    return nil
}

5.3 高并发场景处理

大型活动时,可能需要同时向数百万用户发送推送:

// 批量推送处理器
type BatchPushProcessor struct {
    batchSize    int           // 每批数量
    batchTimeout time.Duration // 批次超时
    pushService  *PushService
}

func (p *BatchPushProcessor) ProcessBatch(users []string, content PushContent) error {
    // 分批处理
    for i := 0; i < len(users); i += p.batchSize {
        end := i + p.batchSize
        if end > len(users) {
            end = len(users)
        }
        
        batch := users[i:end]
        
        // 并发处理这一批
        var wg sync.WaitGroup
        for _, userID := range batch {
            wg.Add(1)
            go func(uid string) {
                defer wg.Done()
                p.pushService.SendToUser(uid, content)
            }(userID)
        }
        wg.Wait()
        
        // 批次间延迟,避免瞬时压力
        if end < len(users) {
            time.Sleep(100 * time.Millisecond)
        }
    }
    
    return nil
}

六、可靠性保障机制

推送系统的可靠性至关重要。用户收不到关键通知(如账号安全警报)可能导致严重后果。下面从多个维度介绍如何保障推送的可靠性。

6.1 送达保障

// 多通道推送,确保送达
func (m *MultiChannelPushManager) PushWithFallback(ctx context.Context, req *PushRequest) error {
    // 按优先级尝试各个通道
    channels := m.getChannelPriority(req.DeviceInfo)
    
    var lastErr error
    for _, channel := range channels {
        resp, err := m.pushByChannel(ctx, channel, req)
        if err == nil {
            // 发送成功,记录使用的通道
            m.recordSuccess(req.UserID, channel, resp.MessageID)
            return nil
        }
        
        lastErr = err
        m.recordFailure(req.UserID, channel, err.Error())
        
        // 如果是永久性错误(如 Token 无效),不再尝试其他通道
        if isPermanentError(err) {
            break
        }
    }
    
    // 所有通道都失败,降级到短信(如果用户价值足够高)
    if req.UserValue == "high" && req.DeviceInfo.Phone != "" {
        return m.smsService.SendSMS(req.DeviceInfo.Phone, req.Content.ShortText())
    }
    
    return lastErr
}

当用户设备离线时,消息需要缓存起来,等用户上线后再推送:

// 离线消息存储
type OfflineMessageStore struct {
    redis *redis.Client
}

// 存储离线消息
func (s *OfflineMessageStore) Store(userID string, msg *PushMessage) error {
    key := fmt.Sprintf("offline:msg:%s", userID)
    
    // 序列化消息
    msgBytes, _ := json.Marshal(msg)
    
    // 添加到列表
    s.redis.LPush(context.Background(), key, msgBytes)
    
    // 限制数量,避免占用过多内存
    s.redis.LTrim(context.Background(), key, 0, 99) // 最多保留100条
    
    // 设置过期时间(7天)
    s.redis.Expire(context.Background(), key, 7*24*time.Hour)
    
    return nil
}

// 用户上线时获取离线消息
func (s *OfflineMessageStore) GetPendingMessages(userID string) ([]*PushMessage, error) {
    key := fmt.Sprintf("offline:msg:%s", userID)
    
    // 获取所有消息
    results, err := s.redis.LRange(context.Background(), key, 0, -1).Result()
    if err != nil {
        return nil, err
    }
    
    messages := make([]*PushMessage, 0, len(results))
    for _, result := range results {
        var msg PushMessage
        json.Unmarshal([]byte(result), &msg)
        messages = append(messages, &msg)
    }
    
    // 清空已读取的消息
    s.redis.Del(context.Background(), key)
    
    return messages, nil
}
// 推送状态追踪
type PushTracker struct {
    db *sql.DB
}

// 记录推送状态
type PushStatus struct {
    MessageID   string
    UserID      string
    Channel     string
    Status      string // "sent", "delivered", "clicked", "failed"
    SentAt      time.Time
    DeliveredAt *time.Time
    ClickedAt   *time.Time
    ErrorMsg    string
}

// 更新推送状态(通过回调)
func (t *PushTracker) UpdateStatus(messageID string, status string, errorMsg string) error {
    now := time.Now()
    
    query := `
        UPDATE push_status 
        SET status = ?, 
            delivered_at = CASE WHEN ? = 'delivered' THEN ? ELSE delivered_at END,
            clicked_at = CASE WHEN ? = 'clicked' THEN ? ELSE clicked_at END,
            error_msg = ?
        WHERE message_id = ?
    `
    
    _, err := t.db.Exec(query, status, status, now, status, now, errorMsg, messageID)
    return err
}

// 重试未送达的推送
func (t *PushTracker) RetryUndelivered() {
    // 查找 30 分钟前发送但仍未送达的推送
    rows, _ := t.db.Query(`
        SELECT message_id, user_id, content 
        FROM push_status 
        WHERE status = 'sent' 
          AND sent_at < ?
          AND retry_count < 3
    `, time.Now().Add(-30*time.Minute))
    
    for rows.Next() {
        var messageID, userID, content string
        rows.Scan(&messageID, &userID, &content)
        
        // 重新发送
        // ...
        
        // 更新重试次数
        t.db.Exec("UPDATE push_status SET retry_count = retry_count + 1 WHERE message_id = ?", messageID)
    }
}

6.2 幂等性保障

推送请求可能因为网络问题重复发送,必须保证幂等性——同一请求多次发送,用户只收到一条推送。

// 幂等性检查
type IdempotencyChecker struct {
    redis *redis.Client
}

// 检查并标记请求
func (i *IdempotencyChecker) CheckAndMark(requestID string) (bool, error) {
    key := fmt.Sprintf("push:idempotent:%s", requestID)
    
    // 使用 SETNX 保证原子性
    ok, err := i.redis.SetNX(context.Background(), key, "1", 24*time.Hour).Result()
    if err != nil {
        return false, err
    }
    
    // ok=true 表示这是第一次处理,可以继续
    // ok=false 表示已经处理过,应该跳过
    return ok, nil
}

// 在推送处理流程中使用
func (s *PushService) ProcessPushRequest(req *PushRequest) error {
    // 幂等性检查
    isFirst, err := s.idempotency.CheckAndMark(req.RequestID)
    if err != nil {
        return err
    }
    
    if !isFirst {
        // 已经处理过,跳过
        return nil
    }
    
    // 继续处理推送...
    return s.doProcessPush(req)
}

6.3 监控与告警

指标 含义 告警阈值
推送请求量 每分钟请求的推送数量 异常突增
成功率 成功发送/总请求 < 95%
平均延迟 从请求到发送的时间 > 5s
通道成功率 各通道的独立成功率 < 90%
Token 有效率 有效 Token/总 Token < 80%
// 推送监控
type PushMonitor struct {
    prometheus *prometheus.Registry
}

var (
    pushTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "push_requests_total",
            Help: "Total number of push requests",
        },
        []string{"channel", "status"},
    )
    
    pushLatency = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "push_latency_seconds",
            Help:    "Push request latency",
            Buckets: []float64{0.1, 0.5, 1, 2, 5, 10},
        },
        []string{"channel"},
    )
)

// 记录推送指标
func (m *PushMonitor) RecordPush(channel string, status string, latency float64) {
    pushTotal.WithLabelValues(channel, status).Inc()
    pushLatency.WithLabelValues(channel).Observe(latency)
}

// 告警规则(Prometheus 配置示例)
/*
groups:
  - name: push_alerts
    rules:
      - alert: PushSuccessRateLow
        expr: rate(push_requests_total{status="success"}[5m]) / rate(push_requests_total[5m]) < 0.95
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Push success rate below 95%"
          
      - alert: PushLatencyHigh
        expr: histogram_quantile(0.95, rate(push_latency_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Push latency P95 above 5 seconds"
*/

6.4 灾备方案

当推送系统完全不可用时(如机房故障),需要有灾备方案:

┌─────────────┐     ┌─────────────┐
│  机房 A      │     │  机房 B      │
│  (主)       │     │  (备)       │
│  推送服务   │────▶│  推送服务   │
│  数据库     │◀────│  数据库副本 │
└─────────────┘     └─────────────┘
// 系统降级管理器
type DegradationManager struct {
    status string // "normal", "degraded", "emergency"
}

func (d *DegradationManager) ProcessPush(req *PushRequest) error {
    switch d.status {
    case "normal":
        // 正常处理,所有功能可用
        return d.fullProcess(req)

    case "degraded":
        // 降级模式:只处理高优先级推送
        if req.Priority >= PriorityHigh {
            return d.simplifiedProcess(req)
        }
        return ErrSystemDegraded

    case "emergency":
        // 紧急模式:只处理安全相关推送
        if req.EventType == "security_alert" {
            return d.emergencyProcess(req)
        }
        return ErrSystemEmergency
    }

    return nil
}

七、推送效果分析

推送发出后,工作并没有结束。效果分析是优化推送策略的关键。

7.1 核心指标定义

// 推送效果分析
type PushAnalytics struct {
    db *sql.DB
}

// 计算核心指标
func (a *PushAnalytics) CalculateMetrics(campaignID string, period time.Duration) *Metrics {
    startTime := time.Now().Add(-period)
    
    var sent, delivered, clicked, converted int64
    
    // 查询发送数
    a.db.QueryRow(`
        SELECT COUNT(*) FROM push_status 
        WHERE campaign_id = ? AND sent_at >= ?
    `, campaignID, startTime).Scan(&sent)
    
    // 查询送达数
    a.db.QueryRow(`
        SELECT COUNT(*) FROM push_status 
        WHERE campaign_id = ? AND delivered_at >= ?
    `, campaignID, startTime).Scan(&delivered)
    
    // 查询点击数
    a.db.QueryRow(`
        SELECT COUNT(*) FROM push_status 
        WHERE campaign_id = ? AND clicked_at >= ?
    `, campaignID, startTime).Scan(&clicked)
    
    // 查询转化数(如充值、下单等)
    a.db.QueryRow(`
        SELECT COUNT(DISTINCT user_id) FROM user_actions 
        WHERE campaign_id = ? AND action_type = 'conversion' AND created_at >= ?
    `, campaignID, startTime).Scan(&converted)
    
    return &Metrics{
        SentRate:      float64(delivered) / float64(sent) * 100,
        DeliveryRate:  float64(delivered) / float64(sent) * 100,
        OpenRate:      float64(clicked) / float64(delivered) * 100,
        ConversionRate: float64(converted) / float64(clicked) * 100,
    }
}

7.2 数据分析维度

7.3 归因分析

用户打开应用后完成了某个行为(比如充值),这个行为是否由推送触发?

7.4 持续优化闭环

效果分析的最终目的是指导优化:


八、最佳实践与案例

8.1 某游戏的推送实践

// 根据用户状态选择推送内容
func getPushContent(user *User) string {
    daysSinceLastLogin := time.Since(user.LastLoginAt).Hours() / 24
    
    switch {
    case user.IsNew && daysSinceLastLogin < 3:
        // 新手:引导推送
        return "您的新手礼包还没领完呢,快来领取!"
        
    case daysSinceLastLogin >= 7:
        // 流失用户:召回推送
        return "您的公会正在召唤您!回归即送SSR卡牌一张!"
        
    case user.TotalPayment > 1000:
        // 高价值用户:VIP专属
        return "尊贵的VIP玩家,专属活动已开启,限时福利等你来拿!"
        
    case user.Preference == "social":
        // 社交型:好友互动
        return "您的好友小明刚刚突破了第50关,来挑战他吧!"
        
    default:
        // 默认:通用活动
        return "限时活动开启,稀有道具等你来抽!"
    }
}
// 根据用户历史行为选择最佳推送时间
func getBestPushTime(userID string) int {
    // 查询用户过去7天的活跃时段
    rows, _ := db.Query(`
        SELECT HOUR(created_at) as hour, COUNT(*) as cnt
        FROM user_sessions
        WHERE user_id = ? AND created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
        GROUP BY hour
        ORDER BY cnt DESC
        LIMIT 1
    `, userID)
    
    var bestHour int
    if rows.Next() {
        rows.Scan(&bestHour)
    } else {
        bestHour = 20 // 默认晚上8点
    }
    
    return bestHour
}

8.2 推送内容最佳实践

❌ "活动开始了,快来参加!" ✅ "亲爱的玩家,您收藏的限定皮肤'暗夜使者'现在打折了,仅限24小时!"

❌ "您已7天未登录" ✅ "您的小精灵已经饿了7天了,它想您了!回来喂喂它吧~"

❌ "新版本已发布" ✅ "新版本重磅更新!前10000名更新玩家可获得限定头像框!"

❌ "您的好友上线了" ✅ "您的好友小明刚刚击败了最终Boss,来挑战他的记录吧!"


九、总结

消息推送,看似简单,实则是一门需要技术、数据、心理学多学科融合的艺术。

最后,也是最重要的:推送的本质是与用户建立连接,而不是打扰用户。每一条推送发送前,都问问自己:这条推送对用户有价值吗?如果答案是"不确定",那就不要发。

毕竟,用户的注意力是最稀缺的资源,浪费它是一种罪过。


💬 评论 (0)

0/500
排序: