消息推送:触达用户的"最后一公里"
这是"游戏运营工具"系列的第四篇。在移动互联网时代,推送消息已经成为连接产品与用户的"最后一公里"。一条好的推送,能唤醒沉睡用户、提升活跃度、促进转化;但一条糟糕的推送,则可能让用户厌烦甚至卸载应用。今天我们来聊聊消息推送这门"精细的艺术"。
引言
"打开率 3%,点击率 0.5%。"
这是某游戏项目组的周报数据。运营同事很困惑:明明推送内容精心打磨了,发送时间也选在用户活跃高峰,为什么效果这么差?
问题可能不在单条推送本身,而在整个推送体系的策略设计和技术支撑。
推送不是"发出去"就完事,它是一个从用户洞察到内容创作、从时机选择到效果追踪的完整闭环。今天我们就从运营和技术的双重视角,拆解这个"最后一公里"的技术系统。
一、消息推送的重要性
1.1 为什么推送如此重要?
在竞争激烈的应用市场中,推送消息是重新连接用户的最直接渠道。
数据显示,超过 60% 的用户在安装应用后的 7 天内会流失。而合理的推送策略,可以将这个比例降低 15-20%。对于游戏而言,一次有效的活动推送,可能带来的是数百万的流水增量。
推送的核心价值在于:
1.2 推送的"双刃剑"效应
推送虽好,但滥用会带来严重后果:
因此,推送系统的核心挑战在于:如何在"有效触达"和"避免打扰"之间找到平衡点。
二、推送技术原理深度解析
要真正理解推送系统,我们需要深入到底层,看看消息是如何从服务器到达用户设备的。这就像理解快递系统——不能只知道"寄出去-收得到",还要知道中间经过哪些中转站、有哪些环节可能出问题。
2.1 推送的本质:长连接与心跳
推送的核心问题是:服务器如何主动向客户端发送消息?
在传统的 HTTP 请求中,总是客户端主动发起请求,服务器被动响应。但推送需要反过来——服务器要有能力"主动联系"客户端。
解决方案有两种:
客户端每隔一段时间主动问服务器:"有消息给我吗?"
客户端: 有新消息吗?
服务器: 没有
(30秒后)
客户端: 有新消息吗?
服务器: 没有
(30秒后)
客户端: 有新消息吗?
服务器: 有!这是你要的推送内容
这就像你每隔半小时就给快递员打一次电话问"我的快递到了吗"。显然,这种方式效率极低:
- 大量无效请求,浪费带宽和服务器资源
- 实时性差,消息最多延迟 30 秒
- 客户端耗电严重
客户端与服务器建立一条持久的 TCP 连接,双方可以随时互相发送数据。
客户端: 我要建立连接
服务器: 好的,连接已建立
(连接保持打开状态...)
服务器: 有新消息,推给你!
客户端: 收到了
(连接继续保持...)
服务器: 又有新消息!
客户端: 收到了
这就像你给快递员留了个对讲机,他随时可以呼叫你。这就是现代推送系统的基础。
但这里有个问题:移动网络不稳定。客户端可能切换 WiFi、进入电梯信号变差、系统休眠断开连接……
为了保持连接"活着",双方需要定期发送心跳包(Heartbeat):
客户端: (心跳) 我还活着
服务器: (心跳响应) 我也在
(60秒后)
客户端: (心跳) 我还活着
服务器: (心跳响应) 我也在
心跳的频率很关键:太频繁浪费电,太稀疏连接可能被中间设备(如 NAT 路由器)判定为超时断开。通常移动端心跳间隔在 1-5 分钟。
2.2 iOS 推送机制:APNs
苹果的推送服务(Apple Push Notification service,简称 APNs)是全球最成熟的推送系统之一。它的工作原理非常巧妙:
[你的服务器] --> [APNs 服务器] --> [iPhone 设备] --> [你的 App]
^ |
<table>
<thead><tr>
</tr></thead><tbody>
</tbody></table>
(反馈服务) <------------------------------(用户操作)
- 苹果作为中转站:你的服务器不直接连用户设备,而是通过苹果的 APNs 服务器中转。这样做的好处是:
- 即使你的 App 没运行,苹果的系统进程也在监听推送 - 安全性高,每条推送都要用苹果颁发的证书签名
- 设备令牌(Device Token):每个设备上的每个 App,都有一个唯一的 Device Token。你的服务器需要先获取这个 Token,才能向该设备推送。
// iOS 客户端注册推送
func registerForPushNotifications() {
UNUserNotificationCenter.current().requestAuthorization(
options: [.alert, .sound, .badge]
) { granted, error in
if granted {
DispatchQueue.main.async {
UIApplication.shared.registerForRemoteNotifications()
}
}
}
}
// 获取 Device Token
func application(_ application: UIApplication,
didRegisterForRemoteNotificationsWithDeviceToken deviceToken: Data) {
let token = deviceToken.map { String(format: "%02.2hhx", $0) }.joined()
// 把这个 token 发送给你的服务器保存
sendTokenToServer(token: token)
}
- 证书与签名:APNs 要求每条推送都必须用有效的证书签名。这防止了恶意服务器冒充你发送推送。
// 服务器端发送 APNs 推送(Go 示例)
func SendAPNSPush(deviceToken string, title string, body string) error {
// 加载证书
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
return err
}
// 构建 JSON Payload
payload := map[string]interface{}{
"aps": map[string]interface{}{
"alert": map[string]string{
"title": title,
"body": body,
},
"badge": 1,
"sound": "default",
},
}
payloadBytes, _ := json.Marshal(payload)
// 发送到 APNs(生产环境)
url := fmt.Sprintf("https://api.push.apple.com/3/device/%s", deviceToken)
req, _ := http.NewRequest("POST", url, bytes.NewReader(payloadBytes))
req.Header.Set("content-type", "application/json")
// 使用证书发送
tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}
client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConfig}}
_, err = client.Do(req)
return err
}
- 连接限制:APNs 对连接数有严格限制。你不能为每条推送都新建连接,必须维护一个连接池,复用已有连接。苹果官方建议每个服务器最多建立约 20 条并发连接。
- 协议:基于 HTTP/2,支持多路复用
- 端点:开发环境
api.sandbox.push.apple.com,生产环境api.push.apple.com - Payload 限制:最大 4KB(iOS 15 后支持 4KB)
- 优先级:可以设置推送优先级,高优先级立即发送,低优先级可以延迟
- 过期时间:可以设置推送的 TTL(Time To Live),过期后不再发送
2.3 Android 推送机制:碎片化的挑战
Android 的推送生态要复杂得多,因为:
- Google FCM 在国内不可用:Google 服务框架在国内大部分手机上不存在
- 各厂商有自己的推送服务:华为、小米、OPPO、vivo、魅族等都有自己的推送通道
- 系统级进程保活受限:Android 8.0 后,后台进程被严格限制
| 通道 | 覆盖范围 | 特点 | 到达率 |
|---|---|---|---|
| Google FCM | 海外 Android | Google 官方,系统级支持 | 90%+(海外) |
| 华为 HMS Push | 华为设备 | 系统级推送,无需 App 运行 | 95%+ |
| 小米 MiPush | 小米设备 | 支持所有 Android,但小米设备效果最好 | 90%+ |
| OPPO Push | OPPO 设备 | 系统级推送 | 85%+ |
| vivo Push | vivo 设备 | 系统级推送 | 85%+ |
| 自建长连接 | 所有设备 | 依赖 App 进程存活 | 30-70%(不稳定) |
// 推送服务统一接口
type PushService interface {
Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}
// 多通道推送管理器
type MultiChannelPushManager struct {
apns *APNSService // iOS
fcm *FCMService // Google
huawei *HuaweiPushService
xiaomi *XiaomiPushService
oppo *OPPOPushService
vivo *VIVOPushService
}
// 智能路由:根据设备信息选择最佳通道
func (m *MultiChannelPushManager) Push(ctx context.Context, req *PushRequest) (*PushResponse, error) {
// 根据设备信息选择通道
channel := m.selectChannel(req.DeviceInfo)
var err error
var resp *PushResponse
switch channel {
case "apns":
resp, err = m.apns.Push(ctx, req)
case "fcm":
resp, err = m.fcm.Push(ctx, req)
case "huawei":
resp, err = m.huawei.Push(ctx, req)
case "xiaomi":
resp, err = m.xiaomi.Push(ctx, req)
case "oppo":
resp, err = m.oppo.Push(ctx, req)
case "vivo":
resp, err = m.vivo.Push(ctx, req)
default:
// 降级到自建长连接(注:我们暂未实现自建长连接通道)
resp, err = m.selfConnection.Push(ctx, req)
}
// 如果主通道失败,尝试备用通道
if err != nil {
for _, backupChannel := range m.getBackupChannels(channel) {
resp, err = m.pushByChannel(ctx, backupChannel, req)
if err == nil {
break
}
}
}
return resp, err
}
// 设备注册:收集各通道的推送令牌
type DevicePushInfo struct {
DeviceID string
Platform string // "ios" / "android"
APNSToken string // iOS 设备令牌
FCMMToken string // Google FCM 令牌
HuaweiToken string // 华为推送令牌
XiaomiRegID string // 小米推送 RegID
OPPORegisterID string
VIVORegisterID string
}
// 小米推送示例
func (s *XiaomiPushService) Push(ctx context.Context, req *PushRequest) error {
// 小米推送 API
url := "https://api.xmpush.xiaomi.com/v3/message/regid"
// 构建消息
message := map[string]interface{}{
"registration_id": req.XiaomiRegID,
"title": req.Title,
"description": req.Body,
"notify_type": 1, // DEFAULT_ALL
"time_to_live": 86400000, // 24小时
"extra": map[string]string{
"callback": "https://your-server.com/push/callback",
},
}
body, _ := json.Marshal(message)
httpReq, _ := http.NewRequest("POST", url, bytes.NewReader(body))
httpReq.Header.Set("Authorization", "key="+s.AppSecret)
httpReq.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return err
}
defer resp.Body.Close()
// 解析响应
var result XiaomiPushResult
json.NewDecoder(resp.Body).Decode(&result)
if result.Code != 0 {
return fmt.Errorf("xiaomi push failed: %s", result.Reason)
}
return nil
}
2.4 短信推送:最传统但最可靠的通道
短信推送看似"落后",但在某些场景下仍然是不可替代的:
- 覆盖率 100%,只要有手机号就能收到
- 不依赖 App 是否安装
- 不受系统权限限制
- 打开率较高(15-40%)
- 成本较高(0.03-0.05 元/条)
- 内容受限(70 字以内的纯文本,或长短信)
- 容易被识别为骚扰短信
// 短信推送服务(对接阿里云短信)
type AliyunSMSService struct {
AccessKeyID string
AccessKeySecret string
SignName string // 短信签名,如"XX游戏"
}
func (s *AliyunSMSService) SendSMS(phone string, templateCode string, params map[string]string) error {
// 构建请求参数
args := map[string]string{
"PhoneNumbers": phone,
"SignName": s.SignName,
"TemplateCode": templateCode,
"TemplateParam": toJSON(params),
"OutId": generateUUID(), // 用于追踪
}
// 签名
signature := s.signRequest(args)
// 发送请求
url := "https://dysmsapi.aliyuncs.com/?Action=SendSms&" + toQueryString(args) + "&Signature=" + signature
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// 解析响应
var result SMSResponse
json.NewDecoder(resp.Body).Decode(&result)
if result.Code != "OK" {
return fmt.Errorf("SMS send failed: %s", result.Message)
}
return nil
}
// 使用示例
func SendRecallSMS(phone string, userName string, days int) error {
sms := &AliyunSMSService{...}
// 使用模板发送
// 模板内容:"【XX游戏】亲爱的${username},您已经${days}天没来玩了,小精灵们都在想您呢!点击回来领取专属礼包:xxx"
return sms.SendSMS(phone, "SMS_123456", map[string]string{
"username": userName,
"days": strconv.Itoa(days),
})
}
- 内容审核:短信内容必须提前在运营商平台审核,敏感词汇会被拦截
- 频率限制:同一用户每天最多 3-5 条,避免被投诉
- 发送时间:避开深夜(22:00-08:00),这个时段发送的短信打开率极低且容易引起反感
- 智能降级:优先使用 App 推送,失败后再用短信
三、推送渠道对比
现代应用通常有多种推送渠道,各有优劣,需要根据场景选择。
3.1 站内信(应用内消息)
站内信是用户打开应用后才能看到的消息,通常以弹窗、通知栏、红点等形式呈现。
- 100% 的触达率(用户已打开应用)
- 可以展示丰富的内容形式(图文、动画、视频)
- 不受外部平台限制,完全自主可控
- 可以与游戏内功能深度联动
- 用户必须先打开应用才能看到
- 无法召回已流失的用户
- 内容过多会造成"信息过载"
- 重要公告、活动详情
- 功能引导、新手教程
- 系统通知、奖励发放
- 用户在应用内的行为触发消息
3.2 短信推送
短信是最传统的推送方式,但至今仍然有效。
- 覆盖率最广,不需要安装应用
- 打开率较高(约 20-40%)
- 可以触达已流失用户
- 不受网络限制(只要有信号)
- 成本较高(每条几分钱)
- 内容受限(通常只能纯文本,字数有限)
- 容易被识别为垃圾短信
- 用户体验较差,容易引起反感
- 重要账号安全通知
- 高价值用户的定向召回
- 验证码、登录提醒
- 紧急事件通知
3.3 App 推送(系统级通知)
App 推送是智能手机系统提供的消息推送能力,可以在应用未运行时触达用户。
- 触达能力强,用户无需打开应用
- 成本低廉(基本免费)
- 支持一定的富媒体内容
- 可以显示在锁屏、通知栏
- 用户可能关闭通知权限
- 受系统限制(iOS 和 Android 机制不同)
- 推送送达率难以 100% 保证
- 内容被系统截断的可能性
- 活动提醒、限时优惠
- 社交互动(好友请求、消息回复)
- 每日任务、签到提醒
- 版本更新、维护通知
3.4 渠道选择策略
不同场景下,应该选择不同的推送渠道:
| 场景 | 推荐渠道 | 原因 |
|---|---|---|
| 日常活动通知 | App 推送 | 成本低、触达广 |
| VIP 用户召回 | 短信 + App 推送 | 多重触达、体现重视 |
| 系统维护公告 | 站内信 + App 推送 | 确保用户知晓 |
| 账号安全警报 | 短信 + 站内信 | 紧急重要、确保触达 |
| 流失用户召回 | 短信 | 可能已卸载应用 |
最佳实践是多渠道协同:先用 App 推送触达活跃用户,再用短信触达无响应的高价值用户,站内信作为应用内信息的补充。
四、推送策略设计
推送不是技术问题,首先是策略问题。以下是一些核心的推送策略原则。
4.1 用户分层与精准推送
"千人千面"是推送的黄金法则。不同用户需要不同的推送内容:
- 新手用户:引导教程、新手福利、功能介绍
- 活跃用户:活动通知、社交互动、成就激励
- 流失风险用户:召回福利、情感化文案
- 已流失用户:重磅活动、限时福利
- 高价值用户:VIP 专属内容、优先体验
- 普通用户:通用活动、日常福利
- 低价值用户:转化激励、付费引导
- 社交型用户:好友动态、组队邀请
- 竞技型用户:排名变化、赛事通知
- 收集型用户:新物品上架、限时获取
精准推送的目的是:在正确的时间,把正确的内容,推给正确的人。
// 用户画像服务
type UserProfileService struct {
db *sql.DB
cache *redis.Client
}
// 用户标签
type UserTags struct {
Lifecycle string // "new", "active", "at_risk", "churned"
Value string // "high", "medium", "low"
Preference []string // ["social", "competitive", "collector"]
LastActiveTime time.Time
TotalPayment float64
}
// 获取用户标签
func (s *UserProfileService) GetUserTags(userID string) (*UserTags, error) {
// 先从缓存获取
cacheKey := "user:tags:" + userID
cached, err := s.cache.Get(cacheKey).Result()
if err == nil {
var tags UserTags
json.Unmarshal([]byte(cached), &tags)
return &tags, nil
}
// 从数据库查询
var tags UserTags
err = s.db.QueryRow(`
SELECT lifecycle, value_tier, preferences, last_active, total_payment
FROM user_profiles WHERE user_id = ?
`, userID).Scan(&tags.Lifecycle, &tags.Value, &tags.Preference, &tags.LastActiveTime, &tags.TotalPayment)
if err != nil {
return nil, err
}
// 缓存结果
tagsJSON, _ := json.Marshal(tags)
s.cache.Set(cacheKey, tagsJSON, time.Hour)
return &tags, nil
}
// 根据标签选择推送模板
func SelectPushTemplate(tags *UserTags, eventType string) string {
// 根据生命周期选择
switch tags.Lifecycle {
case "new":
return "newbie_" + eventType
case "active":
return "active_" + eventType
case "at_risk":
return "recall_" + eventType
case "churned":
return "churn_" + eventType
}
// 根据偏好选择
for _, pref := range tags.Preference {
if pref == "social" {
return "social_" + eventType
}
}
return "default_" + eventType
}
4.2 发送时机优化
推送时机对打开率影响巨大:
// 推送时机优化服务
type TimingOptimizer struct {
userActivityDB *UserActivityDB
}
// 用户活跃时段分析
type UserActiveHours struct {
Hours [24]int // 每个小时的活跃次数
Days [7]int // 每周每天的活跃次数
}
// 找出最佳推送时间
func (t *TimingOptimizer) GetBestPushTime(userID string) time.Time {
// 获取用户历史活跃数据
activeHours := t.userActivityDB.GetUserActiveHours(userID)
// 找出最活跃的时段
bestHour := 0
maxActivity := 0
for hour, count := range activeHours.Hours {
if count > maxActivity {
maxActivity = count
bestHour = hour
}
}
// 找出最活跃的日期
bestDay := 0
maxDayActivity := 0
for day, count := range activeHours.Days {
if count > maxDayActivity {
maxDayActivity = count
bestDay = day
}
}
// 计算下次最佳时间
now := time.Now()
targetTime := time.Date(now.Year(), now.Month(), now.Day(), bestHour, 0, 0, 0, now.Location())
// 调整到最近的该星期几
daysUntilBest := (bestDay - int(now.Weekday()) + 7) % 7
if daysUntilBest == 0 && now.Hour() >= bestHour {
daysUntilBest = 7 // 推到下周
}
targetTime = targetTime.AddDate(0, 0, daysUntilBest)
return targetTime
}
4.3 频率控制
推送频率是影响用户体验的关键因素:
// 推送频率控制器
type FrequencyController struct {
redis *redis.Client
}
// 检查是否可以发送推送
func (f *FrequencyController) CanSendPush(userID string, pushType string) (bool, time.Duration) {
ctx := context.Background()
// 检查每日上限
dailyKey := fmt.Sprintf("push:daily:%s:%s", userID, time.Now().Format("2006-01-02"))
dailyCount, _ := f.redis.Get(ctx, dailyKey).Int()
if dailyCount >= 3 { // 每天最多3条
return false, time.Until(time.Now().AddDate(0, 0, 1).Truncate(24 * time.Hour))
}
// 检查同类消息间隔
typeKey := fmt.Sprintf("push:type:%s:%s", userID, pushType)
lastSent, _ := f.redis.Get(ctx, typeKey).Int64()
if lastSent > 0 {
elapsed := time.Since(time.Unix(lastSent, 0))
if elapsed < 4 * time.Hour { // 同类消息至少间隔4小时
return false, 4*time.Hour - elapsed
}
}
// 检查用户是否长期不响应
responseKey := fmt.Sprintf("push:response:%s", userID)
noResponseDays, _ := f.redis.Get(ctx, responseKey).Int()
if noResponseDays >= 7 { // 7天不响应,降频到每3天1条
lastAnyPush, _ := f.redis.Get(ctx, "push:last:"+userID).Int64()
if time.Since(time.Unix(lastAnyPush, 0)) < 3*24*time.Hour {
return false, 3*24*time.Hour - time.Since(time.Unix(lastAnyPush, 0))
}
}
return true, 0
}
// 记录推送发送
func (f *FrequencyController) RecordPush(userID string, pushType string) {
ctx := context.Background()
now := time.Now()
// 更新每日计数
dailyKey := fmt.Sprintf("push:daily:%s:%s", userID, now.Format("2006-01-02"))
f.redis.Incr(ctx, dailyKey)
f.redis.Expire(ctx, dailyKey, 24*time.Hour)
// 更新同类消息时间
typeKey := fmt.Sprintf("push:type:%s:%s", userID, pushType)
f.redis.Set(ctx, typeKey, now.Unix(), 24*time.Hour)
// 更新最后推送时间
f.redis.Set(ctx, "push:last:"+userID, now.Unix(), 30*24*time.Hour)
}
// 记录用户响应
func (f *FrequencyController) RecordResponse(userID string) {
ctx := context.Background()
// 用户响应了,重置不响应计数
f.redis.Del(ctx, "push:response:"+userID)
}
// 记录用户不响应
func (f *FrequencyController) RecordNoResponse(userID string) {
ctx := context.Background()
f.redis.Incr(ctx, "push:response:"+userID)
f.redis.Expire(ctx, "push:response:"+userID, 30*24*time.Hour)
}
4.4 A/B 测试与迭代优化
推送策略不是一成不变的,需要持续测试和优化:
通过持续的 A/B 测试,逐步积累推送的"最佳实践",形成自己的推送策略知识库。
// A/B 测试服务
type ABTestService struct {
db *sql.DB
}
// 实验配置
type Experiment struct {
ID string
Name string
Variants []Variant
StartTime time.Time
EndTime time.Time
Metrics []string // 关注的指标
}
type Variant struct {
ID string
Name string
Traffic float64 // 流量占比,如 0.5 表示 50%
Config map[string]interface{} // 该变体的配置
}
// 分配用户到实验组
func (a *ABTestService) AssignVariant(userID string, experimentID string) string {
// 获取实验配置
var exp Experiment
// ... 从数据库加载
// 使用一致性哈希确保同一用户总是分配到同一组
hash := md5.Sum([]byte(userID + experimentID))
hashValue := float64(hash[0]) / 255.0
// 根据流量占比分配
cumulative := 0.0
for _, variant := range exp.Variants {
cumulative += variant.Traffic
if hashValue < cumulative {
return variant.ID
}
}
return exp.Variants[0].ID
}
// 记录实验指标
func (a *ABTestService) RecordMetric(experimentID string, variantID string, userID string, metric string, value float64) {
a.db.Exec(`
INSERT INTO experiment_metrics (experiment_id, variant_id, user_id, metric, value, created_at)
VALUES (?, ?, ?, ?, ?, ?)
`, experimentID, variantID, userID, metric, value, time.Now())
}
// 分析实验结果
func (a *ABTestService) AnalyzeExperiment(experimentID string) *ExperimentResult {
// 查询各组指标
rows, _ := a.db.Query(`
SELECT variant_id, metric, AVG(value), COUNT(*)
FROM experiment_metrics
WHERE experiment_id = ?
GROUP BY variant_id, metric
`, experimentID)
results := make(map[string]map[string]MetricStats)
for rows.Next() {
var variantID, metric string
var avgValue float64
var count int
rows.Scan(&variantID, &metric, &avgValue, &count)
if results[variantID] == nil {
results[variantID] = make(map[string]MetricStats)
}
results[variantID][metric] = MetricStats{
Average: avgValue,
Count: count,
}
}
// 计算显著性差异
// ... 使用 t-test 或 chi-square test
return &ExperimentResult{
ExperimentID: experimentID,
Variants: results,
}
}
五、技术架构
推送系统看起来简单,背后却需要一个复杂的技术架构支撑。
5.1 整体架构设计
一个完整的推送系统通常包含以下模块:
┌─────────────────────────────────────────────────────────────────┐
│ 业务系统 │
│ (活动系统、用户系统、社交系统、运营后台等) │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 推送 API 网关 │
│ - 统一接口 │
│ - 权限验证 │
│ - 流量控制 │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 消息队列 (Kafka) │
│ - 推送请求缓冲 │
│ - 削峰填谷 │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 推送处理服务集群 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 用户筛选 │ │ 规则匹配 │ │ 内容渲染 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 多通道推送网关 │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ APNs │ │ FCM │ │ 华为 │ │ 小米 │ │ 短信 │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌────────────────┐
│ 用户设备 │
└────────────────┘
5.2 核心代码实现
// 推送服务主流程
type PushService struct {
queue *kafka.Consumer
userManager *UserProfileService
ruleEngine *RuleEngine
templateSvc *TemplateService
channelRouter *MultiChannelPushManager
tracker *TrackingService
}
// 推送请求
type PushRequest struct {
RequestID string
EventType string // 事件类型:activity_start, friend_invite, etc.
TargetUsers []string // 目标用户列表(或规则)
Content PushContent // 推送内容
Schedule *Schedule // 定时发送
Priority int // 优先级
Channels []string // 指定通道(可选)
}
// 处理推送请求
func (s *PushService) ProcessPushRequest(ctx context.Context, req *PushRequest) error {
// 1. 如果是定时推送,加入调度队列
if req.Schedule != nil && req.Schedule.SendAt.After(time.Now()) {
return s.schedulePush(req)
}
// 2. 用户筛选(如果是规则筛选)
targetUsers := req.TargetUsers
if len(req.TargetRules) > 0 {
users, err := s.ruleEngine.MatchUsers(req.TargetRules)
if err != nil {
return err
}
targetUsers = users
}
// 3. 并发处理每个用户
var wg sync.WaitGroup
sem := make(chan struct{}, 100) // 并发控制
for _, userID := range targetUsers {
wg.Add(1)
go func(uid string) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
// 获取用户信息
userTags, _ := s.userManager.GetUserTags(uid)
deviceInfo, _ := s.userManager.GetDeviceInfo(uid)
// 频率控制检查
if canSend, _ := s.frequencyCtrl.CanSendPush(uid, req.EventType); !canSend {
s.tracker.RecordSkip(uid, req.RequestID, "frequency_limit")
return
}
// 渲染推送内容(模板 + 变量)
content := s.templateSvc.Render(req.Content.TemplateID, map[string]interface{}{
"username": uid,
"tags": userTags,
})
// 选择推送通道
channel := s.channelRouter.SelectChannel(deviceInfo, req.Channels)
// 发送推送
resp, err := s.channelRouter.Push(ctx, channel, &ChannelPushRequest{
DeviceInfo: deviceInfo,
Content: content,
})
if err != nil {
s.tracker.RecordFailure(uid, req.RequestID, err.Error())
} else {
s.tracker.RecordSuccess(uid, req.RequestID, resp.MessageID)
s.frequencyCtrl.RecordPush(uid, req.EventType)
}
}(userID)
}
wg.Wait()
return nil
}
5.3 高并发场景处理
大型活动时,可能需要同时向数百万用户发送推送:
// 批量推送处理器
type BatchPushProcessor struct {
batchSize int // 每批数量
batchTimeout time.Duration // 批次超时
pushService *PushService
}
func (p *BatchPushProcessor) ProcessBatch(users []string, content PushContent) error {
// 分批处理
for i := 0; i < len(users); i += p.batchSize {
end := i + p.batchSize
if end > len(users) {
end = len(users)
}
batch := users[i:end]
// 并发处理这一批
var wg sync.WaitGroup
for _, userID := range batch {
wg.Add(1)
go func(uid string) {
defer wg.Done()
p.pushService.SendToUser(uid, content)
}(userID)
}
wg.Wait()
// 批次间延迟,避免瞬时压力
if end < len(users) {
time.Sleep(100 * time.Millisecond)
}
}
return nil
}
六、可靠性保障机制
推送系统的可靠性至关重要。用户收不到关键通知(如账号安全警报)可能导致严重后果。下面从多个维度介绍如何保障推送的可靠性。
6.1 送达保障
- 设备离线
- 网络问题
- 通道故障
- Token 过期
// 多通道推送,确保送达
func (m *MultiChannelPushManager) PushWithFallback(ctx context.Context, req *PushRequest) error {
// 按优先级尝试各个通道
channels := m.getChannelPriority(req.DeviceInfo)
var lastErr error
for _, channel := range channels {
resp, err := m.pushByChannel(ctx, channel, req)
if err == nil {
// 发送成功,记录使用的通道
m.recordSuccess(req.UserID, channel, resp.MessageID)
return nil
}
lastErr = err
m.recordFailure(req.UserID, channel, err.Error())
// 如果是永久性错误(如 Token 无效),不再尝试其他通道
if isPermanentError(err) {
break
}
}
// 所有通道都失败,降级到短信(如果用户价值足够高)
if req.UserValue == "high" && req.DeviceInfo.Phone != "" {
return m.smsService.SendSMS(req.DeviceInfo.Phone, req.Content.ShortText())
}
return lastErr
}
当用户设备离线时,消息需要缓存起来,等用户上线后再推送:
// 离线消息存储
type OfflineMessageStore struct {
redis *redis.Client
}
// 存储离线消息
func (s *OfflineMessageStore) Store(userID string, msg *PushMessage) error {
key := fmt.Sprintf("offline:msg:%s", userID)
// 序列化消息
msgBytes, _ := json.Marshal(msg)
// 添加到列表
s.redis.LPush(context.Background(), key, msgBytes)
// 限制数量,避免占用过多内存
s.redis.LTrim(context.Background(), key, 0, 99) // 最多保留100条
// 设置过期时间(7天)
s.redis.Expire(context.Background(), key, 7*24*time.Hour)
return nil
}
// 用户上线时获取离线消息
func (s *OfflineMessageStore) GetPendingMessages(userID string) ([]*PushMessage, error) {
key := fmt.Sprintf("offline:msg:%s", userID)
// 获取所有消息
results, err := s.redis.LRange(context.Background(), key, 0, -1).Result()
if err != nil {
return nil, err
}
messages := make([]*PushMessage, 0, len(results))
for _, result := range results {
var msg PushMessage
json.Unmarshal([]byte(result), &msg)
messages = append(messages, &msg)
}
// 清空已读取的消息
s.redis.Del(context.Background(), key)
return messages, nil
}
// 推送状态追踪
type PushTracker struct {
db *sql.DB
}
// 记录推送状态
type PushStatus struct {
MessageID string
UserID string
Channel string
Status string // "sent", "delivered", "clicked", "failed"
SentAt time.Time
DeliveredAt *time.Time
ClickedAt *time.Time
ErrorMsg string
}
// 更新推送状态(通过回调)
func (t *PushTracker) UpdateStatus(messageID string, status string, errorMsg string) error {
now := time.Now()
query := `
UPDATE push_status
SET status = ?,
delivered_at = CASE WHEN ? = 'delivered' THEN ? ELSE delivered_at END,
clicked_at = CASE WHEN ? = 'clicked' THEN ? ELSE clicked_at END,
error_msg = ?
WHERE message_id = ?
`
_, err := t.db.Exec(query, status, status, now, status, now, errorMsg, messageID)
return err
}
// 重试未送达的推送
func (t *PushTracker) RetryUndelivered() {
// 查找 30 分钟前发送但仍未送达的推送
rows, _ := t.db.Query(`
SELECT message_id, user_id, content
FROM push_status
WHERE status = 'sent'
AND sent_at < ?
AND retry_count < 3
`, time.Now().Add(-30*time.Minute))
for rows.Next() {
var messageID, userID, content string
rows.Scan(&messageID, &userID, &content)
// 重新发送
// ...
// 更新重试次数
t.db.Exec("UPDATE push_status SET retry_count = retry_count + 1 WHERE message_id = ?", messageID)
}
}
6.2 幂等性保障
推送请求可能因为网络问题重复发送,必须保证幂等性——同一请求多次发送,用户只收到一条推送。
// 幂等性检查
type IdempotencyChecker struct {
redis *redis.Client
}
// 检查并标记请求
func (i *IdempotencyChecker) CheckAndMark(requestID string) (bool, error) {
key := fmt.Sprintf("push:idempotent:%s", requestID)
// 使用 SETNX 保证原子性
ok, err := i.redis.SetNX(context.Background(), key, "1", 24*time.Hour).Result()
if err != nil {
return false, err
}
// ok=true 表示这是第一次处理,可以继续
// ok=false 表示已经处理过,应该跳过
return ok, nil
}
// 在推送处理流程中使用
func (s *PushService) ProcessPushRequest(req *PushRequest) error {
// 幂等性检查
isFirst, err := s.idempotency.CheckAndMark(req.RequestID)
if err != nil {
return err
}
if !isFirst {
// 已经处理过,跳过
return nil
}
// 继续处理推送...
return s.doProcessPush(req)
}
6.3 监控与告警
| 指标 | 含义 | 告警阈值 |
|---|---|---|
| 推送请求量 | 每分钟请求的推送数量 | 异常突增 |
| 成功率 | 成功发送/总请求 | < 95% |
| 平均延迟 | 从请求到发送的时间 | > 5s |
| 通道成功率 | 各通道的独立成功率 | < 90% |
| Token 有效率 | 有效 Token/总 Token | < 80% |
// 推送监控
type PushMonitor struct {
prometheus *prometheus.Registry
}
var (
pushTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "push_requests_total",
Help: "Total number of push requests",
},
[]string{"channel", "status"},
)
pushLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "push_latency_seconds",
Help: "Push request latency",
Buckets: []float64{0.1, 0.5, 1, 2, 5, 10},
},
[]string{"channel"},
)
)
// 记录推送指标
func (m *PushMonitor) RecordPush(channel string, status string, latency float64) {
pushTotal.WithLabelValues(channel, status).Inc()
pushLatency.WithLabelValues(channel).Observe(latency)
}
// 告警规则(Prometheus 配置示例)
/*
groups:
- name: push_alerts
rules:
- alert: PushSuccessRateLow
expr: rate(push_requests_total{status="success"}[5m]) / rate(push_requests_total[5m]) < 0.95
for: 5m
labels:
severity: critical
annotations:
summary: "Push success rate below 95%"
- alert: PushLatencyHigh
expr: histogram_quantile(0.95, rate(push_latency_seconds_bucket[5m])) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "Push latency P95 above 5 seconds"
*/
6.4 灾备方案
当推送系统完全不可用时(如机房故障),需要有灾备方案:
┌─────────────┐ ┌─────────────┐
│ 机房 A │ │ 机房 B │
│ (主) │ │ (备) │
│ 推送服务 │────▶│ 推送服务 │
│ 数据库 │◀────│ 数据库副本 │
└─────────────┘ └─────────────┘
// 系统降级管理器
type DegradationManager struct {
status string // "normal", "degraded", "emergency"
}
func (d *DegradationManager) ProcessPush(req *PushRequest) error {
switch d.status {
case "normal":
// 正常处理,所有功能可用
return d.fullProcess(req)
case "degraded":
// 降级模式:只处理高优先级推送
if req.Priority >= PriorityHigh {
return d.simplifiedProcess(req)
}
return ErrSystemDegraded
case "emergency":
// 紧急模式:只处理安全相关推送
if req.EventType == "security_alert" {
return d.emergencyProcess(req)
}
return ErrSystemEmergency
}
return nil
}
七、推送效果分析
推送发出后,工作并没有结束。效果分析是优化推送策略的关键。
7.1 核心指标定义
// 推送效果分析
type PushAnalytics struct {
db *sql.DB
}
// 计算核心指标
func (a *PushAnalytics) CalculateMetrics(campaignID string, period time.Duration) *Metrics {
startTime := time.Now().Add(-period)
var sent, delivered, clicked, converted int64
// 查询发送数
a.db.QueryRow(`
SELECT COUNT(*) FROM push_status
WHERE campaign_id = ? AND sent_at >= ?
`, campaignID, startTime).Scan(&sent)
// 查询送达数
a.db.QueryRow(`
SELECT COUNT(*) FROM push_status
WHERE campaign_id = ? AND delivered_at >= ?
`, campaignID, startTime).Scan(&delivered)
// 查询点击数
a.db.QueryRow(`
SELECT COUNT(*) FROM push_status
WHERE campaign_id = ? AND clicked_at >= ?
`, campaignID, startTime).Scan(&clicked)
// 查询转化数(如充值、下单等)
a.db.QueryRow(`
SELECT COUNT(DISTINCT user_id) FROM user_actions
WHERE campaign_id = ? AND action_type = 'conversion' AND created_at >= ?
`, campaignID, startTime).Scan(&converted)
return &Metrics{
SentRate: float64(delivered) / float64(sent) * 100,
DeliveryRate: float64(delivered) / float64(sent) * 100,
OpenRate: float64(clicked) / float64(delivered) * 100,
ConversionRate: float64(converted) / float64(clicked) * 100,
}
}
7.2 数据分析维度
7.3 归因分析
用户打开应用后完成了某个行为(比如充值),这个行为是否由推送触发?
7.4 持续优化闭环
效果分析的最终目的是指导优化:
八、最佳实践与案例
8.1 某游戏的推送实践
- 推送内容单一,全是活动通知
- 发送时间固定,不考虑用户习惯
- 频率过高,每天 5-8 条
- 没有 A/B 测试,凭感觉发
// 根据用户状态选择推送内容
func getPushContent(user *User) string {
daysSinceLastLogin := time.Since(user.LastLoginAt).Hours() / 24
switch {
case user.IsNew && daysSinceLastLogin < 3:
// 新手:引导推送
return "您的新手礼包还没领完呢,快来领取!"
case daysSinceLastLogin >= 7:
// 流失用户:召回推送
return "您的公会正在召唤您!回归即送SSR卡牌一张!"
case user.TotalPayment > 1000:
// 高价值用户:VIP专属
return "尊贵的VIP玩家,专属活动已开启,限时福利等你来拿!"
case user.Preference == "social":
// 社交型:好友互动
return "您的好友小明刚刚突破了第50关,来挑战他吧!"
default:
// 默认:通用活动
return "限时活动开启,稀有道具等你来抽!"
}
}
// 根据用户历史行为选择最佳推送时间
func getBestPushTime(userID string) int {
// 查询用户过去7天的活跃时段
rows, _ := db.Query(`
SELECT HOUR(created_at) as hour, COUNT(*) as cnt
FROM user_sessions
WHERE user_id = ? AND created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY hour
ORDER BY cnt DESC
LIMIT 1
`, userID)
var bestHour int
if rows.Next() {
rows.Scan(&bestHour)
} else {
bestHour = 20 // 默认晚上8点
}
return bestHour
}
- 每日上限:3 条
- 同类消息间隔:6 小时
- 长期不响应降频:7 天不点开,降为每 2 天 1 条
- 次日留存:30% → 38%
- 7 日留存:15% → 22%
- 推送打开率:3% → 12%
- 用户投诉率:下降 60%
8.2 推送内容最佳实践
❌ "活动开始了,快来参加!" ✅ "亲爱的玩家,您收藏的限定皮肤'暗夜使者'现在打折了,仅限24小时!"
❌ "您已7天未登录" ✅ "您的小精灵已经饿了7天了,它想您了!回来喂喂它吧~"
❌ "新版本已发布" ✅ "新版本重磅更新!前10000名更新玩家可获得限定头像框!"
❌ "您的好友上线了" ✅ "您的好友小明刚刚击败了最终Boss,来挑战他的记录吧!"
九、总结
消息推送,看似简单,实则是一门需要技术、数据、心理学多学科融合的艺术。
最后,也是最重要的:推送的本质是与用户建立连接,而不是打扰用户。每一条推送发送前,都问问自己:这条推送对用户有价值吗?如果答案是"不确定",那就不要发。
毕竟,用户的注意力是最稀缺的资源,浪费它是一种罪过。
💬 评论 (0)