Updated
This commit is contained in:
@@ -5,7 +5,7 @@ namespace EonaCat.Connections.Processors
|
|||||||
{
|
{
|
||||||
public class ProcessedJsonMessage<TData>
|
public class ProcessedJsonMessage<TData>
|
||||||
{
|
{
|
||||||
public TData Data { get; set; } = default!;
|
public TData Data { get; set; }
|
||||||
public string RawData { get; set; } = string.Empty;
|
public string RawData { get; set; } = string.Empty;
|
||||||
public string ClientName { get; set; } = string.Empty;
|
public string ClientName { get; set; } = string.Empty;
|
||||||
public string ClientEndpoint { get; set; } = string.Empty;
|
public string ClientEndpoint { get; set; } = string.Empty;
|
||||||
@@ -23,7 +23,8 @@ namespace EonaCat.Connections.Processors
|
|||||||
private readonly object _syncLock = new object();
|
private readonly object _syncLock = new object();
|
||||||
private bool _isDisposed;
|
private bool _isDisposed;
|
||||||
|
|
||||||
public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024; // 30 MB
|
// 30 MB default max buffer size
|
||||||
|
public int MaxAllowedBufferSize { get; set; } = 30 * 1024 * 1024;
|
||||||
public int MaxMessagesPerBatch { get; set; } = 200;
|
public int MaxMessagesPerBatch { get; set; } = 200;
|
||||||
public string ClientName { get; }
|
public string ClientName { get; }
|
||||||
|
|
||||||
@@ -104,7 +105,7 @@ namespace EonaCat.Connections.Processors
|
|||||||
TryExtractFullJson(out int fullJsonStart, out int fullJsonLength,
|
TryExtractFullJson(out int fullJsonStart, out int fullJsonLength,
|
||||||
out int textStart, out int textLength))
|
out int textStart, out int textLength))
|
||||||
{
|
{
|
||||||
// --- Process leading text ---
|
// Check if we have any leading text to process
|
||||||
if (textLength > 0)
|
if (textLength > 0)
|
||||||
{
|
{
|
||||||
string text = _buffer.ToString(textStart, textLength);
|
string text = _buffer.ToString(textStart, textLength);
|
||||||
@@ -114,13 +115,13 @@ namespace EonaCat.Connections.Processors
|
|||||||
ClientName = client
|
ClientName = client
|
||||||
});
|
});
|
||||||
|
|
||||||
// Remove processed text immediately
|
// Remove the processed text immediately
|
||||||
_buffer.Remove(textStart, textLength);
|
_buffer.Remove(textStart, textLength);
|
||||||
processedCount++;
|
processedCount++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Process JSON ---
|
// Process the full JSON
|
||||||
if (fullJsonLength > 0)
|
if (fullJsonLength > 0)
|
||||||
{
|
{
|
||||||
string json = _buffer.ToString(fullJsonStart, fullJsonLength);
|
string json = _buffer.ToString(fullJsonStart, fullJsonLength);
|
||||||
@@ -148,7 +149,7 @@ namespace EonaCat.Connections.Processors
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Prevent buffer overflow ---
|
// Prevent buffer overflow
|
||||||
if (_buffer.Length > MaxAllowedBufferSize)
|
if (_buffer.Length > MaxAllowedBufferSize)
|
||||||
{
|
{
|
||||||
OnMessageError?.Invoke(this, new Exception(
|
OnMessageError?.Invoke(this, new Exception(
|
||||||
@@ -185,7 +186,7 @@ namespace EonaCat.Connections.Processors
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Leading text before JSON ---
|
// Json needs to start with { or [ (we dont care about numbers, strings, bools, nulls etc, THIS IS A BRASSER MOVE !!! WE DONT NEED SUCH JSON XD)
|
||||||
if (_buffer[pos] != '{' && _buffer[pos] != '[')
|
if (_buffer[pos] != '{' && _buffer[pos] != '[')
|
||||||
{
|
{
|
||||||
textStart = pos;
|
textStart = pos;
|
||||||
@@ -198,19 +199,20 @@ namespace EonaCat.Connections.Processors
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- JSON token ---
|
// Check if we also have a json end token
|
||||||
fullJsonStart = pos;
|
fullJsonStart = pos;
|
||||||
fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos;
|
fullJsonLength = FindJsonTokenEnd(_buffer, pos) - pos;
|
||||||
|
|
||||||
if (fullJsonLength <= 0)
|
if (fullJsonLength <= 0)
|
||||||
{
|
{
|
||||||
return false; // partial JSON
|
// partial JSON
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int FindJsonTokenEnd(StringBuilder buffer, int start)
|
private static int FindJsonTokenEnd(StringBuilder buffer, int start)
|
||||||
{
|
{
|
||||||
if (start >= buffer.Length)
|
if (start >= buffer.Length)
|
||||||
{
|
{
|
||||||
@@ -272,10 +274,12 @@ namespace EonaCat.Connections.Processors
|
|||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0; // partial JSON
|
// partial JSON
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0; // only objects/arrays supported as JSON start
|
// only objects/arrays supported as JSON start
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ThrowIfDisposed()
|
private void ThrowIfDisposed()
|
||||||
@@ -317,7 +321,6 @@ namespace EonaCat.Connections.Processors
|
|||||||
_buffer.Clear();
|
_buffer.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nullify events to release subscriber references
|
|
||||||
OnProcessMessage = null;
|
OnProcessMessage = null;
|
||||||
OnProcessTextMessage = null;
|
OnProcessTextMessage = null;
|
||||||
OnMessageError = null;
|
OnMessageError = null;
|
||||||
|
|||||||
Reference in New Issue
Block a user