Skip to content

Commit

Permalink
Added ability to create scoped work in singleton event producers.
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmwebb-lv committed Apr 22, 2024
1 parent 30de666 commit 07875b8
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 42 deletions.
7 changes: 4 additions & 3 deletions Build/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ protected override void OnBuildInitialized()

Target Pack => _ => _
.DependsOn(Compile)
.Requires(() => GitRepository.IsOnMainBranch())
.Executes(() =>
{
Log.Information("Generating NuGet packages for projects in solution");
int commitNum = 0;
string NuGetVersionCustom = "2.0.0.869";
string NuGetVersionCustom = "2.0.0.870";


//if it's not a tagged release - append the commit number to the package version
Expand All @@ -157,8 +158,7 @@ protected override void OnBuildInitialized()
{
Log.Information("Project: {0}", project.Name);
}
//Log.Information("Project: {0}", Solution.GetProject("RCommon.Emailing"));
//var path = projects.FirstOrDefault(x => x.Name == "RCommon.Emailing").Path.ToString();

DotNetTasks
.DotNetPack(_ => _
.SetPackageId("RCommon.Emailing")
Expand Down Expand Up @@ -470,6 +470,7 @@ protected override void OnBuildInitialized()

Target Push => _ => _
.DependsOn(Pack)
.Requires(() => GitRepository.IsOnMainBranch())
.Requires(() => NuGetApiUrl)
.Requires(() => NUGETAPIKEY)
.Requires(() => Configuration.Equals(Configuration.Release))
Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.Core/Configuration/RCommonBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public RCommonBuilder(IServiceCollection services)
Services = services;

// Event Bus
services.AddScoped<IEventBus>(sp =>
services.AddSingleton<IEventBus>(sp =>
{
return new InMemoryEventBus(sp, services);
});
Expand Down
5 changes: 0 additions & 5 deletions Src/RCommon.Core/EventHandling/InMemoryEventBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ public InMemoryEventBusBuilder(IRCommonBuilder builder)
{
Services = builder.Services;

/*builder.Services.AddSingleton<IEventBus>(sp =>
{
return new InMemoryEventBus(sp, builder.Services);
});*/

}

public IServiceCollection Services { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ public PublishWithEventBusEventProducer(IEventBus eventBus, ILogger<PublishWithE
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task ProduceEventAsync<T>(T @event, CancellationToken cancellationToken = default)
where T : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event });
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event.GetGenericTypeName() });

// This should already be using a Scoped publish method
await _eventBus.PublishAsync(@event);
}
}
Expand Down
57 changes: 57 additions & 0 deletions Src/RCommon.Core/Extensions/ILoggerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;

namespace Microsoft.Extensions.Logging
{
public static class ILoggerExtensions
{

public static void LogInformation(this ILogger logger, string? message, object[]? @params, bool outputToConsole = false)
{
logger.LogInformation(message, @params);

if (outputToConsole)
{
OutputToConsole(message, @params);
}
}

public static void LogInformation(this ILogger logger, EventId eventId, string? message, object[]? @params, bool outputToConsole = false)
{
logger.LogInformation(eventId, message, @params);

if (outputToConsole)
{
OutputToConsole(message, @params);
}
}

public static void LogInformation(this ILogger logger, Exception exception, string? message, object[]? @params, bool outputToConsole = false)
{
logger.LogInformation(exception, message, @params);

if (outputToConsole)
{
OutputToConsole(message, @params);
}
}

public static void LogInformation(this ILogger logger, EventId eventId, Exception exception, string? message, object[]? @params, bool outputToConsole = false)
{
logger.LogInformation(eventId, exception, message, @params);

if (outputToConsole)
{
OutputToConsole(message, @params);
}
}

private static void OutputToConsole(string? message, object[]? @params)
{
System.Console.WriteLine(message, @params);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public static IRCommonBuilder WithEventHandling<T>(this IRCommonBuilder builder,
{

// MassTransit Event Bus
builder.Services.AddTransient(typeof(IMassTransitEventHandler<>), typeof(MassTransitEventHandler<>));
builder.Services.AddTransient(typeof(MassTransitEventHandler<>));
builder.Services.AddScoped(typeof(IMassTransitEventHandler<>), typeof(MassTransitEventHandler<>));
builder.Services.AddScoped(typeof(MassTransitEventHandler<>));
builder.AddMassTransit(actions);

return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RCommon.EventHandling;
using RCommon.EventHandling.Producers;
Expand All @@ -14,18 +15,24 @@ public class PublishWithMassTransitEventProducer : IEventProducer
{
private readonly IBus _bus;
private readonly ILogger<PublishWithMassTransitEventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public PublishWithMassTransitEventProducer(IBus bus, ILogger<PublishWithMassTransitEventProducer> logger)
public PublishWithMassTransitEventProducer(IBus bus, ILogger<PublishWithMassTransitEventProducer> logger, IServiceProvider serviceProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task ProduceEventAsync<T>(T @event, CancellationToken cancellationToken = default) where T : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _bus.Publish(@event, cancellationToken);

using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _bus.Publish(@event, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RCommon.EventHandling;
using RCommon.EventHandling.Producers;
Expand All @@ -15,18 +16,23 @@ public class SendWithMassTransitEventProducer : IEventProducer

private readonly IBus _bus;
private readonly ILogger<PublishWithMassTransitEventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public SendWithMassTransitEventProducer(IBus bus, ILogger<PublishWithMassTransitEventProducer> logger)
public SendWithMassTransitEventProducer(IBus bus, ILogger<PublishWithMassTransitEventProducer> logger, IServiceProvider serviceProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task ProduceEventAsync<T>(T @event, CancellationToken cancellationToken = default) where T : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _bus.Send(@event, cancellationToken);
using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _bus.Send(@event, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using Microsoft.Extensions.Logging;
using RCommon.EventHandling;
using RCommon.EventHandling.Subscribers;
using System;
Expand All @@ -12,16 +13,18 @@ namespace RCommon.MassTransit.Subscribers
public class MassTransitEventHandler<TEvent> : IMassTransitEventHandler<TEvent>, IConsumer<TEvent>
where TEvent : class, ISerializableEvent
{
private readonly ILogger<MassTransitEventHandler<TEvent>> _logger;
private readonly ISubscriber<TEvent> _subscriber;

public MassTransitEventHandler(ISubscriber<TEvent> subscriber)
public MassTransitEventHandler(ILogger<MassTransitEventHandler<TEvent>> logger, ISubscriber<TEvent> subscriber)
{
_subscriber = subscriber;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
}

public async Task Consume(ConsumeContext<TEvent> context)
{
Console.WriteLine("{0} handling event {1} from MassTransit", new object[] { this.GetGenericTypeName(), context.Message });
_logger.LogDebug("{0} handling event {1}", new object[] { this.GetGenericTypeName(), context.Message });
await _subscriber.HandleAsync(context.Message);
}
}
Expand Down
12 changes: 9 additions & 3 deletions Src/RCommon.Mediatr/Producers/PublishWithMediatREventProducer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RCommon.EventHandling;
using RCommon.EventHandling.Producers;
Expand All @@ -16,19 +17,24 @@ public class PublishWithMediatREventProducer : IEventProducer
{
private readonly IMediatorService _mediatorService;
private readonly ILogger<PublishWithMediatREventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public PublishWithMediatREventProducer(IMediatorService mediatorService, ILogger<PublishWithMediatREventProducer> logger)
public PublishWithMediatREventProducer(IMediatorService mediatorService, ILogger<PublishWithMediatREventProducer> logger, IServiceProvider serviceProvider)
{
_mediatorService = mediatorService ?? throw new ArgumentNullException(nameof(mediatorService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task ProduceEventAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _mediatorService.Publish(@event, cancellationToken);
using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event.GetGenericTypeName() });
await _mediatorService.Publish(@event, cancellationToken);
}
}
}
}
12 changes: 9 additions & 3 deletions Src/RCommon.Mediatr/Producers/SendWithMediatREventProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,32 @@
using RCommon.MediatR.Subscribers;
using RCommon.Mediator;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;

namespace RCommon.MediatR.Producers
{
public class SendWithMediatREventProducer : IEventProducer
{
private readonly IMediatorService _mediatorService;
private readonly ILogger<SendWithMediatREventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public SendWithMediatREventProducer(IMediatorService mediatorService, ILogger<SendWithMediatREventProducer> logger)
public SendWithMediatREventProducer(IMediatorService mediatorService, ILogger<SendWithMediatREventProducer> logger, IServiceProvider serviceProvider)
{
_mediatorService = mediatorService ?? throw new ArgumentNullException(nameof(mediatorService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task ProduceEventAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _mediatorService.Send(@event, cancellationToken);
using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event.GetGenericTypeName() });
await _mediatorService.Send(@event, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Configuration;
using RCommon.EventHandling;
using RCommon.EventHandling.Producers;
Expand All @@ -15,18 +16,23 @@ public class PublishWithWolverineEventProducer : IEventProducer
{
private readonly IMessageBus _messageBus;
private readonly ILogger<PublishWithWolverineEventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public PublishWithWolverineEventProducer(IMessageBus messageBus, ILogger<PublishWithWolverineEventProducer> logger)
public PublishWithWolverineEventProducer(IMessageBus messageBus, ILogger<PublishWithWolverineEventProducer> logger, IServiceProvider serviceProvider)
{
_messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task ProduceEventAsync<T>(T @event, CancellationToken cancellationToken = default) where T : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _messageBus.PublishAsync<T>(@event);
using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} publishing event: {1}", new object[] { this.GetGenericTypeName(), @event.GetGenericTypeName() });
await _messageBus.PublishAsync<T>(@event);
}
}
}
}
14 changes: 10 additions & 4 deletions Src/RCommon.Wolverine/Producers/SendWithWolverineEventProducer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RCommon.EventHandling;
using RCommon.EventHandling.Producers;
using System;
Expand All @@ -14,19 +15,24 @@ public class SendWithWolverineEventProducer : IEventProducer
{
private readonly IMessageBus _messageBus;
private readonly ILogger<SendWithWolverineEventProducer> _logger;
private readonly IServiceProvider _serviceProvider;

public SendWithWolverineEventProducer(IMessageBus messageBus, ILogger<SendWithWolverineEventProducer> logger)
public SendWithWolverineEventProducer(IMessageBus messageBus, ILogger<SendWithWolverineEventProducer> logger, IServiceProvider serviceProvider)
{
_messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}


public async Task ProduceEventAsync<T>(T @event, CancellationToken cancellationToken = default) where T : ISerializableEvent
{
Guard.IsNotNull(@event, nameof(@event));
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event });
await _messageBus.SendAsync(@event);
using (IServiceScope scope = _serviceProvider.CreateScope())
{
_logger.LogInformation("{0} sending event: {1}", new object[] { this.GetGenericTypeName(), @event.GetGenericTypeName() });
await _messageBus.SendAsync(@event);
}
}
}
}
Loading

0 comments on commit 07875b8

Please sign in to comment.