Files
EonaCat.LogStack/EonaCat.LogStack.Status/Services/SyslogService.cs
2026-04-06 08:15:54 +02:00

241 lines
6.5 KiB
C#

using EonaCat.LogStack.Status.Models;
using EonaCat.LogStack.Status.Services;
using System.Globalization;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
/// <summary>
/// Hosted service that listens for syslog datagrams on a UDP port, converts each datagram into a
/// <c>LogEntry</c> (JSON payloads and plain/RFC 5424 syslog lines are both accepted) and forwards
/// the entries to <c>IngestionService</c> in batches.
/// </summary>
/// <remarks>
/// Receiving and ingesting are decoupled by a bounded in-memory channel. When the channel is full
/// the oldest queued entry is dropped, so a slow ingestion path never blocks the socket.
/// </remarks>
public class SyslogUdpService : BackgroundService
{
    private const int DefaultPort = 514;
    private const int ChannelCapacity = 10_000;
    private const int BatchSize = 100;
    private const int ReceiveBufferBytes = 4 * 1024 * 1024;

    // RFC 5424 placeholder for "field not supplied".
    private const string NilValue = "-";

    private readonly ILogger<SyslogUdpService> _logger;
    private readonly int _port;
    private readonly Channel<LogEntry> _channel;
    private readonly IServiceScopeFactory _scopeFactory;

    /// <summary>
    /// True while the UDP socket is bound and the receive loop is active; reset to false when the
    /// receive loop ends for any reason (shutdown or failure to bind).
    /// </summary>
    public static bool IsRunning { get; private set; }

    /// <summary>
    /// Creates the service. The listening port is read from configuration key <c>Syslog:Port</c>
    /// and defaults to 514.
    /// </summary>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <param name="scopeFactory">Factory used to create a DI scope for every ingested batch.</param>
    /// <param name="config">Application configuration.</param>
    public SyslogUdpService(ILogger<SyslogUdpService> logger, IServiceScopeFactory scopeFactory, IConfiguration config)
    {
        _logger = logger;
        _scopeFactory = scopeFactory;
        _port = config.GetValue("Syslog:Port", DefaultPort);
        _channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(ChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });
    }

    /// <summary>
    /// Runs the receive loop and the batch-processing loop side by side until both have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var receiverTask = ReceiveLoop(stoppingToken);
        var processorTask = ProcessLoop(stoppingToken);
        await Task.WhenAll(receiverTask, processorTask);
    }

    /// <summary>
    /// Binds the UDP socket and queues one parsed entry per received datagram until cancellation.
    /// </summary>
    /// <param name="token">Cancellation token that ends the loop.</param>
    private async Task ReceiveLoop(CancellationToken token)
    {
        try
        {
            using var udpClient = new UdpClient(_port);
            udpClient.Client.ReceiveBufferSize = ReceiveBufferBytes;
            _logger.LogInformation("Syslog UDP server listening on port {Port}", _port);
            IsRunning = true;

            while (!token.IsCancellationRequested)
            {
                try
                {
                    var result = await udpClient.ReceiveAsync(token);
                    var message = Encoding.UTF8.GetString(result.Buffer);
                    var remoteIp = result.RemoteEndPoint.Address.ToString();
                    var entry = ParseMessage(message, remoteIp);
                    if (entry != null)
                    {
                        await _channel.Writer.WriteAsync(entry, token);
                    }
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (Exception ex)
                {
                    // A single bad datagram or transient socket error must not stop the listener.
                    _logger.LogError(ex, "Receive loop error");
                }
            }
        }
        catch (Exception ex)
        {
            // Only setup failures reach this point (e.g. the port is in use or privileged);
            // per-datagram errors are handled inside the loop.
            _logger.LogError(ex, "Syslog UDP server could not be started on port {Port}", _port);
            throw;
        }
        finally
        {
            IsRunning = false;

            // Completing the writer lets ProcessLoop drain and exit. Without this a failure to
            // bind would leave ProcessLoop waiting forever and hide the error from the host.
            _channel.Writer.TryComplete();
        }
    }

    /// <summary>
    /// Reads queued entries and ingests them in batches of at most <see cref="BatchSize"/>.
    /// Whatever is still queued when shutdown is requested is flushed before returning.
    /// </summary>
    /// <param name="token">Cancellation token that ends the wait for new entries.</param>
    private async Task ProcessLoop(CancellationToken token)
    {
        var batch = new List<LogEntry>(BatchSize);
        try
        {
            while (await _channel.Reader.WaitToReadAsync(token))
            {
                await DrainChannel(batch);
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown requested: fall through to the final drain below.
        }

        await DrainChannel(batch);
    }

    /// <summary>
    /// Moves every entry currently available in the channel into <paramref name="batch"/>,
    /// flushing whenever the batch is full and once more for the remainder.
    /// </summary>
    private async Task DrainChannel(List<LogEntry> batch)
    {
        while (_channel.Reader.TryRead(out var entry))
        {
            batch.Add(entry);
            if (batch.Count >= BatchSize)
            {
                await Flush(batch);
            }
        }

        if (batch.Count > 0)
        {
            await Flush(batch);
        }
    }

    /// <summary>
    /// Sends the batch to a scoped <c>IngestionService</c> and then empties the list.
    /// Ingestion errors are logged and the batch is discarded so the pipeline keeps running.
    /// </summary>
    private async Task Flush(List<LogEntry> batch)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var ingestionService = scope.ServiceProvider.GetRequiredService<IngestionService>();
            await ingestionService.IngestBatchAsync(batch.ToArray());
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Batch ingestion failed");
        }
        finally
        {
            batch.Clear();
        }
    }

    /// <summary>
    /// Converts one datagram into a log entry. Payloads that look like JSON are parsed as JSON;
    /// everything else, including text that merely starts with '{' or '[', is parsed as syslog.
    /// </summary>
    /// <param name="rawMessage">Decoded datagram text.</param>
    /// <param name="remoteIp">Sender address, used as host when the message supplies none.</param>
    /// <returns>The parsed entry, or a generic "Syslog.Unknown" entry if parsing throws.</returns>
    private LogEntry ParseMessage(string rawMessage, string remoteIp)
    {
        try
        {
            if (IsJson(rawMessage))
            {
                try
                {
                    return ParseJson(rawMessage, remoteIp);
                }
                catch (JsonException)
                {
                    // Looked like JSON but is not (e.g. "[INFO] started"); treat it as plain syslog.
                }
            }

            return ParseSyslogAdvanced(rawMessage, remoteIp);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Parsing failed");
            return new LogEntry
            {
                Source = "Syslog.Unknown",
                Level = "Info",
                Message = rawMessage,
                Host = remoteIp,
                Timestamp = DateTime.UtcNow
            };
        }
    }

    /// <summary>Cheap heuristic: the first non-whitespace character is '{' or '['.</summary>
    private static bool IsJson(string input)
    {
        var trimmed = input.AsSpan().TrimStart();
        return trimmed.Length > 0 && (trimmed[0] == '{' || trimmed[0] == '[');
    }

    /// <summary>
    /// Builds an entry from a JSON payload, reading the optional properties <c>source</c>,
    /// <c>level</c>, <c>message</c>, <c>exception</c>, <c>host</c>, <c>traceId</c> and
    /// <c>timestamp</c>. The raw JSON text is kept in <c>Properties</c>.
    /// </summary>
    /// <exception cref="JsonException">The text is not valid JSON.</exception>
    private LogEntry ParseJson(string json, string remoteIp)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        if (root.ValueKind != JsonValueKind.Object)
        {
            // Arrays and scalars have no named properties; keep the payload as the message.
            return new LogEntry
            {
                Source = "Custom.Json",
                Level = "Info",
                Message = json,
                Host = remoteIp,
                Properties = json,
                Timestamp = DateTime.UtcNow
            };
        }

        // A JSON null is treated like a missing property so the "??" fallbacks below apply.
        string Get(string name) =>
            root.TryGetProperty(name, out var val) && val.ValueKind != JsonValueKind.Null
                ? val.ToString()
                : null;

        return new LogEntry
        {
            Source = Get("source") ?? "Custom.Json",
            Level = MapLevel(Get("level")),
            Message = Get("message"),
            Exception = Get("exception"),
            Host = Get("host") ?? remoteIp,
            TraceId = Get("traceId"),
            Properties = json,
            Timestamp = ParseUtcOrDefault(Get("timestamp"), DateTime.UtcNow)
        };
    }

    /// <summary>
    /// Parses a syslog line. An optional leading <c>&lt;PRI&gt;</c> sets the level; if the rest
    /// has the RFC 5424 header shape (version, timestamp, host, app, procid, msgid, message) the
    /// header fields are used, otherwise the remaining text becomes the message as-is.
    /// </summary>
    private LogEntry ParseSyslogAdvanced(string message, string remoteIp)
    {
        var entry = new LogEntry
        {
            Source = "Syslog",
            Host = remoteIp,
            Message = message,
            Timestamp = DateTime.UtcNow,
            Level = "Info"
        };

        // PRI parsing: severity is the low three bits of the priority value.
        if (message.StartsWith('<'))
        {
            var end = message.IndexOf('>');
            if (end > 1 &&
                int.TryParse(message[1..end], NumberStyles.None, CultureInfo.InvariantCulture, out var pri))
            {
                entry.Level = MapSyslogSeverity(pri % 8);
                message = message[(end + 1)..].Trim();
            }
        }

        // Try RFC5424 parsing: the first token after PRI must be the numeric version.
        var parts = message.Split(' ', 7, StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length >= 7 &&
            int.TryParse(parts[0], NumberStyles.None, CultureInfo.InvariantCulture, out _))
        {
            entry.Timestamp = ParseUtcOrDefault(parts[1], entry.Timestamp);

            // "-" means the sender did not supply the field; keep the defaults in that case.
            if (parts[2] != NilValue)
            {
                entry.Host = parts[2];
            }

            if (parts[3] != NilValue)
            {
                entry.Source = parts[3];
            }

            entry.Message = parts[6];
        }
        else
        {
            entry.Message = message;
        }

        return entry;
    }

    /// <summary>
    /// Parses <paramref name="text"/> culture-independently and converts it to UTC (text without
    /// an offset is assumed to be UTC); returns <paramref name="fallback"/> when it cannot be parsed.
    /// </summary>
    private static DateTime ParseUtcOrDefault(string text, DateTime fallback)
    {
        const DateTimeStyles styles = DateTimeStyles.AdjustToUniversal | DateTimeStyles.AssumeUniversal;
        return DateTime.TryParse(text, CultureInfo.InvariantCulture, styles, out var parsed)
            ? parsed
            : fallback;
    }

    /// <summary>Maps a syslog severity (0-7) to the level names used by this service.</summary>
    private static string MapSyslogSeverity(int severity) => severity switch
    {
        0 or 1 => "Fatal",
        2 or 3 => "Error",
        4 => "Warning",
        5 or 6 => "Info",
        7 => "Debug",
        _ => "Info"
    };

    /// <summary>
    /// Normalises a free-form level name (case-insensitive) to the level names used by this
    /// service; unknown or missing names map to "Info".
    /// </summary>
    private static string MapLevel(string level) => level?.Trim().ToLowerInvariant() switch
    {
        "trace" => "Debug",
        "debug" => "Debug",
        "info" or "information" => "Info",
        "warn" or "warning" => "Warning",
        "error" => "Error",
        "fatal" or "critical" => "Fatal",
        _ => "Info"
    };
}