Files
EonaCat.Connections/EonaCat.Connections/NetworkClient.cs
2025-11-28 19:37:09 +01:00

1432 lines
48 KiB
C#

// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/license for full license details.
using EonaCat.Connections.EventArguments;
using EonaCat.Connections.Helpers;
using EonaCat.Connections.Models;
using System.Buffers;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using ErrorEventArgs = EonaCat.Connections.EventArguments.ErrorEventArgs;
using ProtocolType = EonaCat.Connections.Models.ProtocolType;
namespace EonaCat.Connections
{
public class NetworkClient : IAsyncDisposable, IDisposable
{
// Connection settings; assigned once in the constructor, which rejects null.
private readonly Configuration _config;
// Transport objects of the current connection. CleanupAsync disposes them and resets them to null.
private TcpClient _tcpClient;
private UdpClient _udpClient;
// Stream used for TCP traffic: the NetworkStream itself, or an SslStream wrapping it when SSL is enabled.
private Stream _stream;
// AES instance obtained from the key exchange when AES encryption is enabled; null otherwise.
private Aes _aesEncryption;
// Cancels the receive loop of the current connection.
private CancellationTokenSource _cancellation;
private bool _isConnected;
// Heartbeat tasks and their cancellation sources, created by StartPingLoop / StartPongLoop.
private Task _pingTask;
private Task _pongTask;
private CancellationTokenSource _pingCancellation;
private CancellationTokenSource _pongCancellation;
/// <summary>True while the client considers itself connected.</summary>
public bool IsConnected => _isConnected;
/// <summary>True when the configuration enables SSL or AES encryption.</summary>
public bool IsSecure => _config != null && (_config.UseSsl || _config.UseAesEncryption);
/// <summary>True when the configuration enables AES encryption.</summary>
public bool IsEncrypted => _config != null && _config.UseAesEncryption;
/// <summary>True when the configured protocol is TCP.</summary>
public bool IsTcp => _config != null && _config.Protocol == ProtocolType.TCP;
// Serializes SendAsync callers.
private readonly SemaphoreSlim _sendLock = new(1, 1);
// Serializes ConnectAsync and DisconnectClientAsync.
private readonly SemaphoreSlim _connectLock = new(1, 1);
/// <summary>Raised with diagnostic text produced by the client.</summary>
public event EventHandler<string> OnLog;
/// <summary>UTC time at which the connection was established; reset to <see cref="DateTime.MinValue"/> on disconnect.</summary>
public DateTime ConnectionTime { get; private set; }
/// <summary>Time elapsed since <see cref="ConnectionTime"/>.</summary>
public TimeSpan Uptime => DateTime.UtcNow - ConnectionTime;
/// <summary>UTC time of the most recent activity; WriteToStreamAsync updates it after every write.</summary>
public DateTime LastActive { get; internal set; }
/// <summary>
/// Gets the number of whole seconds elapsed since <see cref="LastActive"/>.
/// </summary>
/// <returns>The idle time in seconds, capped at <see cref="int.MaxValue"/>.</returns>
public int IdleTimeInSeconds()
{
    // Until LastActive is first assigned it holds DateTime.MinValue, which makes the
    // span tens of billions of seconds long. Converting such a double to int is not
    // well defined, so clamp before casting.
    return (int)Math.Min(IdleTime().TotalSeconds, int.MaxValue);
}
/// <summary>Gets the number of whole minutes elapsed since <see cref="LastActive"/>.</summary>
public int IdleTimeInMinutes() => (int)IdleTime().TotalMinutes;
/// <summary>Gets the number of whole hours elapsed since <see cref="LastActive"/>.</summary>
public int IdleTimeInHours() => (int)IdleTime().TotalHours;
/// <summary>Gets the number of whole days elapsed since <see cref="LastActive"/>.</summary>
public int IdleTimeInDays() => (int)IdleTime().TotalDays;
/// <summary>Gets the time elapsed between <see cref="LastActive"/> and the current UTC time.</summary>
public TimeSpan IdleTime() => DateTime.UtcNow.Subtract(LastActive);
/// <summary>
/// Formats the idle time as space-separated components such as <c>00d 01h 02m 03s 004ms</c>.
/// </summary>
/// <param name="includeDays">Include the total whole days component.</param>
/// <param name="includeHours">Include the hours component.</param>
/// <param name="includeMinutes">Include the minutes component.</param>
/// <param name="includeSeconds">Include the seconds component.</param>
/// <param name="includeMilliseconds">Include the milliseconds component.</param>
/// <returns>The selected components joined by single spaces; empty when none is selected.</returns>
public string IdleTimeFormatted(bool includeDays = true, bool includeHours = true, bool includeMinutes = true, bool includeSeconds = true, bool includeMilliseconds = true)
{
    TimeSpan elapsed = IdleTime();
    var segments = new List<string>(5);

    if (includeDays)
    {
        segments.Add(((int)elapsed.TotalDays).ToString("D2") + "d");
    }

    if (includeHours)
    {
        segments.Add(elapsed.Hours.ToString("D2") + "h");
    }

    if (includeMinutes)
    {
        segments.Add(elapsed.Minutes.ToString("D2") + "m");
    }

    if (includeSeconds)
    {
        segments.Add(elapsed.Seconds.ToString("D2") + "s");
    }

    if (includeMilliseconds)
    {
        segments.Add(elapsed.Milliseconds.ToString("D3") + "ms");
    }

    return string.Join(" ", segments);
}
/// <summary>
/// Gets the number of whole seconds elapsed since <see cref="ConnectionTime"/>.
/// </summary>
/// <returns>The connected time in seconds, capped at <see cref="int.MaxValue"/>.</returns>
public int ConnectedTimeInSeconds()
{
    // ConnectionTime is DateTime.MinValue before the first connect and after every
    // disconnect; the resulting span does not fit in an int, so clamp before casting.
    var connectedTime = DateTime.UtcNow - ConnectionTime;
    return (int)Math.Min(connectedTime.TotalSeconds, int.MaxValue);
}
/// <summary>Gets the number of whole minutes elapsed since <see cref="ConnectionTime"/>.</summary>
public int ConnectedTimeInMinutes() => (int)ConnectedTime().TotalMinutes;
/// <summary>Gets the number of whole hours elapsed since <see cref="ConnectionTime"/>.</summary>
public int ConnectedTimeInHours() => (int)ConnectedTime().TotalHours;
/// <summary>Gets the number of whole days elapsed since <see cref="ConnectionTime"/>.</summary>
public int ConnectedTimeInDays() => (int)ConnectedTime().TotalDays;
/// <summary>Gets the time elapsed between <see cref="ConnectionTime"/> and the current UTC time.</summary>
public TimeSpan ConnectedTime() => DateTime.UtcNow.Subtract(ConnectionTime);
/// <summary>
/// Formats the connected time as space-separated components such as <c>00d 01h 02m 03s 004ms</c>.
/// </summary>
/// <param name="includeDays">Include the total whole days component.</param>
/// <param name="includeHours">Include the hours component.</param>
/// <param name="includeMinutes">Include the minutes component.</param>
/// <param name="includeSeconds">Include the seconds component.</param>
/// <param name="includeMilliseconds">Include the milliseconds component.</param>
/// <returns>The selected components joined by single spaces; empty when none is selected.</returns>
public string ConnectedTimeFormatted(bool includeDays = true, bool includeHours = true, bool includeMinutes = true, bool includeSeconds = true, bool includeMilliseconds = true)
{
    TimeSpan elapsed = ConnectedTime();
    var segments = new List<string>(5);

    if (includeDays)
    {
        segments.Add(((int)elapsed.TotalDays).ToString("D2") + "d");
    }

    if (includeHours)
    {
        segments.Add(elapsed.Hours.ToString("D2") + "h");
    }

    if (includeMinutes)
    {
        segments.Add(elapsed.Minutes.ToString("D2") + "m");
    }

    if (includeSeconds)
    {
        segments.Add(elapsed.Seconds.ToString("D2") + "s");
    }

    if (includeMilliseconds)
    {
        segments.Add(elapsed.Milliseconds.ToString("D3") + "ms");
    }

    return string.Join(" ", segments);
}
// Set by DisposeAsync so that disposal runs only once.
private bool _disposed;
// Signals the AutoReconnectAsync loop to stop at its next iteration.
private bool _stopAutoReconnecting;
/// <summary>Raised after a TCP or UDP connection has been established.</summary>
public event EventHandler<ConnectionEventArgs> OnConnected;
/// <summary>Raised after SendNicknameAsync has sent the nickname successfully.</summary>
public event EventHandler<ConnectionEventArgs> OnNicknameSend;
/// <summary>Raised for each received message that was not consumed as a heartbeat or command.</summary>
public event EventHandler<DataReceivedEventArgs> OnDataReceived;
/// <summary>Raised by DisconnectClientAsync once the connection has been torn down.</summary>
public event EventHandler<ConnectionEventArgs> OnDisconnected;
// NOTE(review): nothing in this file raises OnSslError — confirm whether it is still needed.
public event EventHandler<ErrorEventArgs> OnSslError;
/// <summary>Raised when decrypting a received message fails.</summary>
public event EventHandler<ErrorEventArgs> OnEncryptionError;
/// <summary>Raised for connection, send and receive errors, and for auto-reconnect progress messages.</summary>
public event EventHandler<ErrorEventArgs> OnGeneralError;
/// <summary>Raised when a PONG is received from the server.</summary>
public event EventHandler<PingEventArgs> OnPingResponse;
/// <summary>Raised by the pong watchdog when no PONG arrived within twice the heartbeat interval.</summary>
public event EventHandler<PingEventArgs> OnPongMissed;
/// <summary>Creates a client that uses the supplied configuration.</summary>
/// <param name="config">Connection settings; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
public NetworkClient(Configuration config)
{
    if (config == null)
    {
        throw new ArgumentNullException(nameof(config));
    }

    _config = config;
}
/// <summary>
/// Tears down any previous connection and connects using the configured protocol.
/// If a nickname was sent earlier it is sent again after connecting.
/// </summary>
/// <returns>True when the connection was established; false when any step failed.</returns>
/// <remarks>
/// Failures are reported through <see cref="OnGeneralError"/> rather than thrown. When
/// auto-reconnect is enabled a failure also starts the reconnect loop in the background.
/// </remarks>
public async Task<bool> ConnectAsync()
{
    await _connectLock.WaitAsync().ConfigureAwait(false);
    try
    {
        await CleanupAsync().ConfigureAwait(false);
        _cancellation = new CancellationTokenSource();
        if (_config.Protocol == ProtocolType.TCP)
        {
            var result = await ConnectTcpAsync().ConfigureAwait(false);
            if (!result)
            {
                throw new Exception("Failed to connect via TCP");
            }
        }
        else
        {
            var result = await ConnectUdpAsync().ConfigureAwait(false);
            if (!result)
            {
                throw new Exception("Failed to connect via UDP");
            }
        }

        // If we already had a nickname, resend it
        if (!string.IsNullOrEmpty(Nickname))
        {
            OnLog?.Invoke(this, "Resending nickname after reconnect");
            await SendNicknameAsync(Nickname).ConfigureAwait(false);
        }

        return true;
    }
    catch (Exception exception)
    {
        _isConnected = false;

        // A failure can happen after the socket was opened and the background loops
        // were started. Release them now instead of leaving them until the next connect.
        try
        {
            await CleanupAsync().ConfigureAwait(false);
        }
        catch
        {
            // Cleanup is best effort; the original failure is what gets reported.
        }

        OnGeneralError?.Invoke(this, new ErrorEventArgs
        {
            Exception = exception,
            Message = "Connection error"
        });
        if (_config.EnableAutoReconnect)
        {
            _ = Task.Run(() => AutoReconnectAsync());
        }

        return false;
    }
    finally
    {
        try
        {
            _connectLock.Release();
        }
        catch
        {
            // The lock may already be disposed; nothing left to release.
        }
    }
}
/// <summary>
/// Cancels the background loops, waits briefly for the heartbeat tasks, disposes every
/// transport resource and resets the connection state. Every step is best effort:
/// exceptions are swallowed so that the remaining steps still run.
/// </summary>
private async Task CleanupAsync()
{
    // Runs one teardown step and ignores whatever it throws.
    static void Quietly(Action step)
    {
        try
        {
            step();
        }
        catch
        {
            // Do nothing
        }
    }

    // Gives a heartbeat task up to half a second to finish.
    static async Task WaitBrieflyAsync(Task heartbeat)
    {
        if (heartbeat == null)
        {
            return;
        }

        try
        {
            await Task.WhenAny(heartbeat, Task.Delay(500)).ConfigureAwait(false);
        }
        catch
        {
            // Do nothing
        }
    }

    Quietly(() => _cancellation?.Cancel());
    Quietly(() => _pingCancellation?.Cancel());
    Quietly(() => _pongCancellation?.Cancel());

    await WaitBrieflyAsync(_pingTask).ConfigureAwait(false);
    await WaitBrieflyAsync(_pongTask).ConfigureAwait(false);

    Quietly(() => _cancellation?.Dispose());
    Quietly(() => _pingCancellation?.Dispose());
    Quietly(() => _pongCancellation?.Dispose());
    _cancellation = null;
    _pingCancellation = null;
    _pongCancellation = null;
    _pingTask = null;
    _pongTask = null;

    Quietly(() => _tcpClient?.Close());
    Quietly(() => _tcpClient?.Dispose());
    _tcpClient = null;

    Quietly(() => _udpClient?.Close());
    Quietly(() => _udpClient?.Dispose());
    _udpClient = null;

    Quietly(() => _stream?.Dispose());
    _stream = null;

    Quietly(() => _aesEncryption?.Dispose());
    _aesEncryption = null;

    _isConnected = false;
    _stopAutoReconnecting = false;
}
/// <summary>
/// Opens the TCP connection, optionally wraps it in TLS and performs the AES key exchange,
/// then marks the client as connected and starts the receive and heartbeat loops.
/// </summary>
/// <returns>True once the connection is established.</returns>
/// <remarks>
/// Connection and handshake failures are retried up to <c>SSLMaxRetries</c> times
/// (0 means retry without limit) with <c>SSLRetryDelayInSeconds</c> between attempts;
/// the last failure is rethrown.
/// </remarks>
/// <exception cref="InvalidOperationException">No stream was obtained from the retry loop.</exception>
private async Task<bool> ConnectTcpAsync()
{
    int attempt = 0;
    while (_config.SSLMaxRetries == 0 || attempt < _config.SSLMaxRetries)
    {
        attempt++;
        try
        {
            _tcpClient?.Dispose();
            _tcpClient = new TcpClient();
            _tcpClient.NoDelay = !_config.EnableNagle;
            _tcpClient.ReceiveBufferSize = _config.BufferSize;
            _tcpClient.SendBufferSize = _config.BufferSize;
            _tcpClient.LingerState = new LingerOption(enable: true, seconds: 0);
            await _tcpClient.ConnectAsync(_config.Host, _config.Port).ConfigureAwait(false);
            NetworkStream networkStream = _tcpClient.GetStream();
            if (!_config.UseSsl)
            {
                _stream = networkStream;
                break;
            }

            var sslStream = new SslStream(networkStream, leaveInnerStreamOpen: true, _config.GetRemoteCertificateValidationCallback());
            try
            {
                // A client certificate is optional; null means none is presented.
                X509CertificateCollection clientCertificates = _config.Certificate != null
                    ? new X509CertificateCollection { _config.Certificate }
                    : null;
                await sslStream.AuthenticateAsClientAsync(_config.Host, clientCertificates, SslProtocols.Tls12 | SslProtocols.Tls13, _config.CheckCertificateRevocation).ConfigureAwait(false);
                _stream = sslStream;
                break;
            }
            catch (Exception exception)
            {
                sslStream.Dispose();
                if (_config.SSLMaxRetries != 0 && attempt >= _config.SSLMaxRetries)
                {
                    throw;
                }

                OnLog?.Invoke(this, $"[Attempt {attempt}] SSL handshake failed for {_config.Host}:{_config.Port}: {exception.Message}");
                await Task.Delay(_config.SSLRetryDelayInSeconds * 1000).ConfigureAwait(false);
            }
        }
        catch (Exception exception)
        {
            OnLog?.Invoke(this, $"[Attempt {attempt}] TCP connection failed for {_config.Host}:{_config.Port}: {exception.Message}");
            if (_config.SSLMaxRetries != 0 && attempt >= _config.SSLMaxRetries)
            {
                throw;
            }

            await Task.Delay(_config.SSLRetryDelayInSeconds * 1000).ConfigureAwait(false);
        }
    }

    // A negative SSLMaxRetries skips the loop entirely; never report a connection without a stream.
    if (_stream == null)
    {
        throw new InvalidOperationException("TCP connection could not be established.");
    }

    if (_config.UseAesEncryption)
    {
        _aesEncryption = await AesKeyExchange.ReceiveAesKeyAsync(_stream, _config.AesPassword).ConfigureAwait(false);
    }

    // Host may be a DNS name, which IPAddress.Parse rejects with a FormatException.
    // Use the literal address when Host is one, otherwise the endpoint the socket connected to.
    IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
        ? new IPEndPoint(hostAddress, _config.Port)
        : _tcpClient.Client.RemoteEndPoint as IPEndPoint;

    _isConnected = true;
    ConnectionTime = DateTime.UtcNow;
    OnConnected?.Invoke(this, new ConnectionEventArgs
    {
        ClientId = "self",
        RemoteEndPoint = remoteEndPoint
    });
    _ = Task.Run(() => ReceiveDataAsync(), _cancellation.Token);
    if (_config.EnableHeartbeat)
    {
        _ = Task.Run(StartPingLoop, _cancellation.Token);
        _ = Task.Run(StartPongLoop, _cancellation.Token);
    }

    return true;
}
/// <summary>The configured host, or an empty string when no configuration is present.</summary>
public string IpAddress => _config != null ? _config.Host : string.Empty;
/// <summary>The configured port, or 0 when no configuration is present.</summary>
public int Port => _config != null ? _config.Port : 0;
/// <summary>True while the AutoReconnectAsync loop is executing.</summary>
public bool IsAutoReconnectRunning { get; private set; }
/// <summary>The trimmed nickname last sent successfully through SendNicknameAsync.</summary>
public string Nickname { get; private set; }
/// <summary>When true, SendAsync logs the raw and framed bytes through <see cref="OnLog"/>.</summary>
public bool DEBUG_DATA_SEND { get; set; }
/// <summary>When true, received messages are logged through <see cref="OnLog"/>.</summary>
public bool DEBUG_DATA_RECEIVED { get; set; }
/// <summary>UTC time of the last sign of life from the server; updated on PONGs and on any received text data.</summary>
public DateTime LastPongReceived { get; private set; }
/// <summary>Timestamp kept by the heartbeat logic; ProcessPingPong sets it when a PING from the server is handled.</summary>
public DateTime LastPingSent { get; private set; }
/// <summary>When true, the pong watchdog disconnects the client after a heartbeat timeout.</summary>
public bool DisconnectOnMissedPong { get; set; }
/// <summary>UTC time of the last write performed by WriteToStreamAsync.</summary>
public DateTime LastDataSent { get; private set; }
/// <summary>UTC time of the last successful read in the TCP receive loop.</summary>
public DateTime LastDataReceived { get; private set; }
/// <summary>UTC time at which DisconnectClientAsync last completed its teardown.</summary>
public DateTime DisconnectionTime { get; private set; }
/// <summary>
/// Creates the UDP client, binds it to the configured remote host and starts the
/// receive and heartbeat loops.
/// </summary>
/// <returns>True on success; false when any step failed (reported through <see cref="OnGeneralError"/>).</returns>
private async Task<bool> ConnectUdpAsync()
{
    try
    {
        _udpClient = new UdpClient();
        _udpClient.Client.ReceiveBufferSize = _config.BufferSize;
        _udpClient.Client.SendBufferSize = _config.BufferSize;
        _udpClient.Connect(_config.Host, _config.Port);

        // Host may be a DNS name, which IPAddress.Parse rejects with a FormatException.
        // Use the literal address when Host is one, otherwise the endpoint the socket was bound to.
        IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
            ? new IPEndPoint(hostAddress, _config.Port)
            : _udpClient.Client.RemoteEndPoint as IPEndPoint;

        _isConnected = true;
        ConnectionTime = DateTime.UtcNow;
        OnConnected?.Invoke(this, new ConnectionEventArgs { ClientId = "self", RemoteEndPoint = remoteEndPoint });
        _ = Task.Run(() => ReceiveUdpDataAsync(), _cancellation.Token);
        if (_config.EnableHeartbeat)
        {
            _ = Task.Run(StartPingLoop, _cancellation.Token);
            _ = Task.Run(StartPongLoop, _cancellation.Token);
        }

        return true;
    }
    catch (Exception exception)
    {
        // A failure after the flag was raised must not leave the client marked as connected.
        _isConnected = false;
        OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "UDP connection error" });
        return false;
    }
}
/// <summary>
/// Restarts the background task that sends a PING every <c>HeartbeatIntervalSeconds</c>
/// while the client is connected.
/// </summary>
/// <remarks>
/// The loop ends when its token is cancelled, the client disconnects, the transport is
/// no longer usable, or sending fails.
/// </remarks>
private void StartPingLoop()
{
    try
    {
        _pingCancellation?.Cancel();
    }
    catch
    {
        // The previous source may already be disposed.
    }

    _pingCancellation?.Dispose();
    _pingCancellation = new CancellationTokenSource();
    var token = _pingCancellation.Token;
    _pingTask = Task.Run(async () =>
    {
        var interval = TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds);
        var next = DateTime.UtcNow + interval;
        bool isTcp = _config.Protocol == ProtocolType.TCP;
        while (!token.IsCancellationRequested && _isConnected)
        {
            try
            {
                // UDP connections never set _stream, so only TCP may be judged by it;
                // checking the stream for UDP would end the heartbeat immediately.
                var stream = _stream;
                bool transportReady = isTcp
                    ? stream != null && stream.CanWrite
                    : _udpClient != null;
                if (!transportReady)
                {
                    break;
                }

                var pingData = Encoding.UTF8.GetBytes(Configuration.PING_VALUE);
                bool sent = await WriteToStreamAsync(pingData).ConfigureAwait(false);
                if (sent)
                {
                    LastPingSent = DateTime.UtcNow;
                    if (_config.EnablePingPongLogs)
                    {
                        OnLog?.Invoke(this, $"[PING] Sent at {DateTime.UtcNow:O}");
                    }
                }

                // Schedule against a fixed timeline so that send latency does not accumulate.
                var delay = next - DateTime.UtcNow;
                if (delay > TimeSpan.Zero)
                {
                    await Task.Delay(delay, token).ConfigureAwait(false);
                }

                next += interval;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception exception)
            {
                OnLog?.Invoke(this, $"[PING] Error sending ping: {exception.Message}");
                break;
            }
        }
    }, token);
}
/// <summary>
/// Restarts the watchdog task that checks once per second whether the last sign of life
/// from the server is older than twice the heartbeat interval.
/// </summary>
/// <remarks>
/// On a timeout <see cref="OnPongMissed"/> is raised and, when
/// <see cref="DisconnectOnMissedPong"/> is set, a disconnect is started and the watchdog ends.
/// </remarks>
private void StartPongLoop()
{
    try
    {
        _pongCancellation?.Cancel();
    }
    catch
    {
        // The previous source may already be disposed.
    }

    _pongCancellation?.Dispose();
    _pongCancellation = new CancellationTokenSource();
    var token = _pongCancellation.Token;
    _pongTask = Task.Run(async () =>
    {
        var interval = TimeSpan.FromSeconds(1);
        var next = DateTime.UtcNow + interval;
        while (!token.IsCancellationRequested && _isConnected)
        {
            try
            {
                var elapsed = (DateTime.UtcNow - LastPongReceived).TotalSeconds;
                if (LastPongReceived != DateTime.MinValue && elapsed > _config.HeartbeatIntervalSeconds * 2)
                {
                    if (_config.EnablePingPongLogs)
                    {
                        OnLog?.Invoke(this, "Server heartbeat timeout. Disconnecting.");
                    }

                    // Host may be a DNS name; IPAddress.Parse would throw for it.
                    // The endpoint is left null in that case.
                    IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
                        ? new IPEndPoint(hostAddress, _config.Port)
                        : null;
                    OnPongMissed?.Invoke(this, new PingEventArgs
                    {
                        Id = "server",
                        Nickname = Nickname,
                        ReceivedTime = DateTime.UtcNow,
                        RemoteEndPoint = remoteEndPoint
                    });
                    if (DisconnectOnMissedPong)
                    {
                        // DisconnectClientAsync waits for this very task to finish, so awaiting
                        // it from here would deadlock. Start it separately and let this task end.
                        _ = Task.Run(() => DisconnectClientAsync(DisconnectReason.NoPongReceived));
                        break;
                    }
                }

                var delay = next - DateTime.UtcNow;
                if (delay > TimeSpan.Zero)
                {
                    await Task.Delay(delay, token).ConfigureAwait(false);
                }

                next += interval;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception exception)
            {
                OnGeneralError?.Invoke(this, new ErrorEventArgs
                {
                    Exception = exception,
                    Message = "Client pong watchdog failed"
                });

                // Without a pause a persistent failure would spin this loop at full speed.
                try
                {
                    await Task.Delay(interval, token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }, token);
}
/// <summary>
/// TCP receive loop: reads from the stream, appends the bytes to an assembly buffer and
/// hands every complete message produced by <c>BuildMessage</c> to
/// <c>ProcessReceivedDataAsync</c>.
/// </summary>
/// <remarks>
/// The loop ends on cancellation, on a read error, when the remote side closes the
/// connection, or when the assembly buffer grows beyond <c>MAX_MESSAGE_SIZE</c>.
/// </remarks>
private async Task ReceiveDataAsync()
{
    // Capture the connection objects once. CleanupAsync nulls and disposes the fields
    // concurrently, and re-reading them inside the loop can throw or pick up the
    // objects of a later connection.
    var cancellation = _cancellation;
    var stream = _stream;
    if (cancellation == null || stream == null)
    {
        return;
    }

    CancellationToken token;
    try
    {
        token = cancellation.Token;
    }
    catch (ObjectDisposedException)
    {
        return;
    }

    var pooled = ArrayPool<byte>.Shared.Rent(_config.BufferSize);
    var assemblyBuffer = new List<byte>(_config.BufferSize * 2);
    try
    {
        while (!token.IsCancellationRequested && _isConnected)
        {
            int bytesRead;
            try
            {
                bytesRead = await stream.ReadAsync(pooled, 0, pooled.Length, token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception exception)
            {
                await DisconnectClientAsync(DisconnectReason.Error, exception).ConfigureAwait(false);
                break;
            }

            if (bytesRead == 0)
            {
                await DisconnectClientAsync(DisconnectReason.RemoteClosed).ConfigureAwait(false);
                break;
            }

            if (assemblyBuffer.Count > _config.MAX_MESSAGE_SIZE)
            {
                OnLog?.Invoke(this, "Buffer overflow detected; dropping connection");
                await DisconnectClientAsync(DisconnectReason.Error).ConfigureAwait(false);
                break;
            }

            // ArraySegment exposes the bytes that were read without an intermediate array copy.
            assemblyBuffer.AddRange(new ArraySegment<byte>(pooled, 0, bytesRead));
            LastDataReceived = DateTime.UtcNow;
            while (true)
            {
                var message = BuildMessage(assemblyBuffer);
                if (message == null)
                {
                    // Need more data
                    break;
                }

                if (_config.UseAesEncryption && _aesEncryption != null)
                {
                    try
                    {
                        // NOTE(review): the value returned by DecryptDataAsync is not used; unless
                        // the helper decrypts in place, the message is processed still encrypted — confirm.
                        await AesKeyExchange.DecryptDataAsync(message, message.Length, _aesEncryption).ConfigureAwait(false);
                    }
                    catch (Exception exception)
                    {
                        OnEncryptionError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "AES decryption failed" });
                        continue;
                    }
                }

                await ProcessReceivedDataAsync(message).ConfigureAwait(false);
            }
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled, clearArray: true);
    }
}
/// <summary>
/// Extracts the next complete message from the front of <paramref name="buffer"/>
/// according to the configured framing mode and removes it from the buffer.
/// </summary>
/// <param name="buffer">Accumulated received bytes; consumed bytes are removed.</param>
/// <returns>
/// The message payload, or null when the buffer does not yet hold a complete message.
/// An invalid length prefix clears the buffer and also yields null.
/// </returns>
private byte[] BuildMessage(List<byte> buffer)
{
    if (buffer == null || buffer.Count == 0)
    {
        return null;
    }

    bool bigEndianPrefix = _config.UseBigEndian;
    var framing = _config.MessageFraming;

    if (framing == FramingMode.LengthPrefixed)
    {
        const int PrefixSize = 4;
        if (buffer.Count < PrefixSize)
        {
            return null;
        }

        var prefix = new byte[PrefixSize];
        buffer.CopyTo(0, prefix, 0, PrefixSize);
        if (bigEndianPrefix && BitConverter.IsLittleEndian)
        {
            Array.Reverse(prefix);
        }

        int length = BitConverter.ToInt32(prefix, 0);
        if (length < 0 || length > _config.MAX_MESSAGE_SIZE)
        {
            OnLog?.Invoke(this, $"Invalid message length: {length}");
            buffer.Clear();
            return null;
        }

        if (buffer.Count < PrefixSize + length)
        {
            return null;
        }

        var payload = new byte[length];
        buffer.CopyTo(PrefixSize, payload, 0, length);
        buffer.RemoveRange(0, PrefixSize + length);
        return payload;
    }

    if (framing == FramingMode.Delimiter)
    {
        int delimiterAt = IndexOfDelimiter(buffer, _config.Delimiter);
        if (delimiterAt < 0)
        {
            return null;
        }

        byte[] payload = buffer.GetRange(0, delimiterAt).ToArray();
        buffer.RemoveRange(0, delimiterAt + _config.Delimiter.Length);
        return payload;
    }

    if (framing == FramingMode.None)
    {
        // Without framing everything received so far is one message.
        byte[] everything = buffer.ToArray();
        buffer.Clear();
        return everything;
    }

    return null;
}
/// <summary>
/// Finds the first occurrence of <paramref name="delimiter"/> in <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to search.</param>
/// <param name="delimiter">Byte sequence to look for.</param>
/// <returns>
/// The zero-based start index of the first match, or -1 when there is no match, the
/// delimiter is null or empty, or the buffer is shorter than the delimiter.
/// </returns>
private int IndexOfDelimiter(List<byte> buffer, byte[] delimiter)
{
    if (delimiter == null || delimiter.Length == 0 || buffer.Count < delimiter.Length)
    {
        return -1;
    }

    int lastStart = buffer.Count - delimiter.Length;
    for (int start = 0; start <= lastStart; start++)
    {
        // Count how many leading delimiter bytes match at this position.
        int matched = 0;
        while (matched < delimiter.Length && buffer[start + matched] == delimiter[matched])
        {
            matched++;
        }

        if (matched == delimiter.Length)
        {
            return start;
        }
    }

    return -1;
}
/// <summary>
/// Handles one received message: trims trailing zero bytes, decodes it as UTF-8, strips
/// heartbeat markers, executes built-in commands and finally raises <see cref="OnDataReceived"/>.
/// </summary>
/// <param name="data">The message bytes; null or empty input is ignored.</param>
private async Task ProcessReceivedDataAsync(byte[] data)
{
    if (data == null || data.Length == 0)
    {
        return;
    }

    string stringData = null;
    bool isBinary = true;

    // Drop trailing zero bytes; a message consisting only of zeros is left untouched.
    int realLength = Array.FindLastIndex(data, b => b != 0) + 1;
    if (realLength > 0 && realLength < data.Length)
    {
        var trimmed = new byte[realLength];
        Buffer.BlockCopy(data, 0, trimmed, 0, realLength);
        data = trimmed;
    }

    try
    {
        stringData = Encoding.UTF8.GetString(data);

        // Invalid sequences decode to replacement characters, which re-encode to a
        // different byte count; a matching count means the payload was valid text.
        if (Encoding.UTF8.GetByteCount(stringData) == data.Length)
        {
            isBinary = false;
        }
    }
    catch
    {
        // Not a valid UTF-8 string
    }

    if (!string.IsNullOrEmpty(stringData))
    {
        // Any decodable traffic counts as a sign of life from the server.
        LastPongReceived = DateTime.UtcNow;
        ProcessPingPong(ref stringData);
        if (string.IsNullOrEmpty(stringData))
        {
            // The message consisted of heartbeat markers only.
            return;
        }
    }

    if (DEBUG_DATA_RECEIVED)
    {
        if (isBinary)
        {
            OnLog?.Invoke(this, $"[DEBUG DATA] Received binary data: {BitConverter.ToString(data)}");
        }
        else
        {
            OnLog?.Invoke(this, $"[DEBUG DATA] Received string data: {stringData}");
        }
    }

    if (string.IsNullOrEmpty(stringData))
    {
        return;
    }

    bool handled = await HandleCommandAsync(stringData).ConfigureAwait(false);
    if (handled)
    {
        return;
    }

    // Host may be a DNS name. IPAddress.Parse would throw for it and the exception would
    // end the receive loop, so the endpoint is left null in that case.
    IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
        ? new IPEndPoint(hostAddress, _config.Port)
        : null;
    OnDataReceived?.Invoke(this, new DataReceivedEventArgs
    {
        ClientId = "server",
        Data = data,
        StringData = stringData,
        IsBinary = isBinary,
        Timestamp = DateTime.UtcNow,
        RemoteEndPoint = remoteEndPoint,
        Nickname = Nickname,
    });
}
/// <summary>
/// Reacts to heartbeat markers inside <paramref name="message"/> and removes them:
/// a PING is answered with a PONG, a PONG raises <see cref="OnPingResponse"/>.
/// </summary>
/// <param name="message">
/// The decoded message; on return every PING and PONG marker has been removed, which
/// may leave it empty. Nothing happens when heartbeats are disabled.
/// </param>
private void ProcessPingPong(ref string message)
{
    if (!_config.EnableHeartbeat || string.IsNullOrEmpty(message))
    {
        return;
    }

    if (message.Contains(Configuration.PING_VALUE))
    {
        // NOTE(review): this records the arrival of a server PING in LastPingSent — confirm the intent.
        LastPingSent = DateTime.UtcNow;
        if (_config.EnablePingPongLogs)
        {
            OnLog?.Invoke(this, "Received PING from server. Sending PONG response.");
        }

        // Deliberately not awaited: this method cannot be async because of the ref
        // parameter, and SendAsync reports its own failures through OnGeneralError.
        _ = SendAsync(Configuration.PONG_VALUE);
        message = message.Replace(Configuration.PING_VALUE, string.Empty);
    }

    if (message.Contains(Configuration.PONG_VALUE))
    {
        LastPongReceived = DateTime.UtcNow;
        if (_config.EnablePingPongLogs)
        {
            OnLog?.Invoke(this, "Received PONG from server for PING");
        }

        // Host may be a DNS name; IPAddress.Parse would throw for it.
        // The endpoint is left null in that case.
        IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
            ? new IPEndPoint(hostAddress, _config.Port)
            : null;
        OnPingResponse?.Invoke(this, new PingEventArgs
        {
            Id = "server",
            Nickname = Nickname,
            ReceivedTime = DateTime.UtcNow,
            RemoteEndPoint = remoteEndPoint
        });
        message = message.Replace(Configuration.PONG_VALUE, string.Empty);
    }
}
/// <summary>
/// Executes built-in commands sent by the server. Only <c>DISCONNECT</c>
/// (case-insensitive) is recognized; it disconnects the client.
/// </summary>
/// <param name="text">The decoded message text.</param>
/// <returns>True when the text was a command and has been handled; otherwise false.</returns>
private async Task<bool> HandleCommandAsync(string text)
{
    bool isDisconnectCommand =
        !string.IsNullOrWhiteSpace(text) &&
        text.Equals("DISCONNECT", StringComparison.OrdinalIgnoreCase);

    if (!isDisconnectCommand)
    {
        return false;
    }

    await DisconnectClientAsync(DisconnectReason.RemoteClosed).ConfigureAwait(false);
    return true;
}
/// <summary>
/// UDP receive loop: receives datagrams, strips heartbeat markers when heartbeats are
/// enabled and forwards the remaining payload to <c>ProcessReceivedDataAsync</c>.
/// </summary>
/// <remarks>
/// An unexpected receive error marks the client as disconnected and starts the
/// auto-reconnect loop. A failure caused by a deliberate teardown ends the loop silently.
/// </remarks>
private async Task ReceiveUdpDataAsync()
{
    // Capture the connection objects once; CleanupAsync nulls and disposes the fields concurrently.
    var cancellation = _cancellation;
    var udpClient = _udpClient;
    if (cancellation == null || udpClient == null)
    {
        return;
    }

    CancellationToken token;
    try
    {
        token = cancellation.Token;
    }
    catch (ObjectDisposedException)
    {
        return;
    }

    while (!token.IsCancellationRequested && _isConnected)
    {
        try
        {
            var result = await udpClient.ReceiveAsync().ConfigureAwait(false);
            var buffer = result.Buffer;
            if (_config.EnableHeartbeat)
            {
                try
                {
                    buffer = HandleHeartbeat(buffer, out bool foundHeartbeat);
                    if (foundHeartbeat)
                    {
                        LastPongReceived = DateTime.UtcNow;
                        if (_config.EnablePingPongLogs)
                        {
                            OnLog?.Invoke(this, "Received PONG from server for PING (UDP)");
                        }
                    }

                    if (buffer == null || buffer.Length == 0)
                    {
                        continue;
                    }
                }
                catch (Exception exception)
                {
                    OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "Heartbeat handling failed (UDP)" });
                }
            }

            await ProcessReceivedDataAsync(buffer).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            // Closing the socket during a teardown makes the pending receive fail. That is
            // not a connection error and must not trigger a reconnect.
            if (token.IsCancellationRequested)
            {
                break;
            }

            OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "Error receiving data" });
            _isConnected = false;
            ConnectionTime = DateTime.MinValue;
            _ = Task.Run(() => AutoReconnectAsync());
            break;
        }
    }
}
/// <summary>
/// Sends the nickname to the server wrapped in <c>[NICKNAME]…[/NICKNAME]</c> and, on
/// success, stores it in <see cref="Nickname"/> and raises <see cref="OnNicknameSend"/>.
/// </summary>
/// <param name="nickname">The nickname to announce; null or whitespace is rejected.</param>
/// <returns>True when the nickname was sent; otherwise false.</returns>
public async Task<bool> SendNicknameAsync(string nickname)
{
    if (string.IsNullOrWhiteSpace(nickname))
    {
        return false;
    }

    var result = await SendAsync($"[NICKNAME]{nickname}[/NICKNAME]").ConfigureAwait(false);
    if (result)
    {
        Nickname = nickname.Trim();

        // Host may be a DNS name. IPAddress.Parse would throw for it even though the send
        // succeeded, so the endpoint is left null in that case.
        IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
            ? new IPEndPoint(hostAddress, _config.Port)
            : null;
        OnNicknameSend?.Invoke(this, new ConnectionEventArgs
        {
            ClientId = nickname,
            RemoteEndPoint = remoteEndPoint,
            Nickname = nickname
        });
    }

    return result;
}
/// <summary>Encodes <paramref name="message"/> as UTF-8 and sends it.</summary>
/// <param name="message">The text to send.</param>
/// <returns>True when the data was written; otherwise false.</returns>
public async Task<bool> SendAsync(string message)
{
    byte[] encoded = Encoding.UTF8.GetBytes(message);
    return await SendAsync(encoded).ConfigureAwait(false);
}
/// <summary>
/// Frames <paramref name="data"/> according to the configured framing mode and writes it
/// to the connection. Callers are serialized through the send lock.
/// </summary>
/// <param name="data">The payload to send.</param>
/// <returns>
/// True when the data was written; false when the client is not connected or an error
/// occurred (errors are reported through <see cref="OnGeneralError"/>).
/// </returns>
public async Task<bool> SendAsync(byte[] data)
{
    if (!_isConnected)
    {
        return false;
    }

    await _sendLock.WaitAsync().ConfigureAwait(false);
    try
    {
        byte[] payload = data;
        if (_config.UseAesEncryption && _aesEncryption != null)
        {
            // NOTE(review): the value returned by EncryptDataAsync is not used; unless the
            // helper encrypts in place, the payload goes out unencrypted — confirm.
            await AesKeyExchange.EncryptDataAsync(payload, payload.Length, _aesEncryption).ConfigureAwait(false);
        }

        byte[] framedData = null;
        var framing = _config.MessageFraming;
        if (framing == FramingMode.LengthPrefixed)
        {
            // 4-byte length header followed by the payload.
            byte[] header = BitConverter.GetBytes(payload.Length);
            if (_config.UseBigEndian && BitConverter.IsLittleEndian)
            {
                Array.Reverse(header);
            }

            framedData = new byte[header.Length + payload.Length];
            header.CopyTo(framedData, 0);
            payload.CopyTo(framedData, header.Length);
        }
        else if (framing == FramingMode.Delimiter)
        {
            if (_config.Delimiter == null || _config.Delimiter.Length == 0)
            {
                throw new InvalidOperationException("Delimiter cannot be null or empty.");
            }

            // Payload followed by the delimiter.
            framedData = new byte[payload.Length + _config.Delimiter.Length];
            payload.CopyTo(framedData, 0);
            _config.Delimiter.CopyTo(framedData, payload.Length);
        }
        else if (framing == FramingMode.None)
        {
            framedData = payload;
        }

        if (DEBUG_DATA_SEND)
        {
            OnLog?.Invoke(this, $"[DEBUG] Sending raw: {BitConverter.ToString(data)}");
            OnLog?.Invoke(this, $"[DEBUG] Sending framed: {BitConverter.ToString(framedData)}");
        }

        return await WriteToStreamAsync(framedData).ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = "Error sending data" });
        return false;
    }
    finally
    {
        try
        {
            _sendLock.Release();
        }
        catch
        {
            // Do nothing
        }
    }
}
// Serializes the actual writes so that heartbeat pings and user data never interleave.
private readonly SemaphoreSlim _writeLock = new(1, 1);

/// <summary>
/// Writes the bytes to the active transport: the stream for TCP, the UDP client otherwise.
/// </summary>
/// <param name="dataToSend">The bytes to write, already framed.</param>
/// <returns>True when the data was handed to the transport; false when no usable transport exists.</returns>
private async Task<bool> WriteToStreamAsync(byte[] dataToSend)
{
    bool isTcp = _config.Protocol == ProtocolType.TCP;

    // UDP connections never set _stream, so only TCP may be judged by it; checking
    // the stream for UDP would reject every send.
    bool transportReady = isTcp
        ? _stream != null && _stream.CanWrite
        : _udpClient != null;
    if (!transportReady)
    {
        return false;
    }

    await _writeLock.WaitAsync().ConfigureAwait(false);
    try
    {
        if (isTcp)
        {
            // Re-read after waiting for the lock; a teardown may have run meanwhile.
            var stream = _stream;
            if (stream == null)
            {
                return false;
            }

            await stream.WriteAsync(dataToSend, 0, dataToSend.Length).ConfigureAwait(false);
        }
        else
        {
            var udpClient = _udpClient;
            if (udpClient == null)
            {
                return false;
            }

            await udpClient.SendAsync(dataToSend, dataToSend.Length).ConfigureAwait(false);
        }

        LastActive = DateTime.UtcNow;
        LastDataSent = DateTime.UtcNow;
    }
    finally
    {
        try
        {
            _writeLock.Release();
        }
        catch
        {
            // The lock may already be disposed; nothing left to release.
        }
    }

    return true;
}
/// <summary>
/// Background loop that watches the connection and reconnects when it is lost.
/// </summary>
/// <remarks>
/// Does nothing when auto-reconnect is disabled or a loop is already running. The loop
/// ends when the stop flag is set, auto-reconnect gets disabled, or
/// <c>MaxReconnectAttempts</c> consecutive attempts failed (0 means no limit).
/// Progress and failures are reported through <see cref="OnGeneralError"/>.
/// </remarks>
private async Task AutoReconnectAsync()
{
    if (IsAutoReconnectRunning || !_config.EnableAutoReconnect)
    {
        return;
    }

    IsAutoReconnectRunning = true;
    try
    {
        // Waits the configured pause; the setting is read on every call.
        Task PauseAsync() => Task.Delay(_config.ReconnectDelayInSeconds * 1000);

        // Publishes a progress message through the error event.
        void Report(string text) => OnGeneralError?.Invoke(this, new ErrorEventArgs { Message = text });

        bool sawConnection = false;
        int attempts = 0;
        while (true)
        {
            if (!_config.EnableAutoReconnect)
            {
                break;
            }

            if (_config.MaxReconnectAttempts != 0 && attempts >= _config.MaxReconnectAttempts)
            {
                break;
            }

            if (_stopAutoReconnecting)
            {
                _stopAutoReconnecting = false;
                break;
            }

            if (IsConnected)
            {
                // Healthy connection: keep watching.
                sawConnection = true;
                attempts = 0;
                await PauseAsync().ConfigureAwait(false);
                continue;
            }

            if (sawConnection)
            {
                Report("Connection lost. Starting auto-reconnect attempts.");
                sawConnection = false;
            }

            attempts++;
            try
            {
                Report($"Attempting to reconnect (Attempt {attempts})");
                await ConnectAsync().ConfigureAwait(false);
                if (IsConnected)
                {
                    Report($"Reconnected successfully after {attempts} attempt(s)");
                    attempts = 0;
                    await PauseAsync().ConfigureAwait(false);
                    continue;
                }
            }
            catch (Exception exception)
            {
                // Flatten the exception chain into one message.
                var details = new StringBuilder();
                details.AppendLine($"Reconnect attempt failed: {exception.Message}");
                for (var inner = exception.InnerException; inner != null; inner = inner.InnerException)
                {
                    details.AppendLine($"Inner exception: {inner.Message}");
                }

                OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = exception, Message = details.ToString() });
            }

            await PauseAsync().ConfigureAwait(false);
        }
    }
    finally
    {
        IsAutoReconnectRunning = false;
    }
}
/// <summary>
/// Disconnects on behalf of the caller. The disconnect is forced, so no automatic
/// reconnect is started afterwards.
/// </summary>
public async Task DisconnectAsync()
{
    await DisconnectClientAsync(DisconnectReason.ClientRequested, forceDisconnection: true).ConfigureAwait(false);
}
/// <summary>
/// Tears down the current connection, raises <see cref="OnDisconnected"/> and, unless
/// the disconnect is forced, starts the auto-reconnect loop when it is enabled.
/// </summary>
/// <param name="reason">Why the connection is being closed.</param>
/// <param name="exception">The error that caused the disconnect, if any.</param>
/// <param name="forceDisconnection">True to suppress and stop automatic reconnecting.</param>
/// <remarks>Does nothing when the client is not connected.</remarks>
public async Task DisconnectClientAsync(DisconnectReason reason = DisconnectReason.LocalClosed, Exception exception = null, bool forceDisconnection = false)
{
    await _connectLock.WaitAsync().ConfigureAwait(false);
    try
    {
        if (!_isConnected)
        {
            return;
        }

        _isConnected = false;
        ConnectionTime = DateTime.MinValue;
        OnLog?.Invoke(this, $"Disconnecting client... {reason} {exception}");

        // CleanupAsync cancels the loops, waits a bounded time for the heartbeat tasks and
        // disposes the transport. An unbounded wait is avoided on purpose: when this method
        // is reached from the pong watchdog, waiting for that task would never complete.
        await CleanupAsync().ConfigureAwait(false);

        // Must be assigned after the cleanup, because CleanupAsync clears the flag.
        _stopAutoReconnecting = forceDisconnection;
        DisconnectionTime = DateTime.UtcNow;

        // Host may be a DNS name; IPAddress.Parse would throw for it.
        // The endpoint is left null in that case.
        IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
            ? new IPEndPoint(hostAddress, _config.Port)
            : null;
        OnDisconnected?.Invoke(this, new ConnectionEventArgs
        {
            ClientId = "self",
            RemoteEndPoint = remoteEndPoint,
            Reason = ConnectionEventArgs.Determine(reason, exception),
            Exception = exception
        });
        if (!forceDisconnection && reason != DisconnectReason.Forced && _config.EnableAutoReconnect)
        {
            _ = Task.Run(() => AutoReconnectAsync());
        }
        else
        {
            OnLog?.Invoke(this, "Auto-reconnect disabled due to forced disconnection.");
        }
    }
    finally
    {
        try
        {
            _connectLock.Release();
        }
        catch (ObjectDisposedException)
        {
            // The client was disposed while disconnecting; nothing left to release.
        }
    }
}
/// <summary>
/// Waits for <paramref name="task"/> to finish and ignores any exception it raises.
/// </summary>
/// <param name="task">The task to wait for; null is allowed and completes immediately.</param>
private async Task SafeTaskCompletion(Task task)
{
    if (task != null)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Failures and cancellations of the awaited task are intentionally ignored.
        }
    }
}
/// <summary>
/// Disconnects, stops the background loops, releases every resource held by the client
/// and detaches all event subscribers. Calls after the first one do nothing.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (_disposed)
    {
        return;
    }

    _disposed = true;
    try
    {
        await DisconnectClientAsync(forceDisconnection: true).ConfigureAwait(false);

        // DisconnectClientAsync returns early when the client is not connected, so run the
        // cleanup explicitly: it cancels and disposes the token sources, tasks, sockets,
        // stream and AES instance and resets the fields to null.
        await _connectLock.WaitAsync().ConfigureAwait(false);
        try
        {
            await CleanupAsync().ConfigureAwait(false);

            // Assigned after the cleanup because CleanupAsync clears the flag; this makes a
            // running auto-reconnect loop exit on its next iteration.
            _stopAutoReconnecting = true;
        }
        finally
        {
            _connectLock.Release();
        }

        _sendLock.Dispose();
        _writeLock.Dispose();
        _connectLock.Dispose();

        // Detach every subscriber so the client does not keep them alive.
        OnLog = null;
        OnConnected = null;
        OnNicknameSend = null;
        OnDataReceived = null;
        OnDisconnected = null;
        OnSslError = null;
        OnEncryptionError = null;
        OnGeneralError = null;
        OnPingResponse = null;
        OnPongMissed = null;
    }
    finally
    {
        GC.SuppressFinalize(this);
    }
}
/// <summary>
/// Inspects a received datagram for control markers (DISCONNECT, PING, PONG), reacts to
/// them and returns the payload with the markers removed.
/// </summary>
/// <param name="data">The received bytes.</param>
/// <param name="hasHeartbeat">Set to true when at least one control marker was found.</param>
/// <returns>
/// The input unchanged when heartbeats are disabled, the data is empty or no marker was
/// found; otherwise the UTF-8 bytes of the text without the markers (possibly empty).
/// </returns>
private byte[] HandleHeartbeat(byte[] data, out bool hasHeartbeat)
{
    hasHeartbeat = false;
    if (!_config.EnableHeartbeat || data == null || data.Length == 0)
    {
        return data;
    }

    string text = null;
    try
    {
        text = Encoding.UTF8.GetString(data);
    }
    catch
    {
        // Not a valid UTF-8 string
    }

    if (string.IsNullOrEmpty(text))
    {
        return data;
    }

    if (text.Equals("DISCONNECT", StringComparison.OrdinalIgnoreCase))
    {
        // Deliberately not awaited: this method is synchronous.
        _ = DisconnectClientAsync(DisconnectReason.RemoteClosed);
        hasHeartbeat = true;

        // The whole datagram was the command. Returning an empty payload also covers
        // lower-case spellings, which the case-sensitive stripping below would leave in place.
        return Array.Empty<byte>();
    }

    if (text.Contains(Configuration.PING_VALUE))
    {
        hasHeartbeat = true;

        // Deliberately not awaited; SendAsync reports its own failures through OnGeneralError.
        _ = SendAsync(Configuration.PONG_VALUE);
        if (_config.EnablePingPongLogs)
        {
            OnLog?.Invoke(this, "PING received. Sent PONG response.");
        }
    }

    if (text.Contains(Configuration.PONG_VALUE))
    {
        hasHeartbeat = true;
        var now = DateTime.UtcNow;

        // Host may be a DNS name; IPAddress.Parse would throw for it.
        // The endpoint is left null in that case.
        IPEndPoint remoteEndPoint = IPAddress.TryParse(_config.Host, out IPAddress hostAddress)
            ? new IPEndPoint(hostAddress, _config.Port)
            : null;
        OnPingResponse?.Invoke(this, new PingEventArgs
        {
            Id = "server",
            Nickname = Nickname,
            ReceivedTime = now,
            RemoteEndPoint = remoteEndPoint
        });
        if (_config.EnablePingPongLogs)
        {
            OnLog?.Invoke(this, "PONG received.");
        }
    }

    if (hasHeartbeat && !string.IsNullOrEmpty(text))
    {
        text = text.Replace(Configuration.PING_VALUE, string.Empty).Replace(Configuration.PONG_VALUE, string.Empty).Replace("DISCONNECT", string.Empty);
        data = Encoding.UTF8.GetBytes(text);
    }

    return data;
}
/// <summary>
/// Synchronous counterpart of <see cref="DisposeAsync"/>; blocks until the asynchronous
/// disposal has finished.
/// </summary>
public void Dispose()
{
    Task disposal = DisposeAsync().AsTask();
    disposal.GetAwaiter().GetResult();
}
}
}