diff --git a/EonaCat.Connections/EonaCat.Connections.csproj b/EonaCat.Connections/EonaCat.Connections.csproj index 53256cb..07e20e3 100644 --- a/EonaCat.Connections/EonaCat.Connections.csproj +++ b/EonaCat.Connections/EonaCat.Connections.csproj @@ -11,7 +11,7 @@ EonaCat (Jeroen Saey) readme.md EonaCat.Connections - 1.0.9 + 1.1.0 EonaCat (Jeroen Saey) LICENSE EonaCat.png diff --git a/EonaCat.Connections/Processors/JsonDataProcessor.cs b/EonaCat.Connections/Processors/JsonDataProcessor.cs index 8df9b77..d470436 100644 --- a/EonaCat.Connections/Processors/JsonDataProcessor.cs +++ b/EonaCat.Connections/Processors/JsonDataProcessor.cs @@ -1,12 +1,10 @@ -// This file is part of the EonaCat project(s) which is released under the Apache License. -// See the LICENSE file or go to https://EonaCat.com/license for full license details. - +using EonaCat.Connections.Models; using EonaCat.Json; -using EonaCat.Json.Linq; -using EonaCat.Connections.Models; +using System; using System.Collections.Concurrent; using System.Text; using System.Timers; +using Timer = System.Timers.Timer; namespace EonaCat.Connections.Processors { @@ -14,10 +12,9 @@ namespace EonaCat.Connections.Processors { public int MaxAllowedBufferSize = 20 * 1024 * 1024; public int MaxMessagesPerBatch = 200; - private const int MaxCleanupRemovalsPerTick = 50; private readonly ConcurrentDictionary _buffers = new(); - private readonly System.Timers.Timer _cleanupTimer; + private readonly Timer _cleanupTimer; private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5); private bool _isDisposed; @@ -40,18 +37,14 @@ namespace EonaCat.Connections.Processors } public event EventHandler>? OnProcessMessage; - public event EventHandler? OnProcessTextMessage; - public event EventHandler? OnMessageError; - public event EventHandler? OnError; public JsonDataProcessor() { ClientName = Guid.NewGuid().ToString(); - - _cleanupTimer = new System.Timers.Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5)) + _cleanupTimer = new Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5)) { AutoReset = true }; @@ -59,133 +52,7 @@ namespace EonaCat.Connections.Processors _cleanupTimer.Start(); } - public void Process(DataReceivedEventArgs e, string? currentClientName = null) - { - ThrowIfDisposed(); - if (e == null) - { - return; - } - - string endpoint = e.RemoteEndPoint?.ToString(); - string dataString = e.IsBinary ? Encoding.UTF8.GetString(e.Data ?? Array.Empty()) : e.StringData; - if (string.IsNullOrWhiteSpace(dataString)) - { - return; - } - - string client = e.Nickname ?? currentClientName ?? ClientName; - ProcessInternal(dataString.Trim(), client, endpoint); - } - - public void Process(string jsonString, string? currentClientName = null, string? endpoint = null) - { - ThrowIfDisposed(); - if (string.IsNullOrWhiteSpace(jsonString)) - { - return; - } - - string client = currentClientName ?? ClientName; - ProcessInternal(jsonString.Trim(), client, endpoint); - } - - private void ProcessInternal(string jsonString, string clientName, string? clientEndpoint) - { - var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry()); - var pendingJson = new List(); - var pendingText = new List(); - - lock (bufferEntry.SyncRoot) - { - // Check for buffer overflow - if (bufferEntry.Buffer.Length > MaxAllowedBufferSize) - { - OnError?.Invoke(this, new Exception($"Buffer overflow ({MaxAllowedBufferSize} bytes) for client {clientName} ({clientEndpoint}).")); - bufferEntry.Clear(shrink: true); - } - - bufferEntry.Buffer.Append(jsonString); - bufferEntry.LastUsed = DateTime.UtcNow; - - int processedCount = 0; - - while (processedCount < MaxMessagesPerBatch && - JsonDataProcessorHelper.TryExtractCompleteJson(bufferEntry.Buffer, out string[] json, out string[] nonJsonText)) - { - // No more messages - if ((json == null || json.Length == 0) && (nonJsonText == null || nonJsonText.Length == 0)) - { - break; - } - - if (json != null && json.Length > 0) - { - foreach (var jsonMessage in json) - { - pendingJson.Add(jsonMessage); - processedCount++; - if (processedCount >= MaxMessagesPerBatch) - { - break; - } - } - } - - if (nonJsonText != null && nonJsonText.Length > 0) - { - foreach (var textMessage in nonJsonText) - { - if (!string.IsNullOrWhiteSpace(textMessage)) - { - pendingText.Add(textMessage); - } - } - } - } - - // Cleanup buffer if needed - if (bufferEntry.Buffer.Capacity > MaxAllowedBufferSize / 2) - { - bufferEntry.Clear(shrink: true); - } - - if (bufferEntry.Buffer.Length > 0 && !ContainsJsonStructure(bufferEntry.Buffer)) - { - string leftover = bufferEntry.Buffer.ToString(); - bufferEntry.Clear(shrink: true); - if (!string.IsNullOrWhiteSpace(leftover)) - { - pendingText.Add(leftover); - } - } - } - - if (pendingText.Count > 0) - { - foreach (var textMessage in pendingText) - { - try - { - OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = textMessage, ClientName = clientName, ClientEndpoint = clientEndpoint }); - } - catch (Exception exception) - { - OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception)); - } - } - } - - if (pendingJson.Count > 0) - { - foreach (var jsonMessage in pendingJson) - { - ProcessDataReceived(jsonMessage, clientName, clientEndpoint); - } - } - } - - private void ProcessDataReceived(string data, string clientName, string? clientEndpoint) + public void Process(string data, string? clientName = null) { ThrowIfDisposed(); if (string.IsNullOrWhiteSpace(data)) @@ -193,87 +60,82 @@ namespace EonaCat.Connections.Processors return; } - bool looksLikeJson = data.Length > 1 && - ((data[0] == '{' && data[data.Length - 1] == '}') || - (data[0] == '[' && data[data.Length - 1] == ']')); + string client = clientName ?? ClientName; + ProcessInternal(data, client); + } - if (!looksLikeJson) + private void ProcessInternal(string incomingData, string clientName) + { + var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry()); + + lock (bufferEntry.SyncRoot) { - try - { - OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = data, ClientName = clientName, ClientEndpoint = clientEndpoint }); - } - catch (Exception exception) - { - OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception)); - } - return; - } + bufferEntry.Buffer.Append(incomingData); + bufferEntry.LastUsed = DateTime.UtcNow; + int processedCount = 0; + while (processedCount < MaxMessagesPerBatch) + { + if (!JsonDataProcessorHelper.TryExtractFullJson(bufferEntry.Buffer, + out var fullJson, out var nonJson)) + { + // partial JSON + break; + } + + if (!nonJson.IsEmpty) + { + OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage + { + Text = nonJson.ToString(), + ClientName = clientName, + ClientEndpoint = null + }); + } + + if (!fullJson.IsEmpty) + { + // We got a full JSON message + var jsonStr = fullJson.ToString(); + ProcessDataReceived(jsonStr, clientName, null); + processedCount++; + } + } + + // Prevent buffer overflow + if (bufferEntry.Buffer.Length > MaxAllowedBufferSize) + { + OnError?.Invoke(this, new Exception($"Buffer overflow for client {clientName}.")); + bufferEntry.Clear(shrink: true); + } + } + } + + private void ProcessDataReceived(string json, string clientName, string? clientEndpoint) + { try { - if (data.Contains("Exception", StringComparison.OrdinalIgnoreCase) || - data.Contains("Error", StringComparison.OrdinalIgnoreCase)) - { - GetError(data); - } - - var messages = JsonHelper.ToObjects(data); + var messages = JsonHelper.ToObjects(json); if (messages != null) { - foreach (var message in messages) + foreach (var msg in messages) { - try + OnProcessMessage?.Invoke(this, new ProcessedMessage { - OnProcessMessage?.Invoke(this, new ProcessedMessage { Data = message, RawData = data, ClientName = clientName, ClientEndpoint = clientEndpoint }); - } - catch (Exception exception) - { - OnError?.Invoke(this, new Exception($"ProcessMessage handler threw for client {clientName} ({clientEndpoint}).", exception)); - } + Data = msg, + RawData = json, + ClientName = clientName, + ClientEndpoint = clientEndpoint + }); } } } - catch (Exception exception) + catch (Exception ex) { - OnError?.Invoke(this, new Exception($"Failed to process JSON message from {clientName} ({clientEndpoint}).", exception)); + OnError?.Invoke(this, new Exception($"Failed to process JSON for {clientName}.", ex)); } } - private void GetError(string data) - { - try - { - var jsonObject = JObject.Parse(data); - var exceptionToken = jsonObject.SelectToken("Exception"); - if (exceptionToken != null && exceptionToken.Type != JTokenType.Null) - { - var extracted = JsonHelper.ExtractException(data); - if (extracted != null) - { - OnMessageError?.Invoke(this, new Exception(extracted.Message)); - } - } - } - catch - { - // Do nothing - } - } - - private static bool ContainsJsonStructure(StringBuilder buffer) - { - for (int i = 0; i < buffer.Length; i++) - { - char c = buffer[i]; - if (c == '{' || c == '[') - { - return true; - } - } - return false; - } - private void CleanupInactiveClients(object? sender, ElapsedEventArgs e) { if (_isDisposed) @@ -282,47 +144,19 @@ namespace EonaCat.Connections.Processors } DateTime now = DateTime.UtcNow; - var keysToRemove = new List(capacity: 128); - foreach (var kvp in _buffers) { if (now - kvp.Value.LastUsed > _clientBufferTimeout) { - keysToRemove.Add(kvp.Key); - if (keysToRemove.Count >= MaxCleanupRemovalsPerTick) + if (_buffers.TryRemove(kvp.Key, out var removed)) { - break; + lock (removed.SyncRoot) + { + removed.Clear(shrink: true); + } } } } - - // Remove the selected keys - foreach (var key in keysToRemove) - { - if (_buffers.TryRemove(key, out var removed)) - { - lock (removed.SyncRoot) - { - removed.Clear(shrink: true); - } - } - } - } - - public void RemoveClient(string clientName) - { - if (string.IsNullOrWhiteSpace(clientName)) - { - return; - } - - if (_buffers.TryRemove(clientName, out var removed)) - { - lock (removed.SyncRoot) - { - removed.Clear(shrink: true); - } - } } private void ThrowIfDisposed() @@ -342,7 +176,6 @@ namespace EonaCat.Connections.Processors _isDisposed = true; - _cleanupTimer.Elapsed -= CleanupInactiveClients; _cleanupTimer.Stop(); _cleanupTimer.Dispose(); @@ -359,14 +192,6 @@ namespace EonaCat.Connections.Processors OnProcessTextMessage = null; OnMessageError = null; OnError = null; - - GC.SuppressFinalize(this); } } - - internal static class StringExtensions - { - internal static bool Contains(this string? source, string toCheck, StringComparison comp) => - source?.IndexOf(toCheck, comp) >= 0; - } -} \ No newline at end of file +} diff --git a/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs b/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs index 2649fa3..fccb5ff 100644 --- a/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs +++ b/EonaCat.Connections/Processors/JsonDataProcessorHelper.cs @@ -1,73 +1,107 @@ -// This file is part of the EonaCat project(s) which is released under the Apache License. -// See the LICENSE file or go to https://EonaCat.com/license for full license details. - +using System; +using System.Collections.Generic; using System.Text; namespace EonaCat.Connections.Processors { internal static class JsonDataProcessorHelper { - public const int MAX_CAP = 1024; - private const int MAX_SEGMENTS_PER_CALL = 1024; - - internal static bool TryExtractCompleteJson(StringBuilder buffer, out string[] jsonArray, out string[] nonJsonText) + internal static bool TryExtractFullJson( + StringBuilder buffer, + out ReadOnlyMemory fullJson, + out ReadOnlyMemory nonJson) { - jsonArray = Array.Empty(); - nonJsonText = Array.Empty(); + fullJson = ReadOnlyMemory.Empty; + nonJson = ReadOnlyMemory.Empty; - if (buffer is null || buffer.Length == 0) + if (buffer.Length == 0) { return false; } - var jsonList = new List(capacity: 4); - var nonJsonList = new List(capacity: 2); + int start = 0; - int readPos = 0; - int segmentsProcessed = 0; - int bufferLen = buffer.Length; - - while (readPos < bufferLen && segmentsProcessed < MAX_SEGMENTS_PER_CALL) + // Skip leading whitespace + while (start < buffer.Length && char.IsWhiteSpace(buffer[start])) { - segmentsProcessed++; + start++; + } - // Skip non-JSON starting characters - if (buffer[readPos] != '{' && buffer[readPos] != '[') + if (start >= buffer.Length) + { + return false; + } + + // Detect non-JSON text before JSON + if (!IsJsonStart(buffer[start]) && !IsLiteralStart(buffer, start)) + { + int nonJsonStart = start; + while (start < buffer.Length && + !IsJsonStart(buffer[start]) && + !IsLiteralStart(buffer, start)) { - int start = readPos; - while (readPos < bufferLen && buffer[readPos] != '{' && buffer[readPos] != '[') - { - readPos++; - } - - int len = readPos - start; - if (len > 0) - { - var segment = buffer.ToString(start, len).Trim(); - if (segment.Length > 0) - { - nonJsonList.Add(segment); - } - continue; - } + start++; } - // Check if we have reached the end - if (readPos >= bufferLen) + if (start > nonJsonStart) { - break; + nonJson = buffer.ToString(nonJsonStart, start - nonJsonStart).AsMemory(); } + } - int pos = readPos; - int depth = 0; - bool inString = false; - bool escape = false; - int startIndex = pos; - bool complete = false; + // Find full JSON token + int tokenEnd = FindJsonTokenEnd(buffer, start); + if (tokenEnd == 0) + { + // Partial JSON, leave buffer intact + return false; + } - for (; pos < bufferLen; pos++) + fullJson = buffer.ToString(start, tokenEnd - start).AsMemory(); + + // Remove processed characters from buffer + buffer.Remove(0, tokenEnd); + + return true; + } + + private static bool IsJsonStart(char c) => c == '{' || c == '[' || c == '"' || c == '-' || (c >= '0' && c <= '9'); + + private static bool IsLiteralStart(StringBuilder buffer, int pos) + { + if (buffer.Length - pos >= 4) + { + var val = buffer.ToString(pos, 4); + if (val == "true" || val == "null") { - char c = buffer[pos]; + return true; + } + } + if (buffer.Length - pos >= 5) + { + if (buffer.ToString(pos, 5) == "false") + { + return true; + } + } + return false; + } + + private static int FindJsonTokenEnd(StringBuilder buffer, int start) + { + int depth = 0; + bool inString = false; + bool escape = false; + int i = start; + + char firstChar = buffer[start]; + if (firstChar == '{' || firstChar == '[') + { + depth = 1; + i++; + while (i < buffer.Length) + { + char c = buffer[i]; if (inString) { @@ -86,110 +120,36 @@ namespace EonaCat.Connections.Processors } else { - switch (c) + if (c == '"') { - case '"': - inString = true; - break; + inString = true; + } + else if (c == '{' || c == '[') + { + depth++; + } + else if (c == '}' || c == ']') + { + depth--; + } - case '{': - case '[': - depth++; - break; - - case '}': - case ']': - depth--; - if (depth == 0) - { - // Completed JSON segment - pos++; - complete = true; - } - break; + if (depth == 0) + { + return i + 1; } } - - if (complete) - { - break; - } + i++; } - if (complete) + // incomplete JSON + return 0; + } + else if (firstChar == '"') + { + i = start + 1; + while (i < buffer.Length) { - int length = pos - startIndex; - if (length <= 0) - { - // Should not happen, but just in case - break; - } - - // Extract candidate JSON segment - string candidateJson = buffer.ToString(startIndex, length); - - // Clean internal non-JSON characters - var cleaned = StripInternalNonJson(candidateJson, out var extractedInside); - if (extractedInside != null && extractedInside.Length > 0) - { - nonJsonList.AddRange(extractedInside); - } - - if (!string.IsNullOrWhiteSpace(cleaned)) - { - jsonList.Add(cleaned); - } - - // Move readPos forward - readPos = pos; - } - else - { - // Incomplete JSON segment; stop processing - break; - } - } - - // Remove processed part from buffer - if (readPos > 0) - { - buffer.Remove(0, readPos); - } - - // Cleanup buffer capacity if needed - if (buffer.Capacity > MAX_CAP && buffer.Length < 256) - { - buffer.Capacity = Math.Max(MAX_CAP, buffer.Length); - } - - jsonArray = jsonList.Count > 0 ? jsonList.ToArray() : Array.Empty(); - nonJsonText = nonJsonList.Count > 0 ? nonJsonList.ToArray() : Array.Empty(); - - return jsonArray.Length > 0; - } - - private static string StripInternalNonJson(string input, out string[] extractedTexts) - { - if (string.IsNullOrEmpty(input)) - { - extractedTexts = Array.Empty(); - return string.Empty; - } - - // Scan through the input, copying valid JSON parts to output, - List? extractedChars = null; - var sbJson = new StringBuilder(input.Length); - bool inString = false; - bool escape = false; - int depth = 0; - - for (int i = 0; i < input.Length; i++) - { - char c = input[i]; - - if (inString) - { - sbJson.Append(c); + char c = buffer[i]; if (escape) { escape = false; @@ -200,70 +160,26 @@ namespace EonaCat.Connections.Processors } else if (c == '"') { - inString = false; + return i + 1; } + + i++; } - else - { - switch (c) - { - case '"': - inString = true; - sbJson.Append(c); - break; - case '{': - case '[': - depth++; - sbJson.Append(c); - break; - - case '}': - case ']': - depth--; - sbJson.Append(c); - break; - - default: - // Outside JSON structures, only allow certain characters - if (depth > 0 || char.IsLetterOrDigit(c) || char.IsWhiteSpace(c) || ",:.-_".IndexOf(c) >= 0) - { - sbJson.Append(c); - } - else - { - extractedChars ??= new List(capacity: 4); - extractedChars.Add(c); - } - break; - } - } - } - - if (extractedChars != null && extractedChars.Count > 0) - { - // Convert char list to string array - var current = new string[extractedChars.Count]; - for (int i = 0; i < extractedChars.Count; i++) - { - current[i] = extractedChars[i].ToString(); - } - extractedTexts = current; + // incomplete string + return 0; } else { - extractedTexts = Array.Empty(); + while (i < buffer.Length && + !char.IsWhiteSpace(buffer[i]) && + buffer[i] != ',' && buffer[i] != ']' && buffer[i] != '}') + { + i++; + } + + return i; } - - string result = sbJson.ToString().Trim(); - - // Recompute capacity if needed - if (sbJson.Capacity > MAX_CAP) - { - sbJson.Capacity = Math.Max(MAX_CAP, sbJson.Length); - } - - return result; } } -} \ No newline at end of file +}