/*
    Sog game foundation library
    2016 by zouwei
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;

namespace Sog
{
    /// <summary>
    /// A channel between the local application and one remote application of the cluster.
    /// On the client side the channel owns a <c>ClientSession</c> and dials the remote endpoint
    /// (with throttled reconnects driven by <see cref="Update"/>); on the other side a session
    /// is attached from outside through <see cref="BindSocket"/>.
    /// </summary>
    public class ClusterChannel
    {
        /// <summary>Buffer size passed to <c>SetSocketBufferSize</c> once the client session is connected.</summary>
        public const int BufferSize = 655360;

        // Minimum delay between a disconnect / failed connect and the next reconnect attempt.
        // (The original note said reconnecting faster than this is too aggressive on Linux.)
        private const long ReconnectIntervalMs = 2000;

        public uint LocalAppID { get; private set; }
        public uint RemoteAppID { get; private set; }

        public string LocalAppIDStr { get; private set; }
        public string RemoteAppIDStr { get; private set; }

        /// <summary>
        /// Whether this side is the client. If so, it actively initiates the connection;
        /// otherwise it waits for the peer to connect to it.
        /// </summary>
        public bool IsClient { get; private set; }

        /// <summary>Whether <see cref="StartConnect"/> has been called.</summary>
        public bool IsStartConnecting { get; private set; }

        /// <summary>The session currently used for sending; null while not connected/bound.</summary>
        public NetSession ConnectedSession { get; private set; }

        private ClientSessionSettings m_clientSessionSettings;
        private ClientSession m_clientSession;

        private long m_disConnectTime = 0;
        private bool m_bNeedReconnect = false;

        private long m_iSendMessageCount = 0;

        // Test switch for the reconnect logic: when on, Send() proactively drops the
        // connection every 1000 messages. Should stay off.
        private bool m_testReconnect = false;

        // Messages received from the socket and not yet consumed by RecvOneFromQueue().
        // NOTE(review): the element type is taken from RecvOneFromQueue(), which dequeues
        // into a MessageData; this assumes SessionEventArgs.Message is a MessageData.
        private readonly Queue<MessageData> m_recvivedMessage = new Queue<MessageData>();

        private readonly object m_recvlocker = new object();
        private readonly object m_sendlocker = new object();

        // Network processing mode of the cluster.
        private SessionNetSelectMode NetSelectMode;

        /// <summary>
        /// Multi-threaded send is not supported by default; this flag only has an effect
        /// in synchronous mode.
        /// </summary>
        public bool MultiThreadSendSafe = false;

        /// <summary>Callback for "I connect to the other side" connect / disconnect notifications.</summary>
        public delegate void OnIToItConnectOrDisConnectHandler(uint serverID, bool isConnect);

        /// <summary>
        /// Invoked with <see cref="RemoteAppID"/> and <c>true</c> after the client session connects,
        /// and with <c>false</c> after a connect failure or a disconnect.
        /// </summary>
        public OnIToItConnectOrDisConnectHandler OnIToItConnectOrDisConnect;

        /// <summary>Creates a channel between <paramref name="localAppID"/> and <paramref name="remoteAppID"/>.</summary>
        /// <param name="localAppID">ID of the local application.</param>
        /// <param name="remoteAppID">ID of the remote application.</param>
        /// <param name="isClinet">True when this side is the one that initiates the connection.</param>
        /// <param name="netMode">Network processing mode used by this channel.</param>
        public ClusterChannel(uint localAppID, uint remoteAppID, bool isClinet, SessionNetSelectMode netMode)
        {
            LocalAppID = localAppID;
            RemoteAppID = remoteAppID;
            IsClient = isClinet;
            IsStartConnecting = false;

            LocalAppIDStr = ServerIDUtils.IDToString(localAppID);
            RemoteAppIDStr = ServerIDUtils.IDToString(remoteAppID);

            NetSelectMode = netMode;
        }

        /// <summary>
        /// Resolves the remote endpoint, lazily creates the client session (and its settings)
        /// and starts connecting.
        /// </summary>
        public void StartConnect(ClusterApp thisApp, ClusterApp remoteApp)
        {
            IPEndPoint remoteIPEndPoint = ClusterApp.GetRemoteIpEndPoint(thisApp, remoteApp);

            TraceLog.Trace("ClusterChannel.StartConnect remote {0}, my idc {1} remote idc {2} remote listenAnyIP {3}",
                remoteIPEndPoint, thisApp.idc, remoteApp.idc, remoteApp.listenAnyIP);

            IsStartConnecting = true;

            if (m_clientSessionSettings == null)
            {
                m_clientSessionSettings = new ClientSessionSettings(remoteApp.BufferSize, remoteIPEndPoint, ProtocolType.Tcp);
            }

            if (m_clientSession == null)
            {
                m_clientSession = new TcpClientSession(m_clientSessionSettings, NetSelectMode);

                m_clientSession.Connected += socket_OnConnected;
                m_clientSession.ConnectFail += socket_OnConnectFail;
                m_clientSession.Disconnected += socket_OnDisconnected;
                m_clientSession.DataReceived += OnRecvDataFromSocket;
            }

            m_clientSession.Connect();
        }

        /// <summary>Closes the connection and releases the client-side resources.</summary>
        public void CloseConnect()
        {
            if (m_clientSession != null)
            {
                m_clientSession.Dispose();

                m_clientSessionSettings = null;
                m_clientSession = null;

                // OnRecvDataFromSocket enqueues under this lock in asynchronous mode,
                // so the queue must not be cleared concurrently without it.
                lock (m_recvlocker)
                {
                    m_recvivedMessage.Clear();
                }
            }
        }

        // Connected callback of the client session: configures the session, registers this
        // application with the remote side and notifies the listener.
        private void socket_OnConnected(object clientSocket, SessionEventArgs e)
        {
            TraceLog.Trace("ClusterChannel.socket_OnConnected remote {0}", m_clientSessionSettings.RemoteEndPoint);

            // Set the socket buffer size.
            m_clientSession.NetSessionObj.SetSocketBufferSize(BufferSize);

            ConnectedSession = m_clientSession.NetSessionObj;
            ConnectedSession.WriteSendRecvLog = true;

            // Empty-bodied registration message carrying the local application ID.
            MessageData message = new MessageData();
            message.Header.Length = 0;
            message.Header.Type = (int)SpecialMessageType.ClusterClientRegister;
            message.Header.ObjectID = 0;
            message.Header.ServerID = LocalAppID;

            m_clientSession.DirectSendNoQueue(message);

            if (NetSelectMode == SessionNetSelectMode.Asynchronous)
            {
                ConnectedSession.StartRecvAsync();
                ConnectedSession.StartSendAsync();
            }

            OnIToItConnectOrDisConnect?.Invoke(RemoteAppID, true);
        }

        // Connect-failure callback: schedules a reconnect and notifies the listener.
        private void socket_OnConnectFail(object clientSocket, SessionEventArgs e)
        {
            m_bNeedReconnect = true;
            m_disConnectTime = AppTime.GetNowSysMs();

            TraceLog.Trace("ClusterChannel.socket_OnConnectFail at MsTime {0}", m_disConnectTime);

            OnIToItConnectOrDisConnect?.Invoke(RemoteAppID, false);
        }

        // Disconnect callback: drops the session, schedules a reconnect and notifies the listener.
        private void socket_OnDisconnected(object clientSocket, SessionEventArgs e)
        {
            // Reconnect.
            ConnectedSession = null;
            m_bNeedReconnect = true;
            m_disConnectTime = AppTime.GetNowSysMs();

            TraceLog.Trace("ClusterChannel.socket_OnDisconnected at MsTime {0}", m_disConnectTime);

            OnIToItConnectOrDisConnect?.Invoke(RemoteAppID, false);
        }

        /// <summary>
        /// Data-received callback: appends the received message to the receive queue.
        /// The queue is only locked when the channel is not in synchronous mode.
        /// </summary>
        public void OnRecvDataFromSocket(object clientSocket, SessionEventArgs e)
        {
            if (NetSelectMode == SessionNetSelectMode.Synchronous)
            {
                m_recvivedMessage.Enqueue(e.Message);
            }
            else
            {
                lock (m_recvlocker)
                {
                    m_recvivedMessage.Enqueue(e.Message);
                }
            }
        }

        /// <summary>
        /// Attaches <paramref name="socket"/> as the connected session, closing the previous
        /// session first if one exists and its socket is not closed yet.
        /// </summary>
        public void BindSocket(NetSession socket)
        {
            // A previous session still exists.
            if (ConnectedSession != null && ConnectedSession.IsSocketClosed == false)
            {
                TraceLog.Trace("ClusterChannel.BindSocket close exist session {0}", ConnectedSession.SessionID);

                ConnectedSession.Close();
            }

            TraceLog.Trace("ClusterChannel.BindSocket new session {0}", socket.SessionID);

            ConnectedSession = socket;
        }

        /// <summary>Detaches the connected session without closing it.</summary>
        public void UnBindSocket()
        {
            ConnectedSession = null;
        }

        /// <summary>
        /// Drives the client session and, when a reconnect is pending, retries once the
        /// reconnect interval has elapsed since the last disconnect / connect failure.
        /// </summary>
        /// <param name="nowMs">Current time in milliseconds, forwarded to the client session.</param>
        public void Update(long nowMs)
        {
            if (m_clientSession == null)
            {
                return;
            }

            m_clientSession.Update(nowMs);

            // Reconnect.
            if (!m_bNeedReconnect)
            {
                return;
            }

            // At most one reconnect attempt per ReconnectIntervalMs; faster is too aggressive on Linux.
            if (AppTime.GetNowSysMs() - m_disConnectTime >= ReconnectIntervalMs)
            {
                TraceLog.Debug("ClusterChannel.Update NowMsTime {0} disConnectTime {1}", nowMs, m_disConnectTime);

                // m_bNeedReconnect must be cleared BEFORE Reconnect(): the connect may complete
                // (or fail) immediately, invoking socket_OnConnectFail before Reconnect() returns.
                // If the flag were cleared afterwards it would overwrite the 'true' set by that
                // callback and the channel would never reconnect again.
                m_bNeedReconnect = false;
                m_clientSession.Reconnect();
            }
        }

        /// <summary>
        /// Sends <paramref name="message"/> through the connected session using the send path
        /// that matches the channel's network mode. The message is dropped (with a debug log)
        /// when there is no connected session.
        /// </summary>
        public void Send(MessageData message)
        {
            // Read the property once: socket_OnDisconnected / UnBindSocket may null it
            // between the check and the actual send.
            NetSession session = ConnectedSession;
            if (session == null)
            {
                TraceLog.Debug("ClusterChannel.Send no ConnectedSession type {0}", message.Header.Type);
                return;
            }

            if (NetSelectMode == SessionNetSelectMode.Synchronous)
            {
                if (MultiThreadSendSafe)
                {
                    lock (m_sendlocker)
                    {
                        session.SendMessageToAsyncQueue(message);
                        m_iSendMessageCount++;
                    }
                }
                else
                {
                    session.SendMessageToQueue(message);
                    m_iSendMessageCount++;
                }

                // Reconnect test: proactively drop the connection every 1000 sent messages.
                if (m_testReconnect)
                {
                    if (m_clientSession != null && IsClient && (m_iSendMessageCount % 1000) == 0)
                    {
                        TraceLog.Debug("ClusterChannel.Send m_iSendMessageCount {0} close socket", m_iSendMessageCount);

                        m_clientSession.Close();

                        ConnectedSession = null;
                        m_bNeedReconnect = true;
                        m_disConnectTime = AppTime.GetNowSysMs();
                    }
                }
            }
            else if (NetSelectMode == SessionNetSelectMode.Asynchronous)
            {
                session.StartSendAsyncMessage(message);
                m_iSendMessageCount++;
            }
        }

        /// <summary>Removes and returns the oldest received message.</summary>
        /// <returns>The next message, or null when the receive queue is empty.</returns>
        public MessageData RecvOneFromQueue()
        {
            if (NetSelectMode == SessionNetSelectMode.Synchronous)
            {
                return m_recvivedMessage.Count > 0 ? m_recvivedMessage.Dequeue() : null;
            }

            // The emptiness check and the dequeue must happen under the same lock;
            // checking Count outside the lock races with the enqueueing callback.
            lock (m_recvlocker)
            {
                return m_recvivedMessage.Count > 0 ? m_recvivedMessage.Dequeue() : null;
            }
        }
    }
}