using System;
using System.Collections.Generic;
using System.Net.Sockets;

namespace Sog
{
    /// <summary>
    /// Polling manager for accepted sessions: keeps a socket-to-session map and, on each
    /// <see cref="Update"/>, polls the sockets with <c>Socket.Select</c>, dispatches
    /// reads/writes, and closes sessions that errored or were asked to close.
    /// </summary>
    public class AcceptedWebSessionUpdate
    {
        // Upper bound on the number of sockets handed to one Socket.Select call.
        // Original author's note: keep this small (stated limits: 64 on Windows, 32 on Linux).
        private const int MaxSelectBatchSize = 32;

        // Sessions queued by AddAsync; moved into m_sessionDict at the end of Update.
        private readonly List<NetSession> m_asyncAddedSessions = new List<NetSession>();
        private readonly object m_asyncAddedSessionsLocker = new object();

        // Sessions queued by ForceAsyncCloseSession; consumed by Update.
        private readonly List<NetSession> m_asyncClosedSessions = new List<NetSession>();
        private readonly object m_asyncClosedSessionsLocker = new object();

        // Every session currently being polled, keyed by its socket.
        private readonly Dictionary<Socket, NetSession> m_sessionDict = new Dictionary<Socket, NetSession>();

        // Scratch lists passed to Socket.Select for the current batch.
        private readonly List<Socket> m_readList = new List<Socket>();
        private readonly List<Socket> m_writeList = new List<Socket>();
        private readonly List<Socket> m_errorList = new List<Socket>();

        // Temporarily holds every socket that errored or otherwise has to be closed.
        private readonly List<Socket> m_needCloseList = new List<Socket>();

        // Temporarily holds every socket whose session is sending a close frame
        // (a WebSocket sends a close frame before it is closed), so the timeout can be checked.
        private readonly List<Socket> m_needCheckSendCloseFrameTimeOut = new List<Socket>();

        private readonly WebSessionListener m_listener;

        /// <summary>
        /// Creates the manager.
        /// </summary>
        /// <param name="listener">Receives the data-received and disconnected callbacks.</param>
        public AcceptedWebSessionUpdate(WebSessionListener listener)
        {
            m_listener = listener;
        }

        /// <summary>
        /// Returns the number of polled sessions plus the sessions still waiting in the async-add queue.
        /// </summary>
        /// <returns>The combined count. The async queue is read without taking its lock.</returns>
        public int GetSessionCount()
        {
            return m_sessionDict.Count + m_asyncAddedSessions.Count;
        }

        /// <summary>
        /// Registers a session for polling right away. Sessions without a socket are ignored.
        /// </summary>
        /// <param name="session">The session to register; keyed by its <c>WorkSocket</c>.</param>
        /// <exception cref="ArgumentException">The same socket is already registered.</exception>
        public void Add(NetSession session)
        {
            if (session.WorkSocket == null)
            {
                return;
            }

            m_sessionDict.Add(session.WorkSocket, session);
        }

        /// <summary>
        /// Queues a session under a lock; it is registered at the end of the next <see cref="Update"/>.
        /// </summary>
        /// <param name="session">The session to register later.</param>
        public void AddAsync(NetSession session)
        {
            lock (m_asyncAddedSessionsLocker)
            {
                m_asyncAddedSessions.Add(session);
            }
        }

        /// <summary>
        /// Queues a close request under a lock; it is handled by the next <see cref="Update"/>.
        /// A session that is already queued is not queued twice.
        /// </summary>
        /// <param name="session">The session to close.</param>
        public void ForceAsyncCloseSession(NetSession session)
        {
            lock (m_asyncClosedSessionsLocker)
            {
                if (m_asyncClosedSessions.Contains(session))
                {
                    return;
                }

                m_asyncClosedSessions.Add(session);
            }
        }

        /// <summary>
        /// Drops every tracked socket and session. The sessions themselves are not closed here.
        /// </summary>
        public void Close()
        {
            m_readList.Clear();
            m_writeList.Clear();
            m_errorList.Clear();
            m_needCloseList.Clear();
            m_needCheckSendCloseFrameTimeOut.Clear();

            // The async queues are written under their locks by AddAsync / ForceAsyncCloseSession,
            // so take the same locks while clearing them.
            lock (m_asyncAddedSessionsLocker)
            {
                m_asyncAddedSessions.Clear();
            }

            lock (m_asyncClosedSessionsLocker)
            {
                m_asyncClosedSessions.Clear();
            }

            m_sessionDict.Clear();
        }

        /// <summary>
        /// Runs one polling pass: selects over all sessions in batches, dispatches reads and writes,
        /// handles close requests and close-frame timeouts, then registers asynchronously added sessions.
        /// </summary>
        /// <param name="nowMs">Not used by this method.</param>
        public void Update(long nowMs)
        {
            m_readList.Clear();
            m_writeList.Clear();
            m_errorList.Clear();
            m_needCloseList.Clear();

            foreach (var pair in m_sessionDict)
            {
                Socket socket = pair.Key;
                WebSession session = pair.Value as WebSession;
                if (session == null)
                {
                    // Only WebSession entries can be polled here; skip anything else
                    // instead of dereferencing a failed cast.
                    continue;
                }

                if (session.IsSocketClosed)
                {
                    AddSocketToNeedClose(socket);
                    continue;
                }

                session.CheckSocketAndSendKeepAlive();

                // Re-check: the call above may have left the socket closed.
                if (session.IsSocketClosed)
                {
                    AddSocketToNeedClose(socket);
                    continue;
                }

                if (session.IsSendingClose)
                {
                    AddSocketToCheckSendCloseFrameTimeOut(socket);
                }
                else
                {
                    // Once the close frame is being sent, the socket is no longer read.
                    m_readList.Add(socket);
                }

                // Only test writability when there is something to send.
                if (session.IsHaveDataNeedSend())
                {
                    m_writeList.Add(socket);
                }

                m_errorList.Add(socket);

                if (m_errorList.Count >= MaxSelectBatchSize)
                {
                    select_and_process_batch();
                }
            }

            // Flush the last partial batch. Every polled socket is in m_errorList, so test that
            // list: a batch made up only of closing sessions has an empty read list but may
            // still have pending writes and errors to process.
            if (m_errorList.Count > 0)
            {
                select_and_process_batch();
            }

            // Sessions that other threads asked to close.
            // NOTE(review): a session still waiting in m_asyncAddedSessions is not in m_sessionDict
            // yet, so process_close_result ignores its socket and the request is dropped — confirm
            // whether callers can hit this ordering.
            if (m_asyncClosedSessions.Count > 0)
            {
                lock (m_asyncClosedSessionsLocker)
                {
                    foreach (var session in m_asyncClosedSessions)
                    {
                        if (session.WorkSocket != null)
                        {
                            AddSocketToNeedClose(session.WorkSocket);
                        }
                    }

                    m_asyncClosedSessions.Clear();
                }
            }

            if (m_needCloseList.Count > 0)
            {
                process_close_result();
            }

            if (m_needCheckSendCloseFrameTimeOut.Count > 0)
            {
                process_check_send_close_frame_timeout();
            }

            // Asynchronously added sessions are moved into the dictionary here.
            if (m_asyncAddedSessions.Count > 0)
            {
                lock (m_asyncAddedSessionsLocker)
                {
                    foreach (var session in m_asyncAddedSessions)
                    {
                        Add(session);
                    }

                    m_asyncAddedSessions.Clear();
                }
            }
        }

        // Polls the current batch without waiting (timeout 0) and dispatches the result.
        private void select_and_process_batch()
        {
            Socket.Select(m_readList, m_writeList, m_errorList, 0);
            process_select_result();
        }

        // Handles what Socket.Select left in the three lists, then empties them for the next batch.
        private void process_select_result()
        {
            // read
            foreach (Socket socket in m_readList)
            {
                read_socket(socket);
            }
            m_readList.Clear();

            // write
            foreach (Socket socket in m_writeList)
            {
                write_socket(socket);
            }
            m_writeList.Clear();

            // error
            foreach (Socket socket in m_errorList)
            {
                AddSocketToNeedClose(socket);
            }
            m_errorList.Clear();
        }

        // Receives from a readable socket and forwards every queued message to the listener.
        private void read_socket(Socket socket)
        {
            NetSession netSession;
            if (!m_sessionDict.TryGetValue(socket, out netSession) || netSession == null)
            {
                return;
            }

            netSession.RecvReadableSocket();

            var dataQueue = netSession.GetReceivedDataQueue();
            while (dataQueue.Count > 0)
            {
                MessageData message = dataQueue.Dequeue();

                SessionEventArgs e = new SessionEventArgs();
                e.Message = message;
                e.Session = netSession;

                m_listener.OnDataReceived(null, e);
            }

            // A closed socket at this point means receiving failed.
            if (netSession.IsSocketClosed)
            {
                AddSocketToNeedClose(socket);
            }
        }

        // Sends pending data on a writable socket.
        private void write_socket(Socket socket)
        {
            NetSession netSession;
            if (!m_sessionDict.TryGetValue(socket, out netSession) || netSession == null)
            {
                return;
            }

            netSession.WriteWriteableSocket();

            // A closed socket at this point means sending failed.
            if (netSession.IsSocketClosed)
            {
                AddSocketToNeedClose(socket);
            }
        }

        // Marks a socket for process_close_result. Duplicates are allowed;
        // process_close_result looks each socket up again before acting on it.
        private void AddSocketToNeedClose(Socket socket)
        {
            m_needCloseList.Add(socket);
        }

        // Marks a socket whose session is sending a close frame for the timeout check.
        private void AddSocketToCheckSendCloseFrameTimeOut(Socket socket)
        {
            m_needCheckSendCloseFrameTimeOut.Add(socket);
        }

        // Closes and unregisters sessions whose socket is already closed; for the others,
        // starts the WebSocket close unless a close is already being sent.
        private void process_close_result()
        {
            foreach (Socket socket in m_needCloseList)
            {
                NetSession netSession;
                if (!m_sessionDict.TryGetValue(socket, out netSession))
                {
                    continue;
                }

                WebSession session = netSession as WebSession;
                if (session == null)
                {
                    continue;
                }

                if (session.IsSocketClosed)
                {
                    m_sessionDict.Remove(socket);

                    SessionEventArgs e = new SessionEventArgs();
                    e.Session = session;

                    session.Close();

                    TraceLog.Trace("AcceptedWebSessionUpdate.process_close_result close session {0}", session.SessionID);

                    m_listener.OnDisconnected(e);
                }
                else if (!session.IsSendingClose)
                {
                    // A connected WebSocket has to send a close frame before it is closed.
                    TraceLog.Trace("AcceptedWebSessionUpdate.process_close_result closewebsocket session {0}", session.SessionID);
                    session.CloseWebSocket();
                }
            }

            m_needCloseList.Clear();
        }

        // Runs the close-frame timeout check on every marked session that is still sending a close frame.
        private void process_check_send_close_frame_timeout()
        {
            foreach (Socket socket in m_needCheckSendCloseFrameTimeOut)
            {
                NetSession netSession;
                if (!m_sessionDict.TryGetValue(socket, out netSession))
                {
                    continue;
                }

                WebSession session = netSession as WebSession;

                // Only sessions that are still sending need the check.
                if (session != null && session.IsSendingClose)
                {
                    session.CheckSendCloseFrameTimeOut();
                }
            }

            m_needCheckSendCloseFrameTimeOut.Clear();
        }
    }
}