You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

704 lines
23 KiB

/*
Sog 游戏基础库
2016 by zouwei
*/
using System;
using System.Collections.Generic;
using System.Net;
using Sog;
using Sog.Log;
using Sog.ClusterCfg;
namespace Sog
{
public class Cluster
{
ClusterConfig m_clusterSettings;
ClusterApp m_clusterApp;
public string ClusterName
{
get { return m_clusterApp.Name; }
}
// 包含self和有通信关系的app
private Dictionary<uint, ClusterApp> m_appDict;
//uint是对端的appID地址
private Dictionary<uint, ClusterChannel> m_channelDict;
private uint m_appID = 0;
private SessionListener m_sessionListener;
// 同时保护m_waitRegSockets和m_regChannels
private object m_regSessionLocker;
//连接临时存放,注册后将移到m_registedSocket
private Dictionary<long, NetSession> m_waitRegSockets;
private Dictionary<long, ClusterChannel> m_regChannels;
//是否log消息状态日志,缺省关闭
public bool NeedLogMsgStat;
//cluster网络处理模式
private SessionNetSelectMode NetSelectMode;
private ServerApp m_serverApp;
public Cluster(ServerApp app, uint appID)
{
m_appID = appID;
m_appDict = new Dictionary<uint, ClusterApp>();
m_channelDict = new Dictionary<uint, ClusterChannel>();
m_regSessionLocker = new object();
m_waitRegSockets = new Dictionary<long, NetSession>();
m_regChannels = new Dictionary<long, ClusterChannel>();
NeedLogMsgStat = false;
// 默认异步模式通信
NetSelectMode = SessionNetSelectMode.Asynchronous;
m_serverApp = app;
}
public string GetAppConfigPath()
{
return m_clusterApp.CfgPath;
}
public string GetAppParamByKey(string key)
{
if(m_clusterApp.ParamsMap.ContainsKey(key))
{
return m_clusterApp.ParamsMap[key];
}
return string.Empty;
}
public string GetAppConfigFile()
{
return m_clusterApp.Cfgfile;
}
public int InitAppByConfig(string strSettingsFile)
{
try
{
m_clusterSettings = JsonConfig.parseFileLitJson<ClusterConfig>(strSettingsFile);
//Console.WriteLine("Cluster.InitAppByConfig parse json file {0} !", strSettingsFile);
ClusterApp.ParseWithMyAppId(m_clusterSettings, m_appDict, m_appID);
m_clusterApp = m_appDict[m_appID];
// 默认异步通信, 可以配置同步模式
NetSelectMode = GetAppParamByKey("netmode") == "1" ?
SessionNetSelectMode.Synchronous : SessionNetSelectMode.Asynchronous;
}
catch (Exception ex)
{
Console.WriteLine("Cluster.InitAppByConfig Exception: {0}", ex.Message);
Console.WriteLine("Cluster.InitAppByConfig StackTrace: {0}", ex.StackTrace);
throw ex;
}
return 0;
}
public int InitAllChannel()
{
try
{
foreach (var app in m_appDict.Values)
{
TraceLog.Trace("Cluster.InitAllChannel app {0} isClientCluster {1} {2}", app.ServerID, app.IsClientCluster, m_appID);
if (app.ServerID != m_appID)
{
AddClusterChannel(m_appID, app.ServerID, !app.IsClientCluster);
}
}
}
catch (Exception ex)
{
TraceLog.Exception(ex);
// re throw
throw ex;
}
return 0;
}
// gate等服务器重新init
public int ReInitAllChannel()
{
m_channelDict.Clear();
InitAllChannel();
return 0;
}
public void SetNetMode(int netMode)
{
if (netMode == 0)
{
return;
}
NetSelectMode = netMode == 1 ?
SessionNetSelectMode.Synchronous : SessionNetSelectMode.Asynchronous;
}
//设置是否在同步模式下支持多线程send,dbserver等使用
public void EnableMultiThreadSendSafe()
{
try
{
foreach (var channel in m_channelDict.Values)
{
channel.MultiThreadSendSafe = true;
}
}
catch (Exception ex)
{
TraceLog.Exception(ex);
// re throw
throw ex;
}
}
private string GetClusterAppIpPortString(ClusterApp thisApp, ClusterApp remoteApp)
{
return ClusterApp.GetRemoteIpEndPoint(thisApp, remoteApp).ToString();
}
private bool IsRemoteAppNetChange(ClusterApp thisApp, ClusterApp oldRemoteApp, ClusterApp nowRemoteApp)
{
var oldIp = GetClusterAppIpPortString(thisApp, oldRemoteApp);
var nowIp = GetClusterAppIpPortString(thisApp, nowRemoteApp);
if (oldIp.Equals(nowIp))
{
return false;
}
TraceLog.Trace("Cluster.isRemoteAppNetChange remote clusterapp ip change, old serverid: {0}, old: {1}, now serverid: {2}, now:{3}",
oldRemoteApp.ServerID, oldIp, nowRemoteApp.ServerID, nowIp);
return true;
}
private void DealRemoteAppNetChange(Dictionary<uint, ClusterApp> oldAppDcit, Dictionary<uint, ClusterApp> newAppDcit)
{
Dictionary<uint, ClusterApp> delAppDict = new Dictionary<uint, ClusterApp>();
Dictionary<uint, ClusterApp> addAppDict = new Dictionary<uint, ClusterApp>();
foreach (var newPair in newAppDcit)
{
ClusterApp oldApp;
if (m_appDict.TryGetValue(newPair.Key, out oldApp))
{
if (IsRemoteAppNetChange(m_clusterApp, oldApp, newPair.Value))
{
delAppDict.Add(newPair.Key, oldApp);
addAppDict.Add(newPair.Key, newPair.Value);
}
}
else
{
addAppDict.Add(newPair.Key, newPair.Value);
}
}
foreach (var oldPair in oldAppDcit)
{
if (newAppDcit.ContainsKey(oldPair.Key) == false)
{
delAppDict.Add(oldPair.Key, oldPair.Value);
}
}
foreach (var delPair in delAppDict)
{
m_appDict.Remove(delPair.Key);
if (delPair.Value.ServerID != m_appID)
{
DelClusterChannel(m_appID, delPair.Value.ServerID, !delPair.Value.IsClientCluster);
}
}
foreach (var addPair in addAppDict)
{
m_appDict.Add(addPair.Key, addPair.Value);
if (addPair.Value.ServerID != m_appID)
{
AddClusterChannel(m_appID, addPair.Value.ServerID, !addPair.Value.IsClientCluster);
}
}
// 开始网络连接
StartConnectAllClientChannel();
}
public void ReloadClusterAppByConfig(string strSettingsFile)
{
try
{
//读配置
ClusterConfig reloadClusterSettings = JsonConfig.parseFileLitJson<ClusterConfig>(strSettingsFile);
Dictionary<uint, ClusterApp> newAppDcit = new Dictionary<uint, ClusterApp>();
//解析
ClusterApp.ParseWithMyAppId(reloadClusterSettings, newAppDcit, m_appID);
m_clusterSettings = reloadClusterSettings;
// 处理连接变化
DealRemoteAppNetChange(m_appDict, newAppDcit);
}
catch (Exception ex)
{
TraceLog.Exception(ex);
}
return ;
}
private void StartConnectAllClientChannel()
{
foreach(var channelpair in m_channelDict)
{
var channel = channelpair.Value;
if(channel.IsClient && channel.IsStartConnecting == false)
{
ClusterApp remoteApp = m_appDict[channelpair.Key];
channel.StartConnect(m_clusterApp, remoteApp);
}
}
}
private void AddClusterChannel(uint localAppID, uint remoteAppID, bool iamClient)
{
ClusterChannel channel = new ClusterChannel(localAppID, remoteAppID, iamClient, NetSelectMode);
TraceLog.Trace("Cluster.AddClusterChannel cluster add channel local {0} remote {1} iamClient {2} NetSelectMode {3}",
ServerIDUtils.IDToString(localAppID),
ServerIDUtils.IDToString(remoteAppID),
iamClient,
NetSelectMode);
m_channelDict.Add(remoteAppID, channel);
}
private void DelClusterChannel(uint localAppID, uint remoteAppID, bool iamClient)
{
TraceLog.Trace("Cluster.DelClusterChannel cluster del channel local {0} remote {1} iamClient {2} NetSelectMode {3}",
ServerIDUtils.IDToString(localAppID),
ServerIDUtils.IDToString(remoteAppID),
iamClient,
NetSelectMode);
ClusterChannel channel;
if (!m_channelDict.TryGetValue(remoteAppID, out channel))
{
TraceLog.Error("Cluster.DelClusterChannel cluster try close not exists channel.");
return;
}
m_channelDict.Remove(remoteAppID);
channel.CloseConnect();
}
public void Start()
{
TraceLog.Trace("Cluster.Start appID {0}", m_appID);
//if self is cluster,start listen
ClusterApp myApp;
m_appDict.TryGetValue(m_appID, out myApp);
if (myApp != null)
{
SessionSettings socketSetting = new SessionSettings(1000, 10, 10, myApp.BufferSize, myApp.InnerIPPort);
m_sessionListener = new SessionListener(m_serverApp,socketSetting,"ClusterListener",NetSelectMode);
m_sessionListener.DataReceived += socketListener_DataReceived;
m_sessionListener.Connected += socketListener_OnConnectCompleted;
m_sessionListener.Disconnected += socketListener_Disconnected;
TraceLog.Trace("Cluster.Start appID {0} start listen {1}", m_appID, myApp.InnerIPPort);
m_sessionListener.StartListen(myApp.listenAnyIP == 1);
}
StartConnectAllClientChannel();
}
public void Close()
{
if(m_sessionListener != null)
{
m_sessionListener.Close();
}
}
private void socketListener_OnConnectCompleted(object sender, SessionEventArgs e)
{
try
{
e.Session.SetSocketBufferSize(ClusterChannel.BufferSize);
lock (m_regSessionLocker)
{
m_waitRegSockets.Add(e.Session.SessionID, e.Session);
}
e.Session.WriteSendRecvLog = true;
TraceLog.Trace("Cluster.socketListener_OnConnectCompleted sessionId {0}", e.Session.SessionID.ToString());
}
catch (Exception err)
{
TraceLog.Exception(err);
}
}
private void socketListener_Disconnected(object sender, SessionEventArgs e)
{
try
{
long sessionID = e.Session.SessionID;
TraceLog.Trace("Cluster.socketListener_Disconnected sessionId {0}", sessionID);
ClusterChannel channel;
lock (m_regSessionLocker)
{
m_waitRegSockets.Remove(sessionID);
if (m_regChannels.TryGetValue(sessionID, out channel))
{
m_regChannels.Remove(sessionID);
}
}
// 临时解决bug: 连续2次connect, 第1次在update中close时会将第2次的session unbindsocket
if (channel != null && channel.ConnectedSession != null)
{
if (channel.ConnectedSession.SessionID == sessionID)
{
TraceLog.Trace("Cluster.socketListener_Disconnected channel sessionId {0} remote app {1}"
, sessionID, ServerIDUtils.IDToString(channel.RemoteAppID));
channel.UnBindSocket();
}
else
{
TraceLog.Error("Cluster.socketListener_Disconnected channel sessionId {0} not equal trigger sessionId {1}"
, channel.ConnectedSession.SessionID, sessionID);
}
}
}
catch (Exception err)
{
TraceLog.Exception(err);
}
}
private void socketListener_DataReceived(object sender, SessionEventArgs e)
{
try
{
long sessionID = e.Session.SessionID;
NetSession netSession;
lock (m_regSessionLocker)
{
m_waitRegSockets.TryGetValue(sessionID, out netSession);
}
if (netSession != null)
{
remoteApp_register(netSession, e.Message);
}
else
{
ClusterChannel channel;
lock (m_regSessionLocker)
{
m_regChannels.TryGetValue(sessionID, out channel);
}
if (channel != null)
{
channel.OnRecvDataFromSocket(sender, e);
}
else//收到了不是合法channel的session的消息
{
TraceLog.Error("Cluster.socketListener_DataReceived remote {0} is not registed channel, close it", e.Session.RemoteEndPoint);
e.Session.Close();
}
}
}
catch (Exception ex)
{
TraceLog.Error("Cluster.socketListener_DataReceived exception, remote {0}", e.Session.RemoteEndPoint);
TraceLog.Exception(ex);
}
}
private void remoteApp_register(NetSession netSession, MessageData message)
{
uint remoteAppID = message.Header.ServerID;
ClusterChannel channel;
m_channelDict.TryGetValue(remoteAppID, out channel);
if (channel == null)
{
TraceLog.Error("Cluster.remoteApp_register invalid socket hashcode {0} can not register, remoteAppID {1}"
, netSession.SessionID.ToString(), ServerIDUtils.IDToString(remoteAppID));
on_remoteApp_register_error(netSession);
return;
}
if (m_appDict.TryGetValue(remoteAppID, out var clusterApp))
{
var remoteIpaddr = ((IPEndPoint) netSession.RemoteEndPoint).Address;
if (clusterApp.InnerIPPort.Address.ToString() == remoteIpaddr.ToString()
|| (clusterApp.ExternalIPPort != null && clusterApp.ExternalIPPort.Address.ToString() ==
remoteIpaddr.ToString()))
{
}
else
{
TraceLog.Error("Cluster.remoteApp_register invalid socket hashcode {0} can not register, remoteAppID {1} remoteIp {2}"
, netSession.SessionID.ToString(), ServerIDUtils.IDToString(remoteAppID), remoteIpaddr);
on_remoteApp_register_error(netSession);
return;
}
}
else
{
TraceLog.Error("Cluster.remoteApp_register invalid socket hashcode {0} can not register, remoteAppID {1} no clusterApp"
, netSession.SessionID.ToString(), ServerIDUtils.IDToString(remoteAppID));
on_remoteApp_register_error(netSession);
return;
}
TraceLog.Trace("Cluster.remoteApp_register session {0} register success remoteAppID {1}"
, netSession.SessionID, ServerIDUtils.IDToString(remoteAppID));
channel.BindSocket(netSession);
lock (m_regSessionLocker)
{
m_waitRegSockets.Remove(netSession.SessionID);
if (! m_regChannels.ContainsKey(netSession.SessionID))
{
m_regChannels.Add(netSession.SessionID, channel);
}
}
}
private void on_remoteApp_register_error(NetSession netSession)
{
TraceLog.Trace("Cluster.on_remoteApp_register_error socket hashcode {0}, remove it and close session", netSession.SessionID.ToString());
lock (m_regSessionLocker)
{
m_waitRegSockets.Remove(netSession.SessionID);
}
netSession.Close();
}
//处理发送和接收
public int Update(OnClusterMessageHandler handler, int maxCount, long nowMs)
{
if (m_sessionListener == null)
{
return 0;
}
m_sessionListener.UpdateAccept();
m_sessionListener.UpdateAcceptedSessions(nowMs);
foreach (KeyValuePair<uint, ClusterChannel> pair in m_channelDict)
{
// update clientsession if not null
pair.Value.Update(nowMs);
}
return TryRecvMessageFromQueue(handler, maxCount);
}
public void Send(uint remoteAppID, MessageData message)
{
//有存在超过nM大小的消息包吗
if(message.Buffer.Data != null && message.Buffer.Length >= BigDataMessage.BigDataMessageFullMaxLength)
{
TraceLog.Error("Cluster.Send msg to appid {0}, msgId {1} length {2} too long! drop it"
, ServerIDUtils.IDToString(remoteAppID), message.Header.Type, message.Buffer.Length);
return;
}
ClusterChannel channel;
m_channelDict.TryGetValue(remoteAppID, out channel);
if (channel == null)
{
TraceLog.Error("Cluster.Send invalid remote appid {0} MsgType {1}"
, ServerIDUtils.IDToString(remoteAppID), message.Header.Type);
return;
}
channel.Send(message);
//统计消息
if (this.NeedLogMsgStat)
{
ServerStat.Instance.OnNetSend(message.Header.Type, message.Buffer.Length + 16);
}
}
//广播给某类服务器,比如account发给所有world,account发给所有lobby,game发给所有gate
public void Broadcast(int serverType, MessageData message)
{
foreach(var channel in m_channelDict)
{
if(ServerIDUtils.GetServerType(channel.Key) == serverType)
{
// 广播消息要clone
channel.Value.Send(message.Clone());
//统计消息
if (this.NeedLogMsgStat)
{
ServerStat.Instance.OnNetSend(message.Header.Type, message.Buffer.Length + 16);
}
}
}
message.FreeData();
}
private int TryRecvMessageFromQueue(OnClusterMessageHandler handler, int maxCount)
{
int iRecvCount = 0;
MessageData message;
foreach (KeyValuePair<uint, ClusterChannel> pair in m_channelDict)
{
message = pair.Value.RecvOneFromQueue();
while (message != null)
{
TraceLog.Trace("Cluster.TryRecvMessageFromQueue server {0} message length {1} type {2}",
ServerIDUtils.IDToString(pair.Key),
message.Header.Length,
message.Header.Type);
try
{
var sTime = AppTime.GetNowSysMs();
handler(pair.Key, message);
var eTime = AppTime.GetNowSysMs();
//统计消息
if (this.NeedLogMsgStat)
{
ServerStat.Instance.OnNetRecv(message.Header.Type, message.Buffer.Length + 16, eTime - sTime);
}
}
finally
{
//收到的网络消息包在这里释放
message.FreeData();
}
iRecvCount++;
//接收数量控制,免得在负载极高的情况下一直出不来
if (iRecvCount >= maxCount)
{
return iRecvCount;
}
//next message
message = pair.Value.RecvOneFromQueue();
}
}
return iRecvCount;
}
//实现远程调用,不支持返回,返回让问题复杂化
public void RemoteCall(uint remoteAppID)
{
}
public uint[] GetRemoteAppID(int serverType)
{
//一般来说不会超过10个
List<uint> serverList = new List<uint>(10);
foreach (uint remoteAppID in m_channelDict.Keys)
{
if (ServerIDUtils.GetServerType(remoteAppID) == serverType)
{
serverList.Add(remoteAppID);
}
}
return serverList.ToArray();
}
public int GetChannelCount(int serverType)
{
int iCount = 0 ;
foreach (uint remoteAppID in m_channelDict.Keys)
{
if(ServerIDUtils.GetServerType(remoteAppID) == serverType)
{
iCount++;
}
}
return iCount;
}
public int GetChannelCountInWorld(uint worldId, int serverType)
{
int iCount = 0;
foreach (uint remoteAppID in m_channelDict.Keys)
{
if (ServerIDUtils.GetWorldID(remoteAppID) == worldId && ServerIDUtils.GetServerType(remoteAppID) == serverType)
{
iCount++;
}
}
return iCount;
}
private void GetTotalSocketSendRecvLength(out long totalSend, out long totalRecv)
{
totalSend = 0;
totalRecv = 0;
foreach (KeyValuePair<uint, ClusterChannel> pair in m_channelDict)
{
if( pair.Value.ConnectedSession != null)
{
totalSend += pair.Value.ConnectedSession.TotalSendLength;
totalRecv += pair.Value.ConnectedSession.TotalRecvLength;
}
}
}
}
}