You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1053 lines
37 KiB

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Security.Cryptography;
using System.Xml;
using Fleck;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace Sog
{
public class WebSession : NetSession
{
// Capacity used for the three byte buffers allocated in the constructor.
const int WEBSOCKET_BUFFER_SIZE = 1024 * 64;
// GUID appended to the client's Sec-WebSocket-Key when computing the handshake response key (see CreateResponseKey).
private const string WebSocketResponseGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
// Certificate.
// NOTE(review): not assigned or read anywhere in the visible code; Authenticate() receives its certificate as a parameter.
private X509Certificate2 m_certificate;
// Masking key of the frame currently being decoded (filled in UnPacketFrameDataToRecvBuffer).
private byte[] m_maskBytes = new byte[4];
// Values handed to SetKeepAlive on Windows.
private const UInt32 KeepAliveInterval = 60000;
private const UInt32 RetryInterval = 10000;
// True while an asynchronous receive / send chain is running; guarded by the matching locker below.
private bool m_recvAsyncPending;
private bool m_sendAsyncPending;
private object m_recvAsyncLocker;
private object m_sendAsyncLocker;
// State objects passed to BeginRead / BeginWrite and read back in the callbacks.
private AsyncState m_recvAsyncState;
private AsyncState m_sendAsyncState;
// Guards m_authenticating, which is true while the SSL server authentication started by Authenticate() is in flight.
private object m_authenticateLocker;
private bool m_authenticating;
// Sub-protocol negotiation; currently no sub-protocols are supported (the constructor assigns an empty array).
private IEnumerable<string> supportedSubProtocols;
// Set to true in OnSendCompleted once the handshake response has been sent.
private bool IsHandShake;
// Guards m_sendHandShakePending, which AnswerHandShake sets when the handshake response has been queued.
private object m_sendHandShakeLocker = new object();
private bool m_sendHandShakePending = false;
// True from CloseWebSocket() until the send completes (OnSendCompleted then closes the session).
public bool IsSendingClose { private set; get; }
private object m_sendCloseFrameLocker = new object();
internal long LastCloseWebSocketSecond; // time the close frame was queued; a timeout so that closing cannot hang for too long
// Scheme string passed to RequestParser.Parse during the handshake.
private string m_scheme;
private bool needShutdown; // only used to decide whether socket.Shutdown must be called
// Raw receive buffer: its contents still carry the WebSocket frame headers.
private byte[] m_webRecvBuffer;
// Number of bytes currently buffered in m_webRecvBuffer.
protected int m_webAlreadyRecvLength;
// Stream used for all reads and writes: a NetworkStream, replaced by an SslStream in Authenticate().
private Stream m_webStream;
public string clientIp;// client IP, set in AnswerHandShake from the X-Forwarded-For header or the remote endpoint
/// <summary>
/// Creates a WebSocket session on top of the given socket: allocates the receive/send
/// buffers, prepares the async bookkeeping objects and wraps the socket in a NetworkStream.
/// </summary>
/// <param name="socket">Socket forwarded to the base session.</param>
/// <param name="scheme">Scheme string later passed to the handshake request parser.</param>
public WebSession(Socket socket, string scheme) : base(socket)
{
    m_scheme = scheme;
    supportedSubProtocols = new string[] { };

    // Raw (framed) input, unframed input and output all use the same capacity.
    m_recvBuffer = new byte[WEBSOCKET_BUFFER_SIZE];
    m_sendBuffer = new byte[WEBSOCKET_BUFFER_SIZE];
    m_webRecvBuffer = new byte[WEBSOCKET_BUFFER_SIZE];

    IsHandShake = false;
    IsSendingClose = false;
    InitRecvSendAsyncObject();

    m_remoteEndPoint = (IPEndPoint)m_socket.RemoteEndPoint;
    m_minHeaderLength = m_protoPacker.GetHeaderLength();
    m_headerBytes = new byte[m_protoPacker.GetFullHeaderLength()];
    m_zeroLengthBytes = BuildZeroFrame();

    // Socket.Connected reflects the state of the most recent I/O operation; being connected
    // here means the socket comes from a successful accept, so it needs a Shutdown on close.
    if (m_socket.Connected)
    {
        needShutdown = true;
    }

    // TCP keep-alive tuning through IOControl is only applied on Windows.
    if (OSUtils.IsWindows())
    {
        SetKeepAlive(socket, KeepAliveInterval, RetryInterval);
    }

    m_webStream = new NetworkStream(m_socket);
}
/// <summary>
/// Enables TCP keep-alive on <paramref name="socket"/> through
/// <see cref="IOControlCode.KeepAliveValues"/>.
/// </summary>
/// <param name="socket">Socket to configure.</param>
/// <param name="keepAliveInterval">Keep-alive interval, passed through unchanged.</param>
/// <param name="retryInterval">Retry interval, passed through unchanged.</param>
private void SetKeepAlive(Socket socket, UInt32 keepAliveInterval, UInt32 retryInterval)
{
    const int FieldSize = sizeof(UInt32);

    // Three consecutive 32-bit values in host byte order: on/off flag, interval, retry interval.
    UInt32[] fields = { 1u, keepAliveInterval, retryInterval };
    byte[] optionInValue = new byte[FieldSize * fields.Length];
    for (int index = 0; index < fields.Length; index++)
    {
        Buffer.BlockCopy(BitConverter.GetBytes(fields[index]), 0, optionInValue, index * FieldSize, FieldSize);
    }

    socket.IOControl(IOControlCode.KeepAliveValues, optionInValue, null);
}
/// <summary>
/// Builds the WebSocket frame that is written as the keep-alive "empty packet".
/// </summary>
/// <returns>
/// A 10-byte array: a final binary frame header (opcode + 128), a length byte of 8,
/// followed by eight zero payload bytes.
/// </returns>
private byte[] BuildZeroFrame()
{
    // NOTE(review): the 8 zero payload bytes presumably form an all-zero application header — confirm.
    return new byte[]
    {
        (byte)((byte)FrameType.Binary + 128),
        (byte)8,
        0, 0, 0, 0, 0, 0, 0, 0,
    };
}
/// <summary>
/// Refreshes the per-socket state after a socket has been bound to this session:
/// caches the remote endpoint, marks the socket as needing a Shutdown on close and
/// recreates the async lockers/state objects.
/// </summary>
protected override void OnBindSocket()
{
    IPEndPoint boundEndPoint = (IPEndPoint)m_socket.RemoteEndPoint;
    m_remoteEndPoint = boundEndPoint;
    needShutdown = true;
    InitRecvSendAsyncObject();
}
/// <summary>
/// Creates fresh lock objects and fresh async state holders for the receive,
/// send and authentication paths.
/// </summary>
private void InitRecvSendAsyncObject()
{
    // Receive side.
    m_recvAsyncLocker = new object();
    // Send side.
    m_sendAsyncLocker = new object();
    // Authentication.
    m_authenticateLocker = new object();

    m_sendAsyncState = new AsyncState(0, 0, false);
    m_recvAsyncState = new AsyncState(0, 0, false);
}
/// <summary>
/// Shuts down and releases the socket together with the stream wrapped around it.
/// Shutdown errors are logged, never propagated. The stream and the socket are released
/// independently, so a missing stream or a failing stream disposal can no longer
/// prevent the socket from being closed.
/// </summary>
protected override void CloseSocket()
{
    try
    {
        // Accessing socket.RemoteEndPoint before Accept/Connect throws, so the cached endpoint is logged.
        TraceLog.Trace("WebSession.CloseSocket remote {0}", m_remoteEndPoint);
        if (m_socket != null && needShutdown)
        {
            needShutdown = false;
            m_socket.Shutdown(SocketShutdown.Both);
        }
    }
    catch (SocketException ex)
    {
        TraceLog.Trace("WebSession.CloseSocket SocketException errcode {0} msg {1}"
            , ex.ErrorCode, ex.Message);
    }
    catch (Exception ex)
    {
        TraceLog.Exception(ex);
    }
    finally
    {
        // Detach both fields first: whatever happens below, the session no longer
        // references a half-closed stream or socket.
        Stream stream = m_webStream;
        Socket socket = m_socket;
        m_webStream = null;
        m_socket = null;

        if (stream != null)
        {
            try
            {
                // Dispose() already performs Close(); one call is enough.
                stream.Dispose();
            }
            catch (Exception ex)
            {
                TraceLog.Exception(ex);
            }
        }

        if (socket != null)
        {
            socket.Close();
        }
    }
}
/// <summary>
/// Synchronous keep-alive check: when nothing has been sent for at least 60 seconds,
/// writes the prebuilt empty frame, or counts the stream as not writeable.
/// </summary>
public override void CheckSocketAndSendKeepAlive()
{
    if (m_socket == null)
    {
        return;
    }

    // Sending zero bytes has no effect, so an empty packet is sent instead. This adds
    // traffic (hence the long interval) and the client has to tolerate it.
    long idleSeconds = AppTime.GetNowSysSecond() - LastSendDataTimeSecond;
    if (idleSeconds < 60)
    {
        return;
    }

    LastSendDataTimeSecond = AppTime.GetNowSysSecond();
    try
    {
        if (!m_webStream.CanWrite)
        {
            CheckNoWriteableSocket();
            return;
        }

        m_noWriteableCheckCount = 0;
        // Never interleave with pending data: that would corrupt the frame structure.
        if (m_sendDataLeft == 0)
        {
            m_webStream.Write(m_zeroLengthBytes, 0, m_zeroLengthBytes.Length);
        }
    }
    catch (SocketException ex)
    {
        TraceLog.Error("WebSession.CheckSocketAndSendKeepAlive remote {0} SocketException err {1}"
            , m_remoteEndPoint, ex.SocketErrorCode);
        if (ex.SocketErrorCode != SocketError.WouldBlock)
        {
            Close();
        }
    }
    catch (Exception ex)
    {
        TraceLog.Error("WebSession.CheckSocketAndSendKeepAlive remote {0} SocketException message {1}"
            , m_remoteEndPoint, ex.Message);
        Close();
    }
}
/// <summary>
/// Records one more "stream not writeable" observation and closes the session once
/// five of them have accumulated.
/// </summary>
private void CheckNoWriteableSocket()
{
    if (m_socket == null)
    {
        return;
    }

    // Probing with a real send could interrupt a message when m_sendDataLeft != 0, so the
    // observations are only counted; other code paths reset the counter to zero.
    m_noWriteableCheckCount += 1;
    TraceLog.Trace("WebSession.CheckNoWriteableSocket socket no writeable, remote {0}, noWriteableCheckCount {1}"
        , m_remoteEndPoint, m_noWriteableCheckCount);
    LastSendDataTimeSecond = AppTime.GetNowSysSecond();

    if (m_noWriteableCheckCount < 5)
    {
        return;
    }

    // Repeatedly not writeable: treat the connection as lost.
    TraceLog.Error("WebSession.CheckNoWriteableSocket socket no writeable, need close it remote {0}, noWriteableCheckCount {1} to much"
        , m_remoteEndPoint, m_noWriteableCheckCount);
    Close();
}
/// <summary>
/// Synchronous receive: reads whatever fits into the free part of the raw buffer and
/// feeds it to the handshake / frame decoder. Any error closes the session.
/// </summary>
public override void RecvReadableSocket()
{
    if (m_socket == null)
    {
        return;
    }

    try
    {
        int freeSpace = m_webRecvBuffer.Length - m_webAlreadyRecvLength;
        int received = m_webStream.Read(m_webRecvBuffer, m_webAlreadyRecvLength, freeSpace);
        if (received <= 0)
        {
            return;
        }

        TotalRecvLength += received;
        LastRecvDataTimeSecond = AppTime.GetNowSysSecond();
        m_webAlreadyRecvLength += received;
        if (WriteSendRecvLog)
        {
            TraceLog.Trace("WebSession.RecvReadableSocket sessionid {0} try recv remote {1}, really recv length {2} free buffer length {3}"
                , SessionID, m_remoteEndPoint, received, freeSpace - received);
        }

        TryReadWebSocketMessageFromBuffer(false, false);
    }
    catch (SocketException socketex)
    {
        TraceLog.Error("WebSession.RecvReadableSocket socket sessionid {0} receive error HResult {1} message {2}"
            , SessionID, socketex.HResult, socketex.Message);
        Close();
    }
    catch (Exception ex)
    {
        TraceLog.Error("WebSession.RecvReadableSocket socket sessionid {0} receive unkonw error, message {1}"
            , SessionID, ex.Message);
        Close();
    }
}
/// <summary>
/// Synchronous send: refills the send buffer from the message queues when it is empty,
/// writes the pending bytes in one call and updates the send bookkeeping.
/// </summary>
public override void WriteWriteableSocket()
{
    if (m_socket == null)
    {
        return;
    }

    m_noWriteableCheckCount = 0;
    try
    {
        if (m_sendDataLeft == 0)
        {
            CopyWebSocketSendBuffer();
        }

        if (m_sendDataLeft <= 0)
        {
            return;
        }

        // Stream.Write sends everything or throws, so the requested length is also the sent length.
        int requestSendLength = m_sendDataLeft;
        m_webStream.Write(m_sendBuffer, m_sendDataPos, requestSendLength);

        TotalSendLength += requestSendLength;
        LastSendDataTimeSecond = AppTime.GetNowSysSecond();
        m_sendDataLeft -= requestSendLength;
        m_sendDataPos = m_sendDataLeft == 0 ? 0 : m_sendDataPos + requestSendLength;
        OnSendCompleted();

        if (WriteSendRecvLog)
        {
            TraceLog.Trace("WebSession.WriteWriteableSocket session {5} try send remote {0} length {1}, really send length {2} left length {3} dataLengthInSendQueue {4}"
                , m_remoteEndPoint, requestSendLength, requestSendLength, m_sendDataLeft, m_dataLengthInSendQueue, SessionID);
        }
    }
    catch (SocketException ex)
    {
        if (ex.SocketErrorCode != SocketError.WouldBlock)
        {
            TraceLog.Error("WebSession.WriteWriteableSocket session {0} send erro, errorcode {1} message {2}"
                , SessionID, ex.SocketErrorCode, ex.Message);
            TraceLog.Exception(ex);
            Close();
        }
        else
        {
            // WouldBlock is expected and only traced.
            TraceLog.Trace("WebSession.WriteWriteableSocket session {0} send erro, errorcode WouldBlock"
                , SessionID);
        }
    }
    catch (Exception ex)
    {
        TraceLog.Error("WebSession.WriteWriteableSocket socket session {0} send erro, message {1}", SessionID, ex.Message);
        TraceLog.Exception(ex);
        Close();
    }
}
/// <summary>
/// Starts the asynchronous send chain unless the socket is gone, SSL authentication
/// is still running, or a send chain is already active.
/// </summary>
public override void StartSendAsync()
{
    if (m_socket == null)
    {
        return;
    }

    bool authenticationRunning;
    lock (m_authenticateLocker)
    {
        authenticationRunning = m_authenticating;
    }
    if (authenticationRunning)
    {
        return;
    }

    // Claim the "send pending" flag; only the claimer starts the chain.
    bool alreadySending;
    lock (m_sendAsyncLocker)
    {
        alreadySending = m_sendAsyncPending;
        if (!alreadySending)
        {
            m_sendAsyncPending = true;
        }
    }
    if (alreadySending)
    {
        return;
    }

    TraceLog.TraceDetail("WebSession.StartSendAsync socket session {0}", SessionID);
    DoSendAsync();
}
/// <summary>
/// One step of the asynchronous send chain: sends pending data if there is any,
/// otherwise sends a keep-alive frame when the connection has been idle for 60 seconds,
/// otherwise ends the chain by clearing the pending flag.
/// </summary>
private void DoSendAsync()
{
    if (m_sendDataLeft == 0)
    {
        CopyWebSocketSendBuffer();
    }

    if (m_sendDataLeft > 0)
    {
        try
        {
            // The callback reads the count back from the state object.
            m_sendAsyncState.m_offset = m_sendDataPos;
            m_sendAsyncState.m_count = m_sendDataLeft;
            m_webStream.BeginWrite(m_sendBuffer, m_sendDataPos, m_sendDataLeft, OnSendAsyncCallback, m_sendAsyncState);
        }
        catch (Exception e)
        {
            TraceLog.Error("WebSession.DoSendAsync webSession {0} send error message {1}", SessionID, e);
            Close();
            lock (m_sendAsyncLocker)
            {
                m_sendAsyncPending = false;
            }
        }
        return;
    }

    bool keepAliveDue = !IsSendingClose && AppTime.GetNowSysSecond() - LastSendDataTimeSecond >= 60;
    if (!keepAliveDue)
    {
        // Nothing to send: the chain stops here.
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
        return;
    }

    try
    {
        m_sendAsyncState.m_keepAlive = true;
        TraceLog.Trace("WebSession.DoSendAsync socket session {0} keep alive, send empty message 000", SessionID);
        LastSendDataTimeSecond = AppTime.GetNowSysSecond();
        m_webStream.BeginWrite(m_zeroLengthBytes, 0, m_zeroLengthBytes.Length, OnSendAsyncCallback, m_sendAsyncState);
        TraceLog.Trace("WebSession.DoSendAsync socket session {0} keep alive, send empty message 111", SessionID);
    }
    catch (Exception e)
    {
        TraceLog.Error("WebSession.DoSendAsync webSession {0} keep alive send error message {1}", SessionID, e);
        Close();
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
    }
}
/// <summary>
/// Called after a send has completed. If the handshake response was pending, the session
/// becomes open and the open event is raised; otherwise, if a close frame was being sent,
/// the session is closed.
/// </summary>
private void OnSendCompleted()
{
    lock (m_sendHandShakeLocker)
    {
        if (m_sendHandShakePending)
        {
            // The handshake response is out: the WebSocket is open from now on.
            m_sendHandShakePending = false;
            IsHandShake = true;
            OnAsyncModeWebsocketOpen();
            return;
        }
    }

    lock (m_sendCloseFrameLocker)
    {
        if (!IsSendingClose)
        {
            return;
        }

        // The close frame is out: finish closing.
        IsSendingClose = false;
        Close();
    }
}
/// <summary>
/// Completion callback of the asynchronous write started by DoSendAsync: updates the send
/// bookkeeping, runs the post-send hook and continues the send chain.
/// </summary>
/// <param name="ar">Async result whose AsyncState is the AsyncState passed to BeginWrite.</param>
private void OnSendAsyncCallback(IAsyncResult ar)
{
    try
    {
        m_webStream.EndWrite(ar);
        AsyncState completed = ar.AsyncState as AsyncState;
        if (!completed.m_keepAlive)
        {
            // Data write: account for the bytes that were handed to BeginWrite.
            int sentBytes = completed.m_count;
            TotalSendLength += sentBytes;
            m_sendDataLeft -= sentBytes;
            m_sendDataPos = m_sendDataLeft == 0 ? 0 : m_sendDataPos + sentBytes;
            if (WriteSendRecvLog)
            {
                TraceLog.TraceDetail("WebSession.OnSendAsyncCallback session {0} remote {1} send length {2} left length {3} dataLengthInSendQueue {4}"
                    , SessionID, m_remoteEndPoint, sentBytes, m_sendDataLeft, m_dataLengthInSendQueue);
            }
        }
        else if (WriteSendRecvLog)
        {
            // Keep-alive write: no buffer bookkeeping, only a log entry.
            TraceLog.TraceDetail("WebSession.OnSendAsyncCallback session {0} keepalive callback remote {1} left length {2} dataLengthInSendQueue {3}"
                , SessionID, m_remoteEndPoint, m_sendDataLeft, m_dataLengthInSendQueue);
        }

        m_sendAsyncState.Clear();
    }
    catch (Exception ex)
    {
        TraceLog.Error("WebSession.OnSendAsyncCallback session {0} IsSocketClosed {1} SocketError {2} "
            , SessionID, IsSocketClosed, ex);
        Close();
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
        return;
    }

    OnSendCompleted();
    if (!IsSocketClosed)
    {
        DoSendAsync();
    }
}
/// <summary>
/// Raised from OnSendCompleted once the handshake response has been sent.
/// </summary>
public event NetEventHandler AsyncModeWebsocketOpen;

/// <summary>
/// Logs the open event and raises <see cref="AsyncModeWebsocketOpen"/> with a null sender
/// and this session as the event argument.
/// </summary>
private void OnAsyncModeWebsocketOpen()
{
    SessionEventArgs e = new SessionEventArgs();
    e.Session = this;

    TraceLog.TraceDetail("WebSession.OnAsyncModeWebsocketOpen remote {0} session {1}"
        , e.Session.RemoteEndPoint, e.Session.SessionID);

    if (AsyncModeWebsocketOpen == null)
    {
        return;
    }
    AsyncModeWebsocketOpen(null, e);
}
/// <summary>
/// Starts SSL server authentication: wraps the current stream in an SslStream, makes it the
/// session stream and authenticates asynchronously. AuthenticateCallBack runs on completion.
/// Does nothing while an authentication is already in progress.
/// </summary>
/// <param name="certificate">Server certificate passed to AuthenticateAsServerAsync.</param>
public void Authenticate(X509Certificate2 certificate)
{
    lock (m_authenticateLocker)
    {
        if (m_authenticating)
        {
            return;
        }
        m_authenticating = true;
    }

    // leaveInnerStreamOpen = false: the wrapped stream is closed together with the SslStream.
    SslStream secureStream = new SslStream(m_webStream, false);
    m_webStream = secureStream;

    secureStream
        .AuthenticateAsServerAsync(certificate)
        .ContinueWith(AuthenticateCallBack);
}
/// <summary>
/// Continuation of the SSL server authentication started by Authenticate. Clears the
/// "authenticating" flag, then either closes the session (authentication faulted or was
/// canceled) or starts receiving.
/// </summary>
/// <param name="authTask">The completed authentication task.</param>
private void AuthenticateCallBack(Task authTask)
{
    lock (m_authenticateLocker)
    {
        m_authenticating = false;
    }

    // A canceled task did not authenticate either, so it is handled like a fault.
    if (authTask.IsFaulted || authTask.IsCanceled)
    {
        TraceLog.Error($"{SessionID } Authenticate falid");

        // Reading Task.Exception also marks the fault as observed; log the root cause
        // so a failed TLS handshake can be diagnosed.
        AggregateException failure = authTask.Exception;
        if (failure != null)
        {
            TraceLog.Exception(failure.GetBaseException());
        }

        // Authentication failed: close right away.
        Close();
        return;
    }

    StartRecvAsync();
}
/// <summary>
/// Starts the asynchronous receive chain unless the socket is gone, SSL authentication
/// is still running, or a receive chain is already active.
/// </summary>
public override void StartRecvAsync()
{
    if (m_socket == null)
    {
        return;
    }

    bool authenticationRunning;
    lock (m_authenticateLocker)
    {
        authenticationRunning = m_authenticating;
    }
    if (authenticationRunning)
    {
        return;
    }

    // Claim the "receive pending" flag; only the claimer starts the chain.
    bool alreadyReceiving;
    lock (m_recvAsyncLocker)
    {
        alreadyReceiving = m_recvAsyncPending;
        if (!alreadyReceiving)
        {
            m_recvAsyncPending = true;
        }
    }
    if (alreadyReceiving)
    {
        return;
    }

    TraceLog.TraceDetail("WebSession.StartRecvAsync socket session {0}", SessionID);
    DoRecvAsync();
}
/// <summary>
/// Issues one asynchronous read into the free part of the raw receive buffer.
/// On failure the session is closed and the receive-pending flag is cleared.
/// </summary>
private void DoRecvAsync()
{
    // NOTE(review): this early return leaves m_recvAsyncPending untouched — confirm that is intended.
    if (m_webStream == null)
    {
        return;
    }

    try
    {
        TraceLog.TraceDetail("WebSession.DoRecvAsync socket session {0}", SessionID);
        m_recvAsyncState.m_offset = m_webAlreadyRecvLength;
        m_webStream.BeginRead(
            m_webRecvBuffer,
            m_webAlreadyRecvLength,
            m_webRecvBuffer.Length - m_webAlreadyRecvLength,
            OnRecvAsyncCallback,
            m_recvAsyncState);
    }
    catch (SocketException socketex)
    {
        TraceLog.Error("WebSession.DoRecvAsync socket session {0} receive error HResult {1} message {2}"
            , SessionID, socketex.HResult, socketex.Message);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
    }
    catch (Exception ex)
    {
        TraceLog.Error("WebSession.DoRecvAsync socket session {0} receive unkonw error, message {1}"
            , SessionID, ex.Message);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
    }
}
/// <summary>
/// Completion callback of the asynchronous read started by DoRecvAsync. Received bytes are
/// decoded and the next read is issued; a zero-length read or any exception closes the
/// session and clears the receive-pending flag.
/// </summary>
/// <param name="ar">Async result of the BeginRead call.</param>
private void OnRecvAsyncCallback(IAsyncResult ar)
{
    try
    {
        if (m_webStream == null)
        {
            return;
        }

        int received = m_webStream.EndRead(ar);
        if (received <= 0)
        {
            TraceLog.Error("WebSession.OnRecvAsyncCallback m_webStream session {0} IsSocketClosed {1} recvLen:0 so close"
                , SessionID, IsSocketClosed);
            Close();
            lock (m_recvAsyncLocker)
            {
                m_recvAsyncPending = false;
            }
            return;
        }

        m_webAlreadyRecvLength += received;
        TotalRecvLength += received;
        LastRecvDataTimeSecond = AppTime.GetNowSysSecond();
        if (WriteSendRecvLog)
        {
            TraceLog.TraceDetail("WebSession.OnRecvAsyncCallback session {0} remote {1} recv length {2} m_alreadyRecvLength {3}"
                , SessionID, m_remoteEndPoint, received, m_alreadyRecvLength);
        }

        TryReadWebSocketMessageFromBuffer(false, true);
        // Keep receiving.
        DoRecvAsync();
    }
    catch (Exception ex)
    {
        // Typically the session was closed while the read was in flight.
        TraceLog.Debug("WebSession.OnRecvAsyncCallback socket session {0} receive unkonw error, message {1}", SessionID, ex.Message);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
    }
}
/// <summary>
/// Refills the send buffer from the message queues, prefixing every message with a
/// WebSocket binary frame header. Only runs when the send buffer is completely drained.
/// </summary>
protected void CopyWebSocketSendBuffer()
{
    // The buffer may only be refilled after everything in it has been sent.
    if (m_sendDataLeft > 0)
    {
        return;
    }

    if (m_sendMessageAsyncQueue.Count > 0)
    {
        lock (m_sendMessageAsyncQueueLocker)
        {
            while (m_sendMessageAsyncQueue.Count > 0)
            {
                // Go through SendMessageToQueue so that large messages get split;
                // done under the lock, so message order is preserved.
                SendMessageToQueue(m_sendMessageAsyncQueue.Dequeue());
            }
        }
    }

    while (m_sendMessageQueue.Count > 0)
    {
        MessageData current = m_sendMessageQueue.Dequeue();
        m_dataLengthInSendQueue -= current.Header.Length;

        int packedHeaderLength = m_protoPacker.PackHeader(current.Header, m_headerBytes);

        // WebSocket frame header first, then the packed message header, then the message body.
        PacketFrameDataHeadToSendBuffer(FrameType.Binary, packedHeaderLength + current.Buffer.Length);

        Buffer.BlockCopy(m_headerBytes, 0, m_sendBuffer, m_sendDataLeft, packedHeaderLength);
        m_sendDataLeft += packedHeaderLength;

        Buffer.BlockCopy(current.Buffer.Data, 0, m_sendBuffer, m_sendDataLeft, current.Buffer.Length);
        m_sendDataLeft += current.Buffer.Length;
        current.FreeData();

        // Stop when the next message (plus a 100-byte allowance) would not fit any more.
        if (m_sendMessageQueue.Count > 0
            && m_sendDataLeft + m_sendMessageQueue.Peek().Buffer.Length + 100 >= m_sendBuffer.Length)
        {
            break;
        }
    }
}
/// <summary>
/// Processes freshly received raw bytes: answers the handshake while the session is not
/// open yet; afterwards strips the WebSocket frame headers into m_recvBuffer and lets
/// TryReadMessageFromBuffer extract messages from it.
/// </summary>
/// <param name="threadSafe">Forwarded to TryReadMessageFromBuffer.</param>
/// <param name="raiseAsyncModeEvent">Forwarded to TryReadMessageFromBuffer.</param>
protected void TryReadWebSocketMessageFromBuffer(bool threadSafe, bool raiseAsyncModeEvent)
{
    if (IsHandShake)
    {
        // Unframe into m_recvBuffer; decoding a control frame may close the socket.
        UnPacketFrameDataToRecvBuffer();
        if (!IsSocketClosed)
        {
            TryReadMessageFromBuffer(threadSafe, raiseAsyncModeEvent);
        }
    }
    else
    {
        AnswerHandShake();
    }
}
/// <summary>
/// Parses the buffered HTTP upgrade request and queues the "101 Switching Protocols" answer.
/// The session is closed when the request is malformed, has an unsupported version or
/// lacks the Sec-WebSocket-Key header. A request whose header block has not arrived
/// completely stays in the buffer until more data is received.
/// </summary>
private void AnswerHandShake()
{
    int requestLength = m_webAlreadyRecvLength;
    byte[] data = new byte[requestLength];
    Buffer.BlockCopy(m_webRecvBuffer, 0, data, 0, requestLength);

    WebSocketHttpRequest request = RequestParser.Parse(data, m_scheme);
    if (request == null)
    {
        // Assumes the parser returns null for input it cannot match — TODO confirm against Fleck.
        // Without the blank line that ends the HTTP header block the request is merely
        // incomplete: keep the bytes and wait for the next read, as long as the buffer has room.
        if (!ContainsHttpHeaderTerminator(m_webRecvBuffer, requestLength)
            && requestLength < m_webRecvBuffer.Length)
        {
            return;
        }

        m_webAlreadyRecvLength = 0;
        Close();
        return;
    }

    // The request is complete: the raw bytes are consumed.
    m_webAlreadyRecvLength = 0;

    if (!IsValidWebSocketRequest(request))
    {
        Close();
        return;
    }

    // The accept key is derived from this header; answering without it would be meaningless.
    string requestKey = request["Sec-WebSocket-Key"];
    if (string.IsNullOrEmpty(requestKey))
    {
        Close();
        return;
    }

    lock (m_sendHandShakeLocker)
    {
        if (m_sendHandShakePending)
        {
            return;
        }
        m_sendHandShakePending = true;
    }

    // NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy sets it.
    string forwardedFor;
    if (request.Headers.TryGetValue("X-Forwarded-For", out forwardedFor))
    {
        this.clientIp = forwardedFor;
    }
    else
    {
        // Use the endpoint's address rather than splitting "address:port" text,
        // which breaks for IPv6 endpoints such as "[::1]:1234".
        IPEndPoint remote = m_remoteEndPoint as IPEndPoint;
        this.clientIp = remote != null
            ? remote.Address.ToString()
            : RemoteEndPoint.ToString().Split(':')[0];
    }

    string subProtocol = SubProtocolNegotiator.Negotiate(supportedSubProtocols, request.SubProtocols);

    var builder = new StringBuilder();
    builder.Append("HTTP/1.1 101 Switching Protocols\r\n");
    builder.Append("Upgrade: websocket\r\n");
    builder.Append("Connection: Upgrade\r\n");
    if (subProtocol != null)
    {
        builder.AppendFormat("Sec-WebSocket-Protocol: {0}\r\n", subProtocol);
    }
    builder.AppendFormat("Sec-WebSocket-Accept: {0}\r\n", CreateResponseKey(requestKey));
    builder.Append("\r\n");

    // The handshake answer goes straight into the send buffer (no WebSocket framing).
    byte[] handshake = Encoding.ASCII.GetBytes(builder.ToString());
    Buffer.BlockCopy(handshake, 0, m_sendBuffer, m_sendDataLeft, handshake.Length);
    m_sendDataLeft += handshake.Length;
    StartSendAsync();
}

/// <summary>
/// Returns true when the first <paramref name="length"/> bytes of <paramref name="buffer"/>
/// contain the CR LF CR LF sequence that ends an HTTP header block.
/// </summary>
private static bool ContainsHttpHeaderTerminator(byte[] buffer, int length)
{
    for (int i = 3; i < length; i++)
    {
        if (buffer[i - 3] == '\r' && buffer[i - 2] == '\n'
            && buffer[i - 1] == '\r' && buffer[i] == '\n')
        {
            return true;
        }
    }
    return false;
}
/// <summary>
/// Computes the Sec-WebSocket-Accept value for a handshake: the Base64 encoding of the
/// SHA-1 hash of the request key concatenated with the WebSocket response GUID.
/// </summary>
/// <param name="requestKey">Value of the client's Sec-WebSocket-Key header.</param>
/// <returns>The Base64 encoded accept key.</returns>
public static string CreateResponseKey(string requestKey)
{
    string combined = requestKey + WebSocketResponseGuid;

    // The hash algorithm is IDisposable; release it deterministically.
    using (SHA1 sha1 = SHA1.Create())
    {
        byte[] hash = sha1.ComputeHash(Encoding.ASCII.GetBytes(combined));
        return Convert.ToBase64String(hash);
    }
}
/// <summary>
/// Accepts a handshake request only when its version, as reported by
/// HandlerFactory.GetVersion, is "7", "8" or "13".
/// </summary>
/// <param name="request">Parsed handshake request.</param>
/// <returns>True for a supported version, otherwise false.</returns>
private bool IsValidWebSocketRequest(WebSocketHttpRequest request)
{
    switch (HandlerFactory.GetVersion(request))
    {
        case "7":
        case "8":
        case "13":
            return true;
        default:
            return false;
    }
}
/// <summary>
/// Appends an unmasked, final WebSocket frame header to m_sendBuffer at m_sendDataLeft and
/// advances m_sendDataLeft past it. The payload itself is written by the caller.
/// </summary>
/// <param name="frameType">Frame opcode; 128 (the final-frame bit) is added to it.</param>
/// <param name="length">Payload length announced in the header.</param>
private void PacketFrameDataHeadToSendBuffer(FrameType frameType, int length)
{
    m_sendBuffer[m_sendDataLeft++] = (byte)((byte)frameType + 128);

    // Lengths up to 125 fit into the second header byte itself.
    if (length <= UInt16.MaxValue && length <= 125)
    {
        m_sendBuffer[m_sendDataLeft++] = (byte)length;
        return;
    }

    byte[] extendedLength;
    if (length > UInt16.MaxValue)
    {
        // Marker 127: the length follows as 8 bytes, most significant byte first.
        m_sendBuffer[m_sendDataLeft++] = (byte)127;
        ulong wide = (ulong)length;
        extendedLength = new byte[]
        {
            (byte)(wide >> 56), (byte)(wide >> 48), (byte)(wide >> 40), (byte)(wide >> 32),
            (byte)(wide >> 24), (byte)(wide >> 16), (byte)(wide >> 8), (byte)wide,
        };
    }
    else
    {
        // Marker 126: the length follows as 2 bytes, most significant byte first.
        m_sendBuffer[m_sendDataLeft++] = (byte)126;
        ushort narrow = (ushort)length;
        extendedLength = new byte[] { (byte)(narrow >> 8), (byte)narrow };
    }

    Buffer.BlockCopy(extendedLength, 0, m_sendBuffer, m_sendDataLeft, extendedLength.Length);
    m_sendDataLeft += extendedLength.Length;
}
/// <summary>
/// Force-closes the session when a close frame was queued more than 2 seconds ago and the
/// close has still not completed.
/// </summary>
public void CheckSendCloseFrameTimeOut()
{
    // Zero (or less) means no close frame has been queued.
    if (LastCloseWebSocketSecond <= 0)
    {
        return;
    }

    long waitedSeconds = AppTime.GetNowSysSecond() - LastCloseWebSocketSecond;
    if (waitedSeconds > 2)
    {
        Close();
    }
}
/// <summary>
/// Starts a graceful WebSocket close: queues a Close frame carrying
/// <paramref name="closeCode"/> and starts sending. OnSendCompleted closes the session once
/// the frame is out; CheckSendCloseFrameTimeOut force-closes it if that takes too long.
/// Does nothing when the socket is already closed or a close frame is already being sent.
/// </summary>
/// <param name="closeCode">WebSocket status code placed in the Close frame body.</param>
public void CloseWebSocket(int closeCode = WebSocketStatusCodes.NormalClosure)
{
    // Socket already closed.
    if (IsSocketClosed)
    {
        return;
    }

    CheckSendCloseFrameTimeOut();
    // The timeout check may have closed the socket.
    if (IsSocketClosed)
    {
        return;
    }

    lock (m_sendCloseFrameLocker)
    {
        if (IsSendingClose)
        {
            return;
        }
        IsSendingClose = true;
        LastCloseWebSocketSecond = AppTime.GetNowSysSecond();
    }

    // The Close frame body starts with the status code as a 2-byte unsigned integer in
    // network byte order (RFC 6455 section 5.5.1) — not a 4-byte host-order int.
    byte[] closeBytes =
    {
        (byte)((closeCode >> 8) & 0xFF),
        (byte)(closeCode & 0xFF),
    };
    PacketFrameDataHeadToSendBuffer(FrameType.Close, closeBytes.Length);
    Buffer.BlockCopy(closeBytes, 0, m_sendBuffer, m_sendDataLeft, closeBytes.Length);
    m_sendDataLeft += closeBytes.Length;
    StartSendAsync();
}
/// <summary>
/// Strips the WebSocket framing from the bytes buffered in m_webRecvBuffer.
/// Payloads of Binary/Continuation frames are unmasked and appended to m_recvBuffer;
/// Ping, Pong and Text frames are discarded; a Close frame starts the closing handshake.
/// Frames that violate the protocol (unmasked, reserved bits set, unknown opcode) or that
/// can never fit into the buffers close the session. An incomplete frame is left untouched
/// in m_webRecvBuffer until more data arrives.
/// </summary>
private void UnPacketFrameDataToRecvBuffer()
{
    // Each pass decodes the frame starting at offset 0; a decoded frame is removed by
    // shifting the remaining bytes to the front. The smallest frame header is 2 bytes.
    while (m_webAlreadyRecvLength >= 2)
    {
        int available = m_webAlreadyRecvLength;

        // Reserved bits (mask 112 of the first byte) must be zero per spec 5.2.
        int reservedBits = m_webRecvBuffer[0] & 112;
        // Frame type (opcode).
        FrameType frameType = (FrameType)(m_webRecvBuffer[0] & 15);
        // Client-to-server frames must be masked.
        bool isMasked = (m_webRecvBuffer[1] & 128) != 0;
        // Length control value: 0..125 literal, 126 => 2 length bytes follow, 127 => 8 follow.
        int lengthCtrl = m_webRecvBuffer[1] & 127;

        if (!isMasked || reservedBits != 0 || !Enum.IsDefined(typeof(FrameType), frameType))
        {
            Close();
            return;
        }

        int cursor = 2;
        int lengthFieldSize = lengthCtrl == 127 ? 8 : (lengthCtrl == 126 ? 2 : 0);
        if (available - cursor < lengthFieldSize)
        {
            // Length field not complete yet.
            return;
        }

        // Decode the big-endian length WITHOUT modifying the buffer: an incomplete frame is
        // parsed again on the next call, and an in-place byte reversal would corrupt it.
        ulong declaredLength = (ulong)lengthCtrl;
        if (lengthFieldSize > 0)
        {
            declaredLength = 0;
            for (int i = 0; i < lengthFieldSize; i++)
            {
                declaredLength = (declaredLength << 8) | m_webRecvBuffer[cursor + i];
            }
            cursor += lengthFieldSize;
        }

        // Header + 4 mask bytes + payload must fit into the raw buffer, otherwise the frame
        // could never be completed.
        int frameOverhead = cursor + 4;
        if (declaredLength > (ulong)(m_webRecvBuffer.Length - frameOverhead))
        {
            TraceLog.Error("WebSession.UnPacketFrameDataToRecvBuffer session {0} frame payload length {1} exceeds receive buffer, close"
                , SessionID, declaredLength);
            Close();
            return;
        }
        int payloadLength = (int)declaredLength;

        // Masking key.
        if (available - cursor < 4)
        {
            return;
        }
        Buffer.BlockCopy(m_webRecvBuffer, cursor, m_maskBytes, 0, 4);
        cursor += 4;

        if (available - cursor < payloadLength)
        {
            // Payload not complete yet.
            return;
        }

        if (frameType == FrameType.Binary || frameType == FrameType.Continuation)
        {
            if (payloadLength > m_recvBuffer.Length - m_alreadyRecvLength)
            {
                TraceLog.Error("WebSession.UnPacketFrameDataToRecvBuffer session {0} payload length {1} does not fit into recv buffer, close"
                    , SessionID, payloadLength);
                Close();
                return;
            }

            // Unmask while copying into m_recvBuffer.
            for (int i = 0; i < payloadLength; i++)
            {
                m_recvBuffer[m_alreadyRecvLength + i] = (byte)(m_webRecvBuffer[cursor + i] ^ m_maskBytes[i % 4]);
            }
            m_alreadyRecvLength += payloadLength;
        }

        // Remove the frame: move whatever follows it to the front of the raw buffer.
        int frameEnd = cursor + payloadLength;
        int remaining = available - frameEnd;
        Buffer.BlockCopy(m_webRecvBuffer, frameEnd, m_webRecvBuffer, 0, remaining);
        m_webAlreadyRecvLength = remaining;

        switch (frameType)
        {
            case FrameType.Binary:
            case FrameType.Continuation:
            case FrameType.Ping:
            case FrameType.Pong:
            case FrameType.Text:
                // Data frames were copied above; Ping/Pong/Text payloads are dropped.
                break;
            case FrameType.Close:
                // A received Close frame is answered with a Close frame, then the session closes.
                CloseWebSocket();
                return;
            default:
                // Other frame types are not expected: log and close.
                TraceLog.Debug($"WebSesion recv other FrameType {frameType}");
                CloseWebSocket(WebSocketStatusCodes.UnsupportedDataType);
                return;
        }
    }
}
}
}