基于SuperSocket搭建Socket服务

SuperSocket 是一款基于 .NET 技术栈的高性能、跨平台、可扩展的 Socket 服务器框架,专为 TCP/UDP 服务端应用开发设计,具备稳定可靠、易于扩展的优势;

一、NuGet 包管理器中下载相关包

安装 SuperSocket.WebSocket.Server 包
image

二、启动SocketServer服务

注册启动WebSocket Server 服务方法

/// <summary>
/// WebSocket服务器实例
/// </summary>
private IHost? _webSocketServer;

/// <summary>
/// 存储所有连接的WebSocket会话
/// </summary>
private readonly ConcurrentDictionary<string, WebSocketSession> _connectedSessions = new();

/// <summary>
/// 启动 WebSocket 服务
/// </summary>
/// <param name="port">监听端口</param>
/// <param name="ip">监听IP地址</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>异步任务</returns>
public async Task WebSocketServerAsync(int port, IPAddress ip, CancellationToken cancellationToken = default)
{
    try
    {
        _webSocketServer = WebSocketHostBuilder.Create()
            .UseSession<ConnectionSession>()
            .UseWebSocketMessageHandler(
                // TODO: 监听会话消息事件
                async (session, message) => await HandleWebSocketMessageAsync(session, message.Message)
            )
            .ConfigureServices(services =>
            {
                // TODO: 监听会话连接事件
                services.AddSingleton<Action<WebSocketSession>>(session =>
                {
                    Console.WriteLine($"WebSocket会话已连接: {session.SessionID}");
                    _connectedSessions.TryAdd(session.SessionID, session);
                });
                // TODO: 监听会话断开事件
                services.AddSingleton<Action<WebSocketSession, CloseEventArgs>>((session, args) =>
                {
                    Console.WriteLine($"WebSocket会话已断开: {session.SessionID}");
                    _connectedSessions.TryRemove(session.SessionID, out _);
                });
            })
            .ConfigureAppConfiguration((hostContext, configApp) =>
            {
                var config = new Dictionary<string, string>
                {
                    ["serverOptions:listeners:0:ip"] = ip.ToString(),
                    ["serverOptions:listeners:0:port"] = port.ToString()
                };
                configApp.AddInMemoryCollection(config);
            })
            .Build();
 
        // TODO: 启动WebSocket服务
        await _webSocketServer.RunAsync(cancellationToken);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"WebSocket服务器启动失败: {ex.Message}");
    }
}
  

三、WebSocket会话类

服务端为每个客户端连接创建的专属实例,全程负责该连接的全生命周期通信与状态管理

/// <summary>
/// WebSocket会话类
/// </summary>
public class ConnectionSession : WebSocketSession
{
    /// <summary>
    /// 会话连接事件
    /// </summary>
    private readonly Action<WebSocketSession> _onConnected;

    /// <summary>
    /// 会话断开事件
    /// </summary>
    private readonly Action<WebSocketSession, CloseEventArgs> _onDisconnected;

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="onConnected"></param>
    /// <param name="onDisconnected"></param>
    public ConnectionSession(Action<WebSocketSession> onConnected, Action<WebSocketSession, CloseEventArgs> onDisconnected)
    {
        _onConnected = onConnected;
        _onDisconnected = onDisconnected;
    }

    /// <summary>
    /// 触发连接事件
    /// </summary>
    /// <returns></returns>
    protected override ValueTask OnSessionConnectedAsync()
    {
        _onConnected?.Invoke(this);
        return base.OnSessionConnectedAsync();
    }

    /// <summary>
    /// 触发断开事件
    /// </summary>
    /// <param name="e"></param>
    /// <returns></returns>
    protected override ValueTask OnSessionClosedAsync(CloseEventArgs e)
    {
        _onDisconnected?.Invoke(this, e);
        return base.OnSessionClosedAsync(e);
    }

}

四、消息处理,通过反射加载对应请求处理方法

/// <summary>
/// 方法缓存,用于存储反射获取的Socket相关方法
/// </summary>
private Dictionary<string, MethodInfo> _webSocketMethodCache = new();
 
/// <summary>
/// 处理接收到的 WebSocket 消息
/// </summary>
/// <param name="session">WebSocket会话</param>
/// <param name="message">WebSocket消息</param>
/// <returns>异步任务</returns>
private async Task OnWebSocketMessageAsync(WebSocketSession session, string message)
{
    try
    { 
        JObject messageObject;
        try
        {
            messageObject = JObject.Parse(message);
            // TODO: 检查是否有topic属性
            if (messageObject["topic"] != null)
            {
                // 根据不同的topic来处理不同的消息
                string topic = messageObject["topic"].ToString();

                // TODO: 先检查topic对应的方法是否缓存
                MethodInfo memberInfo = null;
                if (_webSocketMethodCache.TryGetValue(topic, out memberInfo))
                {
                    // 当前的方法已经在缓存中,直接调用
                    memberInfo.Invoke(this, new object[] { session, message });
                    return;
                }
                // TODO: 没有找到对应的方法,则进行反射
                var methods = GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
                foreach (var method in methods)
                {
                    var attributes = method.GetCustomAttributes(typeof(WebSocketMessageAttribute), false);

                    if (attributes.Length > 0)
                    {
                        var attribute = attributes[0] as WebSocketMessageAttribute;

                        if (attribute.Topic == topic)
                        {
                            // TODO: 把method进行缓存,免得每次都需要反射
                            _webSocketMethodCache.Add(topic, method);

                            // TODO: 把method添加到Dictionary中,key是topic,value是method
                            method.Invoke(this, new object[] { session, message });
                        }
                    }
                }
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine($"检查是否有topic属性,{ex.Message}");
        }

    }
    catch (Exception ex)
    {
        Console.WriteLine($"处理WebSocket消息时出错: {ex.Message}");
    }
}


/// <summary>​
/// 异步发送消息到单个会话​
/// </summary>​
/// <param name="session">目标会话</param>​
/// <param name="message">要发送的消息</param> ​
private async Task SendToSessionAsync(WebSocketSession session, string message)​
{​
    try​
    {​
        // 发送消息到会话​
        await session.SendAsync(message).ConfigureAwait(false);​
    }​
    catch (Exception ex)​
    {​
        Console.WriteLine($"向会话 {session.SessionID} 发送消息失败: {ex.Message}");​
    }​
}​
​
/// <summary>​
/// 异步发送消息到所有会话​
/// </summary>​
/// <param name="message">文本消息</param>​
/// <returns>异步任务</returns> ​
private async Task SendToAllSessionAsync(string message)​
{​
    try​
    {​
        // 基础校验​
        if (string.IsNullOrWhiteSpace(message) || _connectedSessions.Count < 1)​
        {​
            Console.WriteLine("广播消息内容为空");​
            return;​
        }​
        // 筛选状态为Connected的会话任务​
        var tasks = _connectedSessions​
                    .Where(s => s.Value.State == SessionState.Connected)​
                    .Select(s => s.Value.SendAsync(message).AsTask());​
​
        // 等待所有任务完成​
        await Task.WhenAll(tasks).ConfigureAwait(false);​
    }​
    catch (Exception ex)​
    {​
        Console.WriteLine($"向会话发送消息失败: {ex.Message}");​
    }​
}

/// <summary>
/// 响应示例:/test_states 处理方法
/// </summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
[WebSocketMessage("/test_states")]
private async Task JointStatesAsync(WebSocketSession session, string message)
{
    try
    {
        // TODO: 获取并解析消息文本 
        Console.WriteLine(message); 
    }
    catch (Exception ex)
    {
        Console.WriteLine($"/test_states:",{ex.Message}");
    }
}

WebSocket消息属性

 /// <summary>
 /// WebSocket消息属性
 /// </summary>
 public class WebSocketMessageAttribute : Attribute
 {
     public string Topic { get; }

     public WebSocketMessageAttribute(string topic)
     {
         Topic = topic;
     }
 }

image

posted @ 2025-12-29 11:20  笺上知微  阅读(21)  评论(0)    收藏  举报