Updated
This commit is contained in:
@@ -1,174 +1,243 @@
|
||||
using EonaCat.Json;
|
||||
using EonaCat.Json.Linq;
|
||||
using EonaCat.Connections.Models;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Text;
|
||||
using System.Timers;
|
||||
using Timer = System.Timers.Timer;
|
||||
|
||||
namespace EonaCat.Connections.Processors
|
||||
{
|
||||
// This file is part of the EonaCat project(s) which is released under the Apache License.
|
||||
// See the LICENSE file or go to https://EonaCat.com/license for full license details.
|
||||
|
||||
/// <summary>
|
||||
/// Processes incoming data streams into JSON or text messages per client buffer.
|
||||
/// </summary>
|
||||
public class JsonDataProcessor<TMessage> : IDisposable
|
||||
public sealed class JsonDataProcessor<TData> : IDisposable
|
||||
{
|
||||
private const int DefaultMaxBufferSize = 20 * 1024 * 1024; // 20 MB
|
||||
private const int DefaultMaxMessagesPerBatch = 200;
|
||||
private static readonly TimeSpan DefaultClientBufferTimeout = TimeSpan.FromMinutes(5);
|
||||
public int MaxAllowedBufferSize = 20 * 1024 * 1024;
|
||||
public int MaxMessagesPerBatch = 200;
|
||||
private const int MaxCleanupRemovalsPerTick = 50;
|
||||
|
||||
private readonly ConcurrentDictionary<string, BufferEntry> _buffers = new ConcurrentDictionary<string, BufferEntry>();
|
||||
private readonly Timer _cleanupTimer;
|
||||
private readonly ConcurrentDictionary<string, BufferEntry> _buffers = new();
|
||||
private readonly System.Timers.Timer _cleanupTimer;
|
||||
private readonly TimeSpan _clientBufferTimeout = TimeSpan.FromMinutes(5);
|
||||
private bool _isDisposed;
|
||||
|
||||
/// <summary>
|
||||
/// Maximum allowed buffer size in bytes (default: 20 MB).
|
||||
/// </summary>
|
||||
public int MaxAllowedBufferSize { get; set; } = DefaultMaxBufferSize;
|
||||
public string ClientName { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of messages processed per batch (default: 200).
|
||||
/// </summary>
|
||||
public int MaxMessagesPerBatch { get; set; } = DefaultMaxMessagesPerBatch;
|
||||
|
||||
/// <summary>
|
||||
/// Default client name when one is not provided in <see cref="DataReceivedEventArgs"/>.
|
||||
/// </summary>
|
||||
public string ClientName { get; set; } = Guid.NewGuid().ToString();
|
||||
|
||||
public Action<TMessage, string, string> ProcessMessage { get; set; }
|
||||
public Action<string, string> ProcessTextMessage { get; set; }
|
||||
|
||||
public event EventHandler<Exception> OnMessageError;
|
||||
public event EventHandler<Exception> OnError;
|
||||
|
||||
private class BufferEntry
|
||||
private sealed class BufferEntry
|
||||
{
|
||||
public readonly StringBuilder Buffer = new StringBuilder();
|
||||
public readonly StringBuilder Buffer = new();
|
||||
public DateTime LastUsed = DateTime.UtcNow;
|
||||
public readonly object SyncRoot = new object();
|
||||
public readonly object SyncRoot = new();
|
||||
|
||||
public void Clear(bool shrink = false)
|
||||
{
|
||||
Buffer.Clear();
|
||||
if (shrink && Buffer.Capacity > 1024)
|
||||
{
|
||||
Buffer.Capacity = 1024;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public event EventHandler<ProcessedMessage<TData>>? OnProcessMessage;
|
||||
|
||||
public event EventHandler<ProcessedTextMessage>? OnProcessTextMessage;
|
||||
|
||||
public event EventHandler<Exception>? OnMessageError;
|
||||
|
||||
public event EventHandler<Exception>? OnError;
|
||||
|
||||
public JsonDataProcessor()
|
||||
{
|
||||
_cleanupTimer = new Timer(DefaultClientBufferTimeout.TotalMilliseconds / 5);
|
||||
_cleanupTimer.AutoReset = true;
|
||||
ClientName = Guid.NewGuid().ToString();
|
||||
|
||||
_cleanupTimer = new System.Timers.Timer(Math.Max(5000, _clientBufferTimeout.TotalMilliseconds / 5))
|
||||
{
|
||||
AutoReset = true
|
||||
};
|
||||
_cleanupTimer.Elapsed += CleanupInactiveClients;
|
||||
_cleanupTimer.Start();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process incoming raw data.
|
||||
/// </summary>
|
||||
public void Process(DataReceivedEventArgs e)
|
||||
public void Process(DataReceivedEventArgs e, string? currentClientName = null)
|
||||
{
|
||||
EnsureNotDisposed();
|
||||
|
||||
if (e.IsBinary)
|
||||
{
|
||||
e.StringData = Encoding.UTF8.GetString(e.Data);
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(e.StringData))
|
||||
{
|
||||
OnError?.Invoke(this, new Exception("Received empty data."));
|
||||
return;
|
||||
}
|
||||
|
||||
string clientName = string.IsNullOrWhiteSpace(e.Nickname) ? ClientName : e.Nickname;
|
||||
string incomingText = e.StringData.Trim();
|
||||
if (incomingText.Length == 0)
|
||||
ThrowIfDisposed();
|
||||
if (e == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string endpoint = e.RemoteEndPoint?.ToString();
|
||||
string dataString = e.IsBinary ? Encoding.UTF8.GetString(e.Data ?? Array.Empty<byte>()) : e.StringData;
|
||||
if (string.IsNullOrWhiteSpace(dataString))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string client = e.Nickname ?? currentClientName ?? ClientName;
|
||||
ProcessInternal(dataString.Trim(), client, endpoint);
|
||||
}
|
||||
|
||||
public void Process(string jsonString, string? currentClientName = null, string? endpoint = null)
|
||||
{
|
||||
ThrowIfDisposed();
|
||||
if (string.IsNullOrWhiteSpace(jsonString))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string client = currentClientName ?? ClientName;
|
||||
ProcessInternal(jsonString.Trim(), client, endpoint);
|
||||
}
|
||||
|
||||
private void ProcessInternal(string jsonString, string clientName, string? clientEndpoint)
|
||||
{
|
||||
var bufferEntry = _buffers.GetOrAdd(clientName, _ => new BufferEntry());
|
||||
var pendingJson = new List<string>();
|
||||
var pendingText = new List<string>();
|
||||
|
||||
lock (bufferEntry.SyncRoot)
|
||||
{
|
||||
// Check for buffer overflow
|
||||
if (bufferEntry.Buffer.Length > MaxAllowedBufferSize)
|
||||
{
|
||||
bufferEntry.Buffer.Clear();
|
||||
OnError?.Invoke(this, new Exception($"Buffer overflow ({MaxAllowedBufferSize} bytes) for client {clientName} ({clientEndpoint})."));
|
||||
bufferEntry.Clear(shrink: true);
|
||||
}
|
||||
|
||||
bufferEntry.Buffer.Append(incomingText);
|
||||
bufferEntry.Buffer.Append(jsonString);
|
||||
bufferEntry.LastUsed = DateTime.UtcNow;
|
||||
|
||||
int processedCount = 0;
|
||||
|
||||
while (processedCount < MaxMessagesPerBatch &&
|
||||
ExtractNextJson(bufferEntry.Buffer, out var jsonChunk))
|
||||
JsonDataProcessorHelper.TryExtractCompleteJson(bufferEntry.Buffer, out string[] json, out string[] nonJsonText))
|
||||
{
|
||||
ProcessDataReceived(jsonChunk, clientName);
|
||||
processedCount++;
|
||||
// No more messages
|
||||
if ((json == null || json.Length == 0) && (nonJsonText == null || nonJsonText.Length == 0))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (json != null && json.Length > 0)
|
||||
{
|
||||
foreach (var jsonMessage in json)
|
||||
{
|
||||
pendingJson.Add(jsonMessage);
|
||||
processedCount++;
|
||||
if (processedCount >= MaxMessagesPerBatch)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nonJsonText != null && nonJsonText.Length > 0)
|
||||
{
|
||||
foreach (var textMessage in nonJsonText)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(textMessage))
|
||||
{
|
||||
pendingText.Add(textMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup buffer if needed
|
||||
if (bufferEntry.Buffer.Capacity > MaxAllowedBufferSize / 2)
|
||||
{
|
||||
bufferEntry.Clear(shrink: true);
|
||||
}
|
||||
|
||||
// Handle leftover non-JSON text
|
||||
if (bufferEntry.Buffer.Length > 0 && !ContainsJsonStructure(bufferEntry.Buffer))
|
||||
{
|
||||
var leftover = bufferEntry.Buffer.ToString();
|
||||
bufferEntry.Buffer.Clear();
|
||||
ProcessTextMessage?.Invoke(leftover, clientName);
|
||||
string leftover = bufferEntry.Buffer.ToString();
|
||||
bufferEntry.Clear(shrink: true);
|
||||
if (!string.IsNullOrWhiteSpace(leftover))
|
||||
{
|
||||
pendingText.Add(leftover);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingText.Count > 0)
|
||||
{
|
||||
foreach (var textMessage in pendingText)
|
||||
{
|
||||
try
|
||||
{
|
||||
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = textMessage, ClientName = clientName, ClientEndpoint = clientEndpoint });
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingJson.Count > 0)
|
||||
{
|
||||
foreach (var jsonMessage in pendingJson)
|
||||
{
|
||||
ProcessDataReceived(jsonMessage, clientName, clientEndpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessDataReceived(string data, string clientName)
|
||||
private void ProcessDataReceived(string data, string clientName, string? clientEndpoint)
|
||||
{
|
||||
EnsureNotDisposed();
|
||||
|
||||
ThrowIfDisposed();
|
||||
if (string.IsNullOrWhiteSpace(data))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(clientName))
|
||||
{
|
||||
clientName = ClientName;
|
||||
}
|
||||
|
||||
bool looksLikeJson = data.Length > 1 &&
|
||||
((data[0] == '{' && data[data.Length - 1] == '}') ||
|
||||
(data[0] == '[' && data[data.Length - 1] == ']') ||
|
||||
data[0] == '"' || // string
|
||||
char.IsDigit(data[0]) || data[0] == '-' || // numbers
|
||||
data.StartsWith("true") ||
|
||||
data.StartsWith("false") ||
|
||||
data.StartsWith("null"));
|
||||
((data[0] == '{' && data[data.Length - 1] == '}') ||
|
||||
(data[0] == '[' && data[data.Length - 1] == ']'));
|
||||
|
||||
if (!looksLikeJson)
|
||||
{
|
||||
ProcessTextMessage?.Invoke(data, clientName);
|
||||
try
|
||||
{
|
||||
OnProcessTextMessage?.Invoke(this, new ProcessedTextMessage { Text = data, ClientName = clientName, ClientEndpoint = clientEndpoint });
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
OnError?.Invoke(this, new Exception($"ProcessTextMessage handler threw for client {clientName} ({clientEndpoint}).", exception));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Try to detect JSON-encoded exceptions
|
||||
if (data.IndexOf("Exception", StringComparison.OrdinalIgnoreCase) >= 0 ||
|
||||
data.IndexOf("Error", StringComparison.OrdinalIgnoreCase) >= 0)
|
||||
if (data.Contains("Exception", StringComparison.OrdinalIgnoreCase) ||
|
||||
data.Contains("Error", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
TryHandleJsonException(data);
|
||||
GetError(data);
|
||||
}
|
||||
|
||||
var messages = JsonHelper.ToObjects<TMessage>(data);
|
||||
if (messages != null && ProcessMessage != null)
|
||||
var messages = JsonHelper.ToObjects<TData>(data);
|
||||
if (messages != null)
|
||||
{
|
||||
foreach (var message in messages)
|
||||
{
|
||||
ProcessMessage(message, clientName, data);
|
||||
try
|
||||
{
|
||||
OnProcessMessage?.Invoke(this, new ProcessedMessage<TData> { Data = message, RawData = data, ClientName = clientName, ClientEndpoint = clientEndpoint });
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
OnError?.Invoke(this, new Exception($"ProcessMessage handler threw for client {clientName} ({clientEndpoint}).", exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (Exception exception)
|
||||
{
|
||||
OnError?.Invoke(this, new Exception("Failed to process JSON message.", ex));
|
||||
OnError?.Invoke(this, new Exception($"Failed to process JSON message from {clientName} ({clientEndpoint}).", exception));
|
||||
}
|
||||
}
|
||||
|
||||
private void TryHandleJsonException(string data)
|
||||
private void GetError(string data)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -176,142 +245,25 @@ namespace EonaCat.Connections.Processors
|
||||
var exceptionToken = jsonObject.SelectToken("Exception");
|
||||
if (exceptionToken != null && exceptionToken.Type != JTokenType.Null)
|
||||
{
|
||||
var exception = JsonHelper.ExtractException(data);
|
||||
if (exception != null && OnMessageError != null)
|
||||
var extracted = JsonHelper.ExtractException(data);
|
||||
if (extracted != null)
|
||||
{
|
||||
OnMessageError(this, new Exception(exception.Message));
|
||||
OnMessageError?.Invoke(this, new Exception(extracted.Message));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Ignore malformed exception JSON
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
|
||||
private static bool ExtractNextJson(StringBuilder buffer, out string json)
|
||||
{
|
||||
json = null;
|
||||
if (buffer.Length == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int depth = 0;
|
||||
bool inString = false, escape = false;
|
||||
int startIndex = -1;
|
||||
|
||||
for (int i = 0; i < buffer.Length; i++)
|
||||
{
|
||||
char c = buffer[i];
|
||||
|
||||
if (inString)
|
||||
{
|
||||
if (escape)
|
||||
{
|
||||
escape = false;
|
||||
}
|
||||
else if (c == '\\')
|
||||
{
|
||||
escape = true;
|
||||
}
|
||||
else if (c == '"')
|
||||
{
|
||||
inString = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
switch (c)
|
||||
{
|
||||
case '"':
|
||||
inString = true;
|
||||
if (depth == 0 && startIndex == -1)
|
||||
{
|
||||
startIndex = i; // string-only JSON
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case '{':
|
||||
case '[':
|
||||
if (depth == 0)
|
||||
{
|
||||
startIndex = i;
|
||||
}
|
||||
|
||||
depth++;
|
||||
break;
|
||||
|
||||
case '}':
|
||||
case ']':
|
||||
depth--;
|
||||
if (depth == 0 && startIndex != -1)
|
||||
{
|
||||
int length = i - startIndex + 1;
|
||||
json = buffer.ToString(startIndex, length);
|
||||
buffer.Remove(0, i + 1);
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
if (depth == 0 && startIndex == -1 &&
|
||||
(char.IsDigit(c) || c == '-' || c == 't' || c == 'f' || c == 'n'))
|
||||
{
|
||||
startIndex = i;
|
||||
int tokenEnd = FindPrimitiveEnd(buffer, i);
|
||||
json = buffer.ToString(startIndex, tokenEnd - startIndex);
|
||||
buffer.Remove(0, tokenEnd);
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static int FindPrimitiveEnd(StringBuilder buffer, int startIndex)
|
||||
{
|
||||
// Keywords: true/false/null
|
||||
if (buffer.Length >= startIndex + 4 && buffer.ToString(startIndex, 4) == "true")
|
||||
{
|
||||
return startIndex + 4;
|
||||
}
|
||||
|
||||
if (buffer.Length >= startIndex + 5 && buffer.ToString(startIndex, 5) == "false")
|
||||
{
|
||||
return startIndex + 5;
|
||||
}
|
||||
|
||||
if (buffer.Length >= startIndex + 4 && buffer.ToString(startIndex, 4) == "null")
|
||||
{
|
||||
return startIndex + 4;
|
||||
}
|
||||
|
||||
// Numbers: scan until non-number/decimal/exponent
|
||||
int i = startIndex;
|
||||
while (i < buffer.Length)
|
||||
{
|
||||
char c = buffer[i];
|
||||
if (!(char.IsDigit(c) || c == '-' || c == '+' || c == '.' || c == 'e' || c == 'E'))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
i++;
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
private static bool ContainsJsonStructure(StringBuilder buffer)
|
||||
{
|
||||
for (int i = 0; i < buffer.Length; i++)
|
||||
{
|
||||
char c = buffer[i];
|
||||
if (c == '{' || c == '[' || c == '"' || c == 't' || c == 'f' || c == 'n' || c == '-' || char.IsDigit(c))
|
||||
if (c == '{' || c == '[')
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@@ -319,22 +271,36 @@ namespace EonaCat.Connections.Processors
|
||||
return false;
|
||||
}
|
||||
|
||||
private void CleanupInactiveClients(object sender, ElapsedEventArgs e)
|
||||
private void CleanupInactiveClients(object? sender, ElapsedEventArgs e)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
if (_isDisposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
DateTime now = DateTime.UtcNow;
|
||||
var keysToRemove = new List<string>(capacity: 128);
|
||||
|
||||
foreach (var kvp in _buffers)
|
||||
{
|
||||
var bufferEntry = kvp.Value;
|
||||
if (now - bufferEntry.LastUsed > DefaultClientBufferTimeout)
|
||||
if (now - kvp.Value.LastUsed > _clientBufferTimeout)
|
||||
{
|
||||
BufferEntry removed;
|
||||
if (_buffers.TryRemove(kvp.Key, out removed))
|
||||
keysToRemove.Add(kvp.Key);
|
||||
if (keysToRemove.Count >= MaxCleanupRemovalsPerTick)
|
||||
{
|
||||
lock (removed.SyncRoot)
|
||||
{
|
||||
removed.Buffer.Clear();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the selected keys
|
||||
foreach (var key in keysToRemove)
|
||||
{
|
||||
if (_buffers.TryRemove(key, out var removed))
|
||||
{
|
||||
lock (removed.SyncRoot)
|
||||
{
|
||||
removed.Clear(shrink: true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -347,21 +313,20 @@ namespace EonaCat.Connections.Processors
|
||||
return;
|
||||
}
|
||||
|
||||
BufferEntry removed;
|
||||
if (_buffers.TryRemove(clientName, out removed))
|
||||
if (_buffers.TryRemove(clientName, out var removed))
|
||||
{
|
||||
lock (removed.SyncRoot)
|
||||
{
|
||||
removed.Buffer.Clear();
|
||||
removed.Clear(shrink: true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureNotDisposed()
|
||||
private void ThrowIfDisposed()
|
||||
{
|
||||
if (_isDisposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(JsonDataProcessor<TMessage>));
|
||||
throw new ObjectDisposedException(nameof(JsonDataProcessor<TData>));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,30 +337,33 @@ namespace EonaCat.Connections.Processors
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_cleanupTimer.Stop();
|
||||
_cleanupTimer.Elapsed -= CleanupInactiveClients;
|
||||
_cleanupTimer.Dispose();
|
||||
_isDisposed = true;
|
||||
|
||||
foreach (var bufferEntry in _buffers.Values)
|
||||
_cleanupTimer.Elapsed -= CleanupInactiveClients;
|
||||
_cleanupTimer.Stop();
|
||||
_cleanupTimer.Dispose();
|
||||
|
||||
foreach (var entry in _buffers.Values)
|
||||
{
|
||||
lock (entry.SyncRoot)
|
||||
{
|
||||
lock (bufferEntry.SyncRoot)
|
||||
{
|
||||
bufferEntry.Buffer.Clear();
|
||||
}
|
||||
entry.Clear(shrink: true);
|
||||
}
|
||||
_buffers.Clear();
|
||||
}
|
||||
|
||||
ProcessMessage = null;
|
||||
ProcessTextMessage = null;
|
||||
OnMessageError = null;
|
||||
OnError = null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_isDisposed = true;
|
||||
}
|
||||
_buffers.Clear();
|
||||
OnProcessMessage = null;
|
||||
OnProcessTextMessage = null;
|
||||
OnMessageError = null;
|
||||
OnError = null;
|
||||
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal static class StringExtensions
|
||||
{
|
||||
internal static bool Contains(this string? source, string toCheck, StringComparison comp) =>
|
||||
source?.IndexOf(toCheck, comp) >= 0;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user