using EonaCat.LogStack.Core;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Resources;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using LogLevel = EonaCat.LogStack.Core.LogLevel;

namespace EonaCat.LogStack.Flows
{
    /// <summary>
    /// A flow that forwards log events to an OTLP endpoint by writing them through a
    /// Microsoft.Extensions.Logging logger backed by the OpenTelemetry logging provider.
    /// </summary>
    public sealed class OpenTelemetryFlow : FlowBase
    {
        private readonly ILoggerFactory _loggerFactory;
        private readonly ILogger _logger;

        // 0 = live, 1 = disposed. Kept separate from IsEnabled so that a flow which was
        // disabled before disposal still releases its logger factory.
        private int _disposed;

        /// <summary>
        /// Creates a flow that exports log records for <paramref name="serviceName"/> to
        /// <paramref name="endpoint"/> using the OTLP exporter.
        /// </summary>
        /// <param name="serviceName">Service name attached to the OpenTelemetry resource; also used as the logger category name.</param>
        /// <param name="endpoint">OTLP endpoint the exporter sends to.</param>
        /// <param name="protocol">OTLP export protocol; defaults to gRPC.</param>
        /// <param name="minimumLevel">Minimum level passed to the base flow.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="serviceName"/> is null, empty or whitespace, or <paramref name="endpoint"/> is null.
        /// </exception>
        public OpenTelemetryFlow(
            string serviceName,
            Uri endpoint,
            OtlpExportProtocol protocol = OtlpExportProtocol.Grpc,
            LogLevel minimumLevel = LogLevel.Trace)
            : base("OpenTelemetry:" + serviceName, minimumLevel)
        {
            // Note: the base constructor has already run with the unvalidated name at this point.
            if (string.IsNullOrWhiteSpace(serviceName))
            {
                throw new ArgumentNullException(nameof(serviceName));
            }

            if (endpoint == null)
            {
                throw new ArgumentNullException(nameof(endpoint));
            }

            // Process is IDisposable; read the id once and release the object
            // instead of leaving it for the finalizer.
            int processId;
            using (var currentProcess = Process.GetCurrentProcess())
            {
                processId = currentProcess.Id;
            }

            _loggerFactory = LoggerFactory.Create(builder =>
            {
                builder.ClearProviders();
                builder.AddOpenTelemetry(options =>
                {
                    options.SetResourceBuilder(
                        ResourceBuilder.CreateDefault()
                            .AddService(serviceName)
                            .AddAttributes(new Dictionary<string, object>
                            {
                                ["host.name"] = Environment.MachineName,
                                ["process.id"] = processId
                            }));

                    options.AddOtlpExporter(otlp =>
                    {
                        otlp.Endpoint = endpoint;
                        otlp.Protocol = protocol;
                    });

                    options.IncludeScopes = true;
                    options.IncludeFormattedMessage = true;
                    options.ParseStateValues = true;
                });
            });

            _logger = _loggerFactory.CreateLogger(serviceName);
        }

        /// <summary>
        /// Writes a single log event to the OpenTelemetry logger.
        /// </summary>
        /// <param name="logEvent">The event to write.</param>
        /// <param name="cancellationToken">Not observed; the write completes synchronously.</param>
        /// <returns>
        /// <see cref="WriteResult.LevelFiltered"/> when the flow is disabled or the event's level
        /// is not enabled; otherwise <see cref="WriteResult.Success"/>.
        /// </returns>
        public override Task<WriteResult> BlastAsync(
            LogEvent logEvent,
            CancellationToken cancellationToken = default)
        {
            if (!IsEnabled || !IsLogLevelEnabled(logEvent))
            {
                return Task.FromResult(WriteResult.LevelFiltered);
            }

            WriteLog(logEvent);
            Interlocked.Increment(ref BlastedCount);
            return Task.FromResult(WriteResult.Success);
        }

        /// <summary>
        /// Writes every event in the batch whose level is at least <c>MinimumLevel</c>.
        /// </summary>
        /// <param name="logEvents">The events to write.</param>
        /// <param name="cancellationToken">Not observed; the writes complete synchronously.</param>
        /// <returns>
        /// <see cref="WriteResult.FlowDisabled"/> when the flow is disabled; otherwise
        /// <see cref="WriteResult.Success"/>, even if some or all events were filtered out.
        /// </returns>
        public override Task<WriteResult> BlastBatchAsync(
            ReadOnlyMemory<LogEvent> logEvents,
            CancellationToken cancellationToken = default)
        {
            if (!IsEnabled)
            {
                return Task.FromResult(WriteResult.FlowDisabled);
            }

            foreach (var e in logEvents.Span)
            {
                if (e.Level < MinimumLevel)
                {
                    continue;
                }

                WriteLog(e);
                Interlocked.Increment(ref BlastedCount);
            }

            return Task.FromResult(WriteResult.Success);
        }

        /// <summary>
        /// Translates a <see cref="LogEvent"/> into a structured ILogger call: category,
        /// properties and exception details become state key/value pairs.
        /// </summary>
        private void WriteLog(LogEvent log)
        {
            // Most events carry only a handful of attributes; pre-size to avoid early regrowth.
            var state = new List<KeyValuePair<string, object>>(8);

            if (!string.IsNullOrEmpty(log.Category))
            {
                state.Add(new KeyValuePair<string, object>("category", log.Category));
            }

            foreach (var prop in log.Properties)
            {
                state.Add(new KeyValuePair<string, object>(prop.Key, prop.Value ?? "null"));
            }

            if (log.Exception != null)
            {
                state.Add(new KeyValuePair<string, object>("exception.type", log.Exception.GetType().FullName));
                state.Add(new KeyValuePair<string, object>("exception.message", log.Exception.Message));

                // StackTrace is null for an exception that was never thrown; skip the
                // attribute rather than emitting a null-valued entry.
                var stackTrace = log.Exception.StackTrace;
                if (stackTrace != null)
                {
                    state.Add(new KeyValuePair<string, object>("exception.stacktrace", stackTrace));
                }
            }

            _logger.Log(
                MapLevel(log.Level),
                new EventId(0, log.Category),
                state,
                log.Exception,
                (s, e) => log.Message.ToString());
        }

        /// <summary>
        /// Maps a LogStack level to the Microsoft.Extensions.Logging level of the same name;
        /// any other value maps to Information.
        /// </summary>
        private static Microsoft.Extensions.Logging.LogLevel MapLevel(LogLevel level)
        {
            return level switch
            {
                LogLevel.Trace => Microsoft.Extensions.Logging.LogLevel.Trace,
                LogLevel.Debug => Microsoft.Extensions.Logging.LogLevel.Debug,
                LogLevel.Information => Microsoft.Extensions.Logging.LogLevel.Information,
                LogLevel.Warning => Microsoft.Extensions.Logging.LogLevel.Warning,
                LogLevel.Error => Microsoft.Extensions.Logging.LogLevel.Error,
                LogLevel.Critical => Microsoft.Extensions.Logging.LogLevel.Critical,
                _ => Microsoft.Extensions.Logging.LogLevel.Information
            };
        }

        /// <summary>
        /// Disables the flow, disposes the logger factory and then disposes the base flow.
        /// Only the first call performs the teardown; later calls return immediately.
        /// </summary>
        public override async ValueTask DisposeAsync()
        {
            // Guard on a dedicated flag rather than IsEnabled: a flow that was merely
            // disabled must still dispose its factory when it is finally disposed.
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            IsEnabled = false;
            _loggerFactory?.Dispose();
            await base.DisposeAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Returns a completed task; this flow performs no explicit flush of its own.
        /// </summary>
        public override Task FlushAsync(CancellationToken cancellationToken = default)
        {
            return Task.CompletedTask;
        }
    }
}