长连接网关:实时通信的秘密

系列七:基础设施篇 | 第1篇

当你打开微信,朋友的消息瞬间出现在屏幕上;当你在股票App里,K线图实时跳动;当你在玩游戏,队友的操作毫秒级同步——这一切的背后,都藏着一个默默工作的技术:长连接网关

今天,我们来揭开实时通信的秘密。


一、长连接 vs 短连接:两种对话方式

想象你和一个朋友聊天。

为什么短连接不够用?

  1. 延迟高:每次建立TCP连接需要三次握手,加上TLS握手,光是"拨号"就要几百毫秒
  2. 资源浪费:每次请求都要携带完整的HTTP头部,可能几百字节的数据只为了传几个字的业务内容
  3. 无法主动推送:服务器有消息时,只能等客户端来问(轮询),效率极低

长连接的代价

当然,长连接不是万能的:

HTTP长轮询 vs WebSocket:一场不对等的较量

在WebSocket普及之前,很多实时功能是用HTTP长轮询实现的。它的原理是:客户端发起请求,服务器如果有数据就返回,没有就"挂着"等,直到有数据或超时。

客户端: "有新消息吗?"
服务器: (等待...等待...)"有了!这是消息!"
客户端: (收到后立即发下一个请求)"还有吗?"
  1. 资源浪费:每次请求都有HTTP头部开销,即使没有数据也要反复建立连接
  2. 服务器压力:大量请求"挂起",占用线程或连接池
  3. 不是真正的实时:消息到达时,可能正好在两次请求的间隙

来个直观的对比:

特性 HTTP长轮询 WebSocket
连接复用 每次请求都新建 一次连接,持续使用
头部开销 每次几百字节 仅2-14字节
服务端推送 需要客户端先问 随时可推
实时性 秒级 毫秒级
实现复杂度
浏览器支持 全部 现代浏览器

二、WebSocket协议原理:让连接"活"起来

HTTP设计之初是为了传输文档,不是实时通信。WebSocket的出现,正是为了弥补这个短板。

从HTTP到WebSocket:一次优雅的"变身"

WebSocket的建立过程非常巧妙,叫做握手。这个过程只发生一次,之后就是纯粹的TCP数据传输。

握手过程详解

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: https://example.com

关键信息解读:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

那个 Sec-WebSocket-Accept 是怎么来的?这是协议设计的一个小聪明:

Sec-WebSocket-Accept = base64(sha1(Sec-WebSocket-Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))

这个魔法字符串是WebSocket协议规定的固定值。服务器通过计算这个值,证明自己"真的懂WebSocket",而不是随便返回一个101。

从这一刻起,这个TCP连接就不再是HTTP了,双方都可以随时发消息,不再需要请求-响应的模式。

帧结构:数据的"集装箱"

WebSocket通信的基本单位是"帧"(Frame)。理解帧结构,是理解WebSocket性能优势的关键。

  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-------+-+-------------+-------------------------------+
<table>
<thead><tr>
<th>F</th>
<th>R</th>
<th>R</th>
<th>R</th>
<th>opcode</th>
<th>M</th>
<th>Payload len</th>
<th>Extended payload length</th>
</tr></thead><tbody>
<thead><tr>
<th>I</th>
<th>S</th>
<th>S</th>
<th>S</th>
<th>(4)</th>
<th>A</th>
<th>(7)</th>
<th>(16/64)</th>
</tr></thead><tbody>
<thead><tr>
<th>N</th>
<th>V</th>
<th>V</th>
<th>V</th>
<th></th>
<th>S</th>
<th></th>
<th>(if payload len==126/127)</th>
</tr></thead><tbody>
<thead><tr>
<th></th>
<th>1</th>
<th>2</th>
<th>3</th>
<th></th>
<th>K</th>
<th></th>
<th></th>
</tr></thead><tbody>
</tbody></table>
 +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th>Extended payload length continued, if payload len == 127</th>
</tr></thead><tbody>
</tbody></table>
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th></th>
<th>Masking-key, if MASK set to 1</th>
</tr></thead><tbody>
</tbody></table>
 +-------------------------------+-------------------------------+
<table>
<thead><tr>
<th>Masking-key (continued)</th>
<th>Payload Data</th>
</tr></thead><tbody>
</tbody></table>
 +-------------------------------- - - - - - - - - - - - - - - - +
 :                     Payload Data continued ...                :
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
<table>
<thead><tr>
<th>Payload Data continued ...</th>
</tr></thead><tbody>
</tbody></table>
 +---------------------------------------------------------------+

别被这个图吓到,我来拆解:

- 0x1 文本帧

- 0x2 二进制帧 - 0x8 关闭连接 - 0x9 心跳Ping - 0xA 心跳Pong

这是为了防止一种叫"缓存污染攻击"的安全问题。恶意网页可能让浏览器向某个服务器发送看起来像HTTP请求的WebSocket数据,如果中间有代理缓存,可能被污染。掩码后,数据看起来就是随机的。

实际的握手代码示例

const ws = new WebSocket('wss://example.com/chat');

ws.onopen = () => {
    console.log('连接建立成功');
    ws.send('Hello Server!');
};

ws.onmessage = (event) => {
    console.log('收到消息:', event.data);
};

ws.onerror = (error) => {
    console.error('连接错误:', error);
};

ws.onclose = (event) => {
    console.log('连接关闭:', event.code, event.reason);
};
import (
    "github.com/gorilla/websocket"
    "net/http"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // 生产环境要校验Origin
    },
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("升级失败:", err)
        return
    }
    defer conn.Close()

    for {
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Println("读取失败:", err)
            break
        }
        log.Printf("收到消息: %s", message)
        
        // 回显
        err = conn.WriteMessage(messageType, message)
        if err != nil {
            log.Println("发送失败:", err)
            break
        }
    }
}

为什么选择WebSocket?


三、长连接网关的架构设计:处理百万连接的秘密

单台服务器能撑多少连接?理论上是65535个(端口号上限),但实际受限于内存、CPU、文件描述符等资源。要支撑百万级并发,必须设计合理的架构。

架构图描述

想象一张架构图:

                    ┌─────────────────────────────────────────┐
                    │              客户端层                    │
                    │   [手机App] [Web浏览器] [桌面客户端]       │
                    └────────────────┬────────────────────────┘
                                     │ WebSocket连接
                                     ▼
┌────────────────────────────────────────────────────────────────────┐
│                         负载均衡层 (LB)                              │
│    功能:SSL卸载、连接分发、健康检查、会话保持                          │
│    [Nginx] [HAProxy] [云厂商LB]                                      │
└────────────────────────────────┬───────────────────────────────────┘
                                 │
        ┌────────────────────────┼────────────────────────┐
        ▼                        ▼                        ▼
┌───────────────┐        ┌───────────────┐        ┌───────────────┐
│   Gateway-1   │        │   Gateway-2   │        │   Gateway-N   │
│   接入层       │        │   接入层       │        │   接入层       │
│               │        │               │        │               │
│ - 连接管理     │        │ - 连接管理     │        │ - 连接管理     │
│ - 协议解析     │        │ - 协议解析     │        │ - 协议解析     │
│ - 心跳检测     │        │ - 心跳检测     │        │ - 心跳检测     │
│ - 消息转发     │        │ - 消息转发     │        │ - 消息转发     │
└───────┬───────┘        └───────┬───────┘        └───────┬───────┘
        │                        │                        │
        └────────────────────────┼────────────────────────┘
                                 │ 内部RPC/消息队列
                                 ▼
                    ┌─────────────────────────────────────────┐
                    │            路由层 (Router)                │
                    │                                         │
                    │   ┌─────────────────────────────────┐   │
                    │   │    Redis集群:用户-连接映射表      │   │
                    │   │    用户A → Gateway-1:ConnID123   │   │
                    │   │    用户B → Gateway-2:ConnID456   │   │
                    │   └─────────────────────────────────┘   │
                    │                                         │
                    │   功能:连接定位、会话管理、消息路由         │
                    └────────────────┬────────────────────────┘
                                     │
        ┌────────────────────────────┼────────────────────────────┐
        ▼                            ▼                            ▼
┌───────────────┐            ┌───────────────┐            ┌───────────────┐
│   Logic-1     │            │   Logic-2     │            │   Logic-N     │
│   业务层       │            │   业务层       │            │   业务层       │
│               │            │               │            │               │
│ - 聊天服务     │            │ - 推送服务     │            │ - 其他业务     │
│ - 好友服务     │            │ - 通知服务     │            │               │
└───────────────┘            └───────────────┘            └───────────────┘

三层架构详解

接入层是整个系统的"门面",负责维持与客户端的长连接。

核心职责:

设计要点:

// 每个连接的结构
type Connection struct {
    ConnID       string          // 连接唯一标识
    UserID       string          // 绑定的用户ID
    DeviceID     string          // 设备标识
    Conn         *websocket.Conn // 底层连接
    LastActive   time.Time       // 最后活跃时间
    SendChan     chan []byte     // 发送队列
    Done         chan struct{}   // 关闭信号
}

// 连接管理器
type ConnManager struct {
    connections sync.Map         // 并发安全的连接表
    userConns   map[string][]*Connection // 用户->连接列表
    mu          sync.RWMutex
}

路由层知道每个用户的连接在哪个Gateway上,是消息推送的关键。

核心数据结构:

Redis存储结构:
Key: user:{userID}:connections
Value: {
    "gateway_id": "gateway-1",
    "conn_id": "conn-12345",
    "device": "iphone",
    "last_active": 1709012345
}

为什么用Redis?

业务层处理具体业务逻辑,不直接与客户端通信。

工作流程:

  1. 接收Gateway转发的消息
  2. 处理业务逻辑(存数据库、调用其他服务)
  3. 需要推送时,查询Router找到目标Gateway
  4. 通过内部RPC或消息队列,通知Gateway推送

关键设计原则

高并发优化技巧

单机要支持百万连接,需要调优:

# 文件描述符限制
ulimit -n 1000000

# 内核参数调优
echo "net.ipv4.tcp_wmem = 4096 87380 4194304" >> /etc/sysctl.conf
echo "net.ipv4.tcp_rmem = 4096 87380 4194304" >> /etc/sysctl.conf
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
echo "net.ipv4.tcp_max_syn_backlog = 65535" >> /etc/sysctl.conf
sysctl -p

每个连接都占用内存,要精打细算:

// 不要这样:每个消息一个goroutine
go handle message()

// 要这样:使用worker池
type WorkerPool struct {
    taskChan chan Task
    workers  int
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        go p.worker()
    }
}

func (p *WorkerPool) worker() {
    for task := range p.taskChan {
        task.Execute()
    }
}

四、心跳与断线重连:让连接"不死"

网络是不可靠的。Wi-Fi断了、4G切3G、手机进入省电模式……连接随时可能断掉。如何快速发现问题、快速恢复?这就是心跳机制的职责。

心跳:网络的"脉搏"

心跳的原理很简单:双方约定一个间隔(比如30秒),如果一方在这个时间内没收到对方的消息,就主动发一个"我还活着"的包。如果连续多次没收到心跳响应,就认为连接已断开。

心跳机制的设计考量

场景 推荐间隔 原因
游戏 5-10秒 需要快速感知断线
IM聊天 30-60秒 平衡实时性和省电
推送通知 3-5分钟 大部分时候空闲

两种方案:

推荐:客户端发起 + 服务端检测超时

不够!TCP Keepalive默认2小时才检测一次,而且无法穿透NAT。应用层必须自己实现心跳。

# TCP Keepalive参数
net.ipv4.tcp_keepalive_time = 7200   # 2小时
net.ipv4.tcp_keepalive_intvl = 75    # 检测间隔
net.ipv4.tcp_keepalive_probes = 9    # 检测次数

心跳实现代码

class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.heartbeatInterval = 30000; // 30秒
        this.heartbeatTimer = null;
        this.missedHeartbeats = 0;
        this.maxMissedHeartbeats = 3;
    }

    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('连接成功');
            this.startHeartbeat();
        };
        
        this.ws.onmessage = (event) => {
            if (event.data === 'pong') {
                this.missedHeartbeats = 0;
                return;
            }
            // 处理业务消息
        };
        
        this.ws.onclose = () => {
            this.stopHeartbeat();
            this.reconnect();
        };
    }

    startHeartbeat() {
        this.heartbeatTimer = setInterval(() => {
            if (this.ws.readyState === WebSocket.OPEN) {
                this.missedHeartbeats++;
                if (this.missedHeartbeats > this.maxMissedHeartbeats) {
                    console.log('心跳超时,重连');
                    this.ws.close();
                    return;
                }
                this.ws.send('ping');
            }
        }, this.heartbeatInterval);
    }

    stopHeartbeat() {
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
    }
}
type Connection struct {
    conn        *websocket.Conn
    lastActive  time.Time
    mu          sync.Mutex
}

func (c *Connection) heartbeatMonitor() {
    ticker := time.NewTicker(10 * time.Second)
    timeout := 60 * time.Second // 60秒无活动则断开
    
    for {
        select {
        case <-ticker.C:
            c.mu.Lock()
            if time.Since(c.lastActive) > timeout {
                c.conn.Close()
                c.mu.Unlock()
                return
            }
            c.mu.Unlock()
        }
    }
}

func (c *Connection) handleMessage(message string) {
    c.mu.Lock()
    c.lastActive = time.Now()
    c.mu.Unlock()
    
    if message == "ping" {
        c.conn.WriteMessage(websocket.TextMessage, []byte("pong"))
    }
}

智能心跳:自适应网络状况

高级的心跳机制可以根据网络状况动态调整:

class SmartHeartbeat {
    constructor() {
        this.baseInterval = 30000;  // 基础30秒
        this.minInterval = 10000;   // 最小10秒
        this.maxInterval = 120000;  // 最大2分钟
        this.currentInterval = this.baseInterval;
        this.networkQuality = 'good'; // good/medium/poor
    }

    adjustInterval(rtt) {
        // 根据往返时延调整
        if (rtt < 100) {
            this.networkQuality = 'good';
            this.currentInterval = Math.min(this.currentInterval * 1.1, this.maxInterval);
        } else if (rtt > 500) {
            this.networkQuality = 'poor';
            this.currentInterval = Math.max(this.currentInterval * 0.8, this.minInterval);
        } else {
            this.networkQuality = 'medium';
        }
    }
}

断线重连:快速恢复

当检测到连接断开后,客户端需要自动重连。这个过程要精心设计。

1. 退避策略

// ❌ 错误:可能导致服务器雪崩
ws.onclose = () => {
    setTimeout(() => connect(), 100); // 立即重连
};

如果服务器刚重启,百万客户端同时重连会直接把服务器打挂。

class ReconnectPolicy {
    constructor() {
        this.baseDelay = 1000;      // 初始1秒
        this.maxDelay = 30000;      // 最大30秒
        this.attempts = 0;
    }

    getNextDelay() {
        // 指数退避
        delay = Math.min(this.baseDelay * Math.pow(2, this.attempts), this.maxDelay);
        
        // 加随机抖动(防止多客户端同步重连)
        jitter = delay * 0.2 * Math.random();
        
        this.attempts++;
        return delay + jitter;
    }

    reset() {
        this.attempts = 0;
    }
}

重连时间线:1秒 → 2秒 → 4秒 → 8秒 → 16秒 → 30秒(封顶)

2. 断点续传

重连后,怎么收到断开期间错过的消息?

服务端为每个用户维护一个消息队列:
┌─────────────────────────────────────┐
│  用户A的消息队列(最多保留100条)      │
├─────────────────────────────────────┤
│  [1001] "你好"           时间:10:01  │
│  [1002] "在吗?"         时间:10:02  │
│  [1003] "下午开会"       时间:10:05  │
└─────────────────────────────────────┘

客户端重连时:
Client: "我上次收到消息1001,之后有什么?"
Server: "你有2条新消息:[1002], [1003]"
class MessageReceiver {
    constructor() {
        this.lastReceivedId = 0;
        this.pendingMessages = new Map();
    }

    onMessage(message) {
        if (message.id <= this.lastReceivedId) {
            // 重复消息,忽略
            return;
        }
        
        // 处理消息
        this.processMessage(message);
        
        // 发送确认
        this.sendAck(message.id);
        this.lastReceivedId = message.id;
    }

    onReconnect() {
        // 重连后请求丢失的消息
        this.send({
            type: 'sync',
            lastReceivedId: this.lastReceivedId
        });
    }
}

3. 会话恢复

重连后,登录状态还在吗?

class SessionManager {
    constructor() {
        this.token = localStorage.getItem('session_token');
    }

    onReconnect(ws) {
        if (this.token) {
            // 用保存的token快速恢复会话
            ws.send({
                type: 'resume',
                token: this.token
            });
        } else {
            // 需要重新登录
            this.login();
        }
    }
}

服务端处理:对付僵尸连接

常见原因:

服务端对策:

type ConnManager struct {
    connections sync.Map
    checkInterval time.Duration
    maxIdleTime   time.Duration
}

func (m *ConnManager) StartCleanup() {
    ticker := time.NewTicker(m.checkInterval)
    for range ticker.C {
        m.cleanDeadConnections()
    }
}

func (m *ConnManager) cleanDeadConnections() {
    m.connections.Range(func(key, value interface{}) bool {
        conn := value.(*Connection)
        
        // 检查:超过maxIdleTime没有心跳
        if time.Since(conn.LastHeartbeat) > m.maxIdleTime {
            log.Printf("清理僵尸连接: %s", conn.ConnID)
            conn.Close()
            m.connections.Delete(key)
        }
        return true
    })
}

五、消息推送机制:从"拉"到"推"

传统的Web应用是"拉模式":客户端主动请求数据。实时应用需要"推模式":服务器主动推送。

推送的核心问题

1. 如何找到目标?

用户可能连接在任何一台Gateway上。

1. 业务层需要给用户A推送消息
2. 查询Router:用户A在哪?
3. Router返回:用户A在Gateway-2,连接ID=conn-123
4. 业务层通过消息队列/RPC通知Gateway-2
5. Gateway-2找到conn-123,推送消息
// 推送服务
type PushService struct {
    routerClient *RouterClient
    mq           *MessageQueue
}

func (s *PushService) PushToUser(userID string, message *Message) error {
    // 1. 查询用户连接
    conns, err := s.routerClient.GetUserConnections(userID)
    if err != nil {
        return err
    }
    
    if len(conns) == 0 {
        // 用户离线,存入离线消息
        return s.storeOfflineMessage(userID, message)
    }
    
    // 2. 推送到所有在线设备
    for _, conn := range conns {
        s.mq.Publish(conn.GatewayID, &PushTask{
            ConnID:  conn.ConnID,
            Message: message,
        })
    }
    
    return nil
}

2. 如何保证可靠性?

消息发出后,怎么知道对方收到了?

// 客户端
class ReliableReceiver {
    constructor() {
        this.receivedIds = new Set();
    }

    onMessage(message) {
        // 去重
        if (this.receivedIds.has(message.id)) {
            return;
        }
        this.receivedIds.add(message.id);
        
        // 处理消息
        this.processMessage(message);
        
        // 发送ACK
        this.sendAck(message.id);
    }
}
// 服务端
type ReliableSender struct {
    pendingAcks sync.Map // messageID -> Message
    ackTimeout  time.Duration
}

func (s *ReliableSender) SendWithAck(conn *websocket.Conn, msg *Message) error {
    // 1. 存入待确认队列
    s.pendingAcks.Store(msg.ID, msg)
    
    // 2. 发送消息
    if err := conn.WriteJSON(msg); err != nil {
        return err
    }
    
    // 3. 启动超时检测
    go func() {
        time.Sleep(s.ackTimeout)
        if _, ok := s.pendingAcks.Load(msg.ID); ok {
            // 没收到ACK,重发
            s.SendWithAck(conn, msg)
        }
    }()
    
    return nil
}

func (s *ReliableSender) HandleAck(messageID string) {
    s.pendingAcks.Delete(messageID)
}

3. 如何处理离线?

用户不在线时,消息怎么办?

策略 适用场景 存储成本
丢弃 不重要的通知(点赞)
暂存有限条 IM消息(最近100条)
全量存储 交易通知、重要消息
type OfflineMessageStore struct {
    redis   *redis.Client
    maxMessages int  // 每个用户最多存多少条
}

func (s *OfflineMessageStore) Store(userID string, msg *Message) error {
    key := fmt.Sprintf("offline:%s", userID)
    
    // 存入列表
    pipe := s.redis.Pipeline()
    pipe.LPush(ctx, key, msg)
    pipe.LTrim(ctx, key, 0, int64(s.maxMessages - 1)) // 保留最新的N条
    pipe.Expire(ctx, key, 7*24*time.Hour) // 7天过期
    
    _, err := pipe.Exec(ctx)
    return err
}

func (s *OfflineMessageStore) GetAndClear(userID string) ([]*Message, error) {
    key := fmt.Sprintf("offline:%s", userID)
    
    // 获取所有消息
    messages, err := s.redis.LRange(ctx, key, 0, -1).Result()
    if err != nil {
        return nil, err
    }
    
    // 清空
    s.redis.Del(ctx, key)
    
    return parseMessages(messages), nil
}

消息的优先级

不是所有消息都同等重要。一个合理的推送系统应该支持优先级:

优先级 类型 特点
聊天消息、交易通知 必须送达,可唤醒App
点赞评论 尽快送达,不唤醒
系统公告 可延迟,可聚合
type Priority int

const (
    PriorityHigh Priority = iota
    PriorityMedium
    PriorityLow
)

type Message struct {
    ID       string
    Priority Priority
    Content  string
    // ...
}

// 优先级队列
type PriorityQueue struct {
    highChan   chan *Message
    mediumChan chan *Message
    lowChan    chan *Message
}

func (q *PriorityQueue) Get() *Message {
    // 优先处理高优先级
    select {
    case msg := <-q.highChan:
        return msg
    default:
        select {
        case msg := <-q.highChan:
            return msg
        case msg := <-q.mediumChan:
            return msg
        default:
            select {
            case msg := <-q.highChan:
                return msg
            case msg := <-q.mediumChan:
                return msg
            case msg := <-q.lowChan:
                return msg
            }
        }
    }
}

多设备同步

一个用户可能同时在手机和电脑上在线。推送策略:

type PushPolicy int

const (
    PushAll      PushPolicy = iota // 推送所有设备
    PushActive                     // 只推活跃设备
    PushOnce                       // 一个设备确认后取消其他
)

func (s *PushService) Push(userID string, msg *Message, policy PushPolicy) {
    conns := s.router.GetUserConnections(userID)
    
    switch policy {
    case PushAll:
        for _, conn := range conns {
            s.send(conn, msg)
        }
    case PushActive:
        // 只推最近5分钟活跃的
        for _, conn := range conns {
            if time.Since(conn.LastActive) < 5*time.Minute {
                s.send(conn, msg)
            }
        }
    case PushOnce:
        // 推送给所有,但任意一个确认后取消其他
        s.pushWithCancellation(userID, msg, conns)
    }
}

六、踩坑经验与最佳实践

踩坑一:连接泄漏

// ❌ 错误
func handleConnection(conn *websocket.Conn) {
    for {
        _, msg, err := conn.ReadMessage()
        if err != nil {
            return // 只是return,没有清理!
        }
        processMessage(msg)
    }
}

// ✅ 正确
func handleConnection(conn *websocket.Conn) {
    defer func() {
        conn.Close()
        cleanupResources() // 确保清理
    }()
    
    for {
        _, msg, err := conn.ReadMessage()
        if err != nil {
            return
        }
        processMessage(msg)
    }
}

踩坑二:消息乱序

// ❌ 错误:并发写入会导致消息交错
go func() { conn.WriteMessage(msg1) }()
go func() { conn.WriteMessage(msg2) }()

// ✅ 正确:使用发送队列
type SafeConn struct {
    conn    *websocket.Conn
    sendCh  chan []byte
    mu      sync.Mutex
}

func (c *SafeConn) Send(msg []byte) {
    c.sendCh <- msg
}

func (c *SafeConn) writeLoop() {
    for msg := range c.sendCh {
        c.mu.Lock()
        c.conn.WriteMessage(websocket.TextMessage, msg)
        c.mu.Unlock()
    }
}

踩坑三:NAT超时

踩坑四:Graceful Shutdown

func (s *Server) Shutdown() {
    // 1. 停止接受新连接
    s.listener.Close()
    
    // 2. 通知所有客户端"我要下线了,请连接其他节点"
    for _, conn := range s.connections {
        conn.WriteMessage(websocket.CloseMessage, 
            websocket.FormatCloseMessage(
                websocket.CloseGoingAway, 
                "server_shutdown",
            ),
        )
    }
    
    // 3. 等待连接自然关闭或超时
    time.Sleep(10 * time.Second)
    
    // 4. 强制关闭剩余连接
    for _, conn := range s.connections {
        conn.Close()
    }
}

最佳实践清单

  1. 连接管理
- ✅ 每个连接有唯一ID

- ✅ 定期清理僵尸连接 - ✅ 监控连接数、消息吞吐量、延迟 - ✅ 连接数告警(超过阈值及时扩容)

  1. 心跳机制
- ✅ 心跳间隔根据业务场景选择(10-60秒)

- ✅ 客户端主动ping,服务端检测超时 - ✅ 实现智能心跳,适应网络状况

  1. 重连策略
- ✅ 使用指数退避+随机抖动

- ✅ 重连后同步丢失消息 - ✅ 保存会话token,快速恢复

  1. 消息可靠性
- ✅ 重要消息需要ACK确认

- ✅ 离线消息存储(有限条数) - ✅ 消息去重(客户端检查ID)

  1. 安全考虑
- ✅ 使用wss://(加密传输)

- ✅ 验证Origin防止CSRF - ✅ 消息体加密(敏感数据) - ✅ 限流防刷


七、总结:实时通信的艺术

长连接网关看似简单——不就是维持一个TCP连接吗?但要做到稳定、高效、可扩展,需要考虑的问题非常多:

技术选型建议

场景 推荐方案
小型应用(<1万连接) 单机WebSocket
中型应用(1-10万连接) 简单集群 + Redis路由
大型应用(10-100万连接) 完整三层架构 + 消息队列
超大规模(>100万连接) 多机房部署 + 专线互联

学习进阶路线

  1. 入门:理解WebSocket协议,实现一个简单的聊天室
  2. 进阶:实现心跳、重连、消息确认机制
  3. 高级:设计可扩展的网关架构,处理百万级连接
  4. 专家:多机房部署、跨地域同步、极致性能优化

实时通信,是连接用户与服务的桥梁。这座桥梁建得好,用户感受不到它的存在;建得不好,每一次延迟和断线都会被放大。



- 第1篇:长连接网关:实时通信的秘密(本文)

- 第2篇:API网关:流量的守门人(待发布) - 第3篇:服务发现:微服务的电话簿(待发布)

💬 评论 (0)

0/500
排序: