Skip to content

Commit

Permalink
Merge pull request #27 from corentinaltepe/gzip
Browse files Browse the repository at this point in the history
Added HTTP message gzip compression.
  • Loading branch information
corentinaltepe authored Mar 13, 2022
2 parents c35445a + 1da7cdb commit bbf21db
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 34 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Under .NET Core, [remember to register](https://github.com/nlog/nlog/wiki/Regist
username="myusername"
password="secret"
orderWrites="true"
compressionLevel="noCompression"
layout="${level}|${message}${onexception:|${exception:format=type,message,method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}|source=${logger}">
<!-- Loki requires at least one label associated with the log stream.
Make sure you specify at least one label here. -->
Expand All @@ -64,6 +65,13 @@ Under .NET Core, [remember to register](https://github.com/nlog/nlog/wiki/Regist

`orderWrites` - Orders the logs by timestamp before sending them to loki when logs are batched in a single HTTP call. This is required if you use Loki v2.3 or below. But it is not required if you use Loki v2.4 or above (see [out-of-order writes](https://grafana.com/docs/loki/next/configuration/#accept-out-of-order-writes)). You are strongly advised to set this value to `false` when using Loki v2.4+ since it reduces allocations by about 20% by the serializer (default `true`).

`compressionLevel` - Gzip compression level applied if any when when sending messages to Loki (default `noCompression`). Possible values:

- `noCompression`: no compression applied, HTTP header will not specify a Content-Encoding with gzip value.
- `fastest`: the compression operation should complete as quickly as possible, even if the resulting file is not optimally compressed.
- `optimal`: the compression operation should be optimally compressed, even if the operation takes a longer time to complete.
- `smallestSize`: supported by .NET 6 or greater only. The compression operation should create output as small as possible, even if the operation takes a longer time to complete.

`label` elements can be used to enrich messages with additional [labels](https://grafana.com/docs/loki/latest/design-documents/labels/). `label/@layout` support usual NLog layout renderers.

### Async Target
Expand Down
17 changes: 15 additions & 2 deletions src/Benchmark/Transport.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO.Compression;
using System.Net.Http;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
Expand All @@ -11,14 +12,19 @@ namespace Benchmark;
[MemoryDiagnoser]
public class Transport
{
[Params(CompressionLevel.NoCompression, CompressionLevel.Fastest, CompressionLevel.Optimal,
#if NET6_0_OR_GREATER
CompressionLevel.SmallestSize
#endif
)]
public CompressionLevel CompressionLevel { get; set; }

private readonly IList<LokiEvent> _manyLokiEvents;
private readonly IList<LokiEvent> _lokiEvents = new List<LokiEvent> {
new(
new LokiLabels(new LokiLabel("env", "benchmark"), new LokiLabel("job", "WriteLogEventsAsync")),
DateTime.Now,
"Info|Receive message from \"A\" with destination \"B\".")};
private readonly HttpLokiTransport _transport = new(new LokiHttpClient(
new HttpClient { BaseAddress = new Uri("http://localhost:3100") }), false);

public Transport()
{
Expand All @@ -27,6 +33,13 @@ public Transport()
_manyLokiEvents.Add(new LokiEvent(_lokiEvents[0].Labels, DateTime.Now, _lokiEvents[0].Line));
}

private HttpLokiTransport _transport;
[GlobalSetup]
public void GlobalSetup()
{
_transport = new(new LokiHttpClient(new HttpClient { BaseAddress = new Uri("http://localhost:3100") }), false, CompressionLevel);
}

[Benchmark]
public async Task WriteLogEventsAsync()
{
Expand Down
97 changes: 72 additions & 25 deletions src/NLog.Loki.Tests/HttpLokiTransportTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text;
using System.Threading.Tasks;
using Moq;
using NLog.Loki.Model;
Expand All @@ -12,17 +15,17 @@ namespace NLog.Loki.Tests;

public class HttpLokiTransportTests
{
private static List<LokiEvent> CreateLokiEvents()
private static IEnumerable<LokiEvent> CreateLokiEvents(int numberEvents = 3)
{
var date = new DateTime(2021, 12, 27, 9, 48, 26, DateTimeKind.Utc);
return new List<LokiEvent> {
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date, "Info|Receive message from A with destination B."),
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date + TimeSpan.FromSeconds(2.2), "Info|Another event has occured here."),
new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")),
date - TimeSpan.FromSeconds(0.9), "Info|Event from another stream."),
};
for(var i = 0; i < numberEvents; i++)
{
yield return new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")), date, "Info|Receive message from A with destination B.");
i++;
yield return new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")), date + TimeSpan.FromSeconds(2.2), "Info|Another event has occured here.");
i++;
yield return new(new LokiLabels(new LokiLabel("env", "unittest"), new LokiLabel("job", "Job1")), date - TimeSpan.FromSeconds(0.9), "Info|Event from another stream.");
}
}

[Test]
Expand All @@ -35,15 +38,16 @@ public async Task SerializeMessageToHttpLokiWithoutOrdering()
string serializedJsonMessage = null;
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns<string, HttpContent>(async (s, json) =>
.Returns<string, HttpContent>(async (s, content) =>
{
// Intercept the json content so that we can verify it.
serializedJsonMessage = await json.ReadAsStringAsync().ConfigureAwait(false);
serializedJsonMessage = await content.ReadAsStringAsync().ConfigureAwait(false);
Assert.AreEqual("application/json", content.Headers.ContentType.MediaType);
return new HttpResponseMessage(HttpStatusCode.OK);
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: false);
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: false, CompressionLevel.NoCompression);
await transport.WriteLogEventsAsync(events).ConfigureAwait(false);

// Verify the json message format
Expand All @@ -62,15 +66,16 @@ public async Task SerializeMessageToHttpLokiWithOrdering()
string serializedJsonMessage = null;
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns<string, HttpContent>(async (s, json) =>
.Returns<string, HttpContent>(async (s, content) =>
{
// Intercept the json content so that we can verify it.
serializedJsonMessage = await json.ReadAsStringAsync().ConfigureAwait(false);
serializedJsonMessage = await content.ReadAsStringAsync().ConfigureAwait(false);
Assert.AreEqual("application/json", content.Headers.ContentType.MediaType);
return new HttpResponseMessage(HttpStatusCode.OK);
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: true);
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: true, CompressionLevel.NoCompression);
await transport.WriteLogEventsAsync(events).ConfigureAwait(false);

// Verify the json message format
Expand All @@ -83,21 +88,22 @@ public async Task SerializeMessageToHttpLokiWithOrdering()
public async Task SerializeMessageToHttpLokiSingleEvent()
{
// Prepare the event to be sent to loki
var lokiEvent = CreateLokiEvents()[2];
var lokiEvent = CreateLokiEvents().ToList()[2];

// Configure the ILokiHttpClient such that we intercept the JSON content and simulate an OK response from Loki.
string serializedJsonMessage = null;
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns<string, HttpContent>(async (s, json) =>
.Returns<string, HttpContent>(async (s, content) =>
{
// Intercept the json content so that we can verify it.
serializedJsonMessage = await json.ReadAsStringAsync().ConfigureAwait(false);
serializedJsonMessage = await content.ReadAsStringAsync().ConfigureAwait(false);
Assert.AreEqual("application/json", content.Headers.ContentType.MediaType);
return new HttpResponseMessage(HttpStatusCode.OK);
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, false);
var transport = new HttpLokiTransport(httpClient.Object, false, CompressionLevel.NoCompression);
await transport.WriteLogEventsAsync(lokiEvent).ConfigureAwait(false);

// Verify the json message format
Expand All @@ -115,7 +121,7 @@ public void ThrowOnHttpClientException()
.ThrowsAsync(new Exception("Something went wrong whem sending HTTP message."));

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, false);
var transport = new HttpLokiTransport(httpClient.Object, false, CompressionLevel.NoCompression);
var exception = Assert.ThrowsAsync<Exception>(() => transport.WriteLogEventsAsync(CreateLokiEvents()));
Assert.AreEqual("Something went wrong whem sending HTTP message.", exception.Message);
}
Expand All @@ -133,12 +139,53 @@ public void ThrowOnNonSuccessResponseCode()
.Returns(Task.FromResult(response));

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, false);
var transport = new HttpLokiTransport(httpClient.Object, false, CompressionLevel.NoCompression);
var exception = Assert.ThrowsAsync<HttpRequestException>(() => transport.WriteLogEventsAsync(CreateLokiEvents()));
Assert.AreEqual("Failed pushing logs to Loki.", exception.Message);

#if NET6_0_OR_GREATER
Assert.AreEqual(HttpStatusCode.Conflict, exception.StatusCode);
#endif

#if NET6_0_OR_GREATER
Assert.AreEqual(HttpStatusCode.Conflict, exception.StatusCode);
#endif
}

[Test]
[TestCase(CompressionLevel.Fastest)]
[TestCase(CompressionLevel.Optimal)]
#if NET6_0_OR_GREATER
[TestCase(CompressionLevel.SmallestSize)]
#endif
public async Task CompressMessage(CompressionLevel level)
{
// Prepare the events to be sent to loki
var events = CreateLokiEvents(3);

// Configure the ILokiHttpClient such that we intercept the JSON content and simulate an OK response from Loki.
string serializedJsonMessage = null;
var httpClient = new Mock<ILokiHttpClient>();
_ = httpClient.Setup(c => c.PostAsync("loki/api/v1/push", It.IsAny<HttpContent>()))
.Returns<string, HttpContent>(async (s, content) =>
{
// Intercept the gzipped json content so that we can verify it.
var stream = await content.ReadAsStreamAsync().ConfigureAwait(false);
Assert.True(content.Headers.ContentEncoding.Any(s => s == "gzip"));
stream = new GZipStream(stream, CompressionMode.Decompress);
var buffer = new byte[128000];
var length = stream.Read(buffer, 0, buffer.Length);
serializedJsonMessage = Encoding.UTF8.GetString(buffer, 0, length);

Assert.True(content.Headers.ContentEncoding.Any(s => s == "gzip"));
Assert.AreEqual("application/json", content.Headers.ContentType.MediaType);

return new HttpResponseMessage(HttpStatusCode.OK);
});

// Send the logging request
var transport = new HttpLokiTransport(httpClient.Object, orderWrites: false, level);
await transport.WriteLogEventsAsync(events).ConfigureAwait(false);

// Verify the json message format
Assert.AreEqual(
"{\"streams\":[{\"stream\":{\"env\":\"unittest\",\"job\":\"Job1\"},\"values\":[[\"1640598506000000000\",\"Info|Receive message from A with destination B.\"],[\"1640598508200000000\",\"Info|Another event has occured here.\"],[\"1640598505100000000\",\"Info|Event from another stream.\"]]}]}",
serializedJsonMessage);
}
}
58 changes: 58 additions & 0 deletions src/NLog.Loki/CompressedContent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

namespace NLog.Loki;

/// <summary>
/// GZipped HTTP Content.
/// Inspired from https://programmer.help/blogs/httpclient-and-aps.net-web-api-compression-and-decompression-of-request-content.html
/// by arunmj82 (Tue, 18 Dec 2018).
/// </summary>
internal sealed class CompressedContent : HttpContent
{
private readonly HttpContent _originalContent;
private readonly CompressionLevel _level;

public CompressedContent(HttpContent content, CompressionLevel level)
{
_originalContent = content ?? throw new ArgumentNullException("content");
_level = level;

// Copy the underlying content's headers
foreach(var header in _originalContent.Headers)
_ = Headers.TryAddWithoutValidation(header.Key, header.Value);

// Add Content-Encoding header
Headers.ContentEncoding.Add("gzip");
}

protected override bool TryComputeLength(out long length)
{
length = -1;
return false;
}

protected async override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using var gzipStream = new GZipStream(stream, _level, leaveOpen: true);
await _originalContent.CopyToAsync(gzipStream).ConfigureAwait(false);
}

private bool _isDisposed;
protected override void Dispose(bool isDisposing)
{
if(!_isDisposed)
{
if(isDisposing)
{
_originalContent?.Dispose();
}
_isDisposed = true;
}
base.Dispose(isDisposing);
}
}
32 changes: 26 additions & 6 deletions src/NLog.Loki/HttpLokiTransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.IO.Compression;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json;
Expand All @@ -15,28 +16,47 @@ internal sealed class HttpLokiTransport : ILokiTransport
{
private readonly JsonSerializerOptions _jsonOptions;
private readonly ILokiHttpClient _lokiHttpClient;
private readonly CompressionLevel _gzipLevel;

public HttpLokiTransport(ILokiHttpClient lokiHttpClient, bool orderWrites)
public HttpLokiTransport(
ILokiHttpClient lokiHttpClient,
bool orderWrites,
CompressionLevel gzipLevel)
{
_lokiHttpClient = lokiHttpClient;

_gzipLevel = gzipLevel;
_jsonOptions = new JsonSerializerOptions();
_jsonOptions.Converters.Add(new LokiEventsSerializer(orderWrites));
_jsonOptions.Converters.Add(new LokiEventSerializer());
}

public async Task WriteLogEventsAsync(IEnumerable<LokiEvent> lokiEvents)
{
using var jsonStreamContent = JsonContent.Create(lokiEvents, options: _jsonOptions);
using var jsonStreamContent = CreateContent(lokiEvents);
using var response = await _lokiHttpClient.PostAsync("loki/api/v1/push", jsonStreamContent).ConfigureAwait(false);
await ValidateHttpResponse(response);
await ValidateHttpResponse(response).ConfigureAwait(false);
}

public async Task WriteLogEventsAsync(LokiEvent lokiEvent)
{
using var jsonStreamContent = JsonContent.Create(lokiEvent, options: _jsonOptions);
using var jsonStreamContent = CreateContent(lokiEvent);
using var response = await _lokiHttpClient.PostAsync("loki/api/v1/push", jsonStreamContent).ConfigureAwait(false);
await ValidateHttpResponse(response);
await ValidateHttpResponse(response).ConfigureAwait(false);
}

/// <summary>
/// Prepares the HttpContent for the loki event(s).
/// If gzip compression is enabled, prepares a gzip stream with the appropriate headers.
/// </summary>
private HttpContent CreateContent<T>(T lokiEvent)
{
var jsonContent = JsonContent.Create(lokiEvent, options: _jsonOptions);

// If no compression required
if(_gzipLevel == CompressionLevel.NoCompression)
return jsonContent;

return new CompressedContent(jsonContent, _gzipLevel);
}

private static async ValueTask ValidateHttpResponse(HttpResponseMessage response)
Expand Down
9 changes: 8 additions & 1 deletion src/NLog.Loki/LokiTarget.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO.Compression;
using System.Linq;
using System.Net.Http;
using System.Text;
Expand Down Expand Up @@ -32,6 +33,12 @@ public class LokiTarget : AsyncTaskTarget
/// </summary>
public bool OrderWrites { get; set; } = true;

/// <summary>
/// Defines if the HTTP messages sent to Loki must be gzip compressed, and with which compression level.
/// Possible values: NoCompression (default), Optimal, Fastest and SmallestSize (.NET 6 support only).
/// </summary>
public CompressionLevel CompressionLevel { get; set; } = CompressionLevel.NoCompression;

[ArrayParameter(typeof(LokiTargetLabel), "label")]
public IList<LokiTargetLabel> Labels { get; }

Expand Down Expand Up @@ -92,7 +99,7 @@ internal ILokiTransport GetLokiTransport(
if(Uri.TryCreate(endpointUri, UriKind.Absolute, out var uri))
{
if(uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps)
return new HttpLokiTransport(LokiHttpClientFactory(uri, usr, pwd), orderWrites);
return new HttpLokiTransport(LokiHttpClientFactory(uri, usr, pwd), orderWrites, CompressionLevel);
}

InternalLogger.Warn("Unable to create a valid Loki Endpoint URI from '{0}'", endpoint);
Expand Down
1 change: 1 addition & 0 deletions src/Template/nlog.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
endpoint="http://localhost:3100"
retryCount="3"
orderWrites="true"
compressionLevel="fastest"
layout="${level}|${message}${onexception:|${exception:format=type,message,method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}|source=${logger}">
<label name="app" layout="template" />
</target>
Expand Down

0 comments on commit bbf21db

Please sign in to comment.