diff --git a/cpp/celeborn/conf/CMakeLists.txt b/cpp/celeborn/conf/CMakeLists.txt
index 4d1a3d4b8ea..2d708dd872f 100644
--- a/cpp/celeborn/conf/CMakeLists.txt
+++ b/cpp/celeborn/conf/CMakeLists.txt
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_library(conf BaseConf.cpp)
+add_library(conf BaseConf.cpp CelebornConf.cpp)
 
 target_link_libraries(
         conf
diff --git a/cpp/celeborn/conf/CelebornConf.cpp b/cpp/celeborn/conf/CelebornConf.cpp
new file mode 100644
index 00000000000..01a9f971374
--- /dev/null
+++ b/cpp/celeborn/conf/CelebornConf.cpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cmath>
+#include <re2/re2.h>
+
+#include "celeborn/conf/CelebornConf.h"
+
+namespace celeborn {
+namespace {
+
+// folly::to<> does not generate 'true' and 'false', so we do it ourselves.
+std::string bool2String(bool value) {
+  return value ? "true" : "false";
+}
+
+// Each *_PROP macro expands to one {key, value} initializer for
+// CelebornConf::kDefaultProperties; values are kept in string form.
+#define STR_PROP(_key_, _val_) {std::string(_key_), std::string(_val_)}
+#define NUM_PROP(_key_, _val_) \
+  {std::string(_key_), folly::to<std::string>(_val_)}
+#define BOOL_PROP(_key_, _val_) {std::string(_key_), bool2String(_val_)}
+#define NONE_PROP(_key_) {std::string(_key_), folly::none}
+
+enum class CapacityUnit {
+  BYTE,
+  KILOBYTE,
+  MEGABYTE,
+  GIGABYTE,
+  TERABYTE,
+  PETABYTE
+};
+
+// Returns the number of bytes in one `unit` (successive powers of 1024).
+double toBytesPerCapacityUnit(CapacityUnit unit) {
+  switch (unit) {
+    case CapacityUnit::BYTE:
+      return 1;
+    case CapacityUnit::KILOBYTE:
+      return std::exp2(10);
+    case CapacityUnit::MEGABYTE:
+      return std::exp2(20);
+    case CapacityUnit::GIGABYTE:
+      return std::exp2(30);
+    case CapacityUnit::TERABYTE:
+      return std::exp2(40);
+    case CapacityUnit::PETABYTE:
+      return std::exp2(50);
+    default:
+      CELEBORN_USER_FAIL("Invalid capacity unit '{}'", static_cast<int>(unit));
+  }
+}
+
+// Maps a unit suffix ("B", "kB", "MB", "GB", "TB", "PB") to its CapacityUnit.
+// The comparison is case-sensitive; any other string is a user error.
+CapacityUnit valueOfCapacityUnit(const std::string& unitStr) {
+  if (unitStr == "B") {
+    return CapacityUnit::BYTE;
+  }
+  if (unitStr == "kB") {
+    return CapacityUnit::KILOBYTE;
+  }
+  if (unitStr == "MB") {
+    return CapacityUnit::MEGABYTE;
+  }
+  if (unitStr == "GB") {
+    return CapacityUnit::GIGABYTE;
+  }
+  if (unitStr == "TB") {
+    return CapacityUnit::TERABYTE;
+  }
+  if (unitStr == "PB") {
+    return CapacityUnit::PETABYTE;
+  }
+  CELEBORN_USER_FAIL("Invalid capacity unit '{}'", unitStr);
+}
+
+// Convert capacity string with unit to the capacity number in the specified
+// units. A fractional result is truncated when converted to uint64_t.
+uint64_t toCapacity(const std::string& from, CapacityUnit to) {
+  static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$)");
+  double value;
+  std::string unit;
+  if (!RE2::FullMatch(from, kPattern, &value, &unit)) {
+    CELEBORN_USER_FAIL("Invalid capacity string '{}'", from);
+  }
+
+  return static_cast<uint64_t>(
+      value *
+      (toBytesPerCapacityUnit(valueOfCapacityUnit(unit)) /
+       toBytesPerCapacityUnit(to)));
+}
+
+// Parses "<number><unit>" into a Duration; the accepted units are ns, us, ms,
+// s, m, h and d. RE2::FullMatch must match the whole string, so trailing text
+// is rejected although the pattern itself does not end in '$'.
+Duration toDuration(const std::string& str) {
+  static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*)");
+
+  double value;
+  std::string unit;
+  if (!RE2::FullMatch(str, kPattern, &value, &unit)) {
+    CELEBORN_USER_FAIL("Invalid duration {}", str);
+  }
+  if (unit == "ns") {
+    return std::chrono::duration<double, std::nano>(value);
+  } else if (unit == "us") {
+    return std::chrono::duration<double, std::micro>(value);
+  } else if (unit == "ms") {
+    return std::chrono::duration<double, std::milli>(value);
+  } else if (unit == "s") {
+    return Duration(value);
+  } else if (unit == "m") {
+    return std::chrono::duration<double, std::ratio<60>>(value);
+  } else if (unit == "h") {
+    return std::chrono::duration<double, std::ratio<60 * 60>>(value);
+  } else if (unit == "d") {
+    return std::chrono::duration<double, std::ratio<60 * 60 * 24>>(value);
+  }
+  CELEBORN_USER_FAIL("Invalid duration {}", str);
+}
+
+} // namespace
+
+// Default value of every supported key, kept in string form.
+const std::unordered_map<std::string, folly::Optional<std::string>>
+    CelebornConf::kDefaultProperties = {
+        STR_PROP(kRpcLookupTimeout, "30s"),
+        STR_PROP(kClientRpcGetReducerFileGroupRpcAskTimeout, "60s"),
+        STR_PROP(kNetworkConnectTimeout, "10s"),
+        STR_PROP(kClientFetchTimeout, "600s"),
+        NUM_PROP(kNetworkIoNumConnectionsPerPeer, 1),
+        NUM_PROP(kNetworkIoClientThreads, 0),
+        NUM_PROP(kClientFetchMaxReqsInFlight, 3),
+        // NUM_PROP(kNumExample, 50'000),
+        // BOOL_PROP(kBoolExample, false),
+};
+
+CelebornConf::CelebornConf() {
+  registeredProps_ = kDefaultProperties;
+}
+
+// NOTE(review): the defaults are registered only after initialize() has run;
+// confirm that initialize() does not need registeredProps_.
+CelebornConf::CelebornConf(const std::string& filename) {
+  initialize(filename);
+  registeredProps_ = kDefaultProperties;
+}
+
+// Rebuilds config_ from a copy of other's values: mutable if other's config is
+// a MemConfigMutable, otherwise a plain MemConfig. NOTE(review): the `core::`
+// type names below were reconstructed -- confirm them against BaseConf.h.
+CelebornConf::CelebornConf(const CelebornConf& other) {
+  if (auto* memConfig =
+          dynamic_cast<core::MemConfigMutable*>(other.config_.get())) {
+    config_ =
+        std::make_unique<core::MemConfigMutable>(other.config_->valuesCopy());
+  } else {
+    config_ = std::make_unique<core::MemConfig>(other.config_->valuesCopy());
+  }
+  registeredProps_ = other.registeredProps_;
+  filePath_ = other.filePath_;
+}
+
+void CelebornConf::registerProperty(
+    const std::string_view& key,
+    const std::string& value) {
+  setValue(static_cast<std::string>(key), value);
+}
+
+Timeout CelebornConf::rpcLookupTimeout() const {
+  return toTimeout(toDuration(optionalProperty(kRpcLookupTimeout).value()));
+}
+
+Timeout CelebornConf::clientRpcGetReducerFileGroupRpcAskTimeout() const {
+  return toTimeout(toDuration(
+      optionalProperty(kClientRpcGetReducerFileGroupRpcAskTimeout).value()));
+}
+
+Timeout CelebornConf::networkConnectTimeout() const {
+  return toTimeout(
+      toDuration(optionalProperty(kNetworkConnectTimeout).value()));
+}
+
+Timeout CelebornConf::clientFetchTimeout() const {
+  return toTimeout(toDuration(optionalProperty(kClientFetchTimeout).value()));
+}
+
+int CelebornConf::networkIoNumConnectionsPerPeer() const {
+  return std::stoi(optionalProperty(kNetworkIoNumConnectionsPerPeer).value());
+}
+
+int CelebornConf::networkIoClientThreads() const {
+  return std::stoi(optionalProperty(kNetworkIoClientThreads).value());
+}
+
+int CelebornConf::clientFetchMaxReqsInFlight() const {
+  return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value());
+}
+} // namespace celeborn
diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h
new file mode 100644
index 00000000000..68ce6c8aaad
--- /dev/null
+++ b/cpp/celeborn/conf/CelebornConf.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/conf/BaseConf.h"
+#include "celeborn/utils/CelebornUtils.h"
+
+namespace celeborn {
+/***
+ * steps to add a new config:
+ * === in CelebornConf.h:
+ *     1. define the configName with "static constexpr std::string_view";
+ *     2. declare the getter method within class;
+ * === in CelebornConf.cpp:
+ *     3. add the configName to CelebornConf::kDefaultProperties, with proper
+ *        data type and proper default value;
+ *     4. implement the getter method.
+ */
+
+class CelebornConf : public BaseConf {
+ public:
+  // Default value of every key declared below, kept in string form.
+  static const std::unordered_map<std::string, folly::Optional<std::string>>
+      kDefaultProperties;
+
+  static constexpr std::string_view kRpcLookupTimeout{
+      "celeborn.rpc.lookupTimeout"};
+
+  static constexpr std::string_view kClientRpcGetReducerFileGroupRpcAskTimeout{
+      "celeborn.client.rpc.getReducerFileGroup.askTimeout"};
+
+  static constexpr std::string_view kNetworkConnectTimeout{
+      "celeborn.network.connect.timeout"};
+
+  static constexpr std::string_view kClientFetchTimeout{
+      "celeborn.client.fetch.timeout"};
+
+  static constexpr std::string_view kNetworkIoNumConnectionsPerPeer{
+      "celeborn.data.io.numConnectionsPerPeer"};
+
+  static constexpr std::string_view kNetworkIoClientThreads{
+      "celeborn.data.io.clientThreads"};
+
+  static constexpr std::string_view kClientFetchMaxReqsInFlight{
+      "celeborn.client.fetch.maxReqsInFlight"};
+
+  // Registers kDefaultProperties; no file is read.
+  CelebornConf();
+
+  // Calls initialize(filename), then registers kDefaultProperties.
+  CelebornConf(const std::string& filename);
+
+  // Copies the config values, registered properties and file path of `other`.
+  CelebornConf(const CelebornConf& other);
+
+  CelebornConf(CelebornConf&& other) = delete;
+
+  // Forwards to setValue(key, value).
+  void registerProperty(const std::string_view& key, const std::string& value);
+
+  // Typed getters: the timeouts are parsed from "<number><unit>" strings and
+  // returned in milliseconds; the int getters are parsed with std::stoi.
+  Timeout rpcLookupTimeout() const;
+
+  Timeout clientRpcGetReducerFileGroupRpcAskTimeout() const;
+
+  Timeout networkConnectTimeout() const;
+
+  Timeout clientFetchTimeout() const;
+
+  int networkIoNumConnectionsPerPeer() const;
+
+  int networkIoClientThreads() const;
+
+  int clientFetchMaxReqsInFlight() const;
+};
+} // namespace celeborn
diff --git a/cpp/celeborn/conf/tests/CMakeLists.txt
b/cpp/celeborn/conf/tests/CMakeLists.txt
index b8b882c6818..1ab34315c14 100644
--- a/cpp/celeborn/conf/tests/CMakeLists.txt
+++ b/cpp/celeborn/conf/tests/CMakeLists.txt
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-add_executable(celeborn_conf_test BaseConfTest.cpp)
+add_executable(celeborn_conf_test BaseConfTest.cpp CelebornConfTest.cpp)
 
 add_test(NAME celeborn_conf_test COMMAND celeborn_conf_test)
 
@@ -26,4 +26,4 @@ target_link_libraries(
         ${GFLAGS_LIBRARIES}
         GTest::gtest
         GTest::gmock
-        GTest::gtest_main)
\ No newline at end of file
+        GTest::gtest_main)
diff --git a/cpp/celeborn/conf/tests/CelebornConfTest.cpp b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
new file mode 100644
index 00000000000..05807719127
--- /dev/null
+++ b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "celeborn/conf/CelebornConf.h"
+
+using namespace celeborn;
+using SECOND = std::chrono::seconds;
+using MILLISECOND = std::chrono::milliseconds;
+
+namespace {
+// Writes `lines` to `filename`, one per line, replacing any existing content.
+void writeToFile(
+    const std::string& filename,
+    const std::vector<std::string>& lines) {
+  std::ofstream file;
+  file.open(filename);
+  for (auto& line : lines) {
+    file << line << "\n";
+  }
+  file.close();
+}
+
+// Expects every getter to return the default from kDefaultProperties.
+void testDefaultValues(CelebornConf* conf) {
+  EXPECT_EQ(conf->rpcLookupTimeout(), SECOND(30));
+  EXPECT_EQ(conf->clientRpcGetReducerFileGroupRpcAskTimeout(), SECOND(60));
+  EXPECT_EQ(conf->networkConnectTimeout(), SECOND(10));
+  EXPECT_EQ(conf->clientFetchTimeout(), SECOND(600));
+  EXPECT_EQ(conf->networkIoNumConnectionsPerPeer(), 1);
+  EXPECT_EQ(conf->networkIoClientThreads(), 0);
+  EXPECT_EQ(conf->clientFetchMaxReqsInFlight(), 3);
+}
+} // namespace
+
+TEST(CelebornConfTest, defaultValues) {
+  auto conf = std::make_unique<CelebornConf>();
+  testDefaultValues(conf.get());
+}
+
+TEST(CelebornConfTest, setValues) {
+  auto conf = std::make_unique<CelebornConf>();
+  testDefaultValues(conf.get());
+
+  conf->registerProperty(CelebornConf::kRpcLookupTimeout, "10s");
+  EXPECT_EQ(conf->rpcLookupTimeout(), SECOND(10));
+  conf->registerProperty(
+      CelebornConf::kClientRpcGetReducerFileGroupRpcAskTimeout, "10s");
+  EXPECT_EQ(conf->clientRpcGetReducerFileGroupRpcAskTimeout(), SECOND(10));
+  conf->registerProperty(CelebornConf::kNetworkConnectTimeout, "1000ms");
+  EXPECT_EQ(conf->networkConnectTimeout(), SECOND(1));
+  conf->registerProperty(CelebornConf::kClientFetchTimeout, "10ms");
+  EXPECT_EQ(conf->clientFetchTimeout(), MILLISECOND(10));
+  conf->registerProperty(CelebornConf::kNetworkIoNumConnectionsPerPeer, "10");
+  EXPECT_EQ(conf->networkIoNumConnectionsPerPeer(), 10);
+  conf->registerProperty(CelebornConf::kNetworkIoClientThreads, "10");
+  EXPECT_EQ(conf->networkIoClientThreads(), 10);
+  conf->registerProperty(CelebornConf::kClientFetchMaxReqsInFlight, "10");
+  EXPECT_EQ(conf->clientFetchMaxReqsInFlight(), 10);
+
+  EXPECT_THROW(
+      conf->registerProperty("non-exist-key", "non-exist-value"),
+      CelebornUserError);
+}
+
+TEST(CelebornConfTest, readFromFile) {
+  std::vector<std::string> lines;
+  lines.push_back(std::string(CelebornConf::kRpcLookupTimeout) + " = 10s");
+  lines.push_back(
+      std::string(CelebornConf::kNetworkIoNumConnectionsPerPeer) + " = 10");
+  // NOTE(review): fixed path; concurrent runs of this test share the file.
+  const std::string filename = "/tmp/test.conf";
+  writeToFile(filename, lines);
+
+  auto conf = std::make_unique<CelebornConf>(filename);
+  // The specified configs in file have higher priority.
+  EXPECT_EQ(conf->rpcLookupTimeout(), SECOND(10));
+  EXPECT_EQ(conf->networkIoNumConnectionsPerPeer(), 10);
+  // The unspecified configs should have default values.
+  EXPECT_EQ(conf->clientRpcGetReducerFileGroupRpcAskTimeout(), SECOND(60));
+  EXPECT_EQ(conf->networkConnectTimeout(), SECOND(10));
+
+  // The registerProperty could rewrite the configs.
+  conf->registerProperty(CelebornConf::kRpcLookupTimeout, "5s");
+  EXPECT_EQ(conf->rpcLookupTimeout(), SECOND(5));
+  conf->registerProperty(CelebornConf::kNetworkIoNumConnectionsPerPeer, "5");
+  EXPECT_EQ(conf->networkIoNumConnectionsPerPeer(), 5);
+}
diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h
index f1d776e000a..d42beb5f972 100644
--- a/cpp/celeborn/utils/CelebornUtils.h
+++ b/cpp/celeborn/utils/CelebornUtils.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <chrono>
+
 #include "celeborn/utils/Exceptions.h"
 
 namespace celeborn {
@@ -27,4 +29,14 @@ namespace celeborn {
 #define CELEBORN_SHUTDOWN_LOG_PREFIX "[CELEBORN_SHUTDOWN] "
 #define CELEBORN_SHUTDOWN_LOG(severity) \
   LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
+
+// Duration: seconds held as a double, so fractional values are representable.
+using Duration = std::chrono::duration<double>;
+// Timeout: whole milliseconds.
+using Timeout = std::chrono::milliseconds;
+
+// Converts to milliseconds; duration_cast drops any sub-millisecond fraction.
+inline Timeout toTimeout(Duration duration) {
+  return std::chrono::duration_cast<Timeout>(duration);
+}
 } // namespace celeborn