Skip to content

Commit

Permalink
Added option to order logs by timestamps. Saves 20% allocations by th…
Browse files Browse the repository at this point in the history
…e serializer when set to false. Option true by default to avoid breaking changes on Loki v2.3 and below. You are advised to set option to false when using Loki v2.4+. (#11)

* Added a benchmark to highlight cost of ordering events by timestamp.

Results for 100 ordered events:
- ordering: 11448 B allocated
- no ordering: 9176 B

So, 2.2kB for 100 events (already ordered) saved.

* Implemented configuration to inject option.

* Documented orderWrites and set it to true by default to avoid breaking changes.

* Added unit testing on the ordering of the serialized logs sent to Loki.
  • Loading branch information
corentinaltepe authored Jan 16, 2022
1 parent b6dea29 commit 2556149
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 48 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Under .NET Core, [remember to register](https://github.com/nlog/nlog/wiki/Regist
endpoint="http://localhost:3100"
username="myusername"
password="secret"
orderWrites="true"
layout="${level}|${message}${onexception:|${exception:format=type,message,method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}|source=${logger}">
<!-- Loki requires at least one label associated with the log stream.
Make sure you specify at least one label here. -->
Expand All @@ -61,6 +62,8 @@ Under .NET Core, [remember to register](https://github.com/nlog/nlog/wiki/Regist

`username` and `password` are optional fields, used for basic authentication with Loki.

`orderWrites` - Orders the logs by timestamp before sending them to loki when logs are batched in a single HTTP call. This is required if you use Loki v2.3 or below. But it is not required if you use Loki v2.4 or above (see [out-of-order writes](https://grafana.com/docs/loki/next/configuration/#accept-out-of-order-writes)). You are strongly advised to set this value to `false` when using Loki v2.4+ since it reduces allocations by about 20% by the serializer (default `true`).

`label` elements can be used to enrich messages with additional [labels](https://grafana.com/docs/loki/latest/design-documents/labels/). `label/@layout` support usual NLog layout renderers.

### Async Target
Expand Down
60 changes: 60 additions & 0 deletions src/Benchmark/LokiEventsSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using BenchmarkDotNet.Attributes;
using NLog.Loki.Model;

namespace Benchmark;

[MemoryDiagnoser]
public class LokiEventsSerializer
{
private readonly MemoryStream stream = new();
private readonly JsonSerializerOptions serializerOptionsWithoutOrdering = new();
private readonly JsonSerializerOptions serializerOptionsWithOrdering = new();

private readonly LokiEvent @event;
private readonly IEnumerable<LokiEvent> manyLokiEvents;

public LokiEventsSerializer()
{
@event = new LokiEvent(
new LokiLabels(new LokiLabel("env", "benchmark"), new LokiLabel("job", "WriteLogEventsAsync")),
DateTime.Now,
"Info|Receive message from \"A\" with destination \"B\".");
var events = new List<LokiEvent>(100);
for(var i = 0; i < 100; i++)
events.Add(new LokiEvent(@event.Labels, DateTime.Now, @event.Line));
manyLokiEvents = events;

serializerOptionsWithoutOrdering = new JsonSerializerOptions();
serializerOptionsWithoutOrdering.Converters.Add(new NLog.Loki.LokiEventsSerializer(orderWrites: false));
serializerOptionsWithoutOrdering.Converters.Add(new NLog.Loki.LokiEventSerializer());

serializerOptionsWithOrdering = new JsonSerializerOptions();
serializerOptionsWithOrdering.Converters.Add(new NLog.Loki.LokiEventsSerializer(orderWrites: true));
serializerOptionsWithOrdering.Converters.Add(new NLog.Loki.LokiEventSerializer());
}

[Benchmark]
public void SerializeManyEventsWithoutOrdering()
{
stream.Position = 0;
JsonSerializer.Serialize(stream, manyLokiEvents, serializerOptionsWithoutOrdering);
}

[Benchmark]
public void SerializeManyEventsWithOrdering()
{
stream.Position = 0;
JsonSerializer.Serialize(stream, manyLokiEvents, serializerOptionsWithOrdering);
}

[Benchmark]
public void SerializeSingleEvent()
{
stream.Position = 0;
JsonSerializer.Serialize(stream, @event, serializerOptionsWithoutOrdering);
}
}
5 changes: 3 additions & 2 deletions src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ public static void Main(string[] args)
.WithOptions(ConfigOptions.DisableLogFile);

var summary = BenchmarkRunner.Run(new[]{
BenchmarkConverter.TypeToBenchmarks( typeof(Benchmarks), config),
BenchmarkConverter.TypeToBenchmarks( typeof(Transport), config),
BenchmarkConverter.TypeToBenchmarks(typeof(Benchmarks), config),
BenchmarkConverter.TypeToBenchmarks(typeof(Transport), config),
BenchmarkConverter.TypeToBenchmarks(typeof(LokiEventsSerializer), config),
});
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Benchmark/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ public class Transport
DateTime.Now,
"Info|Receive message from \"A\" with destination \"B\".")};
private readonly HttpLokiTransport transport = new(new LokiHttpClient(
new HttpClient { BaseAddress = new Uri("http://localhost:3100") }));
new HttpClient { BaseAddress = new Uri("http://localhost:3100") }), false);

public Transport()
{
manyLokiEvents = new List<LokiEvent>(1000);
for(var i = 0; i < 1000; i++)
manyLokiEvents = new List<LokiEvent>(100);
for(var i = 0; i < 100; i++)
manyLokiEvents.Add(new LokiEvent(lokiEvents[0].Labels, DateTime.Now, lokiEvents[0].Line));
}

Expand Down
58 changes: 43 additions & 15 deletions src/NLog.Loki.Tests/HttpLokiTransportTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ private static List<LokiEvent> CreateLokiEvents()
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date, "Info|Receive message from A with destination B."),
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date, "Info|Another event has occured here."),
date + TimeSpan.FromSeconds(2.2), "Info|Another event has occured here."),
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date, "Info|Event from another stream."),
date - TimeSpan.FromSeconds(0.9), "Info|Event from another stream."),
};
}

[Test]
public async Task SerializeMessageToHttpLoki()
public async Task SerializeMessageToHttpLokiWithoutOrdering()
{
// Prepare the events to be sent to loki
var events = CreateLokiEvents();
Expand All @@ -43,12 +43,39 @@ public async Task SerializeMessageToHttpLoki()
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object);
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: false);
await transport.WriteLogEventsAsync(events).ConfigureAwait(false);

// Verify the json message format
Assert.AreEqual(
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598506000000000\",\"Info|Receive message from A with destination B.\"],[\"1640598506000000000\",\"Info|Another event has occured here.\"],[\"1640598506000000000\",\"Info|Event from another stream.\"]]}]}",
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598506000000000\",\"Info|Receive message from A with destination B.\"],[\"1640598508200000000\",\"Info|Another event has occured here.\"],[\"1640598505100000000\",\"Info|Event from another stream.\"]]}]}",
serializedJsonMessage);
}

[Test]
public async Task SerializeMessageToHttpLokiWithOrdering()
{
// Prepare the events to be sent to loki
var events = CreateLokiEvents();

// Configure the ILokiHttpClient such that we intercept the JSON content and simulate an OK response from Loki.
string serializedJsonMessage = null;
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns<string, HttpContent>(async (s, json) =>
{
// Intercept the json content so that we can verify it.
serializedJsonMessage = await json.ReadAsStringAsync().ConfigureAwait(false);
return new HttpResponseMessage(HttpStatusCode.OK);
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: true);
await transport.WriteLogEventsAsync(events).ConfigureAwait(false);

// Verify the json message format
Assert.AreEqual(
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598505100000000\",\"Info|Event from another stream.\"],[\"1640598506000000000\",\"Info|Receive message from A with destination B.\"],[\"1640598508200000000\",\"Info|Another event has occured here.\"]]}]}",
serializedJsonMessage);
}

Expand All @@ -70,42 +97,43 @@ public async Task SerializeMessageToHttpLokiSingleEvent()
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object);
var transport = new HttpLokiTransport(httpClient.Object, false);
await transport.WriteLogEventsAsync(lokiEvent).ConfigureAwait(false);

// Verify the json message format
Assert.AreEqual(
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598506000000000\",\"Info|Event from another stream.\"]]}]}",
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598505100000000\",\"Info|Event from another stream.\"]]}]}",
serializedJsonMessage);
}

[Test]
public void ThrowOnHttpClientException()
public void ThrowOnHttpClientException()
{
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient
.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.ThrowsAsync(new Exception("Something went wrong whem sending HTTP message."));

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object);
var transport = new HttpLokiTransport(httpClient.Object, false);
var exception = Assert.ThrowsAsync<Exception>(() => transport.WriteLogEventsAsync(CreateLokiEvents()));
Assert.AreEqual("Something went wrong whem sending HTTP message.", exception.Message);
}

[Test]
public void ThrowOnNonSuccessResponseCode()
public void ThrowOnNonSuccessResponseCode()
{
var response = new HttpResponseMessage(HttpStatusCode.Conflict) {
Content = JsonContent.Create(new {reason = "A stream must have a least one label."}),
var response = new HttpResponseMessage(HttpStatusCode.Conflict)
{
Content = JsonContent.Create(new { reason = "A stream must have a least one label." }),
};
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient
.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns(Task.FromResult(response));

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object);
var transport = new HttpLokiTransport(httpClient.Object, false);
var exception = Assert.ThrowsAsync<HttpRequestException>(() => transport.WriteLogEventsAsync(CreateLokiEvents()));
Assert.AreEqual("Failed pushing logs to Loki.", exception.Message);
Assert.AreEqual(HttpStatusCode.Conflict, exception.StatusCode);
Expand Down
2 changes: 1 addition & 1 deletion src/NLog.Loki.Tests/LokiTargetTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public Type GetLokiTransport(string endpointLayout)
Environment.SetEnvironmentVariable("HOST", "loki.lvh.me");

var endpoint = Layout.FromString(endpointLayout);
var lokiTargetTransport = new LokiTarget().GetLokiTransport(endpoint, null, null);
var lokiTargetTransport = new LokiTarget().GetLokiTransport(endpoint, null, null, false);

return lokiTargetTransport.GetType();
}
Expand Down
24 changes: 10 additions & 14 deletions src/NLog.Loki/HttpLokiTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using System.Net.Http.Json;
using System.Text.Json;
using System.Threading.Tasks;
using NLog.Loki.Model;
using NLog.Common;
using NLog.Loki.Model;

namespace NLog.Loki
{
Expand All @@ -13,32 +13,28 @@ namespace NLog.Loki
/// </remarks>
internal class HttpLokiTransport : ILokiTransport
{
private static readonly JsonSerializerOptions JsonOptions;

static HttpLokiTransport()
{
JsonOptions = new JsonSerializerOptions();
JsonOptions.Converters.Add(new LokiEventsSerializer());
JsonOptions.Converters.Add(new LokiEventSerializer());
}

private readonly JsonSerializerOptions jsonOptions;
private readonly ILokiHttpClient lokiHttpClient;

public HttpLokiTransport(ILokiHttpClient lokiHttpClient)
public HttpLokiTransport(ILokiHttpClient lokiHttpClient, bool orderWrites)
{
this.lokiHttpClient = lokiHttpClient;

jsonOptions = new JsonSerializerOptions();
jsonOptions.Converters.Add(new LokiEventsSerializer(orderWrites));
jsonOptions.Converters.Add(new LokiEventSerializer());
}

public async Task WriteLogEventsAsync(IEnumerable<LokiEvent> lokiEvents)
{
using var jsonStreamContent = JsonContent.Create(lokiEvents, options: JsonOptions);
using var jsonStreamContent = JsonContent.Create(lokiEvents, options: jsonOptions);
using var response = await lokiHttpClient.PostAsync("loki/api/v1/push", jsonStreamContent).ConfigureAwait(false);
await ValidateHttpResponse(response);
}

public async Task WriteLogEventsAsync(LokiEvent lokiEvent)
{
using var jsonStreamContent = JsonContent.Create(lokiEvent, options: JsonOptions);
using var jsonStreamContent = JsonContent.Create(lokiEvent, options: jsonOptions);
using var response = await lokiHttpClient.PostAsync("loki/api/v1/push", jsonStreamContent).ConfigureAwait(false);
await ValidateHttpResponse(response);
}
Expand All @@ -51,7 +47,7 @@ private static async ValueTask ValidateHttpResponse(HttpResponseMessage response
// Read the response's content
string content = response.Content == null ? null :
await response.Content.ReadAsStringAsync().ConfigureAwait(false);

InternalLogger.Error("Failed pushing logs to Loki. Code: {Code}. Reason: {Reason}. Message: {Message}.",
response.StatusCode, response.ReasonPhrase, content);
throw new HttpRequestException("Failed pushing logs to Loki.", inner: null, response.StatusCode);
Expand Down
14 changes: 11 additions & 3 deletions src/NLog.Loki/LokiEventsSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ namespace NLog.Loki;
/// </remarks>
internal class LokiEventsSerializer : JsonConverter<IEnumerable<LokiEvent>>
{
private readonly bool orderWrites;

public LokiEventsSerializer(bool orderWrites)
{
this.orderWrites = orderWrites;
}

public override IEnumerable<LokiEvent> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
throw new NotSupportedException("This converter only supports serializing to JSON.");
Expand All @@ -42,9 +49,10 @@ public override void Write(Utf8JsonWriter writer, IEnumerable<LokiEvent> value,

writer.WriteStartArray("values");

// TODO: add option with out-of-order by default, but opt-in to enforce ordering
// https://grafana.com/docs/loki/latest/configuration/#accept-out-of-order-writes
foreach(var @event in stream.OrderBy(le => le.Timestamp))
// Order logs by timestamp only if the option is opted-in, because it costs
// approximately 20% more allocation when serializing 100 events.
IEnumerable<LokiEvent> orderedStream = orderWrites ? stream.OrderBy(le => le.Timestamp) : stream;
foreach(var @event in orderedStream)
{
writer.WriteStartArray();
var timestamp = UnixDateTimeConverter.ToUnixTimeNs(@event.Timestamp).ToString("g", CultureInfo.InvariantCulture);
Expand Down
26 changes: 16 additions & 10 deletions src/NLog.Loki/LokiTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ public class LokiTarget : AsyncTaskTarget

public Layout Password { get; init; }

/// <summary>
/// Orders the logs by timestamp before sending them to Loki.
/// Required as <see langword="true"/> before Loki v2.4. Leave as <see langword="false"/> if you are running Loki v2.4 or above.
/// See <see href="https://grafana.com/docs/loki/latest/configuration/#accept-out-of-order-writes"/>.
/// </summary>
public bool OrderWrites { get; init; } = true;

[ArrayParameter(typeof(LokiTargetLabel), "label")]
public IList<LokiTargetLabel> Labels { get; }

Expand All @@ -34,10 +41,9 @@ public LokiTarget()
{
Labels = new List<LokiTargetLabel>();

lazyLokiTransport =
new Lazy<ILokiTransport>(
() => GetLokiTransport(Endpoint, Username, Password),
LazyThreadSafetyMode.ExecutionAndPublication);
lazyLokiTransport = new Lazy<ILokiTransport>(
() => GetLokiTransport(Endpoint, Username, Password, OrderWrites),
LazyThreadSafetyMode.ExecutionAndPublication);
}

protected override void Write(IList<AsyncLogEventInfo> logEvents)
Expand Down Expand Up @@ -65,10 +71,8 @@ private IEnumerable<LokiEvent> GetLokiEvents(IEnumerable<LogEventInfo> logEvents

private LokiEvent GetLokiEvent(LogEventInfo logEvent)
{
var labels =
new LokiLabels(
Labels.Select(
ltl => new LokiLabel(ltl.Name, ltl.Layout.Render(logEvent))));
var labels = new LokiLabels(
Labels.Select(ltl => new LokiLabel(ltl.Name, ltl.Layout.Render(logEvent))));

var line = RenderLogEvent(Layout, logEvent);

Expand All @@ -77,17 +81,19 @@ private LokiEvent GetLokiEvent(LogEventInfo logEvent)
return @event;
}

internal ILokiTransport GetLokiTransport(Layout endpoint, Layout username, Layout password)
internal ILokiTransport GetLokiTransport(
Layout endpoint, Layout username, Layout password, bool orderWrites)
{
var endpointUri = RenderLogEvent(endpoint, LogEventInfo.CreateNullEvent());
var usr = RenderLogEvent(username, LogEventInfo.CreateNullEvent());
var pwd = RenderLogEvent(password, LogEventInfo.CreateNullEvent());

if(Uri.TryCreate(endpointUri, UriKind.Absolute, out var uri))
{
if(uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps)
{
var lokiHttpClient = LokiHttpClientFactory(uri, usr, pwd);
var httpLokiTransport = new HttpLokiTransport(lokiHttpClient);
var httpLokiTransport = new HttpLokiTransport(lokiHttpClient, orderWrites);

return httpLokiTransport;
}
Expand Down
1 change: 1 addition & 0 deletions src/Template/nlog.config
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
taskDelayMilliseconds="500"
endpoint="http://localhost:3100"
retryCount="3"
orderWrites="true"
layout="${level}|${message}${onexception:|${exception:format=type,message,method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}|source=${logger}">
<label name="app" layout="template" />
</target>
Expand Down

0 comments on commit 2556149

Please sign in to comment.