using System; using System.Collections.Generic; using System.Net.Sockets; namespace Sog { /// /// session的轮询管理 /// public class AcceptedSessionUpdate { private List m_asyncAddedSessions = new List(); private object m_asyncAddedSessionsLocker = new object(); private List m_asyncClosedSessions = new List(); private object m_asyncClosedSessionsLocker = new object(); private Dictionary m_sessionDict = new Dictionary(); private List m_readList = new List(); private List m_writeList = new List(); private List m_errorList = new List(); //临时存放所有出错的socket private List m_needCloseList = new List(); private SessionListener m_listener; public AcceptedSessionUpdate(SessionListener listener) { m_listener = listener; } public int GetSessionCount() { return m_sessionDict.Count + m_asyncAddedSessions.Count; } public void Add(NetSession session) { if(session.WorkSocket == null) { return; } m_sessionDict.Add(session.WorkSocket, session); } public void AddAsync(NetSession session) { lock(m_asyncAddedSessionsLocker) { m_asyncAddedSessions.Add(session); } } public void ForceAsyncCloseSession(NetSession session) { lock(m_asyncClosedSessionsLocker) { if(m_asyncClosedSessions.Contains(session)) { return; } m_asyncClosedSessions.Add(session); } } public void Close() { m_readList.Clear(); m_writeList.Clear(); m_errorList.Clear(); m_needCloseList.Clear(); m_asyncAddedSessions.Clear(); m_sessionDict.Clear(); } public void Update(long nowMs) { m_readList.Clear(); m_writeList.Clear(); m_errorList.Clear(); m_needCloseList.Clear(); foreach (var pair in m_sessionDict) { NetSession session = pair.Value; Socket socket = pair.Key; if (session.IsSocketClosed) { AddSocketToNeedClose(socket); continue; } session.CheckSocketAndSendKeepAlive(); if (session.IsSocketClosed) { AddSocketToNeedClose(socket); continue; } m_readList.Add(socket); //是否可写可优化,没数据不判断,提高效率 if (session.IsHaveDataNeedSend()) { m_writeList.Add(socket); } m_errorList.Add(socket); //不要太大,windows下最大64,linux下32 if (m_readList.Count >= 32) { Socket.Select(m_readList, m_writeList, m_errorList, 0); process_select_result(); } } //最后剩下的 if (m_readList.Count > 0) { Socket.Select(m_readList, m_writeList, m_errorList, 0); process_select_result(); } //其他线程关闭的session if (m_asyncClosedSessions.Count > 0) { lock (m_asyncClosedSessionsLocker) { foreach (var session in m_asyncClosedSessions) { if (session.WorkSocket != null) { AddSocketToNeedClose(session.WorkSocket); } } m_asyncClosedSessions.Clear(); } } if (m_needCloseList.Count > 0) { process_close_result(); } // 异步添加的session在这里放到dict里去 if (m_asyncAddedSessions.Count > 0) { lock (m_asyncAddedSessionsLocker) { foreach (var session in m_asyncAddedSessions) { Add(session); } m_asyncAddedSessions.Clear(); } } } private void process_select_result() { //read foreach (Socket socket in m_readList) { read_socket(socket); } m_readList.Clear(); //write foreach (Socket socket in m_writeList) { write_socket(socket); } m_writeList.Clear(); //error foreach (Socket socket in m_errorList) { AddSocketToNeedClose(socket); } m_errorList.Clear(); } private void read_socket(Socket socket) { NetSession netSession = m_sessionDict[socket]; if (netSession != null) { netSession.RecvReadableSocket(); Queue dataQueue = netSession.GetReceivedDataQueue(); while (dataQueue.Count > 0) { MessageData message = dataQueue.Dequeue(); SessionEventArgs e = new SessionEventArgs(); e.Message = message; e.Session = netSession; m_listener.OnDataReceived(null, e); } //说明接受数据出错了 if (netSession.IsSocketClosed) { AddSocketToNeedClose(socket); } } } private void write_socket(Socket socket) { NetSession netSession = m_sessionDict[socket]; if (netSession != null) { netSession.WriteWriteableSocket(); //说明发送数据出错了 if (netSession.IsSocketClosed) { AddSocketToNeedClose(socket); } } } private void AddSocketToNeedClose(Socket socket) { // if (m_needCloseList.Contains(socket)) // { // return; // } m_needCloseList.Add(socket); } private void process_close_result() { foreach (Socket socket in m_needCloseList) { if (m_sessionDict.ContainsKey(socket)) { NetSession session = m_sessionDict[socket]; m_sessionDict.Remove(socket); SessionEventArgs e = new SessionEventArgs(); e.Session = session; m_listener.OnDisconnected(e); session.Close(); } } m_needCloseList.Clear(); } } }