using System;
using System.Collections.Generic;
using Aliyun.Api.LOG;
using Aliyun.Api.LOG.Common.Utilities;
using Aliyun.Api.LOG.Data;
using Aliyun.Api.LOG.Request;
using Sog;
using ProtoCSStruct;
using Sog.Service;

namespace BillLog
{
    /// <summary>
    /// Lightweight copy of the fields of an SSBDCLogReq that the uploader needs.
    /// Defined to reduce memory usage: when HTTP reporting is slow, a large backlog of
    /// SSBDCLogReq structs takes up too much memory.
    /// </summary>
    public class CBdcReq
    {
        public string LogMsg;
        public string Event_uuid;
        public int EventId;
    }

    /// <summary>
    /// Per-task handler that drains BDC log requests from a <c>MessageTaskObj</c>, batches them
    /// and uploads them through a <c>LogClient</c>. One handler is created per task by
    /// <see cref="InitAllTaskHandler"/>.
    /// </summary>
    public class MessageTaskHandler
    {
        private LogClient logClient;
        private PutLogsRequest putLogsReq;
        private string appKey;

        // UTC timestamp of the last upload attempt; only used to measure elapsed time.
        private DateTime lastSendLogTime;

        // Requests copied out of the task's packet queue and waiting to be uploaded.
        private Queue<CBdcReq> bdcReqQueue = new Queue<CBdcReq>();

        public static readonly uint DefaultTaskCount = 10;
        public static uint TaskCount = DefaultTaskCount;
        public static MessageTaskHandler[] m_allTaskHandler;

        /// <summary>
        /// Reads the BDC configuration, creates <see cref="TaskCount"/> handlers (each with its own
        /// <c>LogClient</c> and reusable <c>PutLogsRequest</c>), binds each handler to the task with
        /// the same index and starts all tasks.
        /// </summary>
        public static void InitAllTaskHandler()
        {
            var config = BillLogServerUtils.GetServerConfig().bdcCfg;

            // Fall back to the default when the configured thread count is not positive.
            TaskCount = config.TaskThreadCount > 0 ? (uint)config.TaskThreadCount : DefaultTaskCount;

            TraceLog.Trace("BillLogServer.InitAllTaskHandler task count {0}", TaskCount);
            MessageTaskDistributor.Instance.InitTask(TaskCount);
            TraceLog.Trace("BillLogServer.InitAllTaskHandler Bdcplatform {0}", config.Bdcplatform);

            m_allTaskHandler = new MessageTaskHandler[TaskCount];
            for (int i = 0; i < m_allTaskHandler.Length; i++)
            {
                var handler = new MessageTaskHandler();
                handler.appKey = config.appKey;
                // UTC so that the elapsed-time check in ManualHandle is immune to DST changes.
                handler.lastSendLogTime = DateTime.UtcNow;

                var client = new LogClient(config.endPoint, config.accesskeyID, config.accessKeySecret);
                // NOTE(review): presumably milliseconds (10 s) - confirm against the SDK.
                client.ConnectionTimeout = 10000;
                client.ReadWriteTimeout = 10000;
                handler.logClient = client;

                var request = new PutLogsRequest();
                request.Project = config.logProject;
                request.Logstore = config.logStoreServer;
                request.Topic = "";
                request.LogItems = new List<LogItem>();
                handler.putLogsReq = request;

                m_allTaskHandler[i] = handler;

                MessageTaskObj taskObj = MessageTaskDistributor.Instance.GetTaskByIndex(i);
                taskObj.ManualHandler = handler.ManualHandle;
            }

            MessageTaskDistributor.Instance.StartAllTask();
        }

        /// <summary>
        /// Closes all tasks and releases the per-handler client and request objects.
        /// </summary>
        // TODO: wait for pending logs to be written when the server stops or reloads.
        public static void DisposeAllHandler()
        {
            MessageTaskDistributor.Instance.CloseAllTask();

            if (m_allTaskHandler == null)
            {
                return;
            }

            for (int i = 0; i < m_allTaskHandler.Length; i++)
            {
                // A slot is null if InitAllTaskHandler failed before filling it.
                if (m_allTaskHandler[i] == null)
                {
                    continue;
                }

                m_allTaskHandler[i].logClient = null;
                m_allTaskHandler[i].putLogsReq = null;
            }

            m_allTaskHandler = null;
        }

        /// <summary>
        /// Drains the task's packet queue; every BdcLogReq packet is copied into a
        /// <see cref="CBdcReq"/> and appended to the local queue. Other packets are discarded.
        /// </summary>
        private void CopyReqFromtaskObj(MessageTaskObj taskObj)
        {
            StructPacketData packetData;
            while (taskObj.m_threadSafePacketQueue.TryDequeue(out packetData))
            {
                if (packetData.Packet.MsgID != (int)SSGameMsgID.BdcLogReq)
                {
                    continue;
                }

                ref SSBDCLogReq req = ref packetData.Packet.GetMessage<SSBDCLogReq>();

                CBdcReq creq = new CBdcReq();
                creq.LogMsg = req.LogMsg.GetString();
                creq.Event_uuid = req.Event_uuid.GetString();
                creq.EventId = req.EventId;
                bdcReqQueue.Enqueue(creq);
            }
        }

        /// <summary>
        /// Task callback: pulls new requests from <paramref name="taskObj"/> and uploads at most
        /// one batch (up to <c>logsMaxNum</c> entries) per invocation.
        /// </summary>
        /// <param name="taskObj">The task whose packet queue feeds this handler.</param>
        public void ManualHandle(MessageTaskObj taskObj)
        {
            CopyReqFromtaskObj(taskObj);

            // Nothing to send.
            if (bdcReqQueue.Count == 0)
            {
                return;
            }

            var bdcCfg = BillLogServerUtils.GetServerConfig().bdcCfg;
            // Unset (0) or invalid (negative) batch size falls back to 10.
            var logsMaxNum = bdcCfg.logsMaxNum <= 0 ? 10 : bdcCfg.logsMaxNum;
            long waitTimeMsWhenNotEnoughLog = bdcCfg.waitTimeMsWhenNotEnoughLog;

            // The elapsed-time check must not use AppTime: AppTime depends on the main-thread tick.
            // During a hotfix the main thread keeps waiting for the worker threads to stop, so
            // AppTime would not advance; main thread and worker threads would wait on each other,
            // the hotfix would fail and the process would hang.
            long timeMsPass = (long)(DateTime.UtcNow - lastSendLogTime).TotalMilliseconds;

            // When fewer than logsMaxNum messages are queued, wait for a while so that HTTP calls
            // are not issued too frequently. Suppose one HTTP call takes 300 ms and each thread
            // receives 3 logs per second: without this wait the batching would practically never
            // kick in and every call would still carry a single log. Overly frequent HTTP calls
            // raise the number of TIME_WAIT sockets on the machine, because BDC log reporting
            // currently uses short-lived connections.
            // Original author's note: ConcurrentQueue.Count is expensive and must not be called
            // frequently. bdcReqQueue is a plain Queue<T>, so reading Count here is cheap.
            if (bdcReqQueue.Count < logsMaxNum && timeMsPass < waitTimeMsWhenNotEnoughLog)
            {
                System.Threading.Thread.Sleep(10);
                return;
            }

            lastSendLogTime = DateTime.UtcNow;

            // Take at most one batch per invocation.
            List<CBdcReq> batch = new List<CBdcReq>();
            while (bdcReqQueue.Count > 0 && batch.Count < logsMaxNum)
            {
                batch.Add(bdcReqQueue.Dequeue());
                taskObj.Add1LogHandlePacketCount();
            }

            if (batch.Count > 0)
            {
                OnBDCLogsReq(batch);
            }
        }

        /// <summary>
        /// Converts the given requests into log items and uploads them in a single PutLogs call.
        /// Requests that yield no key/value content are skipped; if nothing remains, no call is
        /// made. Upload failures are logged and the batch is dropped.
        /// </summary>
        /// <param name="list">Requests to upload; a null or empty list is a no-op.</param>
        public void OnBDCLogsReq(List<CBdcReq> list)
        {
            putLogsReq.LogItems.Clear();

            if (list == null)
            {
                return;
            }

            for (int i = 0; i < list.Count; i++)
            {
                LogItem logItem = BuildLogItem(list[i]);
                if (logItem != null)
                {
                    putLogsReq.LogItems.Add(logItem);
                }
            }

            // Avoid a pointless HTTP round trip when no request produced a usable log item.
            if (putLogsReq.LogItems.Count == 0)
            {
                return;
            }

            try
            {
                logClient.PutLogs(putLogsReq);
            }
            catch (LogException logEx)
            {
                TraceLog.Error("MessageTaskHandler.OnBDCLogReqInner LogException {0}", logEx.ToString());
                TraceLog.Exception(logEx);
            }
            catch (Exception ex)
            {
                TraceLog.Exception(ex);
            }
        }

        /// <summary>
        /// Parses a "k1=v1|k2=v2|..." message into a log item and appends the common fields.
        /// Returns null when the request has no message or no well-formed "key=value" pair.
        /// </summary>
        private LogItem BuildLogItem(CBdcReq req)
        {
            if (req == null || string.IsNullOrEmpty(req.LogMsg))
            {
                return null;
            }

            var logItem = new LogItem();
            logItem.Time = DateUtils.TimeSpan();

            foreach (string pair in req.LogMsg.Split('|'))
            {
                // Only segments with exactly one '=' are accepted; anything else is ignored.
                var kv = pair.Split('=');
                if (kv.Length != 2)
                {
                    continue;
                }

                // Empty values are reported as "0".
                string value = string.IsNullOrEmpty(kv[1]) ? "0" : kv[1];
                logItem.PushBack(kv[0], value);
            }

            if (logItem.Contents.Count == 0)
            {
                return null;
            }

            // Common parameters of the reporting interface.
            logItem.PushBack("event_uuid", req.Event_uuid);
            logItem.PushBack("event_time", BillLogServerUtils.GetTimeSecond().ToString());
            logItem.PushBack("event_time2", BillLogServerUtils.GetDateTime().ToString("yyyy-MM-dd"));
            logItem.PushBack("appkey", appKey);
            // Platform: SERVER | SERVER_TEST
            logItem.PushBack("platform", BillLogServerUtils.GetServerConfig().bdcCfg.Bdcplatform);
            // NOTE(review): a negative AppTime.TimeZone would render as "UTC+-N:00" - confirm
            // whether negative offsets can occur and what the consumer expects.
            logItem.PushBack("time_zone", "UTC+" + AppTime.TimeZone.ToString() + ":00");

            return logItem;
        }
    }
}