diff --git a/EonaCat.Sockets/EonaCat.Sockets.slnx b/EonaCat.Sockets/EonaCat.Sockets.slnx new file mode 100644 index 0000000..1d1cd7a --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets.slnx @@ -0,0 +1,3 @@ + + + diff --git a/EonaCat.Sockets/EonaCat.Sockets/Client.cs b/EonaCat.Sockets/EonaCat.Sockets/Client.cs new file mode 100644 index 0000000..a2398c4 --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/Client.cs @@ -0,0 +1,121 @@ +using EonaCat.Sockets.Interfaces; +using EonaCat.Sockets.Models; +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Http; +using System.Net.Security; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using ProtocolType = EonaCat.Sockets.Models.ProtocolType; + +namespace EonaCat.Sockets +{ + /// + /// Client which supports TCP, SSL-TCP, UDP. + /// + public class Client : IDisposable + { + private readonly NetConfiguration _config; + private IConnection _connection; + private int _disposed; + + public bool IsConnected => _connection?.IsConnected ?? false; + public string ConnectionId => _connection?.ConnectionId; + public NetConfiguration Configuration => _config; + + public Client(NetConfiguration config = null) + { + _config = config ?? new NetConfiguration(); + } + + public async Task ConnectAsync(CancellationToken ct = default) + { + switch (_config.Protocol) + { + case ProtocolType.TCP: + _connection = await ConnectTcpAsync(ct).ConfigureAwait(false); + break; + case ProtocolType.UDP: + _connection = ConnectUdp(); + break; + default: + throw new NotSupportedException($"Protocol {_config.Protocol} not supported."); + } + _config.Logger?.Invoke($"Connected [{_config.Protocol}] → {_config.Host}:{_config.Port} id={_connection.ConnectionId}"); + } + + private async Task ConnectTcpAsync(CancellationToken ct) + { + var tcpClient = new TcpClient(); + tcpClient.ReceiveTimeout = _config.ReceiveTimeoutMs; + tcpClient.SendTimeout = _config.SendTimeoutMs; + tcpClient.NoDelay = true; + + await tcpClient.ConnectAsync(_config.Host, _config.Port).ConfigureAwait(false); + + System.IO.Stream stream; + + if (_config.UseSsl) + { + var sslStream = new SslStream(tcpClient.GetStream(), false, + _config.CertificateValidationCallback ?? + (_config.AllowSelfSignedCertificates + ? (sender, certificate, chain, errors) => true + : (RemoteCertificateValidationCallback)null)); + + await sslStream.AuthenticateAsClientAsync(_config.SslTargetHost ?? _config.Host).ConfigureAwait(false); + stream = sslStream; + } + else + { + stream = tcpClient.GetStream(); + } + + return new TcpConnection(tcpClient, stream); + } + + private UdpConnection ConnectUdp() + { + var udpClient = new UdpClient(); + udpClient.Client.ReceiveTimeout = _config.ReceiveTimeoutMs; + udpClient.Client.SendTimeout = _config.SendTimeoutMs; + var ep = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port); + return new UdpConnection(udpClient, ep); + } + + public Task SendBytesAsync(byte[] data) => EnsureConnection().SendBytesAsync(data); + public Task SendStringAsync(string msg) => EnsureConnection().SendStringAsync(msg); + public Task ReceiveBytesAsync() => EnsureConnection().ReceiveBytesAsync(); + public Task ReceiveStringAsync() => EnsureConnection().ReceiveStringAsync(); + + public void Disconnect() + { + _connection?.Close(); + _connection?.Dispose(); + _connection = null; + _config.Logger?.Invoke("Disconnected."); + } + + private IConnection EnsureConnection() + { + var conn = _connection; + if (conn == null || !conn.IsConnected) + { + throw new InvalidOperationException("Not connected. Call ConnectAsync() first."); + } + + return conn; + } + + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + Disconnect(); + } + } + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/ConnectionPool.cs b/EonaCat.Sockets/EonaCat.Sockets/ConnectionPool.cs new file mode 100644 index 0000000..5ec0447 --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/ConnectionPool.cs @@ -0,0 +1,66 @@ +using EonaCat.Sockets.Interfaces; +using System; +using System.Collections.Concurrent; +using System.Threading; + +namespace EonaCat.Sockets +{ + /// + /// Connection pool + /// + public class ConnectionPool where TConnection : IConnection + { + private readonly ConcurrentDictionary _connections = new ConcurrentDictionary(StringComparer.Ordinal); + + private long _totalAccepted; + private long _totalDropped; + private readonly int _maxConnections; + + public ConnectionPool(int maxConnections) + { + _maxConnections = maxConnections; + } + + public int Count => _connections.Count; + public long TotalAccepted => Interlocked.Read(ref _totalAccepted); + public long TotalDropped => Interlocked.Read(ref _totalDropped); + + public bool TryAdd(TConnection connection) + { + if (_connections.Count >= _maxConnections) + { + Interlocked.Increment(ref _totalDropped); + return false; + } + + if (_connections.TryAdd(connection.ConnectionId, connection)) + { + Interlocked.Increment(ref _totalAccepted); + return true; + } + return false; + } + + public bool TryRemove(string connectionId, out TConnection connection) => _connections.TryRemove(connectionId, out connection); + + public bool TryGet(string connectionId, out TConnection connection) => _connections.TryGetValue(connectionId, out connection); + + public void ForEach(Action action) + { + foreach (var keyValue in _connections) + { + action(keyValue.Value); + } + } + + public void Clear() + { + foreach (var keyValue in _connections) + { + keyValue.Value.Close(); + keyValue.Value.Dispose(); + } + _connections.Clear(); + } + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/EonaCat.Sockets.csproj b/EonaCat.Sockets/EonaCat.Sockets/EonaCat.Sockets.csproj new file mode 100644 index 0000000..dbdcea4 --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/EonaCat.Sockets.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IConnection.cs b/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IConnection.cs new file mode 100644 index 0000000..6c1c8bb --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IConnection.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; + +namespace EonaCat.Sockets.Interfaces +{ + public interface IConnection : IDisposable + { + string ConnectionId { get; } + bool IsConnected { get; } + Task SendBytesAsync(byte[] data); + Task SendStringAsync(string message); + Task ReceiveBytesAsync(); + Task ReceiveStringAsync(); + void Close(); + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IServer.cs b/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IServer.cs new file mode 100644 index 0000000..0cb61af --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/Interfaces/IServer.cs @@ -0,0 +1,291 @@ +using EonaCat.Sockets.Models; +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Threading; +using System.Threading.Tasks; +using ProtocolType = EonaCat.Sockets.Models.ProtocolType; + +namespace EonaCat.Sockets.Interfaces +{ + /// + /// TCP/SSL/UDP server. + /// + public class Server : IDisposable + { + public event Func ClientConnected; + public event Func DataReceived; + public event Func StringReceived; + public event Func ClientError; + public event Func ClientDisconnected; + + private readonly NetConfiguration _config; + private ConnectionPool _tcpPool; + private TcpListener _tcpListener; + private UdpClient _udpServer; + private CancellationTokenSource _cancellationTokenSource; + private int _disposed; + + public int ActiveConnections => _tcpPool?.Count ?? 0; + public bool IsRunning => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; + + public Server(NetConfiguration config = null) + { + _config = config ?? new NetConfiguration(); + } + + public void Start() + { + _cancellationTokenSource = new CancellationTokenSource(); + + switch (_config.Protocol) + { + case ProtocolType.TCP: + StartTcp(_cancellationTokenSource.Token); + break; + case ProtocolType.UDP: + StartUdp(_cancellationTokenSource.Token); + break; + } + } + + private void StartTcp(CancellationToken cancellationToken) + { + _tcpPool = new ConnectionPool(_config.MaxConnections); + + _tcpListener = new TcpListener(IPAddress.Any, _config.Port); + + // Allow address reuse and dual-stack + _tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + + // Increase socket buffer sizes for high throughput + _tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, _config.BufferSize); + _tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, _config.BufferSize); + + _tcpListener.Start(_config.BacklogSize); + _config.Logger?.Invoke($"TCP Server listening on port {_config.Port} (max {_config.MaxConnections:N0} connections, SSL={_config.UseSsl})"); + + // Accept loop + _ = AcceptLoopAsync(cancellationToken); + } + + private async Task AcceptLoopAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + TcpClient tcpClient; + + try + { + tcpClient = await _tcpListener.AcceptTcpClientAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + break; + } + catch (Exception exception) + { + _config.Logger?.Invoke($"Accept error: {exception.Message}"); + continue; + } + + // Handle each client on the thread pool + _ = Task.Run(() => HandleTcpClientAsync(tcpClient, cancellationToken), cancellationToken); + } + } + + private async Task HandleTcpClientAsync(TcpClient tcpClient, CancellationToken cancelationToken) + { + tcpClient.NoDelay = true; + tcpClient.ReceiveTimeout = _config.ReceiveTimeoutMs; + tcpClient.SendTimeout = _config.SendTimeoutMs; + + System.IO.Stream stream; + + try + { + if (_config.UseSsl) + { + var sslStream = new SslStream(tcpClient.GetStream(), false); + await sslStream.AuthenticateAsServerAsync( + _config.ServerCertificate, + clientCertificateRequired: false, + // No Tls 1.3 support for .net Framework 4.8 + enabledSslProtocols: SslProtocols.Tls12, + checkCertificateRevocation: true).ConfigureAwait(false); + stream = sslStream; + } + else + { + stream = tcpClient.GetStream(); + } + } + catch (Exception exception) + { + _config.Logger?.Invoke($"SSL handshake failed: {exception.Message}"); + tcpClient.Close(); + return; + } + + var connection = new TcpConnection(tcpClient, stream); + + if (!_tcpPool.TryAdd(connection)) + { + _config.Logger?.Invoke($"Connection limit reached ({_config.MaxConnections:N0}). Dropping client."); + connection.Close(); + connection.Dispose(); + return; + } + + try + { + if (ClientConnected != null) + { + await ClientConnected(connection).ConfigureAwait(false); + } + + await ReceiveLoopAsync(connection, cancelationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + if (ClientError != null) + { + await ClientError(connection, exception).ConfigureAwait(false); + } + else + { + _config.Logger?.Invoke($"Client {connection.ConnectionId} error: {exception.Message}"); + } + } + finally + { + _tcpPool.TryRemove(connection.ConnectionId, out _); + connection.Close(); + + if (ClientDisconnected != null) + { + await ClientDisconnected(connection).ConfigureAwait(false); + } + + connection.Dispose(); + } + } + + private async Task ReceiveLoopAsync(TcpConnection conn, CancellationToken ct) + { + while (!ct.IsCancellationRequested && conn.IsConnected) + { + byte[] data = await conn.ReceiveBytesAsync().ConfigureAwait(false); + + if (DataReceived != null) + { + await DataReceived(conn, data).ConfigureAwait(false); + } + + if (StringReceived != null) + { + await StringReceived(conn, System.Text.Encoding.UTF8.GetString(data)).ConfigureAwait(false); + } + } + } + + private void StartUdp(CancellationToken ct) + { + _udpServer = new UdpClient(_config.Port); + _config.Logger?.Invoke($"UDP Server listening on port {_config.Port}"); + _ = UdpReceiveLoopAsync(ct); + } + + private async Task UdpReceiveLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + var result = await _udpServer.ReceiveAsync().ConfigureAwait(false); + var conn = new UdpConnection(_udpServer, result.RemoteEndPoint); + + if (DataReceived != null) + { + await DataReceived(conn, result.Buffer).ConfigureAwait(false); + } + + if (StringReceived != null) + { + await StringReceived(conn, System.Text.Encoding.UTF8.GetString(result.Buffer)).ConfigureAwait(false); + } + } + catch (ObjectDisposedException) + { + break; + } + catch (Exception exception) + { + _config.Logger?.Invoke($"UDP receive error: {exception.Message}"); + } + } + } + + public async Task BroadcastBytesAsync(byte[] data) + { + if (_tcpPool == null) + { + return; + } + + _tcpPool.ForEach(async c => + { + try + { + await c.SendBytesAsync(data).ConfigureAwait(false); + } + catch + { + // Do nothing + } + }); + + await Task.CompletedTask; + } + + public Task BroadcastStringAsync(string message) => BroadcastBytesAsync(System.Text.Encoding.UTF8.GetBytes(message)); + + public void Stop() + { + _cancellationTokenSource?.Cancel(); + try + { + _tcpListener?.Stop(); + } + catch + { + // Do nothing + } + + try + { + _udpServer?.Close(); + } + catch + { + // Do nothing + } + + _tcpPool?.Clear(); + _config.Logger?.Invoke("Server stopped."); + } + + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + Stop(); + _cancellationTokenSource?.Dispose(); + _udpServer?.Dispose(); + } + } + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/Models/Connection.cs b/EonaCat.Sockets/EonaCat.Sockets/Models/Connection.cs new file mode 100644 index 0000000..1cf2206 --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/Models/Connection.cs @@ -0,0 +1,37 @@ +using System; +using System.Net.Security; +using System.Security.Cryptography.X509Certificates; + +namespace EonaCat.Sockets.Models +{ + public enum ProtocolType { TCP, UDP } + + public class NetConfiguration + { + // Connection limits + public int MaxConnections { get; set; } = 2_000_000; + public int BacklogSize { get; set; } = 10_000; + + // Protocol + public ProtocolType Protocol { get; set; } = ProtocolType.TCP; + + // Network + public string Host { get; set; } = "127.0.0.1"; + public int Port { get; set; } = 9000; + public int BufferSize { get; set; } = 65536; + public int ReceiveTimeoutMs { get; set; } = 30_000; + public int SendTimeoutMs { get; set; } = 30_000; + public int MaxRetries { get; set; } = 3; + + // SSL + public bool UseSsl { get; set; } = false; + public X509Certificate2 ServerCertificate { get; set; } + public string SslTargetHost { get; set; } + public bool AllowSelfSignedCertificates { get; set; } = false; + public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; } + + public int IoThreads { get; set; } = Environment.ProcessorCount * 2; + + public Action Logger { get; set; } = message => Console.WriteLine($"[EonaCat.Sockets] {message}"); + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/TcpConnection.cs b/EonaCat.Sockets/EonaCat.Sockets/TcpConnection.cs new file mode 100644 index 0000000..dec7bf3 --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/TcpConnection.cs @@ -0,0 +1,142 @@ +using EonaCat.Sockets.Interfaces; +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace EonaCat.Sockets +{ + /// + /// Represents a single TCP or SSL connection. + /// + public class TcpConnection : IConnection + { + private readonly TcpClient _client; + private readonly Stream _stream; // NetworkStream or SslStream + private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _recvLock = new SemaphoreSlim(1, 1); + private int _disposed; + + public string ConnectionId { get; } + public bool IsConnected => _client?.Connected ?? false; + + public TcpConnection(TcpClient client, Stream stream, string connectionId = null) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + ConnectionId = connectionId ?? Guid.NewGuid().ToString("N"); + } + + public async Task SendBytesAsync(byte[] data) + { + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + + await _sendLock.WaitAsync().ConfigureAwait(false); + try + { + // 4-byte length prefix (big-endian) then payload + var lenBytes = BitConverter.GetBytes(data.Length); + if (BitConverter.IsLittleEndian) + { + Array.Reverse(lenBytes); + } + + await _stream.WriteAsync(lenBytes, 0, 4).ConfigureAwait(false); + await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false); + await _stream.FlushAsync().ConfigureAwait(false); + } + finally + { + _sendLock.Release(); + } + } + + public async Task SendStringAsync(string message) => await SendBytesAsync(Encoding.UTF8.GetBytes(message ?? string.Empty)).ConfigureAwait(false); + + public async Task ReceiveBytesAsync() + { + await _recvLock.WaitAsync().ConfigureAwait(false); + try + { + var bufferLength = new byte[4]; + await ReadExactAsync(bufferLength, 4).ConfigureAwait(false); + if (BitConverter.IsLittleEndian) + { + Array.Reverse(bufferLength); + } + + int length = BitConverter.ToInt32(bufferLength, 0); + + // 128 MB safety limit + if (length < 0 || length > 128 * 1024 * 1024) + { + throw new InvalidDataException($"Invalid message length: {length}"); + } + + var data = new byte[length]; + await ReadExactAsync(data, length).ConfigureAwait(false); + return data; + } + finally + { + _recvLock.Release(); + } + } + + public async Task ReceiveStringAsync() => Encoding.UTF8.GetString(await ReceiveBytesAsync().ConfigureAwait(false)); + + private async Task ReadExactAsync(byte[] buffer, int count) + { + int offset = 0; + while (offset < count) + { + int read = await _stream.ReadAsync(buffer, offset, count - offset).ConfigureAwait(false); + if (read == 0) + { + throw new EndOfStreamException("Connection closed by remote host."); + } + + offset += read; + } + } + + public void Close() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + try + { + _stream?.Close(); + } + catch + { + // Do nothing + } + + try + { + _client?.Close(); + } + catch + { + // Do nothing + } + } + } + + public void Dispose() + { + Close(); + _sendLock.Dispose(); + _recvLock.Dispose(); + _stream?.Dispose(); + _client?.Dispose(); + } + } +} diff --git a/EonaCat.Sockets/EonaCat.Sockets/UdpConnection.cs b/EonaCat.Sockets/EonaCat.Sockets/UdpConnection.cs new file mode 100644 index 0000000..ceb49df --- /dev/null +++ b/EonaCat.Sockets/EonaCat.Sockets/UdpConnection.cs @@ -0,0 +1,84 @@ +using EonaCat.Sockets.Interfaces; +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace EonaCat.Sockets +{ + /// + /// UDP connection wrapper + /// Note: UDP packets are limited to ~65507 bytes per datagram. + /// + public class UdpConnection : IConnection + { + private readonly UdpClient _udpClient; + private readonly IPEndPoint _remoteEndPoint; + private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); + private int _disposed; + + public string ConnectionId { get; } + public bool IsConnected => !(_disposed == 1); + + public UdpConnection(UdpClient udpClient, IPEndPoint remoteEndPoint, string connectionId = null) + { + _udpClient = udpClient ?? throw new ArgumentNullException(nameof(udpClient)); + _remoteEndPoint = remoteEndPoint ?? throw new ArgumentNullException(nameof(remoteEndPoint)); + ConnectionId = connectionId ?? $"udp-{remoteEndPoint}"; + } + + public async Task SendBytesAsync(byte[] data) + { + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + + await _sendLock.WaitAsync().ConfigureAwait(false); + + try + { + await _udpClient.SendAsync(data, data.Length, _remoteEndPoint).ConfigureAwait(false); + } + finally + { + _sendLock.Release(); + } + } + + public async Task SendStringAsync(string message) => await SendBytesAsync(Encoding.UTF8.GetBytes(message ?? string.Empty)).ConfigureAwait(false); + + public async Task ReceiveBytesAsync() + { + var result = await _udpClient.ReceiveAsync().ConfigureAwait(false); + return result.Buffer; + } + + public async Task ReceiveStringAsync() => Encoding.UTF8.GetString(await ReceiveBytesAsync().ConfigureAwait(false)); + + public void Close() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + try + { + _udpClient?.Close(); + } + catch + { + // Do nothing + } + } + } + + public void Dispose() + { + Close(); + _sendLock.Dispose(); + _udpClient?.Dispose(); + } + } +}