Merge remote-tracking branch 'origin/master' into HDDS-12012
adoroszlai committed Jan 9, 2025
2 parents fdb2fea + 80dc87a commit d2c0398
Showing 181 changed files with 1,623 additions and 1,951 deletions.
@@ -410,6 +410,10 @@ public void executePutBlock(boolean close,
waitFuturesComplete();
final BlockData blockData = containerBlockData.build();
if (close) {
// HDDS-12007 changed datanodes to ignore the following PutBlock request.
// However, clients still have to send it for compatibility:
// if new clients stopped sending the PutBlock, old datanodes,
// which still expect one, would fail.
final ContainerCommandRequestProto putBlockRequest
= ContainerProtocolCalls.getPutBlockRequest(
xceiverClient.getPipeline(), blockData, true, tokenString);
42 changes: 42 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1892,6 +1892,48 @@
interfaces by setting it to 0.0.0.0.</description>
</property>

<property>
<name>ozone.s3g.webadmin.http.enabled</name>
<value>true</value>
<tag>OZONE, S3GATEWAY</tag>
<description>Whether to enable the web server that serves additional content in the Ozone S3 Gateway. Set this to false to disable it.</description>
</property>

<property>
<name>ozone.s3g.webadmin.https-address</name>
<value/>
<tag>OZONE, S3GATEWAY</tag>
<description>Ozone S3Gateway content server's HTTPS address and port.</description>
</property>

<property>
<name>ozone.s3g.webadmin.https-bind-host</name>
<value/>
<tag>OZONE, S3GATEWAY</tag>
<description>The actual address the HTTPS server will bind to. If this optional address
is set, it overrides only the hostname portion of ozone.s3g.webadmin.https-address.
This is useful for making the Ozone S3Gateway HTTPS server listen on all
interfaces by setting it to 0.0.0.0.</description>
</property>

<property>
<name>ozone.s3g.webadmin.http-address</name>
<value>0.0.0.0:19878</value>
<tag>OZONE, S3GATEWAY</tag>
<description>The address and port where Ozone S3Gateway serves
web content.</description>
</property>

<property>
<name>ozone.s3g.webadmin.http-bind-host</name>
<value>0.0.0.0</value>
<tag>OZONE, S3GATEWAY</tag>
<description>The actual address the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of ozone.s3g.webadmin.http-address.
This is useful for making the Ozone S3Gateway HTTP server listen on all
interfaces by setting it to 0.0.0.0.</description>
</property>

<property>
<name>ozone.s3g.http.auth.kerberos.principal</name>
<value>HTTP/_HOST@REALM</value>
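For context, a minimal sketch (not part of this commit) of how these new keys can be read through Hadoop's Configuration API, which OzoneConfiguration extends; the property names and defaults mirror the entries above, everything else is illustrative:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class S3GatewayWebAdminConfigSketch {
  public static void main(String[] args) {
    // Picks up ozone-default.xml / ozone-site.xml when they are on the classpath.
    OzoneConfiguration conf = new OzoneConfiguration();

    // Whether the S3 Gateway web admin server is started at all.
    boolean enabled = conf.getBoolean("ozone.s3g.webadmin.http.enabled", true);

    // HTTP listen address; the optional bind-host overrides only the hostname part.
    String httpAddress = conf.get("ozone.s3g.webadmin.http-address", "0.0.0.0:19878");
    String httpBindHost = conf.get("ozone.s3g.webadmin.http-bind-host", "0.0.0.0");

    System.out.println(enabled + " " + httpAddress + " " + httpBindHost);
  }
}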
@@ -34,6 +34,7 @@
*/
public enum ContainerLayoutVersion {

@Deprecated /* Use FILE_PER_BLOCK instead */
FILE_PER_CHUNK(1, "One file per chunk") {
@Override
public File getChunkFile(File chunkDir, BlockID blockID, String chunkName) {
@@ -47,7 +48,7 @@ public File getChunkFile(File chunkDir, BlockID blockID, String chunkName) {
}
};

private static final ContainerLayoutVersion
public static final ContainerLayoutVersion
DEFAULT_LAYOUT = ContainerLayoutVersion.FILE_PER_BLOCK;

private static final List<ContainerLayoutVersion> CONTAINER_LAYOUT_VERSIONS =
@@ -524,21 +524,6 @@ private ContainerCommandResponseProto dispatchCommand(
return response;
}

private CompletableFuture<ContainerCommandResponseProto> link(
ContainerCommandRequestProto requestProto, LogEntryProto entry) {
return CompletableFuture.supplyAsync(() -> {
final DispatcherContext context = DispatcherContext
.newBuilder(DispatcherContext.Op.STREAM_LINK)
.setTerm(entry.getTerm())
.setLogIndex(entry.getIndex())
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();

return dispatchCommand(requestProto, context);
}, executor);
}

private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
@@ -689,29 +674,8 @@ public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {

final KeyValueStreamDataChannel kvStreamDataChannel =
(KeyValueStreamDataChannel) dataChannel;

final ContainerCommandRequestProto request =
kvStreamDataChannel.getPutBlockRequest();

return link(request, entry).whenComplete((response, e) -> {
if (e != null) {
LOG.warn("Failed to link logEntry {} for request {}",
TermIndex.valueOf(entry), request, e);
}
if (response != null) {
final ContainerProtos.Result result = response.getResult();
if (LOG.isDebugEnabled()) {
LOG.debug("{} to link logEntry {} for request {}, response: {}",
result, TermIndex.valueOf(entry), request, response);
}
if (result == ContainerProtos.Result.SUCCESS) {
kvStreamDataChannel.setLinked();
return;
}
}
// failed to link, cleanup
kvStreamDataChannel.cleanUp();
});
kvStreamDataChannel.setLinked();
return CompletableFuture.completedFuture(null);
}

private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
@@ -40,6 +40,7 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -124,6 +125,7 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;

import org.apache.hadoop.util.Time;
@@ -191,6 +193,14 @@ public KeyValueHandler(ConfigurationSource config,
byteBufferToByteString =
ByteStringConversion
.createByteBufferConversion(isUnsafeByteBufferConversionEnabled);

if (ContainerLayoutVersion.getConfiguredVersion(conf) ==
ContainerLayoutVersion.FILE_PER_CHUNK) {
LOG.warn("FILE_PER_CHUNK layout is not supported. Falling back to default : {}.",
DEFAULT_LAYOUT.name());
OzoneConfiguration.of(conf).set(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY,
DEFAULT_LAYOUT.name());
}
}

@VisibleForTesting
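For illustration, the fallback in the constructor above can be sketched on its own (class and key names are taken from this diff; the standalone wiring is an assumption, not the committed code):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;

public final class LayoutFallbackSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Simulate a cluster still configured with the deprecated layout.
    conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY,
        ContainerLayoutVersion.FILE_PER_CHUNK.name());

    if (ContainerLayoutVersion.getConfiguredVersion(conf)
        == ContainerLayoutVersion.FILE_PER_CHUNK) {
      // FILE_PER_CHUNK is deprecated; rewrite the setting to the default layout.
      conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY,
          ContainerLayoutVersion.DEFAULT_LAYOUT.name());
    }

    // Prints FILE_PER_BLOCK, the (now public) DEFAULT_LAYOUT.
    System.out.println(conf.get(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY));
  }
}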
@@ -20,14 +20,11 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
@@ -36,9 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class is used to get the DataChannel for streaming.
@@ -53,8 +48,6 @@ interface WriteMethod {

private final Buffers buffers = new Buffers(
BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX);
private final AtomicReference<ContainerCommandRequestProto> putBlockRequest
= new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();

KeyValueStreamDataChannel(File file, ContainerData containerData,
@@ -90,7 +83,7 @@ static int writeBuffers(ReferenceCountedObject<ByteBuffer> src,
return src.get().remaining();
}

private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
static void writeFully(ByteBuffer b, WriteMethod writeMethod)
throws IOException {
while (b.remaining() > 0) {
final int written = writeMethod.applyAsInt(b);
@@ -100,11 +93,6 @@ private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
}
}

public ContainerCommandRequestProto getPutBlockRequest() {
return Objects.requireNonNull(putBlockRequest.get(),
() -> "putBlockRequest == null, " + this);
}

void assertOpen() throws IOException {
if (closed.get()) {
throw new IOException("Already closed: " + this);
@@ -115,7 +103,7 @@ void assertOpen() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
writeBuffers();
} finally {
super.close();
}
@@ -130,22 +118,23 @@ protected void cleanupInternal() throws IOException {
}
}

static ContainerCommandRequestProto closeBuffers(
Buffers buffers, WriteMethod writeMethod) throws IOException {
/**
* Write the data in {@link #buffers} to the channel.
* Note that the PutBlock proto at the end is ignored; see HDDS-12007.
*/
private void writeBuffers() throws IOException {
final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
final ByteBuf buf = ref.retain();
final ContainerCommandRequestProto putBlockRequest;
try {
putBlockRequest = readPutBlockRequest(buf);
setEndIndex(buf);
// write the remaining data
writeFully(buf.nioBuffer(), writeMethod);
writeFully(buf.nioBuffer(), super::writeFileChannel);
} finally {
ref.release();
}
return putBlockRequest;
}

private static int readProtoLength(ByteBuf b, int lengthIndex) {
static int readProtoLength(ByteBuf b, int lengthIndex) {
final int readerIndex = b.readerIndex();
LOG.debug("{}, lengthIndex = {}, readerIndex = {}",
b, lengthIndex, readerIndex);
@@ -158,8 +147,8 @@ private static int readProtoLength(ByteBuf b, int lengthIndex) {
return b.nioBuffer().getInt();
}

static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
throws IOException {
/** Set end index to the proto index in order to ignore the proto. */
static void setEndIndex(ByteBuf b) {
// readerIndex protoIndex lengthIndex readerIndex+readableBytes
// V V V V
// format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
@@ -168,37 +157,7 @@ static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
final int protoLength = readProtoLength(b.duplicate(), lengthIndex);
final int protoIndex = lengthIndex - protoLength;

final ContainerCommandRequestProto proto;
try {
proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
} catch (Throwable t) {
RatisHelper.debug(b, "catch", LOG);
throw new IOException("Failed to readPutBlockRequest from " + b
+ ": readerIndex=" + readerIndex
+ ", protoIndex=" + protoIndex
+ ", protoLength=" + protoLength
+ ", lengthIndex=" + lengthIndex, t);
}

// set index for reading data
b.writerIndex(protoIndex);

return proto;
}

private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
throws IOException {
RatisHelper.debug(b, "readPutBlockRequest", LOG);
final ByteString byteString = ByteString.copyFrom(b);

final ContainerCommandRequestProto request =
ContainerCommandRequestMessage.toProto(byteString, null);

if (!request.hasPutBlock()) {
throw new StorageContainerException(
"Malformed PutBlock request. trace ID: " + request.getTraceID(),
ContainerProtos.Result.MALFORMED_REQUEST);
}
return request;
}
}
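To make the index arithmetic in setEndIndex/readProtoLength concrete, here is a small, self-contained sketch of the |data|proto|proto length| layout. It uses plain Netty rather than the Ratis-shaded ByteBuf, and the payload bytes are made up:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public final class TrailingProtoLayoutSketch {
  public static void main(String[] args) {
    byte[] data = "chunk-data".getBytes(StandardCharsets.UTF_8);
    byte[] proto = {1, 2, 3, 4, 5};  // stand-in for a serialized PutBlock proto

    // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
    ByteBuf b = Unpooled.buffer();
    b.writeBytes(data);
    b.writeBytes(proto);
    b.writeInt(proto.length);

    // Locate the trailing proto, mirroring readProtoLength and setEndIndex.
    int lengthIndex = b.readerIndex() + b.readableBytes() - 4;
    int protoLength = b.getInt(lengthIndex);
    int protoIndex = lengthIndex - protoLength;

    // Moving the writer index to protoIndex leaves only the data region readable;
    // that is what writeBuffers() sends to the file channel, and the PutBlock
    // proto at the end is ignored (HDDS-12007).
    b.writerIndex(protoIndex);
    System.out.println(b.readableBytes() == data.length);  // prints: true
  }
}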
@@ -57,6 +57,7 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -292,6 +293,13 @@ public void testVolumeSetInKeyValueHandler() throws Exception {
keyValueHandler.getVolumeChoosingPolicyForTesting()
.getClass().getName());

// Ensures that KeyValueHandler falls back to FILE_PER_BLOCK.
conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, "FILE_PER_CHUNK");
new KeyValueHandler(conf, context.getParent().getDatanodeDetails().getUuidString(), cset, volumeSet,
metrics, c -> { });
assertEquals(ContainerLayoutVersion.FILE_PER_BLOCK,
conf.getEnum(OZONE_SCM_CONTAINER_LAYOUT_KEY, ContainerLayoutVersion.FILE_PER_CHUNK));

//Set a class which is not of sub class of VolumeChoosingPolicy
conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
"org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");