This commit is contained in:
Jeroen Saey
2026-02-13 10:42:23 +01:00
parent 0a188b4e86
commit 807fe7920e
3 changed files with 879 additions and 825 deletions

View File

@@ -13,8 +13,8 @@
<Copyright>EonaCat (Jeroen Saey)</Copyright> <Copyright>EonaCat (Jeroen Saey)</Copyright>
<PackageTags>EonaCat;Logger;EonaCatLogger;Log;Writer;Jeroen;Saey</PackageTags> <PackageTags>EonaCat;Logger;EonaCatLogger;Log;Writer;Jeroen;Saey</PackageTags>
<PackageIconUrl /> <PackageIconUrl />
<Version>1.7.1</Version> <Version>1.7.2</Version>
<FileVersion>1.7.1</FileVersion> <FileVersion>1.7.2</FileVersion>
<PackageReadmeFile>README.md</PackageReadmeFile> <PackageReadmeFile>README.md</PackageReadmeFile>
<GenerateDocumentationFile>True</GenerateDocumentationFile> <GenerateDocumentationFile>True</GenerateDocumentationFile>
<PackageLicenseFile>LICENSE</PackageLicenseFile> <PackageLicenseFile>LICENSE</PackageLicenseFile>
@@ -25,7 +25,7 @@
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<EVRevisionFormat>1.7.1+{chash:10}.{c:ymd}</EVRevisionFormat> <EVRevisionFormat>1.7.2+{chash:10}.{c:ymd}</EVRevisionFormat>
<EVDefault>true</EVDefault> <EVDefault>true</EVDefault>
<EVInfo>true</EVInfo> <EVInfo>true</EVInfo>
<EVTagMatch>v[0-9]*</EVTagMatch> <EVTagMatch>v[0-9]*</EVTagMatch>

View File

@@ -29,6 +29,9 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
private readonly byte[] _encryptionIV; private readonly byte[] _encryptionIV;
private readonly bool _isEncryptionEnabled; private readonly bool _isEncryptionEnabled;
private const int MaxQueueSize = 50_000;
private const int MaxCategories = 256;
public bool IsEncryptionEnabled => _encryptionKey != null && _encryptionIV != null; public bool IsEncryptionEnabled => _encryptionKey != null && _encryptionIV != null;
private bool _disposed; private bool _disposed;
@@ -42,9 +45,29 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
private readonly LoggerScopedContext _context = new(); private readonly LoggerScopedContext _context = new();
private readonly ConcurrentDictionary<string, FileState> _files = new(); private readonly ConcurrentDictionary<string, FileState> _files = new();
private readonly ConcurrentDictionary<string, ConcurrentQueue<LogMessage>> _messageQueues = new();
private const int BufferSize = 4 * 1024 * 1024; // 4 MB private sealed class MessageQueue
{
public readonly ConcurrentQueue<LogMessage> Queue = new();
public int Count;
public bool TryDequeue(out LogMessage msg)
{
if (Queue.TryDequeue(out msg))
{
Interlocked.Decrement(ref Count);
return true;
}
msg = default;
return false;
}
}
private readonly ConcurrentDictionary<string, MessageQueue> _messageQueues = new();
private const int BufferSize = 256 * 1024; // 256 KB
private static readonly Encoding Utf8 = new UTF8Encoding(false); private static readonly Encoding Utf8 = new UTF8Encoding(false);
private readonly SemaphoreSlim _queueSignal = new(0); private readonly SemaphoreSlim _queueSignal = new(0);
@@ -165,42 +188,39 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
_ = PeriodicFlushAsync().ContinueWith(_ => Interlocked.Exchange(ref _isFlushing, 0)); _ = PeriodicFlushAsync().ContinueWith(_ => Interlocked.Exchange(ref _isFlushing, 0));
} }
internal override Task WriteMessagesAsync(IReadOnlyList<LogMessage> messages, CancellationToken token internal override Task WriteMessagesAsync(
) IReadOnlyList<LogMessage> messages,
CancellationToken token)
{ {
try foreach (var msg in messages)
{ {
var filtered = messages.Where(m => m.Level >= MinimumLogLevel).ToList(); if (msg.Level < MinimumLogLevel)
{
continue;
}
if (EnableCategoryRouting) var key = EnableCategoryRouting
? SanitizeCategory(msg.Category)
: string.Empty;
var mq = _messageQueues.GetOrAdd(key, _ => new MessageQueue());
if (Interlocked.Increment(ref mq.Count) > MaxQueueSize)
{ {
var grouped = filtered.GroupBy(m => SanitizeCategory(m.Category)); if (mq.Queue.TryDequeue(out _))
foreach (var group in grouped)
{ {
var queue = _messageQueues.GetOrAdd(group.Key, _ => new ConcurrentQueue<LogMessage>()); Interlocked.Decrement(ref mq.Count);
foreach (var msg in group)
{
queue.Enqueue(msg);
} }
} }
}
else mq.Queue.Enqueue(msg);
{
var queue = _messageQueues.GetOrAdd(string.Empty, _ => new ConcurrentQueue<LogMessage>());
foreach (var msg in filtered)
{
queue.Enqueue(msg);
}
}
}
catch (Exception ex)
{
OnError?.Invoke(this, new ErrorMessage { Exception = ex, Message = $"Failed to enqueue messages: {ex.Message}" });
} }
return Task.CompletedTask; return Task.CompletedTask;
} }
private async Task PeriodicFlushAsync() private async Task PeriodicFlushAsync()
{ {
if (_disposed) if (_disposed)
@@ -211,7 +231,7 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
foreach (var kv in _messageQueues) foreach (var kv in _messageQueues)
{ {
var key = kv.Key; var key = kv.Key;
var queue = kv.Value; var mq = kv.Value;
if (!_files.TryGetValue(key, out var state)) if (!_files.TryGetValue(key, out var state))
{ {
@@ -221,50 +241,78 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
if (!TryRecover(state)) if (!TryRecover(state))
{ {
// drop messages if recovery fails while (mq.Queue.TryDequeue(out _))
while (queue.TryDequeue(out _)) { } {
Interlocked.Decrement(ref mq.Count);
}
continue; continue;
} }
if (queue.IsEmpty) if (mq.Count == 0)
{ {
continue; continue;
} }
await state.WriteLock.WaitAsync(); // Non-blocking lock attempt
if (!state.WriteLock.Wait(0))
{
continue;
}
try try
{ {
await FlushMessagesBatchAsync(state, queue).ConfigureAwait(false); await FlushMessagesBatchAsync(state, mq)
.ConfigureAwait(false);
// If buffer has remaining data, flush it
if (state.BufferPosition > 0) if (state.BufferPosition > 0)
{ {
await FlushBufferAsync(state).ConfigureAwait(false); await FlushBufferAsync(state)
.ConfigureAwait(false);
} }
} }
finally finally
{ {
state.WriteLock.Release(); state.WriteLock.Release();
} }
// Cleanup empty categories
if (mq.Count == 0)
{
if (_messageQueues.TryRemove(key, out var removed))
{
while (removed.Queue.TryDequeue(out _)) { }
}
if (_files.TryRemove(key, out var removedState))
{
removedState.Dispose();
}
}
} }
QueueOldFilesForCompression(); QueueOldFilesForCompression();
} }
private async Task FlushMessagesBatchAsync(FileState state, ConcurrentQueue<LogMessage> queue)
private async Task FlushMessagesBatchAsync(FileState state, MessageQueue queue)
{ {
const int maxBatch = 5000; const int MaxBatch = 5000;
int batchCount = 0; int batchCount = 0;
// Start with a reasonably sized buffer from the pool // Rent buffer
int estimatedSize = 1024 * 64; // 64KB starting buffer int estimatedSize = 64 * 1024; // 64 KB
byte[] combined = ArrayPool<byte>.Shared.Rent(estimatedSize); byte[] combined = ArrayPool<byte>.Shared.Rent(estimatedSize);
int pos = 0; int pos = 0;
while (queue.TryDequeue(out var msg) && batchCount < maxBatch) try
{
while (queue.TryDequeue(out var msg) && batchCount < MaxBatch)
{ {
var msgDate = msg.Timestamp.UtcDateTime.Date; var msgDate = msg.Timestamp.UtcDateTime.Date;
// Rotate if date changed
if (state.Date != msgDate) if (state.Date != msgDate)
{ {
await FlushBufferAsync(state).ConfigureAwait(false); await FlushBufferAsync(state).ConfigureAwait(false);
@@ -280,30 +328,27 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
int newSize = Math.Max(combined.Length * 2, pos + requiredLength); int newSize = Math.Max(combined.Length * 2, pos + requiredLength);
byte[] newBuffer = ArrayPool<byte>.Shared.Rent(newSize); byte[] newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
Array.Copy(combined, 0, newBuffer, 0, pos); Array.Copy(combined, 0, newBuffer, 0, pos);
ArrayPool<byte>.Shared.Return(combined); ArrayPool<byte>.Shared.Return(combined, clearArray: true);
combined = newBuffer; combined = newBuffer;
} }
// Write directly into combined buffer
pos += Utf8.GetBytes(messageString, 0, messageString.Length, combined, pos); pos += Utf8.GetBytes(messageString, 0, messageString.Length, combined, pos);
batchCount++; batchCount++;
} }
if (pos == 0) if (pos == 0)
{ {
ArrayPool<byte>.Shared.Return(combined);
return; // nothing to write return; // nothing to write
} }
// If encryption is enabled, encrypt into a new array
byte[] dataToWrite; byte[] dataToWrite;
if (_isEncryptionEnabled) if (_isEncryptionEnabled)
{ {
// Encrypt and clear combined buffer immediately
dataToWrite = Encrypt(combined.AsSpan(0, pos).ToArray()); dataToWrite = Encrypt(combined.AsSpan(0, pos).ToArray());
} }
else else
{ {
// No encryption: just use the pooled buffer slice directly
dataToWrite = combined; dataToWrite = combined;
} }
@@ -313,28 +358,28 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
await FlushBufferAsync(state).ConfigureAwait(false); await FlushBufferAsync(state).ConfigureAwait(false);
} }
// Rollover if file exceeds max size // Rollover
if (_maxFileSize > 0 && state.Size + pos > _maxFileSize) if (_maxFileSize > 0 && state.Size + pos > _maxFileSize)
{ {
await FlushBufferAsync(state).ConfigureAwait(false); await FlushBufferAsync(state).ConfigureAwait(false);
RollOverAndCompressOldest(state, string.Empty); RollOverAndCompressOldest(state, string.Empty);
} }
// Copy directly into the file buffer // Copy to file buffer
Array.Copy(dataToWrite, 0, state.Buffer, state.BufferPosition, pos); Array.Copy(dataToWrite, 0, state.Buffer, state.BufferPosition, pos);
state.BufferPosition += pos; state.BufferPosition += pos;
state.Size += pos; state.Size += pos;
// Clear sensitive data // Clear sensitive data
Array.Clear(dataToWrite, 0, pos); Array.Clear(dataToWrite, 0, dataToWrite.Length);
}
// Return buffer if unencrypted finally
if (!_isEncryptionEnabled)
{ {
ArrayPool<byte>.Shared.Return(dataToWrite); ArrayPool<byte>.Shared.Return(combined, clearArray: true);
} }
} }
private async Task FlushBufferAsync(FileState state, CancellationToken token = default) private async Task FlushBufferAsync(FileState state, CancellationToken token = default)
{ {
if (state.IsFaulted || state.BufferPosition == 0 || state.Stream == null) if (state.IsFaulted || state.BufferPosition == 0 || state.Stream == null)
@@ -394,7 +439,11 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
// Use a timeout to avoid hanging if disposed // Use a timeout to avoid hanging if disposed
if (!await _queueSignal.WaitAsync(TimeSpan.FromSeconds(1))) if (!await _queueSignal.WaitAsync(TimeSpan.FromSeconds(1)))
{ {
if (_disposed) break; if (_disposed)
{
break;
}
continue; continue;
} }
} }
@@ -486,12 +535,12 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
using var encryptor = _aes.CreateEncryptor(); using var encryptor = _aes.CreateEncryptor();
var encrypted = encryptor.TransformFinalBlock(plainBytes, 0, plainBytes.Length); var encrypted = encryptor.TransformFinalBlock(plainBytes, 0, plainBytes.Length);
// Clear plaintext bytes // Clear plaintext
Array.Clear(plainBytes, 0, plainBytes.Length); Array.Clear(plainBytes, 0, plainBytes.Length);
return encrypted; return encrypted;
} }
public byte[] Decrypt(byte[] encryptedData) public byte[] Decrypt(byte[] encryptedData)
{ {
if (!IsEncryptionEnabled || encryptedData == null || encryptedData.Length == 0) if (!IsEncryptionEnabled || encryptedData == null || encryptedData.Length == 0)
@@ -799,10 +848,6 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
_files.Clear(); _files.Clear();
_messageQueues.Clear(); _messageQueues.Clear();
// Signal compression worker to stop and wait for it to finish
_compressionSemaphore?.Dispose();
_queueSignal?.Dispose();
// Wait for compression worker to finish remaining tasks with timeout // Wait for compression worker to finish remaining tasks with timeout
try try
{ {
@@ -816,6 +861,12 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
// Do nothing // Do nothing
} }
_queueSignal.Release();
// Signal compression worker to stop and wait for it to finish
_compressionSemaphore?.Dispose();
_queueSignal?.Dispose();
_aes?.Dispose(); _aes?.Dispose();
} }
} }

View File

@@ -135,7 +135,10 @@ public abstract class BatchingLoggerProvider : ILoggerProvider, IDisposable
catch catch
{ {
// Handle any WaitHandle disposal race condition // Handle any WaitHandle disposal race condition
if (_disposed) break; if (_disposed)
{
break;
}
} }
continue; continue;
} }