【ASP.NET CORE】6.SignalR实现WebSocket

介绍

SignalR 是一款用于快速实现服务端与客户端实时双向通信的开发框架,能够自动适配 WebSocket、服务器发送事件、长轮询等底层传输方式,极大简化了在线消息推送、实时数据同步等场景的开发流程,让服务端可以主动向客户端推送消息,无需客户端频繁轮询请求。

注意

SignalR 本身并不对消息内容进行加密,但其正常通信依赖一套完整的协商握手、连接令牌校验、心跳保活与自定义消息协议。如果不使用官方提供的客户端类库,仅通过原生 WebSocket 或 HTTP 直接连接,虽然可以建立基础网络链路,但会因缺少必要的协议交互流程,导致服务端无法识别有效连接,最终出现连接正常却接收不到任何消息的情况,实际开发中需要使用官方配套客户端以保证通信稳定可用。

实现

AppHub (Hub 主类)

功能模块方法说明
连接管理OnConnectedAsync()用户连接时自动绑定 userId ↔ connectionId
OnDisconnectedAsync()断开时清理连接映射和心跳记录
心跳机制Heartbeat()客户端定时调用,更新存活时间
消息推送PushToUser()单用户多设备推送
PushToGroup()群组广播
PushToAll()全局广播
群组管理JoinGroup() / LeaveGroup()动态加入/退出群组
在线统计GetOnlineCount()获取当前在线用户数
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;

namespace HQ.Common.SignalR;

/// <summary>
/// 系统通用 SignalR Hub(WebSocket)
/// 功能:用户绑定、在线统计、群组、心跳、鉴权
/// </summary>
public class AppHub : Hub
{
    private readonly ILogger<AppHub> _logger;
    private readonly SignalRConnectionManager _connectionManager;
    private readonly ISignalRUserService _signalRUserService;

    private static readonly ConcurrentDictionary<string, DateTime> _heartbeatMap = new();

    public AppHub(
        ILogger<AppHub> logger,
        SignalRConnectionManager connectionManager,
        ISignalRUserService signalRUserService)
    {
        _logger = logger;
        _connectionManager = connectionManager;
        _signalRUserService = signalRUserService;
    }

    #region 连接管理
    public override async Task OnConnectedAsync()
    {
        var userId = await _signalRUserService.GetUserIdAsync(Context.User);
        if (!string.IsNullOrEmpty(userId))
        {
            _connectionManager.AddConnection(userId, Context.ConnectionId);
            await Clients.Caller.SendAsync("OnConnected", new
            {
                UserId = userId,
                ConnectionId = Context.ConnectionId,
                OnlineCount = _connectionManager.OnlineUserCount
            });
        }

        _heartbeatMap.AddOrUpdate(Context.ConnectionId, DateTime.Now, (_, _) => DateTime.Now);
        await base.OnConnectedAsync();
    }
    
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        _connectionManager.RemoveConnection(Context.ConnectionId);
        _heartbeatMap.TryRemove(Context.ConnectionId, out _);
        await base.OnDisconnectedAsync(exception);
    }
    #endregion

    #region 心跳
    /// <summary>
    /// 客户端心跳
    /// </summary>
    public async Task Heartbeat()
    {
        _heartbeatMap[Context.ConnectionId] = DateTime.Now;
        await Clients.Caller.SendAsync("HeartbeatAck");
    }
    #endregion

    #region 消息推送
    /// <summary>
    /// 给指定用户推送消息
    /// </summary>
    public async Task PushToUser(string userId, string messageType, object data)
    {
        var connections = _connectionManager.GetUserConnections(userId);
        foreach (var conn in connections)
        {
            await Clients.Client(conn).SendAsync("OnMessage", messageType, data);
        }
    }

    /// <summary>
    /// 加入群组
    /// </summary>
    public async Task JoinGroup(string groupName)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
    }

    /// <summary>
    /// 退出群组
    /// </summary>
    public async Task LeaveGroup(string groupName)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
    }

    /// <summary>
    /// 给群组推送
    /// </summary>
    public async Task PushToGroup(string groupName, string messageType, object data)
    {
        await Clients.Group(groupName).SendAsync("OnMessage", messageType, data);
    }

    /// <summary>
    /// 广播所有人
    /// </summary>
    public async Task PushToAll(string messageType, object data)
    {
        await Clients.All.SendAsync("OnMessage", messageType, data);
    }
    #endregion

    #region 在线状态
    /// <summary>
    /// 获取在线人数
    /// </summary>
    public int GetOnlineCount() => _connectionManager.OnlineUserCount;
    #endregion
}

 SignalRConnectionManager (连接管理器)

线程安全设计

  • 使用 ConcurrentDictionary 保证高并发下的线程安全
  • 双字典映射:支持 userId → connections 和 connectionId → userId 双向查询
  • 支持一个用户多设备同时在线(HashSet<string>)
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;

namespace HQ.Common.SignalR;

/// <summary>
/// SignalR 在线连接管理(用户ID绑定、在线人数、群组)
/// </summary>
public class SignalRConnectionManager
{
    private readonly ILogger<SignalRConnectionManager> _logger;

    // 用户ID -> 连接ID列表
    private readonly ConcurrentDictionary<string, HashSet<string>> _userConnections = new();
    // 连接ID -> 用户ID
    private readonly ConcurrentDictionary<string, string> _connectionUserMap = new();

    public SignalRConnectionManager(ILogger<SignalRConnectionManager> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// 添加用户连接
    /// </summary>
    public void AddConnection(string userId, string connectionId)
    {
        _connectionUserMap.AddOrUpdate(connectionId, userId, (_, _) => userId);
        _userConnections.AddOrUpdate(userId,
            _ => [connectionId],
            (_, set) =>
            {
                set.Add(connectionId);
                return set;
            });

        _logger.LogInformation("用户在线 {UserId} {ConnectionId}", userId, connectionId);
    }

    /// <summary>
    /// 移除连接
    /// </summary>
    public void RemoveConnection(string connectionId)
    {
        if (!_connectionUserMap.TryRemove(connectionId, out var userId))
            return;

        if (_userConnections.TryGetValue(userId, out var connections))
        {
            connections.Remove(connectionId);
            if (connections.Count == 0) _userConnections.TryRemove(userId, out _);
        }

        _logger.LogInformation("用户离线 {UserId} {ConnectionId}", userId, connectionId);
    }

    /// <summary>
    /// 获取用户所有连接
    /// </summary>
    public IEnumerable<string> GetUserConnections(string userId)
    {
        if (_userConnections.TryGetValue(userId, out var list))
            return list;
        return [];
    }

    /// <summary>
    /// 在线人数
    /// </summary>
    public int OnlineUserCount => _userConnections.Count;

    /// <summary>
    /// 获取连接所属用户
    /// </summary>
    public string GetUserIdByConnectionId(string connectionId)
    {
        _connectionUserMap.TryGetValue(connectionId, out var userId);
        return userId;
    }
}

ISignalRUserService (用户身份解析)

namespace HQ.Common.SignalR;

/// <summary>
/// SignalR 用户服务
/// </summary>
public interface ISignalRUserService
{
    /// <summary>
    /// 从连接中获取当前用户ID(JWT/Token)
    /// </summary>
    Task<string> GetUserIdAsync(System.Security.Claims.ClaimsPrincipal user);
}
  • 职责:从 HTTP Context 的 ClaimsPrincipal 中提取用户唯一标识
  • 默认实现DefaultSignalRUserService(当前返回固定测试值)
  • 生产环境:需替换为 JWT 解析实现,从 subnameid claim 提取

使用

配置

 builder.Services.AddAppSignalR();
 app.MapAppHub();

自定义 JWT 用户解析服务

下方代码仅供参考,需要根据实际业务具体实现

public class JwtSignalRUserService : ISignalRUserService
{
    public Task<string> GetUserIdAsync(ClaimsPrincipal user)
    {
        // 从 JWT 的 sub/nameid claim 提取用户ID
        var userId = user?.FindFirst(ClaimTypes.NameIdentifier)?.Value 
                  ?? user?.FindFirst("sub")?.Value;
        return Task.FromResult(userId ?? string.Empty);
    }
}

客户端

using Microsoft.AspNetCore.SignalR.Client;

var connection = new HubConnectionBuilder()
    .WithUrl("http://localhost:5182/appHub")
    .Build();

// 监听消息
connection.On<string, string>("ReceiveMessage", (user, msg) =>
{
    Console.WriteLine($"\n 收到消息:{user} → {msg}");
});

// 连接
try
{
    await connection.StartAsync();
    Console.WriteLine(" 连接成功!等待服务端推送...");
}
catch (Exception ex)
{
    Console.WriteLine($" 连接失败:{ex.Message}");
    return;
}

Console.ReadLine();

服务端

public class OrderController : ControllerBase
{
    private readonly IHubContext<AppHub> _hubContext;
    private readonly SignalRConnectionManager _connectionManager;

    public OrderController(
        IHubContext<AppHub> hubContext, 
        SignalRConnectionManager connectionManager)
    {
        _hubContext = hubContext;
        _connectionManager = connectionManager;
    }

    [HttpPost("notify")]
    public async Task<IActionResult> NotifyUser(string userId)
    {
        // 方式1:通过 IHubContext 推送(推荐,无需直接操作 Hub)
        await _hubContext.Clients.User(userId)
            .SendAsync("OnMessage", "notification", new { msg = "您有新消息" });

        // 方式2:获取用户连接后推送(支持多设备)
        var connections = _connectionManager.GetUserConnections(userId);
        foreach (var connId in connections)
        {
            await _hubContext.Clients.Client(connId)
                .SendAsync("OnMessage", "notification", data);
        }

        return Ok();
    }
}

© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容