Files
EonaCat.LogStack/EonaCat.LogStack.OpenTelemetryFlow/OpenTelemetryFlow.cs
2026-02-27 21:12:55 +01:00

162 lines
5.4 KiB
C#

using EonaCat.LogStack.Core;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Resources;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using LogLevel = EonaCat.LogStack.Core.LogLevel;
namespace EonaCat.LogStack.Flows
{
/// <summary>
/// Flow that forwards log events to an OTLP endpoint by writing them through a
/// Microsoft.Extensions.Logging logger configured with the OpenTelemetry provider.
/// </summary>
public sealed class OpenTelemetryFlow : FlowBase
{
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;

    // 0 = live, 1 = disposed. Kept separate from IsEnabled so that disposal
    // does not depend on whether the flow happens to be enabled.
    private int _disposed;

    /// <summary>
    /// Creates the flow and the underlying logger factory with an OTLP exporter.
    /// </summary>
    /// <param name="serviceName">Service name used for the OpenTelemetry resource, the logger category and the flow name.</param>
    /// <param name="endpoint">OTLP endpoint the exporter sends to.</param>
    /// <param name="protocol">OTLP export protocol.</param>
    /// <param name="minimumLevel">Minimum level passed to the base flow.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="serviceName"/> is null, empty or whitespace, or <paramref name="endpoint"/> is null.
    /// </exception>
    public OpenTelemetryFlow(string serviceName, Uri endpoint, OtlpExportProtocol protocol = OtlpExportProtocol.Grpc, LogLevel minimumLevel = LogLevel.Trace) : base("OpenTelemetry:" + serviceName, minimumLevel)
    {
        if (string.IsNullOrWhiteSpace(serviceName))
        {
            throw new ArgumentNullException(nameof(serviceName));
        }

        if (endpoint == null)
        {
            throw new ArgumentNullException(nameof(endpoint));
        }

        // Process is IDisposable; read the id once and release the handle
        // instead of leaving the Process instance to the finalizer.
        int processId;
        using (var currentProcess = Process.GetCurrentProcess())
        {
            processId = currentProcess.Id;
        }

        _loggerFactory = LoggerFactory.Create(builder =>
        {
            builder.ClearProviders();
            builder.AddOpenTelemetry(options =>
            {
                options.SetResourceBuilder(
                    ResourceBuilder.CreateDefault()
                        .AddService(serviceName)
                        .AddAttributes(new Dictionary<string, object>
                        {
                            ["host.name"] = Environment.MachineName,
                            ["process.id"] = processId
                        }));

                options.AddOtlpExporter(otlp =>
                {
                    otlp.Endpoint = endpoint;
                    otlp.Protocol = protocol;
                });

                options.IncludeScopes = true;
                options.IncludeFormattedMessage = true;
                options.ParseStateValues = true;
            });
        });

        _logger = _loggerFactory.CreateLogger(serviceName);
    }

    /// <summary>
    /// Writes a single log event to the OpenTelemetry logger.
    /// </summary>
    /// <param name="logEvent">Event to write.</param>
    /// <param name="cancellationToken">Not observed by this implementation; the write is synchronous.</param>
    /// <returns>
    /// <c>FlowDisabled</c> when the flow is not enabled, <c>LevelFiltered</c> when the event
    /// does not pass the level check, otherwise <c>Success</c>.
    /// </returns>
    public override Task<WriteResult> BlastAsync(
        LogEvent logEvent,
        CancellationToken cancellationToken = default)
    {
        // Report a disabled flow the same way BlastBatchAsync does.
        if (!IsEnabled)
        {
            return Task.FromResult(WriteResult.FlowDisabled);
        }

        if (!IsLogLevelEnabled(logEvent))
        {
            return Task.FromResult(WriteResult.LevelFiltered);
        }

        WriteLog(logEvent);
        Interlocked.Increment(ref BlastedCount);
        return Task.FromResult(WriteResult.Success);
    }

    /// <summary>
    /// Writes every event of the batch that passes the level check.
    /// </summary>
    /// <param name="logEvents">Events to write.</param>
    /// <param name="cancellationToken">Not observed by this implementation; the writes are synchronous.</param>
    /// <returns><c>FlowDisabled</c> when the flow is not enabled, otherwise <c>Success</c>.</returns>
    public override Task<WriteResult> BlastBatchAsync(
        ReadOnlyMemory<LogEvent> logEvents,
        CancellationToken cancellationToken = default)
    {
        if (!IsEnabled)
        {
            return Task.FromResult(WriteResult.FlowDisabled);
        }

        foreach (var e in logEvents.Span)
        {
            // Same level check as the single-event path.
            if (!IsLogLevelEnabled(e))
            {
                continue;
            }

            WriteLog(e);
            Interlocked.Increment(ref BlastedCount);
        }

        return Task.FromResult(WriteResult.Success);
    }

    /// <summary>
    /// Converts a log event into structured state and hands it to the logger.
    /// </summary>
    private void WriteLog(LogEvent log)
    {
        var state = new List<KeyValuePair<string, object>>();

        if (!string.IsNullOrEmpty(log.Category))
        {
            state.Add(new KeyValuePair<string, object>("category", log.Category));
        }

        foreach (var prop in log.Properties)
        {
            // Null property values are replaced by the literal "null" so the state never carries a null value.
            state.Add(new KeyValuePair<string, object>(prop.Key, prop.Value ?? "null"));
        }

        var exception = log.Exception;
        if (exception != null)
        {
            state.Add(new KeyValuePair<string, object>("exception.type", exception.GetType().FullName));
            state.Add(new KeyValuePair<string, object>("exception.message", exception.Message));

            // StackTrace is null for an exception that was never thrown; skip the
            // attribute in that case rather than adding a null value to the state.
            var stackTrace = exception.StackTrace;
            if (stackTrace != null)
            {
                state.Add(new KeyValuePair<string, object>("exception.stacktrace", stackTrace));
            }
        }

        _logger.Log(
            MapLevel(log.Level),
            new EventId(0, log.Category),
            state,
            exception,
            (unusedState, unusedException) => log.Message.ToString());
    }

    /// <summary>
    /// Maps a LogStack level to the Microsoft.Extensions.Logging level of the same name;
    /// any other value maps to Information.
    /// </summary>
    private static Microsoft.Extensions.Logging.LogLevel MapLevel(LogLevel level)
    {
        return level switch
        {
            LogLevel.Trace => Microsoft.Extensions.Logging.LogLevel.Trace,
            LogLevel.Debug => Microsoft.Extensions.Logging.LogLevel.Debug,
            LogLevel.Information => Microsoft.Extensions.Logging.LogLevel.Information,
            LogLevel.Warning => Microsoft.Extensions.Logging.LogLevel.Warning,
            LogLevel.Error => Microsoft.Extensions.Logging.LogLevel.Error,
            LogLevel.Critical => Microsoft.Extensions.Logging.LogLevel.Critical,
            _ => Microsoft.Extensions.Logging.LogLevel.Information
        };
    }

    /// <summary>
    /// Disables the flow, disposes the logger factory and then disposes the base flow.
    /// Only the first call does any work; later calls return immediately.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        // Guard on a dedicated flag rather than IsEnabled: a flow that is already
        // disabled must still release the logger factory and run the base disposal.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        IsEnabled = false;

        try
        {
            _loggerFactory?.Dispose();
        }
        finally
        {
            // Run the base disposal even if disposing the factory throws.
            await base.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Returns a completed task. This class holds no buffer of its own; events are
    /// handed to the logger synchronously in the write methods.
    /// </summary>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    public override Task FlushAsync(CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }
}
}