From 1b11821ebcb5fb64e2b9abdafc7028baea453400 Mon Sep 17 00:00:00 2001 From: EonaCat Date: Thu, 21 Aug 2025 15:29:43 +0200 Subject: [PATCH] Updated --- EonaCat.Connections.Client/Program.cs | 19 +- EonaCat.Connections.Server/Program.cs | 7 +- .../EonaCat.Connections.csproj | 63 ++-- .../EventArguments/DataReceivedEventArgs.cs | 16 +- EonaCat.Connections/Helpers/AesKeyExchange.cs | 13 +- EonaCat.Connections/Helpers/StringHelpers.cs | 28 ++ EonaCat.Connections/Models/Configuration.cs | 101 +++-- EonaCat.Connections/Models/Connection.cs | 28 +- EonaCat.Connections/NetworkClient.cs | 82 ++-- EonaCat.Connections/NetworkServer.cs | 153 ++++++-- .../Processors/JsonDataProcessor.cs | 357 ++++++++++++++++++ 11 files changed, 707 insertions(+), 160 deletions(-) create mode 100644 EonaCat.Connections/Helpers/StringHelpers.cs create mode 100644 EonaCat.Connections/Processors/JsonDataProcessor.cs diff --git a/EonaCat.Connections.Client/Program.cs b/EonaCat.Connections.Client/Program.cs index 05041f5..15cc33e 100644 --- a/EonaCat.Connections.Client/Program.cs +++ b/EonaCat.Connections.Client/Program.cs @@ -22,21 +22,6 @@ namespace EonaCat.Connections.Client.Example break; } - var jsonUrl = "https://microsoftedge.github.io/Demos/json-dummy-data/5MB-min.json"; - - try - { - using var httpClient = new HttpClient(); - var jsonContent = await httpClient.GetStringAsync(jsonUrl); - var jsonSize = Encoding.UTF8.GetByteCount(jsonContent); - Console.WriteLine($"Using large JSON file (size: {jsonSize / 1024 / 1024} MB)"); - message = jsonContent; - } - catch (Exception ex) - { - Console.WriteLine($"Failed to download large JSON file: {ex.Message}"); - } - if (!string.IsNullOrEmpty(message)) { await _client.SendAsync(message).ConfigureAwait(false); @@ -53,8 +38,8 @@ namespace EonaCat.Connections.Client.Example Port = 1111, UseSsl = false, UseAesEncryption = true, - AesPassword = "p@ss", - //ServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"), + AesPassword = "EonaCat.Connections.Password", + Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"), }; _client = new NetworkClient(config); diff --git a/EonaCat.Connections.Server/Program.cs b/EonaCat.Connections.Server/Program.cs index 747cd82..a19219a 100644 --- a/EonaCat.Connections.Server/Program.cs +++ b/EonaCat.Connections.Server/Program.cs @@ -1,5 +1,4 @@ -using EonaCat.Connections; -using EonaCat.Connections.Models; +using EonaCat.Connections.Models; namespace EonaCat.Connections.Server.Example { @@ -39,8 +38,8 @@ namespace EonaCat.Connections.Server.Example UseSsl = false, UseAesEncryption = true, MaxConnections = 100000, - AesPassword = "p@ss", - //ServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss"), + AesPassword = "EonaCat.Connections.Password", + Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss") }; _server = new NetworkServer(config); diff --git a/EonaCat.Connections/EonaCat.Connections.csproj b/EonaCat.Connections/EonaCat.Connections.csproj index ba21650..34dfd7c 100644 --- a/EonaCat.Connections/EonaCat.Connections.csproj +++ b/EonaCat.Connections/EonaCat.Connections.csproj @@ -1,35 +1,34 @@ - - - - net48; net8.0 - latest - enable - enable - True - EonaCat.Connections - EonaCat (Jeroen Saey) - EonaCat (Jeroen Saey) - EonaCat.png - readme.md - EonaCat.Connections - 1.0.2 - EonaCat (Jeroen Saey) - LICENSE - - - - - True - \ - + + + + net48; net8.0 + latest + enable + enable + True + EonaCat.Connections + EonaCat (Jeroen Saey) + EonaCat (Jeroen Saey) + readme.md + EonaCat.Connections + 1.0.4 + EonaCat (Jeroen Saey) + LICENSE + + + True \ - - - True - \ - - - - + + + True + \ + + + + + + + + diff --git a/EonaCat.Connections/EventArguments/DataReceivedEventArgs.cs b/EonaCat.Connections/EventArguments/DataReceivedEventArgs.cs index 55aed13..46093d9 100644 --- a/EonaCat.Connections/EventArguments/DataReceivedEventArgs.cs +++ b/EonaCat.Connections/EventArguments/DataReceivedEventArgs.cs @@ -1,11 +1,15 @@ -namespace EonaCat.Connections +using System.Net; + +namespace EonaCat.Connections { public class DataReceivedEventArgs : EventArgs { - public string ClientId { get; set; } - public byte[] Data { get; set; } - public string StringData { get; set; } - public bool IsBinary { get; set; } - public DateTime Timestamp { get; set; } = DateTime.UtcNow; + public string ClientId { get; internal set; } + public byte[] Data { get; internal set; } + public string StringData { get; internal set; } + public bool IsBinary { get; internal set; } + public DateTime Timestamp { get; internal set; } = DateTime.UtcNow; + public IPEndPoint RemoteEndPoint { get; internal set; } + public string Nickname { get; internal set; } } } \ No newline at end of file diff --git a/EonaCat.Connections/Helpers/AesKeyExchange.cs b/EonaCat.Connections/Helpers/AesKeyExchange.cs index 80ea79f..11ed60f 100644 --- a/EonaCat.Connections/Helpers/AesKeyExchange.cs +++ b/EonaCat.Connections/Helpers/AesKeyExchange.cs @@ -77,7 +77,10 @@ namespace EonaCat.Connections.Helpers for (int block = 1; block <= keyBlocks; block++) { byte[] intBlock = BitConverter.GetBytes(block); - if (BitConverter.IsLittleEndian) Array.Reverse(intBlock); + if (BitConverter.IsLittleEndian) + { + Array.Reverse(intBlock); + } hmac.Initialize(); hmac.TransformBlock(salt, 0, salt.Length, salt, 0); @@ -89,7 +92,9 @@ namespace EonaCat.Connections.Helpers { temp = hmac.ComputeHash(temp); for (int j = 0; j < hashLength; j++) + { buffer[j] ^= temp[j]; + } } int offset = (block - 1) * hashLength; @@ -118,7 +123,11 @@ namespace EonaCat.Connections.Helpers while (totalRead < count) { int read = await stream.ReadAsync(buffer, offset + totalRead, count - totalRead); - if (read == 0) throw new EndOfStreamException("Stream ended prematurely"); + if (read == 0) + { + throw new EndOfStreamException("Stream ended prematurely"); + } + totalRead += read; } } diff --git a/EonaCat.Connections/Helpers/StringHelpers.cs b/EonaCat.Connections/Helpers/StringHelpers.cs new file mode 100644 index 0000000..00fe1af --- /dev/null +++ b/EonaCat.Connections/Helpers/StringHelpers.cs @@ -0,0 +1,28 @@ +namespace EonaCat.Connections.Helpers +{ + internal class StringHelper + { + public static string GetTextBetweenTags(string message, string startTag, string endTag) + { + int startIndex = message.IndexOf(startTag); + if (startIndex == -1) + { + return string.Empty; + } + + int endIndex = message.IndexOf(endTag, startIndex + startTag.Length); + if (endIndex == -1) + { + return string.Empty; + } + + int length = endIndex - startIndex - startTag.Length; + if (length < 0) + { + return string.Empty; + } + + return message.Substring(startIndex + startTag.Length, length); + } + } +} diff --git a/EonaCat.Connections/Models/Configuration.cs b/EonaCat.Connections/Models/Configuration.cs index 54ee2be..38c5650 100644 --- a/EonaCat.Connections/Models/Configuration.cs +++ b/EonaCat.Connections/Models/Configuration.cs @@ -1,28 +1,79 @@ -using System.Security.Cryptography.X509Certificates; - -namespace EonaCat.Connections.Models -{ - // Configuration class - public class Configuration - { - public bool EnableAutoReconnect { get; set; } = true; - public int ReconnectDelayMs { get; set; } = 5000; - public int MaxReconnectAttempts { get; set; } = 0; // 0 means unlimited attempts - - public ProtocolType Protocol { get; set; } = ProtocolType.TCP; - public int Port { get; set; } = 8080; - public string Host { get; set; } = "127.0.0.1"; - public bool UseSsl { get; set; } = false; - public X509Certificate2 ServerCertificate { get; set; } - public bool UseAesEncryption { get; set; } = false; - public int BufferSize { get; set; } = 8192; - public int MaxConnections { get; set; } = 100000; - public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromSeconds(30); - public bool EnableKeepAlive { get; set; } = true; - public bool EnableNagle { get; set; } = false; - - // For testing purposes, allow self-signed certificates +using System.Diagnostics; +using System.Net.Security; +using System.Security.Cryptography.X509Certificates; + +namespace EonaCat.Connections.Models +{ + public class Configuration + { + public bool EnableAutoReconnect { get; set; } = true; + public int ReconnectDelayMs { get; set; } = 5000; + public int MaxReconnectAttempts { get; set; } = 0; // 0 means unlimited attempts + + public ProtocolType Protocol { get; set; } = ProtocolType.TCP; + public int Port { get; set; } = 8080; + public string Host { get; set; } = "127.0.0.1"; + public bool UseSsl { get; set; } = false; + public X509Certificate2 Certificate { get; set; } + public bool UseAesEncryption { get; set; } = false; + public int BufferSize { get; set; } = 8192; + public int MaxConnections { get; set; } = 100000; + public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromSeconds(30); + public bool EnableKeepAlive { get; set; } = true; + public bool EnableNagle { get; set; } = false; + + // For testing purposes, allow self-signed certificates public bool IsSelfSignedEnabled { get; set; } = true; public string AesPassword { get; set; } - } + public bool CheckCertificateRevocation { get; set; } + public bool MutuallyAuthenticate { get; set; } = true; + + internal RemoteCertificateValidationCallback GetRemoteCertificateValidationCallback() + { + return CertificateValidation; + } + + private bool CertificateValidation(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) + { + var sw = Stopwatch.StartNew(); + + try + { + if (IsSelfSignedEnabled) + { + return true; + } + + if (sslPolicyErrors == SslPolicyErrors.None) + { + return true; + } + + if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateChainErrors) && chain != null) + { + foreach (var status in chain.ChainStatus) + { + if (status.Status == X509ChainStatusFlags.RevocationStatusUnknown || + status.Status == X509ChainStatusFlags.OfflineRevocation) + { + continue; + } + + if (status.Status == X509ChainStatusFlags.Revoked) + { + return false; + } + + return false; + } + return true; + } + return false; + } + finally + { + sw.Stop(); + } + } + } } \ No newline at end of file diff --git a/EonaCat.Connections/Models/Connection.cs b/EonaCat.Connections/Models/Connection.cs index c01f74a..82f04cf 100644 --- a/EonaCat.Connections/Models/Connection.cs +++ b/EonaCat.Connections/Models/Connection.cs @@ -11,8 +11,34 @@ namespace EonaCat.Connections.Models public UdpClient UdpClient { get; set; } public IPEndPoint RemoteEndPoint { get; set; } public Stream Stream { get; set; } - public string Nickname { get; set; } + + private string _nickName; + public string Nickname + { + get + { + if (string.IsNullOrWhiteSpace(_nickName)) + { + _nickName = Id; + } + return _nickName; + } + + set + { + if (string.IsNullOrWhiteSpace(value)) + { + _nickName = Id; + } + else + { + _nickName = value; + } + } + } + public DateTime ConnectedAt { get; set; } + public DateTime LastActive { get; set; } public bool IsSecure { get; set; } public bool IsEncrypted { get; set; } public Aes AesEncryption { get; set; } diff --git a/EonaCat.Connections/NetworkClient.cs b/EonaCat.Connections/NetworkClient.cs index d80fb02..6642a4a 100644 --- a/EonaCat.Connections/NetworkClient.cs +++ b/EonaCat.Connections/NetworkClient.cs @@ -1,11 +1,9 @@ using EonaCat.Connections.EventArguments; using EonaCat.Connections.Helpers; using EonaCat.Connections.Models; -using System.Collections.Concurrent; using System.Net; using System.Net.Security; using System.Net.Sockets; -using System.Security.Authentication; using System.Security.Cryptography; using System.Security.Cryptography.X509Certificates; using System.Text; @@ -63,8 +61,15 @@ namespace EonaCat.Connections { try { - var sslStream = new SslStream(stream, false, userCertificateValidationCallback:RemoteCertificateValidationCallback); - await sslStream.AuthenticateAsClientAsync(_config.Host); + var sslStream = new SslStream(stream, false, userCertificateValidationCallback:_config.GetRemoteCertificateValidationCallback()); + if (_config.Certificate != null) + { + sslStream.AuthenticateAsClient(_config.Host, new X509CertificateCollection { _config.Certificate }, _config.CheckCertificateRevocation); + } + else + { + sslStream.AuthenticateAsClient(_config.Host); + } stream = sslStream; } catch (Exception ex) @@ -102,27 +107,8 @@ namespace EonaCat.Connections } } - private bool RemoteCertificateValidationCallback(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) - { - if (_config.IsSelfSignedEnabled) - { - return true; // Accept self-signed certificates - } - - if (sslPolicyErrors == SslPolicyErrors.None) - { - return true; // Certificate is valid - } - - // Log or handle the SSL error as needed - OnSslError?.Invoke(this, new ErrorEventArgs - { - Exception = new AuthenticationException("SSL certificate validation failed"), - Message = $"SSL Policy Errors: {sslPolicyErrors}" - }); - return false; // Reject the certificate - } - + public string IpAddress => _config != null ? _config.Host : string.Empty; + public int Port => _config != null ? _config.Port : 0; private async Task ConnectUdp() { try @@ -156,23 +142,31 @@ namespace EonaCat.Connections // Read 4-byte length prefix var lengthBuffer = new byte[4]; int read = await ReadExactAsync(_stream, lengthBuffer, 4, _cancellation.Token); - if (read == 0) break; + if (read == 0) + { + break; + } + + if (BitConverter.IsLittleEndian) + { + Array.Reverse(lengthBuffer); + } - if (BitConverter.IsLittleEndian) Array.Reverse(lengthBuffer); int length = BitConverter.ToInt32(lengthBuffer, 0); // Read encrypted payload var encrypted = new byte[length]; await ReadExactAsync(_stream, encrypted, length, _cancellation.Token); - - // **Decrypt once here** data = await DecryptDataAsync(encrypted, _aesEncryption); } else { data = new byte[_config.BufferSize]; int bytesRead = await _stream.ReadAsync(data, 0, data.Length, _cancellation.Token); - if (bytesRead == 0) break; + if (bytesRead == 0) + { + break; + } if (bytesRead < data.Length) { @@ -204,7 +198,11 @@ namespace EonaCat.Connections while (offset < length) { int read = await stream.ReadAsync(buffer, offset, length - offset, ct); - if (read == 0) return 0; + if (read == 0) + { + return 0; + } + offset += read; } return offset; @@ -246,7 +244,9 @@ namespace EonaCat.Connections { stringData = Encoding.UTF8.GetString(data); if (Encoding.UTF8.GetBytes(stringData).Length == data.Length) + { isBinary = false; + } } catch { @@ -264,16 +264,23 @@ namespace EonaCat.Connections catch (Exception ex) { if (_config.UseAesEncryption) + { OnEncryptionError?.Invoke(this, new ErrorEventArgs { Exception = ex, Message = "Error processing data" }); + } else + { OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = ex, Message = "Error processing data" }); + } } } public async Task SendAsync(byte[] data) { - if (!_isConnected) return; + if (!_isConnected) + { + return; + } try { @@ -284,7 +291,10 @@ namespace EonaCat.Connections // Prepend 4-byte length for framing var lengthPrefix = BitConverter.GetBytes(data.Length); - if (BitConverter.IsLittleEndian) Array.Reverse(lengthPrefix); + if (BitConverter.IsLittleEndian) + { + Array.Reverse(lengthPrefix); + } var framed = new byte[lengthPrefix.Length + data.Length]; Buffer.BlockCopy(lengthPrefix, 0, framed, 0, lengthPrefix.Length); @@ -306,9 +316,13 @@ namespace EonaCat.Connections catch (Exception ex) { if (_config.UseAesEncryption) + { OnEncryptionError?.Invoke(this, new ErrorEventArgs { Exception = ex, Message = "Error encrypting/sending data" }); + } else + { OnGeneralError?.Invoke(this, new ErrorEventArgs { Exception = ex, Message = "Error sending data" }); + } } } @@ -372,7 +386,7 @@ namespace EonaCat.Connections } catch { - // Ignore exceptions, we'll retry + // Do nothing } await Task.Delay(_config.ReconnectDelayMs); diff --git a/EonaCat.Connections/NetworkServer.cs b/EonaCat.Connections/NetworkServer.cs index cec1f23..04c853c 100644 --- a/EonaCat.Connections/NetworkServer.cs +++ b/EonaCat.Connections/NetworkServer.cs @@ -47,6 +47,9 @@ namespace EonaCat.Connections } } + public string IpAddress => _config != null ? _config.Host : string.Empty; + public int Port => _config != null ? _config.Port : 0; + public async Task StartAsync() { _serverCancellation = new CancellationTokenSource(); @@ -86,6 +89,11 @@ namespace EonaCat.Connections } } + public Dictionary GetClients() + { + return _clients.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + } + private async Task StartUdpServerAsync() { _udpListener = new UdpClient(_config.Port); @@ -118,6 +126,7 @@ namespace EonaCat.Connections TcpClient = tcpClient, RemoteEndPoint = (IPEndPoint)tcpClient.Client.RemoteEndPoint, ConnectedAt = DateTime.UtcNow, + LastActive = DateTime.UtcNow, CancellationToken = new CancellationTokenSource() }; @@ -137,8 +146,8 @@ namespace EonaCat.Connections { try { - var sslStream = new SslStream(stream, false, userCertificateValidationCallback:RemoteCertificateValidationCallback); - await sslStream.AuthenticateAsServerAsync(_config.ServerCertificate, false, SslProtocols.Tls12, false); + var sslStream = new SslStream(stream, false, userCertificateValidationCallback: _config.GetRemoteCertificateValidationCallback()); + await sslStream.AuthenticateAsServerAsync(_config.Certificate, _config.MutuallyAuthenticate, SslProtocols.Tls12 | SslProtocols.Tls13, _config.CheckCertificateRevocation); stream = sslStream; client.IsSecure = true; } @@ -158,8 +167,8 @@ namespace EonaCat.Connections client.AesEncryption.GenerateKey(); client.AesEncryption.GenerateIV(); client.IsEncrypted = true; - - // Securely send raw AES key + IV + salt + + // Securely send raw AES key + IV + salt + password await AesKeyExchange.SendAesKeyAsync(stream, client.AesEncryption, _config.AesPassword); } catch (Exception ex) @@ -197,27 +206,6 @@ namespace EonaCat.Connections } } - private bool RemoteCertificateValidationCallback(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) - { - if (_config.IsSelfSignedEnabled) - { - return true; // Accept self-signed certificates - } - - if (sslPolicyErrors == SslPolicyErrors.None) - { - return true; // Certificate is valid - } - - // Log or handle the SSL error as needed - OnSslError?.Invoke(this, new ErrorEventArgs - { - Exception = new AuthenticationException("SSL certificate validation failed"), - Message = $"SSL Policy Errors: {sslPolicyErrors}" - }); - return false; // Reject the certificate - } - private async Task HandleUdpDataAsync(UdpReceiveResult result) { var clientKey = result.RemoteEndPoint.ToString(); @@ -257,10 +245,15 @@ namespace EonaCat.Connections { // Read 4-byte length first int read = await ReadExactAsync(client.Stream, lengthBuffer, 4, client.CancellationToken.Token); - if (read == 0) break; + if (read == 0) + { + break; + } if (BitConverter.IsLittleEndian) + { Array.Reverse(lengthBuffer); + } int length = BitConverter.ToInt32(lengthBuffer, 0); @@ -276,7 +269,10 @@ namespace EonaCat.Connections // Non-encrypted: just read raw bytes data = new byte[_config.BufferSize]; int bytesRead = await client.Stream.ReadAsync(data, 0, data.Length, client.CancellationToken.Token); - if (bytesRead == 0) break; + if (bytesRead == 0) + { + break; + } if (bytesRead < data.Length) { @@ -307,7 +303,11 @@ namespace EonaCat.Connections while (offset < length) { int read = await stream.ReadAsync(buffer, offset, length - offset, ct); - if (read == 0) return 0; // disconnected + if (read == 0) + { + return 0; // disconnected + } + offset += read; } return offset; @@ -333,27 +333,59 @@ namespace EonaCat.Connections { stringData = Encoding.UTF8.GetString(data); if (Encoding.UTF8.GetBytes(stringData).Length == data.Length) + { isBinary = false; + } } catch { } // Handle special commands - if (!isBinary && stringData.StartsWith("NICKNAME:")) + if (!isBinary && stringData != null) { - var nickname = stringData.Substring(9); - client.Nickname = nickname; - OnConnectedWithNickname?.Invoke(this, new NicknameConnectionEventArgs + if (stringData.StartsWith("NICKNAME:")) { - ClientId = client.Id, - RemoteEndPoint = client.RemoteEndPoint, - Nickname = nickname - }); - return; + var nickname = stringData.Substring(9); + client.Nickname = nickname; + OnConnectedWithNickname?.Invoke(this, new NicknameConnectionEventArgs + { + ClientId = client.Id, + RemoteEndPoint = client.RemoteEndPoint, + Nickname = nickname + }); + return; + } + else if (stringData.StartsWith("[NICKNAME]", StringComparison.OrdinalIgnoreCase)) + { + var nickname = StringHelper.GetTextBetweenTags(stringData, "[NICKNAME]", "[/NICKNAME]"); + if (string.IsNullOrWhiteSpace(nickname)) + { + nickname = client.Id; // fallback to client ID if no valid nickname was provided + } + else + { + client.Nickname = nickname; + } + OnConnectedWithNickname?.Invoke(this, new NicknameConnectionEventArgs + { + ClientId = client.Id, + RemoteEndPoint = client.RemoteEndPoint, + Nickname = nickname + }); + return; + } + else if (stringData.Equals("DISCONNECT", StringComparison.OrdinalIgnoreCase)) + { + await DisconnectClientAsync(client.Id); + return; + } } + client.LastActive = DateTime.UtcNow; OnDataReceived?.Invoke(this, new DataReceivedEventArgs { ClientId = client.Id, + Nickname = client.Nickname, + RemoteEndPoint = client.RemoteEndPoint, Data = data, StringData = stringData, IsBinary = isBinary @@ -362,18 +394,55 @@ namespace EonaCat.Connections catch (Exception ex) { if (client.IsEncrypted) + { OnEncryptionError?.Invoke(this, new ErrorEventArgs { ClientId = client.Id, Exception = ex, Message = "Error processing data" }); + } else + { OnGeneralError?.Invoke(this, new ErrorEventArgs { ClientId = client.Id, Exception = ex, Message = "Error processing data" }); + } } } public async Task SendToClientAsync(string clientId, byte[] data) { - if (_clients.TryGetValue(clientId, out var client)) + // Check if clientId is a guid + if (Guid.TryParse(clientId, out _)) { - await SendDataAsync(client, data); + if (_clients.TryGetValue(clientId, out var client)) + { + await SendDataAsync(client, data); + return; + } + } + + // Check if clientId is an IP:Port format + string[] parts = clientId.Split(':'); + if (parts.Length == 2) + { + if (IPAddress.TryParse(parts[0], out IPAddress ip) && int.TryParse(parts[1], out int port)) + { + IPEndPoint endPoint = new IPEndPoint(ip, port); + string clientKey = endPoint.ToString(); + + if (_clients.TryGetValue(clientKey, out var client)) + { + // If inside async method, you can use await + await SendDataAsync(client, data); + return; + } + } + } + + // Check if the client is a nickname + foreach (var kvp in _clients) + { + if (kvp.Value.Nickname != null && kvp.Value.Nickname.Equals(clientId, StringComparison.OrdinalIgnoreCase)) + { + await SendDataAsync(kvp.Value, data); + return; + } } } @@ -409,7 +478,9 @@ namespace EonaCat.Connections // Prepend length for safe framing var lengthPrefix = BitConverter.GetBytes(data.Length); if (BitConverter.IsLittleEndian) + { Array.Reverse(lengthPrefix); + } var framed = new byte[lengthPrefix.Length + data.Length]; Buffer.BlockCopy(lengthPrefix, 0, framed, 0, lengthPrefix.Length); @@ -438,9 +509,13 @@ namespace EonaCat.Connections catch (Exception ex) { if (client.IsEncrypted) + { OnEncryptionError?.Invoke(this, new ErrorEventArgs { ClientId = client.Id, Exception = ex, Message = "Error encrypting/sending data" }); + } else + { OnGeneralError?.Invoke(this, new ErrorEventArgs { ClientId = client.Id, Exception = ex, Message = "Error sending data" }); + } } } diff --git a/EonaCat.Connections/Processors/JsonDataProcessor.cs b/EonaCat.Connections/Processors/JsonDataProcessor.cs new file mode 100644 index 0000000..dab84cc --- /dev/null +++ b/EonaCat.Connections/Processors/JsonDataProcessor.cs @@ -0,0 +1,357 @@ +using EonaCat.Json; +using EonaCat.Json.Linq; +using System.Collections.Concurrent; +using System.Text; +using System.Timers; +using Timer = System.Timers.Timer; + +namespace EonaCat.Connections.Processors +{ + public class JsonDataProcessor : IDisposable + { + public int MaxAllowedBufferSize = 20 * 1024 * 1024; + public int MaxMessagesPerBatch = 200; + private readonly ConcurrentDictionary _buffers = new(); + private readonly Timer _cleanupTimer; + private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5); + private bool _isDisposed; + + /// + /// This clientName will be used for the buffer (if not set in the DataReceivedEventArgs). + /// + public string ClientName { get; set; } = Guid.NewGuid().ToString(); + + private class BufferEntry + { + public readonly StringBuilder Buffer = new(); + public DateTime LastUsed = DateTime.UtcNow; + public readonly object SyncRoot = new(); + } + + public Action? ProcessMessage; + public Action? ProcessTextMessage; + + public event EventHandler? OnMessageError; + public event EventHandler? OnError; + + public JsonDataProcessor() + { + _cleanupTimer = new Timer(_clientBufferTimeout.TotalMilliseconds / 5); + _cleanupTimer.Elapsed += CleanupInactiveClients; + _cleanupTimer.AutoReset = true; + _cleanupTimer.Start(); + } + + public void Process(DataReceivedEventArgs e) + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(JsonDataProcessor)); + } + + if (e.IsBinary) + { + e.StringData = Encoding.UTF8.GetString(e.Data); + } + + if (string.IsNullOrEmpty(e.StringData)) + { + OnError?.Invoke(this, new Exception("Received empty data.")); + return; + } + + string clientName = !string.IsNullOrWhiteSpace(e.Nickname) ? e.Nickname : ClientName; + string incomingText = e.StringData.Trim(); + if (incomingText.Length == 0) + { + return; + } + + var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry()); + List? jsonChunksToProcess = null; + string? textMessageToProcess = null; + + lock (bufferEntry.SyncRoot) + { + // Prevent growth before appending + if (bufferEntry.Buffer.Length > MaxAllowedBufferSize) + { + bufferEntry.Buffer.Clear(); + } + + bufferEntry.Buffer.Append(incomingText); + bufferEntry.LastUsed = DateTime.UtcNow; + + int processedCount = 0; + + while (processedCount < MaxMessagesPerBatch && ExtractNextJson(bufferEntry.Buffer, out var jsonChunk)) + { + ProcessDataReceived(jsonChunk, clientName); + processedCount++; + } + + if (bufferEntry.Buffer.Length > 0 && !ContainsJsonStructure(bufferEntry.Buffer)) + { + var leftover = bufferEntry.Buffer.ToString(); + bufferEntry.Buffer.Clear(); + ProcessTextMessage?.Invoke(leftover, clientName); + } + } + + if (textMessageToProcess != null) + { + ProcessTextMessage?.Invoke(textMessageToProcess, clientName); + } + + if (jsonChunksToProcess != null) + { + foreach (var jsonChunk in jsonChunksToProcess) + { + ProcessDataReceived(jsonChunk, clientName); + } + } + } + + private void ProcessDataReceived(string? data, string clientName) + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(JsonDataProcessor)); + } + + if (data == null) + { + return; + } + + if (string.IsNullOrEmpty(clientName)) + { + clientName = ClientName; + } + + if (string.IsNullOrWhiteSpace(data) || data.Length == 0) + { + return; + } + + if (string.IsNullOrWhiteSpace(data)) + { + return; + } + + bool looksLikeJson = data.Length > 1 && + ((data[0] == '{' && data[data.Length - 1] == '}') || (data[0] == '[' && data[data.Length - 1] == ']')); + + if (!looksLikeJson) + { + ProcessTextMessage?.Invoke(data, clientName); + return; + } + + try + { + if (data.Contains("Exception") || data.Contains("Error")) + { + try + { + var jsonObject = JObject.Parse(data); + var exceptionToken = jsonObject.SelectToken("Exception"); + if (exceptionToken is { Type: not JTokenType.Null }) + { + var exception = JsonHelper.ExtractException(data); + if (exception != null) + { + var currentException = new Exception(exception.Message); + OnMessageError?.Invoke(this, currentException); + } + } + } + catch (Exception) + { + // Do nothing + } + } + + var messages = JsonHelper.ToObjects(data); + if (messages != null) + { + foreach (var message in messages) + { + ProcessMessage?.Invoke(message, clientName, data); + } + } + } + catch (Exception ex) + { + OnError?.Invoke(this, new Exception("Failed to process JSON message.", ex)); + } + } + + + private static bool ExtractNextJson(StringBuilder buffer, out string? json) + { + json = null; + if (buffer.Length == 0) + { + return false; + } + + int depth = 0; + bool inString = false; + bool escape = false; + int startIndex = -1; + + for (int i = 0; i < buffer.Length; i++) + { + char currentCharacter = buffer[i]; + + if (inString) + { + if (escape) + { + escape = false; + } + else if (currentCharacter == '\\') + { + escape = true; + } + else if (currentCharacter == '"') + { + inString = false; + } + } + else + { + if (currentCharacter == '"') + { + inString = true; + if (depth == 0 && startIndex == -1) + { + startIndex = i; // string-only JSON + } + } + else if (currentCharacter == '{' || currentCharacter == '[') + { + if (depth == 0) + { + startIndex = i; + } + + depth++; + } + else if (currentCharacter == '}' || currentCharacter == ']') + { + depth--; + if (depth == 0 && startIndex != -1) + { + json = buffer.ToString(startIndex, i - startIndex + 1); + buffer.Remove(0, i + 1); + return true; + } + } + else if (depth == 0 && startIndex == -1 && + (char.IsDigit(currentCharacter) || currentCharacter == '-' || currentCharacter == 't' || currentCharacter == 'f' || currentCharacter == 'n')) + { + startIndex = i; + + // Find token end + int tokenEnd = FindPrimitiveEnd(buffer, i); + json = buffer.ToString(startIndex, tokenEnd - startIndex); + buffer.Remove(0, tokenEnd); + return true; + } + } + } + return false; + } + + private static int FindPrimitiveEnd(StringBuilder buffer, int startIndex) + { + for (int i = startIndex; i < buffer.Length; i++) + { + char c = buffer[i]; + if (char.IsWhiteSpace(c) || c == ',' || c == ']' || c == '}') + { + return i; + } + } + return buffer.Length; + } + + private static bool ContainsJsonStructure(StringBuilder buffer) + { + for (int i = 0; i < buffer.Length; i++) + { + char c = buffer[i]; + if (c == '{' || c == '[' || c == '"' || + c == 't' || c == 'f' || c == 'n' || + c == '-' || char.IsDigit(c)) + { + return true; + } + } + return false; + } + + private void CleanupInactiveClients(object? sender, ElapsedEventArgs e) + { + var now = DateTime.UtcNow; + + foreach (var kvp in _buffers) + { + var bufferEntry = kvp.Value; + if (now - bufferEntry.LastUsed > _clientBufferTimeout && _buffers.TryRemove(kvp.Key, out var removed)) + { + lock (removed.SyncRoot) + { + removed.Buffer.Clear(); + } + } + } + } + + public void RemoveClient(string clientName) + { + if (string.IsNullOrWhiteSpace(clientName)) + { + return; + } + + if (_buffers.TryRemove(clientName, out var removed)) + { + lock (removed.SyncRoot) + { + removed.Buffer.Clear(); + } + } + } + + public void Dispose() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + + _cleanupTimer.Stop(); + _cleanupTimer.Elapsed -= CleanupInactiveClients; + _cleanupTimer.Dispose(); + + foreach (var bufferEntry in _buffers.Values) + { + lock (bufferEntry.SyncRoot) + { + bufferEntry.Buffer.Clear(); + } + } + _buffers.Clear(); + + ProcessMessage = null; + ProcessTextMessage = null; + OnMessageError = null; + OnError = null; + } + } +}