Files
EonaCat.Logger/EonaCat.Logger/EonaCatCoreLogger/ElasticSearchLogger.cs
2026-01-12 22:09:07 +01:00

193 lines
6.6 KiB
C#

using EonaCat.Json;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EonaCat.Logger.EonaCatCoreLogger
{
// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
public class ElasticSearchLogger : ILogger, IDisposable
{
private readonly string _categoryName;
private readonly ElasticSearchLoggerOptions _options;
private readonly HttpClient _httpClient;
private readonly List<string> _buffer = new();
private readonly object _lock = new();
private readonly CancellationTokenSource _cts = new();
private readonly Task _flushTask;
private readonly LoggerScopedContext _context = new();
public bool IncludeCorrelationId { get; set; }
public event EventHandler<Exception> OnException;
public event EventHandler<string> OnInvalidStatusCode;
public ElasticSearchLogger(string categoryName, ElasticSearchLoggerOptions options)
{
_categoryName = categoryName;
_options = options;
IncludeCorrelationId = options.IncludeCorrelationId;
_httpClient = new HttpClient();
_flushTask = Task.Run(() => FlushLoopAsync(_cts.Token));
}
public void SetContext(string key, string value) => _context.Set(key, value);
public void ClearContext() => _context.Clear();
public string GetContext(string key) => _context.Get(key);
public IDisposable BeginScope<TState>(TState state) => _context.BeginScope(state);
public bool IsEnabled(LogLevel logLevel) => _options.IsEnabled;
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state,
Exception exception, Func<TState, Exception, string> formatter)
{
if (!_options.IsEnabled || formatter == null)
{
return;
}
if (IncludeCorrelationId)
{
var correlationId = _context.Get("CorrelationId") ?? Guid.NewGuid().ToString();
_context.Set("CorrelationId", correlationId);
}
var logDoc = new
{
timestamp = DateTime.UtcNow,
level = logLevel.ToString(),
category = _categoryName,
message = formatter(state, exception),
exception = exception?.ToString(),
eventId = eventId.Id,
customContext = _context.GetAll()
};
string json = JsonHelper.ToJson(logDoc);
lock (_lock)
{
_buffer.Add(json);
// Optional: drop oldest if buffer is too large
if (_buffer.Count > _options.MaxBufferSize)
{
_buffer.RemoveAt(0);
}
}
}
private async Task FlushLoopAsync(CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(_options.FlushIntervalSeconds), token);
await FlushBufferAsync();
}
}
catch (OperationCanceledException) { }
await FlushBufferAsync(); // flush remaining logs on shutdown
}
private async Task FlushBufferAsync()
{
List<string> toSend;
lock (_lock)
{
if (_buffer.Count == 0)
{
return;
}
toSend = new List<string>(_buffer);
_buffer.Clear();
}
string indexName = $"{_options.IndexName}-{DateTime.UtcNow:yyyy.MM.dd}";
string url = $"{_options.Uri.TrimEnd('/')}/{(_options.UseBulkInsert ? "_bulk" : indexName + "/_doc")}";
using var request = new HttpRequestMessage(HttpMethod.Post, url);
request.Headers.Accept.ParseAdd("application/json");
if (!string.IsNullOrWhiteSpace(_options.Username))
{
var authToken = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{_options.Username}:{_options.Password}"));
request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", authToken);
}
foreach (var header in _options.CustomHeaders)
{
request.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
var dynamicHeaders = new Dictionary<string, string>
{
{ "index", _options.IndexName },
{ "date", DateTime.UtcNow.ToString("yyyy-MM-dd") },
{ "timestamp", DateTime.UtcNow.ToString("o") },
};
foreach (var header in _options.TemplateHeaders)
{
var value = ReplaceTemplate(header.Value, dynamicHeaders);
request.Headers.TryAddWithoutValidation(header.Key, value);
}
foreach (var kv in _context.GetAll())
{
request.Headers.TryAddWithoutValidation($"X-Context-{kv.Key}", kv.Value);
}
request.Content = new StringContent(
_options.UseBulkInsert
? string.Join("\n", toSend.Select(d => $"{{\"index\":{{}}}}\n{d}")) + "\n"
: string.Join("\n", toSend),
Encoding.UTF8,
"application/json"
);
try
{
var response = await _httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync();
OnInvalidStatusCode?.Invoke(this, $"ElasticSearch request failed: {response.StatusCode}, {errorContent}");
}
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
}
private static string ReplaceTemplate(string template, Dictionary<string, string> values)
{
foreach (var kv in values)
{
template = template.Replace($"{{{kv.Key}}}", kv.Value);
}
return template;
}
public void Dispose()
{
_cts.Cancel();
_flushTask.Wait();
_cts.Dispose();
_httpClient.Dispose();
}
}
}