This commit is contained in:
2026-01-21 22:13:31 +01:00
parent 3dce3cc56b
commit abc50d0c11
5 changed files with 314 additions and 299 deletions

View File

@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace EonaCat.Connections.Client
{
// Root myDeserializedClass = JsonConvert.DeserializeObject<List<Root>>(myJsonResponse);
public class Contact
{
public string email { get; set; }
public string phone { get; set; }
}
public class Department
{
public string id { get; set; }
public string name { get; set; }
public Manager manager { get; set; }
}
public class Details
{
public int hoursSpent { get; set; }
public List<string> technologiesUsed { get; set; }
public string completionDate { get; set; }
public string expectedCompletion { get; set; }
}
public class Employee
{
public string id { get; set; }
public string name { get; set; }
public string position { get; set; }
public Department department { get; set; }
public List<Project> projects { get; set; }
}
public class Manager
{
public string id { get; set; }
public string name { get; set; }
public Contact contact { get; set; }
}
public class Project
{
public string projectId { get; set; }
public string projectName { get; set; }
public string startDate { get; set; }
public List<Task> tasks { get; set; }
}
public class Root
{
public Employee employee { get; set; }
}
public class Task
{
public string taskId { get; set; }
public string title { get; set; }
public string status { get; set; }
public Details details { get; set; }
}
}

View File

@@ -20,7 +20,7 @@ namespace EonaCat.Connections.Client.Example
//public static string SERVER_IP = "10.40.11.22";
public static string SERVER_IP = "127.0.0.1";
private static Dictionary<NetworkClient, JsonDataProcessor<dynamic>> _clientsProcessors = new Dictionary<NetworkClient, JsonDataProcessor<dynamic>>();
private static Dictionary<NetworkClient, JsonDataProcessor<List<Root>>> _clientsProcessors = new Dictionary<NetworkClient, JsonDataProcessor<List<Root>>>();
private static bool testDataProcessor;
public static bool WaitForMessage { get; private set; }
@@ -28,9 +28,9 @@ namespace EonaCat.Connections.Client.Example
public static bool ToConsoleOnly { get; private set; } = true;
public static bool UseJson { get; private set; } = true;
public static bool TESTBYTES { get; private set; }
public static bool UseJsonProcessorTest { get; private set; } = false;
public static bool UseJsonProcessorTest { get; private set; } = true;
public static async Task Main(string[] args)
public static async System.Threading.Tasks.Task Main(string[] args)
{
for (long i = 0; i < clientCount; i++)
{
@@ -40,7 +40,7 @@ namespace EonaCat.Connections.Client.Example
if (testDataProcessor)
{
_clientsProcessors[client] = new JsonDataProcessor<dynamic>();
_clientsProcessors[client] = new JsonDataProcessor<List<Root>>();
}
else
{
@@ -67,7 +67,7 @@ namespace EonaCat.Connections.Client.Example
break;
}
var jsonUrl = "https://samples.json-format.com/employees/json/employees_500KB.json";
var jsonUrl = "https://sample.json-format.com/api/download?url=https%3A%2F%2Ffiles.jsons.live%2Femployees%2F5-level%2F20-MB%2Fformatted.json&name=20%20MB%205%20Level%20Formatted.json";
try
{
@@ -98,14 +98,16 @@ namespace EonaCat.Connections.Client.Example
{
WriteToLog($"Processed JSON message from {e.ClientName} ({e.ClientEndpoint}): {e.RawData}");
};
processor.MaxAllowedBufferSize = 10 * 1024 * 1024; // 10 MB
processor.MaxAllowedBufferSize = 50 * 1024 * 1024; // 10 MB
processor.MaxMessagesPerBatch = 5;
var json = _jsonContent;
while (true)
{
json = "TEST1" + _jsonContent + "TEST2";
processor.Process(json, "TestClient");
await Task.Delay(100).ConfigureAwait(false);
await System.Threading.Tasks.Task.Delay(100).ConfigureAwait(false);
}
}
}
@@ -131,11 +133,11 @@ namespace EonaCat.Connections.Client.Example
}
}
await Task.Delay(1000).ConfigureAwait(false);
await System.Threading.Tasks.Task.Delay(1000).ConfigureAwait(false);
}
}
private static async Task StartClientAsync(string clientName, NetworkClient client)
private static async System.Threading.Tasks.Task StartClientAsync(string clientName, NetworkClient client)
{
await client.ConnectAsync().ConfigureAwait(false);
@@ -157,7 +159,7 @@ namespace EonaCat.Connections.Client.Example
UseAesEncryption = false,
EnableHeartbeat = IsHeartBeatEnabled,
AesPassword = "EonaCat.Connections.Password",
Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"),
//Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("client.pfx", "p@ss"),
};
var client = new NetworkClient(config);
@@ -172,8 +174,8 @@ namespace EonaCat.Connections.Client.Example
if (UseProcessor)
{
_clientsProcessors[client] = new JsonDataProcessor<object>();
_clientsProcessors[client].OnError += (sender, e) =>
_clientsProcessors[client] = new JsonDataProcessor<List<Root>>();
_clientsProcessors[client].OnMessageError += (sender, e) =>
{
Console.WriteLine($"Processor error: {e.Message}");
};
@@ -198,7 +200,7 @@ namespace EonaCat.Connections.Client.Example
{
if (UseProcessor)
{
_clientsProcessors[client].Process(e, currentClientName: e.Nickname);
_clientsProcessors[client].Process(e.StringData, clientName: e.Nickname);
return;
}
else

View File

@@ -56,7 +56,7 @@ namespace EonaCat.Connections.Server.Example
UseAesEncryption = false,
MaxConnections = 100000,
AesPassword = "EonaCat.Connections.Password",
Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss"),
//Certificate = new System.Security.Cryptography.X509Certificates.X509Certificate2("server.pfx", "p@ss"),
EnableHeartbeat = IsHeartBeatEnabled
};

View File

@@ -1,162 +1,281 @@
using EonaCat.Connections.Models;
using EonaCat.Json;
using System;
using System.Collections.Concurrent;
using EonaCat.Json;
using System.Text;
using System.Timers;
using Timer = System.Timers.Timer;
namespace EonaCat.Connections.Processors
{
public class ProcessedJsonMessage<TData>
{
public TData Data { get; set; } = default!;
public string RawData { get; set; } = string.Empty;
public string ClientName { get; set; } = string.Empty;
public string ClientEndpoint { get; set; } = string.Empty;
}
public class ProcessedTextMessage
{
public string Text { get; set; } = string.Empty;
public string ClientName { get; set; } = string.Empty;
}
public sealed class JsonDataProcessor<TData> : IDisposable
{
public int MaxAllowedBufferSize = 20 * 1024 * 1024;
public int MaxMessagesPerBatch = 200;
private readonly ConcurrentDictionary<string, BufferEntry> _buffers = new();
private readonly Timer _cleanupTimer;
private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5);
private readonly StringBuilder _buffer = new StringBuilder();
private readonly object _syncLock = new object();
private bool _isDisposed;
public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; // 30 MB
public int MaxMessagesPerBatch { get; set; } = 200;
public string ClientName { get; }
private sealed class BufferEntry
{
public readonly StringBuilder Buffer = new();
public DateTime LastUsed = DateTime.UtcNow;
public readonly object SyncRoot = new();
// Diagnostics
public long TotalBytesProcessed { get; private set; }
public long TotalChunksReceived { get; private set; }
public void Clear(bool shrink = false)
{
Buffer.Clear();
if (shrink && Buffer.Capacity > 1024)
{
Buffer.Capacity = 1024;
}
}
}
public event EventHandler<ProcessedMessage<TData>>? OnProcessMessage;
// Events
public event EventHandler<ProcessedJsonMessage<TData>>? OnProcessMessage;
public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage;
public event EventHandler<Exception>? OnMessageError;
public event EventHandler<Exception>? OnError;
public JsonDataProcessor()
{
ClientName = Guid.NewGuid().ToString();
_cleanupTimer = new Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5))
{
AutoReset = true
};
_cleanupTimer.Elapsed += CleanupInactiveClients;
_cleanupTimer.Start();
}
public void Process(string data, string? clientName = null)
public void Process(string jsonString, string? clientName = null, string? endpoint = null)
{
ThrowIfDisposed();
if (string.IsNullOrWhiteSpace(data))
if (string.IsNullOrEmpty(jsonString))
{
return;
}
TotalChunksReceived++;
TotalBytesProcessed += Encoding.UTF8.GetByteCount(jsonString);
ProcessInternal(jsonString, clientName, endpoint);
}
public void Process(DataReceivedEventArgs e, string? clientName = null)
{
ThrowIfDisposed();
if (e == null)
{
return;
}
string dataString;
if (e.IsBinary)
{
if (e.Data == null || e.Data.Length == 0)
{
return;
}
dataString = Encoding.UTF8.GetString(e.Data);
}
else
{
dataString = e.StringData;
}
if (string.IsNullOrWhiteSpace(dataString))
{
return;
}
string client = e.Nickname ?? clientName ?? ClientName;
TotalChunksReceived++;
TotalBytesProcessed += Encoding.UTF8.GetByteCount(dataString);
ProcessInternal(dataString, client, e.RemoteEndPoint?.ToString());
}
private void ProcessInternal(string jsonString, string? clientName, string? endpoint)
{
string client = clientName ?? ClientName;
ProcessInternal(data, client);
}
private void ProcessInternal(string incomingData, string clientName)
lock (_syncLock)
{
var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry());
lock (bufferEntry.SyncRoot)
{
bufferEntry.Buffer.Append(incomingData);
bufferEntry.LastUsed = DateTime.UtcNow;
_buffer.Append(jsonString);
int processedCount = 0;
while (processedCount < MaxMessagesPerBatch)
{
if (!JsonDataProcessorHelper.TryExtractFullJson(bufferEntry.Buffer,
out var fullJson, out var nonJson))
{
// partial JSON
break;
}
if (!nonJson.IsEmpty)
while (processedCount < MaxMessagesPerBatch &&
TryExtractFullJson(out int fullJsonStart, out int fullJsonLength,
out int textStart, out int textLength))
{
// --- Process leading text ---
if (textLength > 0)
{
string text = _buffer.ToString(textStart, textLength);
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage
{
Text = nonJson.ToString(),
ClientName = clientName,
ClientEndpoint = null
Text = text,
ClientName = client
});
// Remove processed text immediately
_buffer.Remove(textStart, textLength);
processedCount++;
continue;
}
if (!fullJson.IsEmpty)
// --- Process JSON ---
if (fullJsonLength > 0)
{
// We got a full JSON message
var jsonStr = fullJson.ToString();
ProcessDataReceived(jsonStr, clientName, null);
string json = _buffer.ToString(fullJsonStart, fullJsonLength);
try
{
var deserialized = JsonHelper.ToObject<TData>(json);
OnProcessMessage?.Invoke(this, new ProcessedJsonMessage<TData>
{
Data = deserialized,
RawData = json,
ClientName = client,
ClientEndpoint = endpoint ?? string.Empty
});
}
catch (Exception ex)
{
OnMessageError?.Invoke(this, new Exception(
$"Failed to deserialize JSON for client {client}", ex));
}
// Remove processed JSON immediately
_buffer.Remove(fullJsonStart, fullJsonLength);
processedCount++;
}
}
// Prevent buffer overflow
if (bufferEntry.Buffer.Length > MaxAllowedBufferSize)
// --- Prevent buffer overflow ---
if (_buffer.Length > MaxAllowedBufferSize)
{
OnError?.Invoke(this, new Exception($"Buffer overflow for client {clientName}."));
bufferEntry.Clear(shrink: true);
OnMessageError?.Invoke(this, new Exception(
$"Buffer overflow for client {client}. Current size: {_buffer.Length / 1024.0 / 1024.0:F2} MB. " +
$"Max allowed: {MaxAllowedBufferSize / 1024.0 / 1024.0:F2} MB. Clearing buffer."));
_buffer.Clear();
}
}
}
private void ProcessDataReceived(string json, string clientName, string? clientEndpoint)
private bool TryExtractFullJson(
out int fullJsonStart,
out int fullJsonLength,
out int textStart,
out int textLength)
{
try
fullJsonStart = fullJsonLength = textStart = textLength = 0;
if (_buffer.Length == 0)
{
var messages = JsonHelper.ToObjects<TData>(json);
if (messages != null)
return false;
}
int pos = 0;
// Skip leading whitespace
while (pos < _buffer.Length && char.IsWhiteSpace(_buffer[pos]))
{
foreach (var msg in messages)
pos++;
}
if (pos >= _buffer.Length)
{
OnProcessMessage?.Invoke(this, new ProcessedMessage<TData>
return false;
}
// --- Leading text before JSON ---
if (_buffer[pos] != '{' && _buffer[pos] != '[')
{
Data = msg,
RawData = json,
ClientName = clientName,
ClientEndpoint = clientEndpoint
});
textStart = pos;
while (pos < _buffer.Length && _buffer[pos] != '{' && _buffer[pos] != '[')
{
pos++;
}
textLength = pos - textStart;
return true;
}
// --- JSON token ---
fullJsonStart = pos;
fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos;
if (fullJsonLength <= 0)
{
return false; // partial JSON
}
return true;
}
private int FindJsonTokenEnd(StringBuilder buffer, int start)
{
if (start >= buffer.Length)
{
return 0;
}
int i = start;
char firstChar = buffer[start];
if (firstChar == '{' || firstChar == '[')
{
char open = firstChar;
char close = firstChar == '{' ? '}' : ']';
int depth = 1;
bool inString = false;
bool escape = false;
i++;
while (i < buffer.Length)
{
char c = buffer[i];
if (inString)
{
if (escape)
{
escape = false;
}
else if (c == '\\')
{
escape = true;
}
else if (c == '"')
{
inString = false;
}
}
}
catch (Exception ex)
else
{
OnError?.Invoke(this, new Exception($"Failed to process JSON for {clientName}.", ex));
if (c == '"')
{
inString = true;
}
else if (c == open)
{
depth++;
}
else if (c == close)
{
depth--;
if (depth == 0)
{
return i + 1;
}
}
}
private void CleanupInactiveClients(object? sender, ElapsedEventArgs e)
{
if (_isDisposed)
{
return;
i++;
}
DateTime now = DateTime.UtcNow;
foreach (var kvp in _buffers)
{
if (now - kvp.Value.LastUsed > _clientBufferTimeout)
{
if (_buffers.TryRemove(kvp.Key, out var removed))
{
lock (removed.SyncRoot)
{
removed.Clear(shrink: true);
}
}
}
return 0; // partial JSON
}
return 0; // only objects/arrays supported as JSON start
}
private void ThrowIfDisposed()
@@ -167,6 +286,23 @@ namespace EonaCat.Connections.Processors
}
}
public void ClearBuffer()
{
lock (_syncLock)
{
_buffer.Clear();
}
}
public void ResetStatistics()
{
lock (_syncLock)
{
TotalBytesProcessed = 0;
TotalChunksReceived = 0;
}
}
public void Dispose()
{
if (_isDisposed)
@@ -176,22 +312,15 @@ namespace EonaCat.Connections.Processors
_isDisposed = true;
_cleanupTimer.Stop();
_cleanupTimer.Dispose();
foreach (var entry in _buffers.Values)
lock (_syncLock)
{
lock (entry.SyncRoot)
{
entry.Clear(shrink: true);
}
_buffer.Clear();
}
_buffers.Clear();
// Nullify events to release subscriber references
OnProcessMessage = null;
OnProcessTextMessage = null;
OnMessageError = null;
OnError = null;
}
}
}

View File

@@ -1,185 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace EonaCat.Connections.Processors
{
internal static class JsonDataProcessorHelper
{
internal static bool TryExtractFullJson(
StringBuilder buffer,
out ReadOnlyMemory<char> fullJson,
out ReadOnlyMemory<char> nonJson)
{
fullJson = ReadOnlyMemory<char>.Empty;
nonJson = ReadOnlyMemory<char>.Empty;
if (buffer.Length == 0)
{
return false;
}
int start = 0;
// Skip leading whitespace
while (start < buffer.Length && char.IsWhiteSpace(buffer[start]))
{
start++;
}
if (start >= buffer.Length)
{
return false;
}
// Detect non-JSON text before JSON
if (!IsJsonStart(buffer[start]) && !IsLiteralStart(buffer, start))
{
int nonJsonStart = start;
while (start < buffer.Length &&
!IsJsonStart(buffer[start]) &&
!IsLiteralStart(buffer, start))
{
start++;
}
if (start > nonJsonStart)
{
nonJson = buffer.ToString(nonJsonStart, start - nonJsonStart).AsMemory();
}
}
// Find full JSON token
int tokenEnd = FindJsonTokenEnd(buffer, start);
if (tokenEnd == 0)
{
// Partial JSON, leave buffer intact
return false;
}
fullJson = buffer.ToString(start, tokenEnd - start).AsMemory();
// Remove processed characters from buffer
buffer.Remove(0, tokenEnd);
return true;
}
private static bool IsJsonStart(char c) => c == '{' || c == '[' || c == '"' || c == '-' || (c >= '0' && c <= '9');
private static bool IsLiteralStart(StringBuilder buffer, int pos)
{
if (buffer.Length - pos >= 4)
{
var val = buffer.ToString(pos, 4);
if (val == "true" || val == "null")
{
return true;
}
}
if (buffer.Length - pos >= 5)
{
if (buffer.ToString(pos, 5) == "false")
{
return true;
}
}
return false;
}
private static int FindJsonTokenEnd(StringBuilder buffer, int start)
{
int depth = 0;
bool inString = false;
bool escape = false;
int i = start;
char firstChar = buffer[start];
if (firstChar == '{' || firstChar == '[')
{
depth = 1;
i++;
while (i < buffer.Length)
{
char c = buffer[i];
if (inString)
{
if (escape)
{
escape = false;
}
else if (c == '\\')
{
escape = true;
}
else if (c == '"')
{
inString = false;
}
}
else
{
if (c == '"')
{
inString = true;
}
else if (c == '{' || c == '[')
{
depth++;
}
else if (c == '}' || c == ']')
{
depth--;
}
if (depth == 0)
{
return i + 1;
}
}
i++;
}
// incomplete JSON
return 0;
}
else if (firstChar == '"')
{
i = start + 1;
while (i < buffer.Length)
{
char c = buffer[i];
if (escape)
{
escape = false;
}
else if (c == '\\')
{
escape = true;
}
else if (c == '"')
{
return i + 1;
}
i++;
}
// incomplete string
return 0;
}
else
{
while (i < buffer.Length &&
!char.IsWhiteSpace(buffer[i]) &&
buffer[i] != ',' && buffer[i] != ']' && buffer[i] != '}')
{
i++;
}
return i;
}
}
}
}