diff --git a/EonaCat.LogStack/EonaCat.LogStack.csproj b/EonaCat.LogStack/EonaCat.LogStack.csproj
index 2e89c5d..292bb7c 100644
--- a/EonaCat.LogStack/EonaCat.LogStack.csproj
+++ b/EonaCat.LogStack/EonaCat.LogStack.csproj
@@ -14,7 +14,7 @@ It features a rich fluent API for routing log events to dozens of destinations
EonaCat (Jeroen Saey)
EonaCat;Logger;EonaCatLogStack;Log;Writer;Flows;LogStack;Memory;Speed;Jeroen;Saey
- 0.0.1
+ 0.0.2
README.md
True
LICENSE
@@ -25,7 +25,7 @@ It features a rich fluent API for routing log events to dozens of destinations
- 0.0.1+{chash:10}.{c:ymd}
+ 0.0.2+{chash:10}.{c:ymd}
true
true
v[0-9]*
@@ -36,7 +36,7 @@ It features a rich fluent API for routing log events to dozens of destinations
- 0.0.1
+ 0.0.2
EonaCat.LogStack
EonaCat.LogStack
https://git.saey.me/EonaCat/EonaCat.LogStack
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs
index 2c8bd9b..28d66d5 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ConsoleFlow.cs
@@ -32,7 +32,7 @@ namespace EonaCat.LogStack.Flows
bool useColors = true,
TimestampMode timestampMode = TimestampMode.Local,
ColorSchema? colorSchema = null,
- string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
+ string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
: base("Console", minimumLevel)
{
_useColors = useColors;
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs
index e10f3ce..66086a6 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DatabaseFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class DatabaseFlow : FlowBase
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 128;
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _writerTask;
@@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows
public DatabaseFlow(
Func connectionFactory,
string tableName = "Logs",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
: base($"Database:{tableName}", minimumLevel)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_tableName = tableName;
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
@@ -78,7 +80,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -88,7 +90,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await WriteBatchAsync(batch, cancellationToken).ConfigureAwait(false);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs
index b437223..ab051ab 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/DiscordFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class DiscordFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 10;
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _workerTask;
@@ -30,11 +30,13 @@ namespace EonaCat.LogStack.Flows
public DiscordFlow(
string webhookUrl,
string botName,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Discord", minimumLevel)
{
_webhookUrl = webhookUrl ?? throw new ArgumentNullException(nameof(webhookUrl));
_httpClient = new HttpClient();
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -67,7 +69,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(string botName, CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -77,7 +79,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await SendBatchAsync(botName, batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs
index 8c2f4a6..969446b 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ElasticSearchFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class ElasticSearchFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 100; // Bulk insert batch size
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _workerTask;
@@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows
public ElasticSearchFlow(
string elasticsearchUrl,
string indexName = "logs",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
: base($"Elasticsearch:{indexName}", minimumLevel)
{
_elasticsearchUrl = elasticsearchUrl?.TrimEnd('/') ?? throw new ArgumentNullException(nameof(elasticsearchUrl));
_indexName = indexName;
_httpClient = new HttpClient();
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await SendBulkAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs
index 9ee5618..f11c204 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/FileFlow.cs
@@ -27,7 +27,7 @@ namespace EonaCat.LogStack.Flows
{
private const int FileBufferSize = 131072; // 128 KB
private const int WriterBufferSize = 131072; // 128 KB
- private const int DefaultBatchSize = 512;
+ private readonly int _batchSize;
private const int QueueCapacity = 8192;
private static readonly Dictionary LevelStrings =
@@ -122,6 +122,7 @@ namespace EonaCat.LogStack.Flows
long maxFileSize = 200 * 1024 * 1024,
FileRetentionPolicy retention = null,
int flushIntervalMs = 2000,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
bool useCategoryRouting = false,
LogLevel[] logLevelsForSeparateFiles = null,
@@ -129,7 +130,7 @@ namespace EonaCat.LogStack.Flows
BackpressureStrategy backpressure = BackpressureStrategy.DropOldest,
FileOutputFormat outputFormat = FileOutputFormat.Text,
CompressionFormat compression = CompressionFormat.GZip,
- string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}",
+ string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}",
long maxMemoryBytes = 20 * 1024 * 1024)
: base("File:" + Path.Combine(directory, filePrefix), minimumLevel)
{
@@ -150,6 +151,7 @@ namespace EonaCat.LogStack.Flows
CheckForProcessTermination();
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_directory = directory;
_filePrefix = filePrefix;
_template = template;
@@ -505,7 +507,7 @@ namespace EonaCat.LogStack.Flows
// Drain additional items
int extra = 0;
LogEvent next;
- while (extra < DefaultBatchSize && _queue.TryTake(out next))
+ while (extra < _batchSize && _queue.TryTake(out next))
{
WriteLogEvent(next);
extra++;
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs
index 8bb6084..aa24a09 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/GrayLogFlow.cs
@@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows
public sealed class GraylogFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private const int MaxUdpPacketSize = 8192;
@@ -39,6 +39,7 @@ namespace EonaCat.LogStack.Flows
int port = 12201,
bool useTcp = false,
string graylogHostName = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Graylog:{host}:{port}", minimumLevel)
@@ -48,6 +49,7 @@ namespace EonaCat.LogStack.Flows
_useTcp = useTcp;
_backpressureStrategy = backpressureStrategy;
_graylogHostName = graylogHostName ?? Environment.MachineName;
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -122,7 +124,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -137,7 +139,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs
index e5d7b0f..8575b34 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/HttpFlow.cs
@@ -19,7 +19,7 @@ public sealed class HttpFlow : FlowBase
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
private const int ChannelCapacity = 2048;
- private const int DefaultBatchSize = 50;
+ private readonly int _batchSize;
private const int MaxRetries = 3;
private readonly Channel _channel;
@@ -35,6 +35,7 @@ public sealed class HttpFlow : FlowBase
public HttpFlow(
string endpoint,
HttpClient? httpClient = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information,
TimeSpan? batchInterval = null,
Dictionary? headers = null)
@@ -58,6 +59,7 @@ public sealed class HttpFlow : FlowBase
_ownHttpClient = false;
}
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
@@ -103,7 +105,7 @@ public sealed class HttpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -112,7 +114,7 @@ public sealed class HttpFlow : FlowBase
var hasMore = true;
// Collect batch
- while (batch.Count < DefaultBatchSize && hasMore)
+ while (batch.Count < _batchSize && hasMore)
{
if (_channel.Reader.TryRead(out var logEvent))
{
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs
index 9efd5cf..0670040 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/MicrosoftTeamsFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class MicrosoftTeamsFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 5; // Keep batches small to avoid throttling
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _workerTask;
@@ -29,6 +29,7 @@ namespace EonaCat.LogStack.Flows
public MicrosoftTeamsFlow(
string webhookUrl,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("MicrosoftTeams", minimumLevel)
{
@@ -42,6 +43,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -66,7 +68,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -76,7 +78,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs
index be4e143..a23b20e 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SlackFlow.cs
@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SlackFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 5; // Slack rate-limits, small batches are safer
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _workerTask;
@@ -26,6 +26,7 @@ namespace EonaCat.LogStack.Flows
public SlackFlow(
string webhookUrl,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Slack", minimumLevel)
{
@@ -39,6 +40,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -63,7 +65,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -73,7 +75,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs
index 4ae2c7f..ef86426 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SplunkFlow.cs
@@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows
{
public sealed class SplunkFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel _channel;
@@ -35,6 +35,7 @@ namespace EonaCat.LogStack.Flows
string token,
string sourcetype = "splunk_logs",
string hostName = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Splunk:{splunkUrl}", minimumLevel)
@@ -58,6 +59,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded(channelOptions);
_cts = new CancellationTokenSource();
_httpClient = new HttpClient();
@@ -114,7 +116,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -124,7 +126,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs
index 3d8d166..6e8c766 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogTcpFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SyslogTcpFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel _channel;
@@ -38,6 +38,7 @@ namespace EonaCat.LogStack.Flows
public SyslogTcpFlow(
string host,
int port = 514,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest,
bool useTls = false,
@@ -49,6 +50,8 @@ namespace EonaCat.LogStack.Flows
_port = port;
_backpressureStrategy = backpressureStrategy;
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
+
_useTls = useTls;
_certValidationCallback = certValidationCallback ?? DefaultCertificateValidation;
_clientCertificates = clientCertificates;
@@ -118,7 +121,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -131,7 +134,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs
index 5f39eae..37898ea 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/SyslogUdpFlow.cs
@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SyslogUdpFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private const int MaxUdpPacketSize = 4096;
@@ -32,6 +32,7 @@ namespace EonaCat.LogStack.Flows
public SyslogUdpFlow(
string host,
int port = 514,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"SyslogUDP:{host}:{port}", minimumLevel)
@@ -40,6 +41,8 @@ namespace EonaCat.LogStack.Flows
_port = port;
_backpressureStrategy = backpressureStrategy;
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
+
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = backpressureStrategy switch
@@ -108,7 +111,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -119,7 +122,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs
index 55873f1..4b588e0 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TcpFlow.cs
@@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows;
public sealed class TcpFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel _channel;
@@ -36,6 +36,7 @@ public sealed class TcpFlow : FlowBase
public TcpFlow(
string host,
int port,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest,
bool useTls = false,
@@ -47,6 +48,8 @@ public sealed class TcpFlow : FlowBase
_port = port;
_backpressureStrategy = backpressureStrategy;
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
+
_useTls = useTls;
_certValidationCallback = certValidationCallback ?? DefaultCertificateValidation;
_clientCertificates = clientCertificates;
@@ -163,7 +166,7 @@ public sealed class TcpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -176,7 +179,7 @@ public sealed class TcpFlow : FlowBase
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs
index f677596..e298664 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/TelegramFlow.cs
@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class TelegramFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
- private const int DefaultBatchSize = 5;
+ private readonly int _batchSize;
private readonly Channel _channel;
private readonly Task _workerTask;
@@ -31,6 +31,7 @@ namespace EonaCat.LogStack.Flows
public TelegramFlow(
string botToken,
string chatId,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Telegram", minimumLevel)
{
@@ -45,6 +46,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
try
{
@@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize)
+ if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs
index fd209b4..b8dbdc6 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/UdpFlow.cs
@@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows;
public sealed class UdpFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel _channel;
@@ -30,18 +30,21 @@ public sealed class UdpFlow : FlowBase
private readonly Task _flushTask;
public UdpFlow(
- string host,
- int port,
- int flushIntervalInMilliseconds = 2000,
- LogLevel minimumLevel = LogLevel.Trace,
- BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
- : base($"UDP:{host}:{port}", minimumLevel)
+ string host,
+ int port,
+ int flushIntervalInMilliseconds = 2000,
+ int batchSize = 1,
+ LogLevel minimumLevel = LogLevel.Trace,
+ BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
+ : base($"UDP:{host}:{port}", minimumLevel)
{
_host = host ?? throw new ArgumentNullException(nameof(host));
_port = port;
_backpressureStrategy = backpressureStrategy;
_flushInterval = TimeSpan.FromMilliseconds(flushIntervalInMilliseconds);
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
+
_udpClient = new UdpClient();
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
@@ -117,7 +120,7 @@ public sealed class UdpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
var sb = new StringBuilder(8192);
try
@@ -126,7 +129,7 @@ public sealed class UdpFlow : FlowBase
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs
index 8a1421d..447f144 100644
--- a/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs
+++ b/EonaCat.LogStack/EonaCatLoggerCore/Flows/ZabbixFlow.cs
@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class ZabbixFlow : FlowBase
{
- private const int DefaultBatchSize = 256;
+ private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel _channel;
@@ -36,6 +36,7 @@ namespace EonaCat.LogStack.Flows
int port = 10051,
string zabbixHostName = null,
string zabbixKey = "log_event",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Zabbix:{host}:{port}", minimumLevel)
@@ -59,6 +60,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
+ _batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded(channelOptions);
_cts = new CancellationTokenSource();
_senderTask = Task.Run(() => ProcessLogEventsAsync(_cts.Token));
@@ -113,7 +115,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
- var batch = new List(DefaultBatchSize);
+ var batch = new List(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -125,7 +127,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
- if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
+ if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();
diff --git a/EonaCat.LogStack/LogBuilder.cs b/EonaCat.LogStack/LogBuilder.cs
index aaeb558..833049d 100644
--- a/EonaCat.LogStack/LogBuilder.cs
+++ b/EonaCat.LogStack/LogBuilder.cs
@@ -98,12 +98,13 @@ public sealed class LogBuilder
FileRetentionPolicy fileRetentionPolicy = null,
int flushIntervalInMilliSeconds = 2000,
bool useCategoryRouting = false,
+ int batchSize = 1,
LogLevel[]? logLevelsForSeparateFiles = null,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait,
FileOutputFormat outputFormat = FileOutputFormat.Text,
CompressionFormat compression = CompressionFormat.GZip,
- string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
+ string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
{
_flows.Add(new FileFlow(
directory,
@@ -111,6 +112,7 @@ public sealed class LogBuilder
maxFileSize,
fileRetentionPolicy,
flushIntervalInMilliSeconds,
+ batchSize,
minimumLevel,
useCategoryRouting,
logLevelsForSeparateFiles,
@@ -197,11 +199,13 @@ public sealed class LogBuilder
public LogBuilder WriteToDatabase(
Func connectionFactory,
string tableName = "logs",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new DatabaseFlow(
connectionFactory,
tableName,
+ batchSize,
minimumLevel));
return this;
}
@@ -225,11 +229,13 @@ public sealed class LogBuilder
public LogBuilder WriteToDiscord(
string webHookUrl,
string botName = "EonaCatBot",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new DiscordFlow(
webHookUrl,
botName,
+ batchSize,
minimumLevel));
return this;
}
@@ -240,11 +246,13 @@ public sealed class LogBuilder
public LogBuilder WriteToElasticSearch(
string elasticSearchUrl,
string indexName = "EonaCatIndex",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new ElasticSearchFlow(
elasticSearchUrl,
indexName,
+ batchSize,
minimumLevel));
return this;
}
@@ -252,11 +260,12 @@ public sealed class LogBuilder
///
/// Adds Telegram
///
- public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", LogLevel minimumLevel = LogLevel.Trace)
+ public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new TelegramFlow(
botToken,
chatId,
+ batchSize,
minimumLevel));
return this;
}
@@ -302,10 +311,11 @@ public sealed class LogBuilder
///
/// Adds Slack
///
- public LogBuilder WriteToSlack(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace)
+ public LogBuilder WriteToSlack(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new SlackFlow(
webhookUrl,
+ batchSize,
minimumLevel));
return this;
}
@@ -313,10 +323,11 @@ public sealed class LogBuilder
///
/// Adds Slack
///
- public LogBuilder WriteToMicrosoftTeams(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace)
+ public LogBuilder WriteToMicrosoftTeams(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new MicrosoftTeamsFlow(
webhookUrl,
+ batchSize,
minimumLevel));
return this;
}
@@ -436,6 +447,7 @@ public sealed class LogBuilder
public LogBuilder WriteToSyslogTcp(
string host,
int port = 514,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait,
bool useTls = false,
@@ -455,6 +467,7 @@ public sealed class LogBuilder
_flows.Add(new SyslogTcpFlow(
host,
port,
+ batchSize,
minimumLevel,
backpressureStrategy,
useTls,
@@ -488,12 +501,14 @@ public sealed class LogBuilder
public LogBuilder WriteToSyslogUdp(
string host,
int port,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
_flows.Add(new SyslogUdpFlow(
host,
port,
+ batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -507,6 +522,7 @@ public sealed class LogBuilder
int port = 10051,
string zabbixHostname = null,
string zabbixKey = "log_event",
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -515,6 +531,7 @@ public sealed class LogBuilder
port,
zabbixHostname,
zabbixKey,
+ batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -528,6 +545,7 @@ public sealed class LogBuilder
int port = 12201,
bool useTcp = false,
string graylogHostName = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -536,6 +554,7 @@ public sealed class LogBuilder
port,
useTcp,
graylogHostName,
+ batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -620,6 +639,7 @@ public sealed class LogBuilder
string token,
string sourceType = "splunk_logs",
string hostName = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -628,6 +648,7 @@ public sealed class LogBuilder
token,
sourceType,
hostName,
+ batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -707,6 +728,7 @@ public sealed class LogBuilder
string host,
int port,
int flushIntervalInMilliseconds = 1000,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -714,6 +736,7 @@ public sealed class LogBuilder
host,
port,
flushIntervalInMilliseconds,
+ batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -736,6 +759,7 @@ public sealed class LogBuilder
public LogBuilder WriteToHttp(
string endpoint,
HttpClient? httpClient = null,
+ int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
TimeSpan? batchInterval = null,
Dictionary? headers = null)
@@ -743,6 +767,7 @@ public sealed class LogBuilder
_flows.Add(new HttpFlow(
endpoint,
httpClient,
+ batchSize,
minimumLevel,
batchInterval,
headers));
diff --git a/Testers/EonaCat.LogStack.Test.Web/Program.cs b/Testers/EonaCat.LogStack.Test.Web/Program.cs
index 83b8a84..daa59a5 100644
--- a/Testers/EonaCat.LogStack.Test.Web/Program.cs
+++ b/Testers/EonaCat.LogStack.Test.Web/Program.cs
@@ -17,8 +17,8 @@ namespace EonaCat.LogStack.Test.Web
_ = Task.Run(async () =>
{
var logBuilder = new LogBuilder();
+ logBuilder.WithTimestampMode(TimestampMode.Local);
logBuilder.WriteToConsole();
- logBuilder.WriteToFile("./logs");
logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Json);
logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Xml);
logBuilder.WriteToTcp("127.0.0.1", 514);