Added batchSizes for all flows

This commit is contained in:
2026-03-05 18:59:55 +01:00
parent 3f3356eb4a
commit 52d3ed617f
19 changed files with 119 additions and 60 deletions

View File

@@ -14,7 +14,7 @@ It features a rich fluent API for routing log events to dozens of destinations
<Copyright>EonaCat (Jeroen Saey)</Copyright>
<PackageTags>EonaCat;Logger;EonaCatLogStack;Log;Writer;Flows;LogStack;Memory;Speed;Jeroen;Saey</PackageTags>
<PackageIconUrl />
<FileVersion>0.0.1</FileVersion>
<FileVersion>0.0.2</FileVersion>
<PackageReadmeFile>README.md</PackageReadmeFile>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
@@ -25,7 +25,7 @@ It features a rich fluent API for routing log events to dozens of destinations
</PropertyGroup>
<PropertyGroup>
<EVRevisionFormat>0.0.1+{chash:10}.{c:ymd}</EVRevisionFormat>
<EVRevisionFormat>0.0.2+{chash:10}.{c:ymd}</EVRevisionFormat>
<EVDefault>true</EVDefault>
<EVInfo>true</EVInfo>
<EVTagMatch>v[0-9]*</EVTagMatch>
@@ -36,7 +36,7 @@ It features a rich fluent API for routing log events to dozens of destinations
</PropertyGroup>
<PropertyGroup>
<Version>0.0.1</Version>
<Version>0.0.2</Version>
<PackageId>EonaCat.LogStack</PackageId>
<Product>EonaCat.LogStack</Product>
<RepositoryUrl>https://git.saey.me/EonaCat/EonaCat.LogStack</RepositoryUrl>

View File

@@ -32,7 +32,7 @@ namespace EonaCat.LogStack.Flows
bool useColors = true,
TimestampMode timestampMode = TimestampMode.Local,
ColorSchema? colorSchema = null,
string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
: base("Console", minimumLevel)
{
_useColors = useColors;

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class DatabaseFlow : FlowBase
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 128;
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _writerTask;
@@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows
public DatabaseFlow(
Func<DbConnection> connectionFactory,
string tableName = "Logs",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
: base($"Database:{tableName}", minimumLevel)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_tableName = tableName;
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
@@ -78,7 +80,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -88,7 +90,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await WriteBatchAsync(batch, cancellationToken).ConfigureAwait(false);
batch.Clear();

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class DiscordFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 10;
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _workerTask;
@@ -30,11 +30,13 @@ namespace EonaCat.LogStack.Flows
public DiscordFlow(
string webhookUrl,
string botName,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Discord", minimumLevel)
{
_webhookUrl = webhookUrl ?? throw new ArgumentNullException(nameof(webhookUrl));
_httpClient = new HttpClient();
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -67,7 +69,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(string botName, CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -77,7 +79,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await SendBatchAsync(botName, batch, cancellationToken);
batch.Clear();

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class ElasticSearchFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 100; // Bulk insert batch size
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _workerTask;
@@ -31,12 +31,14 @@ namespace EonaCat.LogStack.Flows
public ElasticSearchFlow(
string elasticsearchUrl,
string indexName = "logs",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
: base($"Elasticsearch:{indexName}", minimumLevel)
{
_elasticsearchUrl = elasticsearchUrl?.TrimEnd('/') ?? throw new ArgumentNullException(nameof(elasticsearchUrl));
_indexName = indexName;
_httpClient = new HttpClient();
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await SendBulkAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -27,7 +27,7 @@ namespace EonaCat.LogStack.Flows
{
private const int FileBufferSize = 131072; // 128 KB
private const int WriterBufferSize = 131072; // 128 KB
private const int DefaultBatchSize = 512;
private readonly int _batchSize;
private const int QueueCapacity = 8192;
private static readonly Dictionary<LogLevel, string> LevelStrings =
@@ -122,6 +122,7 @@ namespace EonaCat.LogStack.Flows
long maxFileSize = 200 * 1024 * 1024,
FileRetentionPolicy retention = null,
int flushIntervalMs = 2000,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
bool useCategoryRouting = false,
LogLevel[] logLevelsForSeparateFiles = null,
@@ -129,7 +130,7 @@ namespace EonaCat.LogStack.Flows
BackpressureStrategy backpressure = BackpressureStrategy.DropOldest,
FileOutputFormat outputFormat = FileOutputFormat.Text,
CompressionFormat compression = CompressionFormat.GZip,
string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}",
string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}",
long maxMemoryBytes = 20 * 1024 * 1024)
: base("File:" + Path.Combine(directory, filePrefix), minimumLevel)
{
@@ -150,6 +151,7 @@ namespace EonaCat.LogStack.Flows
CheckForProcessTermination();
_batchSize = batchSize <= 0 ? 1 : batchSize;
_directory = directory;
_filePrefix = filePrefix;
_template = template;
@@ -505,7 +507,7 @@ namespace EonaCat.LogStack.Flows
// Drain additional items
int extra = 0;
LogEvent next;
while (extra < DefaultBatchSize && _queue.TryTake(out next))
while (extra < _batchSize && _queue.TryTake(out next))
{
WriteLogEvent(next);
extra++;

View File

@@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows
public sealed class GraylogFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private const int MaxUdpPacketSize = 8192;
@@ -39,6 +39,7 @@ namespace EonaCat.LogStack.Flows
int port = 12201,
bool useTcp = false,
string graylogHostName = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Graylog:{host}:{port}", minimumLevel)
@@ -48,6 +49,7 @@ namespace EonaCat.LogStack.Flows
_useTcp = useTcp;
_backpressureStrategy = backpressureStrategy;
_graylogHostName = graylogHostName ?? Environment.MachineName;
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
@@ -122,7 +124,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -137,7 +139,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -19,7 +19,7 @@ public sealed class HttpFlow : FlowBase
// See the LICENSE file or go to https://EonaCat.com/License for full license details.
private const int ChannelCapacity = 2048;
private const int DefaultBatchSize = 50;
private readonly int _batchSize;
private const int MaxRetries = 3;
private readonly Channel<LogEvent> _channel;
@@ -35,6 +35,7 @@ public sealed class HttpFlow : FlowBase
public HttpFlow(
string endpoint,
HttpClient? httpClient = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information,
TimeSpan? batchInterval = null,
Dictionary<string, string>? headers = null)
@@ -58,6 +59,7 @@ public sealed class HttpFlow : FlowBase
_ownHttpClient = false;
}
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
@@ -103,7 +105,7 @@ public sealed class HttpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -112,7 +114,7 @@ public sealed class HttpFlow : FlowBase
var hasMore = true;
// Collect batch
while (batch.Count < DefaultBatchSize && hasMore)
while (batch.Count < _batchSize && hasMore)
{
if (_channel.Reader.TryRead(out var logEvent))
{

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class MicrosoftTeamsFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 5; // Keep batches small to avoid throttling
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _workerTask;
@@ -29,6 +29,7 @@ namespace EonaCat.LogStack.Flows
public MicrosoftTeamsFlow(
string webhookUrl,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("MicrosoftTeams", minimumLevel)
{
@@ -42,6 +43,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
_batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded<LogEvent>(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -66,7 +68,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -76,7 +78,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SlackFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 5; // Slack rate-limits, small batches are safer
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _workerTask;
@@ -26,6 +26,7 @@ namespace EonaCat.LogStack.Flows
public SlackFlow(
string webhookUrl,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Slack", minimumLevel)
{
@@ -39,6 +40,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
_batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded<LogEvent>(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -63,7 +65,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -73,7 +75,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows
{
public sealed class SplunkFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel<LogEvent> _channel;
@@ -35,6 +35,7 @@ namespace EonaCat.LogStack.Flows
string token,
string sourcetype = "splunk_logs",
string hostName = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Splunk:{splunkUrl}", minimumLevel)
@@ -58,6 +59,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
_batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded<LogEvent>(channelOptions);
_cts = new CancellationTokenSource();
_httpClient = new HttpClient();
@@ -114,7 +116,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -124,7 +126,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SyslogTcpFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel<LogEvent> _channel;
@@ -38,6 +38,7 @@ namespace EonaCat.LogStack.Flows
public SyslogTcpFlow(
string host,
int port = 514,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest,
bool useTls = false,
@@ -49,6 +50,8 @@ namespace EonaCat.LogStack.Flows
_port = port;
_backpressureStrategy = backpressureStrategy;
_batchSize = batchSize <= 0 ? 1 : batchSize;
_useTls = useTls;
_certValidationCallback = certValidationCallback ?? DefaultCertificateValidation;
_clientCertificates = clientCertificates;
@@ -118,7 +121,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -131,7 +134,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();

View File

@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class SyslogUdpFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private const int MaxUdpPacketSize = 4096;
@@ -32,6 +32,7 @@ namespace EonaCat.LogStack.Flows
public SyslogUdpFlow(
string host,
int port = 514,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"SyslogUDP:{host}:{port}", minimumLevel)
@@ -40,6 +41,8 @@ namespace EonaCat.LogStack.Flows
_port = port;
_backpressureStrategy = backpressureStrategy;
_batchSize = batchSize <= 0 ? 1 : batchSize;
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
{
FullMode = backpressureStrategy switch
@@ -108,7 +111,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -119,7 +122,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();

View File

@@ -17,7 +17,7 @@ namespace EonaCat.LogStack.Flows;
public sealed class TcpFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel<LogEvent> _channel;
@@ -36,6 +36,7 @@ public sealed class TcpFlow : FlowBase
public TcpFlow(
string host,
int port,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest,
bool useTls = false,
@@ -47,6 +48,8 @@ public sealed class TcpFlow : FlowBase
_port = port;
_backpressureStrategy = backpressureStrategy;
_batchSize = batchSize <= 0 ? 1 : batchSize;
_useTls = useTls;
_certValidationCallback = certValidationCallback ?? DefaultCertificateValidation;
_clientCertificates = clientCertificates;
@@ -163,7 +166,7 @@ public sealed class TcpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
var sb = new StringBuilder(8192);
while (!cancellationToken.IsCancellationRequested)
@@ -176,7 +179,7 @@ public sealed class TcpFlow : FlowBase
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();

View File

@@ -19,7 +19,7 @@ namespace EonaCat.LogStack.Flows
public sealed class TelegramFlow : FlowBase, IAsyncDisposable
{
private const int ChannelCapacity = 4096;
private const int DefaultBatchSize = 5;
private readonly int _batchSize;
private readonly Channel<LogEvent> _channel;
private readonly Task _workerTask;
@@ -31,6 +31,7 @@ namespace EonaCat.LogStack.Flows
public TelegramFlow(
string botToken,
string chatId,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Information)
: base("Telegram", minimumLevel)
{
@@ -45,6 +46,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
_batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded<LogEvent>(channelOptions);
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => ProcessQueueAsync(_cts.Token));
@@ -69,7 +71,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
try
{
@@ -79,7 +81,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize)
if (batch.Count >= _batchSize)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -15,7 +15,7 @@ namespace EonaCat.LogStack.Flows;
public sealed class UdpFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel<LogEvent> _channel;
@@ -33,6 +33,7 @@ public sealed class UdpFlow : FlowBase
string host,
int port,
int flushIntervalInMilliseconds = 2000,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"UDP:{host}:{port}", minimumLevel)
@@ -42,6 +43,8 @@ public sealed class UdpFlow : FlowBase
_backpressureStrategy = backpressureStrategy;
_flushInterval = TimeSpan.FromMilliseconds(flushIntervalInMilliseconds);
_batchSize = batchSize <= 0 ? 1 : batchSize;
_udpClient = new UdpClient();
var channelOptions = new BoundedChannelOptions(ChannelCapacity)
@@ -117,7 +120,7 @@ public sealed class UdpFlow : FlowBase
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
var sb = new StringBuilder(8192);
try
@@ -126,7 +129,7 @@ public sealed class UdpFlow : FlowBase
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, sb, cancellationToken);
batch.Clear();

View File

@@ -16,7 +16,7 @@ namespace EonaCat.LogStack.Flows
public sealed class ZabbixFlow : FlowBase
{
private const int DefaultBatchSize = 256;
private readonly int _batchSize;
private const int ChannelCapacity = 4096;
private readonly Channel<LogEvent> _channel;
@@ -36,6 +36,7 @@ namespace EonaCat.LogStack.Flows
int port = 10051,
string zabbixHostName = null,
string zabbixKey = "log_event",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.DropOldest)
: base($"Zabbix:{host}:{port}", minimumLevel)
@@ -59,6 +60,7 @@ namespace EonaCat.LogStack.Flows
SingleWriter = false
};
_batchSize = batchSize <= 0 ? 1 : batchSize;
_channel = Channel.CreateBounded<LogEvent>(channelOptions);
_cts = new CancellationTokenSource();
_senderTask = Task.Run(() => ProcessLogEventsAsync(_cts.Token));
@@ -113,7 +115,7 @@ namespace EonaCat.LogStack.Flows
private async Task ProcessLogEventsAsync(CancellationToken cancellationToken)
{
var batch = new List<LogEvent>(DefaultBatchSize);
var batch = new List<LogEvent>(_batchSize);
while (!cancellationToken.IsCancellationRequested)
{
@@ -125,7 +127,7 @@ namespace EonaCat.LogStack.Flows
{
batch.Add(logEvent);
if (batch.Count >= DefaultBatchSize || _channel.Reader.Count == 0)
if (batch.Count >= _batchSize || _channel.Reader.Count == 0)
{
await SendBatchAsync(batch, cancellationToken);
batch.Clear();

View File

@@ -98,12 +98,13 @@ public sealed class LogBuilder
FileRetentionPolicy fileRetentionPolicy = null,
int flushIntervalInMilliSeconds = 2000,
bool useCategoryRouting = false,
int batchSize = 1,
LogLevel[]? logLevelsForSeparateFiles = null,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait,
FileOutputFormat outputFormat = FileOutputFormat.Text,
CompressionFormat compression = CompressionFormat.GZip,
string template = "[{ts}] [{tz}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
string template = "[{ts}] [Host: {host}] [Category: {category}] [Thread: {thread}] [{logtype}] {message}{props}")
{
_flows.Add(new FileFlow(
directory,
@@ -111,6 +112,7 @@ public sealed class LogBuilder
maxFileSize,
fileRetentionPolicy,
flushIntervalInMilliSeconds,
batchSize,
minimumLevel,
useCategoryRouting,
logLevelsForSeparateFiles,
@@ -197,11 +199,13 @@ public sealed class LogBuilder
public LogBuilder WriteToDatabase(
Func<DbConnection> connectionFactory,
string tableName = "logs",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new DatabaseFlow(
connectionFactory,
tableName,
batchSize,
minimumLevel));
return this;
}
@@ -225,11 +229,13 @@ public sealed class LogBuilder
public LogBuilder WriteToDiscord(
string webHookUrl,
string botName = "EonaCatBot",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new DiscordFlow(
webHookUrl,
botName,
batchSize,
minimumLevel));
return this;
}
@@ -240,11 +246,13 @@ public sealed class LogBuilder
public LogBuilder WriteToElasticSearch(
string elasticSearchUrl,
string indexName = "EonaCatIndex",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new ElasticSearchFlow(
elasticSearchUrl,
indexName,
batchSize,
minimumLevel));
return this;
}
@@ -252,11 +260,12 @@ public sealed class LogBuilder
/// <summary>
/// Adds Telegram
/// </summary>
public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", LogLevel minimumLevel = LogLevel.Trace)
public LogBuilder WriteToTelegram(string botToken, string chatId = "EonaCat", int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new TelegramFlow(
botToken,
chatId,
batchSize,
minimumLevel));
return this;
}
@@ -302,10 +311,11 @@ public sealed class LogBuilder
/// <summary>
/// Adds Slack
/// </summary>
public LogBuilder WriteToSlack(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace)
public LogBuilder WriteToSlack(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new SlackFlow(
webhookUrl,
batchSize,
minimumLevel));
return this;
}
@@ -313,10 +323,11 @@ public sealed class LogBuilder
/// <summary>
/// Adds Slack
/// </summary>
public LogBuilder WriteToMicrosoftTeams(string webhookUrl, LogLevel minimumLevel = LogLevel.Trace)
public LogBuilder WriteToMicrosoftTeams(string webhookUrl, int batchSize = 1, LogLevel minimumLevel = LogLevel.Trace)
{
_flows.Add(new MicrosoftTeamsFlow(
webhookUrl,
batchSize,
minimumLevel));
return this;
}
@@ -436,6 +447,7 @@ public sealed class LogBuilder
public LogBuilder WriteToSyslogTcp(
string host,
int port = 514,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait,
bool useTls = false,
@@ -455,6 +467,7 @@ public sealed class LogBuilder
_flows.Add(new SyslogTcpFlow(
host,
port,
batchSize,
minimumLevel,
backpressureStrategy,
useTls,
@@ -488,12 +501,14 @@ public sealed class LogBuilder
public LogBuilder WriteToSyslogUdp(
string host,
int port,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
_flows.Add(new SyslogUdpFlow(
host,
port,
batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -507,6 +522,7 @@ public sealed class LogBuilder
int port = 10051,
string zabbixHostname = null,
string zabbixKey = "log_event",
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -515,6 +531,7 @@ public sealed class LogBuilder
port,
zabbixHostname,
zabbixKey,
batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -528,6 +545,7 @@ public sealed class LogBuilder
int port = 12201,
bool useTcp = false,
string graylogHostName = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -536,6 +554,7 @@ public sealed class LogBuilder
port,
useTcp,
graylogHostName,
batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -620,6 +639,7 @@ public sealed class LogBuilder
string token,
string sourceType = "splunk_logs",
string hostName = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -628,6 +648,7 @@ public sealed class LogBuilder
token,
sourceType,
hostName,
batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -707,6 +728,7 @@ public sealed class LogBuilder
string host,
int port,
int flushIntervalInMilliseconds = 1000,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
BackpressureStrategy backpressureStrategy = BackpressureStrategy.Wait)
{
@@ -714,6 +736,7 @@ public sealed class LogBuilder
host,
port,
flushIntervalInMilliseconds,
batchSize,
minimumLevel,
backpressureStrategy));
return this;
@@ -736,6 +759,7 @@ public sealed class LogBuilder
public LogBuilder WriteToHttp(
string endpoint,
HttpClient? httpClient = null,
int batchSize = 1,
LogLevel minimumLevel = LogLevel.Trace,
TimeSpan? batchInterval = null,
Dictionary<string, string>? headers = null)
@@ -743,6 +767,7 @@ public sealed class LogBuilder
_flows.Add(new HttpFlow(
endpoint,
httpClient,
batchSize,
minimumLevel,
batchInterval,
headers));

View File

@@ -17,8 +17,8 @@ namespace EonaCat.LogStack.Test.Web
_ = Task.Run(async () =>
{
var logBuilder = new LogBuilder();
logBuilder.WithTimestampMode(TimestampMode.Local);
logBuilder.WriteToConsole();
logBuilder.WriteToFile("./logs");
logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Json);
logBuilder.WriteToFile("./logs", outputFormat: FileOutputFormat.Xml);
logBuilder.WriteToTcp("127.0.0.1", 514);