// EonaCat.LogStack/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiagnosticsFlow.cs
// 2026-02-28 07:19:29 +01:00 | 300 lines | 11 KiB | C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EonaCat.LogStack.Core;
using EonaCat.LogStack.EonaCatLogStackCore;
using EonaCat.LogStack.Extensions;
using Microsoft.Extensions.Primitives;
namespace EonaCat.LogStack.Flows
{
// This file is part of the EonaCat project(s) which is released under the Apache License.
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
/// <summary>
/// Point-in-time reading of process diagnostics. Every setter is internal, so code
/// outside this assembly can only read the values.
/// </summary>
public sealed class DiagnosticsSnapshot
{
    /// <summary>Time at which the sample was taken.</summary>
    public DateTime CapturedAt { get; internal set; }

    /// <summary>Process CPU usage for the sampled period, expressed as a percentage.</summary>
    public double CpuPercent { get; internal set; }

    /// <summary>Working set of the process, in bytes.</summary>
    public long WorkingSetBytes { get; internal set; }

    /// <summary>Generation 0 garbage collection count.</summary>
    public long GcGen0 { get; internal set; }

    /// <summary>Generation 1 garbage collection count.</summary>
    public long GcGen1 { get; internal set; }

    /// <summary>Generation 2 garbage collection count.</summary>
    public long GcGen2 { get; internal set; }

    /// <summary>Number of threads in the process.</summary>
    public long ThreadCount { get; internal set; }

    /// <summary>Number of handles held by the process.</summary>
    public long HandleCount { get; internal set; }

    /// <summary>Elapsed running time, in seconds.</summary>
    public double UptimeSeconds { get; internal set; }

    /// <summary>
    /// Additional named values attached to the sample. Left <c>null</c> when nothing
    /// was attached, so check before enumerating.
    /// </summary>
    public Dictionary<string, object> Custom { get; internal set; }
}
/// <summary>
/// A flow that periodically captures process diagnostics (CPU, memory, GC, threads)
/// and writes them as structured log events. Also acts as a pass-through: every
/// normal log event optionally gets runtime metrics injected as properties.
///
/// Additionally exposes an in-process <see cref="Counter"/> registry so application
/// code can record business metrics (request count, error rate, etc.) that are
/// flushed alongside diagnostic snapshots.
/// </summary>
public sealed class DiagnosticsFlow : FlowBase
{
/// <summary>
/// Named 64-bit tally for business metrics. Every read and write of the
/// underlying value goes through <see cref="Interlocked"/>.
/// </summary>
public sealed class Counter
{
    private long _total;

    /// <summary>Creates a counter whose value starts at zero.</summary>
    /// <param name="name">Label exposed through <see cref="Name"/>; stored as given.</param>
    public Counter(string name)
    {
        Name = name;
    }

    /// <summary>Label supplied at construction.</summary>
    public string Name { get; }

    /// <summary>Current total.</summary>
    public long Value => Interlocked.Read(ref _total);

    /// <summary>Adds one to the total.</summary>
    public void Increment() => Interlocked.Increment(ref _total);

    /// <summary>Adds <paramref name="delta"/> (which may be negative) to the total.</summary>
    public void IncrementBy(long delta) => Interlocked.Add(ref _total, delta);

    /// <summary>Sets the total back to zero.</summary>
    public void Reset() => Interlocked.Exchange(ref _total, 0L);
}
// Registry of business counters, keyed by name with case-insensitive matching.
private readonly ConcurrentDictionary<string, Counter> _counters
= new ConcurrentDictionary<string, Counter>(StringComparer.OrdinalIgnoreCase);
// Delay the sampler thread waits between two captures.
private readonly TimeSpan _snapshotInterval;
// When true, BlastAsync adds "diag.*" properties from the latest snapshot to each event.
private readonly bool _injectIntoEvents;
// When true (and _forwardTo is set), each captured snapshot is sent on as a log event.
private readonly bool _writeSnapshotEvents;
// Category assigned to snapshot log events.
private readonly string _snapshotCategory;
// Flow that receives snapshot log events; null means snapshots are only kept in memory.
private readonly IFlow _forwardTo;
// Optional callback invoked on every capture to supply extra metrics.
private readonly Func<Dictionary<string, object>> _customMetricsFactory;
// Cancelled by DisposeAsync; the sampler loop checks it before each iteration.
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
// Background thread that runs SamplerLoop.
private readonly Thread _samplerThread;
// Started when this instance's field initializers run; source of UptimeSeconds.
private readonly Stopwatch _uptime = Stopwatch.StartNew();
// Most recent snapshot; stays null until the sampler has completed its first capture.
private volatile DiagnosticsSnapshot _latest;
// Processor time and wall-clock time recorded at the previous capture; the CPU
// percentage is computed from the difference to the current capture.
private TimeSpan _lastCpuTime;
private DateTime _lastCpuSample;
// Handle to the current process, refreshed on every capture.
private readonly Process _proc;
/// <summary>Most recent snapshot, or <c>null</c> if no capture has completed yet.</summary>
public DiagnosticsSnapshot LatestSnapshot { get { return _latest; } }
/// <summary>
/// Creates the flow, records the initial CPU baseline and starts the background
/// sampler thread.
/// </summary>
/// <param name="snapshotInterval">
/// Delay between two samples. <c>default(TimeSpan)</c> and negative values select
/// 60 seconds; <see cref="Timeout.InfiniteTimeSpan"/> is passed through unchanged;
/// values above <see cref="int.MaxValue"/> milliseconds are capped at that limit.
/// </param>
/// <param name="injectIntoEvents">When true, events passing through get "diag.*" properties from the latest snapshot.</param>
/// <param name="writeSnapshotEvents">When true and <paramref name="forwardTo"/> is set, each snapshot is forwarded as a log event.</param>
/// <param name="snapshotCategory">Category for snapshot events; <c>null</c> falls back to "Diagnostics".</param>
/// <param name="forwardTo">Flow that receives snapshot events, or <c>null</c> for none.</param>
/// <param name="minimumLevel">Minimum level handed to the base flow.</param>
/// <param name="customMetrics">Optional callback invoked on every capture to supply extra metrics.</param>
public DiagnosticsFlow(
    TimeSpan snapshotInterval = default(TimeSpan),
    bool injectIntoEvents = false,
    bool writeSnapshotEvents = true,
    string snapshotCategory = "Diagnostics",
    IFlow forwardTo = null,
    LogLevel minimumLevel = LogLevel.Trace,
    Func<Dictionary<string, object>> customMetrics = null)
    : base("Diagnostics", minimumLevel)
{
    _snapshotInterval = NormalizeSnapshotInterval(snapshotInterval);
    _injectIntoEvents = injectIntoEvents;
    _writeSnapshotEvents = writeSnapshotEvents;
    _snapshotCategory = snapshotCategory ?? "Diagnostics";
    _forwardTo = forwardTo;
    _customMetricsFactory = customMetrics;

    // Baseline for the first CPU percentage calculation.
    _proc = Process.GetCurrentProcess();
    _lastCpuTime = _proc.TotalProcessorTime;
    _lastCpuSample = DateTime.UtcNow;

    // Started last so the loop never observes a partially initialised instance.
    _samplerThread = new Thread(SamplerLoop)
    {
        IsBackground = true,
        Name = "DiagnosticsFlow.Sampler",
        Priority = ThreadPriority.BelowNormal
    };
    _samplerThread.Start();
}

/// <summary>
/// Maps the requested interval onto a value the sampler's timed wait accepts.
/// A TimeSpan timeout must be 0..int.MaxValue milliseconds or the infinite marker;
/// anything else would throw on every loop iteration and turn the sampler into a
/// hot loop that only writes errors.
/// </summary>
private static TimeSpan NormalizeSnapshotInterval(TimeSpan requested)
{
    if (requested == Timeout.InfiniteTimeSpan)
    {
        return requested;
    }

    if (requested <= TimeSpan.Zero)
    {
        return TimeSpan.FromSeconds(60);
    }

    TimeSpan longestWait = TimeSpan.FromMilliseconds(int.MaxValue);
    return requested > longestWait ? longestWait : requested;
}
/// <summary>
/// Returns the counter registered under <paramref name="name"/>, creating and
/// registering a new one if none exists yet.
/// </summary>
/// <param name="name">Counter name; must not be <c>null</c>.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is <c>null</c>.</exception>
public Counter GetCounter(string name)
{
    if (name != null)
    {
        return _counters.GetOrAdd(name, key => new Counter(key));
    }

    throw new ArgumentNullException(nameof(name));
}
/// <summary>
/// Looks up a counter without creating it and returns its current value.
/// </summary>
/// <param name="name">Counter name, passed straight to the registry lookup.</param>
/// <returns>The counter's value, or 0 when no counter with that name is registered.</returns>
public long ReadCounter(string name)
{
    Counter existing;
    if (!_counters.TryGetValue(name, out existing))
    {
        return 0;
    }

    return existing.Value;
}
/// <summary>
/// Handles a single event: optionally adds "diag.*" properties taken from the most
/// recent snapshot, then counts the event as blasted. The work is synchronous, so
/// the returned task is already completed.
/// </summary>
/// <param name="logEvent">Event to enrich; its property bag is modified in place.</param>
/// <param name="cancellationToken">Not used by this implementation.</param>
/// <returns>
/// <c>FlowDisabled</c> when the flow is disabled, <c>LevelFiltered</c> when the event's
/// level is not enabled, otherwise <c>Success</c>.
/// </returns>
public override Task<WriteResult> BlastAsync(
    LogEvent logEvent,
    CancellationToken cancellationToken = default(CancellationToken))
{
    // Report a disabled flow the same way BlastBatchAsync does rather than as a
    // level mismatch, so callers can tell the two situations apart.
    if (!IsEnabled)
    {
        return Task.FromResult(WriteResult.FlowDisabled);
    }

    if (!IsLogLevelEnabled(logEvent))
    {
        return Task.FromResult(WriteResult.LevelFiltered);
    }

    if (_injectIntoEvents)
    {
        // Read the volatile field once so all three values come from the same snapshot.
        DiagnosticsSnapshot snap = _latest;
        if (snap != null)
        {
            // Property values are machine-readable, so format them culture-independently
            // ("12.5", never "12,5").
            CultureInfo invariant = CultureInfo.InvariantCulture;
            logEvent.Properties.TryAdd("diag.mem_mb", (snap.WorkingSetBytes / 1024 / 1024).ToString(invariant));
            logEvent.Properties.TryAdd("diag.cpu", snap.CpuPercent.ToString("F1", invariant));
            logEvent.Properties.TryAdd("diag.threads", snap.ThreadCount.ToString(invariant));
        }
    }

    Interlocked.Increment(ref BlastedCount);
    return Task.FromResult(WriteResult.Success);
}
/// <summary>
/// Handles a batch by passing every event whose level is enabled to
/// <see cref="BlastAsync"/>, one at a time and in order.
/// </summary>
/// <param name="logEvents">Events to process; the buffer is read in place, not copied.</param>
/// <param name="cancellationToken">Passed through to <see cref="BlastAsync"/>.</param>
/// <returns><c>FlowDisabled</c> when the flow is disabled, otherwise <c>Success</c>.</returns>
public override Task<WriteResult> BlastBatchAsync(
    ReadOnlyMemory<LogEvent> logEvents,
    CancellationToken cancellationToken = default(CancellationToken))
{
    if (!IsEnabled)
    {
        return Task.FromResult(WriteResult.FlowDisabled);
    }

    // Iterate the caller's memory directly; copying the whole batch into a new
    // array just to enumerate it allocates once per batch for no benefit.
    ReadOnlySpan<LogEvent> batch = logEvents.Span;
    for (int i = 0; i < batch.Length; i++)
    {
        LogEvent current = batch[i];
        if (IsLogLevelEnabled(current))
        {
            // BlastAsync in this class completes synchronously, so its task is not awaited.
            BlastAsync(current, cancellationToken);
        }
    }

    return Task.FromResult(WriteResult.Success);
}
/// <summary>
/// Returns an already-completed task; this method performs no other work.
/// </summary>
/// <param name="cancellationToken">Not used by this implementation.</param>
public override Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
    Task<int> alreadyDone = Task.FromResult(0);
    return alreadyDone;
}
// 0 until DisposeAsync has started once; flipped atomically so that repeated or
// concurrent calls run the teardown a single time.
private int _disposeStarted;

/// <summary>
/// Disables the flow, stops the sampler thread and releases the resources owned by
/// this instance before disposing the base flow. Calls after the first are no-ops.
/// </summary>
public override async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposeStarted, 1) != 0)
    {
        return;
    }

    IsEnabled = false;
    _cts.Cancel();

    // Cancelling the token does not wake a thread that is blocked in a sleep, so
    // without this the join below would time out whenever the sampler is between
    // samples. The sampler loop treats ThreadInterruptedException as its stop signal.
    _samplerThread.Interrupt();
    bool samplerStopped = _samplerThread.Join(TimeSpan.FromSeconds(3));

    // Only release what the sampler uses once it has really exited: the loop reads
    // the token source and the process handle, and touching them after disposal
    // would throw on that background thread.
    if (samplerStopped)
    {
        _cts.Dispose();
        _proc.Dispose();
    }

    await base.DisposeAsync().ConfigureAwait(false);
}
/// <summary>
/// Body of the sampler thread: waits one interval, captures a snapshot, publishes it
/// as the latest snapshot and, when configured, forwards it as a log event. Runs
/// until cancellation is requested or the thread is interrupted.
/// </summary>
private void SamplerLoop()
{
    // Take the token and its wait handle once. Reading _cts.Token again later would
    // throw ObjectDisposedException once the source is disposed, and that exception
    // would escape this thread unhandled.
    CancellationToken stopToken;
    WaitHandle stopSignal;
    try
    {
        stopToken = _cts.Token;
        stopSignal = stopToken.WaitHandle;
    }
    catch (ObjectDisposedException)
    {
        // Disposed before the thread got to run; nothing to sample.
        return;
    }

    while (!stopToken.IsCancellationRequested)
    {
        try
        {
            // Wait on the cancellation handle instead of sleeping so a stop request
            // ends the wait immediately. WaitOne returns true when the handle was
            // signalled, false when the interval elapsed.
            if (stopSignal.WaitOne(_snapshotInterval))
            {
                break;
            }

            DiagnosticsSnapshot snap = Capture();
            _latest = snap;
            if (_writeSnapshotEvents && _forwardTo != null)
            {
                LogEvent ev = BuildSnapshotEvent(snap);
                // This is a dedicated thread, so blocking on the forward is intentional.
                _forwardTo.BlastAsync(ev).GetAwaiter().GetResult();
            }
        }
        catch (ThreadInterruptedException) { break; }
        catch (ObjectDisposedException) when (stopToken.IsCancellationRequested)
        {
            // The wait handle was torn down as part of shutdown.
            break;
        }
        catch (Exception ex)
        {
            // Keep sampling after a failed capture or forward; only report it.
            Console.Error.WriteLine("[DiagnosticsFlow] Sampler error: " + ex.Message);
        }
    }
}
/// <summary>
/// Takes one diagnostics sample: refreshes the process information, derives the CPU
/// percentage from the processor time used since the previous sample, collects
/// custom metrics and counter values, and updates the CPU baseline for the next call.
/// </summary>
/// <returns>A new snapshot; <c>Custom</c> is <c>null</c> when there are no custom metrics and no counters.</returns>
private DiagnosticsSnapshot Capture()
{
    _proc.Refresh();
    DateTime now = DateTime.UtcNow;
    TimeSpan cpuNow = _proc.TotalProcessorTime;

    // Processor time consumed divided by wall-clock time elapsed, spread over all
    // logical processors. A non-positive elapsed time (e.g. the clock was set back)
    // yields 0 rather than a meaningless value.
    double elapsed = (now - _lastCpuSample).TotalSeconds;
    double cpu = elapsed > 0
        ? (cpuNow - _lastCpuTime).TotalSeconds / elapsed / Environment.ProcessorCount * 100.0
        : 0;
    _lastCpuTime = cpuNow;
    _lastCpuSample = now;

    Dictionary<string, object> custom = null;
    if (_customMetricsFactory != null)
    {
        try
        {
            custom = _customMetricsFactory();
        }
        catch (Exception ex) when (!(ex is ThreadInterruptedException))
        {
            // Custom metrics are best-effort: a failing callback must not cost the
            // whole snapshot, but it should not disappear without a trace either.
            // An interrupt is left to propagate so the sampler loop can stop.
            Console.Error.WriteLine("[DiagnosticsFlow] Custom metrics error: " + ex.Message);
        }
    }

    // Append counters to the custom metrics.
    if (!_counters.IsEmpty)
    {
        // Work on a dictionary owned by the snapshot: the callback may hand back an
        // instance it keeps using, and that one must not be modified here.
        custom = custom == null
            ? new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
            : new Dictionary<string, object>(custom, custom.Comparer);

        foreach (KeyValuePair<string, Counter> kv in _counters)
        {
            custom["counter." + kv.Key] = kv.Value.Value;
        }
    }

    return new DiagnosticsSnapshot
    {
        CapturedAt = now,
        CpuPercent = Math.Round(cpu, 2),
        WorkingSetBytes = _proc.WorkingSet64,
        GcGen0 = GC.CollectionCount(0),
        GcGen1 = GC.CollectionCount(1),
        GcGen2 = GC.CollectionCount(2),
        ThreadCount = _proc.Threads.Count,
        HandleCount = _proc.HandleCount,
        UptimeSeconds = _uptime.Elapsed.TotalSeconds,
        Custom = custom
    };
}
/// <summary>
/// Turns a snapshot into an Information-level log event: a one-line summary as the
/// message plus one property per metric, including every custom entry.
/// </summary>
/// <param name="snap">Snapshot to describe.</param>
/// <returns>The populated log event, timestamped with the snapshot's capture time.</returns>
private LogEvent BuildSnapshotEvent(DiagnosticsSnapshot snap)
{
    // Message and properties are consumed by log processors, so numbers are always
    // written culture-independently ("12.5", never "12,5").
    CultureInfo invariant = CultureInfo.InvariantCulture;

    string message = string.Format(
        invariant,
        "Diagnostics | CPU={0:F1}% Mem={1}MB GC=[{2},{3},{4}] Threads={5} Handles={6} Uptime={7:F0}s",
        snap.CpuPercent,
        snap.WorkingSetBytes / 1024 / 1024,
        snap.GcGen0, snap.GcGen1, snap.GcGen2,
        snap.ThreadCount,
        snap.HandleCount,
        snap.UptimeSeconds);

    var ev = new LogEvent
    {
        Level = LogLevel.Information,
        Category = _snapshotCategory,
        Message = new StringSegment(message),
        Timestamp = snap.CapturedAt.Ticks
    };

    ev.Properties.TryAdd("cpu_pct", snap.CpuPercent.ToString("F2", invariant));
    ev.Properties.TryAdd("mem_bytes", snap.WorkingSetBytes.ToString(invariant));
    ev.Properties.TryAdd("gc_gen0", snap.GcGen0.ToString(invariant));
    ev.Properties.TryAdd("gc_gen1", snap.GcGen1.ToString(invariant));
    ev.Properties.TryAdd("gc_gen2", snap.GcGen2.ToString(invariant));
    ev.Properties.TryAdd("threads", snap.ThreadCount.ToString(invariant));
    ev.Properties.TryAdd("handles", snap.HandleCount.ToString(invariant));
    ev.Properties.TryAdd("uptime_s", snap.UptimeSeconds.ToString("F0", invariant));

    if (snap.Custom != null)
    {
        foreach (KeyValuePair<string, object> kv in snap.Custom)
        {
            // Convert.ToString applies the invariant culture to formattable values and
            // falls back to plain ToString() for everything else.
            ev.Properties.TryAdd(kv.Key, kv.Value != null ? Convert.ToString(kv.Value, invariant) : "null");
        }
    }

    return ev;
}
}
}