You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

509 lines
15 KiB

/*
Sog 游戏基础库
2016 by zouwei
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace SogClient
{
public class UdpSession:NetSession
{
// Size of the buffer that holds one raw UDP datagram (see m_dgramBuffer).
const int UDP_DGRAM_BUFFER_SIZE = 1500;
// Size used for both m_recvBuffer and m_sendBuffer in InitSession.
const int UDP_BUFFER_SIZE = 1024 * 64;
// True while the async receive chain is running; read and written under m_recvAsyncLocker.
private bool m_recvAsyncPending;
// True while the async send chain is running; read and written under m_sendAsyncLocker.
private bool m_sendAsyncPending;
private object m_recvAsyncLocker;
private object m_sendAsyncLocker;
// Event args objects reused for every ReceiveFromAsync / SendToAsync call.
private SocketAsyncEventArgs m_recvAsyncEventArgs;
private SocketAsyncEventArgs m_sendAsyncEventArgs;
// NOTE(review): m_dataLengthInSendQueue is still referenced by log statements in this class,
// so it is presumably declared by NetSession - confirm before deleting the line below.
// private int m_dataLengthInSendQueue = 0;
// Receive buffer for a single UDP datagram.
private byte[] m_dgramBuffer;
// Length of the datagram most recently stored in m_dgramBuffer.
private int m_dgramLen = 0;
// KCP protocol instance; created in InitSession.
private Kcp kcp;
// Packets waiting to be written to the socket: filled by Output, drained by DoSendToAsync.
private ConcurrentQueue<byte[]> m_outputQueue;
/// <summary>
/// Creates a session whose KCP output is routed to this instance's own
/// <c>Output</c> method, i.e. into the internal output queue.
/// </summary>
/// <param name="socket">Socket handed to the NetSession base constructor.</param>
public UdpSession(Socket socket)
    : base(socket)
{
    InitSession(Output);
}

/// <summary>
/// Creates a session whose KCP output is delivered to a caller-supplied callback.
/// </summary>
/// <param name="socket">Socket handed to the NetSession base constructor.</param>
/// <param name="output">Callback given to the Kcp instance; receives a buffer and a length.</param>
public UdpSession(Socket socket, Action<byte[], int> output)
    : base(socket)
{
    InitSession(output);
}
/// <summary>
/// Shared constructor logic: creates the Kcp instance (conv 0) with the given
/// output callback, allocates the buffers and the output queue, then sets up
/// the async socket objects.
/// </summary>
/// <param name="output">Callback passed to the Kcp constructor.</param>
private void InitSession(Action<byte[], int> output)
{
    // Protocol state first; the conversation id starts at 0 and can be changed via SetKcpConvId.
    this.kcp = new Kcp(0, output);

    // Queue and buffers.
    this.m_outputQueue = new ConcurrentQueue<byte[]>();
    this.m_dgramBuffer = new byte[UDP_DGRAM_BUFFER_SIZE];
    this.m_recvBuffer = new byte[UDP_BUFFER_SIZE];
    this.m_sendBuffer = new byte[UDP_BUFFER_SIZE];

    this.InitRecvSendAsyncObject();
}
/// <summary>
/// (Re)creates the SocketAsyncEventArgs used for async receive and send and
/// wires their Completed events to this session's callbacks.
/// </summary>
/// <remarks>
/// This method runs more than once per session (from InitSession and from
/// OnBindSocket), so it must be safe to call repeatedly:
/// the lock objects are created only once, and the previous event args are
/// disposed instead of being silently dropped.
/// </remarks>
private void InitRecvSendAsyncObject()
{
    // Keep the existing lock objects on re-initialisation. Replacing a lock
    // object while another thread may be using it would leave the pending
    // flags guarded by two different monitors.
    if (m_recvAsyncLocker == null)
    {
        m_recvAsyncLocker = new object();
    }
    if (m_sendAsyncLocker == null)
    {
        m_sendAsyncLocker = new object();
    }

    SocketAsyncEventArgs previousRecvArgs = m_recvAsyncEventArgs;
    SocketAsyncEventArgs previousSendArgs = m_sendAsyncEventArgs;

    m_recvAsyncEventArgs = new SocketAsyncEventArgs();
    m_recvAsyncEventArgs.Completed += OnRecvFromAsyncCallback;
    m_sendAsyncEventArgs = new SocketAsyncEventArgs();
    m_sendAsyncEventArgs.Completed += OnSendToAsyncCallback;

    // SocketAsyncEventArgs is IDisposable. The Completed handlers are left
    // attached on purpose: if an operation is still in flight on the old
    // object its completion must still reach the callback so the pending
    // flag gets cleared.
    if (previousRecvArgs != null)
    {
        previousRecvArgs.Dispose();
    }
    if (previousSendArgs != null)
    {
        previousSendArgs.Dispose();
    }
}
/// <summary>
/// Bind hook: rebuilds the async socket objects and records the socket's
/// remote endpoint as this session's remote endpoint.
/// </summary>
protected override void OnBindSocket()
{
    InitRecvSendAsyncObject();

    EndPoint socketPeer = m_socket.RemoteEndPoint;
    m_remoteEndPoint = (IPEndPoint)socketPeer;
}
/// <summary>
/// Overrides the session's remote endpoint.
/// </summary>
/// <param name="remoteEP">New remote endpoint; it is cast to IPEndPoint, so any other EndPoint type throws InvalidCastException.</param>
public void SetRemoteEndPoint(EndPoint remoteEP) => m_remoteEndPoint = (IPEndPoint)remoteEP;
/// <summary>Sets the conversation id on the underlying Kcp instance.</summary>
/// <param name="id">Conversation id to assign.</param>
public void SetKcpConvId(uint id) => kcp.Conv = id;

/// <summary>Returns the conversation id of the underlying Kcp instance.</summary>
public uint GetKcpConvId() => kcp.Conv;

/// <summary>Sets the name on the underlying Kcp instance (the name appears in this class's log lines).</summary>
/// <param name="name">Name to assign.</param>
public void SetKcpName(string name) => kcp.Name = name;

/// <summary>Returns the name of the underlying Kcp instance.</summary>
public string GetKcpName() => kcp.Name;
/// <summary>
/// No-op override: this session type performs no socket check and sends no keep-alive here.
/// </summary>
public override void CheckSocketAndSendKeepAlive()
{
}
/// <summary>
/// Exposes the queue of packets waiting to be written to the socket.
/// The returned object is the live queue, not a copy.
/// </summary>
public ConcurrentQueue<byte[]> GetOutputQueue() => m_outputQueue;
/// <summary>
/// Logs the close and drops this session's reference to the socket.
/// After this call every method that starts with a null check on m_socket becomes a no-op.
/// </summary>
// NOTE(review): the socket itself is neither closed nor disposed here - presumably it is
// owned (and possibly shared) elsewhere; confirm against the code that creates it.
protected override void CloseSocket()
{
TraceLog.Trace("UdpSession.CloseSocket {0} conv {1}", kcp.Name, kcp.Conv);
//todo clear kcp data?
m_socket = null;
}
/// <summary>
/// Synchronous receive path: reads one datagram from the socket, feeds it to
/// <c>OnDataReceive</c> and then asks the base class to parse messages from
/// the receive buffer. Any exception closes the session.
/// </summary>
public override void RecvReadableSocket()
{
    if (m_socket == null)
    {
        return;
    }

    try
    {
        int datagramLength = m_socket.ReceiveFrom(m_dgramBuffer, 0, m_dgramBuffer.Length, SocketFlags.None, ref m_remoteEndPoint);
        if (datagramLength <= 0)
        {
            // Nothing was read; leave all state untouched.
            return;
        }

        if (WriteSendRecvLog)
        {
            TraceLog.Trace("UdpSession.RecvReadableSocket {0} conv {1} remote {2} length {3}"
                , kcp.Name, kcp.Conv, m_remoteEndPoint, datagramLength);
        }

        m_dgramLen = datagramLength;
        OnDataReceive(m_dgramBuffer, m_dgramLen);
        TryReadMessageFromBuffer(false, false);
    }
    catch (SocketException socketError)
    {
        TraceLog.Trace("UdpSession.RecvReadableSocket {0} conv {1} SocketException errcode {2} message {3}"
            , kcp.Name, kcp.Conv, socketError.ErrorCode, socketError.Message);
        Close();
        m_dgramLen = 0;
    }
    catch (Exception unexpected)
    {
        TraceLog.Error("UdpSession.RecvReadableSocket {0} conv {1} receive unkonw error, message {2}"
            , kcp.Name, kcp.Conv, unexpected.Message);
        Close();
        m_dgramLen = 0;
    }
}
/// <summary>
/// Synchronous send path: moves pending application data from the send
/// buffer into KCP. The bytes are not written to the socket here; KCP emits
/// packets through its output callback.
/// </summary>
public override void WriteWriteableSocket()
{
    if (m_socket == null)
    {
        return;
    }

    // Reset the "not writeable" counter.
    m_noWriteableCheckCount = 0;

    // Refill the send buffer only when the previous content has been fully handed over.
    if (m_sendDataLeft == 0)
    {
        CopySendBuffer();
    }

    if (m_sendDataLeft <= 0)
    {
        if (WriteSendRecvLog)
        {
            TraceLog.Trace("UdpSession.WriteWriteableSocket send remote no data");
        }
        return;
    }

    // Hand the message to KCP's internal send queue; each packet in that queue is no longer than the MSS.
    int kcpResult = kcp.Send(m_sendBuffer, 0, m_sendDataLeft);
    if (kcpResult != 0)
    {
        // m_sendDataLeft is left as-is, so the same data is offered again on the next call.
        if (WriteSendRecvLog)
        {
            TraceLog.Trace("UdpSession.WriteWriteableSocket kcp send fail, ret {0}", kcpResult);
        }
        return;
    }

    TotalSendLength += m_sendDataLeft;
    // LastSendDataTimeSecond = AppTime.GetNowSysSecond();
    if (WriteSendRecvLog)
    {
        TraceLog.Trace("UdpSession.WriteWriteableSocket {0} conv {1} send remote {2} length {3} dataLengthInSendQueue {4}"
            , kcp.Name, kcp.Conv, m_remoteEndPoint, m_sendDataLeft, m_dataLengthInSendQueue);
    }
    m_sendDataLeft = 0;
}
/// <summary>
/// Starts the asynchronous receive chain unless one is already running.
/// Does nothing when the session has no socket.
/// </summary>
public override void StartRecvAsync()
{
    TraceLog.Trace("UdpSession.StartRecvAsync {0} conv {1}", kcp.Name, kcp.Conv);

    if (m_socket == null)
    {
        return;
    }

    // Test-and-set of the pending flag under the lock so only one caller starts the chain.
    bool chainAlreadyRunning;
    lock (m_recvAsyncLocker)
    {
        chainAlreadyRunning = m_recvAsyncPending;
        m_recvAsyncPending = true;
    }

    if (chainAlreadyRunning)
    {
        return;
    }

    DoRecvFromAsync();
}
/// <summary>
/// Issues one asynchronous ReceiveFrom on the session socket. If the
/// operation completes synchronously the completion callback is invoked
/// inline. On any exception the session is closed and the pending flag is
/// cleared so that <c>StartRecvAsync</c> can start a new chain later.
/// </summary>
private void DoRecvFromAsync()
{
    // Work on a local copy: CloseSocket sets m_socket to null, possibly while
    // a receive chain is still running.
    Socket socket = m_socket;
    if (socket == null)
    {
        // Session already closed: just end the chain.
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
        return;
    }

    try
    {
        TraceLog.Trace("UdpSession.DoRecvFromAsync {0} conv {1} m_alreadyRecvLength {2}", kcp.Name, kcp.Conv, m_alreadyRecvLength);

        // Socket.ReceiveFromAsync throws ArgumentNullException when
        // RemoteEndPoint is null. Seed it once with the wildcard address of
        // the socket's own address family; after each completed receive the
        // framework stores the actual sender there.
        if (m_recvAsyncEventArgs.RemoteEndPoint == null)
        {
            IPAddress wildcard = socket.AddressFamily == AddressFamily.InterNetworkV6
                ? IPAddress.IPv6Any
                : IPAddress.Any;
            m_recvAsyncEventArgs.RemoteEndPoint = new IPEndPoint(wildcard, 0);
        }

        m_recvAsyncEventArgs.SetBuffer(m_dgramBuffer, 0, m_dgramBuffer.Length);
        bool pending = socket.ReceiveFromAsync(m_recvAsyncEventArgs);
        if (!pending)
        {
            // Completed synchronously: the Completed event is not raised, so call the handler directly.
            OnRecvFromAsyncCallback(null, m_recvAsyncEventArgs);
        }
    }
    catch (SocketException socketex)
    {
        TraceLog.Trace("UdpSession.DoRecvFromAsync {0} conv {1} SocketException, errcode {2} message {3}"
            , kcp.Name, kcp.Conv, socketex.ErrorCode, socketex.Message);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
    }
    catch (Exception ex)
    {
        TraceLog.Error("UdpSession.DoRecvFromAsync {0} conv {1} error message {2}"
            , kcp.Name, kcp.Conv, ex.Message);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
    }
}
/// <summary>
/// Completion handler for the asynchronous receive. On failure (socket error
/// or no bytes transferred) the session is closed and the receive chain ends;
/// otherwise the datagram is processed and the next receive is issued.
/// </summary>
/// <param name="sender">Event source; null when invoked directly after a synchronous completion.</param>
/// <param name="args">The event args of the completed receive operation.</param>
private void OnRecvFromAsyncCallback(object sender, SocketAsyncEventArgs args)
{
    bool receivedData = args.SocketError == SocketError.Success && args.BytesTransferred > 0;
    if (!receivedData)
    {
        // Error path.
        TraceLog.Trace("UdpSession.OnRecvFromAsyncCallback {0} conv {1} receive error {2}"
            , kcp.Name, kcp.Conv, args.SocketError);
        Close();
        lock (m_recvAsyncLocker)
        {
            m_recvAsyncPending = false;
        }
        return;
    }

    m_dgramLen = args.BytesTransferred;
    if (WriteSendRecvLog)
    {
        TraceLog.Trace("UdpSession.OnRecvFromAsyncCallback {0} conv {1} remote {2} length {3}"
            , kcp.Name, kcp.Conv, m_remoteEndPoint, m_dgramLen);
    }

    OnDataReceive(m_dgramBuffer, m_dgramLen);

    // Keep the receive chain going.
    DoRecvFromAsync();
}
/// <summary>
/// Feeds one received datagram into KCP and moves every message that KCP has
/// ready into the session receive buffer, invoking the base-class message
/// parser after each one.
/// </summary>
/// <param name="data">Buffer holding the raw datagram, starting at index 0.</param>
/// <param name="length">Number of valid bytes in <paramref name="data"/>.</param>
/// <remarks>
/// Datagrams shorter than the KCP header, or whose conversation id differs
/// from this session's, are logged and dropped.
/// </remarks>
public void OnDataReceive(byte[] data, int length)
{
    TraceLog.TraceDetail("UdpSession.OnDataReceive kcp conv {0} input data length {1}"
        , kcp.Conv, length);

    if (length < Kcp.IKCP_OVERHEAD)
    {
        TraceLog.Error("UdpSession.OnDataReceive conv {0} invalid recv len {1}"
            , kcp.Conv, length);
        return;
    }

    uint conv = 0;
    Kcp.ikcp_decode32u(data, 0, ref conv);
    // Conversation id mismatch: the data is not for this session (the port has been reused).
    if (conv != kcp.Conv)
    {
        TraceLog.Error("UdpSession.OnDataReceive recv conv {0} local conv {1} not equal, drop data"
            , conv, kcp.Conv);
        return;
    }

    kcp.Input(data, length);

    // A single Input can make several messages deliverable (e.g. a segment
    // that fills a gap releases everything queued behind it), so keep
    // receiving until KCP has nothing more or the buffer is full. Reading
    // only once would leave those messages stuck until the next datagram.
    while (true)
    {
        int freeSpace = m_recvBuffer.Length - m_alreadyRecvLength;
        if (freeSpace <= 0)
        {
            break;
        }

        int iRecvLen = kcp.Recv(m_recvBuffer, m_alreadyRecvLength, freeSpace);
        if (iRecvLen <= 0)
        {
            break;
        }

        TotalRecvLength += iRecvLen;
        // LastRecvDataTimeSecond = AppTime.GetNowSysSecond();
        m_alreadyRecvLength += iRecvLen;
        if (WriteSendRecvLog)
        {
            TraceLog.Trace("UdpSession.OnDataReceive {0} conv {1} remote {2} realy recv length {3} left length {4}"
                , kcp.Name, kcp.Conv, m_remoteEndPoint, iRecvLen, m_alreadyRecvLength);
        }

        // Parse after every message, as the single-read version did; this
        // also gives the parser a chance to free buffer space before the next read.
        TryReadMessageFromBuffer(false, false);
    }
}
/// <summary>
/// Starts the asynchronous send chain unless one is already running.
/// Does nothing when there is no socket, nothing queued, or no remote
/// endpoint has been set.
/// </summary>
public override void StartSendAsync()
{
    if (m_socket == null || m_outputQueue.IsEmpty)
    {
        return;
    }

    if (m_remoteEndPoint == null)
    {
        TraceLog.Trace("UdpSession.StartSendAsync {0} conv {1} remote ip is null", kcp.Name, kcp.Conv);
        return;
    }

    // Test-and-set of the pending flag under the lock so only one caller starts the chain.
    bool chainAlreadyRunning;
    lock (m_sendAsyncLocker)
    {
        chainAlreadyRunning = m_sendAsyncPending;
        m_sendAsyncPending = true;
    }

    if (chainAlreadyRunning)
    {
        return;
    }

    DoSendToAsync();
}
/// <summary>
/// Takes the next packet from the output queue and sends it with an
/// asynchronous SendTo. If the operation completes synchronously the
/// completion callback is invoked inline. When the queue is empty, or the
/// session has no socket any more, the send chain ends and the pending flag
/// is cleared. On any exception the session is closed and the flag is cleared.
/// </summary>
private void DoSendToAsync()
{
    // KCP command codes as used by the trace output below.
    const byte KcpCmdData = 81;
    const byte KcpCmdAck = 82;

    try
    {
        // Work on a local copy: CloseSocket sets m_socket to null, possibly
        // while a send chain is still running.
        Socket socket = m_socket;
        byte[] buffer = null;
        if (socket == null || !m_outputQueue.TryDequeue(out buffer))
        {
            lock (m_sendAsyncLocker)
            {
                m_sendAsyncPending = false;
            }
            return;
        }

        // Diagnostic decode of the KCP header. The queue is reachable through
        // GetOutputQueue, so do not assume every entry is a full KCP segment.
        if (buffer.Length >= Kcp.IKCP_OVERHEAD)
        {
            uint conv = 0;
            byte cmd = 0;
            uint sn = 0;
            uint len = 0;
            Kcp.ikcp_decode32u(buffer, 0, ref conv);
            Kcp.ikcp_decode8u(buffer, 4, ref cmd);
            Kcp.ikcp_decode32u(buffer, 12, ref sn);
            Kcp.ikcp_decode32u(buffer, 20, ref len);
            if (cmd == KcpCmdData)
            {
                TraceLog.TraceDetail("UdpSession.DoSendToAsync {0} conv {1} sn {2} data len {3}"
                    , kcp.Name, conv, sn, len);
            }
            else if (cmd == KcpCmdAck)
            {
                TraceLog.TraceDetail("UdpSession.DoSendToAsync {0} conv {1} sn {2} ack"
                    , kcp.Name, conv, sn);
            }
        }

        // Prefer the socket's own remote endpoint (unchanged behavior for a
        // connected socket). When the socket has none, fall back to the
        // endpoint configured on the session, which is the one StartSendAsync
        // validates and SetRemoteEndPoint sets; otherwise SendToAsync would
        // throw on a null RemoteEndPoint.
        m_sendAsyncEventArgs.RemoteEndPoint = socket.RemoteEndPoint ?? m_remoteEndPoint;
        m_sendAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
        if (!socket.SendToAsync(m_sendAsyncEventArgs))
        {
            // Completed synchronously: the Completed event is not raised, so call the handler directly.
            OnSendToAsyncCallback(null, m_sendAsyncEventArgs);
        }
    }
    catch (SocketException ex)
    {
        // Underlying buffer is insufficient.
        TraceLog.Error("UdpSession.DoSendToAsync {0} conv {1} send erro, errorcode {2} message {3}"
            , kcp.Name, kcp.Conv, ex.SocketErrorCode, ex.Message);
        Close();
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
    }
    catch (Exception ex)
    {
        TraceLog.Error("UdpSession.DoSendToAsync {0} conv {1} send erro, message {2}"
            , kcp.Name, kcp.Conv, ex.Message);
        Close();
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
    }
}
/// <summary>
/// Completion handler for the asynchronous send. On failure (socket error or
/// no bytes transferred) the session is closed and the send chain ends;
/// otherwise the next queued packet is sent.
/// </summary>
/// <param name="sender">Event source; null when invoked directly after a synchronous completion.</param>
/// <param name="args">The event args of the completed send operation.</param>
private void OnSendToAsyncCallback(object sender, SocketAsyncEventArgs args)
{
    TraceLog.Trace("UdpSession.OnSendToAsyncCallback {0} conv {1}", kcp.Name, kcp.Conv);

    int bytesSent = args.BytesTransferred;
    bool sendSucceeded = args.SocketError == SocketError.Success && args.BytesTransferred > 0;
    if (!sendSucceeded)
    {
        TraceLog.Trace("UdpSession.OnSendToAsyncCallback {0} conv {1} error {2}"
            , kcp.Name, kcp.Conv, args.SocketError);
        Close();
        lock (m_sendAsyncLocker)
        {
            m_sendAsyncPending = false;
        }
        return;
    }

    // LastSendDataTimeSecond = AppTime.GetNowSysSecond();
    if (WriteSendRecvLog)
    {
        TraceLog.Trace("UdpSession.OnSendToAsyncCallback {0} conv {1} try send remote {2} length {3} left length {4} dataLengthInSendQueue {5}"
            , kcp.Name, kcp.Conv, m_remoteEndPoint, bytesSent, m_sendDataLeft, m_dataLengthInSendQueue);
    }

    // Keep the send chain going with the next queued packet.
    DoSendToAsync();
}
/// <summary>
/// Default KCP output callback: copies the first <paramref name="len"/> bytes
/// of <paramref name="data"/> into a new array and enqueues that copy for sending.
/// </summary>
/// <param name="data">Buffer holding the packet, starting at index 0.</param>
/// <param name="len">Number of bytes to copy.</param>
private void Output(byte[] data, int len)
{
    // Copy into a fresh array so the queued packet does not alias the caller's buffer.
    byte[] queuedCopy = new byte[len];
    Array.Copy(data, queuedCopy, len);

    m_outputQueue.Enqueue(queuedCopy);
}
/// <summary>
/// Drives the KCP state machine and closes the session once KCP reports
/// state -1 (logged below as a dead link).
/// </summary>
/// <param name="timeMs">Current time in milliseconds; truncated to 32 bits before being passed to KCP.</param>
public void Update(long timeMs)
{
    // Only tick KCP while its state is 0.
    if (kcp.State == 0)
    {
        kcp.Update((uint)timeMs);
    }

    // Re-read the state: the tick above may have changed it.
    bool linkIsDead = kcp.State == -1;
    if (linkIsDead && !IsSocketClosed)
    {
        TraceLog.Trace("UdpSession.Update {0} conv {1} dead link, close session"
            , kcp.Name, kcp.Conv);
        Close();
    }
}
}
}