Update EonaCat.Connections/Processors/JsonDataProcessor.cs

This commit is contained in:
2026-01-27 06:15:22 +00:00
parent b38514f0d1
commit 94119081bb

View File

@@ -1,329 +1,252 @@
using EonaCat.Json; using EonaCat.Json;
using System.Text; using Heijmans.Connector.Models;
using System;
namespace EonaCat.Connections.Processors using System.Text;
{
public class ProcessedJsonMessage<TData> namespace EonaCat.Connections.Processors
{ {
public TData Data { get; set; } public sealed class JsonDataProcessor<TData> : IDisposable
public string RawData { get; set; } = string.Empty; {
public string ClientName { get; set; } = string.Empty; private readonly StringBuilder _buffer = new StringBuilder(4096);
public string ClientEndpoint { get; set; } = string.Empty; private readonly object _sync = new object();
} private bool _disposed;
public class ProcessedTextMessage public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024;
{ public int MaxMessagesPerBatch { get; set; } = 200;
public string Text { get; set; } = string.Empty; public string ClientName { get; }
public string ClientName { get; set; } = string.Empty;
} public long TotalBytesProcessed { get; private set; }
public long TotalChunksReceived { get; private set; }
public sealed class JsonDataProcessor<TData> : IDisposable
{ public event EventHandler<ProcessedJsonMessage<TData>>? OnProcessMessage;
private readonly StringBuilder _buffer = new StringBuilder(); public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage;
private readonly object _syncLock = new object(); public event EventHandler<Exception>? OnMessageError;
private bool _isDisposed;
public JsonDataProcessor()
// 30 MB default max buffer size {
public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; ClientName = Guid.NewGuid().ToString();
public int MaxMessagesPerBatch { get; set; } = 200; }
public string ClientName { get; }
public void Process(string data, string? client = null, string? endpoint = null)
// Diagnostics {
public long TotalBytesProcessed { get; private set; } ThrowIfDisposed();
public long TotalChunksReceived { get; private set; } if (string.IsNullOrEmpty(data))
{
// Events return;
public event EventHandler<ProcessedJsonMessage<TData>>? OnProcessMessage; }
public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage;
public event EventHandler<Exception>? OnMessageError; TotalChunksReceived++;
TotalBytesProcessed += Encoding.UTF8.GetByteCount(data);
public JsonDataProcessor()
{ ProcessInternal(data, client ?? ClientName, endpoint);
ClientName = Guid.NewGuid().ToString(); }
}
private void ProcessInternal(string data, string client, string? endpoint)
public void Process(string jsonString, string? clientName = null, string? endpoint = null) {
{ lock (_sync)
ThrowIfDisposed(); {
if (string.IsNullOrEmpty(jsonString)) _buffer.Append(data);
{
return; int processed = 0;
} while (processed < MaxMessagesPerBatch &&
TryExtract(out int start, out int length, out bool isText))
TotalChunksReceived++; {
TotalBytesProcessed += Encoding.UTF8.GetByteCount(jsonString); if (isText)
{
ProcessInternal(jsonString, clientName, endpoint); var text = _buffer.ToString(start, length);
} OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage
{
public void Process(DataReceivedEventArgs e, string? clientName = null) Text = text,
{ ClientName = client
ThrowIfDisposed(); });
if (e == null) }
{ else
return; {
} var json = _buffer.ToString(start, length);
try
string dataString; {
if (e.IsBinary) var obj = JsonHelper.ToObject<TData>(json);
{ OnProcessMessage?.Invoke(this, new ProcessedJsonMessage<TData>
if (e.Data == null || e.Data.Length == 0) {
{ Data = obj,
return; RawData = json,
} ClientName = client,
ClientEndpoint = endpoint ?? string.Empty
dataString = Encoding.UTF8.GetString(e.Data); });
} }
else catch (Exception ex)
{ {
dataString = e.StringData; OnMessageError?.Invoke(this,
} new Exception($"Failed to parse JSON for {client}", ex));
}
if (string.IsNullOrWhiteSpace(dataString)) }
{
return; Consume(start, length);
} processed++;
}
string client = e.Nickname ?? clientName ?? ClientName;
if (_buffer.Length > MaxAllowedBufferSize)
TotalChunksReceived++; {
TotalBytesProcessed += Encoding.UTF8.GetByteCount(dataString); OnMessageError?.Invoke(this,
new Exception($"Buffer exceeded {MaxAllowedBufferSize} bytes for client {client}. Discarding."));
ProcessInternal(dataString, client, e.RemoteEndPoint?.ToString()); _buffer.Clear();
} _buffer.Capacity = 4096;
}
private void ProcessInternal(string jsonString, string? clientName, string? endpoint) }
{ }
string client = clientName ?? ClientName;
private bool TryExtract(out int start, out int length, out bool isText)
lock (_syncLock) {
{ start = length = 0;
_buffer.Append(jsonString); isText = false;
int processedCount = 0;
if (_buffer.Length == 0)
while (processedCount < MaxMessagesPerBatch && {
TryExtractFullJson(out int fullJsonStart, out int fullJsonLength, return false;
out int textStart, out int textLength)) }
{
// Check if we have any leading text to process var span = _buffer.ToString().AsSpan();
if (textLength > 0) int pos = 0;
{
string text = _buffer.ToString(textStart, textLength); while (pos < span.Length && char.IsWhiteSpace(span[pos]))
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage {
{ pos++;
Text = text, }
ClientName = client
}); if (pos >= span.Length)
{
// Remove the processed text immediately return false;
_buffer.Remove(textStart, textLength); }
processedCount++;
continue; char c = span[pos];
}
if (c != '{' && c != '[')
// Process the full JSON {
if (fullJsonLength > 0) isText = true;
{ start = pos;
string json = _buffer.ToString(fullJsonStart, fullJsonLength);
while (pos < span.Length && span[pos] != '{' && span[pos] != '[')
try {
{ pos++;
var deserialized = JsonHelper.ToObject<TData>(json); }
OnProcessMessage?.Invoke(this, new ProcessedJsonMessage<TData>
{ length = pos - start;
Data = deserialized, return true;
RawData = json, }
ClientName = client,
ClientEndpoint = endpoint ?? string.Empty start = pos;
}); length = FindJsonEnd(span, pos) - pos;
} if (length <= 0)
catch (Exception ex) {
{ return false;
OnMessageError?.Invoke(this, new Exception( }
$"Failed to deserialize JSON for client {client}", ex));
} isText = false;
return true;
// Remove processed JSON immediately }
_buffer.Remove(fullJsonStart, fullJsonLength);
processedCount++; private static int FindJsonEnd(ReadOnlySpan<char> span, int start)
} {
} char open = span[start];
char close = open == '{' ? '}' : ']';
// Prevent buffer overflow
if (_buffer.Length > MaxAllowedBufferSize) int depth = 1;
{ bool inString = false;
OnMessageError?.Invoke(this, new Exception( bool escape = false;
$"Buffer overflow for client {client}. Current size: {_buffer.Length / 1024.0 / 1024.0:F2} MB. " +
$"Max allowed: {MaxAllowedBufferSize / 1024.0 / 1024.0:F2} MB. Clearing buffer.")); for (int i = start + 1; i < span.Length; i++)
_buffer.Clear(); {
} char c = span[i];
}
} if (inString)
{
private bool TryExtractFullJson( if (escape)
out int fullJsonStart, {
out int fullJsonLength, escape = false;
out int textStart, }
out int textLength) else if (c == '\\')
{ {
fullJsonStart = fullJsonLength = textStart = textLength = 0; escape = true;
}
if (_buffer.Length == 0) else if (c == '"')
{ {
return false; inString = false;
} }
}
int pos = 0; else
{
// Skip leading whitespace if (c == '"')
while (pos < _buffer.Length && char.IsWhiteSpace(_buffer[pos])) {
{ inString = true;
pos++; }
} else if (c == open)
{
if (pos >= _buffer.Length) depth++;
{ }
return false; else if (c == close && --depth == 0)
} {
return i + 1;
// Json needs to start with { or [ }
if (_buffer[pos] != '{' && _buffer[pos] != '[') }
{ }
textStart = pos;
while (pos < _buffer.Length && _buffer[pos] != '{' && _buffer[pos] != '[') return 0;
{ }
pos++;
} private void Consume(int start, int length)
{
textLength = pos - textStart; _buffer.Remove(start, length);
return true;
} if (_buffer.Capacity > 1024 * 1024 && _buffer.Length < _buffer.Capacity / 2)
{
// Check if we also have a json end token _buffer.Capacity = Math.Max(_buffer.Length, 4096);
fullJsonStart = pos; }
fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos; }
if (fullJsonLength <= 0) public void ClearBuffer()
{ {
// partial JSON lock (_sync)
return false; {
} _buffer.Clear();
_buffer.Capacity = 4096;
return true; }
} }
private static int FindJsonTokenEnd(StringBuilder buffer, int start) public void ResetStatistics()
{ {
if (start >= buffer.Length) lock (_sync)
{ {
return 0; TotalBytesProcessed = 0;
} TotalChunksReceived = 0;
}
int i = start; }
char firstChar = buffer[start];
public void Dispose()
if (firstChar == '{' || firstChar == '[') {
{ if (_disposed)
char open = firstChar; {
char close = firstChar == '{' ? '}' : ']'; return;
int depth = 1; }
bool inString = false;
bool escape = false; _disposed = true;
i++; lock (_sync)
{
while (i < buffer.Length) _buffer.Clear();
{ _buffer.Capacity = 0;
char c = buffer[i]; }
if (inString) OnProcessMessage = null;
{ OnProcessTextMessage = null;
if (escape) OnMessageError = null;
{ }
escape = false;
} private void ThrowIfDisposed()
else if (c == '\\') {
{ if (_disposed)
escape = true; {
} throw new ObjectDisposedException(nameof(JsonDataProcessor<TData>));
else if (c == '"') }
{ }
inString = false; }
} }
}
else
{
if (c == '"')
{
inString = true;
}
else if (c == open)
{
depth++;
}
else if (c == close)
{
depth--;
if (depth == 0)
{
return i + 1;
}
}
}
i++;
}
// partial JSON
return 0;
}
// only objects/arrays supported as JSON start
return 0;
}
private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(JsonDataProcessor<TData>));
}
}
public void ClearBuffer()
{
lock (_syncLock)
{
_buffer.Clear();
}
}
public void ResetStatistics()
{
lock (_syncLock)
{
TotalBytesProcessed = 0;
TotalChunksReceived = 0;
}
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
lock (_syncLock)
{
_buffer.Clear();
}
OnProcessMessage = null;
OnProcessTextMessage = null;
OnMessageError = null;
}
}
}