diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java index 922d3754e15..e771d625808 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java @@ -44,7 +44,16 @@ public interface EventLog { /** * Writes event into every channel this event relates to. * - * @param eventProvider Event provider. + * @param event the event to log. */ - void log(Supplier eventProvider); + void log(Event event); + + /** + * Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event. + * INVARIANT: type must be a valid event type and eventProvider must provide an event of the same type. + * + * @param type the type of the event. + * @param eventProvider the event provider. + */ + void log(String type, Supplier eventProvider); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java index 74b7415dfe4..b7f53d541f8 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java @@ -24,7 +24,7 @@ * Channel registry. The only way to send an event into channel is to get the channel from this registry. * The channel can not be cached for a long time because it can be removed from the registry due to configuration changes. */ -interface ChannelRegistry { +public interface ChannelRegistry { /** * Get channel by name. * diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java index 272e8a2f42a..31b1e65c01a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java @@ -25,8 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.configuration.notifications.ConfigurationListener; @@ -35,19 +34,16 @@ import org.apache.ignite.internal.eventlog.config.schema.ChannelView; import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; -class ConfigurationBasedChannelRegistry implements ChannelRegistry { - private final ReadWriteLock guard; +public class ConfigurationBasedChannelRegistry implements ChannelRegistry { + private final AtomicReference> cache; - private final Map cache; - - private final Map> typeCache; + private final AtomicReference>> typeCache; private final SinkRegistry sinkRegistry; - ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { - this.guard = new ReentrantReadWriteLock(); - this.cache = new HashMap<>(); - this.typeCache = new HashMap<>(); + public ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { + this.cache = new AtomicReference<>(new HashMap<>()); + this.typeCache = new AtomicReference<>(new HashMap<>()); this.sinkRegistry = sinkRegistry; cfg.channels().listen(new CacheUpdater()); @@ -55,23 +51,13 @@ class ConfigurationBasedChannelRegistry implements ChannelRegistry { @Override public EventChannel getByName(String name) { - guard.readLock().lock(); - try { - return cache.get(name); - } finally { - guard.readLock().unlock(); - } + return cache.get().get(name); } @Override public Set findAllChannelsByEventType(String igniteEventType) { - guard.readLock().lock(); - try { - Set result = typeCache.get(igniteEventType); - return result == null ? Set.of() : new HashSet<>(result); - } finally { - guard.readLock().unlock(); - } + Set result = typeCache.get().get(igniteEventType); + return result == null ? Set.of() : new HashSet<>(result); } private class CacheUpdater implements ConfigurationListener> { @@ -79,29 +65,26 @@ private class CacheUpdater implements ConfigurationListener onUpdate(ConfigurationNotificationEvent> ctx) { NamedListView newListValue = ctx.newValue(); - guard.writeLock().lock(); - - try { - cache.clear(); - typeCache.clear(); - - newListValue.forEach(view -> { - if (view.enabled()) { - EventChannel channel = createChannel(view); - cache.put(view.name(), channel); - for (String eventType : view.events()) { - typeCache.computeIfAbsent( - eventType.trim(), - t -> new HashSet<>() - ).add(channel); - } + Map newCache = new HashMap<>(); + Map> newTypeCache = new HashMap<>(); + + newListValue.forEach(view -> { + if (view.enabled()) { + EventChannel channel = createChannel(view); + newCache.put(view.name(), channel); + for (String eventType : view.events()) { + newTypeCache.computeIfAbsent( + eventType.trim(), + t -> new HashSet<>() + ).add(channel); } - }); + } + }); + + cache.set(newCache); + typeCache.set(newTypeCache); - return completedFuture(null); - } finally { - guard.writeLock().unlock(); - } + return completedFuture(null); } private EventChannel createChannel(ChannelView view) { diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java index 9d32da6c148..8241b4d9a1a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java @@ -33,7 +33,7 @@ import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; import org.apache.ignite.internal.eventlog.config.schema.SinkView; -class ConfigurationBasedSinkRegistry implements SinkRegistry { +public class ConfigurationBasedSinkRegistry implements SinkRegistry { private final ReadWriteLock guard; private final Map cache; @@ -42,7 +42,7 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry { private final SinkFactory sinkFactory; - ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) { + public ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) { this.guard = new ReentrantReadWriteLock(); this.cache = new HashMap<>(); this.cacheByChannel = new HashMap<>(); diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java index 16122366c5e..4c029c88498 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java @@ -54,9 +54,20 @@ public EventLogImpl(EventLogConfiguration cfg, SinkFactory sinkFactory) { } @Override - public void log(Supplier eventProvider) { - Event event = eventProvider.get(); + public void log(Event event) { Set channel = channelRegistry.findAllChannelsByEventType(event.getType()); channel.forEach(c -> c.log(event)); } + + @Override + public void log(String type, Supplier eventProvider) { + Set channels = channelRegistry.findAllChannelsByEventType(type); + + if (channels.isEmpty()) { + return; + } + + Event event = eventProvider.get(); + channels.forEach(c -> c.log(event)); + } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java index 3a458ab06ba..8f480bc52cb 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java @@ -25,10 +25,10 @@ /** * Factory for creating sink instances. */ -class LogSinkFactory implements SinkFactory { +public class LogSinkFactory implements SinkFactory { private final EventSerializer eventSerializer; - LogSinkFactory(EventSerializer eventSerializer) { + public LogSinkFactory(EventSerializer eventSerializer) { this.eventSerializer = eventSerializer; } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java index 5151ae7b913..fb816ef4f57 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java @@ -20,7 +20,7 @@ import java.util.Set; import org.apache.ignite.internal.eventlog.api.Sink; -interface SinkRegistry { +public interface SinkRegistry { Sink getByName(String name); Set findAllByChannel(String channel);