using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using Sog.Log;
namespace Sog.Service
{
public class StructPacketData
{
public uint RemoteApp { get; }
public StructPacket Packet { get; }
public StructPacketData(uint remoteApp, StructPacket packet)
{
RemoteApp = remoteApp;
Packet = packet;
}
}
///
/// 消息处理委托
///
///
///
public delegate void OnRequestPacketHandler(uint remoteApp, StructPacket packet);
///
/// 上层手动处理数据委托
///
public delegate void OnManualHandler(MessageTaskObj taskObj);
//tick
public delegate void OnIdleTick(long idleSecond);
///
/// 消息处理任务对象,缓存需要处理的消息,循环处理,可外部停止任务
///
public class MessageTaskObj
{
///
/// 任务索引
///
private int m_taskIndex;
private string m_logStatCountName;
private string m_logStatTotalCountName;
private long m_lastBeginIdleTime;
///
/// 缓存消息队列,先进的先处理
/// 需要线程安全,所以用ConcurrentQueue,这个比加锁效率高
///
public ConcurrentQueue m_threadSafePacketQueue;
//是否结束线程工作
private bool m_finished = false;
//使用Thread的时候的Thread对象
private Thread m_thread = null;
//是否直接使用Thread,还是使用Task呢
//Task的机制底层实现更加复杂,涉及到线程池的概念,其实这里的消息任务处理使用线程更加合适
//Task更加智能一点,对异步支持更好,我们反正也不会用到async机制,直接用线程简单安全高效
private bool m_bUseThread = true;
///
/// 总共处理了几个消息
///
public long TotalHandlerPacketCount { get; private set; }
///
/// 总共处理失败的消息
///
public long TotalHandlerFailedPacketCount { get; private set; }
public OnRequestPacketHandler Handler;
public OnManualHandler ManualHandler;
public OnIdleTick IdleTick;
//private long m_lastStatLogTime;
public ServerApp m_app = null;
public MessageTaskObj(int index, ServerApp app)
{
m_taskIndex = index;
m_app = app;
m_threadSafePacketQueue = new ConcurrentQueue();
m_logStatCountName = string.Format("Task[{0}]_HandlerMsg", index);
m_logStatTotalCountName = string.Format("Task[{0}]_TotalHandlerMsg", index);
}
//工作循环
private void DoTaskWork(object obj)
{
int iSleepMs = 1;
while(!m_finished)
{
if(m_threadSafePacketQueue.IsEmpty == false)
{
//有消息,sleep时间改成1毫秒
iSleepMs = 1;
m_lastBeginIdleTime = 0;
if (Handler != null)
{
StructPacketData packetData;
bool bDequeueSuccess = m_threadSafePacketQueue.TryDequeue(out packetData);
if (bDequeueSuccess && Handler != null)
{
Add1LogHandlePacketCount();
try
{
Handler(packetData.RemoteApp, packetData.Packet);
}
catch (Exception ex)
{
TraceLog.Error("MessageTaskObj handler packet msgid {0} throw exception!", packetData.Packet.MsgID);
TraceLog.Exception(ex);
TotalHandlerFailedPacketCount++;
if (m_app != null)
{
m_app.Alerter.AlertException(ex);
}
}
}
}
if(ManualHandler != null)
{
try
{
ManualHandler(this);
}
catch (Exception ex)
{
TraceLog.Error("MessageTaskObj ManualHandler throw exception!");
TraceLog.Exception(ex);
if (m_app != null)
{
m_app.Alerter.AlertException(ex);
}
}
}
}
else
{
if (iSleepMs < 10)
{
iSleepMs++;
}
//没有消息sleep,最多 5-10 毫秒
Thread.Sleep(iSleepMs);
if (m_lastBeginIdleTime == 0)
{
m_lastBeginIdleTime = AppTime.GetNowSysSecond();
}
if (IdleTick != null)
{
try
{
IdleTick(AppTime.GetNowSysSecond() - m_lastBeginIdleTime);
}
catch (Exception ex)
{
TraceLog.Error("MessageTaskObj IdleTick throw exception!");
TraceLog.Exception(ex);
if (m_app != null)
{
m_app.Alerter.AlertException(ex);
}
}
}
}
}
}
public void Add1LogHandlePacketCount()
{
TotalHandlerPacketCount++;
ServerStat.Instance.AddValue(m_logStatCountName, 1);
//这条统计永不清空
ServerStat.Instance.SetValue(m_logStatTotalCountName, TotalHandlerPacketCount, false);
}
///
/// 开始任务
///
public void Start()
{
m_finished = false;
var parStart = new ParameterizedThreadStart(DoTaskWork);
m_thread = new Thread(parStart, 1024 * 1024 * 10);
//设置成后台线程,如果不是后台线程,线程不是主动关闭,则会一直运行,那怕进程主线程退出也没用
m_thread.IsBackground = true;
m_thread.Start();
}
///
/// 销毁,停止任务,停止前处理完所有缓存的消息
///
public void Close()
{
//等待所有消息处理完毕,注意这个过程中不能再AddRequestPacket
while (m_threadSafePacketQueue.IsEmpty == false)
{
Thread.Sleep(1);
}
m_finished = true;
//等待线程退出
while (m_thread.ThreadState != ThreadState.Stopped)
{
Thread.Sleep(10);
}
Handler = null;
ManualHandler = null;
m_threadSafePacketQueue = null;
m_app = null;
}
///
/// 添加一个消息到处理队列
///
///
///
public void AddRequestPacket(uint remoteApp, StructPacket packet)
{
m_threadSafePacketQueue.Enqueue(new StructPacketData(remoteApp, packet));
}
}
}