You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

303 lines
9.9 KiB

using System;
using System.Linq;
using System.Text;
using System.Collections.Generic;
using System.Net;
using Sog;
using Sog.Log;
using Sog.IO;
using Google.Protobuf.WellKnownTypes;
using Google.Protobuf;
namespace SMCenter
{
public class SMCenterNet : Singleton<SMCenterNet>
{
public SessionListener m_socketListener;
public SessionSettings m_socketSetting;
// sessionId -> ClientInfo
public Dictionary<long, ClientInfo> m_clientsDict;
public List<long> m_needCloseClients;
private MsgHandler m_handler = new MsgHandler();
private long m_lastCheckTimeoutTime;
public void Start(ServerApp app)
{
SMProtoRegister.Instance.RegisterPacket();
m_clientsDict = new Dictionary<long, ClientInfo>();
m_needCloseClients = new List<long>();
m_socketSetting = new SessionSettings(10, 10, 10, 128000, SMCenterUtils.Config.listenPort);
m_socketListener = new SessionListener(app, m_socketSetting, "SMCenterListen", SessionNetSelectMode.Synchronous);
TraceLog.Debug("SMCenterNet.Start");
//注册消息处理
m_socketListener.DataReceived += socketLintener_DataReceived;
m_socketListener.Connected += socketLintener_OnConnectCompleted;
m_socketListener.Disconnected += socketLintener_Disconnected;
m_socketListener.StartListen();
}
private void socketLintener_OnConnectCompleted(object sender, SessionEventArgs e)
{
try
{
long sessionID = Sog.Service.GateSessionID.GenNextID(SMCenterUtils.GetAppID());
e.Session.SetSocketBufferSize(128000);
e.Session.ResetSessionID(sessionID);
ClientInfo info = new ClientInfo(SMCenterUtils.GetTimeSecond(), e.Session);
m_clientsDict.Add(sessionID, info);
string ip = "0.0.0.0";
if (e.Session.RemoteEndPoint != null)
{
try
{
ip = e.Session.RemoteEndPoint.ToString().Split(':')[0];
}
catch (Exception)
{
}
}
TraceLog.Trace("session {0} ip {1} conneted", sessionID, ip);
}
catch (Exception err)
{
TraceLog.Error("ConnectCompleted error:{0}", err);
}
}
private void socketLintener_Disconnected(object sender, SessionEventArgs e)
{
try
{
long sessionID = e.Session.SessionID;
TraceLog.Trace("session {0} disconneted ", sessionID);
AddToNeedCloseClient(e.Session.SessionID);
}
catch (Exception err)
{
TraceLog.Error("Disconnected error:{0}", err);
}
}
private void socketLintener_DataReceived(object sender, SessionEventArgs e)
{
try
{
long sessionID = e.Session.SessionID;
TraceLog.Trace("session {0} recv data len {1} ", sessionID, e.Message.Header.Length);
ClientInfo client;
m_clientsDict.TryGetValue(sessionID, out client);
if (client == null)
{
TraceLog.Trace("socketLintener_DataReceived session {0} ,no GateClientInfo", sessionID);
return;
}
MessageData message = e.Message;
RequestPacket packet;
bool result = ProtoPackerFactory.Instance.GetProtoBufPacker().UnpackMessage(message, out packet);
if (result)
{
Google.Protobuf.IMessage pbMessage = (Google.Protobuf.IMessage) (packet.Message);
//这个包太大,不能打印
if (message.Header.Type == (int) SMMsgID.FileContentRes)
{
TraceLog.Trace("recv from {0} msgId {1} length {2} message: {3}"
, client.HostName, packet.MsgID, message.Header.Length
, packet.Message.GetType().Name);
}
else
{
TraceLog.Trace("recv from {0} msgId {1} length {2} message: {3} -> {4}"
, client.HostName, packet.MsgID, message.Header.Length
, packet.Message.GetType().Name, pbMessage.ToString());
}
}
m_handler.HandlerMsg(client, packet);
}
catch (Exception ex)
{
TraceLog.Error("Received to Host:{0} error:{1}", e.Session.RemoteEndPoint, ex);
}
}
public void Update(long tMs)
{
m_socketListener.UpdateAccept();
m_socketListener.UpdateAcceptedSessions(tMs);
CheckTimeoutClient();
CloseNeedCloseClients();
ServerStat.Instance.SetValue("SocketSessionMapCount", m_socketListener.GetClientSocketCount());
}
private void CheckTimeoutClient()
{
long now = SMCenterUtils.GetTimeSecond();
// 3秒检查一次,10超时
if (now - m_lastCheckTimeoutTime < 3)
{
return;
}
m_lastCheckTimeoutTime = now;
foreach (KeyValuePair<long, ClientInfo> pair in m_clientsDict)
{
long sessionID = pair.Key;
ClientInfo info = pair.Value;
if ((info.ClientType == EClientType.None || string.IsNullOrEmpty(info.HostName))
&& now - info.ConnectedTime > 20)
{
TraceLog.Error("session {0} wait register timeout, add to close list", sessionID);
AddToNeedCloseClient(info.SessionID);
}
else if(info.LastPingTime > 0 && now - info.LastPingTime > 60)
{
TraceLog.Error("session {0}, name {1} ping timeout, add to close list", sessionID, info.HostName);
AddToNeedCloseClient(info.SessionID);
}
}
}
private void CloseNeedCloseClients()
{
foreach (long sessionID in m_needCloseClients)
{
ClientInfo info;
m_clientsDict.TryGetValue(sessionID, out info);
if (info != null)
{
if (info.Session.WorkSocket != null)
{
m_socketListener.ForceCloseNetSession(info.Session);
}
m_clientsDict.Remove(sessionID);
info.Session.Close();
info.m_disableServerID.Clear();
}
}
m_needCloseClients.Clear();
}
public void AddToNeedCloseClient(long sessionID)
{
m_needCloseClients.Add(sessionID);
}
public void SendMsg(ClientInfo client, SMMsgID msgID,IMessage pbMessage,uint httpid=0)
{
if(client.Session.IsSocketClosed)
{
TraceLog.Error("SendMsg msgId {0} failed, session {1}, name {2} socket is closed"
, msgID, client.SessionID, client.HostName);
return;
}
RequestPacket packet = new RequestPacket((int)msgID, 0, 0);
packet.Message = pbMessage;
MessageData message = new MessageData();
message.Header.Type = (int)msgID;
message.Header.Length = 0;
message.Header.ObjectID = 0;
message.Header.ServerID = httpid;
bool bRet = ProtoPackerFactory.Instance.GetProtoBufPacker().PackMessage(packet, ref message);
if (bRet == false)
{
TraceLog.Error("PackMessage error");
return;
}
//这个包太大,不打了
if (msgID == SMMsgID.FileContentRes)
{
TraceLog.Trace("send to {0} msgId {1} length {2} message: {3}"
, client.HostName, (int)msgID, message.Header.Length, packet.Message.GetType().Name);
}
else
{
TraceLog.Trace("send to {0} msgId {1} length {2} message: {3} -> {4}"
, client.HostName, (int)msgID, message.Header.Length
, packet.Message.GetType().Name, pbMessage.ToString());
}
client.Session.SendMessageToQueue(message);
}
public void SendMsg(long sessionId, SMMsgID msgID, IMessage pbMessage,uint httpid=0)
{
ClientInfo info;
m_clientsDict.TryGetValue(sessionId, out info);
if (info == null)
{
TraceLog.Trace("SendMsg session {0} ,no GateClientInfo", sessionId);
return;
}
SendMsg(info, msgID, pbMessage,httpid);
}
// 只针对当前已注册成功的agent
public void BroadcastToAllAgent(SMMsgID msgID, IMessage pbMessage)
{
foreach(var client in m_clientsDict.Values)
{
if (client.ClientType == EClientType.Agent)
{
SendMsg(client, msgID, pbMessage);
}
}
}
public ClientInfo GetClientInfoByName(string hostname)
{
foreach(var client in m_clientsDict)
{
if(client.Value.HostName == hostname)
{
return client.Value;
}
}
return null;
}
public List<ClientInfo> GetAllAgent()
{
return m_clientsDict.Values.Where(p => p.ClientType == EClientType.Agent).ToList();
}
}
}