长连接网关:实时通信的秘密
系列七:基础设施篇 | 第1篇
当你打开微信,朋友的消息瞬间出现在屏幕上;当你在股票App里,K线图实时跳动;当你在玩游戏,队友的操作毫秒级同步——这一切的背后,都藏着一个默默工作的技术:长连接网关。
今天,我们来揭开实时通信的秘密。
一、长连接 vs 短连接:两种对话方式
想象你和一个朋友聊天。
为什么短连接不够用?
- 延迟高:每次建立TCP连接需要三次握手,加上TLS握手,光是"拨号"就要几百毫秒
- 资源浪费:每次请求都要携带完整的HTTP头部,可能几百字节的数据只为了传几个字的业务内容
- 无法主动推送:服务器有消息时,只能等客户端来问(轮询),效率极低
长连接的代价
当然,长连接不是万能的:
- 服务器压力大:每个连接都需要维护状态,占用内存和文件描述符
- 复杂度高:需要处理心跳、重连、负载均衡等复杂问题
- 运维成本:连接数监控、异常检测、容量规划都更复杂
HTTP长轮询 vs WebSocket:一场不对等的较量
在WebSocket普及之前,很多实时功能是用HTTP长轮询实现的。它的原理是:客户端发起请求,服务器如果有数据就返回,没有就"挂着"等,直到有数据或超时。
客户端: "有新消息吗?"
服务器: (等待...等待...)"有了!这是消息!"
客户端: (收到后立即发下一个请求)"还有吗?"
- 资源浪费:每次请求都有HTTP头部开销,即使没有数据也要反复建立连接
- 服务器压力:大量请求"挂起",占用线程或连接池
- 不是真正的实时:消息到达时,可能正好在两次请求的间隙
来个直观的对比:
| 特性 | HTTP长轮询 | WebSocket |
|---|---|---|
| 连接复用 | 每次请求都新建 | 一次连接,持续使用 |
| 头部开销 | 每次几百字节 | 仅2-14字节 |
| 服务端推送 | 需要客户端先问 | 随时可推 |
| 实时性 | 秒级 | 毫秒级 |
| 实现复杂度 | 低 | 中 |
| 浏览器支持 | 全部 | 现代浏览器 |
二、WebSocket协议原理:让连接"活"起来
HTTP设计之初是为了传输文档,不是实时通信。WebSocket的出现,正是为了弥补这个短板。
从HTTP到WebSocket:一次优雅的"变身"
WebSocket的建立过程非常巧妙,叫做握手。这个过程只发生一次,之后就是纯粹的TCP数据传输。
握手过程详解
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: https://example.com
关键信息解读:
Upgrade: websocket—— 告诉服务器:"我想升级协议"Connection: Upgrade—— 确认这是个升级请求Sec-WebSocket-Key—— 一个随机的Base64编码值,用于安全验证Sec-WebSocket-Version: 13—— WebSocket协议版本
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
那个 Sec-WebSocket-Accept 是怎么来的?这是协议设计的一个小聪明:
Sec-WebSocket-Accept = base64(sha1(Sec-WebSocket-Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
这个魔法字符串是WebSocket协议规定的固定值。服务器通过计算这个值,证明自己"真的懂WebSocket",而不是随便返回一个101。
从这一刻起,这个TCP连接就不再是HTTP了,双方都可以随时发消息,不再需要请求-响应的模式。
帧结构:数据的"集装箱"
WebSocket通信的基本单位是"帧"(Frame)。理解帧结构,是理解WebSocket性能优势的关键。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
<table>
<thead><tr>
<th>F</th>
<th>R</th>
<th>R</th>
<th>R</th>
<th>opcode</th>
<th>M</th>
<th>Payload len</th>
<th>Extended payload length</th>
</tr></thead><tbody>
<thead><tr>
<th>I</th>
<th>S</th>
<th>S</th>
<th>S</th>
<th>(4)</th>
<th>A</th>
<th>(7)</th>
<th>(16/64)</th>
</tr></thead><tbody>
<thead><tr>
<th>N</th>
<th>V</th>
<th>V</th>
<th>V</th>
<th></th>
<th>S</th>
<th></th>
<th>(if payload len==126/127)</th>
</tr></thead><tbody>
<thead><tr>
<th></th>
<th>1</th>
<th>2</th>
<th>3</th>
<th></th>
<th>K</th>
<th></th>
<th></th>
</tr></thead><tbody>
</tbody></table>
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th>Extended payload length continued, if payload len == 127</th>
</tr></thead><tbody>
</tbody></table>
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th></th>
<th>Masking-key, if MASK set to 1</th>
</tr></thead><tbody>
</tbody></table>
+-------------------------------+-------------------------------+
<table>
<thead><tr>
<th>Masking-key (continued)</th>
<th>Payload Data</th>
</tr></thead><tbody>
</tbody></table>
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th>Payload Data continued ...</th>
</tr></thead><tbody>
</tbody></table>
+---------------------------------------------------------------+
别被这个图吓到,我来拆解:
- FIN(1位):是否是最后一帧。大数据可能分成多帧传输。
- Opcode(4位):帧类型
0x1 文本帧
- 0x2 二进制帧 - 0x8 关闭连接 - 0x9 心跳Ping - 0xA 心跳Pong
- MASK(1位):是否掩码。客户端发给服务器的必须掩码!
- Payload Length(7位/7+16位/7+64位):数据长度,变长编码
这是为了防止一种叫"缓存污染攻击"的安全问题。恶意网页可能让浏览器向某个服务器发送看起来像HTTP请求的WebSocket数据,如果中间有代理缓存,可能被污染。掩码后,数据看起来就是随机的。
实际的握手代码示例
const ws = new WebSocket('wss://example.com/chat');
ws.onopen = () => {
console.log('连接建立成功');
ws.send('Hello Server!');
};
ws.onmessage = (event) => {
console.log('收到消息:', event.data);
};
ws.onerror = (error) => {
console.error('连接错误:', error);
};
ws.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason);
};
import (
"github.com/gorilla/websocket"
"net/http"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // 生产环境要校验Origin
},
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("升级失败:", err)
return
}
defer conn.Close()
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println("读取失败:", err)
break
}
log.Printf("收到消息: %s", message)
// 回显
err = conn.WriteMessage(messageType, message)
if err != nil {
log.Println("发送失败:", err)
break
}
}
}
为什么选择WebSocket?
- 全双工:客户端和服务器可以同时发消息,不需要轮流
- 轻量级:每帧只有2-14字节的额外开销,比HTTP头部小得多
- 兼容性好:几乎所有现代浏览器都支持
- 穿透性强:使用80/443端口,能穿过大多数防火墙
三、长连接网关的架构设计:处理百万连接的秘密
单台服务器能撑多少连接?理论上是65535个(端口号上限),但实际受限于内存、CPU、文件描述符等资源。要支撑百万级并发,必须设计合理的架构。
架构图描述
想象一张架构图:
┌─────────────────────────────────────────┐
│ 客户端层 │
│ [手机App] [Web浏览器] [桌面客户端] │
└────────────────┬────────────────────────┘
│ WebSocket连接
▼
┌────────────────────────────────────────────────────────────────────┐
│ 负载均衡层 (LB) │
│ 功能:SSL卸载、连接分发、健康检查、会话保持 │
│ [Nginx] [HAProxy] [云厂商LB] │
└────────────────────────────────┬───────────────────────────────────┘
│
┌────────────────────────┼────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Gateway-1 │ │ Gateway-2 │ │ Gateway-N │
│ 接入层 │ │ 接入层 │ │ 接入层 │
│ │ │ │ │ │
│ - 连接管理 │ │ - 连接管理 │ │ - 连接管理 │
│ - 协议解析 │ │ - 协议解析 │ │ - 协议解析 │
│ - 心跳检测 │ │ - 心跳检测 │ │ - 心跳检测 │
│ - 消息转发 │ │ - 消息转发 │ │ - 消息转发 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└────────────────────────┼────────────────────────┘
│ 内部RPC/消息队列
▼
┌─────────────────────────────────────────┐
│ 路由层 (Router) │
│ │
│ ┌─────────────────────────────────┐ │
│ │ Redis集群:用户-连接映射表 │ │
│ │ 用户A → Gateway-1:ConnID123 │ │
│ │ 用户B → Gateway-2:ConnID456 │ │
│ └─────────────────────────────────┘ │
│ │
│ 功能:连接定位、会话管理、消息路由 │
└────────────────┬────────────────────────┘
│
┌────────────────────────────┼────────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Logic-1 │ │ Logic-2 │ │ Logic-N │
│ 业务层 │ │ 业务层 │ │ 业务层 │
│ │ │ │ │ │
│ - 聊天服务 │ │ - 推送服务 │ │ - 其他业务 │
│ - 好友服务 │ │ - 通知服务 │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
三层架构详解
接入层是整个系统的"门面",负责维持与客户端的长连接。
核心职责:
- 连接管理:维护连接的生命周期(建立、保活、关闭)
- 协议解析:处理WebSocket、TCP、MQTT等多种协议
- 心跳检测:及时发现死连接,释放资源
- 消息转发:将上行消息转发给业务层,将下行消息推给客户端
设计要点:
// 每个连接的结构
type Connection struct {
ConnID string // 连接唯一标识
UserID string // 绑定的用户ID
DeviceID string // 设备标识
Conn *websocket.Conn // 底层连接
LastActive time.Time // 最后活跃时间
SendChan chan []byte // 发送队列
Done chan struct{} // 关闭信号
}
// 连接管理器
type ConnManager struct {
connections sync.Map // 并发安全的连接表
userConns map[string][]*Connection // 用户->连接列表
mu sync.RWMutex
}
路由层知道每个用户的连接在哪个Gateway上,是消息推送的关键。
核心数据结构:
Redis存储结构:
Key: user:{userID}:connections
Value: {
"gateway_id": "gateway-1",
"conn_id": "conn-12345",
"device": "iphone",
"last_active": 1709012345
}
为什么用Redis?
- 读写极快(微秒级)
- 支持过期时间(自动清理僵尸连接)
- 支持发布订阅(Gateway下线时广播通知)
业务层处理具体业务逻辑,不直接与客户端通信。
工作流程:
- 接收Gateway转发的消息
- 处理业务逻辑(存数据库、调用其他服务)
- 需要推送时,查询Router找到目标Gateway
- 通过内部RPC或消息队列,通知Gateway推送
关键设计原则
高并发优化技巧
单机要支持百万连接,需要调优:
# 文件描述符限制
ulimit -n 1000000
# 内核参数调优
echo "net.ipv4.tcp_wmem = 4096 87380 4194304" >> /etc/sysctl.conf
echo "net.ipv4.tcp_rmem = 4096 87380 4194304" >> /etc/sysctl.conf
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
echo "net.ipv4.tcp_max_syn_backlog = 65535" >> /etc/sysctl.conf
sysctl -p
每个连接都占用内存,要精打细算:
- 使用对象池复用连接对象
- 读写缓冲区按需分配
- 避免频繁创建goroutine(Go)
// 不要这样:每个消息一个goroutine
go handle message()
// 要这样:使用worker池
type WorkerPool struct {
taskChan chan Task
workers int
}
func (p *WorkerPool) Start() {
for i := 0; i < p.workers; i++ {
go p.worker()
}
}
func (p *WorkerPool) worker() {
for task := range p.taskChan {
task.Execute()
}
}
四、心跳与断线重连:让连接"不死"
网络是不可靠的。Wi-Fi断了、4G切3G、手机进入省电模式……连接随时可能断掉。如何快速发现问题、快速恢复?这就是心跳机制的职责。
心跳:网络的"脉搏"
心跳的原理很简单:双方约定一个间隔(比如30秒),如果一方在这个时间内没收到对方的消息,就主动发一个"我还活着"的包。如果连续多次没收到心跳响应,就认为连接已断开。
心跳机制的设计考量
| 场景 | 推荐间隔 | 原因 |
|---|---|---|
| 游戏 | 5-10秒 | 需要快速感知断线 |
| IM聊天 | 30-60秒 | 平衡实时性和省电 |
| 推送通知 | 3-5分钟 | 大部分时候空闲 |
两种方案:
- 客户端发起:简单,但客户端需要保持活跃
- 双向心跳:更可靠,但实现复杂
推荐:客户端发起 + 服务端检测超时
不够!TCP Keepalive默认2小时才检测一次,而且无法穿透NAT。应用层必须自己实现心跳。
# TCP Keepalive参数
net.ipv4.tcp_keepalive_time = 7200 # 2小时
net.ipv4.tcp_keepalive_intvl = 75 # 检测间隔
net.ipv4.tcp_keepalive_probes = 9 # 检测次数
心跳实现代码
class WebSocketClient {
constructor(url) {
this.url = url;
this.heartbeatInterval = 30000; // 30秒
this.heartbeatTimer = null;
this.missedHeartbeats = 0;
this.maxMissedHeartbeats = 3;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('连接成功');
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
if (event.data === 'pong') {
this.missedHeartbeats = 0;
return;
}
// 处理业务消息
};
this.ws.onclose = () => {
this.stopHeartbeat();
this.reconnect();
};
}
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.missedHeartbeats++;
if (this.missedHeartbeats > this.maxMissedHeartbeats) {
console.log('心跳超时,重连');
this.ws.close();
return;
}
this.ws.send('ping');
}
}, this.heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}
type Connection struct {
conn *websocket.Conn
lastActive time.Time
mu sync.Mutex
}
func (c *Connection) heartbeatMonitor() {
ticker := time.NewTicker(10 * time.Second)
timeout := 60 * time.Second // 60秒无活动则断开
for {
select {
case <-ticker.C:
c.mu.Lock()
if time.Since(c.lastActive) > timeout {
c.conn.Close()
c.mu.Unlock()
return
}
c.mu.Unlock()
}
}
}
func (c *Connection) handleMessage(message string) {
c.mu.Lock()
c.lastActive = time.Now()
c.mu.Unlock()
if message == "ping" {
c.conn.WriteMessage(websocket.TextMessage, []byte("pong"))
}
}
智能心跳:自适应网络状况
高级的心跳机制可以根据网络状况动态调整:
class SmartHeartbeat {
constructor() {
this.baseInterval = 30000; // 基础30秒
this.minInterval = 10000; // 最小10秒
this.maxInterval = 120000; // 最大2分钟
this.currentInterval = this.baseInterval;
this.networkQuality = 'good'; // good/medium/poor
}
adjustInterval(rtt) {
// 根据往返时延调整
if (rtt < 100) {
this.networkQuality = 'good';
this.currentInterval = Math.min(this.currentInterval * 1.1, this.maxInterval);
} else if (rtt > 500) {
this.networkQuality = 'poor';
this.currentInterval = Math.max(this.currentInterval * 0.8, this.minInterval);
} else {
this.networkQuality = 'medium';
}
}
}
断线重连:快速恢复
当检测到连接断开后,客户端需要自动重连。这个过程要精心设计。
1. 退避策略
// ❌ 错误:可能导致服务器雪崩
ws.onclose = () => {
setTimeout(() => connect(), 100); // 立即重连
};
如果服务器刚重启,百万客户端同时重连会直接把服务器打挂。
class ReconnectPolicy {
constructor() {
this.baseDelay = 1000; // 初始1秒
this.maxDelay = 30000; // 最大30秒
this.attempts = 0;
}
getNextDelay() {
// 指数退避
delay = Math.min(this.baseDelay * Math.pow(2, this.attempts), this.maxDelay);
// 加随机抖动(防止多客户端同步重连)
jitter = delay * 0.2 * Math.random();
this.attempts++;
return delay + jitter;
}
reset() {
this.attempts = 0;
}
}
重连时间线:1秒 → 2秒 → 4秒 → 8秒 → 16秒 → 30秒(封顶)
2. 断点续传
重连后,怎么收到断开期间错过的消息?
服务端为每个用户维护一个消息队列:
┌─────────────────────────────────────┐
│ 用户A的消息队列(最多保留100条) │
├─────────────────────────────────────┤
│ [1001] "你好" 时间:10:01 │
│ [1002] "在吗?" 时间:10:02 │
│ [1003] "下午开会" 时间:10:05 │
└─────────────────────────────────────┘
客户端重连时:
Client: "我上次收到消息1001,之后有什么?"
Server: "你有2条新消息:[1002], [1003]"
class MessageReceiver {
constructor() {
this.lastReceivedId = 0;
this.pendingMessages = new Map();
}
onMessage(message) {
if (message.id <= this.lastReceivedId) {
// 重复消息,忽略
return;
}
// 处理消息
this.processMessage(message);
// 发送确认
this.sendAck(message.id);
this.lastReceivedId = message.id;
}
onReconnect() {
// 重连后请求丢失的消息
this.send({
type: 'sync',
lastReceivedId: this.lastReceivedId
});
}
}
3. 会话恢复
重连后,登录状态还在吗?
class SessionManager {
constructor() {
this.token = localStorage.getItem('session_token');
}
onReconnect(ws) {
if (this.token) {
// 用保存的token快速恢复会话
ws.send({
type: 'resume',
token: this.token
});
} else {
// 需要重新登录
this.login();
}
}
}
服务端处理:对付僵尸连接
常见原因:
- 客户端进程被杀死
- 网络设备中间断开
- NAT映射过期
服务端对策:
type ConnManager struct {
connections sync.Map
checkInterval time.Duration
maxIdleTime time.Duration
}
func (m *ConnManager) StartCleanup() {
ticker := time.NewTicker(m.checkInterval)
for range ticker.C {
m.cleanDeadConnections()
}
}
func (m *ConnManager) cleanDeadConnections() {
m.connections.Range(func(key, value interface{}) bool {
conn := value.(*Connection)
// 检查:超过maxIdleTime没有心跳
if time.Since(conn.LastHeartbeat) > m.maxIdleTime {
log.Printf("清理僵尸连接: %s", conn.ConnID)
conn.Close()
m.connections.Delete(key)
}
return true
})
}
五、消息推送机制:从"拉"到"推"
传统的Web应用是"拉模式":客户端主动请求数据。实时应用需要"推模式":服务器主动推送。
推送的核心问题
1. 如何找到目标?
用户可能连接在任何一台Gateway上。
1. 业务层需要给用户A推送消息
2. 查询Router:用户A在哪?
3. Router返回:用户A在Gateway-2,连接ID=conn-123
4. 业务层通过消息队列/RPC通知Gateway-2
5. Gateway-2找到conn-123,推送消息
// 推送服务
type PushService struct {
routerClient *RouterClient
mq *MessageQueue
}
func (s *PushService) PushToUser(userID string, message *Message) error {
// 1. 查询用户连接
conns, err := s.routerClient.GetUserConnections(userID)
if err != nil {
return err
}
if len(conns) == 0 {
// 用户离线,存入离线消息
return s.storeOfflineMessage(userID, message)
}
// 2. 推送到所有在线设备
for _, conn := range conns {
s.mq.Publish(conn.GatewayID, &PushTask{
ConnID: conn.ConnID,
Message: message,
})
}
return nil
}
2. 如何保证可靠性?
消息发出后,怎么知道对方收到了?
// 客户端
class ReliableReceiver {
constructor() {
this.receivedIds = new Set();
}
onMessage(message) {
// 去重
if (this.receivedIds.has(message.id)) {
return;
}
this.receivedIds.add(message.id);
// 处理消息
this.processMessage(message);
// 发送ACK
this.sendAck(message.id);
}
}
// 服务端
type ReliableSender struct {
pendingAcks sync.Map // messageID -> Message
ackTimeout time.Duration
}
func (s *ReliableSender) SendWithAck(conn *websocket.Conn, msg *Message) error {
// 1. 存入待确认队列
s.pendingAcks.Store(msg.ID, msg)
// 2. 发送消息
if err := conn.WriteJSON(msg); err != nil {
return err
}
// 3. 启动超时检测
go func() {
time.Sleep(s.ackTimeout)
if _, ok := s.pendingAcks.Load(msg.ID); ok {
// 没收到ACK,重发
s.SendWithAck(conn, msg)
}
}()
return nil
}
func (s *ReliableSender) HandleAck(messageID string) {
s.pendingAcks.Delete(messageID)
}
3. 如何处理离线?
用户不在线时,消息怎么办?
| 策略 | 适用场景 | 存储成本 |
|---|---|---|
| 丢弃 | 不重要的通知(点赞) | 无 |
| 暂存有限条 | IM消息(最近100条) | 低 |
| 全量存储 | 交易通知、重要消息 | 高 |
type OfflineMessageStore struct {
redis *redis.Client
maxMessages int // 每个用户最多存多少条
}
func (s *OfflineMessageStore) Store(userID string, msg *Message) error {
key := fmt.Sprintf("offline:%s", userID)
// 存入列表
pipe := s.redis.Pipeline()
pipe.LPush(ctx, key, msg)
pipe.LTrim(ctx, key, 0, int64(s.maxMessages - 1)) // 保留最新的N条
pipe.Expire(ctx, key, 7*24*time.Hour) // 7天过期
_, err := pipe.Exec(ctx)
return err
}
func (s *OfflineMessageStore) GetAndClear(userID string) ([]*Message, error) {
key := fmt.Sprintf("offline:%s", userID)
// 获取所有消息
messages, err := s.redis.LRange(ctx, key, 0, -1).Result()
if err != nil {
return nil, err
}
// 清空
s.redis.Del(ctx, key)
return parseMessages(messages), nil
}
消息的优先级
不是所有消息都同等重要。一个合理的推送系统应该支持优先级:
| 优先级 | 类型 | 特点 |
|---|---|---|
| 高 | 聊天消息、交易通知 | 必须送达,可唤醒App |
| 中 | 点赞评论 | 尽快送达,不唤醒 |
| 低 | 系统公告 | 可延迟,可聚合 |
type Priority int
const (
PriorityHigh Priority = iota
PriorityMedium
PriorityLow
)
type Message struct {
ID string
Priority Priority
Content string
// ...
}
// 优先级队列
type PriorityQueue struct {
highChan chan *Message
mediumChan chan *Message
lowChan chan *Message
}
func (q *PriorityQueue) Get() *Message {
// 优先处理高优先级
select {
case msg := <-q.highChan:
return msg
default:
select {
case msg := <-q.highChan:
return msg
case msg := <-q.mediumChan:
return msg
default:
select {
case msg := <-q.highChan:
return msg
case msg := <-q.mediumChan:
return msg
case msg := <-q.lowChan:
return msg
}
}
}
}
多设备同步
一个用户可能同时在手机和电脑上在线。推送策略:
type PushPolicy int
const (
PushAll PushPolicy = iota // 推送所有设备
PushActive // 只推活跃设备
PushOnce // 一个设备确认后取消其他
)
func (s *PushService) Push(userID string, msg *Message, policy PushPolicy) {
conns := s.router.GetUserConnections(userID)
switch policy {
case PushAll:
for _, conn := range conns {
s.send(conn, msg)
}
case PushActive:
// 只推最近5分钟活跃的
for _, conn := range conns {
if time.Since(conn.LastActive) < 5*time.Minute {
s.send(conn, msg)
}
}
case PushOnce:
// 推送给所有,但任意一个确认后取消其他
s.pushWithCancellation(userID, msg, conns)
}
}
六、踩坑经验与最佳实践
踩坑一:连接泄漏
// ❌ 错误
func handleConnection(conn *websocket.Conn) {
for {
_, msg, err := conn.ReadMessage()
if err != nil {
return // 只是return,没有清理!
}
processMessage(msg)
}
}
// ✅ 正确
func handleConnection(conn *websocket.Conn) {
defer func() {
conn.Close()
cleanupResources() // 确保清理
}()
for {
_, msg, err := conn.ReadMessage()
if err != nil {
return
}
processMessage(msg)
}
}
踩坑二:消息乱序
// ❌ 错误:并发写入会导致消息交错
go func() { conn.WriteMessage(msg1) }()
go func() { conn.WriteMessage(msg2) }()
// ✅ 正确:使用发送队列
type SafeConn struct {
conn *websocket.Conn
sendCh chan []byte
mu sync.Mutex
}
func (c *SafeConn) Send(msg []byte) {
c.sendCh <- msg
}
func (c *SafeConn) writeLoop() {
for msg := range c.sendCh {
c.mu.Lock()
c.conn.WriteMessage(websocket.TextMessage, msg)
c.mu.Unlock()
}
}
踩坑三:NAT超时
踩坑四:Graceful Shutdown
func (s *Server) Shutdown() {
// 1. 停止接受新连接
s.listener.Close()
// 2. 通知所有客户端"我要下线了,请连接其他节点"
for _, conn := range s.connections {
conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(
websocket.CloseGoingAway,
"server_shutdown",
),
)
}
// 3. 等待连接自然关闭或超时
time.Sleep(10 * time.Second)
// 4. 强制关闭剩余连接
for _, conn := range s.connections {
conn.Close()
}
}
最佳实践清单
- 连接管理
- ✅ 定期清理僵尸连接 - ✅ 监控连接数、消息吞吐量、延迟 - ✅ 连接数告警(超过阈值及时扩容)
- 心跳机制
- ✅ 客户端主动ping,服务端检测超时 - ✅ 实现智能心跳,适应网络状况
- 重连策略
- ✅ 重连后同步丢失消息 - ✅ 保存会话token,快速恢复
- 消息可靠性
- ✅ 离线消息存储(有限条数) - ✅ 消息去重(客户端检查ID)
- 安全考虑
- ✅ 验证Origin防止CSRF - ✅ 消息体加密(敏感数据) - ✅ 限流防刷
七、总结:实时通信的艺术
长连接网关看似简单——不就是维持一个TCP连接吗?但要做到稳定、高效、可扩展,需要考虑的问题非常多:
技术选型建议
| 场景 | 推荐方案 |
|---|---|
| 小型应用(<1万连接) | 单机WebSocket |
| 中型应用(1-10万连接) | 简单集群 + Redis路由 |
| 大型应用(10-100万连接) | 完整三层架构 + 消息队列 |
| 超大规模(>100万连接) | 多机房部署 + 专线互联 |
学习进阶路线
- 入门:理解WebSocket协议,实现一个简单的聊天室
- 进阶:实现心跳、重连、消息确认机制
- 高级:设计可扩展的网关架构,处理百万级连接
- 专家:多机房部署、跨地域同步、极致性能优化
实时通信,是连接用户与服务的桥梁。这座桥梁建得好,用户感受不到它的存在;建得不好,每一次延迟和断线都会被放大。
- 系列一:架构设计篇(已完成)
- 系列二:数据库篇(已完成)
- 系列三:缓存篇(已完成)
- 系列四:消息队列篇(已完成)
- 系列五:微服务篇(已完成)
- 系列六:监控告警篇(已完成)
- 系列七:基础设施篇(进行中)
- 第2篇:API网关:流量的守门人(待发布) - 第3篇:服务发现:微服务的电话簿(待发布)
💬 评论 (0)