diff --git a/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/impl/ItEventLogConfigurationTest.java b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/impl/ItEventLogConfigurationTest.java index 2f481c34402..484b31fcd5e 100644 --- a/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/impl/ItEventLogConfigurationTest.java +++ b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/impl/ItEventLogConfigurationTest.java @@ -81,7 +81,7 @@ void configureChannelAndTestSink() throws Exception { .user(EventUser.system()) .build(); - eventLog.log(() -> event); + eventLog.log(event); // Then event is written into the sink. assertThat(inMemoryCollectionSink.events(), contains(event)); @@ -110,7 +110,7 @@ void configureTestSinkAndChannel() throws Exception { .user(EventUser.system()) .build(); - eventLog.log(() -> event); + eventLog.log(event); // Then event is written into the sink. assertThat(inMemoryCollectionSink.events(), contains(event)); @@ -139,7 +139,7 @@ void channelConfigurationChanges() throws Exception { .user(EventUser.system()) .build(); - eventLog.log(() -> event); + eventLog.log(event); // Then event is written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(1)); @@ -150,7 +150,7 @@ void channelConfigurationChanges() throws Exception { .user(EventUser.system()) .build(); - eventLog.log(() -> event2); + eventLog.log(event2); // Then the event is not written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(1)); @@ -168,7 +168,7 @@ void channelConfigurationChanges() throws Exception { assertThat(inMemoryCollectionSink.events(), hasSize(1)); // When log event2 again. - eventLog.log(() -> event2); + eventLog.log(event2); // Then the event2 is written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(2)); @@ -197,7 +197,7 @@ void enableDisableChannel() throws Exception { .user(EventUser.system()) .build(); - eventLog.log(() -> event); + eventLog.log(event); // Then event is written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(1)); @@ -209,7 +209,7 @@ void enableDisableChannel() throws Exception { )).get(); // When log event again. - eventLog.log(() -> event); + eventLog.log(event); // Then the event is not written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(1)); @@ -221,7 +221,7 @@ void enableDisableChannel() throws Exception { )).get(); // When log event again. - eventLog.log(() -> event); + eventLog.log(event); // Then the event is written into the sink. assertThat(inMemoryCollectionSink.events(), hasSize(2)); diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java index d652ead4162..b0f2824a511 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java @@ -35,7 +35,8 @@ public interface EventFactory { /** * Creates an event builder with the event type defined. The type is set by the factory. For example, - * {@link IgniteEvents.CONNECTION_CLOSED.build} will return a builder with {@link IgniteEventType.CONNECTION_CLOSED} type set. + * {@link IgniteEvents.USER_AUTHENTICATION_SUCCESS.builder()} will return a builder with + * {@link IgniteEventType.USER_AUTHENTICATION_SUCCESS} type set. */ EventBuilder builder(); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java index 922d3754e15..559620abedf 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java @@ -44,7 +44,16 @@ public interface EventLog { /** * Writes event into every channel this event relates to. * - * @param eventProvider Event provider. + * @param event The event to log. */ - void log(Supplier eventProvider); + void log(Event event); + + /** + * Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event. + * INVARIANT: type must be a valid event type and eventProvider must provide an event of the same type. + * + * @param type the type of the event. + * @param eventProvider the event provider. + */ + void log(String type, Supplier eventProvider); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java index 1b7aae4312f..41d007bcd97 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java @@ -29,22 +29,26 @@ *
{@code IgniteEvents.USER_AUTHENTICATION_SUCCESS.create(EventUser.system());}
*/ public final class IgniteEvents implements EventFactory { - public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS.name()); - public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE.name()); + public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS); + public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE); - public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED.name()); - public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED.name()); + public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED); + public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED); - private final String type; + private final IgniteEventType type; - private IgniteEvents(String type) { + private IgniteEvents(IgniteEventType type) { this.type = type; } + public String type() { + return type.name(); + } + @Override public Event create(EventUser user) { return Event.builder() - .type(type) + .type(type.name()) .user(user) .timestamp(System.currentTimeMillis()) .build(); @@ -52,6 +56,6 @@ public Event create(EventUser user) { @Override public EventBuilder builder() { - return new EventBuilder().type(type); + return new EventBuilder().type(type.name()); } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java index 74b7415dfe4..1728b55b90a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java @@ -19,6 +19,7 @@ import java.util.Set; import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.jetbrains.annotations.Nullable; /** * Channel registry. The only way to send an event into channel is to get the channel from this registry. @@ -36,8 +37,9 @@ interface ChannelRegistry { /** * Get all channels that can handle the given event type. * - * @param igniteEventType Ignite event type. - * @return Set of channels. + * @param type Ignite event type. + * @return Set of channels or {@code null} if there are no channels for the given event type. */ - Set findAllChannelsByEventType(String igniteEventType); + @Nullable + Set findAllChannelsByEventType(String type); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java index 272e8a2f42a..cff1bab7574 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java @@ -25,8 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.configuration.notifications.ConfigurationListener; @@ -34,18 +32,16 @@ import org.apache.ignite.internal.eventlog.api.EventChannel; import org.apache.ignite.internal.eventlog.config.schema.ChannelView; import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; +import org.jetbrains.annotations.Nullable; -class ConfigurationBasedChannelRegistry implements ChannelRegistry { - private final ReadWriteLock guard; +public class ConfigurationBasedChannelRegistry implements ChannelRegistry { + private volatile Map cache; - private final Map cache; - - private final Map> typeCache; + private volatile Map> typeCache; private final SinkRegistry sinkRegistry; - ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { - this.guard = new ReentrantReadWriteLock(); + public ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { this.cache = new HashMap<>(); this.typeCache = new HashMap<>(); this.sinkRegistry = sinkRegistry; @@ -55,23 +51,18 @@ class ConfigurationBasedChannelRegistry implements ChannelRegistry { @Override public EventChannel getByName(String name) { - guard.readLock().lock(); - try { - return cache.get(name); - } finally { - guard.readLock().unlock(); - } + return cache.get(name); } @Override + @Nullable public Set findAllChannelsByEventType(String igniteEventType) { - guard.readLock().lock(); - try { - Set result = typeCache.get(igniteEventType); - return result == null ? Set.of() : new HashSet<>(result); - } finally { - guard.readLock().unlock(); + Set result = typeCache.get(igniteEventType); + if (result == null) { + return null; } + + return new HashSet<>(result); } private class CacheUpdater implements ConfigurationListener> { @@ -79,29 +70,26 @@ private class CacheUpdater implements ConfigurationListener onUpdate(ConfigurationNotificationEvent> ctx) { NamedListView newListValue = ctx.newValue(); - guard.writeLock().lock(); - - try { - cache.clear(); - typeCache.clear(); - - newListValue.forEach(view -> { - if (view.enabled()) { - EventChannel channel = createChannel(view); - cache.put(view.name(), channel); - for (String eventType : view.events()) { - typeCache.computeIfAbsent( - eventType.trim(), - t -> new HashSet<>() - ).add(channel); - } + Map newCache = new HashMap<>(); + Map> newTypeCache = new HashMap<>(); + + newListValue.forEach(view -> { + if (view.enabled()) { + EventChannel channel = createChannel(view); + newCache.put(view.name(), channel); + for (String eventType : view.events()) { + newTypeCache.computeIfAbsent( + eventType.trim(), + t -> new HashSet<>() + ).add(channel); } - }); + } + }); + + cache = newCache; + typeCache = newTypeCache; - return completedFuture(null); - } finally { - guard.writeLock().unlock(); - } + return completedFuture(null); } private EventChannel createChannel(ChannelView view) { diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java index ef0830348fc..f4eb65a4939 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java @@ -24,8 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.configuration.notifications.ConfigurationListener; import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; @@ -34,16 +32,13 @@ import org.apache.ignite.internal.eventlog.config.schema.SinkView; class ConfigurationBasedSinkRegistry implements SinkRegistry { - private final ReadWriteLock guard; + private volatile Map cache; - private final Map cache; - - private final Map> cacheByChannel; + private volatile Map> cacheByChannel; private final SinkFactory sinkFactory; ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) { - this.guard = new ReentrantReadWriteLock(); this.cache = new HashMap<>(); this.cacheByChannel = new HashMap<>(); this.sinkFactory = sinkFactory; @@ -53,23 +48,13 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry { @Override public Sink getByName(String name) { - guard.readLock().lock(); - try { - return cache.get(name); - } finally { - guard.readLock().unlock(); - } + return cache.get(name); } @Override public Set findAllByChannel(String channel) { - guard.readLock().lock(); - try { - Set sinks = cacheByChannel.get(channel); - return sinks == null ? Set.of() : new HashSet<>(sinks); - } finally { - guard.readLock().unlock(); - } + Set sinks = cacheByChannel.get(channel); + return sinks == null ? Set.of() : new HashSet<>(sinks); } private class CacheUpdater implements ConfigurationListener> { @@ -77,21 +62,19 @@ private class CacheUpdater implements ConfigurationListener onUpdate(ConfigurationNotificationEvent> ctx) { NamedListView newListValue = ctx.newValue(); - guard.writeLock().lock(); - try { - cache.values().forEach(Sink::stop); - cache.clear(); + Map newCache = new HashMap<>(); + Map> newCacheByChannel = new HashMap<>(); - cacheByChannel.clear(); - for (SinkView sinkView : newListValue) { - Sink sink = sinkFactory.createSink(sinkView); - cache.put(sinkView.name(), sink); - cacheByChannel.computeIfAbsent(sinkView.channel(), k -> new HashSet<>()).add(sink); - } - return completedFuture(null); - } finally { - guard.writeLock().unlock(); + for (SinkView sinkView : newListValue) { + Sink sink = sinkFactory.createSink(sinkView); + newCache.put(sinkView.name(), sink); + newCacheByChannel.computeIfAbsent(sinkView.channel(), k -> new HashSet<>()).add(sink); } + + cache = newCache; + cacheByChannel = newCacheByChannel; + + return completedFuture(null); } } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java index 28148720959..3c1f078b611 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.ignite.internal.eventlog.api.Event; import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.api.Sink; class EventChannelImpl implements EventChannel { private final SinkRegistry sinkRegistry; @@ -42,6 +43,11 @@ public Set types() { @Override public void log(Event event) { - sinkRegistry.findAllByChannel(channelName).forEach(s -> s.write(event)); + Set sinks = sinkRegistry.findAllByChannel(channelName); + if (sinks == null) { + return; + } + + sinks.forEach(s -> s.write(event)); } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java index 84ee9fdc657..01fcb79303f 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java @@ -55,9 +55,27 @@ public EventLogImpl(EventLogConfiguration cfg, Supplier clusterIdSupplier, } @Override - public void log(Supplier eventProvider) { - Event event = eventProvider.get(); + public void log(Event event) { Set channel = channelRegistry.findAllChannelsByEventType(event.getType()); + if (channel == null) { + return; + } + channel.forEach(c -> c.log(event)); } + + @Override + public void log(String type, Supplier eventProvider) { + Set channels = channelRegistry.findAllChannelsByEventType(type); + if (channels == null) { + return; + } + + Event event = eventProvider.get(); + if (!event.getType().equals(type)) { + throw new IllegalArgumentException("Event type mismatch: " + event.getType() + " != " + type); + } + + channels.forEach(c -> c.log(event)); + } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java index e967122956b..ce64bf45741 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java @@ -25,7 +25,7 @@ /** * Factory for creating sink instances. */ -public interface SinkFactory { +interface SinkFactory { /** * Creates a sink instance. * diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java index 5151ae7b913..6c64f29d40a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java @@ -19,9 +19,27 @@ import java.util.Set; import org.apache.ignite.internal.eventlog.api.Sink; +import org.jetbrains.annotations.Nullable; +/** + * Sink registry. Follows the same principles as {@link ChannelRegistry}. + */ interface SinkRegistry { - Sink getByName(String name); + /** + * Get sink by name. + * + * @param name Sink name. + * @return Sink instance. + */ + @Nullable + Sink getByName(String name); - Set findAllByChannel(String channel); + /** + * Get all sinks that can handle the given channel. + * + * @param channel Channel name. + * @return Set of sinks or {@code null} if there are no sinks for the given channel. + */ + @Nullable + Set> findAllByChannel(String channel); } diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java index 711725a2ade..a8458941fbb 100644 --- a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java @@ -119,7 +119,7 @@ void findAllChannelsByEventType() throws Exception { // Then registry returns the channel by type. assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATION_SUCCESS.name()), hasSize(1)); // But for another type it returns empty set. - assertThat(registry.findAllChannelsByEventType(IgniteEventType.CLIENT_CONNECTION_CLOSED.name()), hasSize(0)); + assertThat(registry.findAllChannelsByEventType(IgniteEventType.CLIENT_CONNECTION_CLOSED.name()), nullValue()); // When update configuration entry. cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> { diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java index 888aa9a007e..15912954187 100644 --- a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.HashMap; @@ -62,7 +63,7 @@ void logsEventCorrectly() { // Given no channels and sinks. // Then nothing thrown. - assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT)); + assertDoesNotThrow(() -> eventLog.log(TEST_EVENT)); // When add a channel but there is no sink. channelRegistry.register(TEST_CHANNEL_NAME, () -> channelFactory.createChannel( @@ -70,14 +71,14 @@ void logsEventCorrectly() { ); // Then nothing thrown. - assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT)); + assertDoesNotThrow(() -> eventLog.log(TEST_EVENT)); // When add a sink for the channel. List container = new ArrayList<>(); sinkRegistry.register(TEST_CHANNEL_NAME, container::add); // And log event. - eventLog.log(() -> TEST_EVENT); + eventLog.log(TEST_EVENT); // Then event is logged. assertThat(container, hasItem(TEST_EVENT)); @@ -86,11 +87,65 @@ void logsEventCorrectly() { Event event = IgniteEvents.CLIENT_CONNECTION_CLOSED.create(TEST_USER); // Then nothing thrown. - assertDoesNotThrow(() -> eventLog.log(() -> event)); + assertDoesNotThrow(() -> eventLog.log(event)); // And the event is not logged. assertThat(container, not(hasItem(event))); } + @Test + void lazyLog() { + // Given no channels and sinks. + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(TEST_EVENT.getType(), () -> TEST_EVENT)); + + // When add a channel but there is no sink. + channelRegistry.register( + TEST_CHANNEL_NAME, + () -> channelFactory.createChannel(TEST_CHANNEL_NAME, Set.of(TEST_EVENT.getType())) + ); + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(TEST_EVENT.getType(), () -> TEST_EVENT)); + + // When add a sink for the channel. + List container = new ArrayList<>(); + sinkRegistry.register(TEST_CHANNEL_NAME, container::add); + + // And log event. + eventLog.log(TEST_EVENT.getType(), () -> TEST_EVENT); + + // Then event is logged. + assertThat(container, hasItem(TEST_EVENT)); + + // When log event with a type that is not supported by the channel. + Event event = IgniteEvents.CLIENT_CONNECTION_CLOSED.create(TEST_USER); + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(event.getType(), () -> event)); + // And the event is not logged. + assertThat(container, not(hasItem(event))); + + } + + @Test + void lazyLogInvariant() { + // Given channel is added to the registry. + channelRegistry.register( + TEST_CHANNEL_NAME, + () -> channelFactory.createChannel(TEST_CHANNEL_NAME, Set.of(TEST_EVENT.getType())) + ); + + // Expect exception if the type does not match the provided event. + assertThrows( + IllegalArgumentException.class, + () -> eventLog.log( + TEST_EVENT.getType(), + () -> IgniteEvents.CLIENT_CONNECTION_CLOSED.create(TEST_USER) + ) + ); + } + private static class TestChannelRegistry implements ChannelRegistry { private final Map> channels; diff --git a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java index 153d1a9ea98..87fbc03c9cb 100644 --- a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java +++ b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java @@ -205,8 +205,9 @@ private CompletableFuture authenticate( } private void logAuthenticationFailure(AuthenticationRequest authenticationRequest) { - eventLog.log(() -> - IgniteEvents.USER_AUTHENTICATION_FAILURE.builder() + eventLog.log( + IgniteEvents.USER_AUTHENTICATION_FAILURE.type(), + () -> IgniteEvents.USER_AUTHENTICATION_FAILURE.builder() .user(EventUser.system()) .fields(Map.of("identity", tryGetUsernameOrUnknown(authenticationRequest))) .build() @@ -221,10 +222,12 @@ private static String tryGetUsernameOrUnknown(AuthenticationRequest authen } private void logUserAuthenticated(UserDetails userDetails) { - eventLog.log(() -> - IgniteEvents.USER_AUTHENTICATION_SUCCESS.create(EventUser.of( + eventLog.log( + IgniteEvents.USER_AUTHENTICATION_SUCCESS.type(), + () -> IgniteEvents.USER_AUTHENTICATION_SUCCESS.create(EventUser.of( userDetails.username(), userDetails.providerName() - ))); + )) + ); } private void refreshProviders(@Nullable SecurityView view) {