最近在学WebSocket,服务端需要监听多个WebSocket客户端发送的消息。
开始的解决方法是每个WebSocket客户端都添加一个线程进行监听,代码如下:
/// <summary> /// 监听端口 创建WebSocket /// </summary> /// <param name="httpListener"></param> private void CreateWebSocket(HttpListener httpListener) { if (!httpListener.IsListening) throw new Exception("HttpListener未启动"); HttpListenerContext listenerContext = httpListener.GetContextAsync().Result; if (!listenerContext.Request.IsWebSocketRequest) { CreateWebSocket(httpListener); return; } WebSocketContext webSocket = null; try { webSocket = new WebSocketContext(listenerContext, SubProtocol); } catch (Exception ex) { log.Error(ex); CreateWebSocket(HttpListener); return; } log.Info($"成功创建WebSocket:{webSocket.ID}"); int workerThreads = 0, completionPortThreads = 0; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); if (workerThreads <= ReservedThreadsCount + 1 || completionPortThreads <= ReservedThreadsCount + 1) { /** * 可用线程小于预留线程数量 * 通知客户端关闭连接 * */ webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "可用线程不足,无法连接").Wait(); } else { if (OnReceiveMessage != null) webSocket.OnReceiveMessage += OnReceiveMessage; webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket; webSocketContexts.Add(webSocket); // 在线程中监听客户端发送的消息 ThreadPool.QueueUserWorkItem(new WaitCallback(p => { (p as WebSocketContext).ReceiveMessageAsync().Wait(); }), webSocket); } CreateWebSocket(HttpListener); }
但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。
接受消息方法如下:
/// <summary> /// 递归 同步接收消息 /// </summary> /// <returns></returns> public void ReceiveMessage() { WebSocket webSocket = HttpListenerWebSocketContext.WebSocket; if (webSocket.State != WebSocketState.Open) throw new Exception("Http未握手成功,不能接受消息!"); var byteBuffer = WebSocket.CreateServerBuffer(ReceiveBufferSize); WebSocketReceiveResult receiveResult = null; try { receiveResult = webSocket.ReceiveAsync(byteBuffer, cancellationToken).Result; } catch (WebSocketException ex) { if (ex.InnerException is HttpListenerException) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } else { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "WebSocket 连接异常" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } } catch (Exception ex) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } if (receiveResult.CloseStatus.HasValue) { log.Info("接受到关闭消息!"); CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription).Wait(TimeSpan.FromSeconds(20)); return; } byte[] bytes = new byte[receiveResult.Count]; Array.Copy(byteBuffer.Array, bytes, bytes.Length); string message = Encoding.GetString(bytes); log.Info($"{ID}接收到消息:{message}"); if (OnReceiveMessage !=