Files
EonaCat.Logger/EonaCat.Logger/EonaCatCoreLogger/TcpLogger.cs
2026-02-03 20:46:51 +01:00

258 lines
9.2 KiB
C#

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace EonaCat.Logger.EonaCatCoreLogger
{
/// <summary>
/// <see cref="ILogger"/> implementation that formats every entry as a single text line, queues it in a
/// bounded in-memory channel and ships the queued lines to a TCP endpoint from a background task.
/// </summary>
/// <remarks>
/// <para>
/// Line layout: <c>[timestamp] | [level] | [category] | Message: ... | Context: k=v; ... | Exception: ...</c>.
/// </para>
/// <para>
/// The queue holds at most 5000 entries. When it is full the oldest entry is evicted and reported through
/// <see cref="OnLogDropped"/>. Lines are sent in batches of up to 20; a batch is written as soon as the
/// queue runs empty, so a small number of entries is never held back waiting for a full batch.
/// </para>
/// <para>
/// When a batch cannot be sent it is handed to the fallback file (if enabled in the options) and the
/// connection is re-established with capped exponential backoff.
/// </para>
/// </remarks>
public sealed class TcpLogger : ILogger, IDisposable
{
    private const int MaxQueueSize = 5000;
    private const int MaxBatchSize = 20; // send logs in batches to reduce TCP writes

    private const int BaseBackoffMs = 1000;
    private const int MaxBackoffMs = 30000;

    // 1000 << 5 = 32000 already exceeds MaxBackoffMs, so larger exponents are never needed.
    // Clamping here is what keeps the delay computation from overflowing Int32 after many failures.
    private const int MaxBackoffExponent = 5;

    // Dispose budget: time allowed for a graceful drain, then time allowed after cancellation.
    private static readonly TimeSpan DrainTimeout = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan CancelTimeout = TimeSpan.FromSeconds(3);

    // UTF-8 without a byte-order mark. StreamWriter emits the encoding preamble on non-seekable streams,
    // so Encoding.UTF8 would put EF BB BF in front of the first log line of every connection.
    private static readonly Encoding WireEncoding = new UTF8Encoding(false);

    private readonly string _categoryName;
    private readonly TcpLoggerOptions _options;
    private readonly LoggerScopedContext _context = new();
    private readonly Channel<string> _logChannel;
    private readonly CancellationTokenSource _cts = new();
    private readonly Task _processingTask;
    private int _disposed; // 0 = live, 1 = disposed (set atomically so Dispose is idempotent)

    /// <summary>
    /// When true, every logged entry makes sure the context contains a "CorrelationId" value,
    /// generating a new GUID if none is present.
    /// </summary>
    public bool IncludeCorrelationId { get; set; }

    /// <summary>Raised when formatting, connecting, sending or fallback writing fails.</summary>
    public event EventHandler<Exception> OnException;

    /// <summary>
    /// Raised with the formatted line when an entry is discarded: either evicted from the full queue
    /// or written after the logger has been disposed.
    /// </summary>
    public event EventHandler<string> OnLogDropped;

    /// <summary>
    /// Creates the logger and starts the background sender.
    /// </summary>
    /// <param name="categoryName">Category written into every line.</param>
    /// <param name="options">Endpoint, fallback and enablement settings.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public TcpLogger(string categoryName, TcpLoggerOptions options)
    {
        _categoryName = categoryName ?? throw new ArgumentNullException(nameof(categoryName));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        IncludeCorrelationId = options.IncludeCorrelationId;

        // With DropOldest, TryWrite keeps returning true while entries are evicted, so evictions can
        // only be observed through the itemDropped callback.
        _logChannel = Channel.CreateBounded<string>(
            new BoundedChannelOptions(MaxQueueSize)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
                SingleReader = true,
                SingleWriter = false
            },
            dropped => OnLogDropped?.Invoke(this, dropped));

        // Start background processing task
        _processingTask = Task.Run(() => ProcessLogQueueAsync(_cts.Token), _cts.Token);
    }

    /// <summary>Begins a scope by delegating to the logger's scoped context.</summary>
    public IDisposable BeginScope<TState>(TState state) => _context.BeginScope(state);

    /// <summary>
    /// Returns true when the options enable this logger. <see cref="LogLevel.None"/> is never enabled,
    /// because that level is not meant for writing messages.
    /// </summary>
    public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None && _options.IsEnabled;

    /// <summary>Stores a key/value pair in the logger context.</summary>
    public void SetContext(string key, string value) => _context.Set(key, value);

    /// <summary>Clears the logger context.</summary>
    public void ClearContext() => _context.Clear();

    /// <summary>Reads a value from the logger context.</summary>
    public string GetContext(string key) => _context.Get(key);

    /// <summary>
    /// Formats the entry and queues it for sending. Never throws on formatting or queueing problems;
    /// those are reported through <see cref="OnException"/>.
    /// </summary>
    public void Log<TState>(LogLevel logLevel, EventId eventId, TState state,
        Exception exception, Func<TState, Exception, string> formatter)
    {
        if (!IsEnabled(logLevel) || formatter == null)
        {
            return;
        }

        try
        {
            if (IncludeCorrelationId)
            {
                var correlationId = _context.Get("CorrelationId") ?? Guid.NewGuid().ToString();
                _context.Set("CorrelationId", correlationId);
            }

            string message = formatter(state, exception);

            var sb = new StringBuilder(256);
            sb.Append('[').Append(DateTime.UtcNow.ToString("u")).Append("] | ");
            sb.Append('[').Append(logLevel).Append("] | ");
            sb.Append('[').Append(_categoryName).Append("] | ");
            sb.Append("Message: ").Append(message);

            var contextData = _context.GetAll();
            if (contextData.Count > 0)
            {
                sb.Append(" | Context: ");
                foreach (var kvp in contextData)
                {
                    sb.Append(kvp.Key).Append("=").Append(kvp.Value).Append("; ");
                }
            }

            if (exception != null)
            {
                sb.Append(" | Exception: ").Append(exception);
            }

            string line = sb.ToString();

            // TryWrite only fails once the channel has been completed (i.e. after Dispose).
            if (!_logChannel.Writer.TryWrite(line))
            {
                OnLogDropped?.Invoke(this, line);
            }
        }
        catch (Exception ex)
        {
            OnException?.Invoke(this, ex);
        }
    }

    /// <summary>
    /// Background loop: keeps a connection open, drains the queue in batches and reconnects after a
    /// failed send. Ends when the channel is completed and empty, or when <paramref name="token"/> is cancelled.
    /// </summary>
    private async Task ProcessLogQueueAsync(CancellationToken token)
    {
        TcpClient client = null;
        StreamWriter writer = null;
        var batch = new List<string>(MaxBatchSize);
        var reader = _logChannel.Reader;

        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    if (client == null || !client.Connected)
                    {
                        DisposeQuietly(writer);
                        DisposeQuietly(client);
                        writer = null;
                        client = null;

                        client = await ConnectWithRetryAsync(_options.Host, _options.Port, token).ConfigureAwait(false);
                        writer = new StreamWriter(client.GetStream(), WireEncoding) { AutoFlush = true };
                    }

                    // False means the channel was completed and fully drained: normal shutdown.
                    if (!await reader.WaitToReadAsync(token).ConfigureAwait(false))
                    {
                        break;
                    }

                    // Take whatever is available right now (up to one batch) and send it immediately.
                    while (batch.Count < MaxBatchSize && reader.TryRead(out var entry))
                    {
                        batch.Add(entry);
                    }

                    if (batch.Count == 0)
                    {
                        continue;
                    }

                    bool sent = await SafeWriteBatch(writer, batch, client).ConfigureAwait(false);
                    batch.Clear();

                    if (!sent)
                    {
                        // Drop the broken connection so the next iteration reconnects.
                        DisposeQuietly(writer);
                        DisposeQuietly(client);
                        writer = null;
                        client = null;
                    }
                }
                catch (OperationCanceledException)
                {
                    break; // normal shutdown
                }
                catch (Exception ex)
                {
                    OnException?.Invoke(this, ex);
                    try
                    {
                        await Task.Delay(BaseBackoffMs, token).ConfigureAwait(false); // prevent tight loop
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }
        finally
        {
            // Entries still queued at shutdown could not be sent; hand them to the fallback
            // (a no-op when fallback logging is disabled).
            while (reader.TryRead(out var leftover))
            {
                batch.Add(leftover);
            }

            if (batch.Count > 0)
            {
                await FallbackWriteAsync(batch).ConfigureAwait(false);
            }

            DisposeQuietly(writer);
            DisposeQuietly(client);
        }
    }

    /// <summary>
    /// Connects to <paramref name="host"/>:<paramref name="port"/>, retrying with capped exponential
    /// backoff until it succeeds or <paramref name="token"/> is cancelled.
    /// </summary>
    /// <returns>The connected client; the caller owns it.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
    private async Task<TcpClient> ConnectWithRetryAsync(string host, int port, CancellationToken token)
    {
        int attempt = 0;
        while (true)
        {
            token.ThrowIfCancellationRequested();

            // A TcpClient cannot be reused after a failed connect, so each attempt gets a fresh one
            // and the successful instance is returned to the caller.
            var client = new TcpClient();
            try
            {
                await ConnectOnceAsync(client, host, port, token).ConfigureAwait(false);
                return client;
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                client.Dispose();
                throw;
            }
            catch (Exception ex)
            {
                client.Dispose();
                attempt++;
                OnException?.Invoke(this, ex);
            }

            await Task.Delay(GetBackoffDelayMs(attempt), token).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Performs a single connect attempt that stops waiting as soon as <paramref name="token"/> is cancelled.
    /// </summary>
    private static async Task ConnectOnceAsync(TcpClient client, string host, int port, CancellationToken token)
    {
        var connectTask = client.ConnectAsync(host, port);
        var cancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The registration is disposed when the attempt finishes, so nothing stays attached to the token.
        using (token.Register(() => cancelled.TrySetResult(true)))
        {
            var first = await Task.WhenAny(connectTask, cancelled.Task).ConfigureAwait(false);
            if (first != connectTask)
            {
                // The abandoned connect will fail once the client is disposed; observe that failure
                // so it does not surface as an unobserved task exception.
                _ = connectTask.ContinueWith(
                    t => _ = t.Exception,
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
                throw new OperationCanceledException(token);
            }
        }

        await connectTask.ConfigureAwait(false);
    }

    /// <summary>
    /// Returns the backoff delay for the given number of failed attempts:
    /// 2s, 4s, 8s, 16s, then 30s for every later attempt.
    /// </summary>
    private static int GetBackoffDelayMs(int attempt)
    {
        int exponent = Math.Min(Math.Max(attempt, 0), MaxBackoffExponent);
        return Math.Min(BaseBackoffMs << exponent, MaxBackoffMs);
    }

    /// <summary>
    /// Sends the batch over the connection. If there is no usable connection, or the send fails,
    /// the batch goes to the fallback file instead.
    /// </summary>
    /// <returns>True when the batch was written to the TCP stream; false when the fallback was used.</returns>
    private async Task<bool> SafeWriteBatch(StreamWriter writer, List<string> batch, TcpClient client)
    {
        try
        {
            if (writer != null && client?.Connected == true)
            {
                await writer.WriteLineAsync(string.Join(Environment.NewLine, batch)).ConfigureAwait(false);
                return true;
            }
        }
        catch (Exception ex)
        {
            OnException?.Invoke(this, ex);
        }

        // Optionally write to local file if TCP server is down
        await FallbackWriteAsync(batch).ConfigureAwait(false);
        return false;
    }

    /// <summary>
    /// Appends the batch to the fallback log file when fallback logging is enabled. Best effort:
    /// failures are reported through <see cref="OnException"/> and never thrown.
    /// </summary>
    private async Task FallbackWriteAsync(List<string> batch)
    {
        try
        {
            if (!_options.EnableFallbackLogging)
            {
                return;
            }

            var fallbackFile = string.IsNullOrWhiteSpace(_options.FallbackLogFilePath)
                ? Path.Combine(AppContext.BaseDirectory, "tcp_logger_fallback.log")
                : _options.FallbackLogFilePath;

            // A bare file name has no directory part; CreateDirectory would throw on an empty path.
            var directory = Path.GetDirectoryName(fallbackFile);
            if (!string.IsNullOrEmpty(directory))
            {
                Directory.CreateDirectory(directory);
            }

            using (var fileWriter = new StreamWriter(fallbackFile, append: true, encoding: Encoding.UTF8))
            {
                foreach (var line in batch)
                {
                    await fileWriter.WriteLineAsync(line).ConfigureAwait(false);
                }
            }
        }
        catch (Exception ex)
        {
            try
            {
                OnException?.Invoke(this, ex);
            }
            catch
            {
                // The fallback is the last resort; a throwing handler must not take the sender down.
            }
        }
    }

    /// <summary>Disposes a resource, ignoring errors from an already broken connection.</summary>
    private static void DisposeQuietly(IDisposable resource)
    {
        try
        {
            resource?.Dispose();
        }
        catch
        {
            // Flushing or closing a dead socket can throw; there is nothing useful to do about it.
        }
    }

    /// <summary>
    /// Stops accepting entries, gives the sender a short time to drain the queue, then cancels it.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Complete first so that already queued entries can still be sent before cancelling.
        _logChannel.Writer.TryComplete();

        try
        {
            if (!_processingTask.Wait(DrainTimeout))
            {
                _cts.Cancel();
                _processingTask.Wait(CancelTimeout);
            }
        }
        catch
        {
            // Dispose must not throw; a faulted or cancelled sender has already stopped.
        }
        finally
        {
            _cts.Dispose();
        }
    }
}
}