diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/CMakeLists.txt index a1fc98670de..d22bca3f117 100644 --- a/cpp/celeborn/CMakeLists.txt +++ b/cpp/celeborn/CMakeLists.txt @@ -12,7 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_subdirectory(utils) add_subdirectory(proto) add_subdirectory(memory) +add_subdirectory(utils) add_subdirectory(conf) +add_subdirectory(protocol) diff --git a/cpp/celeborn/protocol/CMakeLists.txt b/cpp/celeborn/protocol/CMakeLists.txt new file mode 100644 index 00000000000..e92778e9302 --- /dev/null +++ b/cpp/celeborn/protocol/CMakeLists.txt @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_library(protocol PartitionLocation.cpp) + +target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR}) + +target_link_libraries( + protocol + memory + proto + ${FOLLY_WITH_DEPENDENCIES} + ${GLOG} + ${GFLAGS_LIBRARIES} +) + +if(CELEBORN_BUILD_TESTS) + add_subdirectory(tests) +endif() diff --git a/cpp/celeborn/protocol/PartitionLocation.cpp b/cpp/celeborn/protocol/PartitionLocation.cpp new file mode 100644 index 00000000000..5fc6e52c37b --- /dev/null +++ b/cpp/celeborn/protocol/PartitionLocation.cpp @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "celeborn/protocol/PartitionLocation.h" + +#include "celeborn/protocol/StatusCode.h" +#include "celeborn/utils/CelebornUtils.h" +#include "celeborn/utils/Exceptions.h" + +namespace celeborn { +std::unique_ptr StorageInfo::fromPb(const PbStorageInfo& pb) { + auto result = std::make_unique(); + result->type = static_cast(pb.type()); + result->mountPoint = pb.mountpoint(); + result->finalResult = pb.finalresult(); + result->filePath = pb.filepath(); + result->availableStorageTypes = pb.availablestoragetypes(); + return std::move(result); +} + +std::unique_ptr PartitionLocation::fromPb( + const PbPartitionLocation& pb) { + auto result = fromPbWithoutPeer(pb); + if (pb.has_peer()) { + auto peer = fromPbWithoutPeer(pb.peer()); + if (result->mode == PRIMARY) { + CELEBORN_CHECK( + peer->mode == REPLICA, "PRIMARY's peer mode should be REPLICA"); + result->replicaPeer = std::move(peer); + } else { + CELEBORN_CHECK( + peer->mode == PRIMARY, "REPLICA's peer mode should be PRIMARY"); + peer->replicaPeer = std::move(result); + result = std::move(peer); + } + } + CELEBORN_CHECK(result->mode == PRIMARY, "non-peer's mode should be PRIMARY"); + return std::move(result); +} + +PartitionLocation::PartitionLocation(const PartitionLocation& other) + : id(other.id), + epoch(other.epoch), + host(other.host), + rpcPort(other.rpcPort), + pushPort(other.pushPort), + fetchPort(other.fetchPort), + replicatePort(other.replicatePort), + mode(other.mode), + replicaPeer( + other.replicaPeer + ? std::make_unique(*other.replicaPeer) + : nullptr), + storageInfo(std::make_unique(*other.storageInfo)) {} + +std::unique_ptr PartitionLocation::fromPbWithoutPeer( + const PbPartitionLocation& pb) { + auto result = std::make_unique(); + result->id = pb.id(); + result->epoch = pb.epoch(); + result->host = pb.host(); + result->rpcPort = pb.rpcport(); + result->pushPort = pb.pushport(); + result->fetchPort = pb.fetchport(); + result->replicatePort = pb.replicateport(); + result->mode = static_cast(pb.mode()); + result->replicaPeer = nullptr; + result->storageInfo = StorageInfo::fromPb(pb.storageinfo()); + return std::move(result); +} + +StatusCode toStatusCode(int32_t code) { + CELEBORN_CHECK(code >= 0); + CELEBORN_CHECK(code <= StatusCode::TAIL); + return static_cast(code); +} +} // namespace celeborn diff --git a/cpp/celeborn/protocol/PartitionLocation.h b/cpp/celeborn/protocol/PartitionLocation.h new file mode 100644 index 00000000000..dc0f9ecd752 --- /dev/null +++ b/cpp/celeborn/protocol/PartitionLocation.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "celeborn/proto/TransportMessagesCpp.pb.h" +#include "celeborn/utils/Exceptions.h" + +namespace celeborn { +struct StorageInfo { + static std::unique_ptr fromPb(const PbStorageInfo& pb); + + StorageInfo() = default; + + StorageInfo(const StorageInfo& other) = default; + + enum Type { + MEMORY = 0, + HDD = 1, + SSD = 2, + HDFS = 3, + OSS = 4, + S3 = 5, + }; + + static const int MEMORY_MASK = 0b1; + static const int LOCAL_DISK_MASK = 0b10; + static const int HDFS_MASK = 0b100; + static const int OSS_MASK = 0b1000; + static const int S3_MASK = 0b10000; + static const int ALL_TYPES_AVAILABLE_MASK = 0; + + Type type{MEMORY}; + std::string mountPoint; + // if a file is committed, field "finalResult" will be true + bool finalResult{false}; + std::string filePath; + int availableStorageTypes{0}; +}; + +struct PartitionLocation { + enum Mode { + PRIMARY = 0, + REPLICA = 1, + }; + + int id; + int epoch; + std::string host; + int rpcPort; + int pushPort; + int fetchPort; + int replicatePort; + Mode mode; + // Only primary PartitionLocation might have replicaPeer. + // Replica PartitionLocation would not have replicaPeer. + // The lifecycle of Replica is bounded with Primary. + std::unique_ptr replicaPeer{nullptr}; + std::unique_ptr storageInfo; + // TODO: RoaringBitmap is not supported yet. + // RoaringBitmap mapIdBitMap; + + static std::unique_ptr fromPb( + const PbPartitionLocation& pb); + + PartitionLocation() = default; + + PartitionLocation(const PartitionLocation& other); + + std::string filename() const { + return std::to_string(id) + "-" + std::to_string(epoch) + "-" + + std::to_string(mode); + } + + private: + static std::unique_ptr fromPbWithoutPeer( + const PbPartitionLocation& pb); +}; +} // namespace celeborn diff --git a/cpp/celeborn/protocol/StatusCode.h b/cpp/celeborn/protocol/StatusCode.h new file mode 100644 index 00000000000..e5c4ee199e1 --- /dev/null +++ b/cpp/celeborn/protocol/StatusCode.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace celeborn { +enum StatusCode { + // 1/0 Status + SUCCESS = 0, + PARTIAL_SUCCESS = 1, + REQUEST_FAILED = 2, + + // Specific Status + SHUFFLE_ALREADY_REGISTERED = 3, + SHUFFLE_NOT_REGISTERED = 4, + RESERVE_SLOTS_FAILED = 5, + SLOT_NOT_AVAILABLE = 6, + WORKER_NOT_FOUND = 7, + PARTITION_NOT_FOUND = 8, + REPLICA_PARTITION_NOT_FOUND = 9, + DELETE_FILES_FAILED = 10, + PARTITION_EXISTS = 11, + REVIVE_FAILED = 12, + REPLICATE_DATA_FAILED = 13, + NUM_MAPPER_ZERO = 14, + MAP_ENDED = 15, + STAGE_ENDED = 16, + + // push data fail causes + PUSH_DATA_FAIL_NON_CRITICAL_CAUSE = 17, + PUSH_DATA_WRITE_FAIL_REPLICA = 18, + PUSH_DATA_WRITE_FAIL_PRIMARY = 19, + PUSH_DATA_FAIL_PARTITION_NOT_FOUND = 20, + + HARD_SPLIT = 21, + SOFT_SPLIT = 22, + + STAGE_END_TIME_OUT = 23, + SHUFFLE_DATA_LOST = 24, + WORKER_SHUTDOWN = 25, + NO_AVAILABLE_WORKING_DIR = 26, + WORKER_EXCLUDED = 27, + WORKER_UNKNOWN = 28, + + COMMIT_FILE_EXCEPTION = 29, + + // Rate limit statuses + PUSH_DATA_SUCCESS_PRIMARY_CONGESTED = 30, + PUSH_DATA_SUCCESS_REPLICA_CONGESTED = 31, + + PUSH_DATA_HANDSHAKE_FAIL_REPLICA = 32, + PUSH_DATA_HANDSHAKE_FAIL_PRIMARY = 33, + REGION_START_FAIL_REPLICA = 34, + REGION_START_FAIL_PRIMARY = 35, + REGION_FINISH_FAIL_REPLICA = 36, + REGION_FINISH_FAIL_PRIMARY = 37, + + PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY = 38, + PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA = 39, + PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY = 40, + PUSH_DATA_CONNECTION_EXCEPTION_REPLICA = 41, + PUSH_DATA_TIMEOUT_PRIMARY = 42, + PUSH_DATA_TIMEOUT_REPLICA = 43, + PUSH_DATA_PRIMARY_WORKER_EXCLUDED = 44, + PUSH_DATA_REPLICA_WORKER_EXCLUDED = 45, + + FETCH_DATA_TIMEOUT = 46, + REVIVE_INITIALIZED = 47, + DESTROY_SLOTS_MOCK_FAILURE = 48, + COMMIT_FILES_MOCK_FAILURE = 49, + PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA = 50, + OPEN_STREAM_FAILED = 51, + SEGMENT_START_FAIL_REPLICA = 52, + SEGMENT_START_FAIL_PRIMARY = 53, + NO_SPLIT = 54, + + TAIL +}; + +StatusCode toStatusCode(int32_t code); +} // namespace celeborn diff --git a/cpp/celeborn/protocol/tests/CMakeLists.txt b/cpp/celeborn/protocol/tests/CMakeLists.txt new file mode 100644 index 00000000000..8f7b5e2f2d7 --- /dev/null +++ b/cpp/celeborn/protocol/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(celeborn_protocol_test PartitionLocationTest.cpp) + +add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test) + +target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR}) + +target_link_libraries( + celeborn_protocol_test + PRIVATE + memory + proto + protocol + utils + ${FOLLY_WITH_DEPENDENCIES} + ${GLOG} + ${GFLAGS_LIBRARIES} + GTest::gtest + GTest::gmock + GTest::gtest_main) \ No newline at end of file diff --git a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp new file mode 100644 index 00000000000..ec318369bc3 --- /dev/null +++ b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "celeborn/proto/TransportMessagesCpp.pb.h" +#include "celeborn/protocol/PartitionLocation.h" + +using namespace celeborn; + +std::unique_ptr generateStorageInfoPb() { + auto pbStorageInfo = std::make_unique(); + pbStorageInfo->set_type(1); + pbStorageInfo->set_mountpoint("test_mountpoint"); + pbStorageInfo->set_finalresult(true); + pbStorageInfo->set_filepath("test_filepath"); + pbStorageInfo->set_availablestoragetypes(1); + return std::move(pbStorageInfo); +} + +void verifyStorageInfo(const StorageInfo* storageInfo) { + EXPECT_EQ(storageInfo->type, 1); + EXPECT_EQ(storageInfo->mountPoint, "test_mountpoint"); + EXPECT_EQ(storageInfo->finalResult, true); + EXPECT_EQ(storageInfo->filePath, "test_filepath"); + EXPECT_EQ(storageInfo->availableStorageTypes, 1); +} + +std::unique_ptr generateBasicPartitionLocationPb() { + auto pbPartitionLocation = std::make_unique(); + pbPartitionLocation->set_id(1); + pbPartitionLocation->set_epoch(101); + pbPartitionLocation->set_host("test_host"); + pbPartitionLocation->set_rpcport(1001); + pbPartitionLocation->set_pushport(1002); + pbPartitionLocation->set_fetchport(1003); + pbPartitionLocation->set_replicateport(1004); + return std::move(pbPartitionLocation); +} + +void verifyBasicPartitionLocation(const PartitionLocation* partitionLocation) { + EXPECT_EQ(partitionLocation->id, 1); + EXPECT_EQ(partitionLocation->epoch, 101); + EXPECT_EQ(partitionLocation->host, "test_host"); + EXPECT_EQ(partitionLocation->rpcPort, 1001); + EXPECT_EQ(partitionLocation->pushPort, 1002); + EXPECT_EQ(partitionLocation->fetchPort, 1003); + EXPECT_EQ(partitionLocation->replicatePort, 1004); +} + +TEST(PartitionLocationTest, storageInfoFromPb) { + auto pbStorageInfo = generateStorageInfoPb(); + auto storageInfo = StorageInfo::fromPb(*pbStorageInfo); + verifyStorageInfo(storageInfo.get()); +} + +TEST(PartitionLocationTest, fromPbWithoutPeer) { + auto pbPartitionLocation = generateBasicPartitionLocationPb(); + pbPartitionLocation->set_mode(PbPartitionLocation_Mode_Primary); + auto pbStorageInfo = generateStorageInfoPb(); + pbPartitionLocation->set_allocated_storageinfo(pbStorageInfo.release()); + + auto partitionLocation = PartitionLocation::fromPb(*pbPartitionLocation); + + verifyBasicPartitionLocation(partitionLocation.get()); + EXPECT_EQ(partitionLocation->mode, PartitionLocation::Mode::PRIMARY); + verifyStorageInfo(partitionLocation->storageInfo.get()); +} + +TEST(PartitionLocationTest, fromPbWithPeer) { + auto pbPartitionLocationPrimary = generateBasicPartitionLocationPb(); + pbPartitionLocationPrimary->set_mode(PbPartitionLocation_Mode_Primary); + auto pbStorageInfoPrimary = generateStorageInfoPb(); + pbPartitionLocationPrimary->set_allocated_storageinfo( + pbStorageInfoPrimary.release()); + + auto pbPartitionLocationReplica = generateBasicPartitionLocationPb(); + pbPartitionLocationReplica->set_mode(PbPartitionLocation_Mode_Replica); + auto pbStorageInfoReplica = generateStorageInfoPb(); + pbPartitionLocationReplica->set_allocated_storageinfo( + pbStorageInfoReplica.release()); + + pbPartitionLocationPrimary->set_allocated_peer( + pbPartitionLocationReplica.release()); + + auto partitionLocationPrimary = + PartitionLocation::fromPb(*pbPartitionLocationPrimary); + + verifyBasicPartitionLocation(partitionLocationPrimary.get()); + EXPECT_EQ(partitionLocationPrimary->mode, PartitionLocation::Mode::PRIMARY); + verifyStorageInfo(partitionLocationPrimary->storageInfo.get()); + + auto partitionLocationReplica = partitionLocationPrimary->replicaPeer.get(); + verifyBasicPartitionLocation(partitionLocationReplica); + EXPECT_EQ(partitionLocationReplica->mode, PartitionLocation::Mode::REPLICA); + verifyStorageInfo(partitionLocationReplica->storageInfo.get()); +}