From 739d6434e7c45cc7e0061ba7b0b464e27f4ae23b Mon Sep 17 00:00:00 2001 From: Vladislav Gogov Date: Mon, 13 Jan 2025 14:33:35 +0300 Subject: [PATCH 1/2] Default compression setting via CS config (#13203) --- .../column_shard_config_validator.cpp | 55 +++++++++++ .../column_shard_config_validator_ut.cpp | 97 +++++++++++++++++++ .../column_shard_config_validator_ut/ya.make | 7 ++ ydb/core/config/validation/validators.cpp | 14 +++ ydb/core/config/validation/validators.h | 8 ++ ydb/core/config/validation/ya.make | 3 + ydb/core/formats/arrow/serializer/utils.cpp | 18 ++-- ydb/core/formats/arrow/serializer/utils.h | 7 +- ydb/core/protos/config.proto | 2 + .../olap/column_families/update.cpp | 8 +- 10 files changed, 203 insertions(+), 16 deletions(-) create mode 100644 ydb/core/config/validation/column_shard_config_validator.cpp create mode 100644 ydb/core/config/validation/column_shard_config_validator_ut/column_shard_config_validator_ut.cpp create mode 100644 ydb/core/config/validation/column_shard_config_validator_ut/ya.make diff --git a/ydb/core/config/validation/column_shard_config_validator.cpp b/ydb/core/config/validation/column_shard_config_validator.cpp new file mode 100644 index 000000000000..32c2543ef383 --- /dev/null +++ b/ydb/core/config/validation/column_shard_config_validator.cpp @@ -0,0 +1,55 @@ +#include "validators.h" + +#include +#include +#include + +#include +#include + +#include + +namespace NKikimr::NConfig { +namespace { + +EValidationResult ValidateDefaultCompression(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector& msg) { + if (!columnShardConfig.HasDefaultCompression() && !columnShardConfig.HasDefaultCompressionLevel()) { + return EValidationResult::Ok; + } + if (!columnShardConfig.HasDefaultCompression() && columnShardConfig.HasDefaultCompressionLevel()) { + msg.push_back("ColumnShardConfig: compression level is set without compression type"); + return EValidationResult::Error; + } + std::optional codec = NArrow::CompressionFromProto(columnShardConfig.GetDefaultCompression()); + if (!codec.has_value()) { + msg.push_back("ColumnShardConfig: Unknown compression"); + return EValidationResult::Error; + } + if (columnShardConfig.HasDefaultCompressionLevel()) { + if (!NArrow::SupportsCompressionLevel(codec.value())) { + TString messageErr = TStringBuilder() << "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value()) + << "` does not support compression level"; + msg.push_back(messageErr); + return EValidationResult::Error; + } else if (!NArrow::SupportsCompressionLevel(codec.value(), columnShardConfig.GetDefaultCompressionLevel())) { + TString messageErr = TStringBuilder() + << "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value()) + << "` does not support compression level = " << std::to_string(columnShardConfig.GetDefaultCompressionLevel()); + msg.push_back(messageErr); + return EValidationResult::Error; + } + } + return EValidationResult::Ok; +} + +} // namespace + +EValidationResult ValidateColumnShardConfig(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector& msg) { + EValidationResult validateDefaultCompressionResult = ValidateDefaultCompression(columnShardConfig, msg); + if (validateDefaultCompressionResult == EValidationResult::Error) { + return EValidationResult::Error; + } + return EValidationResult::Ok; +} + +} // namespace NKikimr::NConfig diff --git a/ydb/core/config/validation/column_shard_config_validator_ut/column_shard_config_validator_ut.cpp b/ydb/core/config/validation/column_shard_config_validator_ut/column_shard_config_validator_ut.cpp new file mode 100644 index 000000000000..c39d0a1d7767 --- /dev/null +++ b/ydb/core/config/validation/column_shard_config_validator_ut/column_shard_config_validator_ut.cpp @@ -0,0 +1,97 @@ +#include +#include +#include + +#include + +#include + +using namespace NKikimr::NConfig; + +Y_UNIT_TEST_SUITE(ColumnShardConfigValidation) { + Y_UNIT_TEST(AcceptDefaultCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + } + + Y_UNIT_TEST(NotAcceptDefaultCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompressionLevel(2); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Error); + UNIT_ASSERT_VALUES_EQUAL(error.size(), 1); + UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression level is set without compression type"); + } + + Y_UNIT_TEST(CorrectPlainCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + } + + Y_UNIT_TEST(NotCorrectPlainCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + CSConfig.SetDefaultCompressionLevel(1); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Error); + UNIT_ASSERT_VALUES_EQUAL(error.size(), 1); + UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `uncompressed` does not support compression level"); + } + + Y_UNIT_TEST(CorrectLZ4Compression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + } + + Y_UNIT_TEST(NotCorrectLZ4Compression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + CSConfig.SetDefaultCompressionLevel(1); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Error); + UNIT_ASSERT_VALUES_EQUAL(error.size(), 1); + UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `lz4` does not support compression level"); + } + + Y_UNIT_TEST(CorrectZSTDCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + CSConfig.SetDefaultCompressionLevel(0); + result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + CSConfig.SetDefaultCompressionLevel(-100); + result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Ok); + UNIT_ASSERT_C(error.empty(), "Should not be errors"); + } + + Y_UNIT_TEST(NotCorrectZSTDCompression) { + NKikimrConfig::TColumnShardConfig CSConfig; + std::vector error; + CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); + CSConfig.SetDefaultCompressionLevel(100); + EValidationResult result = ValidateColumnShardConfig(CSConfig, error); + UNIT_ASSERT_EQUAL(result, EValidationResult::Error); + UNIT_ASSERT_VALUES_EQUAL(error.size(), 1); + UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `zstd` does not support compression level = 100"); + } +} diff --git a/ydb/core/config/validation/column_shard_config_validator_ut/ya.make b/ydb/core/config/validation/column_shard_config_validator_ut/ya.make new file mode 100644 index 000000000000..c7b809a6f8de --- /dev/null +++ b/ydb/core/config/validation/column_shard_config_validator_ut/ya.make @@ -0,0 +1,7 @@ +UNITTEST_FOR(ydb/core/config/validation) + +SRC( + column_shard_config_validator_ut.cpp +) + +END() diff --git a/ydb/core/config/validation/validators.cpp b/ydb/core/config/validation/validators.cpp index 36391d0e9ea4..45be69957b20 100644 --- a/ydb/core/config/validation/validators.cpp +++ b/ydb/core/config/validation/validators.cpp @@ -161,4 +161,18 @@ EValidationResult ValidateStaticGroup(const NKikimrConfig::TAppConfig& current, return EValidationResult::Ok; } +EValidationResult ValidateConfig(const NKikimrConfig::TAppConfig& config, std::vector& msg) { + if (config.HasColumnShardConfig()) { + NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateColumnShardConfig(config.GetColumnShardConfig(), msg); + if (result == NKikimr::NConfig::EValidationResult::Error) { + return EValidationResult::Error; + } + } + if (msg.size() > 0) { + return EValidationResult::Warn; + } + + return EValidationResult::Ok; +} + } // namespace NKikimr::NConfig diff --git a/ydb/core/config/validation/validators.h b/ydb/core/config/validation/validators.h index 3e96c2c4afca..89f8babf9ff5 100644 --- a/ydb/core/config/validation/validators.h +++ b/ydb/core/config/validation/validators.h @@ -32,4 +32,12 @@ EValidationResult ValidateStaticGroup( const NKikimrConfig::TAppConfig& proposed, std::vector& msg); +EValidationResult ValidateColumnShardConfig( + const NKikimrConfig::TColumnShardConfig& columnShardConfig, + std::vector& msg); + +EValidationResult ValidateConfig( + const NKikimrConfig::TAppConfig& config, + std::vector& msg); + } // namespace NKikimr::NConfig diff --git a/ydb/core/config/validation/ya.make b/ydb/core/config/validation/ya.make index db6f9c1a70f5..a2756da1e29d 100644 --- a/ydb/core/config/validation/ya.make +++ b/ydb/core/config/validation/ya.make @@ -3,15 +3,18 @@ LIBRARY() SRCS( validators.h validators.cpp + column_shard_config_validator.cpp ) PEERDIR( ydb/core/protos + ydb/core/formats/arrow/serializer ) END() RECURSE_FOR_TESTS( ut + column_shard_config_validator_ut ) diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp index b33f7bc58a5c..432086605caf 100644 --- a/ydb/core/formats/arrow/serializer/utils.cpp +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -6,12 +6,18 @@ #include namespace NKikimr::NArrow { -bool SupportsCompressionLevel(const arrow::Compression::type compression) { - return arrow::util::Codec::SupportsCompressionLevel(compression); -} - -bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) { - return SupportsCompressionLevel(CompressionFromProto(compression).value()); +bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional& compressionLevel) { + if (!arrow::util::Codec::SupportsCompressionLevel(compression)) { + return false; + } + if (compressionLevel.has_value()) { + int minLevel = MinimumCompressionLevel(compression).value(); + int maxLevel = MaximumCompressionLevel(compression).value(); + if (compressionLevel < minLevel || compressionLevel > maxLevel) { + return false; + } + } + return true; } std::optional MinimumCompressionLevel(const arrow::Compression::type compression) { diff --git a/ydb/core/formats/arrow/serializer/utils.h b/ydb/core/formats/arrow/serializer/utils.h index 954bc6dee9d7..9d0e37378b8a 100644 --- a/ydb/core/formats/arrow/serializer/utils.h +++ b/ydb/core/formats/arrow/serializer/utils.h @@ -1,15 +1,12 @@ #pragma once -#include - #include -#include +#include #include namespace NKikimr::NArrow { -bool SupportsCompressionLevel(const arrow::Compression::type compression); -bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression); +bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional& compressionLevel = {}); std::optional MinimumCompressionLevel(const arrow::Compression::type compression); std::optional MaximumCompressionLevel(const arrow::Compression::type compression); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 58910c9d3aa1..521a7dc61673 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1632,6 +1632,8 @@ message TColumnShardConfig { optional uint64 WritingBufferVolumeMb = 33 [default = 32]; optional uint64 WritingInFlightRequestsCountLimit = 34 [default = 1000]; optional uint64 WritingInFlightRequestBytesLimit = 35 [default = 128000000]; + optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 36; + optional int32 DefaultCompressionLevel = 37; } message TSchemeShardConfig { diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp index 9460f8f1be0d..d365688ca46e 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -24,14 +24,12 @@ NKikimr::TConclusion ConvertFamilyDes << "` is not support compression level"); } if (familyDescription.HasColumnCodecLevel()) { - int level = familyDescription.GetColumnCodecLevel(); - int minLevel = NArrow::MinimumCompressionLevel(codec.value()).value(); - int maxLevel = NArrow::MaximumCompressionLevel(codec.value()).value(); - if (level < minLevel || level > maxLevel) { + if (!NArrow::SupportsCompressionLevel(codec.value(), familyDescription.GetColumnCodecLevel())) { return NKikimr::TConclusionStatus::Fail(TStringBuilder() << "family `" << familyDescription.GetName() << "`: incorrect level for codec `" << NArrow::CompressionToString(familyDescription.GetColumnCodec()) << "`. expected: [" - << minLevel << ":" << maxLevel << "]"); + << NArrow::MinimumCompressionLevel(codec.value()).value() << ":" + << NArrow::MaximumCompressionLevel(codec.value()).value() << "]"); } } From e6095a4a30cbe686739c4b21cbd58b7be78f6f4b Mon Sep 17 00:00:00 2001 From: Vladislav Gogov Date: Fri, 24 Jan 2025 10:31:48 +0300 Subject: [PATCH 2/2] Change default compression via CSConfig (#12542) --- ydb/core/base/appdata_fwd.h | 7 +- ydb/core/formats/arrow/serializer/native.h | 26 +++++- ydb/core/formats/arrow/serializer/utils.cpp | 2 +- ydb/core/kqp/host/kqp_gateway_proxy.cpp | 13 ++- ydb/core/kqp/ut/common/columnshard.cpp | 32 +++++-- ydb/core/kqp/ut/common/columnshard.h | 2 +- ydb/core/kqp/ut/olap/compression_ut.cpp | 81 +++++++++++++++++ ydb/core/kqp/ut/olap/sys_view_ut.cpp | 17 ++-- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 89 ++++++++++++++----- .../olap/column_families/schema.cpp | 35 ++++---- .../olap/column_families/update.cpp | 68 ++++++++------ .../tx/schemeshard/olap/columns/schema.cpp | 2 +- .../tx/schemeshard/olap/columns/update.cpp | 2 +- ydb/core/tx/schemeshard/olap/columns/update.h | 2 +- .../olap/operations/create_table.cpp | 1 - 15 files changed, 286 insertions(+), 93 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index f22a246d2cd1..950ac1ed9dd2 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -283,15 +283,16 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext; + return !!NActors::TlsActivationContext + && NActors::TlsActivationContext->ExecutorThread.ActorSystem + && NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); } inline TAppData& AppDataVerified() { Y_ABORT_UNLESS(HasAppData()); auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Y_ABORT_UNLESS(actorSystem); TAppData* const x = actorSystem->AppData(); - Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag); + Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag); return *x; } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 092ce9d705e4..a5bc4acf48f8 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -1,7 +1,10 @@ #pragma once #include "abstract.h" +#include "parsing.h" +#include +#include #include #include @@ -22,6 +25,11 @@ class TNativeSerializer: public ISerializer { TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + + static std::shared_ptr GetDefaultCodec() { + return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + } + protected: virtual bool IsCompatibleForExchangeWithSameClass(const ISerializer& /*item*/) const override { return true; @@ -53,7 +61,21 @@ class TNativeSerializer: public ISerializer { static arrow::ipc::IpcOptions BuildDefaultOptions() { arrow::ipc::IpcWriteOptions options; options.use_threads = false; - options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + if (HasAppData()) { + if (AppData()->ColumnShardConfig.HasDefaultCompression()) { + arrow::Compression::type codec = CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); + if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { + options.codec = NArrow::TStatusValidator::GetValid( + arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); + } else { + options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); + } + } else { + options.codec = GetDefaultCodec(); + } + } else { + options.codec = GetDefaultCodec(); + } return options; } @@ -83,7 +105,7 @@ class TNativeSerializer: public ISerializer { } static arrow::ipc::IpcOptions GetDefaultOptions() { - static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); + arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); return options; } diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp index 432086605caf..cda91a203b10 100644 --- a/ydb/core/formats/arrow/serializer/utils.cpp +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -32,4 +32,4 @@ std::optional MaximumCompressionLevel(const arrow::Compression::type compre } return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression)); } -} +} // namespace NKikimr::NArrow diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 9be8a344b498..6ebcdb00ee4d 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -410,12 +410,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& } familyDescription->SetColumnCodec(codec); } else { - code = Ydb::StatusIds::BAD_REQUEST; - error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'"; - return false; + if (family.Name != "default") { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'"; + return false; + } } if (family.CompressionLevel.Defined()) { + if (!family.Compression.Defined()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set"; + return false; + } familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef()); } } diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index cb4e49c92138..78fc58b86c8c 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -154,7 +154,9 @@ namespace NKqp { TString TTestHelper::TCompression::BuildQuery() const { TStringBuilder str; - str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\""; + if (CompressionType.has_value()) { + str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\""; + } if (CompressionLevel.has_value()) { str << ", COMPRESSION_LEVEL=" << CompressionLevel.value(); } @@ -167,9 +169,16 @@ namespace NKqp { << "` and in right value `" << rhs.GetSerializerClassName() << "`"; return false; } - if (CompressionType != rhs.GetCompressionType()) { - errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType) - << "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`"; + if (CompressionType.has_value() && rhs.HasCompressionType() && CompressionType.value() != rhs.GetCompressionTypeUnsafe()) { + errorMessage = TStringBuilder() << "different compression type: in left value `" + << NArrow::CompressionToString(CompressionType.value()) << "` and in right value `" + << NArrow::CompressionToString(rhs.GetCompressionTypeUnsafe()) << "`"; + return false; + } else if (CompressionType.has_value() && !rhs.HasCompressionType()) { + errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value"; + return false; + } else if (!CompressionType.has_value() && rhs.HasCompressionType()) { + errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value"; return false; } if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() && @@ -193,12 +202,15 @@ namespace NKqp { } bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) { - if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) { + if (!family.HasId() || !family.HasName()) { return false; } Id = family.GetId(); FamilyName = family.GetName(); - Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec()); + Compression = TTestHelper::TCompression(); + if (family.HasColumnCodec()) { + Compression.SetCompressionType(family.GetColumnCodec()); + } if (family.HasColumnCodecLevel()) { Compression.SetCompressionLevel(family.GetColumnCodecLevel()); } @@ -285,9 +297,11 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const { auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET"; str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,"; - auto codec = NArrow::CompressionFromProto(compression.GetCompressionType()); - Y_VERIFY(codec.has_value()); - str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + if (compression.HasCompressionType()) { + auto codec = NArrow::CompressionFromProto(compression.GetCompressionTypeUnsafe()); + Y_VERIFY(codec.has_value()); + str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + } if (compression.GetCompressionLevel().has_value()) { str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value(); } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 94726f9a8761..3d1413850a9c 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -20,7 +20,7 @@ class TTestHelper { public: class TCompression { YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER"); - YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType); + YDB_OPT(NKikimrSchemeOp::EColumnCodec, CompressionType); YDB_ACCESSOR_DEF(std::optional, CompressionLevel); public: diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index b325324a1f3d..454a1ff00170 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -1,7 +1,47 @@ #include +#include +#include +#include + +#include namespace NKikimr::NKqp { +std::pair GetVolumes( + const TKikimrRunner& runner, const TString& tablePath, const std::vector columnNames) { + TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1"; + if (columnNames.size()) { + selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')"; + } + auto tableClient = runner.GetTableClient(); + std::optional rawBytesPred; + std::optional bytesPred; + while (true) { + auto rows = ExecuteScanQuery(tableClient, selectQuery, false); + ui64 rawBytes = 0; + ui64 bytes = 0; + for (auto&& r : rows) { + for (auto&& c : r) { + if (c.first == "RawBytes") { + rawBytes += GetUint64(c.second); + } + if (c.first == "BlobRangeSize") { + bytes += GetUint64(c.second); + } + } + } + if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) { + break; + } else { + rawBytesPred = rawBytes; + bytesPred = bytes; + Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl; + Sleep(TDuration::Seconds(5)); + } + } + return { rawBytesPred.value(), bytesPred.value() }; +} + Y_UNIT_TEST_SUITE(KqpOlapCompression) { Y_UNIT_TEST(DisabledAlterCompression) { TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false); @@ -63,5 +103,46 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { testHelper.CreateTable(testTable); testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR); } + + std::pair GetVolumesColumnWithCompression(const std::optional& CSConfig = {}) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + NKikimrConfig::TAppConfig appConfig; + if (CSConfig.has_value()) { + *appConfig.MutableColumnShardConfig() = CSConfig.value(); + } + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); + TTestHelper testHelper(settings); + Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) + }; + + TString tableName = "/Root/ColumnTableTest"; + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NArrow::NConstruction::TSimpleArrayConstructor>::BuildNotNullable( + "pk_int", false)); + auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000); + testHelper.BulkUpsert(testTable, batch); + csController->WaitCompactions(TDuration::Seconds(10)); + return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" }); + } + + Y_UNIT_TEST(DefaultCompressionViaCSConfig) { + auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(); // Default compression LZ4 + NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig(); + csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); + csConfig.SetDefaultCompressionLevel(1); + auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(csConfig); + AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2); + AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2); + } } } diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 9d12b54efab6..0d9108b5d241 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytesPK1; ui64 bytesPK1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore"); @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytes1; ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetSmallSizeDetector(Max()); - auto settings = TKikimrSettings().SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index cfcd0651a588..f9b7c8900c63 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -8384,10 +8384,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); - TTestHelper::TColumnFamily defaultFamily = - TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); TTestHelper::TColumnFamily defaultFromScheme; @@ -8396,18 +8393,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TString errorMessage; UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); } - - auto columns = schema.GetColumns(); - for (ui32 i = 0; i < schema.ColumnsSize(); i++) { - auto column = columns[i]; - UNIT_ASSERT(column.HasSerializer()); - UNIT_ASSERT_EQUAL_C( - column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); - TTestHelper::TCompression compression; - UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer())); - TString errorMessage; - UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage); - } } // Field `Data` is not used in ColumnFamily for ColumnTable @@ -9179,11 +9164,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); TString tableName = "/Root/TableWithFamily"; - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); TVector families = { - TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), }; { @@ -9206,7 +9190,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable); } - families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression)); + families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default")); auto& runner = testHelper.GetKikimr(); auto runtime = runner.GetTestServer().GetRuntime(); TActorId sender = runtime->AllocateEdgeActor(); @@ -9551,6 +9535,70 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); } + Y_UNIT_TEST(CreateTableWithDefaultFamilyWithoutSettings) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY default ()) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(CreateTableWithFamilyWithOnlyCompressionLevel) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + COMPRESSION_LEVEL = 2 + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(CreateTableNonDefaultFamilyWithoutCompression) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + Y_UNIT_TEST(DropColumnAndResetTtl) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -9630,7 +9678,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.RebootTablets("/Root/ColumnTableTest"); } - } Y_UNIT_TEST_SUITE(KqpOlapTypes) { diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp index 7667e984d532..957788be869a 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -119,23 +119,24 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl } lastColumnFamilyId = familyProto.GetId(); - if (!familyProto.HasColumnCodec()) { - errors.AddError("missing column codec for column family '" + columnFamilyName + "'"); - return false; - } - - auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); - if (serializerProto.IsFail()) { - errors.AddError(serializerProto.GetErrorMessage()); - return false; - } - NArrow::NSerialization::TSerializerContainer serializer; - if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { - errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); - return false; - } - if (!family->GetSerializerContainer().IsEqualTo(serializer)) { - errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + if (familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) { + auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); + if (serializerProto.IsFail()) { + errors.AddError(serializerProto.GetErrorMessage()); + return false; + } + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { + errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); + return false; + } + if (!family->GetSerializerContainer().IsEqualTo(serializer)) { + errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + return false; + } + } else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) || + (familyProto.HasColumnCodec() && !family->GetSerializerContainer().HasObject())) { + errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`"); return false; } } diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp index d365688ca46e..2234c3e0f36c 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -44,6 +44,9 @@ NKikimr::TConclusion ConvertFamilyDes NKikimr::TConclusion ConvertSerializerContainerToFamilyDescription( const NArrow::NSerialization::TSerializerContainer& serializer) { + if (!serializer.HasObject()) { + return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: container doesn't have object"); + } NKikimrSchemeOp::TFamilyDescription result; if (serializer->GetClassName().empty()) { return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: field `ClassName` is empty"); @@ -85,55 +88,66 @@ bool TOlapColumnFamlilyAdd::ParseFromRequest(const NKikimrSchemeOp::TFamilyDescr } Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - if (serializer.IsFail()) { - errors.AddError(serializer.GetErrorMessage()); - return false; - } - auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); - if (resultBuild.IsFail()) { - errors.AddError(resultBuild.GetErrorMessage()); - return false; + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + if (serializer.IsFail()) { + errors.AddError(serializer.GetErrorMessage()); + return false; + } + auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); + if (resultBuild.IsFail()) { + errors.AddError(resultBuild.GetErrorMessage()); + return false; + } + SerializerContainer = resultBuild.GetResult(); } - SerializerContainer = resultBuild.GetResult(); return true; } void TOlapColumnFamlilyAdd::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily) { Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); - Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); + SerializerContainer = NArrow::NSerialization::TSerializerContainer(); + Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); + } } void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { - auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); columnFamily.SetName(Name); - columnFamily.SetColumnCodec(result->GetColumnCodec()); - if (result->HasColumnCodecLevel()) { - columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + if (SerializerContainer.HasObject()) { + auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); + columnFamily.SetColumnCodec(result->GetColumnCodec()); + if (result->HasColumnCodecLevel()) { + columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + } } } bool TOlapColumnFamlilyAdd::ApplyDiff(const TOlapColumnFamlilyDiff& diffColumnFamily, IErrorCollector& errors) { Y_ABORT_UNLESS(GetName() == diffColumnFamily.GetName()); - auto newColumnFamily = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - if (newColumnFamily.IsFail()) { - errors.AddError(newColumnFamily.GetErrorMessage()); - return false; + NKikimrSchemeOp::TFamilyDescription newColumnFamily; + if (SerializerContainer.HasObject()) { + auto resultConvert = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + if (resultConvert.IsFail()) { + errors.AddError(resultConvert.GetErrorMessage()); + return false; + } + newColumnFamily = resultConvert.GetResult(); } - newColumnFamily->SetName(GetName()); + newColumnFamily.SetName(GetName()); auto codec = diffColumnFamily.GetCodec(); if (codec.has_value()) { - newColumnFamily->SetColumnCodec(codec.value()); - newColumnFamily->ClearColumnCodecLevel(); + newColumnFamily.SetColumnCodec(codec.value()); + newColumnFamily.ClearColumnCodecLevel(); } auto codecLevel = diffColumnFamily.GetCodecLevel(); if (codecLevel.has_value()) { - newColumnFamily->SetColumnCodecLevel(codecLevel.value()); + newColumnFamily.SetColumnCodecLevel(codecLevel.value()); } - auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily.GetResult()); + auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily); if (serializer.IsFail()) { errors.AddError(serializer.GetErrorMessage()); return false; diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp index 1f03bc8860ea..a14f7ddf058d 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -56,7 +56,7 @@ bool TOlapColumnsDescription::ApplyUpdate( if (newColumn.GetKeyOrder()) { Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); } - if (!newColumn.GetSerializer().has_value() && !columnFamilies.GetColumnFamilies().empty() && + if (!newColumn.GetSerializer().HasObject() && !columnFamilies.GetColumnFamilies().empty() && !newColumn.ApplySerializerFromColumnFamily(columnFamilies, errors)) { return false; } diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index 1a428f42e664..9d915ef78c4e 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -184,7 +184,7 @@ void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnS columnSchema.SetColumnFamilyId(ColumnFamilyId.value()); } if (Serializer) { - Serializer->SerializeToProto(*columnSchema.MutableSerializer()); + Serializer.SerializeToProto(*columnSchema.MutableSerializer()); } if (AccessorConstructor) { *columnSchema.MutableDataAccessorConstructor() = AccessorConstructor.SerializeToProto(); diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h index 4f87cf014d37..5670ac5f23e9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.h +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -35,7 +35,7 @@ class TOlapColumnBase { YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_READONLY_DEF(TString, StorageId); YDB_FLAG_ACCESSOR(NotNull, false); - YDB_ACCESSOR_DEF(std::optional, Serializer); + YDB_ACCESSOR_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); YDB_READONLY_DEF(std::optional, DictionaryEncoding); YDB_READONLY_DEF(NOlap::TColumnDefaultScalarValue, DefaultValue); YDB_READONLY_DEF(NArrow::NAccessor::TConstructorContainer, AccessorConstructor); diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index bc5ef25fc14f..c0e3eb38f47a 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -570,7 +570,6 @@ class TCreateColumnTable: public TSubOperation { auto defaultFamily = mutableSchema->AddColumnFamilies(); defaultFamily->SetName("default"); defaultFamily->SetId(0); - defaultFamily->SetColumnCodec(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); for (ui32 i = 0; i < schema.ColumnsSize(); i++) { if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) {