Skip to content

Commit

Permalink
[SCB-2813]Nacos implementation not properly handle instance change (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored Oct 23, 2023
1 parent ed60267 commit 4a1da76
Show file tree
Hide file tree
Showing 20 changed files with 135 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@
import org.apache.servicecomb.core.definition.SchemaMeta;
import org.apache.servicecomb.foundation.common.utils.ClassLoaderScopeContext;
import org.apache.servicecomb.registry.definition.DefinitionConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* 对静态路径和动态路径的operation进行预先处理,加速operation的查询定位
*/
public class ServicePathManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ServicePathManager.class);

private static final String REST_PATH_MANAGER = "RestServicePathManager";

protected MicroserviceMeta microserviceMeta;
Expand Down Expand Up @@ -71,11 +67,6 @@ private void addSchema(SchemaMeta schemaMeta) {
operationMeta.putExtData(RestConst.SWAGGER_REST_OPERATION, restOperationMeta);
addResource(restOperationMeta);
}

LOGGER.info("add schema to service paths. {}:{}:{}.",
schemaMeta.getAppId(),
schemaMeta.getMicroserviceName(),
schemaMeta.getSchemaId());
}

public OperationLocator consumerLocateOperation(String path, String httpMethod) {
Expand Down
4 changes: 4 additions & 0 deletions demo/demo-nacos/consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-nacos</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
4 changes: 4 additions & 0 deletions demo/demo-nacos/gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>edge-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-nacos</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
14 changes: 0 additions & 14 deletions demo/demo-nacos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,6 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>solution-basic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-nacos</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>servicestage-environment</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-service-center</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions demo/demo-nacos/provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-nacos</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
13 changes: 4 additions & 9 deletions demo/demo-nacos/test-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,12 @@
<groupId>org.apache.servicecomb.demo</groupId>
<artifactId>demo-schema</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-local</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>docker</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static void main(String[] args) throws Exception {
try {
new SpringApplicationBuilder().web(WebApplicationType.NONE).sources(TestClientApplication.class).run(args);

CategorizedTestCaseRunner.runCategorizedTestCase("consumer");
run();
} catch (Exception e) {
TestMgr.failed("test case run failed", e);
LOGGER.error("-------------- test failed -------------");
Expand All @@ -42,4 +42,8 @@ public static void main(String[] args) throws Exception {
}
TestMgr.summary();
}

public static void run() throws Exception {
CategorizedTestCaseRunner.runCategorizedTestCase("consumer");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ servicecomb:
application: demo-nacos
name: test-client
version: 0.0.1
registry:
nacos:
enabled: true
serverAddr: ${PAAS_CSE_NACOS_ENDPOINT:http://127.0.0.1:8848}

rest:
address: 0.0.0.0:9097 # should be same with server.port to use web container
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@
* limitations under the License.
*/

package org.apache.servicecomb.registry.nacos;
package org.apache.servicecomb.samples;

import org.springframework.beans.factory.annotation.Autowired;
import org.apache.servicecomb.demo.TestMgr;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = TestClientApplication.class)
public class NocasIT {

public class InstancesChangeEventListener extends Subscriber<InstancesChangeEvent> {
private final NacosDiscovery nacosDiscovery;

@Autowired
public InstancesChangeEventListener(NacosDiscovery nacosDiscovery) {
this.nacosDiscovery = nacosDiscovery;
@BeforeEach
public void setUp() {
TestMgr.errors().clear();
}

@Override
public void onEvent(InstancesChangeEvent event) {
nacosDiscovery.onInstanceChangedEvent(event);
}
@Test
public void clientGetsNoError() throws Exception {
TestClientApplication.run();

@Override
public Class<? extends Event> subscribeType() {
return InstancesChangeEvent.class;
Assertions.assertTrue(TestMgr.errors().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.google.common.annotations.VisibleForTesting;

public class NacosDynamicPropertiesSource implements DynamicPropertiesSource {
public static final String SOURCE_NAME = "kie";
public static final String SOURCE_NAME = "nacos";

private static final Logger LOGGER = LoggerFactory.getLogger(NacosDynamicPropertiesSource.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void shutdown() {

protected void startPoll() {
executorService.scheduleAtFixedRate(this::pollMeters,
0,
config.getMsPollInterval(),
config.getMsPollInterval(),
TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class MetricsBootstrapConfig {
public static final String CONFIG_LATENCY_DISTRIBUTION_MIN_SCOPE_LEN =
"servicecomb.metrics.publisher.defaultLog.invocation.latencyDistribution.minScopeLength";

public static final int DEFAULT_METRICS_WINDOW_TIME = 60000;
public static final int DEFAULT_METRICS_WINDOW_TIME = 300_000;

private long msPollInterval;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.registry.api.Discovery;
Expand Down Expand Up @@ -98,7 +97,7 @@ private void doTask() {
changed = true;
}
// check ping status
if (System.currentTimeMillis() - instance.getPingSuccessTime() > 180_000L) {
if (System.currentTimeMillis() - instance.getPingTime() > 180_000L) {
boolean pingResult = ping.ping(instance);
if (pingResult && instance.getPingStatus() != PingStatus.OK) {
instance.setPingStatus(PingStatus.OK);
Expand All @@ -107,9 +106,7 @@ private void doTask() {
instance.setPingStatus(PingStatus.FAIL);
changed = true;
}
if (pingResult) {
instance.setPingSuccessTime(System.currentTimeMillis());
}
instance.setPingTime(System.currentTimeMillis());
}
// check unused
if (instance.getHistoryStatus() == HistoryStatus.HISTORY) {
Expand All @@ -118,24 +115,24 @@ private void doTask() {
instance.getIsolationStatus() == IsolationStatus.ISOLATED) {
removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
.computeIfAbsent(services.getKey(), k -> new ArrayList<>()).add(instance.getInstanceId());
LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
apps.getKey(), services.getKey(), instance.getRegistryName(),
instance.getInstanceId(), instance.getHistoryStatus(),
instance.getStatus(), instance.getPingStatus(), instance.getIsolationStatus());
changed = true;
}
}
}
if (changed) {
rebuildVersionCache(apps.getKey(), apps.getKey());
rebuildVersionCache(apps.getKey(), services.getKey());
}
}
}
// remove unused
for (Entry<String, Map<String, List<String>>> apps : removed.entrySet()) {
for (Entry<String, List<String>> services : apps.getValue().entrySet()) {
for (String instance : services.getValue()) {
StatefulDiscoveryInstance removedInstance =
allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
apps.getKey(), services.getKey(), removedInstance.getRegistryName(),
instance, removedInstance.getHistoryStatus(),
removedInstance.getStatus(), removedInstance.getPingStatus(), removedInstance.getIsolationStatus());
allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
}
}
}
Expand All @@ -152,27 +149,28 @@ private void onInstancesChanged(String registryName, String application, String
new ConcurrentHashMapEx<>()).computeIfAbsent(serviceName, key -> new ConcurrentHashMapEx<>());

for (StatefulDiscoveryInstance statefulInstance : statefulInstances.values()) {
if (StringUtils.isEmpty(registryName)) {
statefulInstance.setHistoryStatus(HistoryStatus.HISTORY);
continue;
}
if (registryName.equals(statefulInstance.getRegistryName())) {
statefulInstance.setHistoryStatus(HistoryStatus.HISTORY);
if (registryName == null || registryName.equals(statefulInstance.getRegistryName())) {
if (!instances.contains(statefulInstance)) {
statefulInstance.setPingTime(0);
statefulInstance.setHistoryStatus(HistoryStatus.HISTORY);
}
}
}

for (DiscoveryInstance instance : instances) {
StatefulDiscoveryInstance target = statefulInstances.get(instance.getInstanceId());
if (target == null) {
statefulInstances.put(instance.getInstanceId(), new StatefulDiscoveryInstance(instance));
StatefulDiscoveryInstance target = new StatefulDiscoveryInstance(instance);
StatefulDiscoveryInstance origin = statefulInstances.get(instance.getInstanceId());
if (origin == null) {
statefulInstances.put(instance.getInstanceId(), target);
continue;
}
target.setHistoryStatus(HistoryStatus.CURRENT);
target.setMicroserviceInstanceStatus(instance.getStatus());
target.setPingTime(origin.getPingTime());
target.setPingStatus(origin.getPingStatus());
target.setIsolateDuration(origin.getIsolateDuration());
target.setIsolationStatus(origin.getIsolationStatus());
statefulInstances.put(instance.getInstanceId(), target);
}

rebuildVersionCache(application, serviceName);

StringBuilder instanceInfo = new StringBuilder();
for (DiscoveryInstance instance : instances) {
instanceInfo.append("{")
Expand All @@ -184,6 +182,8 @@ private void onInstancesChanged(String registryName, String application, String
}
LOGGER.info("Applying new instance list for {}/{}/{}. Endpoints {}",
application, serviceName, instances.size(), instanceInfo);

rebuildVersionCache(application, serviceName);
}

public void onInstanceIsolated(StatefulDiscoveryInstance instance, long isolateDuration) {
Expand Down Expand Up @@ -233,6 +233,20 @@ private VersionedCache calcAvailableInstance(String application, String serviceN
result.add(instance);
}
}
StringBuilder instanceInfo = new StringBuilder();
for (StatefulDiscoveryInstance instance : result) {
instanceInfo.append("{")
.append(instance.getInstanceId()).append(",")
.append(instance.getHistoryStatus()).append(",")
.append(instance.getStatus()).append(",")
.append(instance.getPingStatus()).append(",")
.append(instance.getIsolationStatus()).append(",")
.append(instance.getEndpoints()).append(",")
.append(instance.getRegistryName())
.append("}");
}
LOGGER.info("Rebuild cached instance list for {}/{}/{}. Endpoints {}",
application, serviceName, result.size(), instanceInfo);
return new VersionedCache()
.name(application + ":" + serviceName)
.autoCacheVersion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public enum HistoryStatus {

private PingStatus pingStatus = PingStatus.UNKNOWN;

private long pingSuccessTime;
private long pingTime = 0;

private HistoryStatus historyStatus = HistoryStatus.CURRENT;

Expand Down Expand Up @@ -114,12 +114,12 @@ public void setIsolateDuration(long isolateDuration) {
this.isolateDuration = isolateDuration;
}

public long getPingSuccessTime() {
return pingSuccessTime;
public long getPingTime() {
return pingTime;
}

public void setPingSuccessTime(long pingSuccessTime) {
this.pingSuccessTime = pingSuccessTime;
public void setPingTime(long pingTime) {
this.pingTime = pingTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ public static <T> List<T> loadSortedService(Class<T> serviceType) {
.map(Entry::getValue)
.collect(Collectors.toList());

LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size());
StringBuilder info = new StringBuilder();
for (int idx = 0; idx < services.size(); idx++) {
T service = services.get(idx);
LOGGER.info(" {}. {}.", idx, service.getClass().getName());
info.append("{").append(idx).append(",").append(service.getClass().getSimpleName()).append("}");
}
LOGGER.info("Found SPI service {}, services={}.", serviceType.getSimpleName(), info);

return services;
}
Expand Down
Loading

0 comments on commit 4a1da76

Please sign in to comment.