Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24163 Optimize EventLog #5004

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@

import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.eventlog.api.Event;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
Expand Down Expand Up @@ -80,7 +83,17 @@ public TestServer(ClientConnectorConfiguration clientConnectorConfiguration, Net
this.testSslConfig = testSslConfig;
this.authenticationManager = securityConfiguration == null
? new DummyAuthenticationManager()
: new AuthenticationManagerImpl(securityConfiguration, ign -> {});
: new AuthenticationManagerImpl(securityConfiguration, new EventLog() {
@Override
public void log(Event event) {

}

@Override
public void log(String type, Supplier<Event> eventProvider) {

}
});
this.clientConnectorConfiguration = clientConnectorConfiguration;
this.networkConfiguration = networkConfiguration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeInternalTable;
Expand All @@ -57,6 +58,8 @@
import org.apache.ignite.internal.configuration.NodeConfiguration;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.eventlog.api.Event;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
Expand Down Expand Up @@ -221,7 +224,17 @@ public TestServer(
if (securityConfiguration == null) {
authenticationManager = new DummyAuthenticationManager();
} else {
authenticationManager = new AuthenticationManagerImpl(securityConfiguration, ign -> {});
authenticationManager = new AuthenticationManagerImpl(securityConfiguration, new EventLog() {
@Override
public void log(Event event) {

}

@Override
public void log(String type, Supplier<Event> eventProvider) {

}
});
assertThat(authenticationManager.startAsync(componentContext), willCompleteSuccessfully());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void configureChannelAndTestSink() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), contains(event));
Expand Down Expand Up @@ -110,7 +110,7 @@ void configureTestSinkAndChannel() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), contains(event));
Expand Down Expand Up @@ -139,7 +139,7 @@ void channelConfigurationChanges() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -150,7 +150,7 @@ void channelConfigurationChanges() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event2);
eventLog.log(event2);

// Then the event is not written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -168,7 +168,7 @@ void channelConfigurationChanges() throws Exception {
assertThat(inMemoryCollectionSink.events(), hasSize(1));

// When log event2 again.
eventLog.log(() -> event2);
eventLog.log(event2);

// Then the event2 is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(2));
Expand Down Expand Up @@ -197,7 +197,7 @@ void enableDisableChannel() throws Exception {
.user(EventUser.system())
.build();

eventLog.log(() -> event);
eventLog.log(event);

// Then event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -209,7 +209,7 @@ void enableDisableChannel() throws Exception {
)).get();

// When log event again.
eventLog.log(() -> event);
eventLog.log(event);

// Then the event is not written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(1));
Expand All @@ -221,7 +221,7 @@ void enableDisableChannel() throws Exception {
)).get();

// When log event again.
eventLog.log(() -> event);
eventLog.log(event);

// Then the event is written into the sink.
assertThat(inMemoryCollectionSink.events(), hasSize(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public interface EventFactory {

/**
* Creates an event builder with the event type defined. The type is set by the factory. For example,
* {@link IgniteEvents.CONNECTION_CLOSED.build} will return a builder with {@link IgniteEventType.CONNECTION_CLOSED} type set.
* {@link IgniteEvents.USER_AUTHENTICATION_SUCCESS.builder} will return a builder with
* {@link IgniteEventType.USER_AUTHENTICATION_SUCCESS} type set.
*/
EventBuilder builder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ public interface EventLog {
/**
* Writes event into every channel this event relates to.
*
* @param eventProvider Event provider.
* @param event The event to log.
*/
void log(Supplier<Event> eventProvider);
void log(Event event);

/**
* Lazy version of {@link #log(Event)}. It first checks if the events of the given type are enabled and then logs the event. INVARIANT:
* type must be a valid event type and eventProvider must provide an event of the same type.
*
* @param type the type of the event.
* @param eventProvider the event provider.
*/
void log(String type, Supplier<Event> eventProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,33 @@
* <pre>{@code IgniteEvents.USER_AUTHENTICATION_SUCCESS.create(EventUser.system());}</pre>
*/
public final class IgniteEvents implements EventFactory {
public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS.name());
public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE.name());
public static final IgniteEvents USER_AUTHENTICATION_SUCCESS = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_SUCCESS);
public static final IgniteEvents USER_AUTHENTICATION_FAILURE = new IgniteEvents(IgniteEventType.USER_AUTHENTICATION_FAILURE);

public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED.name());
public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED.name());
public static final IgniteEvents CLIENT_CONNECTION_ESTABLISHED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_ESTABLISHED);
public static final IgniteEvents CLIENT_CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CLIENT_CONNECTION_CLOSED);

private final String type;
private final IgniteEventType type;

private IgniteEvents(String type) {
private IgniteEvents(IgniteEventType type) {
this.type = type;
}

public String type() {
return type.name();
}

@Override
public Event create(EventUser user) {
return Event.builder()
.type(type)
.type(type.name())
.user(user)
.timestamp(System.currentTimeMillis())
.build();
}

@Override
public EventBuilder builder() {
return new EventBuilder().type(type);
return new EventBuilder().type(type.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Set;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.jetbrains.annotations.Nullable;

/**
* Channel registry. The only way to send an event into channel is to get the channel from this registry.
Expand All @@ -36,8 +37,9 @@ interface ChannelRegistry {
/**
* Get all channels that can handle the given event type.
*
* @param igniteEventType Ignite event type.
* @return Set of channels.
* @param type Ignite event type.
* @return Set of channels or {@code null} if there are no channels for the given event type.
*/
Set<EventChannel> findAllChannelsByEventType(String igniteEventType);
@Nullable
Set<EventChannel> findAllChannelsByEventType(String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,23 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.apache.ignite.internal.eventlog.config.schema.ChannelView;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import org.jetbrains.annotations.Nullable;

class ConfigurationBasedChannelRegistry implements ChannelRegistry {
private final ReadWriteLock guard;
private volatile Map<String, EventChannel> cache;

private final Map<String, EventChannel> cache;

private final Map<String, Set<EventChannel>> typeCache;
private volatile Map<String, Set<EventChannel>> typeCache;

private final SinkRegistry sinkRegistry;

ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) {
this.guard = new ReentrantReadWriteLock();
this.cache = new HashMap<>();
this.typeCache = new HashMap<>();
this.sinkRegistry = sinkRegistry;
Expand All @@ -55,53 +51,40 @@ class ConfigurationBasedChannelRegistry implements ChannelRegistry {

@Override
public EventChannel getByName(String name) {
guard.readLock().lock();
try {
return cache.get(name);
} finally {
guard.readLock().unlock();
}
return cache.get(name);
}

@Override
@Nullable
public Set<EventChannel> findAllChannelsByEventType(String igniteEventType) {
guard.readLock().lock();
try {
Set<EventChannel> result = typeCache.get(igniteEventType);
return result == null ? Set.of() : new HashSet<>(result);
} finally {
guard.readLock().unlock();
}
return typeCache.get(igniteEventType);
}

private class CacheUpdater implements ConfigurationListener<NamedListView<ChannelView>> {
@Override
public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<ChannelView>> ctx) {
NamedListView<ChannelView> newListValue = ctx.newValue();

guard.writeLock().lock();

try {
cache.clear();
typeCache.clear();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
cache.put(view.name(), channel);
for (String eventType : view.events()) {
typeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
Map<String, EventChannel> newCache = new HashMap<>();
Map<String, Set<EventChannel>> newTypeCache = new HashMap<>();

newListValue.forEach(view -> {
if (view.enabled()) {
EventChannel channel = createChannel(view);
newCache.put(view.name(), channel);
for (String eventType : view.events()) {
newTypeCache.computeIfAbsent(
eventType.trim(),
t -> new HashSet<>()
).add(channel);
}
});
}
});

cache = newCache;
typeCache = newTypeCache;

return completedFuture(null);
} finally {
guard.writeLock().unlock();
}
return completedFuture(null);
}

private EventChannel createChannel(ChannelView view) {
Expand Down
Loading