From abc50d0c11a9d4018be9306515549ac880c225d7 Mon Sep 17 00:00:00 2001 From: EonaCat Date: Wed, 21 Jan 2026 22:13:31 +0100 Subject: [PATCH] Updated --- EonaCat.Connections.Client/JsonTest.cs | 69 ++++ EonaCat.Connections.Client/Program.cs | 28 +- EonaCat.Connections.Server/Program.cs | 2 +- .../Processors/JsonDataProcessor.cs | 329 ++++++++++++------ .../Processors/JsonDataProcessorHelper.cs | 185 ---------- 5 files changed, 314 insertions(+), 299 deletions(-) create mode 100644 EonaCat.Connections.Client/JsonTest.cs delete mode 100644 EonaCat.Connections/Processors/JsonDataProcessorHelper.cs diff --git a/EonaCat.Connections.Client/JsonTest.cs b/EonaCat.Connections.Client/JsonTest.cs new file mode 100644 index 0000000..77f0a2e --- /dev/null +++ b/EonaCat.Connections.Client/JsonTest.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace EonaCat.Connections.Client +{ + // Root myDeserializedClass = JsonConvert.DeserializeObject>(myJsonResponse); + public class Contact + { + public string email { get; set; } + public string phone { get; set; } + } + + public class Department + { + public string id { get; set; } + public string name { get; set; } + public Manager manager { get; set; } + } + + public class Details + { + public int hoursSpent { get; set; } + public List technologiesUsed { get; set; } + public string completionDate { get; set; } + public string expectedCompletion { get; set; } + } + + public class Employee + { + public string id { get; set; } + public string name { get; set; } + public string position { get; set; } + public Department department { get; set; } + public List projects { get; set; } + } + + public class Manager + { + public string id { get; set; } + public string name { get; set; } + public Contact contact { get; set; } + } + + public class Project + { + public string projectId { get; set; } + public string projectName { get; set; } + public string startDate { get; set; } + public List tasks { get; set; } + } + + public class Root + { + public Employee employee { get; set; } + } + + public class Task + { + public string taskId { get; set; } + public string title { get; set; } + public string status { get; set; } + public Details details { get; set; } + } + + +} diff --git a/EonaCat.Connections.Client/Program.cs b/EonaCat.Connections.Client/Program.cs index 8a2a77f..2e3a07f 100644 --- a/EonaCat.Connections.Client/Program.cs +++ b/EonaCat.Connections.Client/Program.cs @@ -20,7 +20,7 @@ namespace EonaCat.Connections.Client.Example //public static string SERVER_IP = "10.40.11.22"; public static string SERVER_IP = "127.0.0.1"; - private static Dictionary> _clientsProcessors = new Dictionary>(); + private static Dictionary>> _clientsProcessors = new Dictionary>>(); private static bool testDataProcessor; public static bool WaitForMessage { get; private set; } @@ -28,9 +28,9 @@ namespace EonaCat.Connections.Client.Example public static bool ToConsoleOnly { get; private set; } = true; public static bool UseJson { get; private set; } = true; public static bool TESTBYTES { get; private set; } - public static bool UseJsonProcessorTest { get; private set; } = false; + public static bool UseJsonProcessorTest { get; private set; } = true; - public static async Task Main(string[] args) + public static async System.Threading.Tasks.Task Main(string[] args) { for (long i = 0; i < clientCount; i++) { @@ -40,7 +40,7 @@ namespace EonaCat.Connections.Client.Example if (testDataProcessor) { - _clientsProcessors[client] = new JsonDataProcessor(); + _clientsProcessors[client] = new JsonDataProcessor>(); } else { @@ -67,7 +67,7 @@ namespace EonaCat.Connections.Client.Example break; } - var jsonUrl = "https://samples.json-format.com/employees/json/employees_500KB.json"; + var jsonUrl = "https://sample.json-format.com/api/download?url=https%3A%2F%2Ffiles.jsons.live%2Femployees%2F5-level%2F20-MB%2Fformatted.json&name=20%20MB%205%20Level%20Formatted.json"; try { @@ -98,14 +98,16 @@ namespace EonaCat.Connections.Client.Example { WriteToLog($"Processed JSON message from {e.ClientName} ({e.ClientEndpoint}): {e.RawData}"); }; - processor.MaxAllowedBufferSize = 10 * 1024 * 1024; // 10 MB + processor.MaxAllowedBufferSize = 50 * 1024 * 1024; // 10 MB processor.MaxMessagesPerBatch = 5; var json = _jsonContent; while (true) { + json = "TEST1" + _jsonContent + "TEST2"; + processor.Process(json, "TestClient"); - await Task.Delay(100).ConfigureAwait(false); + await System.Threading.Tasks.Task.Delay(100).ConfigureAwait(false); } } } @@ -131,11 +133,11 @@ namespace EonaCat.Connections.Client.Example } } - await Task.Delay(1000).ConfigureAwait(false); + await System.Threading.Tasks.Task.Delay(1000).ConfigureAwait(false); } } - private static async Task StartClientAsync(string clientName, NetworkClient client) + private static async System.Threading.Tasks.Task StartClientAsync(string clientName, NetworkClient client) { await client.ConnectAsync().ConfigureAwait(false); @@ -157,7 +159,7 @@ namespace EonaCat.Connections.Client.Example UseAesEncryption = false, EnableHeartbeat = IsHeartBeatEnabled, AesPassword = "EonaCat.Connections.Password", - Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"), + //Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"), }; var client = new NetworkClient(config); @@ -172,8 +174,8 @@ namespace EonaCat.Connections.Client.Example if (UseProcessor) { - _clientsProcessors[client] = new JsonDataProcessor(); - _clientsProcessors[client].OnError += (sender, e) => + _clientsProcessors[client] = new JsonDataProcessor>(); + _clientsProcessors[client].OnMessageError += (sender, e) => { Console.WriteLine($"Processor error: {e.Message}"); }; @@ -198,7 +200,7 @@ namespace EonaCat.Connections.Client.Example { if (UseProcessor) { - _clientsProcessors[client].Process(e, currentClientName: e.Nickname); + _clientsProcessors[client].Process(e.StringData, clientName: e.Nickname); return; } else diff --git a/EonaCat.Connections.Server/Program.cs b/EonaCat.Connections.Server/Program.cs index 7788ced..13abc54 100644 --- a/EonaCat.Connections.Server/Program.cs +++ b/EonaCat.Connections.Server/Program.cs @@ -56,7 +56,7 @@ namespace EonaCat.Connections.Server.Example UseAesEncryption = false, MaxConnections = 100000, AesPassword = "EonaCat.Connections.Password", - Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss"), + //Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss"), EnableHeartbeat = IsHeartBeatEnabled }; diff --git a/EonaCat.Connections/Processors/JsonDataProcessor.cs b/EonaCat.Connections/Processors/JsonDataProcessor.cs index d470436..afc8bae 100644 --- a/EonaCat.Connections/Processors/JsonDataProcessor.cs +++ b/EonaCat.Connections/Processors/JsonDataProcessor.cs @@ -1,162 +1,281 @@ -using EonaCat.Connections.Models; -using EonaCat.Json; -using System; -using System.Collections.Concurrent; +using EonaCat.Json; using System.Text; -using System.Timers; -using Timer = System.Timers.Timer; namespace EonaCat.Connections.Processors { + public class ProcessedJsonMessage + { + public TData Data { get; set; } = default!; + public string RawData { get; set; } = string.Empty; + public string ClientName { get; set; } = string.Empty; + public string ClientEndpoint { get; set; } = string.Empty; + } + + public class ProcessedTextMessage + { + public string Text { get; set; } = string.Empty; + public string ClientName { get; set; } = string.Empty; + } + public sealed class JsonDataProcessor : IDisposable { - public int MaxAllowedBufferSize = 20 * 1024 * 1024; - public int MaxMessagesPerBatch = 200; - - private readonly ConcurrentDictionary _buffers = new(); - private readonly Timer _cleanupTimer; - private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5); + private readonly StringBuilder _buffer = new StringBuilder(); + private readonly object _syncLock = new object(); private bool _isDisposed; + public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; // 30 MB + public int MaxMessagesPerBatch { get; set; } = 200; public string ClientName { get; } - private sealed class BufferEntry - { - public readonly StringBuilder Buffer = new(); - public DateTime LastUsed = DateTime.UtcNow; - public readonly object SyncRoot = new(); + // Diagnostics + public long TotalBytesProcessed { get; private set; } + public long TotalChunksReceived { get; private set; } - public void Clear(bool shrink = false) - { - Buffer.Clear(); - if (shrink && Buffer.Capacity > 1024) - { - Buffer.Capacity = 1024; - } - } - } - - public event EventHandler>? OnProcessMessage; + // Events + public event EventHandler>? OnProcessMessage; public event EventHandler? OnProcessTextMessage; public event EventHandler? OnMessageError; - public event EventHandler? OnError; public JsonDataProcessor() { ClientName = Guid.NewGuid().ToString(); - _cleanupTimer = new Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5)) - { - AutoReset = true - }; - _cleanupTimer.Elapsed += CleanupInactiveClients; - _cleanupTimer.Start(); } - public void Process(string data, string? clientName = null) + public void Process(string jsonString, string? clientName = null, string? endpoint = null) { ThrowIfDisposed(); - if (string.IsNullOrWhiteSpace(data)) + if (string.IsNullOrEmpty(jsonString)) { return; } - string client = clientName ?? ClientName; - ProcessInternal(data, client); + TotalChunksReceived++; + TotalBytesProcessed += Encoding.UTF8.GetByteCount(jsonString); + + ProcessInternal(jsonString, clientName, endpoint); } - private void ProcessInternal(string incomingData, string clientName) + public void Process(DataReceivedEventArgs e, string? clientName = null) { - var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry()); - - lock (bufferEntry.SyncRoot) + ThrowIfDisposed(); + if (e == null) { - bufferEntry.Buffer.Append(incomingData); - bufferEntry.LastUsed = DateTime.UtcNow; + return; + } - int processedCount = 0; - while (processedCount < MaxMessagesPerBatch) + string dataString; + if (e.IsBinary) + { + if (e.Data == null || e.Data.Length == 0) { - if (!JsonDataProcessorHelper.TryExtractFullJson(bufferEntry.Buffer, - out var fullJson, out var nonJson)) - { - // partial JSON - break; - } + return; + } - if (!nonJson.IsEmpty) + dataString = Encoding.UTF8.GetString(e.Data); + } + else + { + dataString = e.StringData; + } + + if (string.IsNullOrWhiteSpace(dataString)) + { + return; + } + + string client = e.Nickname ?? clientName ?? ClientName; + + TotalChunksReceived++; + TotalBytesProcessed += Encoding.UTF8.GetByteCount(dataString); + + ProcessInternal(dataString, client, e.RemoteEndPoint?.ToString()); + } + + private void ProcessInternal(string jsonString, string? clientName, string? endpoint) + { + string client = clientName ?? ClientName; + + lock (_syncLock) + { + _buffer.Append(jsonString); + int processedCount = 0; + + while (processedCount < MaxMessagesPerBatch && + TryExtractFullJson(out int fullJsonStart, out int fullJsonLength, + out int textStart, out int textLength)) + { + // --- Process leading text --- + if (textLength > 0) { + string text = _buffer.ToString(textStart, textLength); OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { - Text = nonJson.ToString(), - ClientName = clientName, - ClientEndpoint = null + Text = text, + ClientName = client }); + + // Remove processed text immediately + _buffer.Remove(textStart, textLength); + processedCount++; + continue; } - if (!fullJson.IsEmpty) + // --- Process JSON --- + if (fullJsonLength > 0) { - // We got a full JSON message - var jsonStr = fullJson.ToString(); - ProcessDataReceived(jsonStr, clientName, null); + string json = _buffer.ToString(fullJsonStart, fullJsonLength); + + try + { + var deserialized = JsonHelper.ToObject(json); + OnProcessMessage?.Invoke(this, new ProcessedJsonMessage + { + Data = deserialized, + RawData = json, + ClientName = client, + ClientEndpoint = endpoint ?? string.Empty + }); + } + catch (Exception ex) + { + OnMessageError?.Invoke(this, new Exception( + $"Failed to deserialize JSON for client {client}", ex)); + } + + // Remove processed JSON immediately + _buffer.Remove(fullJsonStart, fullJsonLength); processedCount++; } } - // Prevent buffer overflow - if (bufferEntry.Buffer.Length > MaxAllowedBufferSize) + // --- Prevent buffer overflow --- + if (_buffer.Length > MaxAllowedBufferSize) { - OnError?.Invoke(this, new Exception($"Buffer overflow for client {clientName}.")); - bufferEntry.Clear(shrink: true); + OnMessageError?.Invoke(this, new Exception( + $"Buffer overflow for client {client}. Current size: {_buffer.Length / 1024.0 / 1024.0:F2} MB. " + + $"Max allowed: {MaxAllowedBufferSize / 1024.0 / 1024.0:F2} MB. Clearing buffer.")); + _buffer.Clear(); } } } - private void ProcessDataReceived(string json, string clientName, string? clientEndpoint) + private bool TryExtractFullJson( + out int fullJsonStart, + out int fullJsonLength, + out int textStart, + out int textLength) { - try + fullJsonStart = fullJsonLength = textStart = textLength = 0; + + if (_buffer.Length == 0) { - var messages = JsonHelper.ToObjects(json); - if (messages != null) + return false; + } + + int pos = 0; + + // Skip leading whitespace + while (pos < _buffer.Length && char.IsWhiteSpace(_buffer[pos])) + { + pos++; + } + + if (pos >= _buffer.Length) + { + return false; + } + + // --- Leading text before JSON --- + if (_buffer[pos] != '{' && _buffer[pos] != '[') + { + textStart = pos; + while (pos < _buffer.Length && _buffer[pos] != '{' && _buffer[pos] != '[') { - foreach (var msg in messages) - { - OnProcessMessage?.Invoke(this, new ProcessedMessage - { - Data = msg, - RawData = json, - ClientName = clientName, - ClientEndpoint = clientEndpoint - }); - } + pos++; } + + textLength = pos - textStart; + return true; } - catch (Exception ex) + + // --- JSON token --- + fullJsonStart = pos; + fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos; + + if (fullJsonLength <= 0) { - OnError?.Invoke(this, new Exception($"Failed to process JSON for {clientName}.", ex)); + return false; // partial JSON } + + return true; } - private void CleanupInactiveClients(object? sender, ElapsedEventArgs e) + private int FindJsonTokenEnd(StringBuilder buffer, int start) { - if (_isDisposed) + if (start >= buffer.Length) { - return; + return 0; } - DateTime now = DateTime.UtcNow; - foreach (var kvp in _buffers) + int i = start; + char firstChar = buffer[start]; + + if (firstChar == '{' || firstChar == '[') { - if (now - kvp.Value.LastUsed > _clientBufferTimeout) + char open = firstChar; + char close = firstChar == '{' ? '}' : ']'; + int depth = 1; + bool inString = false; + bool escape = false; + + i++; + + while (i < buffer.Length) { - if (_buffers.TryRemove(kvp.Key, out var removed)) + char c = buffer[i]; + + if (inString) { - lock (removed.SyncRoot) + if (escape) { - removed.Clear(shrink: true); + escape = false; + } + else if (c == '\\') + { + escape = true; + } + else if (c == '"') + { + inString = false; } } + else + { + if (c == '"') + { + inString = true; + } + else if (c == open) + { + depth++; + } + else if (c == close) + { + depth--; + if (depth == 0) + { + return i + 1; + } + } + } + + i++; } + + return 0; // partial JSON } + + return 0; // only objects/arrays supported as JSON start } private void ThrowIfDisposed() @@ -167,6 +286,23 @@ namespace EonaCat.Connections.Processors } } + public void ClearBuffer() + { + lock (_syncLock) + { + _buffer.Clear(); + } + } + + public void ResetStatistics() + { + lock (_syncLock) + { + TotalBytesProcessed = 0; + TotalChunksReceived = 0; + } + } + public void Dispose() { if (_isDisposed) @@ -176,22 +312,15 @@ namespace EonaCat.Connections.Processors _isDisposed = true; - _cleanupTimer.Stop(); - _cleanupTimer.Dispose(); - - foreach (var entry in _buffers.Values) + lock (_syncLock) { - lock (entry.SyncRoot) - { - entry.Clear(shrink: true); - } + _buffer.Clear(); } - _buffers.Clear(); + // Nullify events to release subscriber references OnProcessMessage = null; OnProcessTextMessage = null; OnMessageError = null; - OnError = null; } } } diff --git a/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs b/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs deleted file mode 100644 index fccb5ff..0000000 --- a/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs +++ /dev/null @@ -1,185 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace EonaCat.Connections.Processors -{ - internal static class JsonDataProcessorHelper - { - internal static bool TryExtractFullJson( - StringBuilder buffer, - out ReadOnlyMemory fullJson, - out ReadOnlyMemory nonJson) - { - fullJson = ReadOnlyMemory.Empty; - nonJson = ReadOnlyMemory.Empty; - - if (buffer.Length == 0) - { - return false; - } - - int start = 0; - - // Skip leading whitespace - while (start < buffer.Length && char.IsWhiteSpace(buffer[start])) - { - start++; - } - - if (start >= buffer.Length) - { - return false; - } - - // Detect non-JSON text before JSON - if (!IsJsonStart(buffer[start]) && !IsLiteralStart(buffer, start)) - { - int nonJsonStart = start; - while (start < buffer.Length && - !IsJsonStart(buffer[start]) && - !IsLiteralStart(buffer, start)) - { - start++; - } - - if (start > nonJsonStart) - { - nonJson = buffer.ToString(nonJsonStart, start - nonJsonStart).AsMemory(); - } - } - - // Find full JSON token - int tokenEnd = FindJsonTokenEnd(buffer, start); - if (tokenEnd == 0) - { - // Partial JSON, leave buffer intact - return false; - } - - fullJson = buffer.ToString(start, tokenEnd - start).AsMemory(); - - // Remove processed characters from buffer - buffer.Remove(0, tokenEnd); - - return true; - } - - private static bool IsJsonStart(char c) => c == '{' || c == '[' || c == '"' || c == '-' || (c >= '0' && c <= '9'); - - private static bool IsLiteralStart(StringBuilder buffer, int pos) - { - if (buffer.Length - pos >= 4) - { - var val = buffer.ToString(pos, 4); - if (val == "true" || val == "null") - { - return true; - } - } - if (buffer.Length - pos >= 5) - { - if (buffer.ToString(pos, 5) == "false") - { - return true; - } - } - return false; - } - - private static int FindJsonTokenEnd(StringBuilder buffer, int start) - { - int depth = 0; - bool inString = false; - bool escape = false; - int i = start; - - char firstChar = buffer[start]; - if (firstChar == '{' || firstChar == '[') - { - depth = 1; - i++; - while (i < buffer.Length) - { - char c = buffer[i]; - - if (inString) - { - if (escape) - { - escape = false; - } - else if (c == '\\') - { - escape = true; - } - else if (c == '"') - { - inString = false; - } - } - else - { - if (c == '"') - { - inString = true; - } - else if (c == '{' || c == '[') - { - depth++; - } - else if (c == '}' || c == ']') - { - depth--; - } - - if (depth == 0) - { - return i + 1; - } - } - i++; - } - - // incomplete JSON - return 0; - } - else if (firstChar == '"') - { - i = start + 1; - while (i < buffer.Length) - { - char c = buffer[i]; - if (escape) - { - escape = false; - } - else if (c == '\\') - { - escape = true; - } - else if (c == '"') - { - return i + 1; - } - - i++; - } - - // incomplete string - return 0; - } - else - { - while (i < buffer.Length && - !char.IsWhiteSpace(buffer[i]) && - buffer[i] != ',' && buffer[i] != ']' && buffer[i] != '}') - { - i++; - } - - return i; - } - } - } -}