Initial version

This commit is contained in:
2026-02-23 19:55:22 +01:00
parent 5020616804
commit 81908aaa96
9 changed files with 767 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
<Solution>
<Project Path="EonaCat.Sockets/EonaCat.Sockets.csproj" />
</Solution>

View File

@@ -0,0 +1,121 @@
using EonaCat.Sockets.Interfaces;
using EonaCat.Sockets.Models;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ProtocolType = EonaCat.Sockets.Models.ProtocolType;
namespace EonaCat.Sockets
{
/// <summary>
/// Client which supports TCP, SSL-TCP, UDP.
/// </summary>
public class Client : IDisposable
{
private readonly NetConfiguration _config;
private IConnection _connection;
private int _disposed;
public bool IsConnected => _connection?.IsConnected ?? false;
public string ConnectionId => _connection?.ConnectionId;
public NetConfiguration Configuration => _config;
public Client(NetConfiguration config = null)
{
_config = config ?? new NetConfiguration();
}
public async Task ConnectAsync(CancellationToken ct = default)
{
switch (_config.Protocol)
{
case ProtocolType.TCP:
_connection = await ConnectTcpAsync(ct).ConfigureAwait(false);
break;
case ProtocolType.UDP:
_connection = ConnectUdp();
break;
default:
throw new NotSupportedException($"Protocol {_config.Protocol} not supported.");
}
_config.Logger?.Invoke($"Connected [{_config.Protocol}] → {_config.Host}:{_config.Port} id={_connection.ConnectionId}");
}
private async Task<TcpConnection> ConnectTcpAsync(CancellationToken ct)
{
var tcpClient = new TcpClient();
tcpClient.ReceiveTimeout = _config.ReceiveTimeoutMs;
tcpClient.SendTimeout = _config.SendTimeoutMs;
tcpClient.NoDelay = true;
await tcpClient.ConnectAsync(_config.Host, _config.Port).ConfigureAwait(false);
System.IO.Stream stream;
if (_config.UseSsl)
{
var sslStream = new SslStream(tcpClient.GetStream(), false,
_config.CertificateValidationCallback ??
(_config.AllowSelfSignedCertificates
? (sender, certificate, chain, errors) => true
: (RemoteCertificateValidationCallback)null));
await sslStream.AuthenticateAsClientAsync(_config.SslTargetHost ?? _config.Host).ConfigureAwait(false);
stream = sslStream;
}
else
{
stream = tcpClient.GetStream();
}
return new TcpConnection(tcpClient, stream);
}
private UdpConnection ConnectUdp()
{
var udpClient = new UdpClient();
udpClient.Client.ReceiveTimeout = _config.ReceiveTimeoutMs;
udpClient.Client.SendTimeout = _config.SendTimeoutMs;
var ep = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port);
return new UdpConnection(udpClient, ep);
}
public Task SendBytesAsync(byte[] data) => EnsureConnection().SendBytesAsync(data);
public Task SendStringAsync(string msg) => EnsureConnection().SendStringAsync(msg);
public Task<byte[]> ReceiveBytesAsync() => EnsureConnection().ReceiveBytesAsync();
public Task<string> ReceiveStringAsync() => EnsureConnection().ReceiveStringAsync();
public void Disconnect()
{
_connection?.Close();
_connection?.Dispose();
_connection = null;
_config.Logger?.Invoke("Disconnected.");
}
private IConnection EnsureConnection()
{
var conn = _connection;
if (conn == null || !conn.IsConnected)
{
throw new InvalidOperationException("Not connected. Call ConnectAsync() first.");
}
return conn;
}
public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
Disconnect();
}
}
}
}

View File

@@ -0,0 +1,66 @@
using EonaCat.Sockets.Interfaces;
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace EonaCat.Sockets
{
/// <summary>
/// Connection pool
/// </summary>
public class ConnectionPool<TConnection> where TConnection : IConnection
{
private readonly ConcurrentDictionary<string, TConnection> _connections = new ConcurrentDictionary<string, TConnection>(StringComparer.Ordinal);
private long _totalAccepted;
private long _totalDropped;
private readonly int _maxConnections;
public ConnectionPool(int maxConnections)
{
_maxConnections = maxConnections;
}
public int Count => _connections.Count;
public long TotalAccepted => Interlocked.Read(ref _totalAccepted);
public long TotalDropped => Interlocked.Read(ref _totalDropped);
public bool TryAdd(TConnection connection)
{
if (_connections.Count >= _maxConnections)
{
Interlocked.Increment(ref _totalDropped);
return false;
}
if (_connections.TryAdd(connection.ConnectionId, connection))
{
Interlocked.Increment(ref _totalAccepted);
return true;
}
return false;
}
public bool TryRemove(string connectionId, out TConnection connection) => _connections.TryRemove(connectionId, out connection);
public bool TryGet(string connectionId, out TConnection connection) => _connections.TryGetValue(connectionId, out connection);
public void ForEach(Action<TConnection> action)
{
foreach (var keyValue in _connections)
{
action(keyValue.Value);
}
}
public void Clear()
{
foreach (var keyValue in _connections)
{
keyValue.Value.Close();
keyValue.Value.Dispose();
}
_connections.Clear();
}
}
}

View File

@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
</Project>

View File

@@ -0,0 +1,16 @@
using System;
using System.Threading.Tasks;
namespace EonaCat.Sockets.Interfaces
{
public interface IConnection : IDisposable
{
string ConnectionId { get; }
bool IsConnected { get; }
Task SendBytesAsync(byte[] data);
Task SendStringAsync(string message);
Task<byte[]> ReceiveBytesAsync();
Task<string> ReceiveStringAsync();
void Close();
}
}

View File

@@ -0,0 +1,291 @@
using EonaCat.Sockets.Models;
using System;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using ProtocolType = EonaCat.Sockets.Models.ProtocolType;
namespace EonaCat.Sockets.Interfaces
{
/// <summary>
/// TCP/SSL/UDP server.
/// </summary>
public class Server : IDisposable
{
public event Func<IConnection, Task> ClientConnected;
public event Func<IConnection, byte[], Task> DataReceived;
public event Func<IConnection, string, Task> StringReceived;
public event Func<IConnection, Exception, Task> ClientError;
public event Func<IConnection, Task> ClientDisconnected;
private readonly NetConfiguration _config;
private ConnectionPool<TcpConnection> _tcpPool;
private TcpListener _tcpListener;
private UdpClient _udpServer;
private CancellationTokenSource _cancellationTokenSource;
private int _disposed;
public int ActiveConnections => _tcpPool?.Count ?? 0;
public bool IsRunning => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;
public Server(NetConfiguration config = null)
{
_config = config ?? new NetConfiguration();
}
public void Start()
{
_cancellationTokenSource = new CancellationTokenSource();
switch (_config.Protocol)
{
case ProtocolType.TCP:
StartTcp(_cancellationTokenSource.Token);
break;
case ProtocolType.UDP:
StartUdp(_cancellationTokenSource.Token);
break;
}
}
private void StartTcp(CancellationToken cancellationToken)
{
_tcpPool = new ConnectionPool<TcpConnection>(_config.MaxConnections);
_tcpListener = new TcpListener(IPAddress.Any, _config.Port);
// Allow address reuse and dual-stack
_tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
// Increase socket buffer sizes for high throughput
_tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, _config.BufferSize);
_tcpListener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, _config.BufferSize);
_tcpListener.Start(_config.BacklogSize);
_config.Logger?.Invoke($"TCP Server listening on port {_config.Port} (max {_config.MaxConnections:N0} connections, SSL={_config.UseSsl})");
// Accept loop
_ = AcceptLoopAsync(cancellationToken);
}
private async Task AcceptLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
TcpClient tcpClient;
try
{
tcpClient = await _tcpListener.AcceptTcpClientAsync().ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception exception)
{
_config.Logger?.Invoke($"Accept error: {exception.Message}");
continue;
}
// Handle each client on the thread pool
_ = Task.Run(() => HandleTcpClientAsync(tcpClient, cancellationToken), cancellationToken);
}
}
private async Task HandleTcpClientAsync(TcpClient tcpClient, CancellationToken cancelationToken)
{
tcpClient.NoDelay = true;
tcpClient.ReceiveTimeout = _config.ReceiveTimeoutMs;
tcpClient.SendTimeout = _config.SendTimeoutMs;
System.IO.Stream stream;
try
{
if (_config.UseSsl)
{
var sslStream = new SslStream(tcpClient.GetStream(), false);
await sslStream.AuthenticateAsServerAsync(
_config.ServerCertificate,
clientCertificateRequired: false,
// No Tls 1.3 support for .net Framework 4.8
enabledSslProtocols: SslProtocols.Tls12,
checkCertificateRevocation: true).ConfigureAwait(false);
stream = sslStream;
}
else
{
stream = tcpClient.GetStream();
}
}
catch (Exception exception)
{
_config.Logger?.Invoke($"SSL handshake failed: {exception.Message}");
tcpClient.Close();
return;
}
var connection = new TcpConnection(tcpClient, stream);
if (!_tcpPool.TryAdd(connection))
{
_config.Logger?.Invoke($"Connection limit reached ({_config.MaxConnections:N0}). Dropping client.");
connection.Close();
connection.Dispose();
return;
}
try
{
if (ClientConnected != null)
{
await ClientConnected(connection).ConfigureAwait(false);
}
await ReceiveLoopAsync(connection, cancelationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
if (ClientError != null)
{
await ClientError(connection, exception).ConfigureAwait(false);
}
else
{
_config.Logger?.Invoke($"Client {connection.ConnectionId} error: {exception.Message}");
}
}
finally
{
_tcpPool.TryRemove(connection.ConnectionId, out _);
connection.Close();
if (ClientDisconnected != null)
{
await ClientDisconnected(connection).ConfigureAwait(false);
}
connection.Dispose();
}
}
private async Task ReceiveLoopAsync(TcpConnection conn, CancellationToken ct)
{
while (!ct.IsCancellationRequested && conn.IsConnected)
{
byte[] data = await conn.ReceiveBytesAsync().ConfigureAwait(false);
if (DataReceived != null)
{
await DataReceived(conn, data).ConfigureAwait(false);
}
if (StringReceived != null)
{
await StringReceived(conn, System.Text.Encoding.UTF8.GetString(data)).ConfigureAwait(false);
}
}
}
private void StartUdp(CancellationToken ct)
{
_udpServer = new UdpClient(_config.Port);
_config.Logger?.Invoke($"UDP Server listening on port {_config.Port}");
_ = UdpReceiveLoopAsync(ct);
}
private async Task UdpReceiveLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var result = await _udpServer.ReceiveAsync().ConfigureAwait(false);
var conn = new UdpConnection(_udpServer, result.RemoteEndPoint);
if (DataReceived != null)
{
await DataReceived(conn, result.Buffer).ConfigureAwait(false);
}
if (StringReceived != null)
{
await StringReceived(conn, System.Text.Encoding.UTF8.GetString(result.Buffer)).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception exception)
{
_config.Logger?.Invoke($"UDP receive error: {exception.Message}");
}
}
}
public async Task BroadcastBytesAsync(byte[] data)
{
if (_tcpPool == null)
{
return;
}
_tcpPool.ForEach(async c =>
{
try
{
await c.SendBytesAsync(data).ConfigureAwait(false);
}
catch
{
// Do nothing
}
});
await Task.CompletedTask;
}
public Task BroadcastStringAsync(string message) => BroadcastBytesAsync(System.Text.Encoding.UTF8.GetBytes(message));
public void Stop()
{
_cancellationTokenSource?.Cancel();
try
{
_tcpListener?.Stop();
}
catch
{
// Do nothing
}
try
{
_udpServer?.Close();
}
catch
{
// Do nothing
}
_tcpPool?.Clear();
_config.Logger?.Invoke("Server stopped.");
}
public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
Stop();
_cancellationTokenSource?.Dispose();
_udpServer?.Dispose();
}
}
}
}

View File

@@ -0,0 +1,37 @@
using System;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace EonaCat.Sockets.Models
{
public enum ProtocolType { TCP, UDP }
public class NetConfiguration
{
// Connection limits
public int MaxConnections { get; set; } = 2_000_000;
public int BacklogSize { get; set; } = 10_000;
// Protocol
public ProtocolType Protocol { get; set; } = ProtocolType.TCP;
// Network
public string Host { get; set; } = "127.0.0.1";
public int Port { get; set; } = 9000;
public int BufferSize { get; set; } = 65536;
public int ReceiveTimeoutMs { get; set; } = 30_000;
public int SendTimeoutMs { get; set; } = 30_000;
public int MaxRetries { get; set; } = 3;
// SSL
public bool UseSsl { get; set; } = false;
public X509Certificate2 ServerCertificate { get; set; }
public string SslTargetHost { get; set; }
public bool AllowSelfSignedCertificates { get; set; } = false;
public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; }
public int IoThreads { get; set; } = Environment.ProcessorCount * 2;
public Action<string> Logger { get; set; } = message => Console.WriteLine($"[EonaCat.Sockets] {message}");
}
}

View File

@@ -0,0 +1,142 @@
using EonaCat.Sockets.Interfaces;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EonaCat.Sockets
{
/// <summary>
/// Represents a single TCP or SSL connection.
/// </summary>
public class TcpConnection : IConnection
{
private readonly TcpClient _client;
private readonly Stream _stream; // NetworkStream or SslStream
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _recvLock = new SemaphoreSlim(1, 1);
private int _disposed;
public string ConnectionId { get; }
public bool IsConnected => _client?.Connected ?? false;
public TcpConnection(TcpClient client, Stream stream, string connectionId = null)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
ConnectionId = connectionId ?? Guid.NewGuid().ToString("N");
}
public async Task SendBytesAsync(byte[] data)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
await _sendLock.WaitAsync().ConfigureAwait(false);
try
{
// 4-byte length prefix (big-endian) then payload
var lenBytes = BitConverter.GetBytes(data.Length);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(lenBytes);
}
await _stream.WriteAsync(lenBytes, 0, 4).ConfigureAwait(false);
await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
await _stream.FlushAsync().ConfigureAwait(false);
}
finally
{
_sendLock.Release();
}
}
public async Task SendStringAsync(string message) => await SendBytesAsync(Encoding.UTF8.GetBytes(message ?? string.Empty)).ConfigureAwait(false);
public async Task<byte[]> ReceiveBytesAsync()
{
await _recvLock.WaitAsync().ConfigureAwait(false);
try
{
var bufferLength = new byte[4];
await ReadExactAsync(bufferLength, 4).ConfigureAwait(false);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(bufferLength);
}
int length = BitConverter.ToInt32(bufferLength, 0);
// 128 MB safety limit
if (length < 0 || length > 128 * 1024 * 1024)
{
throw new InvalidDataException($"Invalid message length: {length}");
}
var data = new byte[length];
await ReadExactAsync(data, length).ConfigureAwait(false);
return data;
}
finally
{
_recvLock.Release();
}
}
public async Task<string> ReceiveStringAsync() => Encoding.UTF8.GetString(await ReceiveBytesAsync().ConfigureAwait(false));
private async Task ReadExactAsync(byte[] buffer, int count)
{
int offset = 0;
while (offset < count)
{
int read = await _stream.ReadAsync(buffer, offset, count - offset).ConfigureAwait(false);
if (read == 0)
{
throw new EndOfStreamException("Connection closed by remote host.");
}
offset += read;
}
}
public void Close()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
try
{
_stream?.Close();
}
catch
{
// Do nothing
}
try
{
_client?.Close();
}
catch
{
// Do nothing
}
}
}
public void Dispose()
{
Close();
_sendLock.Dispose();
_recvLock.Dispose();
_stream?.Dispose();
_client?.Dispose();
}
}
}

View File

@@ -0,0 +1,84 @@
using EonaCat.Sockets.Interfaces;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EonaCat.Sockets
{
/// <summary>
/// UDP connection wrapper
/// Note: UDP packets are limited to ~65507 bytes per datagram.
/// </summary>
public class UdpConnection : IConnection
{
private readonly UdpClient _udpClient;
private readonly IPEndPoint _remoteEndPoint;
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
private int _disposed;
public string ConnectionId { get; }
public bool IsConnected => !(_disposed == 1);
public UdpConnection(UdpClient udpClient, IPEndPoint remoteEndPoint, string connectionId = null)
{
_udpClient = udpClient ?? throw new ArgumentNullException(nameof(udpClient));
_remoteEndPoint = remoteEndPoint ?? throw new ArgumentNullException(nameof(remoteEndPoint));
ConnectionId = connectionId ?? $"udp-{remoteEndPoint}";
}
public async Task SendBytesAsync(byte[] data)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
await _sendLock.WaitAsync().ConfigureAwait(false);
try
{
await _udpClient.SendAsync(data, data.Length, _remoteEndPoint).ConfigureAwait(false);
}
finally
{
_sendLock.Release();
}
}
public async Task SendStringAsync(string message) => await SendBytesAsync(Encoding.UTF8.GetBytes(message ?? string.Empty)).ConfigureAwait(false);
public async Task<byte[]> ReceiveBytesAsync()
{
var result = await _udpClient.ReceiveAsync().ConfigureAwait(false);
return result.Buffer;
}
public async Task<string> ReceiveStringAsync() => Encoding.UTF8.GetString(await ReceiveBytesAsync().ConfigureAwait(false));
public void Close()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
try
{
_udpClient?.Close();
}
catch
{
// Do nothing
}
}
}
public void Dispose()
{
Close();
_sendLock.Dispose();
_udpClient?.Dispose();
}
}
}