This commit is contained in:
2026-01-21 20:08:54 +01:00
parent 3dd1c37f09
commit 3dce3cc56b
3 changed files with 191 additions and 450 deletions

View File

@@ -11,7 +11,7 @@
<Copyright>EonaCat (Jeroen Saey)</Copyright> <Copyright>EonaCat (Jeroen Saey)</Copyright>
<PackageReadmeFile>readme.md</PackageReadmeFile> <PackageReadmeFile>readme.md</PackageReadmeFile>
<PackageId>EonaCat.Connections</PackageId> <PackageId>EonaCat.Connections</PackageId>
<Version>1.0.9</Version> <Version>1.1.0</Version>
<Authors>EonaCat (Jeroen Saey)</Authors> <Authors>EonaCat (Jeroen Saey)</Authors>
<PackageLicenseFile>LICENSE</PackageLicenseFile> <PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageIcon>EonaCat.png</PackageIcon> <PackageIcon>EonaCat.png</PackageIcon>

View File

@@ -1,12 +1,10 @@
// This file is part of the EonaCat project(s) which is released under the Apache License. using EonaCat.Connections.Models;
// See the LICENSE file or go to https://EonaCat.com/license for full license details.
using EonaCat.Json; using EonaCat.Json;
using EonaCat.Json.Linq; using System;
using EonaCat.Connections.Models;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Text; using System.Text;
using System.Timers; using System.Timers;
using Timer = System.Timers.Timer;
namespace EonaCat.Connections.Processors namespace EonaCat.Connections.Processors
{ {
@@ -14,10 +12,9 @@ namespace EonaCat.Connections.Processors
{ {
public int MaxAllowedBufferSize = 20 * 1024 * 1024; public int MaxAllowedBufferSize = 20 * 1024 * 1024;
public int MaxMessagesPerBatch = 200; public int MaxMessagesPerBatch = 200;
private const int MaxCleanupRemovalsPerTick = 50;
private readonly ConcurrentDictionary<string, BufferEntry> _buffers = new(); private readonly ConcurrentDictionary<string, BufferEntry> _buffers = new();
private readonly System.Timers.Timer _cleanupTimer; private readonly Timer _cleanupTimer;
private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5); private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5);
private bool _isDisposed; private bool _isDisposed;
@@ -40,18 +37,14 @@ namespace EonaCat.Connections.Processors
} }
public event EventHandler<ProcessedMessage<TData>>? OnProcessMessage; public event EventHandler<ProcessedMessage<TData>>? OnProcessMessage;
public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage; public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage;
public event EventHandler<Exception>? OnMessageError; public event EventHandler<Exception>? OnMessageError;
public event EventHandler<Exception>? OnError; public event EventHandler<Exception>? OnError;
public JsonDataProcessor() public JsonDataProcessor()
{ {
ClientName = Guid.NewGuid().ToString(); ClientName = Guid.NewGuid().ToString();
_cleanupTimer = new Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5))
_cleanupTimer = new System.Timers.Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5))
{ {
AutoReset = true AutoReset = true
}; };
@@ -59,133 +52,7 @@ namespace EonaCat.Connections.Processors
_cleanupTimer.Start(); _cleanupTimer.Start();
} }
public void Process(DataReceivedEventArgs e, string? currentClientName = null) public void Process(string data, string? clientName = null)
{
ThrowIfDisposed();
if (e == null)
{
return;
}
string endpoint = e.RemoteEndPoint?.ToString();
string dataString = e.IsBinary ? Encoding.UTF8.GetString(e.Data ?? Array.Empty<byte>()) : e.StringData;
if (string.IsNullOrWhiteSpace(dataString))
{
return;
}
string client = e.Nickname ?? currentClientName ?? ClientName;
ProcessInternal(dataString.Trim(), client, endpoint);
}
public void Process(string jsonString, string? currentClientName = null, string? endpoint = null)
{
ThrowIfDisposed();
if (string.IsNullOrWhiteSpace(jsonString))
{
return;
}
string client = currentClientName ?? ClientName;
ProcessInternal(jsonString.Trim(), client, endpoint);
}
private void ProcessInternal(string jsonString, string clientName, string? clientEndpoint)
{
var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry());
var pendingJson = new List<string>();
var pendingText = new List<string>();
lock (bufferEntry.SyncRoot)
{
// Check for buffer overflow
if (bufferEntry.Buffer.Length > MaxAllowedBufferSize)
{
OnError?.Invoke(this, new Exception($"Buffer overflow ({MaxAllowedBufferSize} bytes) for client {clientName} ({clientEndpoint})."));
bufferEntry.Clear(shrink: true);
}
bufferEntry.Buffer.Append(jsonString);
bufferEntry.LastUsed = DateTime.UtcNow;
int processedCount = 0;
while (processedCount < MaxMessagesPerBatch &&
JsonDataProcessorHelper.TryExtractCompleteJson(bufferEntry.Buffer, out string[] json, out string[] nonJsonText))
{
// No more messages
if ((json == null || json.Length == 0) && (nonJsonText == null || nonJsonText.Length == 0))
{
break;
}
if (json != null && json.Length > 0)
{
foreach (var jsonMessage in json)
{
pendingJson.Add(jsonMessage);
processedCount++;
if (processedCount >= MaxMessagesPerBatch)
{
break;
}
}
}
if (nonJsonText != null && nonJsonText.Length > 0)
{
foreach (var textMessage in nonJsonText)
{
if (!string.IsNullOrWhiteSpace(textMessage))
{
pendingText.Add(textMessage);
}
}
}
}
// Cleanup buffer if needed
if (bufferEntry.Buffer.Capacity > MaxAllowedBufferSize / 2)
{
bufferEntry.Clear(shrink: true);
}
if (bufferEntry.Buffer.Length > 0 && !ContainsJsonStructure(bufferEntry.Buffer))
{
string leftover = bufferEntry.Buffer.ToString();
bufferEntry.Clear(shrink: true);
if (!string.IsNullOrWhiteSpace(leftover))
{
pendingText.Add(leftover);
}
}
}
if (pendingText.Count > 0)
{
foreach (var textMessage in pendingText)
{
try
{
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = textMessage, ClientName = clientName, ClientEndpoint = clientEndpoint });
}
catch (Exception exception)
{
OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception));
}
}
}
if (pendingJson.Count > 0)
{
foreach (var jsonMessage in pendingJson)
{
ProcessDataReceived(jsonMessage, clientName, clientEndpoint);
}
}
}
private void ProcessDataReceived(string data, string clientName, string? clientEndpoint)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
if (string.IsNullOrWhiteSpace(data)) if (string.IsNullOrWhiteSpace(data))
@@ -193,86 +60,81 @@ namespace EonaCat.Connections.Processors
return; return;
} }
bool looksLikeJson = data.Length > 1 && string client = clientName ?? ClientName;
((data[0] == '{' && data[data.Length - 1] == '}') || ProcessInternal(data, client);
(data[0] == '[' && data[data.Length - 1] == ']')); }
if (!looksLikeJson) private void ProcessInternal(string incomingData, string clientName)
{
var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry());
lock (bufferEntry.SyncRoot)
{
bufferEntry.Buffer.Append(incomingData);
bufferEntry.LastUsed = DateTime.UtcNow;
int processedCount = 0;
while (processedCount < MaxMessagesPerBatch)
{
if (!JsonDataProcessorHelper.TryExtractFullJson(bufferEntry.Buffer,
out var fullJson, out var nonJson))
{
// partial JSON
break;
}
if (!nonJson.IsEmpty)
{
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage
{
Text = nonJson.ToString(),
ClientName = clientName,
ClientEndpoint = null
});
}
if (!fullJson.IsEmpty)
{
// We got a full JSON message
var jsonStr = fullJson.ToString();
ProcessDataReceived(jsonStr, clientName, null);
processedCount++;
}
}
// Prevent buffer overflow
if (bufferEntry.Buffer.Length > MaxAllowedBufferSize)
{
OnError?.Invoke(this, new Exception($"Buffer overflow for client {clientName}."));
bufferEntry.Clear(shrink: true);
}
}
}
private void ProcessDataReceived(string json, string clientName, string? clientEndpoint)
{ {
try try
{ {
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = data, ClientName = clientName, ClientEndpoint = clientEndpoint }); var messages = JsonHelper.ToObjects<TData>(json);
}
catch (Exception exception)
{
OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception));
}
return;
}
try
{
if (data.Contains("Exception", StringComparison.OrdinalIgnoreCase) ||
data.Contains("Error", StringComparison.OrdinalIgnoreCase))
{
GetError(data);
}
var messages = JsonHelper.ToObjects<TData>(data);
if (messages != null) if (messages != null)
{ {
foreach (var message in messages) foreach (var msg in messages)
{ {
try OnProcessMessage?.Invoke(this, new ProcessedMessage<TData>
{ {
OnProcessMessage?.Invoke(this, new ProcessedMessage<TData> { Data = message, RawData = data, ClientName = clientName, ClientEndpoint = clientEndpoint }); Data = msg,
RawData = json,
ClientName = clientName,
ClientEndpoint = clientEndpoint
});
} }
catch (Exception exception) }
}
catch (Exception ex)
{ {
OnError?.Invoke(this, new Exception($"ProcessMessage handler threw for client {clientName} ({clientEndpoint}).", exception)); OnError?.Invoke(this, new Exception($"Failed to process JSON for {clientName}.", ex));
} }
} }
}
}
catch (Exception exception)
{
OnError?.Invoke(this, new Exception($"Failed to process JSON message from {clientName} ({clientEndpoint}).", exception));
}
}
private void GetError(string data)
{
try
{
var jsonObject = JObject.Parse(data);
var exceptionToken = jsonObject.SelectToken("Exception");
if (exceptionToken != null && exceptionToken.Type != JTokenType.Null)
{
var extracted = JsonHelper.ExtractException(data);
if (extracted != null)
{
OnMessageError?.Invoke(this, new Exception(extracted.Message));
}
}
}
catch
{
// Do nothing
}
}
private static bool ContainsJsonStructure(StringBuilder buffer)
{
for (int i = 0; i < buffer.Length; i++)
{
char c = buffer[i];
if (c == '{' || c == '[')
{
return true;
}
}
return false;
}
private void CleanupInactiveClients(object? sender, ElapsedEventArgs e) private void CleanupInactiveClients(object? sender, ElapsedEventArgs e)
{ {
@@ -282,24 +144,11 @@ namespace EonaCat.Connections.Processors
} }
DateTime now = DateTime.UtcNow; DateTime now = DateTime.UtcNow;
var keysToRemove = new List<string>(capacity: 128);
foreach (var kvp in _buffers) foreach (var kvp in _buffers)
{ {
if (now - kvp.Value.LastUsed > _clientBufferTimeout) if (now - kvp.Value.LastUsed > _clientBufferTimeout)
{ {
keysToRemove.Add(kvp.Key); if (_buffers.TryRemove(kvp.Key, out var removed))
if (keysToRemove.Count >= MaxCleanupRemovalsPerTick)
{
break;
}
}
}
// Remove the selected keys
foreach (var key in keysToRemove)
{
if (_buffers.TryRemove(key, out var removed))
{ {
lock (removed.SyncRoot) lock (removed.SyncRoot)
{ {
@@ -308,21 +157,6 @@ namespace EonaCat.Connections.Processors
} }
} }
} }
public void RemoveClient(string clientName)
{
if (string.IsNullOrWhiteSpace(clientName))
{
return;
}
if (_buffers.TryRemove(clientName, out var removed))
{
lock (removed.SyncRoot)
{
removed.Clear(shrink: true);
}
}
} }
private void ThrowIfDisposed() private void ThrowIfDisposed()
@@ -342,7 +176,6 @@ namespace EonaCat.Connections.Processors
_isDisposed = true; _isDisposed = true;
_cleanupTimer.Elapsed -= CleanupInactiveClients;
_cleanupTimer.Stop(); _cleanupTimer.Stop();
_cleanupTimer.Dispose(); _cleanupTimer.Dispose();
@@ -359,14 +192,6 @@ namespace EonaCat.Connections.Processors
OnProcessTextMessage = null; OnProcessTextMessage = null;
OnMessageError = null; OnMessageError = null;
OnError = null; OnError = null;
GC.SuppressFinalize(this);
} }
} }
internal static class StringExtensions
{
internal static bool Contains(this string? source, string toCheck, StringComparison comp) =>
source?.IndexOf(toCheck, comp) >= 0;
}
} }

View File

@@ -1,73 +1,107 @@
// This file is part of the EonaCat project(s) which is released under the Apache License. using System;
// See the LICENSE file or go to https://EonaCat.com/license for full license details. using System.Collections.Generic;
using System.Text; using System.Text;
namespace EonaCat.Connections.Processors namespace EonaCat.Connections.Processors
{ {
internal static class JsonDataProcessorHelper internal static class JsonDataProcessorHelper
{ {
public const int MAX_CAP = 1024; internal static bool TryExtractFullJson(
private const int MAX_SEGMENTS_PER_CALL = 1024; StringBuilder buffer,
out ReadOnlyMemory<char> fullJson,
internal static bool TryExtractCompleteJson(StringBuilder buffer, out string[] jsonArray, out string[] nonJsonText) out ReadOnlyMemory<char> nonJson)
{ {
jsonArray = Array.Empty<string>(); fullJson = ReadOnlyMemory<char>.Empty;
nonJsonText = Array.Empty<string>(); nonJson = ReadOnlyMemory<char>.Empty;
if (buffer is null || buffer.Length == 0) if (buffer.Length == 0)
{ {
return false; return false;
} }
var jsonList = new List<string>(capacity: 4); int start = 0;
var nonJsonList = new List<string>(capacity: 2);
int readPos = 0; // Skip leading whitespace
int segmentsProcessed = 0; while (start < buffer.Length && char.IsWhiteSpace(buffer[start]))
int bufferLen = buffer.Length;
while (readPos < bufferLen && segmentsProcessed < MAX_SEGMENTS_PER_CALL)
{ {
segmentsProcessed++; start++;
// Skip non-JSON starting characters
if (buffer[readPos] != '{' && buffer[readPos] != '[')
{
int start = readPos;
while (readPos < bufferLen && buffer[readPos] != '{' && buffer[readPos] != '[')
{
readPos++;
} }
int len = readPos - start; if (start >= buffer.Length)
if (len > 0)
{ {
var segment = buffer.ToString(start, len).Trim(); return false;
if (segment.Length > 0)
{
nonJsonList.Add(segment);
} }
continue;
// Detect non-JSON text before JSON
if (!IsJsonStart(buffer[start]) && !IsLiteralStart(buffer, start))
{
int nonJsonStart = start;
while (start < buffer.Length &&
!IsJsonStart(buffer[start]) &&
!IsLiteralStart(buffer, start))
{
start++;
}
if (start > nonJsonStart)
{
nonJson = buffer.ToString(nonJsonStart, start - nonJsonStart).AsMemory();
} }
} }
// Check if we have reached the end // Find full JSON token
if (readPos >= bufferLen) int tokenEnd = FindJsonTokenEnd(buffer, start);
if (tokenEnd == 0)
{ {
break; // Partial JSON, leave buffer intact
return false;
} }
int pos = readPos; fullJson = buffer.ToString(start, tokenEnd - start).AsMemory();
// Remove processed characters from buffer
buffer.Remove(0, tokenEnd);
return true;
}
private static bool IsJsonStart(char c) => c == '{' || c == '[' || c == '"' || c == '-' || (c >= '0' && c <= '9');
private static bool IsLiteralStart(StringBuilder buffer, int pos)
{
if (buffer.Length - pos >= 4)
{
var val = buffer.ToString(pos, 4);
if (val == "true" || val == "null")
{
return true;
}
}
if (buffer.Length - pos >= 5)
{
if (buffer.ToString(pos, 5) == "false")
{
return true;
}
}
return false;
}
private static int FindJsonTokenEnd(StringBuilder buffer, int start)
{
int depth = 0; int depth = 0;
bool inString = false; bool inString = false;
bool escape = false; bool escape = false;
int startIndex = pos; int i = start;
bool complete = false;
for (; pos < bufferLen; pos++) char firstChar = buffer[start];
if (firstChar == '{' || firstChar == '[')
{ {
char c = buffer[pos]; depth = 1;
i++;
while (i < buffer.Length)
{
char c = buffer[i];
if (inString) if (inString)
{ {
@@ -86,110 +120,36 @@ namespace EonaCat.Connections.Processors
} }
else else
{ {
switch (c) if (c == '"')
{ {
case '"':
inString = true; inString = true;
break; }
else if (c == '{' || c == '[')
case '{': {
case '[':
depth++; depth++;
break; }
else if (c == '}' || c == ']')
case '}': {
case ']':
depth--; depth--;
}
if (depth == 0) if (depth == 0)
{ {
// Completed JSON segment return i + 1;
pos++;
complete = true;
} }
break;
} }
i++;
} }
if (complete) // incomplete JSON
return 0;
}
else if (firstChar == '"')
{ {
break; i = start + 1;
} while (i < buffer.Length)
}
if (complete)
{ {
int length = pos - startIndex; char c = buffer[i];
if (length <= 0)
{
// Should not happen, but just in case
break;
}
// Extract candidate JSON segment
string candidateJson = buffer.ToString(startIndex, length);
// Clean internal non-JSON characters
var cleaned = StripInternalNonJson(candidateJson, out var extractedInside);
if (extractedInside != null && extractedInside.Length > 0)
{
nonJsonList.AddRange(extractedInside);
}
if (!string.IsNullOrWhiteSpace(cleaned))
{
jsonList.Add(cleaned);
}
// Move readPos forward
readPos = pos;
}
else
{
// Incomplete JSON segment; stop processing
break;
}
}
// Remove processed part from buffer
if (readPos > 0)
{
buffer.Remove(0, readPos);
}
// Cleanup buffer capacity if needed
if (buffer.Capacity > MAX_CAP && buffer.Length < 256)
{
buffer.Capacity = Math.Max(MAX_CAP, buffer.Length);
}
jsonArray = jsonList.Count > 0 ? jsonList.ToArray() : Array.Empty<string>();
nonJsonText = nonJsonList.Count > 0 ? nonJsonList.ToArray() : Array.Empty<string>();
return jsonArray.Length > 0;
}
private static string StripInternalNonJson(string input, out string[] extractedTexts)
{
if (string.IsNullOrEmpty(input))
{
extractedTexts = Array.Empty<string>();
return string.Empty;
}
// Scan through the input, copying valid JSON parts to output,
List<char>? extractedChars = null;
var sbJson = new StringBuilder(input.Length);
bool inString = false;
bool escape = false;
int depth = 0;
for (int i = 0; i < input.Length; i++)
{
char c = input[i];
if (inString)
{
sbJson.Append(c);
if (escape) if (escape)
{ {
escape = false; escape = false;
@@ -200,70 +160,26 @@ namespace EonaCat.Connections.Processors
} }
else if (c == '"') else if (c == '"')
{ {
inString = false; return i + 1;
} }
i++;
}
// incomplete string
return 0;
} }
else else
{ {
switch (c) while (i < buffer.Length &&
!char.IsWhiteSpace(buffer[i]) &&
buffer[i] != ',' && buffer[i] != ']' && buffer[i] != '}')
{ {
case '"': i++;
inString = true;
sbJson.Append(c);
break;
case '{':
case '[':
depth++;
sbJson.Append(c);
break;
case '}':
case ']':
depth--;
sbJson.Append(c);
break;
default:
// Outside JSON structures, only allow certain characters
if (depth > 0 || char.IsLetterOrDigit(c) || char.IsWhiteSpace(c) || ",:.-_".IndexOf(c) >= 0)
{
sbJson.Append(c);
}
else
{
extractedChars ??= new List<char>(capacity: 4);
extractedChars.Add(c);
}
break;
}
}
} }
if (extractedChars != null && extractedChars.Count > 0) return i;
{ }
// Convert char list to string array
var current = new string[extractedChars.Count];
for (int i = 0; i < extractedChars.Count; i++)
{
current[i] = extractedChars[i].ToString();
}
extractedTexts = current;
}
else
{
extractedTexts = Array.Empty<string>();
}
string result = sbJson.ToString().Trim();
// Recompute capacity if needed
if (sbJson.Capacity > MAX_CAP)
{
sbJson.Capacity = Math.Max(MAX_CAP, sbJson.Length);
}
return result;
} }
} }
} }