/* Sog 游戏基础库 2016 by zouwei */ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Net; using System.Net.Sockets; namespace Sog { public class UdpSession:NetSession { const int UDP_DGRAM_BUFFER_SIZE = 1500; const int UDP_BUFFER_SIZE = 1024 * 64; private bool m_recvAsyncPending; private bool m_sendAsyncPending; private object m_recvAsyncLocker; private object m_sendAsyncLocker; private SocketAsyncEventArgs m_recvAsyncEventArgs; private SocketAsyncEventArgs m_sendAsyncEventArgs; // private int m_dataLengthInSendQueue = 0; // udp数据报文的recv buffer private byte[] m_dgramBuffer; private int m_dgramLen = 0; private Kcp kcp; private ConcurrentQueue m_outputQueue; public UdpSession(Socket socket):base(socket) { InitSession(this.Output); } public UdpSession(Socket socket, Action output):base(socket) { InitSession(output); } private void InitSession(Action output) { kcp = new Kcp(0, output); m_recvBuffer = new byte[UDP_BUFFER_SIZE]; m_sendBuffer = new byte[UDP_BUFFER_SIZE]; m_dgramBuffer = new byte[UDP_DGRAM_BUFFER_SIZE]; m_outputQueue = new ConcurrentQueue(); InitRecvSendAsyncObject(); } private void InitRecvSendAsyncObject() { m_recvAsyncLocker = new object(); m_sendAsyncLocker = new object(); m_recvAsyncEventArgs = new SocketAsyncEventArgs(); m_recvAsyncEventArgs.Completed += OnRecvFromAsyncCallback; m_sendAsyncEventArgs = new SocketAsyncEventArgs(); m_sendAsyncEventArgs.Completed += OnSendToAsyncCallback; } protected override void OnBindSocket() { InitRecvSendAsyncObject(); m_remoteEndPoint = (IPEndPoint)m_socket.RemoteEndPoint; } public void SetRemoteEndPoint(EndPoint remoteEP) { m_remoteEndPoint = (IPEndPoint)remoteEP; } public void SetKcpConvId(uint id) { kcp.Conv = (uint)id; } public uint GetKcpConvId() { return kcp.Conv; } public override void CheckSocketAndSendKeepAlive() { //udp连接的心跳包由客户端来负责主动发起 //原因是底层kcp开启fast resend模式 //1、客户端已经断开了, } public ConcurrentQueue GetOutputQueue() { return m_outputQueue; } protected override void CloseSocket() { TraceLog.Trace("UdpSession.CloseSocket session {0} conv {1}", SessionID, kcp.Conv); //todo clear kcp data? m_socket = null; } public override void RecvReadableSocket() { if(m_socket == null) { return; } try { int iRecvLen = m_socket.ReceiveFrom(m_dgramBuffer, 0, m_dgramBuffer.Length, SocketFlags.None, ref m_remoteEndPoint); if(iRecvLen > 0) { if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.RecvReadableSocket sessionid {0} recv from remote {1} length {2}" , SessionID, m_remoteEndPoint, iRecvLen); } m_dgramLen = iRecvLen; OnDataReceive(m_dgramBuffer, m_dgramLen); TryReadMessageFromBuffer(false, false); } } catch(SocketException socketex) { TraceLog.Debug("UdpSession.RecvReadableSocket socket session {0} receive error HResult {1} message {2}" , SessionID, socketex.HResult, socketex.Message); Close(); m_dgramLen = 0; } catch(Exception ex) { TraceLog.Error("UdpSession.RecvReadableSocket socket session {0} receive unkonw error, message {1}" , SessionID, ex.Message); Close(); m_dgramLen = 0; } } public override void WriteWriteableSocket() { if (m_socket == null) { return; } //清空 m_noWriteableCheckCount = 0; if (m_sendDataLeft == 0) { CopySendBuffer(); } if (m_sendDataLeft > 0) { // 消息转存到kcp内部的send queue,queue中每个packet长度不超过mss int ret = kcp.Send(m_sendBuffer, 0, m_sendDataLeft); if (ret == 0) { TotalSendLength += m_sendDataLeft; LastSendDataTimeSecond = AppTime.GetNowSysSecond(); if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.WriteWriteableSocket sessionid {0} send remote {1} length {2} dataLengthInSendQueue {3}" , SessionID, m_remoteEndPoint, m_sendDataLeft, m_dataLengthInSendQueue); } m_sendDataLeft = 0; } else { if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.WriteWriteableSocket kcp send fail, ret {0}", ret); } } } else { if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.WriteWriteableSocket send remote no data"); } } } public override void StartRecvAsync() { TraceLog.Trace("UdpSession.StartRecvAsync socket session {0}", SessionID); if (m_socket == null) { return; } lock (m_recvAsyncLocker) { if (m_recvAsyncPending) { return; } m_recvAsyncPending = true; } DoRecvFromAsync(); } private void DoRecvFromAsync() { try { TraceLog.Trace("UdpSession.DoRecvFromAsync socket session {0} m_alreadyRecvLength {1}", SessionID, m_alreadyRecvLength); m_recvAsyncEventArgs.SetBuffer(m_dgramBuffer, 0, m_dgramBuffer.Length); bool pending = m_socket.ReceiveFromAsync(m_recvAsyncEventArgs); if (!pending) { OnRecvFromAsyncCallback(null, m_recvAsyncEventArgs); } } catch (SocketException socketex) { TraceLog.Debug("UdpSession.DoRecvFromAsync socket session {0} receive error HResult {1} message {2}" , SessionID, socketex.HResult, socketex.Message); Close(); lock (m_recvAsyncLocker) { m_recvAsyncPending = false; } } catch (Exception ex) { TraceLog.Error("UdpSession.DoRecvFromAsync socket session {0} receive unkonw error, message {1}" , SessionID, ex.Message); Close(); lock (m_recvAsyncLocker) { m_recvAsyncPending = false; } } } private void OnRecvFromAsyncCallback(object sender, SocketAsyncEventArgs args) { //错误 if (args.SocketError != SocketError.Success || args.BytesTransferred <= 0) { TraceLog.Debug("UdpSession.OnRecvFromAsyncCallback socket session {0} receive error {1} " , SessionID, args.SocketError); Close(); lock (m_recvAsyncLocker) { m_recvAsyncPending = false; } return; } m_dgramLen = args.BytesTransferred; if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.OnRecvFromAsyncCallback sessionid {0} recv from remote {1} length {2}" , SessionID, m_remoteEndPoint, m_dgramLen); } OnDataReceive(m_dgramBuffer, m_dgramLen); //继续接收消息 DoRecvFromAsync(); } public void OnDataReceive(byte[] data, int length) { TraceLog.TraceDetail("UdpSession.OnDataReceive kcp conv {0} input data length {1}" , kcp.Conv, length); if (length >= Kcp.IKCP_OVERHEAD) { uint conv = 0; Kcp.ikcp_decode32u(data, 0, ref conv); // conv不一致,无效的信息,端口被重用了 if (conv != kcp.Conv) { TraceLog.Error("UdpSession.OnDataReceive recv conv {0} local conv {1} not equal, drop data" , conv, kcp.Conv); return; } } else { TraceLog.Error("UdpSession.OnDataReceive conv {0} invalid recv len {1}" , kcp.Conv, length); return; } kcp.Input(data, length); int count = 0; while (count < 10) { count++; int recvLength = kcp.Recv(m_recvBuffer, m_alreadyRecvLength, m_recvBuffer.Length - m_alreadyRecvLength); if (recvLength < 0) { break; } TotalRecvLength += recvLength; LastRecvDataTimeSecond = AppTime.GetNowSysSecond(); m_alreadyRecvLength += recvLength; if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.OnDataReceive session {0} try recv remote {1}, realy recv length {2} left length {3}" , SessionID, m_remoteEndPoint, recvLength, m_alreadyRecvLength); } TryReadMessageFromBuffer(false, false); } } //对于服务器目前没用 public override void StartSendAsync() { if (m_socket == null) { return; } if (m_outputQueue.Count == 0) { return; } if (m_remoteEndPoint == null) { TraceLog.Trace("UdpSession.StartSendAsync session {0} remote ip is null", SessionID); return; } lock (m_sendAsyncLocker) { if (m_sendAsyncPending) { return; } m_sendAsyncPending = true; } DoSendToAsync(); } private void DoSendToAsync() { try { if (m_outputQueue.TryDequeue(out byte[] buffer)) { if (true) { uint conv = 0; byte cmd = 0; uint sn = 0; uint len = 0; Kcp.ikcp_decode32u(buffer, 0, ref conv); Kcp.ikcp_decode8u(buffer, 4, ref cmd); Kcp.ikcp_decode32u(buffer, 12, ref sn); Kcp.ikcp_decode32u(buffer, 20, ref len); if (cmd == 81) //data { TraceLog.TraceDetail("UdpSession.DoSendToAsync conv {0} sn {1} data len {2}" , conv, sn, len); } else if (cmd == 82) //ack { TraceLog.TraceDetail("UdpSession.DoSendToAsync conv {0} sn {1} ack" , conv, sn); } } m_sendAsyncEventArgs.RemoteEndPoint = m_remoteEndPoint; m_sendAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length); if (! m_socket.SendToAsync(m_sendAsyncEventArgs)) { OnSendToAsyncCallback(null, m_sendAsyncEventArgs); } } else { lock (m_sendAsyncLocker) { m_sendAsyncPending = false; } } } catch (SocketException ex) { // 底层buffer不够 TraceLog.Error("UdpSession.DoSendToAsync socket session {0} send erro, errorcode {1} message {2}" , SessionID, ex.SocketErrorCode, ex.Message); Close(); lock (m_sendAsyncLocker) { m_sendAsyncPending = false; } } catch (Exception ex) { TraceLog.Error("UdpSession.DoSendToAsync socket session {0} send erro, message {1}", SessionID, ex.Message); Close(); lock (m_sendAsyncLocker) { m_sendAsyncPending = false; } } } private void OnSendToAsyncCallback(object sender, SocketAsyncEventArgs args) { int iSendBytes = args.BytesTransferred; if (args.SocketError != SocketError.Success || args.BytesTransferred <= 0) { TraceLog.Debug("UdpSession.OnSendToAsyncCallback socket session {0} error {1}" , SessionID, args.SocketError); Close(); lock (m_sendAsyncLocker) { m_sendAsyncPending = false; } return; } LastSendDataTimeSecond = AppTime.GetNowSysSecond(); if (WriteSendRecvLog) { TraceLog.Trace("UdpSession.OnSendToAsyncCallback session {0} try send remote {1} length {2} left length {3} dataLengthInSendQueue {4}" , SessionID, m_remoteEndPoint, iSendBytes, m_sendDataLeft, m_dataLengthInSendQueue); } DoSendToAsync(); } private void Output(byte[] data, int len) { byte[] packet = new byte[len]; Array.Copy(data, 0, packet, 0, len); m_outputQueue.Enqueue(packet); TraceLog.TraceDetail("UdpSession.Output session {0} output queue num {1}" , SessionID, m_outputQueue.Count); } public void Update(long timeMs) { if (kcp.State == 0) { kcp.Update((uint) timeMs); } if (kcp.State == -1 && IsSocketClosed == false) { TraceLog.Trace("UdpSession.Update session {0} conv {1} dead link, close session" , SessionID, kcp.Conv); Close(); } } } }