You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

206 lines
6.2 KiB

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
namespace Sog
{
/// <summary>
/// Polling manager for accepted web sessions.
/// Sessions are queued through <see cref="AddAsync"/>, moved into the polled
/// dictionary by <see cref="Update"/>, and removed again once their socket is
/// reported closed. <see cref="Update"/> also runs the periodic keep-alive pass
/// and drives the WebSocket close handshake / close-frame timeout checks.
/// </summary>
public class AcceptedWebSessionUpdateAsyncMode
{
    /// <summary>Minimum interval, in milliseconds, between two keep-alive passes.</summary>
    private const long KeepAliveIntervalMs = 10000;

    // Sessions currently being polled, keyed by their socket. Accessed without a lock;
    // NOTE(review): presumably only the thread that calls Update touches it - confirm.
    private readonly Dictionary<Socket, NetSession> m_sessionDict = new Dictionary<Socket, NetSession>();

    // Receives the OnDisconnected notification when a session is removed.
    private readonly WebSessionListener m_listener;

    // Temporary holding list for sockets that must be closed.
    private readonly List<Socket> m_needCloseList = new List<Socket>();
    private readonly object m_needCloseListLocker = new object();

    // Temporary holding list for sockets whose close-frame send must be checked for timeout.
    private readonly List<Socket> m_needCheckSendCloseTimeOutList = new List<Socket>();
    private readonly object m_needCheckSendCloseTimeOutListLocker = new object();

    // Timestamp (same clock as the nowMs passed to Update) of the last keep-alive pass.
    private long m_lastCheckSendKeepAliveTime;

    // Sessions handed over by AddAsync that Update has not yet moved into m_sessionDict.
    private readonly List<NetSession> m_asyncAddedSessions = new List<NetSession>();
    private readonly object m_asyncAddedSessionsLocker = new object();

    /// <summary>Creates a manager that reports disconnects to <paramref name="listener"/>.</summary>
    /// <param name="listener">Listener whose OnDisconnected is invoked when a closed session is removed.</param>
    public AcceptedWebSessionUpdateAsyncMode(WebSessionListener listener)
    {
        m_listener = listener;
    }

    /// <summary>
    /// Returns the number of polled sessions plus the number of sessions still
    /// waiting to be picked up by <see cref="Update"/>.
    /// </summary>
    public int GetSessionCount()
    {
        int pendingCount;
        lock (m_asyncAddedSessionsLocker)
        {
            pendingCount = m_asyncAddedSessions.Count;
        }

        return m_sessionDict.Count + pendingCount;
    }

    /// <summary>
    /// Queues a session; it becomes polled on the next <see cref="Update"/> call.
    /// A null session or a session without a socket is ignored.
    /// </summary>
    /// <param name="session">The session to start polling.</param>
    public void AddAsync(NetSession session)
    {
        if (session == null || session.WorkSocket == null)
        {
            return;
        }

        lock (m_asyncAddedSessionsLocker)
        {
            m_asyncAddedSessions.Add(session);
        }
    }

    /// <summary>
    /// Forgets every polled and every pending session.
    /// The sessions themselves are not closed here.
    /// </summary>
    public void Close()
    {
        m_sessionDict.Clear();

        // Same lock as AddAsync so a concurrent add cannot corrupt the list.
        lock (m_asyncAddedSessionsLocker)
        {
            m_asyncAddedSessions.Clear();
        }
    }

    /// <summary>
    /// Runs one polling pass: periodic keep-alive, detection and processing of
    /// closed / closing sessions, then registration of sessions queued by
    /// <see cref="AddAsync"/>.
    /// </summary>
    /// <param name="nowMs">Current time in milliseconds.</param>
    public void Update(long nowMs)
    {
        if (nowMs - m_lastCheckSendKeepAliveTime >= KeepAliveIntervalMs)
        {
            m_lastCheckSendKeepAliveTime = nowMs;

            // Keep-alive pass: kick the send path of every session whose socket is still open.
            foreach (var pair in m_sessionDict)
            {
                NetSession session = pair.Value;
                if (!session.IsSocketClosed)
                {
                    session.StartSendAsync();
                }
            }
        }

        // Only collect sockets here; the dictionary is mutated afterwards, outside the enumeration.
        foreach (var pair in m_sessionDict)
        {
            // Every polled session is expected to be a WebSession; a direct cast makes a
            // violation fail with a clear InvalidCastException.
            WebSession session = (WebSession)pair.Value;
            Socket socket = pair.Key;

            if (session.IsSocketClosed)
            {
                AddSocketToNeedClose(socket);
            }

            if (session.IsSendingClose)
            {
                AddSocketToNeedCheckSendCloseTimeOut(socket);
            }
        }

        CloseNeedCloseSocket();
        CheckSendCloseTimeOut();
        RegisterAsyncAddedSessions();
    }

    /// <summary>
    /// Requests that the session be closed on the next <see cref="Update"/>.
    /// A null session or a session without a socket is ignored.
    /// </summary>
    /// <param name="session">The session to close.</param>
    public void ForceAsyncCloseSession(NetSession session)
    {
        if (session == null)
        {
            return;
        }

        AddSocketToNeedClose(session.WorkSocket);
    }

    // Moves the sessions queued by AddAsync into the polled dictionary.
    private void RegisterAsyncAddedSessions()
    {
        // Unlocked fast path; a session added concurrently is picked up on the next Update.
        if (m_asyncAddedSessions.Count == 0)
        {
            return;
        }

        // Snapshot and clear under the lock so the queue is always drained, even if
        // registering one of the sessions fails below.
        List<NetSession> pendingSessions;
        lock (m_asyncAddedSessionsLocker)
        {
            pendingSessions = new List<NetSession>(m_asyncAddedSessions);
            m_asyncAddedSessions.Clear();
        }

        foreach (NetSession session in pendingSessions)
        {
            // Read WorkSocket once: it may have been reset since AddAsync checked it.
            Socket socket = session.WorkSocket;
            if (socket == null)
            {
                continue;
            }

            // Dictionary.Add throws on a duplicate key; keep the session that is
            // already registered for this socket instead of failing the whole pass.
            if (!m_sessionDict.ContainsKey(socket))
            {
                m_sessionDict.Add(socket, session);
            }
        }
    }

    // Queues a socket for CloseNeedCloseSocket; null sockets are ignored.
    private void AddSocketToNeedClose(Socket socket)
    {
        if (socket == null)
        {
            return;
        }

        lock (m_needCloseListLocker)
        {
            m_needCloseList.Add(socket);
        }
    }

    // Queues a socket for CheckSendCloseTimeOut; null sockets are ignored.
    private void AddSocketToNeedCheckSendCloseTimeOut(Socket socket)
    {
        if (socket == null)
        {
            return;
        }

        lock (m_needCheckSendCloseTimeOutListLocker)
        {
            m_needCheckSendCloseTimeOutList.Add(socket);
        }
    }

    // Processes the sockets queued for closing: sessions whose socket is already
    // closed are removed and reported to the listener; sessions that are still
    // open and have not started closing are asked to close their WebSocket.
    private void CloseNeedCloseSocket()
    {
        // Unlocked fast path; entries added concurrently are handled on the next Update.
        if (m_needCloseList.Count == 0)
        {
            return;
        }

        List<Socket> socketsToClose;
        lock (m_needCloseListLocker)
        {
            socketsToClose = new List<Socket>(m_needCloseList);
            m_needCloseList.Clear();
        }

        foreach (Socket socket in socketsToClose)
        {
            // The same socket can be queued more than once; after the first removal
            // the lookup simply fails and the duplicate is skipped.
            NetSession registered;
            if (!m_sessionDict.TryGetValue(socket, out registered))
            {
                continue;
            }

            WebSession session = (WebSession)registered;
            if (session.IsSocketClosed)
            {
                m_sessionDict.Remove(socket);
                SessionEventArgs e = new SessionEventArgs();
                e.Session = session;
                session.Close();
                TraceLog.Trace("AcceptedWebSessionUpdateAsyncMode.process_close_result Close session {0}", session.SessionID);
                m_listener.OnDisconnected(e);
            }
            else if (!session.IsSendingClose)
            {
                // An established WebSocket must send a close frame before it is closed.
                TraceLog.Trace("AcceptedWebSessionUpdateAsyncMode.process_close_result CloseWebSocket session {0}", session.SessionID);
                session.CloseWebSocket();
            }
        }
    }

    // Runs the close-frame timeout check on every queued session that is still
    // registered and still in the middle of sending its close frame.
    private void CheckSendCloseTimeOut()
    {
        // Unlocked fast path; entries added concurrently are handled on the next Update.
        if (m_needCheckSendCloseTimeOutList.Count == 0)
        {
            return;
        }

        List<Socket> socketsToCheck;
        lock (m_needCheckSendCloseTimeOutListLocker)
        {
            socketsToCheck = new List<Socket>(m_needCheckSendCloseTimeOutList);
            m_needCheckSendCloseTimeOutList.Clear();
        }

        foreach (Socket socket in socketsToCheck)
        {
            NetSession registered;
            if (!m_sessionDict.TryGetValue(socket, out registered))
            {
                continue;
            }

            WebSession session = (WebSession)registered;

            // Sockets are queued here only while IsSendingClose is true, so the timeout
            // check has to run in that state. The previous "!IsSendingClose" test meant
            // the check was effectively never executed.
            if (session.IsSendingClose)
            {
                session.CheckSendCloseFrameTimeOut();
            }
        }
    }
}
}