Files
EonaCat.Connections/EonaCat.Connections/NetworkClient.cs
T
2026-06-17 08:05:50 +02:00

2563 lines
109 KiB
C#

using EonaCat.Connections.EventArguments;
using EonaCat.Connections.Helpers;
using EonaCat.Connections.Models;
using System.Buffers;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using ErrorEventArgs = EonaCat.Connections.EventArguments.ErrorEventArgs;
using ProtocolType = EonaCat.Connections.Models.ProtocolType;
namespace EonaCat.Connections
{
// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/license for full license details.
public class NetworkClient : IAsyncDisposable, IDisposable
{
private const int DEFAULT_WAITING_TIMEOUT_IN_SECONDS = 30;
private readonly Configuration _config;
private Task _autoReconnectTask;
private TcpClient _tcpClient;
private UdpClient _udpClient;
private Stream _stream;
private Aes _aesEncryption;
private Task _receiveTask;
private CancellationTokenSource _clientCts;
private CancellationTokenSource _connectionCts;
private readonly Decoder _utf8Decoder = Encoding.UTF8.GetDecoder();
private char[] _charBuffer = new char[8192];
private static readonly byte[] PingBytes = Encoding.UTF8.GetBytes(Configuration.PING_VALUE);
private static readonly byte[] PongBytes = Encoding.UTF8.GetBytes(Configuration.PONG_VALUE);
public event EventHandler<string> OnLog;
public bool IsConnected
{
get;
set;
}
private Task _pingTask;
private Task _pongTask;
private Task _idleMonitorTask;
public bool IsSecure => _config != null && (_config.UseSsl || _config.UseAesEncryption);
public bool IsEncrypted => _config != null && _config.UseAesEncryption;
public bool IsTcp => _config != null && _config.Protocol == ProtocolType.TCP;
private readonly SemaphoreSlim _connectLock = new(1, 1);
private readonly SemaphoreSlim _streamReadLock = new(1, 1);
private readonly SemaphoreSlim _streamWriteLock = new(1, 1);
private readonly SemaphoreSlim _connectionAttemptLock = new(1, 1);
private long _bytesSent;
private long _bytesReceived;
private long _messagesSent;
private long _messagesReceived;
public DateTime ConnectionTime { get; private set; }
public TimeSpan Uptime => DateTime.UtcNow - ConnectionTime;
public DateTime LastActive { get; internal set; }
public long BytesSent => Interlocked.Read(ref _bytesSent);
public long BytesReceived => Interlocked.Read(ref _bytesReceived);
public long MessagesSent => Interlocked.Read(ref _messagesSent);
public long MessagesReceived => Interlocked.Read(ref _messagesReceived);
public int IdleTimeInSeconds()
{
var idleTime = IdleTime();
return (int)idleTime.TotalSeconds;
}
public int IdleTimeInMinutes()
{
var idleTime = IdleTime();
return (int)idleTime.TotalMinutes;
}
public int IdleTimeInHours()
{
var idleTime = IdleTime();
return (int)idleTime.TotalHours;
}
public int IdleTimeInDays()
{
var idleTime = IdleTime();
return (int)idleTime.TotalDays;
}
public TimeSpan IdleTime()
{
return DateTime.UtcNow - LastActive;
}
public string IdleTimeFormatted(bool includeDays = true, bool includeHours = true, bool includeMinutes = true, bool includeSeconds = true, bool includeMilliseconds = true)
{
var idleTime = IdleTime();
var parts = new List<string>();
if (includeDays)
{
parts.Add($"{(int)idleTime.TotalDays:D2}d");
}
if (includeHours)
{
parts.Add($"{idleTime.Hours:D2}h");
}
if (includeMinutes)
{
parts.Add($"{idleTime.Minutes:D2}m");
}
if (includeSeconds)
{
parts.Add($"{idleTime.Seconds:D2}s");
}
if (includeMilliseconds)
{
parts.Add($"{idleTime.Milliseconds:D3}ms");
}
return string.Join(" ", parts);
}
public int ConnectedTimeInSeconds()
{
var connectedTime = DateTime.UtcNow - ConnectionTime;
return (int)connectedTime.TotalSeconds;
}
public int ConnectedTimeInMinutes()
{
var connectedTime = DateTime.UtcNow - ConnectionTime;
return (int)connectedTime.TotalMinutes;
}
public int ConnectedTimeInHours()
{
var connectedTime = DateTime.UtcNow - ConnectionTime;
return (int)connectedTime.TotalHours;
}
public int ConnectedTimeInDays()
{
var connectedTime = DateTime.UtcNow - ConnectionTime;
return (int)connectedTime.TotalDays;
}
public TimeSpan ConnectedTime()
{
return DateTime.UtcNow - ConnectionTime;
}
public string ConnectedTimeFormatted(bool includeDays = true, bool includeHours = true, bool includeMinutes = true, bool includeSeconds = true, bool includeMilliseconds = true)
{
var connectedTime = ConnectedTime();
var parts = new List<string>();
if (includeDays)
{
parts.Add($"{(int)connectedTime.TotalDays:D2}d");
}
if (includeHours)
{
parts.Add($"{connectedTime.Hours:D2}h");
}
if (includeMinutes)
{
parts.Add($"{connectedTime.Minutes:D2}m");
}
if (includeSeconds)
{
parts.Add($"{connectedTime.Seconds:D2}s");
}
if (includeMilliseconds)
{
parts.Add($"{connectedTime.Milliseconds:D3}ms");
}
return string.Join(" ", parts);
}
private bool _disposed;
private bool _stopAutoReconnecting;
private DisconnectReason _reason;
private bool _wasConnected;
public event EventHandler<ConnectionEventArgs> OnConnected;
public event EventHandler<ConnectionEventArgs> OnNicknameSend;
public event EventHandler<DataReceivedEventArgs> OnDataReceived;
public event EventHandler<ConnectionEventArgs> OnDisconnected;
public event EventHandler<ErrorEventArgs> OnSslError;
public event EventHandler<ErrorEventArgs> OnEncryptionError;
public event EventHandler<ErrorEventArgs> OnGeneralError;
public event EventHandler<PingEventArgs> OnPingResponse;
public event EventHandler<PingEventArgs> OnPongResponse;
public event EventHandler<ErrorEventArgs> OnSocketError;
public SocketStatusPage StatusPage { get; } = new SocketStatusPage();
public HealthApiServer HealthApi { get; }
public NetworkClient(Configuration config)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_clientCts = new CancellationTokenSource();
HealthApi = new HealthApiServer(BuildHealthJson, BuildStatusJson);
}
private string BuildHealthJson()
{
var E = HealthApiServer.JsonEscape;
return "{" +
"\"status\":\"ok\"," +
"\"type\":\"client\"," +
$"\"serverAddress\":{E(_config.Host + ":" + _config.Port)}," +
$"\"protocol\":{E(_config.Protocol.ToString())}," +
$"\"isConnected\":{(IsConnected ? "true" : "false")}," +
$"\"isSecure\":{(IsSecure ? "true" : "false")}," +
$"\"isEncrypted\":{(IsEncrypted ? "true" : "false")}," +
$"\"uptimeSeconds\":{Uptime.TotalSeconds:F1}," +
$"\"connectionTimeUtc\":{E(ConnectionTime.ToString("O"))}" +
"}";
}
private string BuildStatusJson()
{
var E = HealthApiServer.JsonEscape;
return "{" +
"\"status\":\"ok\"," +
"\"type\":\"client\"," +
$"\"serverAddress\":{E(_config.Host + ":" + _config.Port)}," +
$"\"protocol\":{E(_config.Protocol.ToString())}," +
$"\"isConnected\":{(IsConnected ? "true" : "false")}," +
$"\"isSecure\":{(IsSecure ? "true" : "false")}," +
$"\"isEncrypted\":{(IsEncrypted ? "true" : "false")}," +
$"\"nickname\":{E(Nickname)}," +
$"\"bytesSent\":{BytesSent}," +
$"\"bytesReceived\":{BytesReceived}," +
$"\"messagesSent\":{MessagesSent}," +
$"\"messagesReceived\":{MessagesReceived}," +
$"\"uptimeSeconds\":{Uptime.TotalSeconds:F1}," +
$"\"connectionTimeUtc\":{E(ConnectionTime.ToString("O"))}," +
$"\"lastActiveUtc\":{(LastActive == default ? "null" : E(LastActive.ToString("O")))}," +
$"\"idleTimeSeconds\":{IdleTime().TotalSeconds:F1}," +
$"\"lastDataSentUtc\":{(LastDataSent == default ? "null" : E(LastDataSent.ToString("O")))}," +
$"\"lastDataReceivedUtc\":{(LastDataReceived == default ? "null" : E(LastDataReceived.ToString("O")))}," +
$"\"disconnectionTimeUtc\":{(DisconnectionTime == default ? "null" : E(DisconnectionTime.ToString("O")))}," +
$"\"isAutoConnectStarted\":{(IsAutoConnectStarted ? "true" : "false")}," +
$"\"totalErrors\":{StatusPage.TotalErrors}," +
$"\"sslErrors\":{StatusPage.SslErrorCount}" +
"}";
}
public async Task<bool> ConnectAsync()
{
DebugConnection($"Attempting to connect to {_config.Host}:{_config.Port} using protocol {_config.Protocol} with SSL: {_config.UseSsl} and AES: {_config.UseAesEncryption}");
await _connectLock.WaitAsync().ConfigureAwait(false);
DebugConnection($"Acquired connection lock for {_config.Host}:{_config.Port}");
try
{
if (_clientCts == null || _clientCts.IsCancellationRequested)
{
_clientCts?.Dispose();
_clientCts = new CancellationTokenSource();
}
// Prevent duplicate connections if already connected
if (IsConnected)
{
if (_config.Protocol == ProtocolType.TCP)
{
if (_stream == null || !_stream.CanRead || !_stream.CanWrite)
{
await DisconnectClientAsync(DisconnectReason.RemoteClosed).ConfigureAwait(false);
}
else
{
return true;
}
}
else
{
if (_udpClient == null)
{
IsConnected = false;
}
else
{
return true;
}
}
}
DebugConnection($"Starting connection process for {_config.Host}:{_config.Port}");
DebugConnection($"Cleaned up existing connection for {_config.Host}:{_config.Port}");
_connectionCts?.Cancel();
_connectionCts?.Dispose();
_connectionCts = new CancellationTokenSource();
DebugConnection($"Connecting to {_config.Host}:{_config.Port} using protocol {_config.Protocol}");
bool isConnected = false;
if (_config.Protocol == ProtocolType.TCP)
{
var result = await ConnectTcpAsync().ConfigureAwait(false);
isConnected = result;
if (!result)
{
throw new Exception("Failed to connect via TCP");
}
}
else
{
var result = await ConnectUdpAsync().ConfigureAwait(false);
isConnected = result;
if (!result)
{
throw new Exception("Failed to connect via UDP");
}
}
if (isConnected)
{
DebugConnection($"Successfully connected to {_config.Host}:{_config.Port}");
// If we already had a nickname, resend it
if (!string.IsNullOrEmpty(Nickname))
{
await SendNicknameAsync(Nickname).ConfigureAwait(false);
}
DebugConnection($"Starting background tasks for {_config.Host}:{_config.Port}");
if (_config.EnableAutoReconnect &&
(_autoReconnectTask == null || _autoReconnectTask.IsCompleted || _autoReconnectTask.IsCanceled || _autoReconnectTask.IsFaulted))
{
if (_autoReconnectSource == null || _autoReconnectSource.IsCancellationRequested)
{
_autoReconnectSource?.Dispose();
_autoReconnectSource = new CancellationTokenSource();
}
_autoReconnectTask = Task.Run(() => AutoReconnectAsync(_autoReconnectSource.Token), _autoReconnectSource.Token);
}
_wasConnected = true;
}
OnConnected?.Invoke(this, new ConnectionEventArgs
{
ClientId = Nickname ?? "self",
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port)
});
if (_config.EnableAutoHtmlReports)
{
StatusPage.StartAutoHtmlReport(
_config.HtmlReportOutputDirectory,
_config.HtmlReportIntervalSeconds,
"status-client-errors.html");
}
if (_config.EnableHealthApi && !HealthApi.IsRunning)
{
HealthApi.Start(_config.HealthApiPort, _config.HealthApiBindAddress);
}
return true;
}
catch (Exception exception)
{
RecordError(exception, "Connection error");
OnGeneralError?.Invoke(this, new ErrorEventArgs
{
Exception = exception,
Message = "Connection error"
});
return false;
}
finally
{
try
{
_connectLock?.Release();
}
catch
{
// Do nothing
}
}
}
private async Task CleanupExistingConnectionAsync()
{
try
{
_stream?.Close();
_stream?.Dispose();
_stream = null;
if (_tcpClient != null)
{
try
{
_tcpClient?.Client?.Shutdown(SocketShutdown.Both);
}
catch
{
// Do nothing
}
_tcpClient?.Close();
_tcpClient?.Dispose();
_tcpClient = null;
}
if (_receiveTask != null)
{
await Task.WhenAny(_receiveTask, Task.Delay(3000));
}
}
catch (Exception ex)
{
DebugConnection($"Error cleaning up existing connection for {_config.Host}:{_config.Port}: {ex.Message}");
OnLog?.Invoke(this, $"Error cleaning up existing connection: {ex.Message}");
}
}
private async Task<bool> ConnectTcpAsync()
{
await _connectionAttemptLock.WaitAsync();
try
{
if (IsConnected)
{
return true;
}
int attempt = 0;
while (_config.SSLMaxRetries == 0 || attempt < _config.SSLMaxRetries)
{
if (_clientCts != null && _clientCts.IsCancellationRequested)
{
break;
}
attempt++;
TcpClient tcpClient = null;
SslHandshakeDiagnostics sslDiagnostics = null;
try
{
tcpClient = new TcpClient
{
NoDelay = !_config.EnableNagle,
ReceiveBufferSize = _config.BufferSize,
SendBufferSize = _config.BufferSize,
LingerState = new LingerOption(_config.EnableRST, 0)
};
if (EnableKeepAlive)
{
DebugConnection($"Enabling TCP keep-alive for {_config.Host}:{_config.Port} with KeepAliveTimeSeconds: {_config.KeepAliveTimeSeconds}, KeepAliveIntervalSeconds: {_config.KeepAliveIntervalSeconds}, KeepAliveRetryCount: {_config.KeepAliveRetryCount}");
ConfigureKeepAlive(tcpClient.Client, _config.KeepAliveTimeSeconds, _config.KeepAliveIntervalSeconds, _config.KeepAliveRetryCount);
}
await CleanupExistingConnectionAsync().ConfigureAwait(false);
using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(_clientCts.Token);
connectCts.CancelAfter(_config.ConnectionTimeout > TimeSpan.Zero ? _config.ConnectionTimeout : TimeSpan.FromSeconds(10));
#if NET8_0_OR_GREATER
await tcpClient.ConnectAsync(_config.Host, _config.Port, connectCts.Token).ConfigureAwait(false);
#else
var connectTask = tcpClient.ConnectAsync(_config.Host, _config.Port);
var timeoutTask = Task.Delay(Timeout.Infinite, connectCts.Token);
if (await Task.WhenAny(connectTask, timeoutTask).ConfigureAwait(false) != connectTask)
{
throw new OperationCanceledException("TCP connect timed out.");
}
await connectTask.ConfigureAwait(false);
#endif
_tcpClient = tcpClient;
tcpClient = null;
LastPongReceived = DateTime.UtcNow;
DebugConnection($"TCP connection established to {_config.Host}:{_config.Port} on attempt {attempt} using client {_tcpClient.Client.LocalEndPoint}");
NetworkStream networkStream = _tcpClient.GetStream();
DebugConnection($"Obtained network stream for {_config.Host}:{_config.Port} on attempt {attempt}, SSL Enabled: {_config.UseSsl}");
if (!_config.UseSsl)
{
_stream = networkStream;
break;
}
// SSL setup
var sslStream = new SslStream(networkStream, leaveInnerStreamOpen: true, _config.GetRemoteCertificateValidationCallback());
sslDiagnostics = _config.EnableSslDiagnostics ? new SslHandshakeDiagnostics() : null;
var remoteEndPoint = _tcpClient?.Client.RemoteEndPoint?.ToString() ?? "unknown";
sslDiagnostics?.StartHandshake(remoteEndPoint, _config.Certificate != null);
#if NET8_0_OR_GREATER
var clientOptions = new SslClientAuthenticationOptions
{
TargetHost = _config.Host,
ClientCertificates = _config.Certificate != null ? new X509CertificateCollection { _config.Certificate } : null,
EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13,
CertificateRevocationCheckMode = _config.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
AllowRenegotiation = _config.AllowTlsRenegotiation
};
DebugConnection($"Authenticating SSL connection to {_config.Host}:{_config.Port} with SNI: {_config.Host}, Client Certificate: {(_config.Certificate != null ? "Yes" : "No")}, Enabled Protocols: {clientOptions.EnabledSslProtocols}, Check Certificate Revocation: {_config.CheckCertificateRevocation}");
sslDiagnostics?.StartStage("ClientAuth");
await sslStream.AuthenticateAsClientAsync(clientOptions).ConfigureAwait(false);
sslDiagnostics?.EndStage("ClientAuth");
#else
sslDiagnostics?.StartStage("ClientAuth");
if (_config.Certificate != null)
{
await sslStream.AuthenticateAsClientAsync(_config.Host, new X509CertificateCollection { _config.Certificate }, SslProtocols.Tls12 | SslProtocols.Tls13, _config.CheckCertificateRevocation).ConfigureAwait(false);
}
else
{
await sslStream.AuthenticateAsClientAsync(_config.Host, null, SslProtocols.Tls12 | SslProtocols.Tls13, _config.CheckCertificateRevocation).ConfigureAwait(false);
}
sslDiagnostics?.EndStage("ClientAuth");
#endif
DebugConnection($"SSL authentication successful for {_config.Host}:{_config.Port} using SNI: {_config.Host}, Client Certificate: {(_config.Certificate != null ? "Yes" : "No")}, Enabled Protocols: {(sslStream.SslProtocol)}, Cipher Algorithm: {sslStream.CipherAlgorithm}, Hash Algorithm: {sslStream.HashAlgorithm}, Key Exchange Algorithm: {sslStream.KeyExchangeAlgorithm}");
sslDiagnostics?.RecordSuccess(sslStream);
if (sslDiagnostics != null)
{
DebugConnection($"[SSL Metrics] {sslDiagnostics}");
}
_stream = sslStream;
break;
}
catch (SocketException socketEx)
{
var errorMsg = $"Socket error on TCP connect attempt {attempt}: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionRefused => "Connection refused - server not listening or firewall blocked", SocketError.HostUnreachable => "Host unreachable - network unreachable or no route to host", SocketError.NetworkUnreachable => "Network unreachable", SocketError.TimedOut => "Connection attempt timed out", _ => "Unknown socket error" }}";
RecordError(socketEx, errorMsg);
DebugConnection($"[Attempt {attempt}] Socket error: {socketEx.SocketErrorCode} - {socketEx.Message}");
OnLog?.Invoke(this, $"[Attempt {attempt}] TCP connection failed (SocketError={socketEx.SocketErrorCode}): {socketEx.Message}, cannot connect to {_config.Host}:{_config.Port}");
if (_config.SSLMaxRetries != 0 && attempt >= _config.SSLMaxRetries)
{
throw;
}
}
catch (IOException ioEx) when (ioEx.InnerException is SocketException innerSocketEx)
{
var errorMsg = $"Socket IO error on TCP connect attempt {attempt}: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionRefused => "Connection refused", SocketError.HostUnreachable => "Host unreachable", _ => "Unknown socket error" }}";
RecordError(ioEx, errorMsg);
DebugConnection($"[Attempt {attempt}] Socket IO error: {innerSocketEx.SocketErrorCode} - {ioEx.Message}");
OnLog?.Invoke(this, $"[Attempt {attempt}] TCP connection failed (SocketError={innerSocketEx.SocketErrorCode}): {ioEx.Message}, cannot connect to {_config.Host}:{_config.Port}");
if (_config.SSLMaxRetries != 0 && attempt >= _config.SSLMaxRetries)
{
throw;
}
}
catch (Exception ex)
{
var isRecoverable = _config.UseSsl ? Helpers.SslHandshakeDiagnostics.IsRecoverableFailure(ex) : false;
sslDiagnostics?.RecordFailure(ex, isRecoverable);
RecordError(ex, $"Connection error on attempt {attempt}: {ex.GetType().Name} - {ex.Message}", isSslError: _config.UseSsl);
if (_config.UseSsl)
{
// Try to read SSL error notification from server
string serverSslError = null;
try
{
var rawStream = _tcpClient?.GetStream();
if (rawStream != null && rawStream.DataAvailable)
{
var buf = new byte[1024];
var n = await rawStream.ReadAsync(buf, 0, buf.Length).ConfigureAwait(false);
if (n > 0)
{
var msg = Encoding.UTF8.GetString(buf, 0, n);
if (msg.Contains(Configuration.SSL_ERROR_PREFIX))
{
var startIdx = msg.IndexOf(Configuration.SSL_ERROR_PREFIX, StringComparison.Ordinal) + Configuration.SSL_ERROR_PREFIX.Length;
var endIdx = msg.IndexOf(Configuration.SSL_ERROR_SUFFIX, StringComparison.Ordinal);
if (endIdx > startIdx)
{
serverSslError = msg.Substring(startIdx, endIdx - startIdx);
}
}
}
}
}
catch (Exception readEx)
{
DebugConnection($"Unable to read SSL error notification from server: {readEx.GetType().Name}");
}
// Try to send SSL error notification to server
try
{
var rawStream = _tcpClient?.GetStream();
if (rawStream != null)
{
var notification = Encoding.UTF8.GetBytes($"{Configuration.SSL_ERROR_PREFIX}Client SSL error: {ex.GetType().Name} - {ex.Message}{Configuration.SSL_ERROR_SUFFIX}");
await rawStream.WriteAsync(notification, 0, notification.Length).ConfigureAwait(false);
await rawStream.FlushAsync().ConfigureAwait(false);
}
}
catch (Exception sendEx)
{
DebugConnection($"Unable to send SSL error notification to server: {sendEx.GetType().Name}");
}
OnSslError?.Invoke(this, new ErrorEventArgs
{
Exception = ex,
Message = serverSslError != null
? $"SSL connection error on attempt {attempt}. Server reported: {serverSslError}"
: $"SSL connection error on attempt {attempt}: {ex.GetType().Name} - {ex.Message}"
});
}
else
{
OnGeneralError?.Invoke(this, new ErrorEventArgs
{
Exception = ex,
Message = $"TCP connection error on attempt {attempt}: {ex.GetType().Name} - {ex.Message}"
});
}
OnLog?.Invoke(this, $"[Attempt {attempt}] TCP/SSL connection failed: {ex.GetType().Name} - {ex.Message}, cannot connect to {_config.Host}:{_config.Port}");
if (_config.SSLMaxRetries != 0 && attempt >= _config.SSLMaxRetries)
{
throw;
}
}
finally
{
try { tcpClient?.Close(); tcpClient?.Dispose(); } catch { }
tcpClient = null;
}
await Task.Delay(_config.SSLRetryDelayInSeconds * 1000).ConfigureAwait(false);
}
if (_config.UseAesEncryption)
{
_aesEncryption?.Dispose();
_aesEncryption = await AesKeyExchange.ReceiveAesKeyAsync(_stream, _config.AesPassword).ConfigureAwait(false);
}
IsConnected = true;
// Start receive loop
_receiveTask = Task.Run(async () => await ReceiveDataAsync(_connectionCts.Token));
if (_config.EnableHeartbeat)
{
_pingTask = Task.Run(() => StartPingLoopAsync(_connectionCts.Token));
_pongTask = Task.Run(() => StartPongLoop(_connectionCts.Token));
}
if (_config.IdleTimeoutSeconds > 0)
{
_idleMonitorTask = Task.Run(() => MonitorIdleTimeoutAsync(_connectionCts.Token));
}
return true;
}
finally
{
_connectionAttemptLock.Release();
}
}
private void UpdateLastActive()
{
LastActive = DateTime.UtcNow;
LastDataReceived = DateTime.UtcNow;
IsIdleTimeoutTriggered = false;
_lastIdleLogUtc = DateTime.MinValue;
}
private async Task CancelAndWaitAsync()
{
_connectionCts?.Cancel();
_connectionCts?.Dispose();
_connectionCts = null;
await Task.WhenAll(
SafeTaskCompletion(_pingTask),
SafeTaskCompletion(_pongTask),
SafeTaskCompletion(_receiveTask),
SafeTaskCompletion(_idleMonitorTask)
);
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
try
{
_clientCts?.Cancel();
await SafeTaskCompletion(_autoReconnectTask);
try
{
_autoReconnectTask?.Dispose();
}
catch
{
// Do nothing
}
_autoReconnectTask = null;
await DisconnectClientAsync();
await CleanupAsync();
_clientCts?.Dispose();
_clientCts = null;
_autoReconnectSource?.Cancel();
_autoReconnectSource?.Dispose();
_autoReconnectSource = null;
await SafeTaskCompletion(_pingTask);
await SafeTaskCompletion(_pongTask);
await SafeTaskCompletion(_idleMonitorTask);
_pingTask?.Dispose();
_pongTask?.Dispose();
_idleMonitorTask?.Dispose();
_pingTask = null;
_pongTask = null;
_idleMonitorTask = null;
_connectLock?.Dispose();
_connectionAttemptLock?.Dispose();
_streamReadLock?.Dispose();
_streamWriteLock?.Dispose();
HealthApi.Dispose();
OnConnected = null;
OnDisconnected = null;
OnDataReceived = null;
OnGeneralError = null;
OnPingResponse = null;
OnPongResponse = null;
OnSslError = null;
OnEncryptionError = null;
OnNicknameSend = null;
OnSocketError = null;
}
finally
{
GC.SuppressFinalize(this);
}
}
private async Task CleanupAsync()
{
IsConnected = false;
await CancelAndWaitAsync();
if (_pingTask != null)
{
await Task.WhenAny(_pingTask, Task.Delay(3000));
}
if (_pongTask != null)
{
await Task.WhenAny(_pongTask, Task.Delay(3000));
}
if (_idleMonitorTask != null)
{
await Task.WhenAny(_idleMonitorTask, Task.Delay(3000));
}
_pingTask?.Dispose();
_pongTask?.Dispose();
_idleMonitorTask?.Dispose();
_receiveTask?.Dispose();
_pingTask = null;
_pongTask = null;
_idleMonitorTask = null;
_receiveTask = null;
_tcpClient?.Close();
_tcpClient?.Dispose();
_tcpClient = null;
_udpClient?.Close();
_udpClient?.Dispose();
_udpClient = null;
_stream?.Close();
_stream?.Dispose();
_stream = null;
_utf8Decoder.Reset();
_aesEncryption?.Dispose();
_aesEncryption = null;
_stopAutoReconnecting = false;
}
public string IpAddress => _config != null ? _config.Host : string.Empty;
public int Port => _config != null ? _config.Port : 0;
public string Nickname { get; private set; }
public bool DEBUG_DATA_SEND { get; set; }
public bool DEBUG_DATA_RECEIVED { get; set; }
public DateTime LastPongReceived { get; private set; }
public DateTime LastPingSent { get; private set; }
public bool DisconnectOnMissedPong { get; set; }
public DateTime LastDataSent { get; private set; }
public DateTime LastDataReceived { get; private set; }
public DateTime DisconnectionTime { get; private set; }
public bool IsAutoConnectStarted { get; private set; }
public bool IsIdleTimeoutTriggered { get; private set; }
private DateTime _lastIdleLogUtc;
private CancellationTokenSource _autoReconnectSource;
public bool EnableKeepAlive { get; set; } = true;
/// <summary>
/// Determines after how many seconds of idleness a reminder log is shown.
/// </summary>
public double ShowIdleReminderInSeconds { get; set; } = 20;
/// <summary>
/// Occurs when a client has been idle for longer than the configured timeout period.
/// </summary>
/// <remarks>Subscribers can use this event to perform actions such as disconnecting or notifying
/// idle clients. The event is raised with an <see cref="IdleClientEventArgs"/> instance containing details
/// about the idle client.</remarks>
public event EventHandler<IdleClientEventArgs> OnIdleTimeout;
private async Task<bool> ConnectUdpAsync()
{
try
{
DebugConnection($"Attempting UDP connection to {_config.Host}:{_config.Port}");
_udpClient = new UdpClient();
_udpClient.Client.ReceiveBufferSize = _config.BufferSize;
_udpClient.Client.SendBufferSize = _config.BufferSize;
try
{
_udpClient.Connect(_config.Host, _config.Port);
}
catch (SocketException socketEx)
{
var errorMsg = $"UDP connection failed: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionRefused => "Connection refused - server not listening", SocketError.HostUnreachable => "Host unreachable", SocketError.NetworkUnreachable => "Network unreachable", _ => "Unknown socket error" }}";
DebugConnection($"UDP connection socket error: {socketEx.SocketErrorCode} - {socketEx.Message}");
RecordError(socketEx, errorMsg);
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = socketEx, Message = errorMsg });
try { _udpClient?.Close(); _udpClient?.Dispose(); } catch { }
_udpClient = null;
return false;
}
catch (Exception ex)
{
var errorMsg = $"UDP connection error: {ex.GetType().Name} - {ex.Message}";
DebugConnection($"UDP connection failed: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, errorMsg);
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = ex, Message = errorMsg });
try { _udpClient?.Close(); _udpClient?.Dispose(); } catch { }
_udpClient = null;
return false;
}
ConnectionTime = DateTime.UtcNow;
LastPongReceived = DateTime.UtcNow;
UpdateLastActive();
DebugConnection($"UDP connection established to {_config.Host}:{_config.Port}");
_ = Task.Run(() => ReceiveUdpDataAsync(), _connectionCts.Token);
if (_config.EnableHeartbeat)
{
_pingTask = Task.Run(() => StartPingLoopAsync(_connectionCts.Token));
_pongTask = Task.Run(() => StartPongLoop(_connectionCts.Token));
}
if (_config.IdleTimeoutSeconds > 0)
{
_idleMonitorTask = Task.Run(() => MonitorIdleTimeoutAsync(_connectionCts.Token));
}
IsConnected = true;
OnConnected?.Invoke(this, new ConnectionEventArgs { ClientId = "self", RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port) });
return true;
}
catch (Exception exception)
{
DebugConnection($"Unexpected error during UDP connection: {exception.GetType().Name} - {exception.Message}");
RecordError(exception, $"UDP connection error: {exception.GetType().Name} - {exception.Message}");
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = $"UDP connection error: {exception.GetType().Name}" });
try { _udpClient?.Close(); _udpClient?.Dispose(); } catch { }
_udpClient = null;
return false;
}
}
private async Task StartPingLoopAsync(CancellationToken token)
{
var interval = TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds);
var next = DateTime.UtcNow + interval;
try
{
while (!token.IsCancellationRequested && IsConnected)
{
try
{
if (_stream == null || !_stream.CanWrite)
{
DebugConnection("Ping loop stopping - stream not available for writing");
break;
}
var result = await SendInternalAsync(PingBytes, token).ConfigureAwait(false);
if (result)
{
LastPingSent = DateTime.UtcNow;
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, $"[PING] Sent at {DateTime.UtcNow:O}");
}
}
else
{
DebugConnection("Ping loop failed to send ping - will retry on next interval");
}
var delay = next - DateTime.UtcNow;
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay, token).ConfigureAwait(false);
}
next += interval;
}
catch (OperationCanceledException)
{
DebugConnection($"Ping loop: Operation canceled for {_config.Host}:{_config.Port}");
break;
}
catch (Exception exception)
{
DebugConnection($"Ping loop: Error sending ping to {_config.Host}:{_config.Port}: {exception.GetType().Name} - {exception.Message}");
RecordError(exception, $"Ping loop error: {exception.GetType().Name} - {exception.Message}");
OnLog?.Invoke(this, $"[PING] Error sending ping: {exception.GetType().Name} - {exception.Message}");
break;
}
}
}
catch (Exception ex)
{
DebugConnection($"Ping loop: Unexpected error: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Ping loop unexpected error: {ex.GetType().Name}");
}
finally
{
DebugConnection("Ping loop stopped.");
}
}
private async Task StartPongLoop(CancellationToken token)
{
var interval = TimeSpan.FromSeconds(1);
var next = DateTime.UtcNow + interval;
try
{
while (!token.IsCancellationRequested)
{
try
{
if (!IsConnected)
{
DebugConnection("Pong loop stopping - client disconnected");
break;
}
var elapsed = (DateTime.UtcNow - LastPongReceived).TotalSeconds;
if (LastPongReceived != DateTime.MinValue &&
elapsed > _config.HeartbeatIntervalSeconds * 5)
{
if (DisconnectOnMissedPong)
{
var msg = $"No PONG received for {elapsed:F1}s (threshold: {_config.HeartbeatIntervalSeconds * 5}s) - disconnecting due to unresponsive server";
DebugConnection($"Pong monitor: {msg}");
RecordError(null, msg);
await DisconnectClientAsync(DisconnectReason.NoPongReceived);
break;
}
}
var delay = next - DateTime.UtcNow;
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay, token);
}
next += interval;
}
catch (OperationCanceledException)
{
DebugConnection($"Pong loop: Operation canceled for {_config.Host}:{_config.Port}");
break;
}
catch (Exception ex)
{
DebugConnection($"Pong loop: Error monitoring pong for {_config.Host}:{_config.Port}: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Pong loop error: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
break;
}
}
}
catch (Exception ex)
{
DebugConnection($"Pong loop: Unexpected error: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Pong loop unexpected error: {ex.GetType().Name}");
}
finally
{
DebugConnection("Pong loop stopped.");
}
}
private async Task MonitorIdleTimeoutAsync(CancellationToken token)
{
var checkInterval = TimeSpan.FromSeconds(1);
while (!token.IsCancellationRequested && IsConnected)
{
try
{
var idleTime = IdleTime();
var idleSeconds = idleTime.TotalSeconds;
// Check if idle timeout threshold is exceeded
if (_config.IdleTimeoutSeconds > 0 && idleSeconds >= _config.IdleTimeoutSeconds)
{
if (!IsIdleTimeoutTriggered)
{
IsIdleTimeoutTriggered = true;
DebugConnection($"Idle timeout triggered: {idleSeconds:F1}s >= {_config.IdleTimeoutSeconds}s");
OnIdleTimeout?.Invoke(this, new IdleClientEventArgs(
_config.IdleTimeoutSeconds,
this,
$"Client idle for {IdleTimeFormatted(includeMilliseconds: false)}"
));
}
}
// Show idle reminder log if configured
if (ShowIdleReminderInSeconds > 0 && idleSeconds >= ShowIdleReminderInSeconds)
{
var now = DateTime.UtcNow;
if ((now - _lastIdleLogUtc).TotalSeconds >= ShowIdleReminderInSeconds)
{
_lastIdleLogUtc = now;
DebugConnection($"Client idle for {IdleTimeFormatted(includeMilliseconds: false)}");
}
}
await Task.Delay(checkInterval, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
DebugConnection($"MonitorIdleTimeoutAsync: Operation canceled for {_config.Host}:{_config.Port}");
break;
}
catch (Exception ex)
{
DebugConnection($"MonitorIdleTimeoutAsync: Error monitoring idle timeout for {_config.Host}:{_config.Port}: {ex.Message}");
OnLog?.Invoke(this, $"[IDLE] Error monitoring idle timeout: {ex.Message}");
break;
}
}
}
private async Task ReceiveDataAsync(CancellationToken token)
{
DebugConnection($"ReceiveDataAsync started for {_config.Host}:{_config.Port} with framing mode {_config.MessageFraming}");
try
{
switch (_config.MessageFraming)
{
case FramingMode.LengthPrefixed:
await ReceiveLengthPrefixedAsync(token);
break;
case FramingMode.Delimiter:
await ReceiveDelimiterAsync(token);
break;
case FramingMode.None:
await ReceiveNoneAsync(token);
break;
}
}
catch (OperationCanceledException ex)
{
DebugConnection("Receive loop cancelled by cancellation token.");
RecordError(ex, "Receive loop cancellation requested");
}
catch (SocketException socketEx)
{
var errorMsg = $"Socket error in receive loop: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset by remote", SocketError.ConnectionAborted => "Connection aborted", SocketError.TimedOut => "Receive timeout", _ => "Unknown socket error" }}";
RecordError(socketEx, errorMsg);
DebugConnection($"Receive loop socket error: {socketEx.SocketErrorCode} - {socketEx.Message}");
var reason = socketEx.SocketErrorCode == SocketError.ConnectionReset || socketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, socketEx);
}
catch (IOException ioEx) when (ioEx.InnerException is SocketException innerSocketEx)
{
var errorMsg = $"Socket IO error in receive loop: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}";
RecordError(ioEx, errorMsg);
DebugConnection($"Receive loop socket IO error: {innerSocketEx.SocketErrorCode} - {ioEx.Message}");
var reason = innerSocketEx.SocketErrorCode == SocketError.ConnectionReset
|| innerSocketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, ioEx);
}
catch (IOException ioEx)
{
DebugConnection($"IO error in receive loop: {ioEx.GetType().Name} - {ioEx.Message}");
RecordError(ioEx, $"IO error in receive loop: {ioEx.GetType().Name}");
await DisconnectClientAsync(DisconnectReason.Error, ioEx);
}
catch (Exception ex)
{
DebugConnection($"Unexpected error in receive loop: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Receive loop error: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
}
finally
{
DebugConnection("Receive loop exited.");
}
}
private async Task ReceiveLengthPrefixedAsync(CancellationToken token)
{
int prefixSize = _config.LengthPrefixedLength;
if (prefixSize < 1 || prefixSize > 8)
{
throw new InvalidOperationException("LengthPrefixedLength must be between 1 and 8.");
}
byte[] lengthBuffer = ArrayPool<byte>.Shared.Rent(prefixSize);
try
{
while (!token.IsCancellationRequested && IsConnected && _stream?.CanRead == true)
{
await ReadExactAsync(_stream, lengthBuffer, 0, prefixSize).ConfigureAwait(false);
long length = ParseLengthPrefix(lengthBuffer, prefixSize, _config.UseBigEndian);
if (length <= 0 || length > _config.MAX_MESSAGE_SIZE)
{
throw new InvalidOperationException($"Invalid message length {length}");
}
byte[] buffer = ArrayPool<byte>.Shared.Rent((int)length);
try
{
await ReadExactAsync(_stream, buffer, 0, (int)length).ConfigureAwait(false);
if (_config.UseAesEncryption && _aesEncryption != null)
{
var decrypted = await AesKeyExchange.DecryptDataAsync(buffer.AsSpan(0, (int)length).ToArray(), (int)length, _aesEncryption).ConfigureAwait(false);
await ProcessReceivedDataAsync(decrypted).ConfigureAwait(false);
}
else
{
await ProcessReceivedDataAsync(buffer.AsMemory(0, (int)length)).ConfigureAwait(false);
}
UpdateLastActive();
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
catch (OperationCanceledException ex)
{
DebugConnection($"Length-prefixed read timeout: {ex.Message}");
RecordError(ex, "Read timeout during length-prefixed receive - stream may have hung");
await DisconnectClientAsync(DisconnectReason.Timeout, ex);
}
catch (SocketException socketEx)
{
DebugConnection($"Socket error in length-prefixed receive: {socketEx.SocketErrorCode} - {socketEx.Message}");
RecordError(socketEx, $"Socket error in length-prefixed receive: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset by remote", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}");
var reason = socketEx.SocketErrorCode == SocketError.ConnectionReset || socketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, socketEx);
}
catch (IOException ex) when (ex.InnerException is SocketException innerSocketEx)
{
DebugConnection($"Socket IO error in length-prefixed receive: {innerSocketEx.SocketErrorCode} - {ex.Message}");
RecordError(ex, $"Socket IO error in length-prefixed receive: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}");
var reason = innerSocketEx.SocketErrorCode == SocketError.ConnectionReset
|| innerSocketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, ex);
}
catch (IOException ex)
{
DebugConnection($"Connection closed during length-prefixed receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, "Connection closed during length-prefixed receive");
await DisconnectClientAsync(DisconnectReason.RemoteClosed, ex);
}
catch (Exception ex)
{
DebugConnection($"Unexpected error in length-prefixed receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Unexpected error in length-prefixed receive: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
}
finally
{
ArrayPool<byte>.Shared.Return(lengthBuffer);
}
}
private static long ParseLengthPrefix(byte[] buffer, int length, bool useBigEndian)
{
long value = 0;
if (useBigEndian)
{
for (int i = 0; i < length; i++)
{
value = (value << 8) | buffer[i];
}
}
else
{
for (int i = 0; i < length; i++)
{
value |= (long)buffer[i] << (8 * i);
}
}
return value;
}
private async Task ReceiveNoneAsync(CancellationToken token)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(_config.BufferSize);
try
{
while (!token.IsCancellationRequested && IsConnected && _stream?.CanRead == true)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
cts.CancelAfter(TimeSpan.FromSeconds(_config.ReadTimeoutSeconds));
int read = await _stream.ReadAsync(buffer, 0, buffer.Length, cts.Token);
if (read == 0)
{
DebugConnection("Remote closed connection.");
await DisconnectClientAsync(DisconnectReason.RemoteClosed);
return;
}
ReadOnlyMemory<byte> payload = buffer.AsMemory(0, read);
if (_config.UseAesEncryption && _aesEncryption != null)
{
try
{
var decrypted = await AesKeyExchange.DecryptDataAsync(payload.ToArray(), read, _aesEncryption);
await ProcessReceivedDataAsync(decrypted);
}
catch (Exception ex)
{
DebugConnection($"Decrypt error: {ex.Message}");
OnEncryptionError?.Invoke(this, new ErrorEventArgs
{
Exception = ex,
Message = "Error decrypting data"
});
}
}
else
{
await ProcessReceivedDataAsync(payload);
}
UpdateLastActive();
}
}
catch (OperationCanceledException ex)
{
DebugConnection($"Read timeout during raw receive: {ex.Message}");
RecordError(ex, "Read timeout in raw receive - stream may be unresponsive");
await DisconnectClientAsync(DisconnectReason.Timeout, ex);
}
catch (SocketException socketEx)
{
DebugConnection($"Socket error in raw receive: {socketEx.SocketErrorCode} - {socketEx.Message}");
var errorMsg = $"Socket error in raw receive: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}";
RecordError(socketEx, errorMsg);
var reason = socketEx.SocketErrorCode == SocketError.ConnectionReset || socketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, socketEx);
}
catch (IOException ex) when (ex.InnerException is SocketException innerSocketEx)
{
DebugConnection($"Socket IO error in raw receive: {innerSocketEx.SocketErrorCode} - {ex.Message}");
var errorMsg = $"Socket IO error in raw receive: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}";
RecordError(ex, errorMsg);
var reason = innerSocketEx.SocketErrorCode == SocketError.ConnectionReset
|| innerSocketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, ex);
}
catch (IOException ex)
{
DebugConnection($"Connection closed during raw receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, "Connection closed during raw receive");
await DisconnectClientAsync(DisconnectReason.RemoteClosed, ex);
}
catch (Exception ex)
{
DebugConnection($"Unexpected error in raw receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Error in raw receive: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task ReceiveDelimiterAsync(CancellationToken token)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(_config.BufferSize);
var memory = new MemoryStream();
try
{
while (!token.IsCancellationRequested && IsConnected && _stream?.CanRead == true)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
cts.CancelAfter(TimeSpan.FromSeconds(_config.ReadTimeoutSeconds));
int read = await _stream.ReadAsync(buffer, 0, buffer.Length, cts.Token);
if (read == 0)
{
DebugConnection("Remote closed connection.");
await DisconnectClientAsync(DisconnectReason.RemoteClosed);
return;
}
memory.Write(buffer, 0, read);
if (memory.Length > _config.MAX_MESSAGE_SIZE)
{
throw new InvalidOperationException("Message too large.");
}
while (TryExtractMessage(memory, _config.Delimiter, out var message))
{
if (_config.UseAesEncryption && _aesEncryption != null)
{
message = await AesKeyExchange.DecryptDataAsync(message, message.Length, _aesEncryption);
}
await ProcessReceivedDataAsync(message);
UpdateLastActive();
}
}
}
catch (OperationCanceledException ex)
{
DebugConnection($"Read timeout during delimiter receive: {ex.Message}");
RecordError(ex, "Read timeout in delimiter receive - stream may be unresponsive");
await DisconnectClientAsync(DisconnectReason.Timeout, ex);
}
catch (SocketException socketEx)
{
DebugConnection($"Socket error in delimiter receive: {socketEx.SocketErrorCode} - {socketEx.Message}");
var errorMsg = $"Socket error in delimiter receive: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}";
RecordError(socketEx, errorMsg);
var reason = socketEx.SocketErrorCode == SocketError.ConnectionReset || socketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, socketEx);
}
catch (IOException ex) when (ex.InnerException is SocketException innerSocketEx)
{
DebugConnection($"Socket IO error in delimiter receive: {innerSocketEx.SocketErrorCode} - {ex.Message}");
var errorMsg = $"Socket IO error in delimiter receive: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}";
RecordError(ex, errorMsg);
var reason = innerSocketEx.SocketErrorCode == SocketError.ConnectionReset
|| innerSocketEx.SocketErrorCode == SocketError.ConnectionAborted
? DisconnectReason.RemoteClosed
: DisconnectReason.Error;
await DisconnectClientAsync(reason, ex);
}
catch (IOException ex)
{
DebugConnection($"Connection closed during delimiter receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, "Connection closed during delimiter receive");
await DisconnectClientAsync(DisconnectReason.RemoteClosed, ex);
}
catch (Exception ex)
{
DebugConnection($"Unexpected error in delimiter receive: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Error in delimiter receive: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
memory.Dispose();
}
}
private static bool TryExtractMessage(MemoryStream stream, byte[] delimiter, out byte[] message)
{
var buffer = stream.GetBuffer();
int length = (int)stream.Length;
int index = IndexOfDelimiter(buffer, length, delimiter);
if (index < 0)
{
message = null;
return false;
}
message = new byte[index];
Buffer.BlockCopy(buffer, 0, message, 0, index);
int remaining = length - index - delimiter.Length;
Buffer.BlockCopy(buffer, index + delimiter.Length, buffer, 0, remaining);
stream.SetLength(remaining);
stream.Position = remaining;
return true;
}
private static int IndexOfDelimiter(byte[] buffer, int length, byte[] delimiter)
{
if (delimiter == null || delimiter.Length == 0 || length < delimiter.Length)
{
return -1;
}
for (int i = 0; i <= length - delimiter.Length; i++)
{
bool match = true;
for (int j = 0; j < delimiter.Length; j++)
{
if (buffer[i + j] != delimiter[j])
{
match = false;
break;
}
}
if (match)
{
return i;
}
}
return -1;
}
public async Task<bool> SendAsync(byte[] data)
{
try
{
byte[] payload = data;
if (_config.UseAesEncryption && _aesEncryption != null)
{
payload = await AesKeyExchange.EncryptDataAsync(data, data.Length, _aesEncryption);
}
byte[] framed = Frame(payload);
return await SendInternalAsync(framed, _connectionCts?.Token ?? CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
await DisconnectClientAsync(DisconnectReason.Error, ex);
return false;
}
}
private byte[] Frame(byte[] payload)
{
switch (_config.MessageFraming)
{
case FramingMode.LengthPrefixed:
return CreateLengthPrefixed(payload);
case FramingMode.Delimiter:
return Combine(payload, _config.Delimiter);
case FramingMode.None:
default:
return payload;
}
}
private static byte[] Combine(byte[] payload, byte[] delimiter)
{
if (delimiter == null || delimiter.Length == 0)
{
return payload;
}
var result = new byte[payload.Length + delimiter.Length];
Buffer.BlockCopy(payload, 0, result, 0, payload.Length);
Buffer.BlockCopy(delimiter, 0, result, payload.Length, delimiter.Length);
return result;
}
private byte[] CreateLengthPrefixed(byte[] payload)
{
int prefixSize = _config.LengthPrefixedLength;
if (prefixSize < 1 || prefixSize > 8)
{
throw new InvalidOperationException("LengthPrefixedLength must be between 1 and 8 bytes.");
}
long length = payload.Length;
// Validate that the payload fits into the configured prefix size
long maxValue = (1L << (prefixSize * 8)) - 1;
if (length > maxValue)
{
throw new InvalidOperationException(
$"Payload too large for {prefixSize}-byte length prefix.");
}
byte[] result = new byte[prefixSize + payload.Length];
if (_config.UseBigEndian)
{
for (int i = 0; i < prefixSize; i++)
{
result[prefixSize - 1 - i] = (byte)(length & 0xFF);
length >>= 8;
}
}
else
{
for (int i = 0; i < prefixSize; i++)
{
result[i] = (byte)(length & 0xFF);
length >>= 8;
}
}
Buffer.BlockCopy(payload, 0, result, prefixSize, payload.Length);
return result;
}
private static async Task ReadExactAsync(Stream stream, byte[] buffer, int offset, int count)
{
int totalRead = 0;
while (totalRead < count)
{
int read = await stream.ReadAsync(buffer, offset + totalRead, count - totalRead).ConfigureAwait(false);
if (read == 0)
{
throw new IOException("Remote socket closed while reading.");
}
totalRead += read;
}
}
private async Task ProcessReceivedDataAsync(ReadOnlyMemory<byte> data)
{
if (data.IsEmpty)
{
return;
}
var now = DateTime.UtcNow;
LastDataReceived = now;
Interlocked.Add(ref _bytesReceived, data.Length);
Interlocked.Increment(ref _messagesReceived);
UpdateLastActive();
bool needsText = _config.EnableHeartbeat || OnDataReceived != null;
string stringData = null;
bool isText = false;
if (needsText)
{
try
{
#if NET8_0_OR_GREATER
var span = data.Span;
int charCount = Encoding.UTF8.GetCharCount(span);
if (_charBuffer.Length < charCount)
{
_charBuffer = new char[charCount];
}
int written = Encoding.UTF8.GetChars(span, _charBuffer);
#else
byte[] bytes = data.ToArray();
int charCount = _utf8Decoder.GetCharCount(bytes, 0, bytes.Length);
if (_charBuffer.Length < charCount)
{
_charBuffer = new char[charCount];
}
int written = _utf8Decoder.GetChars(bytes, 0, bytes.Length, _charBuffer, 0);
#endif
if (written > 0)
{
stringData = new string(_charBuffer, 0, written);
isText = true;
}
}
catch
{
isText = false;
}
}
// Heartbeat
if (_config.EnableHeartbeat && isText)
{
ProcessPingPong(ref stringData);
if (string.IsNullOrEmpty(stringData))
{
return;
}
}
// SSL error notification from server
if (isText && !string.IsNullOrEmpty(stringData) && stringData.Contains(Configuration.SSL_ERROR_PREFIX, StringComparison.OrdinalIgnoreCase))
{
string sslError = null;
var startIdx = stringData.IndexOf(Configuration.SSL_ERROR_PREFIX, StringComparison.Ordinal) + Configuration.SSL_ERROR_PREFIX.Length;
var endIdx = stringData.IndexOf(Configuration.SSL_ERROR_SUFFIX, StringComparison.Ordinal);
if (endIdx > startIdx)
{
sslError = stringData.Substring(startIdx, endIdx - startIdx);
}
RecordError(null, $"Server reported SSL error: {sslError ?? stringData}", isSslError: true);
OnSslError?.Invoke(this, new ErrorEventArgs
{
Message = $"Server reported SSL error: {sslError ?? stringData}"
});
await DisconnectClientAsync(DisconnectReason.SSLError).ConfigureAwait(false);
return;
}
if (DEBUG_DATA_RECEIVED)
{
OnLog?.Invoke(this, $"[DEBUG DATA] Received {data.Length} bytes");
}
OnDataReceived?.Invoke(this, new DataReceivedEventArgs
{
ClientId = "server",
Data = data.ToArray(),
IsBinary = !isText,
Timestamp = DateTime.UtcNow,
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port),
Nickname = Nickname,
});
}
private void ProcessPingPong(ref string message)
{
if (!_config.EnableHeartbeat || string.IsNullOrEmpty(message))
{
return;
}
if (message.Contains(Configuration.PING_VALUE))
{
LastPingSent = DateTime.UtcNow;
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, "Received PING from server. Sending PONG response.");
}
OnPingResponse?.Invoke(this, new PingEventArgs
{
Id = "server",
Nickname = Nickname,
ReceivedTime = DateTime.UtcNow,
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port)
});
_ = SendAsync(Configuration.PONG_VALUE).ConfigureAwait(false);
message = message.Replace(Configuration.PING_VALUE, string.Empty);
}
if (message.Contains(Configuration.PONG_VALUE))
{
LastPongReceived = DateTime.UtcNow;
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, "Received PONG from server for PING");
}
OnPongResponse?.Invoke(this, new PingEventArgs
{
Id = "server",
Nickname = Nickname,
ReceivedTime = DateTime.UtcNow,
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port)
});
message = message.Replace(Configuration.PONG_VALUE, string.Empty);
}
}
private async Task ReceiveUdpDataAsync()
{
DebugConnection($"UDP receive loop started for {_config.Host}:{_config.Port}");
while (_connectionCts != null && !_connectionCts.Token.IsCancellationRequested && IsConnected)
{
try
{
var result = await _udpClient.ReceiveAsync().ConfigureAwait(false);
var buffer = result.Buffer;
if (_config.EnableHeartbeat)
{
try
{
buffer = HandleHeartbeat(buffer, out bool foundHeartbeat);
if (foundHeartbeat)
{
LastPongReceived = DateTime.UtcNow;
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, "Received PONG from server for PING (UDP)");
}
}
if (buffer == null || buffer.Length == 0)
{
continue;
}
}
catch (Exception exception)
{
DebugConnection($"Heartbeat handling error in UDP: {exception.GetType().Name} - {exception.Message}");
RecordError(exception, $"Heartbeat handling failed (UDP): {exception.GetType().Name} - {exception.Message}");
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "Heartbeat handling failed (UDP)" });
}
}
UpdateLastActive();
await ProcessReceivedDataAsync(buffer).ConfigureAwait(false);
}
catch (SocketException socketEx)
{
var errorMsg = $"Socket error in UDP receive: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.HostUnreachable => "Host unreachable", _ => "Unknown socket error" }}";
DebugConnection($"UDP receive socket error: {socketEx.SocketErrorCode} - {socketEx.Message}");
RecordError(socketEx, errorMsg);
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = socketEx, Message = errorMsg });
ConnectionTime = DateTime.MinValue;
break;
}
catch (ObjectDisposedException ex)
{
DebugConnection("UDP receive loop: UDP client disposed, stopping receive loop.");
RecordError(ex, "UDP client disposed - connection closing");
break;
}
catch (OperationCanceledException)
{
DebugConnection("UDP receive loop: Operation canceled.");
break;
}
catch (Exception exception)
{
DebugConnection($"UDP receive error: {exception.GetType().Name} - {exception.Message}");
RecordError(exception, $"Error receiving UDP data: {exception.GetType().Name} - {exception.Message}");
OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = $"UDP receive error: {exception.GetType().Name}" });
ConnectionTime = DateTime.MinValue;
break;
}
}
DebugConnection("UDP receive loop stopped.");
}
public async Task<bool> SendNicknameAsync(string nickname)
{
if (string.IsNullOrWhiteSpace(nickname))
{
return false;
}
var result = await SendAsync($"[NICKNAME]{nickname}[/NICKNAME]").ConfigureAwait(false);
if (result)
{
Nickname = nickname.Trim();
OnNicknameSend?.Invoke(this, new ConnectionEventArgs
{
ClientId = nickname,
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port),
Nickname = nickname
});
}
return result;
}
public async Task<bool> SendAsync(string message) => await SendAsync(Encoding.UTF8.GetBytes(message)).ConfigureAwait(false);
public async Task<DataReceivedEventArgs> SendAndWaitForResponseAsync(byte[] data, TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<DataReceivedEventArgs>(TaskCreationOptions.RunContinuationsAsynchronously);
var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(DEFAULT_WAITING_TIMEOUT_IN_SECONDS);
void Handler(object sender, DataReceivedEventArgs e)
{
tcs.TrySetResult(e);
}
OnDataReceived += Handler;
try
{
var sent = await SendAsync(data).ConfigureAwait(false);
if (!sent)
{
return null;
}
using var cts = new CancellationTokenSource(effectiveTimeout);
using var registration = cts.Token.Register(() => tcs.TrySetCanceled());
try
{
return await tcs.Task.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return null;
}
}
finally
{
OnDataReceived -= Handler;
}
}
public async Task<DataReceivedEventArgs> SendAndWaitForResponseAsync(string message, TimeSpan? timeout = null)
=> await SendAndWaitForResponseAsync(Encoding.UTF8.GetBytes(message), timeout).ConfigureAwait(false);
public async Task<DataReceivedEventArgs> SendNicknameAndWaitForResponseAsync(string nickname, TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<DataReceivedEventArgs>(TaskCreationOptions.RunContinuationsAsynchronously);
var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(DEFAULT_WAITING_TIMEOUT_IN_SECONDS);
void Handler(object sender, DataReceivedEventArgs e)
{
tcs.TrySetResult(e);
}
OnDataReceived += Handler;
try
{
var sent = await SendNicknameAsync(nickname).ConfigureAwait(false);
if (!sent)
{
return null;
}
using var cts = new CancellationTokenSource(effectiveTimeout);
using var registration = cts.Token.Register(() => tcs.TrySetCanceled());
try
{
return await tcs.Task.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return null;
}
}
finally
{
OnDataReceived -= Handler;
}
}
private async Task<bool> SendInternalAsync(byte[] data, CancellationToken token)
{
if (!IsConnected)
{
return false;
}
if (_config.Protocol == ProtocolType.TCP)
{
if (_stream == null || !_stream.CanWrite)
{
DebugConnection($"SendInternalAsync: Stream is not available for writing for {_config.Host}:{_config.Port}, disconnecting");
await DisconnectClientAsync(DisconnectReason.RemoteClosed).ConfigureAwait(false);
return false;
}
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
cts.CancelAfter(TimeSpan.FromSeconds(_config.WriteTimeoutSeconds));
await _streamWriteLock.WaitAsync(cts.Token).ConfigureAwait(false);
try
{
await _stream.WriteAsync(data, 0, data.Length, cts.Token).ConfigureAwait(false);
await _stream.FlushAsync(cts.Token).ConfigureAwait(false);
LastDataSent = DateTime.UtcNow;
Interlocked.Add(ref _bytesSent, data.Length);
Interlocked.Increment(ref _messagesSent);
UpdateLastActive();
return true;
}
finally
{
_streamWriteLock.Release();
}
}
catch (OperationCanceledException ex)
{
DebugConnection($"SendInternalAsync: Write operation canceled for {_config.Host}:{_config.Port}, likely due to timeout or shutdown: {ex.Message}");
RecordError(ex, "Write timeout in TCP send - stream may be unresponsive");
await DisconnectClientAsync(DisconnectReason.Timeout, ex);
return false;
}
catch (SocketException socketEx)
{
DebugConnection($"SendInternalAsync: Socket error while sending TCP data to {_config.Host}:{_config.Port}: {socketEx.SocketErrorCode} - {socketEx.Message}");
RecordError(socketEx, $"Socket error in TCP send: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset by remote host", SocketError.ConnectionAborted => "Connection aborted", SocketError.HostUnreachable => "Host unreachable", SocketError.NetworkUnreachable => "Network unreachable", SocketError.ConnectionRefused => "Connection refused", _ => "Unknown socket error" }}");
await DisconnectClientAsync(DisconnectReason.Error, socketEx);
return false;
}
catch (IOException ioEx) when (ioEx.InnerException is SocketException innerSocketEx)
{
DebugConnection($"SendInternalAsync: Socket IO error while sending TCP data to {_config.Host}:{_config.Port}: {innerSocketEx.SocketErrorCode} - {ioEx.Message}");
RecordError(ioEx, $"Socket IO error in TCP send: {innerSocketEx.SocketErrorCode} - {innerSocketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset by remote host", SocketError.ConnectionAborted => "Connection aborted", _ => "Unknown socket error" }}");
var reason = innerSocketEx.SocketErrorCode == SocketError.ConnectionReset || innerSocketEx.SocketErrorCode == SocketError.ConnectionAborted ? DisconnectReason.RemoteClosed : DisconnectReason.Error;
await DisconnectClientAsync(reason, ioEx);
return false;
}
catch (ObjectDisposedException ex)
{
DebugConnection($"SendInternalAsync: ObjectDisposedException for {_config.Host}:{_config.Port}, stream was likely disposed during reconnection");
RecordError(ex, "Stream disposed during TCP send - possible race condition during reconnection attempt");
if (IsConnected)
{
await DisconnectClientAsync(DisconnectReason.Error, ex).ConfigureAwait(false);
}
return false;
}
catch (Exception ex)
{
DebugConnection($"SendInternalAsync: Exception occurred while sending data to {_config.Host}:{_config.Port}: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Error in TCP send: {ex.GetType().Name} - {ex.Message}");
await DisconnectClientAsync(DisconnectReason.Error, ex);
return false;
}
}
else
{
try
{
if (_udpClient != null)
{
await _udpClient.SendAsync(data, data.Length).ConfigureAwait(false);
LastDataSent = DateTime.UtcNow;
Interlocked.Add(ref _bytesSent, data.Length);
Interlocked.Increment(ref _messagesSent);
return true;
}
return false;
}
catch (SocketException socketEx)
{
DebugConnection($"SendInternalAsync: Socket error sending UDP data to {_config.Host}:{_config.Port}: {socketEx.SocketErrorCode} - {socketEx.Message}");
RecordError(socketEx, $"Socket error in UDP send: {socketEx.SocketErrorCode} - {socketEx.SocketErrorCode switch { SocketError.ConnectionReset => "Connection reset", SocketError.HostUnreachable => "Host unreachable", SocketError.NetworkUnreachable => "Network unreachable", _ => "Unknown socket error" }}");
return false;
}
catch (Exception ex)
{
DebugConnection($"SendInternalAsync: Error sending UDP data to {_config.Host}:{_config.Port}: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Error in UDP send: {ex.GetType().Name} - {ex.Message}");
return false;
}
}
}
private async Task AutoReconnectAsync(CancellationToken token)
{
IsAutoConnectStarted = true;
int consecutiveFailures = 0;
const int MAX_CONSECUTIVE_FAILURES = 10;
const int MAX_BACKOFF_SECONDS = 300; // 5 minutes max backoff
try
{
while (!token.IsCancellationRequested)
{
if (!_config.EnableAutoReconnect ||
_reason == DisconnectReason.LocalClosed ||
!_wasConnected)
{
DebugConnection("Auto-reconnect disabled or not previously connected; stopping auto-reconnect task.");
break;
}
// Check if the disconnect reason prevents reconnect
if (_reason == DisconnectReason.ClientRequested)
{
// Don't attempt to reconnect if the client explicitly requested disconnection
DebugConnection("Client explicitly requested disconnection; stopping auto-reconnect task.");
_autoReconnectSource?.Cancel();
break;
}
if (IsConnected)
{
DebugConnection("Client is already connected; resetting backoff counter.");
consecutiveFailures = 0;
await Task.Delay(1_000, token).ConfigureAwait(false);
continue;
}
// Try to reconnect with exponential backoff
bool reconnected = await TryReconnectWithRetriesAsync(token).ConfigureAwait(false);
if (reconnected)
{
consecutiveFailures = 0;
DebugConnection("Reconnection successful; resetting backoff counter.");
}
else
{
consecutiveFailures++;
if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES)
{
var criticalMsg = $"Too many consecutive reconnection failures ({consecutiveFailures}); stopping auto-reconnect task to prevent resource exhaustion.";
DebugConnection(criticalMsg);
RecordError(null, criticalMsg);
break;
}
// Calculate exponential backoff: min 1s, max 5 minutes, doubles each attempt
int backoffSeconds = Math.Min(
(int)Math.Pow(2, Math.Min(consecutiveFailures - 1, 8)),
MAX_BACKOFF_SECONDS
);
DebugConnection($"Reconnection failed (attempt {consecutiveFailures}); waiting {backoffSeconds}s before next attempt...");
try
{
await Task.Delay(TimeSpan.FromSeconds(backoffSeconds), token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
}
}
catch (OperationCanceledException)
{
DebugConnection("Auto-reconnect task canceled.");
}
catch (Exception ex)
{
DebugConnection($"Auto-reconnect task encountered unexpected error: {ex.GetType().Name} - {ex.Message}");
RecordError(ex, $"Auto-reconnect task failed: {ex.GetType().Name} - {ex.Message}");
}
finally
{
IsAutoConnectStarted = false;
DebugConnection("Auto-reconnect task stopped.");
}
}
private async Task<bool> TryReconnectWithRetriesAsync(CancellationToken token)
{
int attempt = 0;
var maxAttempts = _config.MaxReconnectAttempts;
while (!token.IsCancellationRequested && (maxAttempts == 0 || attempt < maxAttempts))
{
if (IsConnected)
{
return true;
}
attempt++;
try
{
DebugConnection($"Attempting to reconnect... Attempt {attempt}/{(maxAttempts == 0 ? "unlimited" : maxAttempts.ToString())}");
var connected = await ConnectAsync().ConfigureAwait(false);
if (connected && IsConnected)
{
_reason = DisconnectReason.Unknown;
var successMsg = $"Reconnected successfully after {attempt} attempt(s)";
DebugConnection(successMsg);
OnGeneralError?.Invoke(
this,
new ErrorEventArgs
{
Message = successMsg
});
return true;
}
}
catch (OperationCanceledException)
{
DebugConnection($"Reconnection attempt {attempt} canceled.");
throw;
}
catch (Exception ex)
{
var errorMsg = $"Reconnection attempt {attempt} failed: {ex.GetType().Name} - {ex.Message}";
DebugConnection(errorMsg);
RecordError(ex, errorMsg);
OnGeneralError?.Invoke(
this,
new ErrorEventArgs
{
Exception = ex,
Message = errorMsg
});
}
// Don't delay after the last failed attempt
if (maxAttempts > 0 && attempt >= maxAttempts)
{
DebugConnection($"Maximum reconnection attempts ({maxAttempts}) reached.");
break;
}
// Brief delay between retry attempts (exponential backoff is handled at higher level)
Int32 retryDelayMs = Math.Min(500 * attempt, 2000); // Max 2s between individual retries
try
{
await Task.Delay(retryDelayMs, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
return false;
}
private void DebugConnection(string debugText)
{
if (!_config.EnableConnectionDebugLogs)
{
return;
}
OnLog?.Invoke(this, $"[NetworkClient] {debugText}");
}
private void RecordError(Exception exception, string message, string clientId = null, string nickname = null, bool isSslError = false)
{
var socketEx = exception as SocketException
?? (exception as IOException)?.InnerException as SocketException;
var entry = new SocketErrorEntry
{
Timestamp = DateTime.UtcNow,
Source = "Client",
ClientId = clientId ?? Nickname ?? "self",
Nickname = nickname ?? Nickname,
SocketErrorCode = socketEx?.SocketErrorCode,
ErrorCode = socketEx != null ? socketEx.SocketErrorCode.ToString() : exception?.GetType().Name,
Message = message,
ExceptionType = exception?.GetType().FullName,
StackTrace = exception?.StackTrace,
Exception = exception,
IsSslError = isSslError
};
StatusPage.AddError(entry);
if (socketEx != null)
{
OnSocketError?.Invoke(this, new ErrorEventArgs
{
ClientId = entry.ClientId,
Nickname = entry.Nickname,
Exception = exception,
Message = $"SocketError {socketEx.SocketErrorCode}: {message}"
});
}
}
public async Task DisconnectAsync() => await DisconnectClientAsync(DisconnectReason.ClientRequested).ConfigureAwait(false);
public async Task DisconnectDueTimeoutAsync() => await DisconnectClientAsync(DisconnectReason.Timeout).ConfigureAwait(false);
private async Task DisconnectClientAsync(
DisconnectReason reason = DisconnectReason.LocalClosed,
Exception exception = null)
{
DisconnectionTime = DateTime.UtcNow;
IsConnected = false;
_reason = reason;
if (reason == DisconnectReason.ClientRequested || reason == DisconnectReason.LocalClosed)
{
_autoReconnectSource?.Cancel();
}
// Close the actual connection resources
try
{
_connectionCts?.Cancel();
}
catch
{
// Do nothing
}
try
{
_connectionCts?.Dispose();
}
catch
{
// Do nothing
}
_connectionCts = null;
try
{
_stream?.Close();
_stream?.Dispose();
_stream = null;
}
catch
{
// Do nothing
}
try
{
if (_tcpClient != null)
{
try
{
_tcpClient.Client?.Shutdown(System.Net.Sockets.SocketShutdown.Both);
}
catch
{
// Do nothing
}
_tcpClient.Close();
_tcpClient.Dispose();
_tcpClient = null;
}
}
catch
{
// Do nothing
}
try
{
_udpClient?.Close();
_udpClient?.Dispose();
_udpClient = null;
}
catch
{
// Do nothing
}
OnDisconnected?.Invoke(this, new ConnectionEventArgs
{
ClientId = "self",
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port),
Reason = ConnectionEventArgs.Determine(reason, exception),
Exception = exception
});
StatusPage.StopAutoHtmlReport();
HealthApi.Stop();
}
private async Task SafeTaskCompletion(Task task)
{
if (task == null)
{
return;
}
try
{
await task.ConfigureAwait(false);
}
catch
{
// Do nothing
}
}
private byte[] HandleHeartbeat(byte[] data, out bool hasHeartbeat)
{
hasHeartbeat = false;
if (!_config.EnableHeartbeat || data == null || data.Length == 0)
{
return data;
}
string text = null;
try
{
text = Encoding.UTF8.GetString(data);
}
catch
{
// Not a valid UTF-8 string
}
if (string.IsNullOrEmpty(text))
{
return data;
}
if (text.Contains("[DISCONNECT]", StringComparison.OrdinalIgnoreCase))
{
DisconnectClientAsync(DisconnectReason.RemoteClosed).ConfigureAwait(false);
hasHeartbeat = true;
}
if (text.Contains(Configuration.SSL_ERROR_PREFIX, StringComparison.OrdinalIgnoreCase))
{
string sslError = null;
var startIdx = text.IndexOf(Configuration.SSL_ERROR_PREFIX, StringComparison.Ordinal) + Configuration.SSL_ERROR_PREFIX.Length;
var endIdx = text.IndexOf(Configuration.SSL_ERROR_SUFFIX, StringComparison.Ordinal);
if (endIdx > startIdx)
{
sslError = text.Substring(startIdx, endIdx - startIdx);
}
RecordError(null, $"Server reported SSL error: {sslError ?? text}", isSslError: true);
OnSslError?.Invoke(this, new ErrorEventArgs
{
Message = $"Server reported SSL error: {sslError ?? text}"
});
DisconnectClientAsync(DisconnectReason.SSLError).ConfigureAwait(false);
hasHeartbeat = true;
}
if (text.Contains(Configuration.PING_VALUE))
{
hasHeartbeat = true;
SendAsync(Configuration.PONG_VALUE).ConfigureAwait(false);
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, "PING received. Sent PONG response.");
}
}
if (text.Contains(Configuration.PONG_VALUE))
{
hasHeartbeat = true;
var now = DateTime.UtcNow;
OnPingResponse?.Invoke(this, new PingEventArgs
{
Id = "server",
Nickname = Nickname,
ReceivedTime = now,
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(_config.Host), _config.Port)
});
if (_config.EnablePingPongLogs)
{
OnLog?.Invoke(this, "PONG received.");
}
}
if (hasHeartbeat && !string.IsNullOrEmpty(text))
{
text = text.Replace(Configuration.PING_VALUE, string.Empty).Replace(Configuration.PONG_VALUE, string.Empty).Replace("[DISCONNECT]", string.Empty);
data = Encoding.UTF8.GetBytes(text);
}
return data;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing)
{
return;
}
try
{
_clientCts?.Cancel();
_autoReconnectSource?.Cancel();
_connectionCts?.Cancel();
}
catch
{
// Do nothing
}
try { _stream?.Close(); _stream?.Dispose(); _stream = null; } catch { }
try
{
if (_tcpClient != null)
{
try { _tcpClient.Client?.Shutdown(SocketShutdown.Both); } catch { }
_tcpClient.Close();
_tcpClient.Dispose();
_tcpClient = null;
}
}
catch { }
try { _udpClient?.Close(); _udpClient?.Dispose(); _udpClient = null; } catch { }
try { _aesEncryption?.Dispose(); _aesEncryption = null; } catch { }
try { _clientCts?.Dispose(); _clientCts = null; } catch { }
try { _autoReconnectSource?.Dispose(); _autoReconnectSource = null; } catch { }
try { _connectionCts?.Dispose(); _connectionCts = null; } catch { }
try { _connectLock?.Dispose(); } catch { }
try { _connectionAttemptLock?.Dispose(); } catch { }
try { _streamReadLock?.Dispose(); } catch { }
try { _streamWriteLock?.Dispose(); } catch { }
HealthApi.Dispose();
}
private void ConfigureKeepAlive(Socket socket, int keepAliveTimeSeconds, int keepAliveIntervalSeconds, int keepAliveRetryCount = 10)
{
if (socket == null)
{
return;
}
try
{
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
#if NET5_0_OR_GREATER
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, Math.Max(1, keepAliveTimeSeconds));
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, Math.Max(1, keepAliveIntervalSeconds));
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, Math.Max(1, keepAliveRetryCount));
#else
uint onOff = 1;
uint keepAliveTime = (uint)Math.Max(1, keepAliveTimeSeconds) * 1000;
uint keepAliveInterval = (uint)Math.Max(1, keepAliveIntervalSeconds) * 1000;
byte[] keepAliveSettings = new byte[12];
Buffer.BlockCopy(BitConverter.GetBytes(onOff), 0, keepAliveSettings, 0, 4);
Buffer.BlockCopy(BitConverter.GetBytes(keepAliveTime), 0, keepAliveSettings, 4, 4);
Buffer.BlockCopy(BitConverter.GetBytes(keepAliveInterval), 0, keepAliveSettings, 8, 4);
socket.IOControl(IOControlCode.KeepAliveValues, keepAliveSettings, null);
#endif
}
catch (Exception ex)
{
OnLog?.Invoke(this, $"Failed to configure TCP keep-alive: {ex.Message}");
}
}
}
}