From c786eaaef49ae77f95e2e1df4116f37e6cf00f3c Mon Sep 17 00:00:00 2001 From: Sarthak Nandi Date: Fri, 4 Oct 2024 08:37:24 -0700 Subject: [PATCH 1/5] Refactor ReplicationServer handlers --- .../nrtsearch/server/grpc/LuceneServer.java | 436 ++---------------- .../luceneserver/AddReplicaHandler.java | 48 +- .../server/luceneserver/CopyFilesHandler.java | 53 ++- .../luceneserver/GetNodesInfoHandler.java | 30 +- .../server/luceneserver/Handler.java | 49 -- .../luceneserver/NewNRTPointHandler.java | 58 ++- .../luceneserver/RecvCopyStateHandler.java | 53 ++- ...ReplicaCurrentSearchingVersionHandler.java | 39 +- .../luceneserver/WriteNRTPointHandler.java | 38 +- .../server/luceneserver/handler/Handler.java | 23 +- .../handler/RecvRawFileHandler.java | 84 ++++ .../handler/RecvRawFileV2Handler.java | 137 ++++++ .../handler/SendRawFileHandler.java | 117 +++++ 13 files changed, 685 insertions(+), 480 deletions(-) delete mode 100644 src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 45009f873..eaf2483e1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -23,7 +23,6 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import com.yelp.nrtsearch.LuceneServerModule; @@ -33,11 +32,9 @@ import com.yelp.nrtsearch.server.luceneserver.CopyFilesHandler; import com.yelp.nrtsearch.server.luceneserver.GetNodesInfoHandler; import com.yelp.nrtsearch.server.luceneserver.GlobalState; -import com.yelp.nrtsearch.server.luceneserver.IndexState; import com.yelp.nrtsearch.server.luceneserver.NewNRTPointHandler; import com.yelp.nrtsearch.server.luceneserver.RecvCopyStateHandler; import com.yelp.nrtsearch.server.luceneserver.ReplicaCurrentSearchingVersionHandler; -import com.yelp.nrtsearch.server.luceneserver.ShardState; import com.yelp.nrtsearch.server.luceneserver.WriteNRTPointHandler; import com.yelp.nrtsearch.server.luceneserver.analysis.AnalyzerCreator; import com.yelp.nrtsearch.server.luceneserver.custom.request.CustomRequestProcessor; @@ -61,12 +58,15 @@ import com.yelp.nrtsearch.server.luceneserver.handler.LiveSettingsHandler; import com.yelp.nrtsearch.server.luceneserver.handler.MetricsHandler; import com.yelp.nrtsearch.server.luceneserver.handler.ReadyHandler; +import com.yelp.nrtsearch.server.luceneserver.handler.RecvRawFileHandler; +import com.yelp.nrtsearch.server.luceneserver.handler.RecvRawFileV2Handler; import com.yelp.nrtsearch.server.luceneserver.handler.RefreshHandler; import com.yelp.nrtsearch.server.luceneserver.handler.RegisterFieldsHandler; import com.yelp.nrtsearch.server.luceneserver.handler.ReleaseSnapshotHandler; import com.yelp.nrtsearch.server.luceneserver.handler.ReloadStateHandler; import com.yelp.nrtsearch.server.luceneserver.handler.SearchHandler; import com.yelp.nrtsearch.server.luceneserver.handler.SearchV2Handler; +import com.yelp.nrtsearch.server.luceneserver.handler.SendRawFileHandler; import com.yelp.nrtsearch.server.luceneserver.handler.SettingsHandler; import com.yelp.nrtsearch.server.luceneserver.handler.StartIndexHandler; import com.yelp.nrtsearch.server.luceneserver.handler.StartIndexV2Handler; @@ -75,7 +75,6 @@ import com.yelp.nrtsearch.server.luceneserver.handler.StopIndexHandler; import com.yelp.nrtsearch.server.luceneserver.handler.UpdateFieldsHandler; import com.yelp.nrtsearch.server.luceneserver.highlights.HighlighterService; -import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import com.yelp.nrtsearch.server.luceneserver.index.handlers.LiveSettingsV2Handler; import com.yelp.nrtsearch.server.luceneserver.index.handlers.SettingsV2Handler; import com.yelp.nrtsearch.server.luceneserver.logging.HitsLoggerCreator; @@ -106,16 +105,13 @@ import io.grpc.ServerBuilder; import io.grpc.ServerInterceptors; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.stub.StreamObserver; import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; import io.prometheus.metrics.model.registry.PrometheusRegistry; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; @@ -123,8 +119,6 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.suggest.document.CompletionPostingsFormatUtil; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -681,6 +675,17 @@ static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServ private final GlobalState globalState; private final boolean verifyIndexId; + private final AddReplicaHandler addReplicaHandler; + private final CopyFilesHandler copyFilesHandler; + private final GetNodesInfoHandler getNodesInfoHandler; + private final NewNRTPointHandler newNRTPointHandler; + private final RecvCopyStateHandler recvCopyStateHandler; + private final RecvRawFileHandler recvRawFileHandler; + private final RecvRawFileV2Handler recvRawFileV2Handler; + private final ReplicaCurrentSearchingVersionHandler replicaCurrentSearchingVersionHandler; + private final SendRawFileHandler sendRawFileHandler; + private final WriteNRTPointHandler writeNRTPointHandler; + @VisibleForTesting static void checkIndexId(String actual, String expected, boolean throwException) { if (!actual.equals(expected)) { @@ -697,446 +702,77 @@ static void checkIndexId(String actual, String expected, boolean throwException) public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) { this.globalState = globalState; this.verifyIndexId = verifyIndexId; + + addReplicaHandler = new AddReplicaHandler(globalState, verifyIndexId); + copyFilesHandler = new CopyFilesHandler(globalState, verifyIndexId); + getNodesInfoHandler = new GetNodesInfoHandler(globalState); + newNRTPointHandler = new NewNRTPointHandler(globalState, verifyIndexId); + recvCopyStateHandler = new RecvCopyStateHandler(globalState, verifyIndexId); + recvRawFileHandler = new RecvRawFileHandler(globalState, verifyIndexId); + recvRawFileV2Handler = new RecvRawFileV2Handler(globalState, verifyIndexId); + replicaCurrentSearchingVersionHandler = + new ReplicaCurrentSearchingVersionHandler(globalState); + sendRawFileHandler = new SendRawFileHandler(globalState); + writeNRTPointHandler = new WriteNRTPointHandler(globalState); } @Override public void addReplicas( AddReplicaRequest addReplicaRequest, StreamObserver responseStreamObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(addReplicaRequest.getIndexName()); - checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - boolean useKeepAliveForReplication = - globalState.getConfiguration().getUseKeepAliveForReplication(); - AddReplicaResponse reply = - new AddReplicaHandler(useKeepAliveForReplication).handle(indexState, addReplicaRequest); - logger.info("AddReplicaHandler returned " + reply.toString()); - responseStreamObserver.onNext(reply); - responseStreamObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); - responseStreamObserver.onError(e); - } catch (Exception e) { - logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); - responseStreamObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to addReplicas for index: " - + addReplicaRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + addReplicaHandler.handle(addReplicaRequest, responseStreamObserver); } @Override public StreamObserver sendRawFile( StreamObserver responseObserver) { - OutputStream outputStream = null; - try { - // TODO: where do we write these files to? - outputStream = new FileOutputStream(File.createTempFile("tempfile", ".tmp")); - } catch (IOException e) { - new RuntimeException(e); - } - return new SendRawFileStreamObserver(outputStream, responseObserver); - } - - static class SendRawFileStreamObserver implements StreamObserver { - private static final Logger logger = - LoggerFactory.getLogger(SendRawFileStreamObserver.class.getName()); - private final OutputStream outputStream; - private final StreamObserver responseObserver; - private final long startTime; - - SendRawFileStreamObserver( - OutputStream outputStream, StreamObserver responseObserver) { - this.outputStream = outputStream; - this.responseObserver = responseObserver; - startTime = System.nanoTime(); - } - - @Override - public void onNext(RawFileChunk value) { - // called by client once per chunk of data - try { - logger.trace("sendRawFile onNext"); - value.getContent().writeTo(outputStream); - } catch (IOException e) { - try { - outputStream.close(); - } catch (IOException ex) { - logger.warn("error trying to close outputStream", ex); - } finally { - // we either had error in writing to outputStream or cant close it, - // either case we need to raise it back to client - responseObserver.onError(e); - } - } - } - - @Override - public void onError(Throwable t) { - logger.warn("sendRawFile cancelled", t); - try { - outputStream.close(); - } catch (IOException e) { - logger.warn("error while trying to close outputStream", e); - } finally { - // we want to raise error always here - responseObserver.onError(t); - } - } - - @Override - public void onCompleted() { - logger.info("sendRawFile completed"); - // called by client after the entire file is sent - try { - outputStream.close(); - // TOOD: should we send fileSize copied? - long endTime = System.nanoTime(); - long totalTimeInMilliSeoncds = (endTime - startTime) / (1000 * 1000); - responseObserver.onNext( - TransferStatus.newBuilder() - .setCode(TransferStatusCode.Done) - .setMessage(String.valueOf(totalTimeInMilliSeoncds)) - .build()); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn("error while trying to close outputStream", e); - responseObserver.onError(e); - } - } + return sendRawFileHandler.handle(responseObserver); } @Override public void recvRawFile( FileInfo fileInfoRequest, StreamObserver rawFileChunkStreamObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(fileInfoRequest.getIndexName()); - checkIndexId(fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - ShardState shardState = indexState.getShard(0); - try (IndexInput luceneFile = - shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT)) { - long len = luceneFile.length(); - long pos = fileInfoRequest.getFpStart(); - luceneFile.seek(pos); - byte[] buffer = new byte[1024 * 64]; - long totalRead; - totalRead = pos; - while (totalRead < len) { - int chunkSize = (int) Math.min(buffer.length, (len - totalRead)); - luceneFile.readBytes(buffer, 0, chunkSize); - RawFileChunk rawFileChunk = - RawFileChunk.newBuilder() - .setContent(ByteString.copyFrom(buffer, 0, chunkSize)) - .build(); - rawFileChunkStreamObserver.onNext(rawFileChunk); - totalRead += chunkSize; - } - // EOF - rawFileChunkStreamObserver.onCompleted(); - } - } catch (StatusRuntimeException e) { - logger.warn("error on recvRawFile " + fileInfoRequest.getFileName(), e); - rawFileChunkStreamObserver.onError(e); - } catch (Exception e) { - logger.warn("error on recvRawFile " + fileInfoRequest.getFileName(), e); - rawFileChunkStreamObserver.onError( - Status.INTERNAL - .withDescription("error on recvRawFile: " + fileInfoRequest.getFileName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + recvRawFileHandler.handle(fileInfoRequest, rawFileChunkStreamObserver); } @Override public StreamObserver recvRawFileV2( StreamObserver rawFileChunkStreamObserver) { - return new StreamObserver<>() { - private IndexState indexState; - private IndexInput luceneFile; - private byte[] buffer; - private final int ackEvery = - globalState.getConfiguration().getFileCopyConfig().getAckEvery(); - private final int maxInflight = - globalState.getConfiguration().getFileCopyConfig().getMaxInFlight(); - private int lastAckedSeq = 0; - private int currentSeq = 0; - private long fileOffset; - private long fileLength; - - @Override - public void onNext(FileInfo fileInfoRequest) { - try { - if (indexState == null) { - // Start transfer - IndexStateManager indexStateManager = - globalState.getIndexStateManager(fileInfoRequest.getIndexName()); - checkIndexId( - fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - indexState = indexStateManager.getCurrent(); - ShardState shardState = indexState.getShard(0); - if (shardState == null) { - throw new IllegalStateException( - "Error getting shard state for: " + fileInfoRequest.getIndexName()); - } - luceneFile = - shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT); - luceneFile.seek(fileInfoRequest.getFpStart()); - fileOffset = fileInfoRequest.getFpStart(); - fileLength = luceneFile.length(); - buffer = new byte[globalState.getConfiguration().getFileCopyConfig().getChunkSize()]; - } else { - // ack existing transfer - lastAckedSeq = fileInfoRequest.getAckSeqNum(); - if (lastAckedSeq <= 0) { - throw new IllegalArgumentException( - "Invalid ackSeqNum: " + fileInfoRequest.getAckSeqNum()); - } - } - while (fileOffset < fileLength && (currentSeq - lastAckedSeq) < maxInflight) { - int chunkSize = (int) Math.min(buffer.length, (fileLength - fileOffset)); - luceneFile.readBytes(buffer, 0, chunkSize); - currentSeq++; - RawFileChunk rawFileChunk = - RawFileChunk.newBuilder() - .setContent(ByteString.copyFrom(buffer, 0, chunkSize)) - .setSeqNum(currentSeq) - .setAck((currentSeq % ackEvery) == 0) - .build(); - rawFileChunkStreamObserver.onNext(rawFileChunk); - fileOffset += chunkSize; - if (fileOffset == fileLength) { - rawFileChunkStreamObserver.onCompleted(); - } - } - logger.debug( - String.format("recvRawFileV2: in flight chunks: %d", currentSeq - lastAckedSeq)); - } catch (Throwable t) { - maybeCloseFile(); - rawFileChunkStreamObserver.onError(t); - throw new RuntimeException(t); - } - } - - @Override - public void onError(Throwable t) { - logger.error("recvRawFileV2 onError", t); - maybeCloseFile(); - rawFileChunkStreamObserver.onError(t); - } - - @Override - public void onCompleted() { - maybeCloseFile(); - logger.debug("recvRawFileV2 onCompleted"); - } - - private void maybeCloseFile() { - if (luceneFile != null) { - try { - luceneFile.close(); - } catch (IOException e) { - logger.warn("Error closing index file", e); - } - luceneFile = null; - } - } - }; + return recvRawFileV2Handler.handle(rawFileChunkStreamObserver); } @Override public void recvCopyState( CopyStateRequest request, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(request.getIndexName()); - checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - CopyState reply = new RecvCopyStateHandler().handle(indexState, request); - logger.debug( - "RecvCopyStateHandler returned, completedMergeFiles count: " - + reply.getCompletedMergeFilesCount()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn("error while trying recvCopyState " + request.getIndexName(), e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn( - String.format( - "error on recvCopyState for replicaId: %s, for index: %s", - request.getReplicaId(), request.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on recvCopyState for replicaId: %s, for index: %s", - request.getReplicaId(), request.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + recvCopyStateHandler.handle(request, responseObserver); } @Override public void copyFiles(CopyFiles request, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(request.getIndexName()); - checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - CopyFilesHandler copyFilesHandler = new CopyFilesHandler(); - // we need to send multiple responses to client from this method - copyFilesHandler.handle(indexState, request, responseObserver); - logger.info("CopyFilesHandler returned successfully"); - } catch (StatusRuntimeException e) { - logger.warn("error while trying copyFiles " + request.getIndexName(), e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn( - String.format( - "error on copyFiles for primaryGen: %s, for index: %s", - request.getPrimaryGen(), request.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on copyFiles for primaryGen: %s, for index: %s", - request.getPrimaryGen(), request.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + copyFilesHandler.handle(request, responseObserver); } @Override public void newNRTPoint(NewNRTPoint request, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(request.getIndexName()); - checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - NewNRTPointHandler newNRTPointHander = new NewNRTPointHandler(); - TransferStatus reply = newNRTPointHander.handle(indexState, request); - logger.debug( - "NewNRTPointHandler returned status " - + reply.getCode() - + " message: " - + reply.getMessage()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen()), - e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + newNRTPointHandler.handle(request, responseObserver); } @Override public void writeNRTPoint( IndexName indexNameRequest, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - globalState.getIndexStateManager(indexNameRequest.getIndexName()); - IndexState indexState = indexStateManager.getCurrent(); - WriteNRTPointHandler writeNRTPointHander = - new WriteNRTPointHandler(indexStateManager.getIndexId()); - SearcherVersion reply = writeNRTPointHander.handle(indexState, indexNameRequest); - logger.debug("WriteNRTPointHandler returned version " + reply.getVersion()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error on writeNRTPoint for indexName: %s", indexNameRequest.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on writeNRTPoint for indexName: %s", - indexNameRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + writeNRTPointHandler.handle(indexNameRequest, responseObserver); } @Override public void getCurrentSearcherVersion( IndexName indexNameRequest, StreamObserver responseObserver) { - try { - IndexState indexState = globalState.getIndex(indexNameRequest.getIndexName()); - ReplicaCurrentSearchingVersionHandler replicaCurrentSearchingVersionHandler = - new ReplicaCurrentSearchingVersionHandler(); - SearcherVersion reply = - replicaCurrentSearchingVersionHandler.handle(indexState, indexNameRequest); - logger.info("ReplicaCurrentSearchingVersionHandler returned version " + reply.getVersion()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error on getCurrentSearcherVersion for indexName: %s", - indexNameRequest.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on getCurrentSearcherVersion for indexName: %s", - indexNameRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + replicaCurrentSearchingVersionHandler.handle(indexNameRequest, responseObserver); } @Override public void getConnectedNodes( GetNodesRequest getNodesRequest, StreamObserver responseObserver) { - try { - IndexState indexState = globalState.getIndex(getNodesRequest.getIndexName()); - GetNodesResponse reply = new GetNodesInfoHandler().handle(indexState, getNodesRequest); - logger.debug( - "GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error on GetNodesInfoHandler", e); - responseObserver.onError( - Status.INTERNAL - .withDescription(String.format("error on GetNodesInfoHandler")) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + getNodesInfoHandler.handle(getNodesRequest, responseObserver); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java index 0b5cd7dbb..a25152e70 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java @@ -18,20 +18,58 @@ import com.yelp.nrtsearch.server.grpc.AddReplicaRequest; import com.yelp.nrtsearch.server.grpc.AddReplicaResponse; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class AddReplicaHandler implements Handler { +public class AddReplicaHandler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(AddReplicaHandler.class); private final boolean useKeepAlive; + private final boolean verifyIndexId; - public AddReplicaHandler(boolean useKeepAlive) { - this.useKeepAlive = useKeepAlive; + public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.useKeepAlive = globalState.getConfiguration().getUseKeepAliveForReplication(); + this.verifyIndexId = verifyIndexId; } @Override - public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) { + public void handle( + AddReplicaRequest addReplicaRequest, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(addReplicaRequest.getIndexName()); + checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + AddReplicaResponse reply = handle(indexState, addReplicaRequest); + logger.info("AddReplicaHandler returned " + reply); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); + responseObserver.onError(e); + } catch (Exception e) { + logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + "error while trying to addReplicas for index: " + + addReplicaRequest.getIndexName()) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) { ShardState shardState = indexState.getShard(0); - if (shardState.isPrimary() == false) { + if (!shardState.isPrimary()) { throw new IllegalArgumentException( "index \"" + indexState.getName() + "\" was not started or is not a primary"); } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java index 3609ecd05..802df6a91 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java @@ -18,20 +18,63 @@ import com.yelp.nrtsearch.server.grpc.CopyFiles; import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import com.yelp.nrtsearch.server.monitoring.NrtMetrics; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.replicator.nrt.CopyJob; import org.apache.lucene.replicator.nrt.FileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class CopyFilesHandler implements Handler { +public class CopyFilesHandler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(CopyFilesHandler.class); private static final long CHECK_SLEEP_TIME_MS = 10; private static final int CHECKS_PER_STATUS_MESSAGE = 3000; // at least 30s between status messages + private final boolean verifyIndexId; + + public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.verifyIndexId = verifyIndexId; + } @Override - public void handle( + public void handle(CopyFiles request, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(request.getIndexName()); + checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + // we need to send multiple responses to client from this method + handle(indexState, request, responseObserver); + logger.info("CopyFilesHandler returned successfully"); + } catch (StatusRuntimeException e) { + logger.warn("error while trying copyFiles " + request.getIndexName(), e); + responseObserver.onError(e); + } catch (Exception e) { + logger.warn( + String.format( + "error on copyFiles for primaryGen: %s, for index: %s", + request.getPrimaryGen(), request.getIndexName()), + e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "error on copyFiles for primaryGen: %s, for index: %s", + request.getPrimaryGen(), request.getIndexName())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private void handle( IndexState indexState, CopyFiles copyFilesRequest, StreamObserver responseObserver) @@ -114,10 +157,4 @@ public void handle( NrtMetrics.nrtMergeSize.labelValues(indexName).observe(job.getTotalBytesCopied()); } } - - @Override - public TransferStatus handle(IndexState indexState, CopyFiles protoRequest) - throws HandlerException { - throw new UnsupportedOperationException("This method is in not implemented for this class"); - } } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java index 6605ffad8..7d0bd520e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java @@ -18,17 +18,41 @@ import com.yelp.nrtsearch.server.grpc.GetNodesRequest; import com.yelp.nrtsearch.server.grpc.GetNodesResponse; import com.yelp.nrtsearch.server.grpc.NodeInfo; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; import com.yelp.nrtsearch.server.utils.HostPort; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GetNodesInfoHandler implements Handler { +public class GetNodesInfoHandler extends Handler { private static final Logger logger = LoggerFactory.getLogger(GetNodesInfoHandler.class); + public GetNodesInfoHandler(GlobalState globalState) { + super(globalState); + } + @Override - public GetNodesResponse handle(IndexState indexState, GetNodesRequest getNodesRequest) - throws HandlerException { + public void handle( + GetNodesRequest getNodesRequest, StreamObserver responseObserver) { + try { + IndexState indexState = getGlobalState().getIndex(getNodesRequest.getIndexName()); + GetNodesResponse reply = handle(indexState); + logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Exception e) { + logger.warn("error on GetNodesInfoHandler", e); + responseObserver.onError( + Status.INTERNAL + .withDescription("error on GetNodesInfoHandler") + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private GetNodesResponse handle(IndexState indexState) { GetNodesResponse.Builder builder = GetNodesResponse.newBuilder(); ShardState shardState = indexState.getShard(0); if (!shardState.isPrimary() || !shardState.isStarted()) { diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java deleted file mode 100644 index 97be3aad4..000000000 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2020 Yelp Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yelp.nrtsearch.server.luceneserver; - -import com.google.protobuf.GeneratedMessageV3; -import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; -import io.grpc.stub.StreamObserver; - -/* Interface for handlers that take in an indexState and the protoBuff request and returns the protoBuff response */ -public interface Handler { - - default boolean isValidMagicHeader(int magicHeader) { - return magicHeader == ReplicationServerClient.BINARY_MAGIC; - } - - default void handle(IndexState indexState, T protoRequest, StreamObserver responseObserver) - throws Exception { - throw new UnsupportedOperationException("This method is not supported"); - } - - S handle(IndexState indexState, T protoRequest) throws HandlerException; - - class HandlerException extends Exception { - public HandlerException(Throwable err) { - super(err); - } - - public HandlerException(String errorMessage) { - super(errorMessage); - } - - public HandlerException(String errorMessage, Throwable err) { - super(errorMessage, err); - } - } -} diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java index 66cbeb05e..17f9ae2ee 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java @@ -18,13 +18,65 @@ import com.yelp.nrtsearch.server.grpc.NewNRTPoint; import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class NewNRTPointHandler implements Handler { +public class NewNRTPointHandler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(NewNRTPointHandler.class); + private final boolean verifyIndexId; + + public NewNRTPointHandler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.verifyIndexId = verifyIndexId; + } @Override - public TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest) - throws HandlerException { + public void handle(NewNRTPoint request, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(request.getIndexName()); + checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + TransferStatus reply = handle(indexState, request); + logger.debug( + "NewNRTPointHandler returned status " + + reply.getCode() + + " message: " + + reply.getMessage()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + logger.warn( + String.format( + "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", + request.getIndexName(), request.getVersion(), request.getPrimaryGen()), + e); + responseObserver.onError(e); + } catch (Exception e) { + logger.warn( + String.format( + "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", + request.getIndexName(), request.getVersion(), request.getPrimaryGen()), + e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", + request.getIndexName(), request.getVersion(), request.getPrimaryGen())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest) { ShardState shardState = indexState.getShard(0); if (shardState.isReplica() == false) { throw new IllegalArgumentException( diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java index e553382a8..2cdba0e61 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java @@ -20,13 +20,62 @@ import com.yelp.nrtsearch.server.grpc.CopyStateRequest; import com.yelp.nrtsearch.server.grpc.FileMetadata; import com.yelp.nrtsearch.server.grpc.FilesMetadata; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Map; import org.apache.lucene.replicator.nrt.FileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecvCopyStateHandler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(RecvCopyStateHandler.class); + + private final boolean verifyIndexId; + + public RecvCopyStateHandler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.verifyIndexId = verifyIndexId; + } -public class RecvCopyStateHandler implements Handler { @Override - public CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) { + public void handle(CopyStateRequest request, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(request.getIndexName()); + checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + CopyState reply = handle(indexState, request); + logger.debug( + "RecvCopyStateHandler returned, completedMergeFiles count: " + + reply.getCompletedMergeFilesCount()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + logger.warn("error while trying recvCopyState " + request.getIndexName(), e); + responseObserver.onError(e); + } catch (Exception e) { + logger.warn( + String.format( + "error on recvCopyState for replicaId: %s, for index: %s", + request.getReplicaId(), request.getIndexName()), + e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "error on recvCopyState for replicaId: %s, for index: %s", + request.getReplicaId(), request.getIndexName())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) { ShardState shardState = indexState.getShard(0); if (shardState.isPrimary() == false) { throw new IllegalArgumentException( diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java index cd831db1d..16e1fb014 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java @@ -17,12 +17,47 @@ import com.yelp.nrtsearch.server.grpc.IndexName; import com.yelp.nrtsearch.server.grpc.SearcherVersion; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class ReplicaCurrentSearchingVersionHandler implements Handler { +public class ReplicaCurrentSearchingVersionHandler extends Handler { + private static final Logger logger = + LoggerFactory.getLogger(ReplicaCurrentSearchingVersionHandler.class); + + public ReplicaCurrentSearchingVersionHandler(GlobalState globalState) { + super(globalState); + } @Override - public SearcherVersion handle(IndexState indexState, IndexName indexNameRequest) + public void handle(IndexName indexNameRequest, StreamObserver responseObserver) { + try { + IndexState indexState = getGlobalState().getIndex(indexNameRequest.getIndexName()); + SearcherVersion reply = handle(indexState, indexNameRequest); + logger.info("ReplicaCurrentSearchingVersionHandler returned version " + reply.getVersion()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Exception e) { + logger.warn( + String.format( + "error on getCurrentSearcherVersion for indexName: %s", + indexNameRequest.getIndexName()), + e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "error on getCurrentSearcherVersion for indexName: %s", + indexNameRequest.getIndexName())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private SearcherVersion handle(IndexState indexState, IndexName indexNameRequest) throws HandlerException { ShardState shardState = indexState.getShard(0); if (!shardState.isReplica() || !shardState.isStarted()) { diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java index 9fa545843..ff8286c00 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java @@ -18,8 +18,11 @@ import com.yelp.nrtsearch.server.grpc.IndexName; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; import com.yelp.nrtsearch.server.grpc.SearcherVersion; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Iterator; import java.util.Queue; @@ -28,17 +31,40 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WriteNRTPointHandler implements Handler { +public class WriteNRTPointHandler extends Handler { private static final Logger logger = LoggerFactory.getLogger(WriteNRTPointHandler.class); - private final String indexId; - public WriteNRTPointHandler(String indexId) { - this.indexId = indexId; + public WriteNRTPointHandler(GlobalState globalState) { + super(globalState); } @Override - public SearcherVersion handle(IndexState indexState, IndexName protoRequest) - throws HandlerException { + public void handle(IndexName indexNameRequest, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(indexNameRequest.getIndexName()); + String indexId = indexStateManager.getIndexId(); + IndexState indexState = indexStateManager.getCurrent(); + SearcherVersion reply = handle(indexState, indexId); + logger.debug("WriteNRTPointHandler returned version " + reply.getVersion()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Exception e) { + logger.warn( + String.format( + "error on writeNRTPoint for indexName: %s", indexNameRequest.getIndexName()), + e); + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "error on writeNRTPoint for indexName: %s", indexNameRequest.getIndexName())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } + + private SearcherVersion handle(IndexState indexState, String indexId) { final ShardState shardState = indexState.getShard(0); if (shardState.isPrimary() == false) { diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java index fa7472b66..d84c435c9 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java @@ -15,10 +15,12 @@ */ package com.yelp.nrtsearch.server.luceneserver.handler; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.GeneratedMessageV3; -import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; import com.yelp.nrtsearch.server.grpc.LuceneServerStubBuilder; +import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import io.grpc.Status; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; @@ -52,7 +54,7 @@ public void handle(T protoRequest, StreamObserver responseObserver) { throw new UnsupportedOperationException("This method is not supported"); } - public StreamObserver handle(StreamObserver responseObserver) { + public StreamObserver handle(StreamObserver responseObserver) { throw new UnsupportedOperationException("This method is not supported"); } @@ -77,6 +79,23 @@ protected void setResponseCompression( } } + protected boolean isValidMagicHeader(int magicHeader) { + return magicHeader == ReplicationServerClient.BINARY_MAGIC; + } + + @VisibleForTesting + public static void checkIndexId(String actual, String expected, boolean throwException) { + if (!actual.equals(expected)) { + String message = + String.format("Index id mismatch, expected: %s, actual: %s", expected, actual); + if (throwException) { + throw Status.FAILED_PRECONDITION.withDescription(message).asRuntimeException(); + } else { + logger.warn(message); + } + } + } + public static class HandlerException extends Exception { public HandlerException(Throwable err) { super(err); diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java new file mode 100644 index 000000000..43f197c0b --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.luceneserver.handler; + +import com.google.protobuf.ByteString; +import com.yelp.nrtsearch.server.grpc.FileInfo; +import com.yelp.nrtsearch.server.grpc.RawFileChunk; +import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import com.yelp.nrtsearch.server.luceneserver.IndexState; +import com.yelp.nrtsearch.server.luceneserver.ShardState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecvRawFileHandler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(RecvRawFileHandler.class); + private final boolean verifyIndexId; + + public RecvRawFileHandler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.verifyIndexId = verifyIndexId; + } + + @Override + public void handle(FileInfo fileInfoRequest, StreamObserver responseObserver) { + try { + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(fileInfoRequest.getIndexName()); + checkIndexId(fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + ShardState shardState = indexState.getShard(0); + try (IndexInput luceneFile = + shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT)) { + long len = luceneFile.length(); + long pos = fileInfoRequest.getFpStart(); + luceneFile.seek(pos); + byte[] buffer = new byte[1024 * 64]; + long totalRead; + totalRead = pos; + while (totalRead < len) { + int chunkSize = (int) Math.min(buffer.length, (len - totalRead)); + luceneFile.readBytes(buffer, 0, chunkSize); + RawFileChunk rawFileChunk = + RawFileChunk.newBuilder() + .setContent(ByteString.copyFrom(buffer, 0, chunkSize)) + .build(); + responseObserver.onNext(rawFileChunk); + totalRead += chunkSize; + } + // EOF + responseObserver.onCompleted(); + } + } catch (StatusRuntimeException e) { + logger.warn("error on recvRawFile " + fileInfoRequest.getFileName(), e); + responseObserver.onError(e); + } catch (Exception e) { + logger.warn("error on recvRawFile " + fileInfoRequest.getFileName(), e); + responseObserver.onError( + Status.INTERNAL + .withDescription("error on recvRawFile: " + fileInfoRequest.getFileName()) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java new file mode 100644 index 000000000..6d02af1e3 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.luceneserver.handler; + +import com.google.protobuf.ByteString; +import com.yelp.nrtsearch.server.grpc.FileInfo; +import com.yelp.nrtsearch.server.grpc.RawFileChunk; +import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import com.yelp.nrtsearch.server.luceneserver.IndexState; +import com.yelp.nrtsearch.server.luceneserver.ShardState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecvRawFileV2Handler extends Handler { + private static final Logger logger = LoggerFactory.getLogger(RecvRawFileV2Handler.class); + private final boolean verifyIndexId; + + public RecvRawFileV2Handler(GlobalState globalState, boolean verifyIndexId) { + super(globalState); + this.verifyIndexId = verifyIndexId; + } + + @Override + public StreamObserver handle(StreamObserver responseObserver) { + return new StreamObserver<>() { + private IndexState indexState; + private IndexInput luceneFile; + private byte[] buffer; + private final int ackEvery = + getGlobalState().getConfiguration().getFileCopyConfig().getAckEvery(); + private final int maxInflight = + getGlobalState().getConfiguration().getFileCopyConfig().getMaxInFlight(); + private int lastAckedSeq = 0; + private int currentSeq = 0; + private long fileOffset; + private long fileLength; + + @Override + public void onNext(FileInfo fileInfoRequest) { + try { + if (indexState == null) { + // Start transfer + IndexStateManager indexStateManager = + getGlobalState().getIndexStateManager(fileInfoRequest.getIndexName()); + checkIndexId( + fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + indexState = indexStateManager.getCurrent(); + ShardState shardState = indexState.getShard(0); + if (shardState == null) { + throw new IllegalStateException( + "Error getting shard state for: " + fileInfoRequest.getIndexName()); + } + luceneFile = + shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT); + luceneFile.seek(fileInfoRequest.getFpStart()); + fileOffset = fileInfoRequest.getFpStart(); + fileLength = luceneFile.length(); + buffer = + new byte[getGlobalState().getConfiguration().getFileCopyConfig().getChunkSize()]; + } else { + // ack existing transfer + lastAckedSeq = fileInfoRequest.getAckSeqNum(); + if (lastAckedSeq <= 0) { + throw new IllegalArgumentException( + "Invalid ackSeqNum: " + fileInfoRequest.getAckSeqNum()); + } + } + while (fileOffset < fileLength && (currentSeq - lastAckedSeq) < maxInflight) { + int chunkSize = (int) Math.min(buffer.length, (fileLength - fileOffset)); + luceneFile.readBytes(buffer, 0, chunkSize); + currentSeq++; + RawFileChunk rawFileChunk = + RawFileChunk.newBuilder() + .setContent(ByteString.copyFrom(buffer, 0, chunkSize)) + .setSeqNum(currentSeq) + .setAck((currentSeq % ackEvery) == 0) + .build(); + responseObserver.onNext(rawFileChunk); + fileOffset += chunkSize; + if (fileOffset == fileLength) { + responseObserver.onCompleted(); + } + } + logger.debug( + String.format("recvRawFileV2: in flight chunks: %d", currentSeq - lastAckedSeq)); + } catch (Throwable t) { + maybeCloseFile(); + responseObserver.onError(t); + throw new RuntimeException(t); + } + } + + @Override + public void onError(Throwable t) { + logger.error("recvRawFileV2 onError", t); + maybeCloseFile(); + responseObserver.onError(t); + } + + @Override + public void onCompleted() { + maybeCloseFile(); + logger.debug("recvRawFileV2 onCompleted"); + } + + private void maybeCloseFile() { + if (luceneFile != null) { + try { + luceneFile.close(); + } catch (IOException e) { + logger.warn("Error closing index file", e); + } + luceneFile = null; + } + } + }; + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java new file mode 100644 index 000000000..6dd465012 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java @@ -0,0 +1,117 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.luceneserver.handler; + +import com.yelp.nrtsearch.server.grpc.RawFileChunk; +import com.yelp.nrtsearch.server.grpc.TransferStatus; +import com.yelp.nrtsearch.server.grpc.TransferStatusCode; +import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SendRawFileHandler extends Handler { + public SendRawFileHandler(GlobalState globalState) { + super(globalState); + } + + @Override + public StreamObserver handle(StreamObserver responseObserver) { + OutputStream outputStream = null; + try { + // TODO: where do we write these files to? + outputStream = new FileOutputStream(File.createTempFile("tempfile", ".tmp")); + } catch (IOException e) { + responseObserver.onError(e); + } + return new SendRawFileStreamObserver(outputStream, responseObserver); + } + + @Override + public void handle(RawFileChunk rawFileChunk, StreamObserver responseObserver) {} + + static class SendRawFileStreamObserver implements StreamObserver { + private static final Logger logger = + LoggerFactory.getLogger(SendRawFileStreamObserver.class.getName()); + private final OutputStream outputStream; + private final StreamObserver responseObserver; + private final long startTime; + + SendRawFileStreamObserver( + OutputStream outputStream, StreamObserver responseObserver) { + this.outputStream = outputStream; + this.responseObserver = responseObserver; + startTime = System.nanoTime(); + } + + @Override + public void onNext(RawFileChunk value) { + // called by client once per chunk of data + try { + logger.trace("sendRawFile onNext"); + value.getContent().writeTo(outputStream); + } catch (IOException e) { + try { + outputStream.close(); + } catch (IOException ex) { + logger.warn("error trying to close outputStream", ex); + } finally { + // we either had error in writing to outputStream or cant close it, + // either case we need to raise it back to client + responseObserver.onError(e); + } + } + } + + @Override + public void onError(Throwable t) { + logger.warn("sendRawFile cancelled", t); + try { + outputStream.close(); + } catch (IOException e) { + logger.warn("error while trying to close outputStream", e); + } finally { + // we want to raise error always here + responseObserver.onError(t); + } + } + + @Override + public void onCompleted() { + logger.info("sendRawFile completed"); + // called by client after the entire file is sent + try { + outputStream.close(); + // TOOD: should we send fileSize copied? + long endTime = System.nanoTime(); + long totalTimeInMilliSeoncds = (endTime - startTime) / (1000 * 1000); + responseObserver.onNext( + TransferStatus.newBuilder() + .setCode(TransferStatusCode.Done) + .setMessage(String.valueOf(totalTimeInMilliSeoncds)) + .build()); + responseObserver.onCompleted(); + } catch (IOException e) { + logger.warn("error while trying to close outputStream", e); + responseObserver.onError(e); + } + } + } +} From cfc1c388ea22026bab5dc28989e1fc662e2a9543 Mon Sep 17 00:00:00 2001 From: Sarthak Nandi Date: Fri, 4 Oct 2024 08:59:21 -0700 Subject: [PATCH 2/5] Fix tests --- .../com/yelp/nrtsearch/server/grpc/CopyFileTest.java | 11 ++++++++++- .../server/grpc/ReplicationServerClientTest.java | 4 ++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java index 9f99b38ef..9c1bd8cdc 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java @@ -16,8 +16,12 @@ package com.yelp.nrtsearch.server.grpc; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; +import com.yelp.nrtsearch.server.config.LuceneServerConfiguration; +import com.yelp.nrtsearch.server.luceneserver.GlobalState; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -52,11 +56,16 @@ public void sendRawFile() throws Exception { // Generate a unique in-process server name. String serverName = InProcessServerBuilder.generateName(); + GlobalState mockGlobalState = mock(GlobalState.class); + LuceneServerConfiguration mockConfiguration = mock(LuceneServerConfiguration.class); + when(mockGlobalState.getConfiguration()).thenReturn(mockConfiguration); + when(mockConfiguration.getUseKeepAliveForReplication()).thenReturn(true); + // Create a server, add service, start, and register for automatic graceful shutdown. grpcCleanup.register( InProcessServerBuilder.forName(serverName) .directExecutor() - .addService(new LuceneServer.ReplicationServerImpl(null, false)) + .addService(new LuceneServer.ReplicationServerImpl(mockGlobalState, false)) .build() .start()); diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java index 240539696..19538e3a7 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.yelp.nrtsearch.clientlib.Node; +import com.yelp.nrtsearch.server.config.LuceneServerConfiguration; import com.yelp.nrtsearch.server.grpc.LuceneServer.ReplicationServerImpl; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient.DiscoveryFileAndPort; import com.yelp.nrtsearch.server.luceneserver.GlobalState; @@ -72,6 +73,9 @@ private Server getBasicReplicationServer() throws IOException { // we only need to test connectivity for now GlobalState mockGlobalState = mock(GlobalState.class); when(mockGlobalState.getIndex(any(String.class))).thenThrow(new RuntimeException("Expected")); + LuceneServerConfiguration mockConfiguration = mock(LuceneServerConfiguration.class); + when(mockGlobalState.getConfiguration()).thenReturn(mockConfiguration); + when(mockConfiguration.getUseKeepAliveForReplication()).thenReturn(true); return ServerBuilder.forPort(0) .addService(new ReplicationServerImpl(mockGlobalState, false)) From c8005df77b221a5c24a069cd30aeb4af43d04690 Mon Sep 17 00:00:00 2001 From: Sarthak Nandi Date: Fri, 4 Oct 2024 09:03:50 -0700 Subject: [PATCH 3/5] Remove checkIndexId from LuceneServer --- .../yelp/nrtsearch/server/grpc/LuceneServer.java | 14 -------------- .../nrtsearch/server/grpc/VerifyIndexIdTest.java | 9 +++++---- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index eaf2483e1..88f1a73d0 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -104,7 +104,6 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerInterceptors; -import io.grpc.Status; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.stub.StreamObserver; @@ -686,19 +685,6 @@ static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServ private final SendRawFileHandler sendRawFileHandler; private final WriteNRTPointHandler writeNRTPointHandler; - @VisibleForTesting - static void checkIndexId(String actual, String expected, boolean throwException) { - if (!actual.equals(expected)) { - String message = - String.format("Index id mismatch, expected: %s, actual: %s", expected, actual); - if (throwException) { - throw Status.FAILED_PRECONDITION.withDescription(message).asRuntimeException(); - } else { - logger.warn(message); - } - } - } - public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) { this.globalState = globalState; this.verifyIndexId = verifyIndexId; diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/VerifyIndexIdTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/VerifyIndexIdTest.java index 3dc9f097a..e791ac56b 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/VerifyIndexIdTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/VerifyIndexIdTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.*; import com.yelp.nrtsearch.server.config.IndexStartConfig; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -260,23 +261,23 @@ public void testReplication() throws IOException { @Test public void testCheckIndexId_same() { - LuceneServer.ReplicationServerImpl.checkIndexId("id1", "id1", false); + Handler.checkIndexId("id1", "id1", false); } @Test public void testCheckIndexId_different() { - LuceneServer.ReplicationServerImpl.checkIndexId("id1", "id2", false); + Handler.checkIndexId("id1", "id2", false); } @Test public void testCheckIndexId_verifySame() { - LuceneServer.ReplicationServerImpl.checkIndexId("id1", "id1", true); + Handler.checkIndexId("id1", "id1", true); } @Test public void testCheckIndexId_verifyDifferent() { try { - LuceneServer.ReplicationServerImpl.checkIndexId("id1", "id2", true); + Handler.checkIndexId("id1", "id2", true); fail(); } catch (StatusRuntimeException e) { assertEquals(Status.FAILED_PRECONDITION.getCode(), e.getStatus().getCode()); From e7bb6b6cfddc8d619f98a7208a2afd12c7131954 Mon Sep 17 00:00:00 2001 From: Sarthak Nandi Date: Fri, 4 Oct 2024 09:40:17 -0700 Subject: [PATCH 4/5] Correct imports after merge --- .../java/com/yelp/nrtsearch/server/grpc/LuceneServer.java | 3 --- .../nrtsearch/server/luceneserver/AddReplicaHandler.java | 5 +++-- .../nrtsearch/server/luceneserver/CopyFilesHandler.java | 5 +++-- .../server/luceneserver/GetNodesInfoHandler.java | 1 + .../nrtsearch/server/luceneserver/NewNRTPointHandler.java | 5 +++-- .../server/luceneserver/RecvCopyStateHandler.java | 7 ++++--- .../ReplicaCurrentSearchingVersionHandler.java | 8 ++++---- .../server/luceneserver/WriteNRTPointHandler.java | 3 ++- .../nrtsearch/server/luceneserver/handler/Handler.java | 2 +- .../server/luceneserver/handler/RecvRawFileHandler.java | 6 +++--- .../server/luceneserver/handler/RecvRawFileV2Handler.java | 6 +++--- .../server/luceneserver/handler/SendRawFileHandler.java | 2 +- 12 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 5eb364507..8c2a282eb 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -74,9 +74,6 @@ import com.yelp.nrtsearch.server.luceneserver.handler.StopIndexHandler; import com.yelp.nrtsearch.server.luceneserver.handler.UpdateFieldsHandler; import com.yelp.nrtsearch.server.luceneserver.highlights.HighlighterService; -import com.yelp.nrtsearch.server.luceneserver.index.IndexState; -import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; -import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import com.yelp.nrtsearch.server.luceneserver.index.handlers.LiveSettingsV2Handler; import com.yelp.nrtsearch.server.luceneserver.index.handlers.SettingsV2Handler; import com.yelp.nrtsearch.server.luceneserver.logging.HitsLoggerCreator; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java index cf0dce051..62aff8028 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java @@ -18,10 +18,11 @@ import com.yelp.nrtsearch.server.grpc.AddReplicaRequest; import com.yelp.nrtsearch.server.grpc.AddReplicaResponse; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; -import com.yelp.nrtsearch.server.luceneserver.index.IndexState; -import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java index 1390d8ccc..2aa72f945 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/CopyFilesHandler.java @@ -18,11 +18,12 @@ import com.yelp.nrtsearch.server.grpc.CopyFiles; import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; +import com.yelp.nrtsearch.server.luceneserver.handler.Handler; import com.yelp.nrtsearch.server.luceneserver.index.IndexState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import com.yelp.nrtsearch.server.luceneserver.nrt.NRTReplicaNode; -import com.yelp.nrtsearch.server.luceneserver.handler.Handler; -import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import com.yelp.nrtsearch.server.monitoring.NrtMetrics; import io.grpc.Status; import io.grpc.StatusRuntimeException; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java index f031157e7..bfb909f25 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GetNodesInfoHandler.java @@ -22,6 +22,7 @@ import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import com.yelp.nrtsearch.server.utils.HostPort; import io.grpc.Status; import io.grpc.stub.StreamObserver; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java index 05052b575..c75bef62c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/NewNRTPointHandler.java @@ -19,12 +19,13 @@ import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import com.yelp.nrtsearch.server.luceneserver.index.IndexState; -import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java index 391a7c5db..84a53b117 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/RecvCopyStateHandler.java @@ -21,13 +21,14 @@ import com.yelp.nrtsearch.server.grpc.FileMetadata; import com.yelp.nrtsearch.server.grpc.FilesMetadata; import com.yelp.nrtsearch.server.luceneserver.handler.Handler; +import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import com.yelp.nrtsearch.server.luceneserver.index.IndexState; -import com.yelp.nrtsearch.server.luceneserver.index.ShardState; -import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode; import java.io.IOException; import java.util.Map; import org.apache.lucene.replicator.nrt.FileMetaData; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java index f60b7827b..199ef1898 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/ReplicaCurrentSearchingVersionHandler.java @@ -18,10 +18,11 @@ import com.yelp.nrtsearch.server.grpc.IndexName; import com.yelp.nrtsearch.server.grpc.SearcherVersion; import com.yelp.nrtsearch.server.luceneserver.handler.Handler; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +60,7 @@ public void handle(IndexName indexNameRequest, StreamObserver r } } - private SearcherVersion handle(IndexState indexState, IndexName indexNameRequest) - throws HandlerException { + private SearcherVersion handle(IndexState indexState, IndexName indexNameRequest) { ShardState shardState = indexState.getShard(0); if (!shardState.isReplica() || !shardState.isStarted()) { throw new IllegalArgumentException( diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java index 5e01fd332..6570af8ef 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/WriteNRTPointHandler.java @@ -19,10 +19,11 @@ import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; import com.yelp.nrtsearch.server.grpc.SearcherVersion; import com.yelp.nrtsearch.server.luceneserver.handler.Handler; -import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import com.yelp.nrtsearch.server.luceneserver.index.IndexState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; import com.yelp.nrtsearch.server.luceneserver.index.ShardState; import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java index 14b883895..2b0d55ee9 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/Handler.java @@ -19,8 +19,8 @@ import com.google.protobuf.GeneratedMessageV3; import com.yelp.nrtsearch.server.grpc.LuceneServerStubBuilder; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; -import io.grpc.Status; import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; +import io.grpc.Status; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java index 43f197c0b..b3abb0c3d 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileHandler.java @@ -18,10 +18,10 @@ import com.google.protobuf.ByteString; import com.yelp.nrtsearch.server.grpc.FileInfo; import com.yelp.nrtsearch.server.grpc.RawFileChunk; -import com.yelp.nrtsearch.server.luceneserver.GlobalState; -import com.yelp.nrtsearch.server.luceneserver.IndexState; -import com.yelp.nrtsearch.server.luceneserver.ShardState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java index 6d02af1e3..bfd98542b 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/RecvRawFileV2Handler.java @@ -18,10 +18,10 @@ import com.google.protobuf.ByteString; import com.yelp.nrtsearch.server.grpc.FileInfo; import com.yelp.nrtsearch.server.grpc.RawFileChunk; -import com.yelp.nrtsearch.server.luceneserver.GlobalState; -import com.yelp.nrtsearch.server.luceneserver.IndexState; -import com.yelp.nrtsearch.server.luceneserver.ShardState; +import com.yelp.nrtsearch.server.luceneserver.index.IndexState; import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager; +import com.yelp.nrtsearch.server.luceneserver.index.ShardState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.stub.StreamObserver; import java.io.IOException; import org.apache.lucene.store.IOContext; diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java index 6dd465012..15c2c4196 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/handler/SendRawFileHandler.java @@ -18,7 +18,7 @@ import com.yelp.nrtsearch.server.grpc.RawFileChunk; import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; -import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.stub.StreamObserver; import java.io.File; import java.io.FileOutputStream; From 4d670583a0397e9e9feaa31895e149d8156eeedc Mon Sep 17 00:00:00 2001 From: Sarthak Nandi Date: Fri, 4 Oct 2024 10:19:16 -0700 Subject: [PATCH 5/5] Correct test import --- src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java index 9c1bd8cdc..76693093b 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/CopyFileTest.java @@ -21,7 +21,7 @@ import com.google.protobuf.ByteString; import com.yelp.nrtsearch.server.config.LuceneServerConfiguration; -import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import com.yelp.nrtsearch.server.luceneserver.state.GlobalState; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver;