EonaCat.Network/EonaCat.Network/System/Sockets/Web/Server/WSSessionManager.cs

873 lines
24 KiB
C#

// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
namespace EonaCat.Network
{
public class WSSessionManager
{
private readonly object _forSweep;
private readonly Dictionary<string, IWSSession> _sessions;
private readonly object _sync;
private volatile bool _clean;
private volatile ServerState _state;
private volatile bool _sweeping;
private System.Timers.Timer _sweepTimer;
private TimeSpan _waitTime;
internal WSSessionManager()
{
_clean = true;
_forSweep = new object();
_sessions = new Dictionary<string, IWSSession>();
_state = ServerState.Started;
_sync = ((ICollection)_sessions).SyncRoot;
_waitTime = TimeSpan.FromSeconds(1);
setSweepTimer(60000);
}
public IEnumerable<string> ActiveIDs
{
get
{
foreach (var res in broadping(WSFrame.EmptyPingBytes))
{
if (res.Value)
{
yield return res.Key;
}
}
}
}
public int Count
{
get
{
lock (_sync)
{
return _sessions.Count;
}
}
}
public IEnumerable<string> IDs
{
get
{
if (_state != ServerState.Start)
{
return Enumerable.Empty<string>();
}
lock (_sync)
{
if (_state != ServerState.Start)
{
return Enumerable.Empty<string>();
}
return _sessions.Keys.ToList();
}
}
}
public IEnumerable<string> InactiveIDs
{
get
{
foreach (var res in broadping(WSFrame.EmptyPingBytes))
{
if (!res.Value)
{
yield return res.Key;
}
}
}
}
public bool KeepClean
{
get
{
return _clean;
}
set
{
if (!canSet(out string message))
{
Logger.Warning(message);
return;
}
lock (_sync)
{
if (!canSet(out message))
{
Logger.Warning(message);
return;
}
_clean = value;
}
}
}
public IEnumerable<IWSSession> Sessions
{
get
{
if (_state != ServerState.Start)
{
return Enumerable.Empty<IWSSession>();
}
lock (_sync)
{
if (_state != ServerState.Start)
{
return Enumerable.Empty<IWSSession>();
}
return _sessions.Values.ToList();
}
}
}
public TimeSpan WaitTime
{
get
{
return _waitTime;
}
set
{
if (value <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(value), "Zero or less.");
}
if (!canSet(out string message))
{
Logger.Warning(message);
return;
}
lock (_sync)
{
if (!canSet(out message))
{
Logger.Warning(message);
return;
}
_waitTime = value;
}
}
}
internal ServerState State => _state;
public IWSSession this[string id]
{
get
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}
if (id.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(id));
}
tryGetSession(id, out IWSSession session);
return session;
}
}
public void Broadcast(byte[] data)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (data.LongLength <= WSClient.FragmentLength)
{
broadcast(OperationCode.Binary, data, null);
}
else
{
broadcast(OperationCode.Binary, new MemoryStream(data), null);
}
}
public void Broadcast(string data)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (!data.TryGetUTF8EncodedBytes(out byte[] bytes))
{
var message = "It could not be UTF-8-encoded.";
throw new ArgumentException(message, nameof(data));
}
if (bytes.LongLength <= WSClient.FragmentLength)
{
broadcast(OperationCode.Text, bytes, null);
}
else
{
broadcast(OperationCode.Text, new MemoryStream(bytes), null);
}
}
public void Broadcast(Stream stream, int length)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (stream == null)
{
throw new ArgumentNullException(nameof(stream));
}
if (!stream.CanRead)
{
var message = "It cannot be read.";
throw new ArgumentException(message, nameof(stream));
}
if (length < 1)
{
var message = "Less than 1.";
throw new ArgumentException(message, nameof(length));
}
var bytes = stream.ReadBytes(length);
var len = bytes.Length;
if (len == 0)
{
var message = "No data could be read from it.";
throw new ArgumentException(message, nameof(stream));
}
if (len < length)
{
Logger.Warning(
string.Format(
"Only {0} byte(s) of data could be read from the stream.",
len
)
);
}
if (len <= WSClient.FragmentLength)
{
broadcast(OperationCode.Binary, bytes, null);
}
else
{
broadcast(OperationCode.Binary, new MemoryStream(bytes), null);
}
}
public void BroadcastAsync(byte[] data, Action completed)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (data.LongLength <= WSClient.FragmentLength)
{
broadcastAsync(OperationCode.Binary, data, completed);
}
else
{
broadcastAsync(OperationCode.Binary, new MemoryStream(data), completed);
}
}
public void BroadcastAsync(string data, Action completed)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (!data.TryGetUTF8EncodedBytes(out byte[] bytes))
{
var message = "It could not be UTF-8-encoded.";
throw new ArgumentException(message, nameof(data));
}
if (bytes.LongLength <= WSClient.FragmentLength)
{
broadcastAsync(OperationCode.Text, bytes, completed);
}
else
{
broadcastAsync(OperationCode.Text, new MemoryStream(bytes), completed);
}
}
public void BroadcastAsync(Stream stream, int length, Action completed)
{
if (_state != ServerState.Start)
{
var message = "The current state of the manager is not Start.";
throw new InvalidOperationException(message);
}
if (stream == null)
{
throw new ArgumentNullException(nameof(stream));
}
if (!stream.CanRead)
{
var message = "It cannot be read.";
throw new ArgumentException(message, nameof(stream));
}
if (length < 1)
{
var message = "Less than 1.";
throw new ArgumentException(message, nameof(length));
}
var bytes = stream.ReadBytes(length);
var len = bytes.Length;
if (len == 0)
{
var message = "No data could be read from it.";
throw new ArgumentException(message, nameof(stream));
}
if (len < length)
{
Logger.Warning(
string.Format(
"Only {0} byte(s) of data could be read from the stream.",
len
)
);
}
if (len <= WSClient.FragmentLength)
{
broadcastAsync(OperationCode.Binary, bytes, completed);
}
else
{
broadcastAsync(OperationCode.Binary, new MemoryStream(bytes), completed);
}
}
public void CloseSession(string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Close();
}
public void CloseSession(string id, ushort code, string reason)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Close(code, reason);
}
public void CloseSession(string id, CloseStatusCode code, string reason)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Close(code, reason);
}
public bool PingTo(string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
return session.Context.WebSocket.Ping();
}
public bool PingTo(string message, string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var pingMessage = "The session could not be found.";
throw new InvalidOperationException(pingMessage);
}
return session.Context.WebSocket.Ping(message);
}
public void SendTo(byte[] data, string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Send(data);
}
public void SendTo(string data, string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Send(data);
}
public void SendTo(Stream stream, int length, string id)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.Send(stream, length);
}
public void SendToAsync(byte[] data, string id, Action<bool> completed)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.SendAsync(data, completed);
}
public void SendToAsync(string data, string id, Action<bool> completed)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.SendAsync(data, completed);
}
public void SendToAsync(
Stream stream, int length, string id, Action<bool> completed
)
{
if (!TryGetSession(id, out IWSSession session))
{
var message = "The session could not be found.";
throw new InvalidOperationException(message);
}
session.Context.WebSocket.SendAsync(stream, length, completed);
}
public void Sweep()
{
if (_sweeping)
{
Logger.Info("The sweeping is already in progress.");
return;
}
lock (_forSweep)
{
if (_sweeping)
{
Logger.Info("The sweeping is already in progress.");
return;
}
_sweeping = true;
}
foreach (var id in InactiveIDs)
{
if (_state != ServerState.Start)
{
break;
}
lock (_sync)
{
if (_state != ServerState.Start)
{
break;
}
if (_sessions.TryGetValue(id, out IWSSession session))
{
var state = session.State;
if (state == WSState.Open)
{
session.Context.WebSocket.Close(CloseStatusCode.Abnormal);
}
else if (state == WSState.Closing)
{
continue;
}
else
{
_sessions.Remove(id);
}
}
}
}
_sweeping = false;
}
public bool TryGetSession(string id, out IWSSession session)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}
if (id.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(id));
}
return tryGetSession(id, out session);
}
internal string Add(IWSSession session)
{
lock (_sync)
{
if (_state != ServerState.Start)
{
return null;
}
var id = createID();
_sessions.Add(id, session);
return id;
}
}
internal void Broadcast(
OperationCode opcode, byte[] data, Dictionary<CompressionMethod, byte[]> cache
)
{
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
session.Context.WebSocket.Send(opcode, data, cache);
}
}
internal void Broadcast(
OperationCode opcode, Stream stream, Dictionary<CompressionMethod, Stream> cache
)
{
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
session.Context.WebSocket.Send(opcode, stream, cache);
}
}
internal Dictionary<string, bool> Broadping(
byte[] frameAsBytes, TimeSpan timeout
)
{
var ret = new Dictionary<string, bool>();
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
var res = session.Context.WebSocket.Ping(frameAsBytes, timeout);
ret.Add(session.ID, res);
}
return ret;
}
internal bool Remove(string id)
{
lock (_sync)
{
return _sessions.Remove(id);
}
}
internal void Start()
{
lock (_sync)
{
_sweepTimer.Enabled = _clean;
_state = ServerState.Start;
}
}
internal void Stop(ushort code, string reason)
{
if (code == (ushort)CloseStatusCode.NoStatus)
{ // == no status
stop(PayloadData.Empty, true);
return;
}
stop(new PayloadData(code, reason), !code.IsReserved());
}
private static string createID()
{
return Guid.NewGuid().ToString("N");
}
private void broadcast(OperationCode opcode, byte[] data, Action completed)
{
var cache = new Dictionary<CompressionMethod, byte[]>();
try
{
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
session.Context.WebSocket.Send(opcode, data, cache);
}
if (completed != null)
{
completed();
}
}
catch (Exception ex)
{
Logger.Error(ex.Message);
Logger.Debug(ex.ToString());
}
finally
{
cache.Clear();
}
}
private void broadcast(OperationCode opcode, Stream stream, Action completed)
{
var cache = new Dictionary<CompressionMethod, Stream>();
try
{
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
session.Context.WebSocket.Send(opcode, stream, cache);
}
if (completed != null)
{
completed();
}
}
catch (Exception ex)
{
Logger.Error(ex.Message);
Logger.Debug(ex.ToString());
}
finally
{
foreach (var cached in cache.Values)
{
cached.Dispose();
}
cache.Clear();
}
}
private void broadcastAsync(OperationCode opcode, byte[] data, Action completed)
{
ThreadPool.QueueUserWorkItem(
state => broadcast(opcode, data, completed)
);
}
private void broadcastAsync(OperationCode opcode, Stream stream, Action completed)
{
ThreadPool.QueueUserWorkItem(
state => broadcast(opcode, stream, completed)
);
}
private Dictionary<string, bool> broadping(byte[] frameAsBytes)
{
var ret = new Dictionary<string, bool>();
foreach (var session in Sessions)
{
if (_state != ServerState.Start)
{
Logger.Error("The service is shutting down.");
break;
}
var res = session.Context.WebSocket.Ping(frameAsBytes, _waitTime);
ret.Add(session.ID, res);
}
return ret;
}
private bool canSet(out string message)
{
message = null;
if (_state == ServerState.Start)
{
message = "The service has already started.";
return false;
}
if (_state == ServerState.ShuttingDown)
{
message = "The service is shutting down.";
return false;
}
return true;
}
private void setSweepTimer(double interval)
{
_sweepTimer = new System.Timers.Timer(interval);
_sweepTimer.Elapsed += (sender, e) => Sweep();
}
private void stop(PayloadData payloadData, bool send)
{
var bytes = send
? WSFrame.CreateCloseFrame(payloadData, false).ToArray()
: null;
lock (_sync)
{
_state = ServerState.ShuttingDown;
_sweepTimer.Enabled = false;
foreach (var session in _sessions.Values.ToList())
{
session.Context.WebSocket.Close(payloadData, bytes);
}
_state = ServerState.Stop;
}
}
private bool tryGetSession(string id, out IWSSession session)
{
session = null;
if (_state != ServerState.Start)
{
return false;
}
lock (_sync)
{
if (_state != ServerState.Start)
{
return false;
}
return _sessions.TryGetValue(id, out session);
}
}
}
}