Skip to content

Commit

Permalink
IGNITE-24163 Optimize EventLog
Browse files Browse the repository at this point in the history
* Get rid of unnecessary locking in `log` method
* Add one more "lazy" log method to the interface
  • Loading branch information
PakhomovAlexander committed Jan 8, 2025
1 parent 3739f46 commit 7fbdc5b
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void configureChannelAndTestSink() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), contains(event));
Expand Down Expand Up @@ -110,7 +110,7 @@ void configureTestSinkAndChannel() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), contains(event));
Expand Down Expand Up @@ -139,7 +139,7 @@ void channelConfigurationChanges() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -150,7 +150,7 @@ void channelConfigurationChanges() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event2);
eventLog.log(event2);

// Then the event is not written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -168,7 +168,7 @@ void channelConfigurationChanges() throws Exception {
assertThat(inMemoryCollectionSink.events(), hasSize(1));

// When log event2 again.
eventLog.log(() -> event2);
eventLog.log(event2);

// Then the event2 is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(2));
Expand Down Expand Up @@ -197,7 +197,7 @@ void enableDisableChannel() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -209,7 +209,7 @@ void enableDisableChannel() throws Exception {
)).get();

// When log event again.
eventLog.log(() -> event);
eventLog.log(event);

// Then the event is not written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -221,7 +221,7 @@ void enableDisableChannel() throws Exception {
)).get();

// When log event again.
eventLog.log(() -> event);
eventLog.log(event);

// Then the event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public interface EventFactory {

/**
* Creates an event builder with the event type defined. The type is set by the factory. For example,
* {@link IgniteEvents.CONNECTION_CLOSED.build} will return a builder with {@link IgniteEventType.CONNECTION_CLOSED} type set.
* {@link IgniteEvents.USER_AUTHENTICATION_SUCCESS.builder()} will return a builder with
* {@link IgniteEventType.USER_AUTHENTICATION_SUCCESS} type set.
*/
EventBuilder builder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ public interface EventLog {
/**
* Writes event into every channel this event relates to.
*
* @param eventProvider Event provider.
* @param event The event to log.
*/
void log(Supplier<Event> eventProvider);
void log(Event event);

/**
* Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event.
* INVARIANT: type must be a valid event type and eventProvider must provide an event of the same type.
*
* @param type the type of the event.
* @param eventProvider the event provider.
*/
void log(String type, Supplier<Event> eventProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,33 @@
* <pre>{@code IgniteEvents.USER_AUTHENTICATION_SUCCESS.create(EventUser.system());}</pre>
*/
public final class IgniteEvents implements EventFactory {
public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS.name());
public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE.name());
public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS);
public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE);

public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED.name());
public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED.name());
public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED);
public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED);

private final String type;
private final IgniteEventType type;

private IgniteEvents(String type) {
private IgniteEvents(IgniteEventType type) {
this.type = type;
}

public String type() {
return type.name();
}

@Override
public Event create(EventUser user) {
return Event.builder()
.type(type)
.type(type.name())
.user(user)
.timestamp(System.currentTimeMillis())
.build();
}

@Override
public EventBuilder builder() {
return new EventBuilder().type(type);
return new EventBuilder().type(type.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Set;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.jetbrains.annotations.Nullable;

/**
* Channel registry. The only way to send an event into channel is to get the channel from this registry.
Expand All @@ -36,8 +37,9 @@ interface ChannelRegistry {
/**
* Get all channels that can handle the given event type.
*
* @param igniteEventType Ignite event type.
* @return Set of channels.
* @param type Ignite event type.
* @return Set of channels or {@code null} if there are no channels for the given event type.
*/
Set<EventChannel> findAllChannelsByEventType(String igniteEventType);
@Nullable
Set<EventChannel> findAllChannelsByEventType(String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,23 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.apache.ignite.internal.eventlog.config.schema.ChannelView;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import org.jetbrains.annotations.Nullable;

class ConfigurationBasedChannelRegistry implements ChannelRegistry {
private final ReadWriteLock guard;
public class ConfigurationBasedChannelRegistry implements ChannelRegistry {
private volatile Map<String, EventChannel> cache;

private final Map<String, EventChannel> cache;

private final Map<String, Set<EventChannel>> typeCache;
private volatile Map<String, Set<EventChannel>> typeCache;

private final SinkRegistry sinkRegistry;

ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) {
this.guard = new ReentrantReadWriteLock();
public ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) {
this.cache = new HashMap<>();
this.typeCache = new HashMap<>();
this.sinkRegistry = sinkRegistry;
Expand All @@ -55,53 +51,45 @@ class ConfigurationBasedChannelRegistry implements ChannelRegistry {

@Override
public EventChannel getByName(String name) {
guard.readLock().lock();
try {
return cache.get(name);
} finally {
guard.readLock().unlock();
}
return cache.get(name);
}

@Override
@Nullable
public Set<EventChannel> findAllChannelsByEventType(String igniteEventType) {
guard.readLock().lock();
try {
Set<EventChannel> result = typeCache.get(igniteEventType);
return result == null ? Set.of() : new HashSet<>(result);
} finally {
guard.readLock().unlock();
Set<EventChannel> result = typeCache.get(igniteEventType);
if (result == null) {
return null;
}

return new HashSet<>(result);
}

private class CacheUpdater implements ConfigurationListener<NamedListView<ChannelView>> {
@Override
public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<ChannelView>> ctx) {
NamedListView<ChannelView> newListValue = ctx.newValue();

guard.writeLock().lock();

try {
cache.clear();
typeCache.clear();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
cache.put(view.name(), channel);
for (String eventType : view.events()) {
typeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
Map<String, EventChannel> newCache = new HashMap<>();
Map<String, Set<EventChannel>> newTypeCache = new HashMap<>();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
newCache.put(view.name(), channel);
for (String eventType : view.events()) {
newTypeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
});
}
});

cache = newCache;
typeCache = newTypeCache;

return completedFuture(null);
} finally {
guard.writeLock().unlock();
}
return completedFuture(null);
}

private EventChannel createChannel(ChannelView view) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
Expand All @@ -34,16 +32,13 @@
import org.apache.ignite.internal.eventlog.config.schema.SinkView;

class ConfigurationBasedSinkRegistry implements SinkRegistry {
private final ReadWriteLock guard;
private volatile Map<String, Sink> cache;

private final Map<String, Sink> cache;

private final Map<String, Set<Sink>> cacheByChannel;
private volatile Map<String, Set<Sink>> cacheByChannel;

private final SinkFactory sinkFactory;

ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) {
this.guard = new ReentrantReadWriteLock();
this.cache = new HashMap<>();
this.cacheByChannel = new HashMap<>();
this.sinkFactory = sinkFactory;
Expand All @@ -53,45 +48,33 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {

@Override
public Sink getByName(String name) {
guard.readLock().lock();
try {
return cache.get(name);
} finally {
guard.readLock().unlock();
}
return cache.get(name);
}

@Override
public Set<Sink> findAllByChannel(String channel) {
guard.readLock().lock();
try {
Set<Sink> sinks = cacheByChannel.get(channel);
return sinks == null ? Set.of() : new HashSet<>(sinks);
} finally {
guard.readLock().unlock();
}
Set<Sink> sinks = cacheByChannel.get(channel);
return sinks == null ? Set.of() : new HashSet<>(sinks);
}

private class CacheUpdater implements ConfigurationListener<NamedListView<SinkView>> {
@Override
public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<SinkView>> ctx) {
NamedListView<SinkView> newListValue = ctx.newValue();

guard.writeLock().lock();
try {
cache.values().forEach(Sink::stop);
cache.clear();
Map<String, Sink> newCache = new HashMap<>();
Map<String, Set<Sink>> newCacheByChannel = new HashMap<>();

cacheByChannel.clear();
for (SinkView sinkView : newListValue) {
Sink sink = sinkFactory.createSink(sinkView);
cache.put(sinkView.name(), sink);
cacheByChannel.computeIfAbsent(sinkView.channel(), k -> new HashSet<>()).add(sink);
}
return completedFuture(null);
} finally {
guard.writeLock().unlock();
for (SinkView sinkView : newListValue) {
Sink sink = sinkFactory.createSink(sinkView);
newCache.put(sinkView.name(), sink);
newCacheByChannel.computeIfAbsent(sinkView.channel(), k -> new HashSet<>()).add(sink);
}

cache = newCache;
cacheByChannel = newCacheByChannel;

return completedFuture(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.ignite.internal.eventlog.api.Event;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.apache.ignite.internal.eventlog.api.Sink;

class EventChannelImpl implements EventChannel {
private final SinkRegistry sinkRegistry;
Expand All @@ -42,6 +43,11 @@ public Set<String> types() {

@Override
public void log(Event event) {
sinkRegistry.findAllByChannel(channelName).forEach(s -> s.write(event));
Set<Sink> sinks = sinkRegistry.findAllByChannel(channelName);
if (sinks == null) {
return;
}

sinks.forEach(s -> s.write(event));
}
}
Loading

0 comments on commit 7fbdc5b

Please sign in to comment.