Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Break PrestoSerializer.cpp into components #11922

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions velox/serializers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
velox_add_library(velox_presto_serializer CompactRowSerializer.cpp
PrestoSerializer.cpp UnsafeRowSerializer.cpp)
velox_add_library(
velox_presto_serializer
CompactRowSerializer.cpp
PrestoSerializer.cpp
UnsafeRowSerializer.cpp
PrestoBatchVectorSerializer.cpp
PrestoHeader.cpp
PrestoIterativeVectorSerializer.cpp
PrestoSerializerDeserializationUtils.cpp
PrestoSerializerEstimationUtils.cpp
PrestoSerializerSerializationUtils.cpp
PrestoVectorLexer.cpp
VectorStream.cpp)

velox_link_libraries(velox_presto_serializer velox_vector velox_row_fast)

Expand Down
181 changes: 181 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/serializers/PrestoBatchVectorSerializer.h"

#include "velox/serializers/PrestoSerializerEstimationUtils.h"
#include "velox/serializers/PrestoSerializerSerializationUtils.h"
#include "velox/serializers/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
void PrestoBatchVectorSerializer::serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch,
OutputStream* stream) {
const auto numRows = rangesTotalSize(ranges);
const auto rowType = vector->type();
const auto numChildren = vector->childrenSize();

StreamArena arena(pool_);
std::vector<VectorStream> streams;
streams.reserve(numChildren);
for (int i = 0; i < numChildren; i++) {
streams.emplace_back(
rowType->childAt(i),
std::nullopt,
vector->childAt(i),
&arena,
numRows,
opts_);

if (numRows > 0) {
serializeColumn(vector->childAt(i), ranges, &streams[i], scratch);
}
}

flushStreams(
streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
}

void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch) {
switch (vector->encoding()) {
case VectorEncoding::Simple::FLAT:
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
estimateFlatSerializedSize,
vector->typeKind(),
vector.get(),
ranges,
sizes);
break;
case VectorEncoding::Simple::CONSTANT:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
estimateConstantSerializedSize,
vector->typeKind(),
vector,
ranges,
sizes,
scratch);
break;
case VectorEncoding::Simple::DICTIONARY:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
estimateDictionarySerializedSize,
vector->typeKind(),
vector,
ranges,
sizes,
scratch);
break;
case VectorEncoding::Simple::ROW: {
if (!vector->mayHaveNulls()) {
// Add the size of the offsets in the Row encoding.
for (int32_t i = 0; i < ranges.size(); ++i) {
*sizes[i] += ranges[i].size * sizeof(int32_t);
}

auto rowVector = vector->as<RowVector>();
auto& children = rowVector->children();
for (auto& child : children) {
if (child) {
estimateSerializedSizeImpl(child, ranges, sizes, scratch);
}
}

break;
}

std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
for (int32_t i = 0; i < ranges.size(); ++i) {
// Add the size of the nulls bit mask.
*sizes[i] += bits::nbytes(ranges[i].size);

auto begin = ranges[i].begin;
auto end = begin + ranges[i].size;
for (auto offset = begin; offset < end; ++offset) {
// Add the size of the offset.
*sizes[i] += sizeof(int32_t);
if (!vector->isNullAt(offset)) {
childRanges.push_back(IndexRange{offset, 1});
childSizes.push_back(sizes[i]);
}
}
}

auto rowVector = vector->as<RowVector>();
auto& children = rowVector->children();
for (auto& child : children) {
if (child) {
estimateSerializedSizeImpl(
child,
folly::Range(childRanges.data(), childRanges.size()),
childSizes.data(),
scratch);
}
}

break;
}
case VectorEncoding::Simple::MAP: {
auto mapVector = vector->as<MapVector>();
std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
expandRepeatedRanges(
mapVector,
mapVector->rawOffsets(),
mapVector->rawSizes(),
ranges,
sizes,
&childRanges,
&childSizes);
estimateSerializedSizeImpl(
mapVector->mapKeys(), childRanges, childSizes.data(), scratch);
estimateSerializedSizeImpl(
mapVector->mapValues(), childRanges, childSizes.data(), scratch);
break;
}
case VectorEncoding::Simple::ARRAY: {
auto arrayVector = vector->as<ArrayVector>();
std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
expandRepeatedRanges(
arrayVector,
arrayVector->rawOffsets(),
arrayVector->rawSizes(),
ranges,
sizes,
&childRanges,
&childSizes);
estimateSerializedSizeImpl(
arrayVector->elements(), childRanges, childSizes.data(), scratch);
break;
}
case VectorEncoding::Simple::LAZY:
estimateSerializedSizeImpl(
vector->as<LazyVector>()->loadedVectorShared(),
ranges,
sizes,
scratch);
break;
default:
VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding());
}
}
} // namespace facebook::velox::serializer::presto::detail
56 changes: 56 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
class PrestoBatchVectorSerializer : public BatchVectorSerializer {
public:
PrestoBatchVectorSerializer(
memory::MemoryPool* pool,
const PrestoVectorSerde::PrestoOptions& opts)
: pool_(pool),
codec_(common::compressionKindToCodec(opts.compressionKind)),
opts_(opts) {}

void serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch,
OutputStream* stream) override;

void estimateSerializedSize(
VectorPtr vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch) override {
estimateSerializedSizeImpl(vector, ranges, sizes, scratch);
}

private:
void estimateSerializedSizeImpl(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch);

memory::MemoryPool* pool_;
const std::unique_ptr<folly::io::Codec> codec_;
PrestoVectorSerde::PrestoOptions opts_;
};
} // namespace facebook::velox::serializer::presto::detail
75 changes: 75 additions & 0 deletions velox/serializers/PrestoHeader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/serializers/PrestoHeader.h"

#include "velox/serializers/PrestoSerializerSerializationUtils.h"

namespace facebook::velox::serializer::presto::detail {
/* static */ Expected<PrestoHeader> PrestoHeader::read(
ByteInputStream* source) {
if (source->remainingSize() < kHeaderSize) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("{} bytes for header", source->remainingSize())));
}
PrestoHeader header;
header.numRows = source->read<int32_t>();
header.pageCodecMarker = source->read<int8_t>();
header.uncompressedSize = source->read<int32_t>();
header.compressedSize = source->read<int32_t>();
header.checksum = source->read<int64_t>();

if (header.numRows < 0) {
return folly::makeUnexpected(
Status::Invalid(fmt::format("negative numRows: {}", header.numRows)));
}
if (header.uncompressedSize < 0) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("negative uncompressedSize: {}", header.uncompressedSize)));
}
if (header.compressedSize < 0) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("negative compressedSize: {}", header.compressedSize)));
}

return header;
}

/* static */ std::optional<PrestoHeader> PrestoHeader::read(
std::string_view* source) {
if (source->size() < kHeaderSize) {
return std::nullopt;
}

PrestoHeader header;
header.numRows = readInt<int32_t>(source);
header.pageCodecMarker = readInt<int8_t>(source);
header.uncompressedSize = readInt<int32_t>(source);
header.compressedSize = readInt<int32_t>(source);
header.checksum = readInt<int64_t>(source);

if (header.numRows < 0) {
return std::nullopt;
}
if (header.uncompressedSize < 0) {
return std::nullopt;
}
if (header.compressedSize < 0) {
return std::nullopt;
}

return header;
}
} // namespace facebook::velox::serializer::presto::detail
42 changes: 42 additions & 0 deletions velox/serializers/PrestoHeader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/common/base/Status.h"
#include "velox/common/memory/ByteStream.h"

namespace facebook::velox::serializer::presto::detail {

struct PrestoHeader {
int32_t numRows;
int8_t pageCodecMarker;
int32_t uncompressedSize;
int32_t compressedSize;
int64_t checksum;

static Expected<PrestoHeader> read(ByteInputStream* source);

static std::optional<PrestoHeader> read(std::string_view* source);

template <typename T>
static T readInt(std::string_view* source) {
assert(source->size() >= sizeof(T));
auto value = folly::loadUnaligned<T>(source->data());
source->remove_prefix(sizeof(T));
return value;
}
};
} // namespace facebook::velox::serializer::presto::detail
Loading
Loading