Skip to content

Commit

Permalink
Allow for flatmap passthrough in the FieldWriter (facebookincubator#86)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#86

As title, allows for ingestion of RowVector in the FlatMapFieldWriter. Takes each item in the row and writes its child vector based on the ranges provided.

Currently not handled:
- Encoded row vectors with flatmap schema
- What happens when there are subsequent writes that have a different set of keys from the original write

Reviewed By: helfman

Differential Revision: D61456412

fbshipit-source-id: 404aefc1ffc88cf7b5f287ac32cd0a7a6d4e4f4b
  • Loading branch information
Kunal Kataria authored and facebook-github-bot committed Nov 6, 2024
1 parent df2d275 commit 70c0144
Show file tree
Hide file tree
Showing 2 changed files with 340 additions and 9 deletions.
122 changes: 120 additions & 2 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,35 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
}
};

class FlatMapPassthroughValueFieldWriter {
public:
FlatMapPassthroughValueFieldWriter(
FieldWriterContext& context,
const StreamDescriptorBuilder& inMapDescriptor,
std::unique_ptr<FieldWriter> valueField)
: valueField_{std::move(valueField)},
inMapStream_{context.createContentStreamData<bool>(inMapDescriptor)} {}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
auto& data = inMapStream_.mutableData();
data.resize(data.size() + ranges.size(), true);
valueField_->write(vector, ranges);
}

void reset() {
inMapStream_.reset();
valueField_->reset();
}

void close() {
valueField_->close();
}

private:
std::unique_ptr<FieldWriter> valueField_;
ContentStreamData<bool>& inMapStream_;
};

class FlatMapValueFieldWriter {
public:
FlatMapValueFieldWriter(
Expand Down Expand Up @@ -822,6 +851,81 @@ class FlatMapFieldWriter : public FieldWriter {

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
// Check if the vector received is already flattened
const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges);
}

FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
const std::string& key) {
auto fieldWriter = FieldWriter::create(context_, valueType_);
auto& inMapDescriptor =
typeBuilder_->asFlatMap().addChild(key, fieldWriter->typeBuilder());
if (context_.flatmapFieldAddedEventHandler) {
context_.flatmapFieldAddedEventHandler(
*typeBuilder_, key, *fieldWriter->typeBuilder());
}
auto it = currentPassthroughFields_
.insert(
{key,
std::make_unique<FlatMapPassthroughValueFieldWriter>(
context_, inMapDescriptor, std::move(fieldWriter))})
.first;
return *it->second;
}

FlatMapPassthroughValueFieldWriter& findPassthroughValueFieldWriter(
const std::string& key) {
auto existingPair = currentPassthroughFields_.find(key);
NIMBLE_ASSERT(
existingPair != currentPassthroughFields_.end(),
"Field writer must already exist in map");
return *existingPair->second;
}

void ingestFlattenedMap(
const velox::VectorPtr& vector,
const OrderedRanges& ranges) {
NIMBLE_ASSERT(
currentValueFields_.empty() && allValueFields_.empty(),
"Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
const auto& flatMap = vector->as<velox::RowVector>();
NIMBLE_ASSERT(
flatMap,
"Unexpected vector type. Vector must be a decoded ROW vector.");
const auto size = ranges.size();
nullsStream_.ensureNullsCapacity(flatMap->mayHaveNulls(), size);
const auto& keys = flatMap->type()->asRow().names();
const auto& values = flatMap->children();

OrderedRanges childRanges;
iterateNonNullIndices<true>(
ranges, nullsStream_.mutableNonNulls(), Flat{vector}, [&](auto offset) {
childRanges.add(offset, 1);
});

// early bail out if no ranges at the top level row vector
if (childRanges.size() == 0) {
return;
}

// Only create keys on first call to write (with valid ranges).
// Subsequent calls must have the same set of keys,
// otherwise writer will throw.
bool populateMap = currentPassthroughFields_.empty();

for (int i = 0; i < keys.size(); ++i) {
const auto& key = keys[i];
auto& writer = populateMap ? createPassthroughValueFieldWriter(key)
: findPassthroughValueFieldWriter(key);
writer.write(values[i], childRanges);
}
}

void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
NIMBLE_ASSERT(
currentPassthroughFields_.empty(),
"Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
auto size = ranges.size();
const velox::vector_size_t* offsets;
const velox::vector_size_t* lengths;
Expand Down Expand Up @@ -912,20 +1016,28 @@ class FlatMapFieldWriter : public FieldWriter {
field.second->reset();
}

for (auto& field : currentPassthroughFields_) {
field.second->reset();
}

nullsStream_.reset();
nonNullCount_ = 0;
currentValueFields_.clear();
}

void close() override {
// Add dummy node so we can preserve schema of an empty flat map.
if (allValueFields_.empty()) {
// Add dummy node so we can preserve schema of an empty flat map
// when no fields are written
if (allValueFields_.empty() && currentPassthroughFields_.empty()) {
auto valueField = FieldWriter::create(context_, valueType_);
typeBuilder_->asFlatMap().addChild("", valueField->typeBuilder());
} else {
for (auto& pair : allValueFields_) {
pair.second->close();
}
for (auto& pair : currentPassthroughFields_) {
pair.second->close();
}
}
}

Expand Down Expand Up @@ -969,6 +1081,12 @@ class FlatMapFieldWriter : public FieldWriter {
NullsStreamData& nullsStream_;
// This map store the FlatMapValue fields used in current flush unit.
folly::F14FastMap<KeyType, FlatMapValueFieldWriter*> currentValueFields_;

// This map stores the FlatMapPassthrough fields.
folly::F14FastMap<
std::string,
std::unique_ptr<FlatMapPassthroughValueFieldWriter>>
currentPassthroughFields_;
const std::shared_ptr<const velox::dwio::common::TypeWithId>& valueType_;
uint64_t nonNullCount_ = 0;
// This map store all FlatMapValue fields encountered by the VeloxWriter
Expand Down
Loading

0 comments on commit 70c0144

Please sign in to comment.