diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java index 922d3754e15..e771d625808 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java @@ -44,7 +44,16 @@ public interface EventLog { /** * Writes event into every channel this event relates to. * - * @param eventProvider Event provider. + * @param event the event to log. */ - void log(Supplier eventProvider); + void log(Event event); + + /** + * Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event. + * INVARIANT: type must be a valid event type and eventProvider must provide an event of the same type. + * + * @param type the type of the event. + * @param eventProvider the event provider. + */ + void log(String type, Supplier eventProvider); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java index 272e8a2f42a..31b1e65c01a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java @@ -25,8 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.configuration.notifications.ConfigurationListener; @@ -35,19 +34,16 @@ import org.apache.ignite.internal.eventlog.config.schema.ChannelView; import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; -class ConfigurationBasedChannelRegistry implements ChannelRegistry { - private final ReadWriteLock guard; +public class ConfigurationBasedChannelRegistry implements ChannelRegistry { + private final AtomicReference> cache; - private final Map cache; - - private final Map> typeCache; + private final AtomicReference>> typeCache; private final SinkRegistry sinkRegistry; - ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { - this.guard = new ReentrantReadWriteLock(); - this.cache = new HashMap<>(); - this.typeCache = new HashMap<>(); + public ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { + this.cache = new AtomicReference<>(new HashMap<>()); + this.typeCache = new AtomicReference<>(new HashMap<>()); this.sinkRegistry = sinkRegistry; cfg.channels().listen(new CacheUpdater()); @@ -55,23 +51,13 @@ class ConfigurationBasedChannelRegistry implements ChannelRegistry { @Override public EventChannel getByName(String name) { - guard.readLock().lock(); - try { - return cache.get(name); - } finally { - guard.readLock().unlock(); - } + return cache.get().get(name); } @Override public Set findAllChannelsByEventType(String igniteEventType) { - guard.readLock().lock(); - try { - Set result = typeCache.get(igniteEventType); - return result == null ? Set.of() : new HashSet<>(result); - } finally { - guard.readLock().unlock(); - } + Set result = typeCache.get().get(igniteEventType); + return result == null ? Set.of() : new HashSet<>(result); } private class CacheUpdater implements ConfigurationListener> { @@ -79,29 +65,26 @@ private class CacheUpdater implements ConfigurationListener onUpdate(ConfigurationNotificationEvent> ctx) { NamedListView newListValue = ctx.newValue(); - guard.writeLock().lock(); - - try { - cache.clear(); - typeCache.clear(); - - newListValue.forEach(view -> { - if (view.enabled()) { - EventChannel channel = createChannel(view); - cache.put(view.name(), channel); - for (String eventType : view.events()) { - typeCache.computeIfAbsent( - eventType.trim(), - t -> new HashSet<>() - ).add(channel); - } + Map newCache = new HashMap<>(); + Map> newTypeCache = new HashMap<>(); + + newListValue.forEach(view -> { + if (view.enabled()) { + EventChannel channel = createChannel(view); + newCache.put(view.name(), channel); + for (String eventType : view.events()) { + newTypeCache.computeIfAbsent( + eventType.trim(), + t -> new HashSet<>() + ).add(channel); } - }); + } + }); + + cache.set(newCache); + typeCache.set(newTypeCache); - return completedFuture(null); - } finally { - guard.writeLock().unlock(); - } + return completedFuture(null); } private EventChannel createChannel(ChannelView view) { diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java index 16122366c5e..4c029c88498 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java @@ -54,9 +54,20 @@ public EventLogImpl(EventLogConfiguration cfg, SinkFactory sinkFactory) { } @Override - public void log(Supplier eventProvider) { - Event event = eventProvider.get(); + public void log(Event event) { Set channel = channelRegistry.findAllChannelsByEventType(event.getType()); channel.forEach(c -> c.log(event)); } + + @Override + public void log(String type, Supplier eventProvider) { + Set channels = channelRegistry.findAllChannelsByEventType(type); + + if (channels.isEmpty()) { + return; + } + + Event event = eventProvider.get(); + channels.forEach(c -> c.log(event)); + } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java index e967122956b..ce64bf45741 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java @@ -25,7 +25,7 @@ /** * Factory for creating sink instances. */ -public interface SinkFactory { +interface SinkFactory { /** * Creates a sink instance. * diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java index 5151ae7b913..fb816ef4f57 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java @@ -20,7 +20,7 @@ import java.util.Set; import org.apache.ignite.internal.eventlog.api.Sink; -interface SinkRegistry { +public interface SinkRegistry { Sink getByName(String name); Set findAllByChannel(String channel);