Files
EonaCat.Logger/EonaCat.Logger/EonaCatCoreLogger/BatchingDatabaseLogger.cs
2025-04-26 10:56:32 +02:00

188 lines
6.3 KiB
C#

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EonaCat.Logger.EonaCatCoreLogger
{
public class BatchingDatabaseLogger : ILogger, IDisposable
{
private readonly string _categoryName;
private readonly BatchingDatabaseLoggerOptions _options;
private readonly LoggerScopedContext _context = new();
private readonly BlockingCollection<LogEntry> _queue;
private readonly CancellationTokenSource _cts;
private readonly Task _processingTask;
public bool IncludeCorrelationId { get; set; }
public event EventHandler<Exception> OnException;
public BatchingDatabaseLogger(string categoryName, BatchingDatabaseLoggerOptions options)
{
_categoryName = categoryName;
_options = options;
IncludeCorrelationId = options.IncludeCorrelationId;
_queue = new BlockingCollection<LogEntry>(new ConcurrentQueue<LogEntry>());
_cts = new CancellationTokenSource();
_processingTask = Task.Run(ProcessQueueAsync);
}
public IDisposable BeginScope<TState>(TState state) => null;
public bool IsEnabled(LogLevel logLevel) => _options.IsEnabled;
public void SetContext(string key, string value) => _context.Set(key, value);
public void ClearContext() => _context.Clear();
public string GetContext(string key) => _context.Get(key);
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state,
Exception exception, Func<TState, Exception, string> formatter)
{
if (!IsEnabled(logLevel) || formatter == null) return;
var message = formatter(state, exception);
var correlationId = IncludeCorrelationId
? _context.Get("CorrelationId") ?? Guid.NewGuid().ToString()
: null;
if (correlationId != null)
{
_context.Set("CorrelationId", correlationId);
}
_queue.Add(new LogEntry
{
Timestamp = DateTime.UtcNow,
LogLevel = logLevel.ToString(),
Category = _categoryName,
Message = message,
Exception = exception?.ToString(),
CorrelationId = correlationId
});
}
private async Task ProcessQueueAsync()
{
var batch = new List<LogEntry>();
var timeoutMs = (int)Math.Min(_options.BatchInterval.TotalMilliseconds, int.MaxValue);
while (!_cts.Token.IsCancellationRequested)
{
try
{
if (_queue.TryTake(out var logEntry, timeoutMs, _cts.Token))
{
batch.Add(logEntry);
// Drain the queue quickly without waiting
while (_queue.TryTake(out var additionalEntry))
{
batch.Add(additionalEntry);
}
}
if (batch.Count > 0)
{
await InsertBatchSafelyAsync(batch);
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
}
// Final flush outside the loop
if (batch.Count > 0)
{
await InsertBatchSafelyAsync(batch);
}
}
private async Task InsertBatchSafelyAsync(List<LogEntry> batch)
{
try
{
await InsertBatchAsync(batch);
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
batch.Clear();
}
}
private async Task InsertBatchAsync(List<LogEntry> batch)
{
using var connection = _options.DbProviderFactory.CreateConnection();
if (connection == null)
{
throw new InvalidOperationException("Failed to create database connection.");
}
connection.ConnectionString = _options.ConnectionString;
await connection.OpenAsync();
foreach (var entry in batch)
{
using var command = connection.CreateCommand();
command.CommandText = _options.InsertCommand;
command.Parameters.Clear();
command.Parameters.Add(CreateParameter(command, "Timestamp", entry.Timestamp));
command.Parameters.Add(CreateParameter(command, "LogLevel", entry.LogLevel));
command.Parameters.Add(CreateParameter(command, "Category", entry.Category));
command.Parameters.Add(CreateParameter(command, "Message", entry.Message));
command.Parameters.Add(CreateParameter(command, "Exception", entry.Exception));
command.Parameters.Add(CreateParameter(command, "CorrelationId", entry.CorrelationId));
await command.ExecuteNonQueryAsync();
}
}
private DbParameter CreateParameter(DbCommand command, string name, object value)
{
var param = command.CreateParameter();
param.ParameterName = $"@{name}";
param.Value = value ?? DBNull.Value;
return param;
}
public void Dispose()
{
_cts.Cancel();
_queue.CompleteAdding();
try
{
_processingTask.Wait(_options.ShutdownTimeout);
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
}
private class LogEntry
{
public DateTime Timestamp { get; set; }
public string LogLevel { get; set; }
public string Category { get; set; }
public string Message { get; set; }
public string Exception { get; set; }
public string CorrelationId { get; set; }
}
}
}