Updated
This commit is contained in:
@@ -44,8 +44,9 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
private readonly ConcurrentDictionary<string, FileState> _files = new();
|
||||
private readonly ConcurrentDictionary<string, ConcurrentQueue<LogMessage>> _messageQueues = new();
|
||||
|
||||
private const int BufferSize = 1024 * 1024;
|
||||
private const int BufferSize = 4 * 1024 * 1024; // 4 MB
|
||||
private static readonly Encoding Utf8 = new UTF8Encoding(false);
|
||||
private readonly SemaphoreSlim _queueSignal = new(0);
|
||||
|
||||
public bool IncludeCorrelationId { get; }
|
||||
public bool EnableCategoryRouting { get; }
|
||||
@@ -58,7 +59,7 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
public event EventHandler<string> OnRollOver;
|
||||
|
||||
private readonly Timer _flushTimer;
|
||||
private readonly TimeSpan _flushInterval = TimeSpan.FromSeconds(1);
|
||||
private readonly TimeSpan _flushInterval = TimeSpan.FromMilliseconds(100);
|
||||
private readonly string _fallbackPath;
|
||||
private readonly Aes _aes;
|
||||
|
||||
@@ -252,58 +253,83 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
|
||||
private async Task FlushMessagesBatchAsync(FileState state, ConcurrentQueue<LogMessage> queue)
|
||||
{
|
||||
const int maxBatch = 128;
|
||||
const int maxBatch = 5000;
|
||||
int batchCount = 0;
|
||||
|
||||
await state.WriteLock.WaitAsync();
|
||||
try
|
||||
// Rent a buffer
|
||||
byte[] combined = ArrayPool<byte>.Shared.Rent(BufferSize);
|
||||
int pos = 0;
|
||||
|
||||
while (queue.TryDequeue(out var msg) && batchCount < maxBatch)
|
||||
{
|
||||
while (queue.TryDequeue(out var msg) && batchCount < maxBatch)
|
||||
// Rotate file if date changed
|
||||
var msgDate = msg.Timestamp.UtcDateTime.Date;
|
||||
if (state.Date != msgDate)
|
||||
{
|
||||
var msgDate = msg.Timestamp.UtcDateTime.Date;
|
||||
if (state.Date != msgDate)
|
||||
{
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
RotateByDate(state, msgDate, string.Empty);
|
||||
}
|
||||
|
||||
var text = BuildMessage(msg);
|
||||
int byteCount = Utf8.GetByteCount(text);
|
||||
|
||||
// Flush buffer if message won't fit
|
||||
if (state.BufferPosition + byteCount > BufferSize)
|
||||
{
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Encode directly into buffer
|
||||
Utf8.GetBytes(text, 0, text.Length, state.Buffer, state.BufferPosition);
|
||||
state.BufferPosition += byteCount;
|
||||
state.Size += byteCount;
|
||||
|
||||
batchCount++;
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
RotateByDate(state, msgDate, string.Empty);
|
||||
}
|
||||
|
||||
// Encrypt buffer in place if needed
|
||||
if (_isEncryptionEnabled && state.BufferPosition > 0)
|
||||
var msgText = BuildMessage(msg);
|
||||
int byteCount = Utf8.GetByteCount(msgText);
|
||||
|
||||
// Flush buffer if message won't fit
|
||||
if (pos + byteCount > combined.Length)
|
||||
{
|
||||
var encrypted = Encrypt(state.Buffer.AsSpan(0, state.BufferPosition).ToArray());
|
||||
await state.Stream.WriteAsync(encrypted, 0, encrypted.Length).ConfigureAwait(false);
|
||||
await state.Stream.FlushAsync().ConfigureAwait(false);
|
||||
Array.Clear(encrypted, 0, encrypted.Length);
|
||||
state.BufferPosition = 0;
|
||||
}
|
||||
else if (state.BufferPosition > 0)
|
||||
{
|
||||
await state.Stream.WriteAsync(state.Buffer, 0, state.BufferPosition).ConfigureAwait(false);
|
||||
await state.Stream.FlushAsync().ConfigureAwait(false);
|
||||
state.BufferPosition = 0;
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
pos = 0;
|
||||
}
|
||||
|
||||
Utf8.GetBytes(msgText, 0, msgText.Length, combined, pos);
|
||||
pos += byteCount;
|
||||
batchCount++;
|
||||
}
|
||||
finally
|
||||
|
||||
if (pos == 0)
|
||||
{
|
||||
state.WriteLock.Release();
|
||||
ArrayPool<byte>.Shared.Return(combined);
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] dataToWrite;
|
||||
|
||||
if (_isEncryptionEnabled)
|
||||
{
|
||||
// Encrypt creates a new array, old buffer cleared immediately
|
||||
dataToWrite = Encrypt(combined.AsSpan(0, pos).ToArray());
|
||||
Array.Clear(combined, 0, pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
dataToWrite = combined;
|
||||
}
|
||||
|
||||
// Flush buffer if needed
|
||||
if (state.BufferPosition + dataToWrite.Length > BufferSize)
|
||||
{
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Rollover if file exceeds max size
|
||||
if (_maxFileSize > 0 && state.Size + dataToWrite.Length > _maxFileSize)
|
||||
{
|
||||
await FlushBufferAsync(state).ConfigureAwait(false);
|
||||
RollOverAndCompressOldest(state, string.Empty);
|
||||
}
|
||||
|
||||
// Copy into buffer
|
||||
Array.Copy(dataToWrite, 0, state.Buffer, state.BufferPosition, dataToWrite.Length);
|
||||
state.BufferPosition += dataToWrite.Length;
|
||||
state.Size += dataToWrite.Length;
|
||||
|
||||
if (!_isEncryptionEnabled)
|
||||
{
|
||||
// return pooled buffer if not encrypted
|
||||
ArrayPool<byte>.Shared.Return(dataToWrite);
|
||||
}
|
||||
|
||||
// Clear encrypted array immediately
|
||||
Array.Clear(dataToWrite, 0, dataToWrite.Length);
|
||||
}
|
||||
|
||||
private async Task FlushBufferAsync(FileState state, CancellationToken token = default)
|
||||
@@ -344,14 +370,21 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
|
||||
foreach (var f in files)
|
||||
{
|
||||
_compressionQueue.Enqueue(f.FullName);
|
||||
EnqueueCompression(f.FullName);
|
||||
}
|
||||
}
|
||||
|
||||
private void EnqueueCompression(string file)
|
||||
{
|
||||
_compressionQueue.Enqueue(file);
|
||||
_queueSignal.Release();
|
||||
}
|
||||
|
||||
private async Task CompressionWorkerAsync()
|
||||
{
|
||||
while (!_disposed)
|
||||
{
|
||||
await _queueSignal.WaitAsync();
|
||||
if (_compressionQueue.TryDequeue(out var filePath))
|
||||
{
|
||||
await _compressionSemaphore.WaitAsync();
|
||||
@@ -364,23 +397,22 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
_compressionSemaphore.Release();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Sleep longer if no work to reduce CPU
|
||||
await Task.Delay(2000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task CompressOldLogFileAsync(string filePath, int retryCount = 3)
|
||||
{
|
||||
if (!File.Exists(filePath) || filePath.EndsWith(".gz", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var dir = Path.GetDirectoryName(filePath);
|
||||
var name = Path.GetFileNameWithoutExtension(filePath);
|
||||
var ext = Path.GetExtension(filePath);
|
||||
var name = Path.GetFileNameWithoutExtension(filePath); // "app_20260212_1"
|
||||
var ext = Path.GetExtension(filePath); // ".log"
|
||||
|
||||
// Determine the next _N.log.gz name
|
||||
int suffix = 1;
|
||||
string compressedFile;
|
||||
do
|
||||
@@ -395,16 +427,17 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
{
|
||||
using var original = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
|
||||
using var compressed = new FileStream(compressedFile, FileMode.CreateNew, FileAccess.Write);
|
||||
using var gzip = new GZipStream(compressed, CompressionLevel.Fastest); // faster
|
||||
using var gzip = new GZipStream(compressed, CompressionLevel.Optimal);
|
||||
|
||||
await original.CopyToAsync(gzip).ConfigureAwait(false);
|
||||
await gzip.FlushAsync().ConfigureAwait(false);
|
||||
|
||||
File.Delete(filePath);
|
||||
return;
|
||||
return; // success
|
||||
}
|
||||
catch (IOException)
|
||||
{
|
||||
// File busy? Wait a bit and retry
|
||||
await Task.Delay(200);
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -678,7 +711,7 @@ public sealed class FileLoggerProvider : BatchingLoggerProvider, IDisposable
|
||||
var oldestFile = Path.Combine(dir, $"{name}_{_maxRolloverFiles}{ext}");
|
||||
if (File.Exists(oldestFile))
|
||||
{
|
||||
_compressionQueue.Enqueue(oldestFile);
|
||||
EnqueueCompression(oldestFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user