Skip to content

Commit

Permalink
Merge pull request #46 from andrewshan/main
Browse files Browse the repository at this point in the history
fix #45, switch server after sync operation fail
  • Loading branch information
andrewshan authored Dec 30, 2021
2 parents dad212c + fbf119b commit 9d9366b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.tencent.polaris.api.config.global.ClusterType;
import com.tencent.polaris.api.config.global.ServerConnectorConfig;
import com.tencent.polaris.api.config.plugin.DefaultPlugins;
import com.tencent.polaris.api.control.Destroyable;
import com.tencent.polaris.api.exception.ErrorCode;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.exception.RetriableException;
Expand All @@ -42,14 +43,13 @@
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.api.utils.ThreadPoolUtils;
import com.tencent.polaris.api.control.Destroyable;
import com.tencent.polaris.client.pb.ClientProto;
import com.tencent.polaris.client.pb.PolarisGRPCGrpc;
import com.tencent.polaris.client.pb.ResponseProto;
import com.tencent.polaris.client.pb.ServiceProto;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.plugins.connector.grpc.ServiceUpdateTask.Status;
import com.tencent.polaris.plugins.connector.grpc.ServiceUpdateTask.Type;
import com.tencent.polaris.client.util.NamedThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -183,8 +183,8 @@ public void deRegisterServiceHandler(ServiceEventKey eventKey) throws PolarisExc
@Override
public CommonProviderResponse registerInstance(CommonProviderRequest req) throws PolarisException {
checkDestroyed();
ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
Connection connection = null;
ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
try {
waitDiscoverReady();
connection = connectionManager
Expand All @@ -205,6 +205,9 @@ public CommonProviderResponse registerInstance(CommonProviderRequest req) throws
if (t instanceof PolarisException) {
throw t;
}
if (null != connection) {
connection.reportFail();
}
throw new RetriableException(ErrorCode.NETWORK_ERROR,
String.format("fail to register host %s:%d service %s", req.getHost(), req.getPort(), serviceKey),
t);
Expand Down Expand Up @@ -322,6 +325,9 @@ public void deregisterInstance(CommonProviderRequest req) throws PolarisExceptio
//服务端异常不进行重试
throw t;
}
if (null != connection) {
connection.reportFail();
}
throw new RetriableException(ErrorCode.NETWORK_ERROR,
String.format("fail to deregister id %s, host %s:%d service %s",
req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
Expand Down Expand Up @@ -351,6 +357,9 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
//服务端异常不进行重试
throw t;
}
if (null != connection) {
connection.reportFail();
}
throw new RetriableException(ErrorCode.NETWORK_ERROR,
String.format("fail to heartbeat id %s, host %s:%d service %s",
req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
Expand All @@ -365,12 +374,13 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
public ReportClientResponse reportClient(ReportClientRequest req) throws PolarisException {
checkDestroyed();
waitDiscoverReady();
Connection connection = connectionManager
.getConnection(GrpcUtil.OP_KEY_REPORT_CLIENT, ClusterType.SERVICE_DISCOVER_CLUSTER);
PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());
GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
Connection connection = null;
ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
try {
connection = connectionManager
.getConnection(GrpcUtil.OP_KEY_REPORT_CLIENT, ClusterType.SERVICE_DISCOVER_CLUSTER);
PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());
GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
ClientProto.Client request = buildReportRequest(req);
ResponseProto.Response response = stub.reportClient(request);
LOG.debug("reportClient req:{}, rsp:{}", req, TextFormat.shortDebugString(response));
Expand All @@ -390,11 +400,16 @@ public ReportClientResponse reportClient(ReportClientRequest req) throws Polaris
//服务端异常不进行重试
throw t;
}
if (null != connection) {
connection.reportFail();
}
throw new RetriableException(ErrorCode.NETWORK_ERROR,
String.format("fail to report client host %s, version %s service %s",
req.getClientHost(), req.getVersion(), serviceKey), t);
} finally {
connection.release(GrpcUtil.OP_KEY_REPORT_CLIENT);
if (null != connection) {
connection.release(GrpcUtil.OP_KEY_REPORT_CLIENT);
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,11 @@
</links>
<doclint>none</doclint>
<excludePackageNames>
com.tencent.polaris.*.examples,com.tencent.polaris.*.examples.*
com.tencent.polaris.*.example,com.tencent.polaris.*.example.*
</excludePackageNames>
<doctitle>Tencent Polaris ${project.version} API</doctitle>
<windowtitle>Tencent Polaris ${project.version} API</windowtitle>
<doctitle>Tencent Polaris Java SDK ${project.version} API</doctitle>
<windowtitle>Tencent Polaris Java SDK ${project.version} API
</windowtitle>
</configuration>
</plugin>
</plugins>
Expand Down

0 comments on commit 9d9366b

Please sign in to comment.