EonaCat.Network/EonaCat.Network/System/Sockets/Web/Server/WSEndpointManager.cs

467 lines
12 KiB
C#

// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
namespace EonaCat.Network
{
public class WSEndpointManager
{
private readonly Dictionary<string, WSEndpointHost> _hosts;
private readonly object _locker;
private volatile bool _clean;
private volatile ServerState _state;
private TimeSpan _responseWaitingTime;
internal WSEndpointManager()
{
_clean = true;
_hosts = new Dictionary<string, WSEndpointHost>();
_state = ServerState.Started;
_locker = ((ICollection)_hosts).SyncRoot;
_responseWaitingTime = TimeSpan.FromSeconds(1);
}
public bool AutoCleanSessions
{
get
{
return _clean;
}
set
{
if (!canSet(out string message))
{
Logger.Warning(message);
return;
}
lock (_locker)
{
if (!canSet(out message))
{
Logger.Warning(message);
return;
}
foreach (var host in _hosts.Values)
{
host.AutoCleanSessions = value;
}
_clean = value;
}
}
}
public int Count
{
get
{
lock (_locker)
{
return _hosts.Count;
}
}
}
public IEnumerable<WSEndpointHost> Hosts
{
get
{
lock (_locker)
{
return _hosts.Values.ToList();
}
}
}
public IEnumerable<string> Paths
{
get
{
lock (_locker)
{
return _hosts.Keys.ToList();
}
}
}
public TimeSpan ResponseWaitingTime
{
get
{
return _responseWaitingTime;
}
set
{
if (value <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(value), "Zero or less.");
}
if (!canSet(out string message))
{
Logger.Warning(message);
return;
}
lock (_locker)
{
if (!canSet(out message))
{
Logger.Warning(message);
return;
}
foreach (var host in _hosts.Values)
{
host.ResponseWaitingTime = value;
}
_responseWaitingTime = value;
}
}
}
public WSEndpointHost this[string path]
{
get
{
if (path == null)
{
throw new ArgumentNullException(nameof(path));
}
if (path.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(path));
}
if (path[0] != '/')
{
throw new ArgumentException("Not an absolute path.", nameof(path));
}
if (path.IndexOfAny(new[] { '?', '#' }) > -1)
{
var message = "It includes either or both query and fragment components.";
throw new ArgumentException(message, nameof(path));
}
InternalTryGetEndpointHost(path, out WSEndpointHost host);
return host;
}
}
public void AddEndpoint<TEndpoint>(
string path, Action<TEndpoint> initializer
)
where TEndpoint : WSEndpoint, new()
{
if (path == null)
{
throw new ArgumentNullException(nameof(path));
}
if (path.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(path));
}
if (path[0] != '/')
{
throw new ArgumentException("Not an absolute path.", nameof(path));
}
if (path.IndexOfAny(new[] { '?', '#' }) > -1)
{
var message = "It includes either or both query and fragment components.";
throw new ArgumentException(message, nameof(path));
}
path = HttpUtility.UrlDecode(path).TrimSlashFromEnd();
lock (_locker)
{
if (_hosts.TryGetValue(path, out WSEndpointHost host))
{
throw new ArgumentException("Already in use.", nameof(path));
}
host = new WebSocketEndpointHost<TEndpoint>(
path, () => new TEndpoint(), initializer
);
if (!_clean)
{
host.AutoCleanSessions = false;
}
if (_responseWaitingTime != host.ResponseWaitingTime)
{
host.ResponseWaitingTime = _responseWaitingTime;
}
if (_state == ServerState.Start)
{
host.Start();
}
_hosts.Add(path, host);
}
}
public void Clear()
{
List<WSEndpointHost> hosts = null;
lock (_locker)
{
hosts = _hosts.Values.ToList();
_hosts.Clear();
}
foreach (var host in hosts)
{
if (host.State == ServerState.Start)
{
host.Stop((ushort)CloseStatusCode.Away, string.Empty);
}
}
}
public bool RemoveEndpoint(string path)
{
if (path == null)
{
throw new ArgumentNullException(nameof(path));
}
if (path.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(path));
}
if (path[0] != '/')
{
throw new ArgumentException("Not an absolute path.", nameof(path));
}
if (path.IndexOfAny(new[] { '?', '#' }) > -1)
{
var message = "It includes either or both query and fragment components.";
throw new ArgumentException(message, nameof(path));
}
path = HttpUtility.UrlDecode(path).TrimSlashFromEnd();
WSEndpointHost host;
lock (_locker)
{
if (!_hosts.TryGetValue(path, out host))
{
return false;
}
_hosts.Remove(path);
}
if (host.State == ServerState.Start)
{
host.Stop((ushort)CloseStatusCode.Away, string.Empty);
}
return true;
}
public bool TryGetEndpointHost(string path, out WSEndpointHost host)
{
if (path == null)
{
throw new ArgumentNullException(nameof(path));
}
if (path.Length == 0)
{
throw new ArgumentException("An empty string.", nameof(path));
}
if (path[0] != '/')
{
throw new ArgumentException("Not an absolute path.", nameof(path));
}
if (path.IndexOfAny(new[] { '?', '#' }) > -1)
{
var message = "It includes either or both query and fragment components.";
throw new ArgumentException(message, nameof(path));
}
return InternalTryGetEndpointHost(path, out host);
}
internal void Add<TEndpoint>(string path, Func<TEndpoint> creator)
where TEndpoint : WSEndpoint
{
path = HttpUtility.UrlDecode(path).TrimSlashFromEnd();
lock (_locker)
{
if (_hosts.TryGetValue(path, out WSEndpointHost host))
{
throw new ArgumentException("Already in use.", nameof(path));
}
host = new WebSocketEndpointHost<TEndpoint>(
path, creator, null
);
if (!_clean)
{
host.AutoCleanSessions = false;
}
if (_responseWaitingTime != host.ResponseWaitingTime)
{
host.ResponseWaitingTime = _responseWaitingTime;
}
if (_state == ServerState.Start)
{
host.Start();
}
_hosts.Add(path, host);
}
}
internal bool InternalTryGetEndpointHost(
string path, out WSEndpointHost host
)
{
path = HttpUtility.UrlDecode(path).TrimSlashFromEnd();
lock (_locker)
{
return _hosts.TryGetValue(path, out host);
}
}
internal void Start()
{
lock (_locker)
{
foreach (var host in _hosts.Values)
{
host.Start();
}
_state = ServerState.Start;
}
}
internal void Stop(ushort code, string reason)
{
lock (_locker)
{
_state = ServerState.ShuttingDown;
foreach (var host in _hosts.Values)
{
host.Stop(code, reason);
}
_state = ServerState.Stop;
}
}
private void broadcast(OperationCode opcode, byte[] data, Action completed)
{
var cache = new Dictionary<CompressionMethod, byte[]>();
try
{
foreach (var host in Hosts)
{
if (_state != ServerState.Start)
{
Logger.Error("The server is shutting down.");
break;
}
host.Sessions.Broadcast(opcode, data, cache);
}
completed?.Invoke();
}
catch (Exception ex)
{
Logger.Error(ex, "Could not broadcast");
}
finally
{
cache.Clear();
}
}
private void broadcast(OperationCode opcode, Stream stream, Action completed)
{
var cache = new Dictionary<CompressionMethod, Stream>();
try
{
foreach (var host in Hosts)
{
if (_state != ServerState.Start)
{
Logger.Error("The server is shutting down.");
break;
}
host.Sessions.Broadcast(opcode, stream, cache);
}
completed?.Invoke();
}
catch (Exception ex)
{
Logger.Error(ex, "Could not broadcast");
}
finally
{
foreach (var cached in cache.Values)
{
cached.Dispose();
}
cache.Clear();
}
}
private bool canSet(out string message)
{
message = null;
if (_state == ServerState.Start)
{
message = "The server has already started.";
return false;
}
if (_state == ServerState.ShuttingDown)
{
message = "The server is shutting down.";
return false;
}
return true;
}
}
}