Skip to content

Commit

Permalink
IGNITE-24163 Optimize EventLog
Browse files Browse the repository at this point in the history
* Get rid of unnecessary locking in `log` method
* Add one more "lazy" log method to the interface
  • Loading branch information
PakhomovAlexander committed Jan 7, 2025
1 parent a7ef1aa commit 43c78cc
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ public interface EventLog {
/**
* Writes event into every channel this event relates to.
*
* @param eventProvider Event provider.
* @param event the event to log.
*/
void log(Supplier<Event> eventProvider);
void log(Event event);

/**
* Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event.
* INVARIANT: type must be a valid event type and eventProvider must provide an event of the same type.
*
* @param type the type of the event.
* @param eventProvider the event provider.
*/
void log(String type, Supplier<Event> eventProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* Channel registry. The only way to send an event into channel is to get the channel from this registry.
* The channel can not be cached for a long time because it can be removed from the registry due to configuration changes.
*/
interface ChannelRegistry {
public interface ChannelRegistry {
/**
* Get channel by name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
Expand All @@ -35,73 +34,57 @@
import org.apache.ignite.internal.eventlog.config.schema.ChannelView;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;

class ConfigurationBasedChannelRegistry implements ChannelRegistry {
private final ReadWriteLock guard;
public class ConfigurationBasedChannelRegistry implements ChannelRegistry {
private final AtomicReference<Map<String, EventChannel>> cache;

private final Map<String, EventChannel> cache;

private final Map<String, Set<EventChannel>> typeCache;
private final AtomicReference<Map<String, Set<EventChannel>>> typeCache;

private final SinkRegistry sinkRegistry;

ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) {
this.guard = new ReentrantReadWriteLock();
this.cache = new HashMap<>();
this.typeCache = new HashMap<>();
public ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) {
this.cache = new AtomicReference<>(new HashMap<>());
this.typeCache = new AtomicReference<>(new HashMap<>());
this.sinkRegistry = sinkRegistry;

cfg.channels().listen(new CacheUpdater());
}

@Override
public EventChannel getByName(String name) {
guard.readLock().lock();
try {
return cache.get(name);
} finally {
guard.readLock().unlock();
}
return cache.get().get(name);
}

@Override
public Set<EventChannel> findAllChannelsByEventType(String igniteEventType) {
guard.readLock().lock();
try {
Set<EventChannel> result = typeCache.get(igniteEventType);
return result == null ? Set.of() : new HashSet<>(result);
} finally {
guard.readLock().unlock();
}
Set<EventChannel> result = typeCache.get().get(igniteEventType);
return result == null ? Set.of() : new HashSet<>(result);
}

private class CacheUpdater implements ConfigurationListener<NamedListView<ChannelView>> {
@Override
public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<ChannelView>> ctx) {
NamedListView<ChannelView> newListValue = ctx.newValue();

guard.writeLock().lock();

try {
cache.clear();
typeCache.clear();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
cache.put(view.name(), channel);
for (String eventType : view.events()) {
typeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
Map<String, EventChannel> newCache = new HashMap<>();
Map<String, Set<EventChannel>> newTypeCache = new HashMap<>();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
newCache.put(view.name(), channel);
for (String eventType : view.events()) {
newTypeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
});
}
});

cache.set(newCache);
typeCache.set(newTypeCache);

return completedFuture(null);
} finally {
guard.writeLock().unlock();
}
return completedFuture(null);
}

private EventChannel createChannel(ChannelView view) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import org.apache.ignite.internal.eventlog.config.schema.SinkView;

class ConfigurationBasedSinkRegistry implements SinkRegistry {
public class ConfigurationBasedSinkRegistry implements SinkRegistry {
private final ReadWriteLock guard;

private final Map<String, Sink> cache;
Expand All @@ -42,7 +42,7 @@ class ConfigurationBasedSinkRegistry implements SinkRegistry {

private final SinkFactory sinkFactory;

ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) {
public ConfigurationBasedSinkRegistry(EventLogConfiguration cfg, SinkFactory sinkFactory) {
this.guard = new ReentrantReadWriteLock();
this.cache = new HashMap<>();
this.cacheByChannel = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,20 @@ public EventLogImpl(EventLogConfiguration cfg, SinkFactory sinkFactory) {
}

@Override
public void log(Supplier<Event> eventProvider) {
Event event = eventProvider.get();
public void log(Event event) {
Set<EventChannel> channel = channelRegistry.findAllChannelsByEventType(event.getType());
channel.forEach(c -> c.log(event));
}

@Override
public void log(String type, Supplier<Event> eventProvider) {
Set<EventChannel> channels = channelRegistry.findAllChannelsByEventType(type);

if (channels.isEmpty()) {
return;
}

Event event = eventProvider.get();
channels.forEach(c -> c.log(event));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
/**
* Factory for creating sink instances.
*/
class LogSinkFactory implements SinkFactory {
public class LogSinkFactory implements SinkFactory {
private final EventSerializer eventSerializer;

LogSinkFactory(EventSerializer eventSerializer) {
public LogSinkFactory(EventSerializer eventSerializer) {
this.eventSerializer = eventSerializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Set;
import org.apache.ignite.internal.eventlog.api.Sink;

interface SinkRegistry {
public interface SinkRegistry {
Sink getByName(String name);

Set<Sink> findAllByChannel(String channel);
Expand Down

0 comments on commit 43c78cc

Please sign in to comment.