Skip to content

Commit

Permalink
fix: 熔断插件中,事件回调线程与业务线程出现死锁问题 (#553)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewshan authored Aug 22, 2024
1 parent 6fbbb71 commit 6f04521
Showing 1 changed file with 31 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,40 +479,36 @@ public String getName() {

void onCircuitBreakerRuleChanged(ServiceKey serviceKey) {
circuitBreakerRuleDictionary.onServiceChanged(serviceKey);
synchronized (countersCache) {
LOG.info("onCircuitBreakerRuleChanged: clear service {} from ResourceCounters", serviceKey);
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Resource resource : cacheValue.asMap().keySet()) {
if (Objects.equals(resource.getService(), serviceKey)) {
cacheValue.invalidate(resource);
}
LOG.info("onCircuitBreakerRuleChanged: clear service {} from ResourceCounters", serviceKey);
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Resource resource : cacheValue.asMap().keySet()) {
if (Objects.equals(resource.getService(), serviceKey)) {
cacheValue.invalidate(resource);
}
}
HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey);
if (null != healthCheckContainer) {
for (Map.Entry<Resource, ResourceWrap> entry : resourceMapping.entrySet()) {
Resource resource = entry.getKey();
if (Objects.equals(resource.getService(), serviceKey)) {
LOG.info("onCircuitBreakerRuleChanged: clear resource {} from healthCheckContainer", resource);
healthCheckContainer.removeResource(resource);
}
}
HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey);
if (null != healthCheckContainer) {
for (Map.Entry<Resource, ResourceWrap> entry : resourceMapping.entrySet()) {
Resource resource = entry.getKey();
if (Objects.equals(resource.getService(), serviceKey)) {
LOG.info("onCircuitBreakerRuleChanged: clear resource {} from healthCheckContainer", resource);
healthCheckContainer.removeResource(resource);
}
}
}
}

void onCircuitBreakerRuleAdded(ServiceKey serviceKey) {
circuitBreakerRuleDictionary.onServiceChanged(serviceKey);
synchronized (countersCache) {
LOG.info("onCircuitBreakerRuleChanged: clear service {} from ResourceCounters", serviceKey);
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Map.Entry<Resource, Optional<ResourceCounters>> entryCache: cacheValue.asMap().entrySet()) {
Resource resource = entryCache.getKey();
if (Objects.equals(resource.getService(), serviceKey) && !entryCache.getValue().isPresent()) {
cacheValue.invalidate(resource);
}
LOG.info("onCircuitBreakerRuleAdded: clear service {} from ResourceCounters", serviceKey);
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Map.Entry<Resource, Optional<ResourceCounters>> entryCache: cacheValue.asMap().entrySet()) {
Resource resource = entryCache.getKey();
if (Objects.equals(resource.getService(), serviceKey) && !entryCache.getValue().isPresent()) {
cacheValue.invalidate(resource);
}
}
}
Expand All @@ -533,19 +529,17 @@ public HealthCheckContainer apply(ServiceKey serviceKey, HealthCheckContainer he
return null;
}
});
synchronized (countersCache) {
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Map.Entry<Resource, Optional<ResourceCounters>> entryCache : cacheValue.asMap().entrySet()) {
Resource resource = entryCache.getKey();
if (Objects.equals(resource.getService(), svcKey)) {
if (entryCache.getValue().isPresent()) {
LOG.info("onFaultDetectRuleChanged: ResourceCounters {} setReloadFaultDetect true", svcKey);
ResourceCounters resourceCounters = entryCache.getValue().get();
resourceCounters.setReloadFaultDetect(true);
}

for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> cacheValue = entry.getValue();
for (Map.Entry<Resource, Optional<ResourceCounters>> entryCache : cacheValue.asMap().entrySet()) {
Resource resource = entryCache.getKey();
if (Objects.equals(resource.getService(), svcKey)) {
if (entryCache.getValue().isPresent()) {
LOG.info("onFaultDetectRuleChanged: ResourceCounters {} setReloadFaultDetect true", svcKey);
ResourceCounters resourceCounters = entryCache.getValue().get();
resourceCounters.setReloadFaultDetect(true);
}

}
}
}
Expand Down

0 comments on commit 6f04521

Please sign in to comment.