From 52d3ed617f03dd1ba33cad776d79360073d359c5 Mon Sep 17 00:00:00 2001 From: EonaCat Date: Thu, 5 Mar 2026 18:59:55 +0100 Subject: [PATCH] Added batchSizes for all flows --- EonaCat.LogStack/EonaCat.LogStack.csproj | 6 ++-- .../EonaCatLoggerCore/Flows/ConsoleFlow.cs | 2 +- .../EonaCatLoggerCore/Flows/DatabaseFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/DiscordFlow.cs | 8 +++-- .../Flows/ElasticSearchFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/FileFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/GrayLogFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/HttpFlow.cs | 8 +++-- .../Flows/MicrosoftTeamsFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/SlackFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/SplunkFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/SyslogTcpFlow.cs | 9 +++-- .../EonaCatLoggerCore/Flows/SyslogUdpFlow.cs | 9 +++-- .../EonaCatLoggerCore/Flows/TcpFlow.cs | 9 +++-- .../EonaCatLoggerCore/Flows/TelegramFlow.cs | 8 +++-- .../EonaCatLoggerCore/Flows/UdpFlow.cs | 21 +++++++----- .../EonaCatLoggerCore/Flows/ZabbixFlow.cs | 8 +++-- EonaCat.LogStack/LogBuilder.cs | 33 ++++++++++++++++--- Testers/EonaCat.LogStack.Test.Web/Program.cs | 2 +- 19 files changed, 119 insertions(+), 60 deletions(-) diff --git a/EonaCat.LogStack/EonaCat.LogStack.csproj b/EonaCat.LogStack/EonaCat.LogStack.csproj index 2e89c5d..292bb7c 100644 --- a/EonaCat.LogStack/EonaCat.LogStack.csproj +++ b/EonaCat.LogStack/EonaCat.LogStack.csproj @@ -14,7 +14,7 @@ It features a rich fluent API for routing log events to dozens of destinations EonaCat (Jeroen Saey) EonaCat;Logger;EonaCatLogStack;Log;Writer;Flows;LogStack;Memory;Speed;Jeroen;Saey - 0.0.1 + 0.0.2 README.md True LICENSE @@ -25,7 +25,7 @@ It features a rich fluent API for routing log events to dozens of destinations - 0.0.1+{chash:10}.{c:ymd} + 0.0.2+{chash:10}.{c:ymd} true true v[0-9]* @@ -36,7 +36,7 @@ It features a rich fluent API for routing log events to dozens of destinations - 0.0.1 + 0.0.2 EonaCat.LogStack EonaCat.LogStack https://git.saey.me/EonaCat/EonaCat.LogStack diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs index 2c8bd9b..28d66d5 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs @@ -32,7 +32,7 @@ namespace EonaCat.LogStack.Flows bool useColors = true, TimestampMode timestampMode = TimestampMode.Local, ColorSchema? colorSchema = null, - string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}") + string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}") : base("Console", minimumLevel) { _useColors = useColors; diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs index e10f3ce..66086a6 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class DatabaseFlow : FlowBase { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 128; + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _writerTask; @@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows public DatabaseFlow( Func connectionFactory, string tableName = "Logs", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) : base($"Database:{tableName}", minimumLevel) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _tableName = tableName; + _batchSize = batchSize <= 0 ? 1 : batchSize; var channelOptions = new BoundedChannelOptions(ChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, @@ -78,7 +80,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -88,7 +90,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await WriteBatchAsync(batch, cancellationToken).ConfigureAwait(false); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs index b437223..ab051ab 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class DiscordFlow : FlowBase, IAsyncDisposable { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 10; + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _workerTask; @@ -30,11 +30,13 @@ namespace EonaCat.LogStack.Flows public DiscordFlow( string webhookUrl, string botName, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Information) : base("Discord", minimumLevel) { _webhookUrl = webhookUrl ?? throw new ArgumentNullException(nameof(webhookUrl)); _httpClient = new HttpClient(); + _batchSize = batchSize <= 0 ? 1 : batchSize; var channelOptions = new BoundedChannelOptions(ChannelCapacity) { @@ -67,7 +69,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessQueueAsync(string botName, CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -77,7 +79,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await SendBatchAsync(botName, batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs index 8c2f4a6..969446b 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class ElasticSearchFlow : FlowBase, IAsyncDisposable { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 100; // Bulk insert batch size + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _workerTask; @@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows public ElasticSearchFlow( string elasticsearchUrl, string indexName = "logs", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) : base($"Elasticsearch:{indexName}", minimumLevel) { _elasticsearchUrl = elasticsearchUrl?.TrimEnd('/') ?? throw new ArgumentNullException(nameof(elasticsearchUrl)); _indexName = indexName; _httpClient = new HttpClient(); + _batchSize = batchSize <= 0 ? 1 : batchSize; var channelOptions = new BoundedChannelOptions(ChannelCapacity) { @@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessQueueAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await SendBulkAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs index 9ee5618..f11c204 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs @@ -27,7 +27,7 @@ namespace EonaCat.LogStack.Flows { private const int FileBufferSize = 131072; // 128 KB private const int WriterBufferSize = 131072; // 128 KB - private const int DefaultBatchSize = 512; + private readonly int _batchSize; private const int QueueCapacity = 8192; private static readonly Dictionary LevelStrings = @@ -122,6 +122,7 @@ namespace EonaCat.LogStack.Flows long maxFileSize = 200 * 1024 * 1024, FileRetentionPolicy retention = null, int flushIntervalMs = 2000, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, bool useCategoryRouting = false, LogLevel[] logLevelsForSeparateFiles = null, @@ -129,7 +130,7 @@ namespace EonaCat.LogStack.Flows BackpressureStrategy backpressure = BackpressureStrategy.DropOldest, FileOutputFormat outputFormat = FileOutputFormat.Text, CompressionFormat compression = CompressionFormat.GZip, - string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}", + string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}", long maxMemoryBytes = 20 * 1024 * 1024) : base("File:" + Path.Combine(directory, filePrefix), minimumLevel) { @@ -150,6 +151,7 @@ namespace EonaCat.LogStack.Flows CheckForProcessTermination(); + _batchSize = batchSize <= 0 ? 1 : batchSize; _directory = directory; _filePrefix = filePrefix; _template = template; @@ -505,7 +507,7 @@ namespace EonaCat.LogStack.Flows // Drain additional items int extra = 0; LogEvent next; - while (extra < DefaultBatchSize && _queue.TryTake(out next)) + while (extra < _batchSize && _queue.TryTake(out next)) { WriteLogEvent(next); extra++; diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs index 8bb6084..aa24a09 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs @@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows public sealed class GraylogFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private const int MaxUdpPacketSize = 8192; @@ -39,6 +39,7 @@ namespace EonaCat.LogStack.Flows int port = 12201, bool useTcp = false, string graylogHostName = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) : base($"Graylog:{host}:{port}", minimumLevel) @@ -48,6 +49,7 @@ namespace EonaCat.LogStack.Flows _useTcp = useTcp; _backpressureStrategy = backpressureStrategy; _graylogHostName = graylogHostName ?? Environment.MachineName; + _batchSize = batchSize <= 0 ? 1 : batchSize; var channelOptions = new BoundedChannelOptions(ChannelCapacity) { @@ -122,7 +124,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); while (!cancellationToken.IsCancellationRequested) { @@ -137,7 +139,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs index e5d7b0f..8575b34 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs @@ -19,7 +19,7 @@ public sealed class HttpFlow : FlowBase // See the LICENSE file or go to https://EonaCat.com/License for full license details. private const int ChannelCapacity = 2048; - private const int DefaultBatchSize = 50; + private readonly int _batchSize; private const int MaxRetries = 3; private readonly Channel _channel; @@ -35,6 +35,7 @@ public sealed class HttpFlow : FlowBase public HttpFlow( string endpoint, HttpClient? httpClient = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Information, TimeSpan? batchInterval = null, Dictionary? headers = null) @@ -58,6 +59,7 @@ public sealed class HttpFlow : FlowBase _ownHttpClient = false; } + _batchSize = batchSize <= 0 ? 1 : batchSize; var channelOptions = new BoundedChannelOptions(ChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, @@ -103,7 +105,7 @@ public sealed class HttpFlow : FlowBase private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -112,7 +114,7 @@ public sealed class HttpFlow : FlowBase var hasMore = true; // Collect batch - while (batch.Count < DefaultBatchSize && hasMore) + while (batch.Count < _batchSize && hasMore) { if (_channel.Reader.TryRead(out var logEvent)) { diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs index 9efd5cf..0670040 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class MicrosoftTeamsFlow : FlowBase, IAsyncDisposable { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 5; // Keep batches small to avoid throttling + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _workerTask; @@ -29,6 +29,7 @@ namespace EonaCat.LogStack.Flows public MicrosoftTeamsFlow( string webhookUrl, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Information) : base("MicrosoftTeams", minimumLevel) { @@ -42,6 +43,7 @@ namespace EonaCat.LogStack.Flows SingleWriter = false }; + _batchSize = batchSize <= 0 ? 1 : batchSize; _channel = Channel.CreateBounded(channelOptions); _cts = new CancellationTokenSource(); _workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token)); @@ -66,7 +68,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessQueueAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -76,7 +78,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs index be4e143..a23b20e 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs @@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows public sealed class SlackFlow : FlowBase, IAsyncDisposable { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 5; // Slack rate-limits, small batches are safer + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _workerTask; @@ -26,6 +26,7 @@ namespace EonaCat.LogStack.Flows public SlackFlow( string webhookUrl, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Information) : base("Slack", minimumLevel) { @@ -39,6 +40,7 @@ namespace EonaCat.LogStack.Flows SingleWriter = false }; + _batchSize = batchSize <= 0 ? 1 : batchSize; _channel = Channel.CreateBounded(channelOptions); _cts = new CancellationTokenSource(); _workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token)); @@ -63,7 +65,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessQueueAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -73,7 +75,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs index 4ae2c7f..ef86426 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs @@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows { public sealed class SplunkFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private readonly Channel _channel; @@ -35,6 +35,7 @@ namespace EonaCat.LogStack.Flows string token, string sourcetype = "splunk_logs", string hostName = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) : base($"Splunk:{splunkUrl}", minimumLevel) @@ -58,6 +59,7 @@ namespace EonaCat.LogStack.Flows SingleWriter = false }; + _batchSize = batchSize <= 0 ? 1 : batchSize; _channel = Channel.CreateBounded(channelOptions); _cts = new CancellationTokenSource(); _httpClient = new HttpClient(); @@ -114,7 +116,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); while (!cancellationToken.IsCancellationRequested) { @@ -124,7 +126,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs index 3d8d166..6e8c766 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class SyslogTcpFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private readonly Channel _channel; @@ -38,6 +38,7 @@ namespace EonaCat.LogStack.Flows public SyslogTcpFlow( string host, int port = 514, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest, bool useTls = false, @@ -49,6 +50,8 @@ namespace EonaCat.LogStack.Flows _port = port; _backpressureStrategy = backpressureStrategy; + _batchSize = batchSize <= 0 ? 1 : batchSize; + _useTls = useTls; _certValidationCallback = certValidationCallback ?? DefaultCertificateValidation; _clientCertificates = clientCertificates; @@ -118,7 +121,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); var sb = new StringBuilder(8192); while (!cancellationToken.IsCancellationRequested) @@ -131,7 +134,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, sb, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs index 5f39eae..37898ea 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs @@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows public sealed class SyslogUdpFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private const int MaxUdpPacketSize = 4096; @@ -32,6 +32,7 @@ namespace EonaCat.LogStack.Flows public SyslogUdpFlow( string host, int port = 514, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) : base($"SyslogUDP:{host}:{port}", minimumLevel) @@ -40,6 +41,8 @@ namespace EonaCat.LogStack.Flows _port = port; _backpressureStrategy = backpressureStrategy; + _batchSize = batchSize <= 0 ? 1 : batchSize; + var channelOptions = new BoundedChannelOptions(ChannelCapacity) { FullMode = backpressureStrategy switch @@ -108,7 +111,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); var sb = new StringBuilder(8192); while (!cancellationToken.IsCancellationRequested) @@ -119,7 +122,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, sb, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs index 55873f1..4b588e0 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs @@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows; public sealed class TcpFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private readonly Channel _channel; @@ -36,6 +36,7 @@ public sealed class TcpFlow : FlowBase public TcpFlow( string host, int port, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest, bool useTls = false, @@ -47,6 +48,8 @@ public sealed class TcpFlow : FlowBase _port = port; _backpressureStrategy = backpressureStrategy; + _batchSize = batchSize <= 0 ? 1 : batchSize; + _useTls = useTls; _certValidationCallback = certValidationCallback ?? DefaultCertificateValidation; _clientCertificates = clientCertificates; @@ -163,7 +166,7 @@ public sealed class TcpFlow : FlowBase private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); var sb = new StringBuilder(8192); while (!cancellationToken.IsCancellationRequested) @@ -176,7 +179,7 @@ public sealed class TcpFlow : FlowBase { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, sb, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs index f677596..e298664 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs @@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows public sealed class TelegramFlow : FlowBase, IAsyncDisposable { private const int ChannelCapacity = 4096; - private const int DefaultBatchSize = 5; + private readonly int _batchSize; private readonly Channel _channel; private readonly Task _workerTask; @@ -31,6 +31,7 @@ namespace EonaCat.LogStack.Flows public TelegramFlow( string botToken, string chatId, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Information) : base("Telegram", minimumLevel) { @@ -45,6 +46,7 @@ namespace EonaCat.LogStack.Flows SingleWriter = false }; + _batchSize = batchSize <= 0 ? 1 : batchSize; _channel = Channel.CreateBounded(channelOptions); _cts = new CancellationTokenSource(); _workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token)); @@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessQueueAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); try { @@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize) + if (batch.Count >= _batchSize) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs index fd209b4..b8dbdc6 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs @@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows; public sealed class UdpFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private readonly Channel _channel; @@ -30,18 +30,21 @@ public sealed class UdpFlow : FlowBase private readonly Task _flushTask; public UdpFlow( - string host, - int port, - int flushIntervalInMilliseconds = 2000, - LogLevel minimumLevel = LogLevel.Trace, - BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) - : base($"UDP:{host}:{port}", minimumLevel) + string host, + int port, + int flushIntervalInMilliseconds = 2000, + int batchSize = 1, + LogLevel minimumLevel = LogLevel.Trace, + BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) + : base($"UDP:{host}:{port}", minimumLevel) { _host = host ?? throw new ArgumentNullException(nameof(host)); _port = port; _backpressureStrategy = backpressureStrategy; _flushInterval = TimeSpan.FromMilliseconds(flushIntervalInMilliseconds); + _batchSize = batchSize <= 0 ? 1 : batchSize; + _udpClient = new UdpClient(); var channelOptions = new BoundedChannelOptions(ChannelCapacity) @@ -117,7 +120,7 @@ public sealed class UdpFlow : FlowBase private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); var sb = new StringBuilder(8192); try @@ -126,7 +129,7 @@ public sealed class UdpFlow : FlowBase { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, sb, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs index 8a1421d..447f144 100644 --- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs +++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs @@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows public sealed class ZabbixFlow : FlowBase { - private const int DefaultBatchSize = 256; + private readonly int _batchSize; private const int ChannelCapacity = 4096; private readonly Channel _channel; @@ -36,6 +36,7 @@ namespace EonaCat.LogStack.Flows int port = 10051, string zabbixHostName = null, string zabbixKey = "log_event", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest) : base($"Zabbix:{host}:{port}", minimumLevel) @@ -59,6 +60,7 @@ namespace EonaCat.LogStack.Flows SingleWriter = false }; + _batchSize = batchSize <= 0 ? 1 : batchSize; _channel = Channel.CreateBounded(channelOptions); _cts = new CancellationTokenSource(); _senderTask = Task.Run(() => ProcessLogEventsAsync(_cts.Token)); @@ -113,7 +115,7 @@ namespace EonaCat.LogStack.Flows private async Task ProcessLogEventsAsync(CancellationToken cancellationToken) { - var batch = new List(DefaultBatchSize); + var batch = new List(_batchSize); while (!cancellationToken.IsCancellationRequested) { @@ -125,7 +127,7 @@ namespace EonaCat.LogStack.Flows { batch.Add(logEvent); - if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0) + if (batch.Count >= _batchSize || _channel.Reader.Count == 0) { await SendBatchAsync(batch, cancellationToken); batch.Clear(); diff --git a/EonaCat.LogStack/LogBuilder.cs b/EonaCat.LogStack/LogBuilder.cs index aaeb558..833049d 100644 --- a/EonaCat.LogStack/LogBuilder.cs +++ b/EonaCat.LogStack/LogBuilder.cs @@ -98,12 +98,13 @@ public sealed class LogBuilder FileRetentionPolicy fileRetentionPolicy = null, int flushIntervalInMilliSeconds = 2000, bool useCategoryRouting = false, + int batchSize = 1, LogLevel[]? logLevelsForSeparateFiles = null, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait, FileOutputFormat outputFormat = FileOutputFormat.Text, CompressionFormat compression = CompressionFormat.GZip, - string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}") + string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}") { _flows.Add(new FileFlow( directory, @@ -111,6 +112,7 @@ public sealed class LogBuilder maxFileSize, fileRetentionPolicy, flushIntervalInMilliSeconds, + batchSize, minimumLevel, useCategoryRouting, logLevelsForSeparateFiles, @@ -197,11 +199,13 @@ public sealed class LogBuilder public LogBuilder WriteToDatabase( Func connectionFactory, string tableName = "logs", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new DatabaseFlow( connectionFactory, tableName, + batchSize, minimumLevel)); return this; } @@ -225,11 +229,13 @@ public sealed class LogBuilder public LogBuilder WriteToDiscord( string webHookUrl, string botName = "EonaCatBot", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new DiscordFlow( webHookUrl, botName, + batchSize, minimumLevel)); return this; } @@ -240,11 +246,13 @@ public sealed class LogBuilder public LogBuilder WriteToElasticSearch( string elasticSearchUrl, string indexName = "EonaCatIndex", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new ElasticSearchFlow( elasticSearchUrl, indexName, + batchSize, minimumLevel)); return this; } @@ -252,11 +260,12 @@ public sealed class LogBuilder /// /// Adds Telegram /// - public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", LogLevel minimumLevel = LogLevel.Trace) + public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new TelegramFlow( botToken, chatId, + batchSize, minimumLevel)); return this; } @@ -302,10 +311,11 @@ public sealed class LogBuilder /// /// Adds Slack /// - public LogBuilder WriteToSlack(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace) + public LogBuilder WriteToSlack(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new SlackFlow( webhookUrl, + batchSize, minimumLevel)); return this; } @@ -313,10 +323,11 @@ public sealed class LogBuilder /// /// Adds Slack /// - public LogBuilder WriteToMicrosoftTeams(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace) + public LogBuilder WriteToMicrosoftTeams(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace) { _flows.Add(new MicrosoftTeamsFlow( webhookUrl, + batchSize, minimumLevel)); return this; } @@ -436,6 +447,7 @@ public sealed class LogBuilder public LogBuilder WriteToSyslogTcp( string host, int port = 514, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait, bool useTls = false, @@ -455,6 +467,7 @@ public sealed class LogBuilder _flows.Add(new SyslogTcpFlow( host, port, + batchSize, minimumLevel, backpressureStrategy, useTls, @@ -488,12 +501,14 @@ public sealed class LogBuilder public LogBuilder WriteToSyslogUdp( string host, int port, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait) { _flows.Add(new SyslogUdpFlow( host, port, + batchSize, minimumLevel, backpressureStrategy)); return this; @@ -507,6 +522,7 @@ public sealed class LogBuilder int port = 10051, string zabbixHostname = null, string zabbixKey = "log_event", + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait) { @@ -515,6 +531,7 @@ public sealed class LogBuilder port, zabbixHostname, zabbixKey, + batchSize, minimumLevel, backpressureStrategy)); return this; @@ -528,6 +545,7 @@ public sealed class LogBuilder int port = 12201, bool useTcp = false, string graylogHostName = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait) { @@ -536,6 +554,7 @@ public sealed class LogBuilder port, useTcp, graylogHostName, + batchSize, minimumLevel, backpressureStrategy)); return this; @@ -620,6 +639,7 @@ public sealed class LogBuilder string token, string sourceType = "splunk_logs", string hostName = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait) { @@ -628,6 +648,7 @@ public sealed class LogBuilder token, sourceType, hostName, + batchSize, minimumLevel, backpressureStrategy)); return this; @@ -707,6 +728,7 @@ public sealed class LogBuilder string host, int port, int flushIntervalInMilliseconds = 1000, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait) { @@ -714,6 +736,7 @@ public sealed class LogBuilder host, port, flushIntervalInMilliseconds, + batchSize, minimumLevel, backpressureStrategy)); return this; @@ -736,6 +759,7 @@ public sealed class LogBuilder public LogBuilder WriteToHttp( string endpoint, HttpClient? httpClient = null, + int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace, TimeSpan? batchInterval = null, Dictionary? headers = null) @@ -743,6 +767,7 @@ public sealed class LogBuilder _flows.Add(new HttpFlow( endpoint, httpClient, + batchSize, minimumLevel, batchInterval, headers)); diff --git a/Testers/EonaCat.LogStack.Test.Web/Program.cs b/Testers/EonaCat.LogStack.Test.Web/Program.cs index 83b8a84..daa59a5 100644 --- a/Testers/EonaCat.LogStack.Test.Web/Program.cs +++ b/Testers/EonaCat.LogStack.Test.Web/Program.cs @@ -17,8 +17,8 @@ namespace EonaCat.LogStack.Test.Web _ = Task.Run(async () => { var logBuilder = new LogBuilder(); + logBuilder.WithTimestampMode(TimestampMode.Local); logBuilder.WriteToConsole(); - logBuilder.WriteToFile("./logs"); logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Json); logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Xml); logBuilder.WriteToTcp("127.0.0.1", 514);