You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

239 lines
8.4 KiB

using System;
using System.Collections.Generic;
using Aliyun.Api.LOG;
using Aliyun.Api.LOG.Common.Utilities;
using Aliyun.Api.LOG.Data;
using Aliyun.Api.LOG.Request;
using Sog;
using ProtoCSStruct;
using Sog.Service;
namespace BillLog
{
// Lightweight copy of the fields taken from an SSBDCLogReq.
// It exists to reduce memory usage: when HTTP reporting is slow, a large backlog of
// full SSBDCLogReq structs takes up too much memory.
public class CBdcReq
{
// Raw log payload; MessageTaskHandler.OnBDCLogsReq splits it on '|' into segments
// and each segment on '=' into a key/value pair.
public string LogMsg;
// Event identifier, reported as the "event_uuid" field of the uploaded log item.
public string Event_uuid;
// Event id copied from the request; it is not read anywhere else in this file.
public int EventId;
}
public class MessageTaskHandler
{
// Client used by OnBDCLogsReq to upload log batches (PutLogs).
private LogClient logClient;
// Request object reused for every upload; its LogItems list is cleared and refilled per batch.
private PutLogsRequest putLogsReq;
// Value reported as the "appkey" field of every log item.
private string appKey;
// Time of the most recent send; ManualHandle uses it to measure how long it has been waiting.
private DateTime lastSendLogTime;
// Requests copied out of the task's packet queue that have not been uploaded yet.
private Queue<CBdcReq> bdcReqQueue = new Queue<CBdcReq> ();
// Task count used when the configured TaskThreadCount is not greater than zero.
public static readonly uint DefaultTaskCount = 10;
// Number of tasks/handlers; assigned by InitAllTaskHandler.
public static uint TaskCount = DefaultTaskCount;
// One handler per task; created by InitAllTaskHandler and set to null by DisposeAllHandler.
public static MessageTaskHandler[] m_allTaskHandler;
/// <summary>
/// Creates one <see cref="MessageTaskHandler"/> per worker task, wires each handler to
/// its task and then starts all tasks.
/// </summary>
/// <remarks>
/// The task count is <c>bdcCfg.TaskThreadCount</c> when that value is positive and
/// <see cref="DefaultTaskCount"/> otherwise. Every handler receives its own
/// <c>LogClient</c> (10000 for both timeouts) and its own reusable <c>PutLogsRequest</c>.
/// </remarks>
public static void InitAllTaskHandler()
{
    var bdcConfig = BillLogServerUtils.GetServerConfig().bdcCfg;

    TaskCount = bdcConfig.TaskThreadCount > 0 ? (uint)bdcConfig.TaskThreadCount : DefaultTaskCount;
    TraceLog.Trace("BillLogServer.InitAllTaskHandler task count {0}", TaskCount);

    MessageTaskDistributor.Instance.InitTask(TaskCount);
    TraceLog.Trace("BillLogServer.InitAllTaskHandler Bdcplatform {0}", bdcConfig.Bdcplatform);

    m_allTaskHandler = new MessageTaskHandler[TaskCount];
    for (int index = 0; index < m_allTaskHandler.Length; index++)
    {
        // Publish the handler into the array first, then fill in its state.
        var handler = new MessageTaskHandler();
        m_allTaskHandler[index] = handler;

        handler.appKey = bdcConfig.appKey;
        handler.lastSendLogTime = DateTime.Now;

        handler.logClient = new LogClient(bdcConfig.endPoint, bdcConfig.accesskeyID, bdcConfig.accessKeySecret)
        {
            ConnectionTimeout = 10000,
            ReadWriteTimeout = 10000,
        };

        handler.putLogsReq = new PutLogsRequest
        {
            Project = bdcConfig.logProject,
            Logstore = bdcConfig.logStoreServer,
            Topic = "",
            LogItems = new List<LogItem>(),
        };

        // The task with the same index drives this handler through ManualHandle.
        MessageTaskObj task = MessageTaskDistributor.Instance.GetTaskByIndex(index);
        task.ManualHandler = handler.ManualHandle;
    }

    MessageTaskDistributor.Instance.StartAllTask();
}
// TODO: on shutdown or reload, wait until the pending logs have been written.
/// <summary>
/// Closes all tasks, then drops every handler's client and request object and
/// clears the handler array. Does nothing more than closing the tasks when no
/// handlers have been created.
/// </summary>
public static void DisposeAllHandler()
{
    MessageTaskDistributor.Instance.CloseAllTask();

    if (m_allTaskHandler != null)
    {
        foreach (MessageTaskHandler handler in m_allTaskHandler)
        {
            handler.logClient = null;
            handler.putLogsReq = null;
        }

        m_allTaskHandler = null;
    }
}
/// <summary>
/// Moves every packet currently in the task's packet queue out of it, keeping a
/// compact <see cref="CBdcReq"/> copy of each BDC log request in the local queue.
/// Packets with any other message id are dequeued and discarded.
/// </summary>
/// <param name="taskObj">Task whose packet queue is drained.</param>
private void CopyReqFromtaskObj(MessageTaskObj taskObj)
{
    while (!taskObj.m_threadSafePacketQueue.IsEmpty)
    {
        if (!taskObj.m_threadSafePacketQueue.TryDequeue(out StructPacketData dequeued))
        {
            continue;
        }

        if (dequeued.Packet.MsgID != (int)SSGameMsgID.BdcLogReq)
        {
            continue;
        }

        // Copy only the fields that are needed so the full request struct is not retained.
        ref SSBDCLogReq source = ref dequeued.Packet.GetMessage<SSBDCLogReq>();
        bdcReqQueue.Enqueue(new CBdcReq
        {
            LogMsg = source.LogMsg.GetString(),
            Event_uuid = source.Event_uuid.GetString(),
            EventId = source.EventId,
        });
    }
}
/// <summary>
/// Task callback: drains the task's incoming packets into the local queue and
/// uploads at most one batch of queued BDC logs per invocation.
/// </summary>
/// <param name="taskObj">
/// Task whose packet queue is drained and whose handled-packet counter is advanced
/// once for every log taken into the batch.
/// </param>
/// <remarks>
/// A batch is sent when at least <c>logsMaxNum</c> logs are queued, or when
/// <c>waitTimeMsWhenNotEnoughLog</c> milliseconds have passed since the previous send.
/// Otherwise the method sleeps for 10 ms and returns. A non-positive configured
/// <c>logsMaxNum</c> falls back to 10.
/// </remarks>
public void ManualHandle(MessageTaskObj taskObj)
{
    CopyReqFromtaskObj(taskObj);

    // Nothing queued.
    if (bdcReqQueue.Count == 0)
    {
        return;
    }

    // Read the configuration once so all values used below come from the same lookup.
    var bdcConfig = BillLogServerUtils.GetServerConfig().bdcCfg;
    // Zero or negative batch sizes would disable batching entirely, so treat them as "not configured".
    var logsMaxNum = bdcConfig.logsMaxNum <= 0 ? 10 : bdcConfig.logsMaxNum;
    long waitTimeMsWhenNotEnoughLog = bdcConfig.waitTimeMsWhenNotEnoughLog;

    // The elapsed time must not be based on AppTime: AppTime depends on the main-thread tick,
    // and during a hotfix the main thread blocks waiting for the worker threads to stop, so
    // AppTime stops advancing. Main and worker threads would then wait on each other, the
    // hotfix would fail and the process would die.
    // UTC is used so that daylight-saving or time-zone shifts cannot distort the elapsed time;
    // ToUniversalTime() keeps this correct when the stored value was taken from DateTime.Now.
    DateTime utcNow = DateTime.UtcNow;
    long timeMsPass = (long)(utcNow - lastSendLogTime.ToUniversalTime()).TotalMilliseconds;

    // With fewer than logsMaxNum logs queued, wait a while before sending so that HTTP calls
    // are not issued too often. Example: if one HTTP call takes 300 ms and a thread receives
    // 3 logs per second, then without this wait the batching would almost never take effect
    // and every call would still carry a single log.
    // Overly frequent HTTP calls drive up the number of TIME_WAIT sockets on the machine,
    // because this BDC log upload currently uses short-lived connections.
    if (bdcReqQueue.Count < logsMaxNum && timeMsPass < waitTimeMsWhenNotEnoughLog)
    {
        System.Threading.Thread.Sleep(10);
        return;
    }

    lastSendLogTime = utcNow;

    // At most one batch is uploaded per invocation; the queue is known to be non-empty here
    // and logsMaxNum is at least 1, so the batch always contains at least one log.
    List<CBdcReq> batch = new List<CBdcReq>();
    while (bdcReqQueue.Count > 0 && batch.Count < logsMaxNum)
    {
        batch.Add(bdcReqQueue.Dequeue());
        taskObj.Add1LogHandlePacketCount();
    }

    OnBDCLogsReq(batch);
}
/// <summary>
/// Converts a batch of queued requests into log items and uploads them with a single
/// <c>PutLogs</c> call.
/// </summary>
/// <param name="list">
/// Requests to upload. A null or empty list, null entries, and entries whose
/// <c>LogMsg</c> is null or yields no key/value pair are skipped.
/// </param>
/// <remarks>
/// <c>LogMsg</c> is split on '|' into segments and each segment on "=" into a key and a
/// value; segments that do not split into exactly two parts are ignored and an empty
/// value is reported as "0". Upload failures are logged and not rethrown; the batch is
/// not retried. No request is sent when the batch produced no log item.
/// </remarks>
public void OnBDCLogsReq(List<CBdcReq> list)
{
    // The request object is reused between calls, so always start from an empty item list.
    putLogsReq.LogItems.Clear();

    if (list == null || list.Count == 0)
    {
        return;
    }

    // Identical for every item in the batch, so compute them once.
    // Platform: SERVER|SERVER_TEST
    var platform = BillLogServerUtils.GetServerConfig().bdcCfg.Bdcplatform;
    string timeZone = "UTC+" + AppTime.TimeZone.ToString() + ":00";

    foreach (CBdcReq req in list)
    {
        // Guard against null entries/payloads: the parsing below runs outside the try block,
        // so a NullReferenceException here would escape to the caller and lose the whole batch.
        if (req == null || string.IsNullOrEmpty(req.LogMsg))
        {
            continue;
        }

        var logItem = new LogItem();
        logItem.Time = DateUtils.TimeSpan();

        foreach (string segment in req.LogMsg.Split('|'))
        {
            var kv = segment.Split("=");
            if (kv.Length != 2)
            {
                continue;
            }

            string value = string.IsNullOrEmpty(kv[1]) ? "0" : kv[1];
            logItem.PushBack(kv[0], value);
        }

        // A message without any key/value pair is not reported.
        if (logItem.Contents.Count == 0)
        {
            continue;
        }

        // Common fields attached to every reported log.
        logItem.PushBack("event_uuid", req.Event_uuid);
        logItem.PushBack("event_time", BillLogServerUtils.GetTimeSecond().ToString());
        logItem.PushBack("event_time2", BillLogServerUtils.GetDateTime().ToString("yyyy-MM-dd"));
        logItem.PushBack("appkey", appKey);
        logItem.PushBack("platform", platform);
        logItem.PushBack("time_zone", timeZone);

        putLogsReq.LogItems.Add(logItem);
    }

    // Nothing usable in this batch: avoid an HTTP round trip with an empty payload.
    if (putLogsReq.LogItems.Count == 0)
    {
        return;
    }

    try
    {
        logClient.PutLogs(putLogsReq);
    }
    catch (LogException logEx)
    {
        TraceLog.Error("MessageTaskHandler.OnBDCLogReqInner LogException {0}", logEx.ToString());
        TraceLog.Exception(logEx);
    }
    catch (Exception ex)
    {
        TraceLog.Exception(ex);
    }
}
}
}