diff --git a/EonaCat.Connections/Processors/JsonDataProcessor.cs b/EonaCat.Connections/Processors/JsonDataProcessor.cs index 6dead81..417490a 100644 --- a/EonaCat.Connections/Processors/JsonDataProcessor.cs +++ b/EonaCat.Connections/Processors/JsonDataProcessor.cs @@ -1,329 +1,252 @@ -using EonaCat.Json; -using System.Text; - -namespace EonaCat.Connections.Processors -{ - public class ProcessedJsonMessage - { - public TData Data { get; set; } - public string RawData { get; set; } = string.Empty; - public string ClientName { get; set; } = string.Empty; - public string ClientEndpoint { get; set; } = string.Empty; - } - - public class ProcessedTextMessage - { - public string Text { get; set; } = string.Empty; - public string ClientName { get; set; } = string.Empty; - } - - public sealed class JsonDataProcessor : IDisposable - { - private readonly StringBuilder _buffer = new StringBuilder(); - private readonly object _syncLock = new object(); - private bool _isDisposed; - - // 30 MB default max buffer size - public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; - public int MaxMessagesPerBatch { get; set; } = 200; - public string ClientName { get; } - - // Diagnostics - public long TotalBytesProcessed { get; private set; } - public long TotalChunksReceived { get; private set; } - - // Events - public event EventHandler>? OnProcessMessage; - public event EventHandler? OnProcessTextMessage; - public event EventHandler? OnMessageError; - - public JsonDataProcessor() - { - ClientName = Guid.NewGuid().ToString(); - } - - public void Process(string jsonString, string? clientName = null, string? endpoint = null) - { - ThrowIfDisposed(); - if (string.IsNullOrEmpty(jsonString)) - { - return; - } - - TotalChunksReceived++; - TotalBytesProcessed += Encoding.UTF8.GetByteCount(jsonString); - - ProcessInternal(jsonString, clientName, endpoint); - } - - public void Process(DataReceivedEventArgs e, string? clientName = null) - { - ThrowIfDisposed(); - if (e == null) - { - return; - } - - string dataString; - if (e.IsBinary) - { - if (e.Data == null || e.Data.Length == 0) - { - return; - } - - dataString = Encoding.UTF8.GetString(e.Data); - } - else - { - dataString = e.StringData; - } - - if (string.IsNullOrWhiteSpace(dataString)) - { - return; - } - - string client = e.Nickname ?? clientName ?? ClientName; - - TotalChunksReceived++; - TotalBytesProcessed += Encoding.UTF8.GetByteCount(dataString); - - ProcessInternal(dataString, client, e.RemoteEndPoint?.ToString()); - } - - private void ProcessInternal(string jsonString, string? clientName, string? endpoint) - { - string client = clientName ?? ClientName; - - lock (_syncLock) - { - _buffer.Append(jsonString); - int processedCount = 0; - - while (processedCount < MaxMessagesPerBatch && - TryExtractFullJson(out int fullJsonStart, out int fullJsonLength, - out int textStart, out int textLength)) - { - // Check if we have any leading text to process - if (textLength > 0) - { - string text = _buffer.ToString(textStart, textLength); - OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage - { - Text = text, - ClientName = client - }); - - // Remove the processed text immediately - _buffer.Remove(textStart, textLength); - processedCount++; - continue; - } - - // Process the full JSON - if (fullJsonLength > 0) - { - string json = _buffer.ToString(fullJsonStart, fullJsonLength); - - try - { - var deserialized = JsonHelper.ToObject(json); - OnProcessMessage?.Invoke(this, new ProcessedJsonMessage - { - Data = deserialized, - RawData = json, - ClientName = client, - ClientEndpoint = endpoint ?? string.Empty - }); - } - catch (Exception ex) - { - OnMessageError?.Invoke(this, new Exception( - $"Failed to deserialize JSON for client {client}", ex)); - } - - // Remove processed JSON immediately - _buffer.Remove(fullJsonStart, fullJsonLength); - processedCount++; - } - } - - // Prevent buffer overflow - if (_buffer.Length > MaxAllowedBufferSize) - { - OnMessageError?.Invoke(this, new Exception( - $"Buffer overflow for client {client}. Current size: {_buffer.Length / 1024.0 / 1024.0:F2} MB. " + - $"Max allowed: {MaxAllowedBufferSize / 1024.0 / 1024.0:F2} MB. Clearing buffer.")); - _buffer.Clear(); - } - } - } - - private bool TryExtractFullJson( - out int fullJsonStart, - out int fullJsonLength, - out int textStart, - out int textLength) - { - fullJsonStart = fullJsonLength = textStart = textLength = 0; - - if (_buffer.Length == 0) - { - return false; - } - - int pos = 0; - - // Skip leading whitespace - while (pos < _buffer.Length && char.IsWhiteSpace(_buffer[pos])) - { - pos++; - } - - if (pos >= _buffer.Length) - { - return false; - } - - // Json needs to start with { or [ - if (_buffer[pos] != '{' && _buffer[pos] != '[') - { - textStart = pos; - while (pos < _buffer.Length && _buffer[pos] != '{' && _buffer[pos] != '[') - { - pos++; - } - - textLength = pos - textStart; - return true; - } - - // Check if we also have a json end token - fullJsonStart = pos; - fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos; - - if (fullJsonLength <= 0) - { - // partial JSON - return false; - } - - return true; - } - - private static int FindJsonTokenEnd(StringBuilder buffer, int start) - { - if (start >= buffer.Length) - { - return 0; - } - - int i = start; - char firstChar = buffer[start]; - - if (firstChar == '{' || firstChar == '[') - { - char open = firstChar; - char close = firstChar == '{' ? '}' : ']'; - int depth = 1; - bool inString = false; - bool escape = false; - - i++; - - while (i < buffer.Length) - { - char c = buffer[i]; - - if (inString) - { - if (escape) - { - escape = false; - } - else if (c == '\\') - { - escape = true; - } - else if (c == '"') - { - inString = false; - } - } - else - { - if (c == '"') - { - inString = true; - } - else if (c == open) - { - depth++; - } - else if (c == close) - { - depth--; - if (depth == 0) - { - return i + 1; - } - } - } - - i++; - } - - // partial JSON - return 0; - } - - // only objects/arrays supported as JSON start - return 0; - } - - private void ThrowIfDisposed() - { - if (_isDisposed) - { - throw new ObjectDisposedException(nameof(JsonDataProcessor)); - } - } - - public void ClearBuffer() - { - lock (_syncLock) - { - _buffer.Clear(); - } - } - - public void ResetStatistics() - { - lock (_syncLock) - { - TotalBytesProcessed = 0; - TotalChunksReceived = 0; - } - } - - public void Dispose() - { - if (_isDisposed) - { - return; - } - - _isDisposed = true; - - lock (_syncLock) - { - _buffer.Clear(); - } - - OnProcessMessage = null; - OnProcessTextMessage = null; - OnMessageError = null; - } - } -} +using EonaCat.Json; +using Heijmans.Connector.Models; +using System; +using System.Text; + +namespace EonaCat.Connections.Processors +{ + public sealed class JsonDataProcessor : IDisposable + { + private readonly StringBuilder _buffer = new StringBuilder(4096); + private readonly object _sync = new object(); + private bool _disposed; + + public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; + public int MaxMessagesPerBatch { get; set; } = 200; + public string ClientName { get; } + + public long TotalBytesProcessed { get; private set; } + public long TotalChunksReceived { get; private set; } + + public event EventHandler>? OnProcessMessage; + public event EventHandler? OnProcessTextMessage; + public event EventHandler? OnMessageError; + + public JsonDataProcessor() + { + ClientName = Guid.NewGuid().ToString(); + } + + public void Process(string data, string? client = null, string? endpoint = null) + { + ThrowIfDisposed(); + if (string.IsNullOrEmpty(data)) + { + return; + } + + TotalChunksReceived++; + TotalBytesProcessed += Encoding.UTF8.GetByteCount(data); + + ProcessInternal(data, client ?? ClientName, endpoint); + } + + private void ProcessInternal(string data, string client, string? endpoint) + { + lock (_sync) + { + _buffer.Append(data); + + int processed = 0; + while (processed < MaxMessagesPerBatch && + TryExtract(out int start, out int length, out bool isText)) + { + if (isText) + { + var text = _buffer.ToString(start, length); + OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage + { + Text = text, + ClientName = client + }); + } + else + { + var json = _buffer.ToString(start, length); + try + { + var obj = JsonHelper.ToObject(json); + OnProcessMessage?.Invoke(this, new ProcessedJsonMessage + { + Data = obj, + RawData = json, + ClientName = client, + ClientEndpoint = endpoint ?? string.Empty + }); + } + catch (Exception ex) + { + OnMessageError?.Invoke(this, + new Exception($"Failed to parse JSON for {client}", ex)); + } + } + + Consume(start, length); + processed++; + } + + if (_buffer.Length > MaxAllowedBufferSize) + { + OnMessageError?.Invoke(this, + new Exception($"Buffer exceeded {MaxAllowedBufferSize} bytes for client {client}. Discarding.")); + _buffer.Clear(); + _buffer.Capacity = 4096; + } + } + } + + private bool TryExtract(out int start, out int length, out bool isText) + { + start = length = 0; + isText = false; + + if (_buffer.Length == 0) + { + return false; + } + + var span = _buffer.ToString().AsSpan(); + int pos = 0; + + while (pos < span.Length && char.IsWhiteSpace(span[pos])) + { + pos++; + } + + if (pos >= span.Length) + { + return false; + } + + char c = span[pos]; + + if (c != '{' && c != '[') + { + isText = true; + start = pos; + + while (pos < span.Length && span[pos] != '{' && span[pos] != '[') + { + pos++; + } + + length = pos - start; + return true; + } + + start = pos; + length = FindJsonEnd(span, pos) - pos; + if (length <= 0) + { + return false; + } + + isText = false; + return true; + } + + private static int FindJsonEnd(ReadOnlySpan span, int start) + { + char open = span[start]; + char close = open == '{' ? '}' : ']'; + + int depth = 1; + bool inString = false; + bool escape = false; + + for (int i = start + 1; i < span.Length; i++) + { + char c = span[i]; + + if (inString) + { + if (escape) + { + escape = false; + } + else if (c == '\\') + { + escape = true; + } + else if (c == '"') + { + inString = false; + } + } + else + { + if (c == '"') + { + inString = true; + } + else if (c == open) + { + depth++; + } + else if (c == close && --depth == 0) + { + return i + 1; + } + } + } + + return 0; + } + + private void Consume(int start, int length) + { + _buffer.Remove(start, length); + + if (_buffer.Capacity > 1024 * 1024 && _buffer.Length < _buffer.Capacity / 2) + { + _buffer.Capacity = Math.Max(_buffer.Length, 4096); + } + } + + public void ClearBuffer() + { + lock (_sync) + { + _buffer.Clear(); + _buffer.Capacity = 4096; + } + } + + public void ResetStatistics() + { + lock (_sync) + { + TotalBytesProcessed = 0; + TotalChunksReceived = 0; + } + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + + lock (_sync) + { + _buffer.Clear(); + _buffer.Capacity = 0; + } + + OnProcessMessage = null; + OnProcessTextMessage = null; + OnMessageError = null; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(JsonDataProcessor)); + } + } + } +}