Skip to content

Commit

Permalink
[SCB-2813]DynamicProperties: when key removed should get default value (
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored Oct 19, 2023
1 parent 06b2b5f commit 7f18ee5
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 152 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.config.center.client;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* This event is fired when configuration changed of config center.
*/
public class ConfigCenterConfigurationChangedEvent {
private final Map<String, Object> added;

private final Map<String, Object> deleted;

private final Map<String, Object> updated;

private Set<String> changed;

private ConfigCenterConfigurationChangedEvent(Map<String, Object> added, Map<String, Object> updated,
Map<String, Object> deleted) {
this.added = added;
this.deleted = deleted;
this.updated = updated;
this.changed = new HashSet<>();
this.changed.addAll(added.keySet());
this.changed.addAll(updated.keySet());
this.changed.addAll(deleted.keySet());
}

public static ConfigCenterConfigurationChangedEvent createIncremental(Map<String, Object> latest,
Map<String, Object> last) {
Map<String, Object> itemsCreated = new HashMap<>();
Map<String, Object> itemsDeleted = new HashMap<>();
Map<String, Object> itemsModified = new HashMap<>();

for (Map.Entry<String, Object> entry : latest.entrySet()) {
String itemKey = entry.getKey();
if (!last.containsKey(itemKey)) {
itemsCreated.put(itemKey, entry.getValue());
} else if (!Objects.equals(last.get(itemKey), latest.get(itemKey))) {
itemsModified.put(itemKey, entry.getValue());
}
}
for (String itemKey : last.keySet()) {
if (!latest.containsKey(itemKey)) {
itemsDeleted.put(itemKey, null);
}
}
ConfigCenterConfigurationChangedEvent event = ConfigCenterConfigurationChangedEvent
.createIncremental(itemsCreated, itemsModified, itemsDeleted);
return event;
}

public static ConfigCenterConfigurationChangedEvent createIncremental(Map<String, Object> added,
Map<String, Object> updated,
Map<String, Object> deleted) {
return new ConfigCenterConfigurationChangedEvent(added, updated, deleted);
}

public static ConfigCenterConfigurationChangedEvent createIncremental(Map<String, Object> updated) {
return new ConfigCenterConfigurationChangedEvent(new HashMap<>(), updated, new HashMap<>());
}

public final Map<String, Object> getAdded() {
return added;
}


public final Map<String, Object> getUpdated() {
return updated;
}


public final Map<String, Object> getDeleted() {
return deleted;
}

public final Set<String> getChanged() {
return changed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.servicecomb.config.center.client.model.QueryConfigurationsRequest;
import org.apache.servicecomb.config.center.client.model.QueryConfigurationsResponse;
import org.apache.servicecomb.config.common.ConfigConverter;
import org.apache.servicecomb.config.common.ConfigurationChangedEvent;
import org.apache.servicecomb.http.client.task.AbstractTask;
import org.apache.servicecomb.http.client.task.Task;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,11 +75,14 @@ public void execute() {
if (response.isChanged()) {
queryConfigurationsRequest.setRevision(response.getRevision());
Map<String, Object> lastData = configConverter.updateData(response.getConfigurations());
ConfigurationChangedEvent event = ConfigurationChangedEvent
ConfigCenterConfigurationChangedEvent event = ConfigCenterConfigurationChangedEvent
.createIncremental(configConverter.getCurrentData(), lastData);
eventBus.post(event);
if (!event.getChanged().isEmpty()) {
eventBus.post(event);
}
}
startTask(new BackOffSleepTask(configCenterConfiguration.getRefreshIntervalInMillis(), new PollConfigurationTask(0)));
startTask(
new BackOffSleepTask(configCenterConfiguration.getRefreshIntervalInMillis(), new PollConfigurationTask(0)));
} catch (Exception e) {
LOGGER.error("get configurations from ConfigCenter failed, and will try again.", e);
startTask(new BackOffSleepTask(failCount + 1, new PollConfigurationTask(failCount + 1)));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.Executors;

import org.apache.servicecomb.config.common.ConfigConverter;
import org.apache.servicecomb.config.common.ConfigurationChangedEvent;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsRequest;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsRequestFactory;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsResponse;
Expand Down Expand Up @@ -90,9 +89,9 @@ private void onDataChanged() {
this.configurationsRequests.forEach(r -> latestData.putAll(r.getLastRawData()));

Map<String, Object> lastData = configConverter.updateData(latestData);
ConfigurationChangedEvent event = ConfigurationChangedEvent
KieConfigurationChangedEvent event = KieConfigurationChangedEvent
.createIncremental(configConverter.getCurrentData(), lastData);
if (event.isChanged()) {
if (!event.getChanged().isEmpty()) {
eventBus.post(event);
}
}
Expand Down Expand Up @@ -128,9 +127,11 @@ public void execute() {
onDataChanged();
}
if (KieConfigManager.this.kieConfiguration.isEnableLongPolling()) {
startTask(new BackOffSleepTask(LONG_POLLING_INTERVAL, new PollConfigurationTask(0, this.configurationsRequest)));
startTask(
new BackOffSleepTask(LONG_POLLING_INTERVAL, new PollConfigurationTask(0, this.configurationsRequest)));
} else {
startTask(new BackOffSleepTask(kieConfiguration.getRefreshIntervalInMillis(), new PollConfigurationTask(0, this.configurationsRequest)));
startTask(new BackOffSleepTask(kieConfiguration.getRefreshIntervalInMillis(),
new PollConfigurationTask(0, this.configurationsRequest)));
}
} catch (Exception e) {
LOGGER.error("get configurations from KieConfigCenter failed, and will try again.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,67 @@
* limitations under the License.
*/

package org.apache.servicecomb.config.common;
package org.apache.servicecomb.config.kie.client;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class ConfigurationChangedEvent {
/**
* This event is fired when configuration changed of kie.
*/
public class KieConfigurationChangedEvent {
private final Map<String, Object> added;

private final Map<String, Object> deleted;

private final Map<String, Object> updated;

private final boolean changed;

private Map<String, Object> complete;
private Set<String> changed;

private ConfigurationChangedEvent(Map<String, Object> added, Map<String, Object> updated,
Map<String, Object> deleted, boolean changed) {
private KieConfigurationChangedEvent(Map<String, Object> added, Map<String, Object> updated,
Map<String, Object> deleted) {
this.added = added;
this.deleted = deleted;
this.updated = updated;
this.changed = changed;
this.changed = new HashSet<>();
this.changed.addAll(added.keySet());
this.changed.addAll(updated.keySet());
this.changed.addAll(deleted.keySet());
}

public static ConfigurationChangedEvent createIncremental(Map<String, Object> latest, Map<String, Object> last) {
public static KieConfigurationChangedEvent createIncremental(Map<String, Object> latest, Map<String, Object> last) {
Map<String, Object> itemsCreated = new HashMap<>();
Map<String, Object> itemsDeleted = new HashMap<>();
Map<String, Object> itemsModified = new HashMap<>();
boolean changed = false;

for (Map.Entry<String, Object> entry : latest.entrySet()) {
String itemKey = entry.getKey();
if (!last.containsKey(itemKey)) {
itemsCreated.put(itemKey, entry.getValue());
changed = true;
} else if (!Objects.equals(last.get(itemKey), latest.get(itemKey))) {
itemsModified.put(itemKey, entry.getValue());
changed = true;
}
}
for (String itemKey : last.keySet()) {
if (!latest.containsKey(itemKey)) {
itemsDeleted.put(itemKey, null);
changed = true;
}
}
ConfigurationChangedEvent event = ConfigurationChangedEvent
.createIncremental(itemsCreated, itemsModified, itemsDeleted, changed);
event.complete = latest;
KieConfigurationChangedEvent event = KieConfigurationChangedEvent
.createIncremental(itemsCreated, itemsModified, itemsDeleted);
return event;
}

private static ConfigurationChangedEvent createIncremental(Map<String, Object> added, Map<String, Object> updated,
Map<String, Object> deleted, boolean changed) {
return new ConfigurationChangedEvent(added, updated, deleted, changed);
public static KieConfigurationChangedEvent createIncremental(Map<String, Object> added, Map<String, Object> updated,
Map<String, Object> deleted) {
return new KieConfigurationChangedEvent(added, updated, deleted);
}

public static KieConfigurationChangedEvent createIncremental(Map<String, Object> updated) {
return new KieConfigurationChangedEvent(new HashMap<>(), updated, new HashMap<>());
}

public final Map<String, Object> getAdded() {
Expand All @@ -87,11 +92,7 @@ public final Map<String, Object> getDeleted() {
return deleted;
}

public final Map<String, Object> getComplete() {
return complete;
}

public final boolean isChanged() {
public final Set<String> getChanged() {
return changed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.servicecomb.config.BootStrapProperties;
import org.apache.servicecomb.config.ConfigurationChangedEvent;
import org.apache.servicecomb.config.DynamicPropertiesSource;
import org.apache.servicecomb.config.center.client.ConfigCenterAddressManager;
import org.apache.servicecomb.config.center.client.ConfigCenterClient;
import org.apache.servicecomb.config.center.client.ConfigCenterConfigurationChangedEvent;
import org.apache.servicecomb.config.center.client.ConfigCenterManager;
import org.apache.servicecomb.config.center.client.model.ConfigCenterConfiguration;
import org.apache.servicecomb.config.center.client.model.QueryConfigurationsRequest;
import org.apache.servicecomb.config.center.client.model.QueryConfigurationsResponse;
import org.apache.servicecomb.config.common.ConfigConverter;
import org.apache.servicecomb.config.common.ConfigurationChangedEvent;
import org.apache.servicecomb.foundation.auth.AuthHeaderProvider;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.event.EventManager;
Expand Down Expand Up @@ -108,10 +109,13 @@ private QueryConfigurationsRequest firstPull(ConfigCenterConfig configCenterConf
}

@Subscribe
public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
public void onConfigurationChangedEvent(ConfigCenterConfigurationChangedEvent event) {
LOGGER.info("Dynamic configuration changed: {}", event.getChanged());
data.putAll(event.getAdded());
data.putAll(event.getUpdated());
event.getDeleted().forEach((k, v) -> data.remove(k));
EventManager.post(ConfigurationChangedEvent.createIncremental(event.getAdded(),
event.getUpdated(), event.getDeleted()));
}

private QueryConfigurationsRequest createQueryConfigurationsRequest(Environment environment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.servicecomb.config.BootStrapProperties;
import org.apache.servicecomb.config.ConfigurationChangedEvent;
import org.apache.servicecomb.config.DynamicPropertiesSource;
import org.apache.servicecomb.config.common.ConfigConverter;
import org.apache.servicecomb.config.common.ConfigurationChangedEvent;
import org.apache.servicecomb.config.kie.client.KieClient;
import org.apache.servicecomb.config.kie.client.KieConfigManager;
import org.apache.servicecomb.config.kie.client.KieConfigurationChangedEvent;
import org.apache.servicecomb.config.kie.client.model.KieAddressManager;
import org.apache.servicecomb.config.kie.client.model.KieConfiguration;
import org.apache.servicecomb.foundation.auth.AuthHeaderProvider;
Expand All @@ -43,12 +44,16 @@
import org.apache.servicecomb.http.client.auth.RequestAuthHeaderProvider;
import org.apache.servicecomb.http.client.common.HttpTransport;
import org.apache.servicecomb.http.client.common.HttpTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;

import com.google.common.eventbus.Subscribe;

public class KieDynamicPropertiesSource implements DynamicPropertiesSource {
private static final Logger LOGGER = LoggerFactory.getLogger(KieDynamicPropertiesSource.class);

public static final String SOURCE_NAME = "kie";

private final Map<String, Object> data = new ConcurrentHashMapEx<>();
Expand Down Expand Up @@ -84,10 +89,13 @@ private void init(Environment environment) {
}

@Subscribe
public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
public void onConfigurationChangedEvent(KieConfigurationChangedEvent event) {
LOGGER.info("Dynamic configuration changed: {}", event.getChanged());
data.putAll(event.getAdded());
data.putAll(event.getUpdated());
event.getDeleted().forEach((k, v) -> data.remove(k));
EventManager.post(ConfigurationChangedEvent.createIncremental(event.getAdded(),
event.getUpdated(), event.getDeleted()));
}

private KieConfiguration createKieConfiguration(KieConfig kieConfig, Environment environment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.Objects;
import java.util.Set;

/**
* This event is fired when configuration changed. And the change is already applied to Environment.
*
* Listeners can use Environment to get the latest value.
*/
public class ConfigurationChangedEvent {
private final Map<String, Object> added;

Expand Down
Loading

0 comments on commit 7f18ee5

Please sign in to comment.