Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor replication server handlers #747

Merged
merged 6 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
448 changes: 35 additions & 413 deletions src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,61 @@
import com.yelp.nrtsearch.server.grpc.AddReplicaRequest;
import com.yelp.nrtsearch.server.grpc.AddReplicaResponse;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AddReplicaHandler implements Handler<AddReplicaRequest, AddReplicaResponse> {
public class AddReplicaHandler extends Handler<AddReplicaRequest, AddReplicaResponse> {
private static final Logger logger = LoggerFactory.getLogger(AddReplicaHandler.class);

private final boolean useKeepAlive;
private final boolean verifyIndexId;

public AddReplicaHandler(boolean useKeepAlive) {
this.useKeepAlive = useKeepAlive;
public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.useKeepAlive = globalState.getConfiguration().getUseKeepAliveForReplication();
this.verifyIndexId = verifyIndexId;
}

@Override
public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
public void handle(
AddReplicaRequest addReplicaRequest, StreamObserver<AddReplicaResponse> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned " + reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to addReplicas for index: "
+ addReplicaRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isPrimary() == false) {
if (!shardState.isPrimary()) {
throw new IllegalArgumentException(
"index \"" + indexState.getName() + "\" was not started or is not a primary");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,67 @@
import com.yelp.nrtsearch.server.grpc.CopyFiles;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTReplicaNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.replicator.nrt.CopyJob;
import org.apache.lucene.replicator.nrt.FileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyFilesHandler implements Handler<CopyFiles, TransferStatus> {
public class CopyFilesHandler extends Handler<CopyFiles, TransferStatus> {
private static final Logger logger = LoggerFactory.getLogger(CopyFilesHandler.class);
private static final long CHECK_SLEEP_TIME_MS = 10;
private static final int CHECKS_PER_STATUS_MESSAGE = 3000; // at least 30s between status messages
private final boolean verifyIndexId;

public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

@Override
public void handle(
public void handle(CopyFiles request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
// we need to send multiple responses to client from this method
handle(indexState, request, responseObserver);
logger.info("CopyFilesHandler returned successfully");
} catch (StatusRuntimeException e) {
logger.warn("error while trying copyFiles " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private void handle(
IndexState indexState,
CopyFiles copyFilesRequest,
StreamObserver<TransferStatus> responseObserver)
Expand Down Expand Up @@ -117,10 +161,4 @@ public void handle(
NrtMetrics.nrtMergeSize.labelValues(indexName).observe(job.getTotalBytesCopied());
}
}

@Override
public TransferStatus handle(IndexState indexState, CopyFiles protoRequest)
throws HandlerException {
throw new UnsupportedOperationException("This method is in not implemented for this class");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,45 @@
import com.yelp.nrtsearch.server.grpc.GetNodesRequest;
import com.yelp.nrtsearch.server.grpc.GetNodesResponse;
import com.yelp.nrtsearch.server.grpc.NodeInfo;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import com.yelp.nrtsearch.server.utils.HostPort;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetNodesInfoHandler implements Handler<GetNodesRequest, GetNodesResponse> {
public class GetNodesInfoHandler extends Handler<GetNodesRequest, GetNodesResponse> {
private static final Logger logger = LoggerFactory.getLogger(GetNodesInfoHandler.class);

public GetNodesInfoHandler(GlobalState globalState) {
super(globalState);
}

@Override
public GetNodesResponse handle(IndexState indexState, GetNodesRequest getNodesRequest)
throws HandlerException {
public void handle(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
try {
IndexState indexState = getGlobalState().getIndex(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Exception e) {
logger.warn("error on GetNodesInfoHandler", e);
responseObserver.onError(
Status.INTERNAL
.withDescription("error on GetNodesInfoHandler")
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private GetNodesResponse handle(IndexState indexState) {
GetNodesResponse.Builder builder = GetNodesResponse.newBuilder();
ShardState shardState = indexState.getShard(0);
if (!shardState.isPrimary() || !shardState.isStarted()) {
Expand Down
50 changes: 0 additions & 50 deletions src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,68 @@
import com.yelp.nrtsearch.server.grpc.NewNRTPoint;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NewNRTPointHandler implements Handler<NewNRTPoint, TransferStatus> {
public class NewNRTPointHandler extends Handler<NewNRTPoint, TransferStatus> {
private static final Logger logger = LoggerFactory.getLogger(NewNRTPointHandler.class);
private final boolean verifyIndexId;

public NewNRTPointHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

@Override
public TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest)
throws HandlerException {
public void handle(NewNRTPoint request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
TransferStatus reply = handle(indexState, request);
logger.debug(
"NewNRTPointHandler returned status "
+ reply.getCode()
+ " message: "
+ reply.getMessage());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()),
e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isReplica() == false) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,66 @@
import com.yelp.nrtsearch.server.grpc.CopyStateRequest;
import com.yelp.nrtsearch.server.grpc.FileMetadata;
import com.yelp.nrtsearch.server.grpc.FilesMetadata;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.replicator.nrt.FileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecvCopyStateHandler extends Handler<CopyStateRequest, CopyState> {
private static final Logger logger = LoggerFactory.getLogger(RecvCopyStateHandler.class);

private final boolean verifyIndexId;

public RecvCopyStateHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

public class RecvCopyStateHandler implements Handler<CopyStateRequest, CopyState> {
@Override
public CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) {
public void handle(CopyStateRequest request, StreamObserver<CopyState> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
CopyState reply = handle(indexState, request);
logger.debug(
"RecvCopyStateHandler returned, completedMergeFiles count: "
+ reply.getCompletedMergeFilesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying recvCopyState " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on recvCopyState for replicaId: %s, for index: %s",
request.getReplicaId(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on recvCopyState for replicaId: %s, for index: %s",
request.getReplicaId(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isPrimary() == false) {
throw new IllegalArgumentException(
Expand Down
Loading
Loading