Skip to content

Commit

Permalink
ExchangeSenderOp check writer status only if there is data to flush to MPPTunnel (#9736)
Browse files Browse the repository at this point in the history

ref #6233

Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker authored Jan 2, 2025
1 parent 0fd3441 commit eb4d9ea
Show file tree
Hide file tree
Showing 28 changed files with 156 additions and 383 deletions.
2 changes: 0 additions & 2 deletions dbms/src/Common/GRPCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class GRPCSendQueue
}

bool isWritable() const { return send_queue.isWritable(); }
void notifyNextPipelineWriter() { send_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); }
Expand Down Expand Up @@ -300,7 +299,6 @@ class GRPCRecvQueue
}

bool isWritable() const { return recv_queue.isWritable(); }
void notifyNextPipelineWriter() { return recv_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); }
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ class LooseBoundedMPMCQueue
return !isFullWithoutLock();
}

void notifyNextPipelineWriter() { pipe_writer_cv.notifyOne(); }

MPMCQueueStatus getStatus() const
{
std::lock_guard lock(mu);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ExchangeSenderBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ Block ExchangeSenderBlockInputStream::readImpl()
{
total_rows += block.rows();
writer->write(block);
assert(!writer->hasPendingFlush());
}
else
{
writer->flush();
assert(!writer->hasPendingFlush());
}
return block;
}
Expand Down
42 changes: 15 additions & 27 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,36 @@ namespace DB
{
class DAGContext;

/// Result returned by the `write`/`flush` methods of a response writer.
enum class WriteResult
{
// The call completed without having to wait: the data was written,
// or there was nothing that had to be flushed yet.
Done,
// The flush could not proceed because `waitForWritable()` reported
// `WaitResult::WaitForPolling`.
NeedWaitForPolling,
// The flush could not proceed and `waitForWritable()` reported a non-ready
// result other than `WaitForPolling` (presumably a notify-based wait —
// confirm against the `WaitResult` definition).
NeedWaitForNotify,
};

/// Abstract base class for writers that receive result `Block`s via `write`
/// and push out any cached data via `flush`.
///
/// NOTE(review): this text is a rendered diff hunk without +/- markers, so the
/// non-virtual `void write`/`void flush` wrappers (with `doWrite`, `doFlush`,
/// `notifyNextPipelineWriter`) and the pure virtual `WriteResult write`/`flush`
/// declarations appear side by side, as do two `waitForWritable` bodies.
/// Confirm against the real header which set is current.
class DAGResponseWriter
{
public:
DAGResponseWriter(Int64 records_per_chunk_, DAGContext & dag_context_);
/// Prepare the writer with a sample block; the default implementation does nothing.
virtual void prepare(const Block &){};
// Wrapper form: forwards to `doWrite` and calls `notifyNextPipelineWriter`
// when `doWrite` reports that no data was actually written.
void write(const Block & block)
{
if (!doWrite(block))
{
notifyNextPipelineWriter();
}
}
// Pure virtual form: the implementation reports the outcome as a `WriteResult`.
virtual WriteResult write(const Block & block) = 0;

// For an async writer, `waitForWritable` needs to be called before calling `write`.
// ```
// auto res = waitForWritable();
// switch (res) case...
// write(block);
// ```
// Two default bodies are visible here: one throws, the other reports
// `WaitResult::Ready` (i.e. always writable).
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); }
virtual WaitResult waitForWritable() const { return WaitResult::Ready; }

/// Flush cached blocks for a batch writer.
// Wrapper form: forwards to `doFlush` and calls `notifyNextPipelineWriter`
// when `doFlush` reports that no data was actually flushed.
void flush()
{
if (!doFlush())
{
notifyNextPipelineWriter();
}
}
// Pure virtual form: the implementation reports the outcome as a `WriteResult`.
virtual WriteResult flush() = 0;

/// If hasPendingFlush() returns true, the caller needs to flush before the next write.
// hasPendingFlush can be true only in pipeline mode
bool hasPendingFlush() const { return has_pending_flush; }

virtual ~DAGResponseWriter() = default;

protected:
// Returns true if the write actually wrote the data.
virtual bool doWrite(const Block & block) = 0;
// Returns true if the flush actually flushed data.
virtual bool doFlush() = 0;
virtual void notifyNextPipelineWriter() = 0;

Int64 records_per_chunk;
DAGContext & dag_context;
// Backing flag for hasPendingFlush(); starts out false.
bool has_pending_flush = false;
};

} // namespace DB
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ struct CopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
static WaitResult waitForWritable() { return WaitResult::Ready; }
};

struct BatchCopStreamWriter
Expand All @@ -83,8 +82,7 @@ struct BatchCopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
static WaitResult waitForWritable() { return WaitResult::Ready; }
};

using CopStreamWriterPtr = std::shared_ptr<CopStreamWriter>;
Expand Down
31 changes: 17 additions & 14 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,23 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
}

// Flushes the cached blocks if there are any rows buffered.
// Returns WriteResult::Done when the data was written or nothing was buffered;
// otherwise records a pending flush and returns the wait kind the caller must honor.
//
// NOTE(review): this is a rendered diff hunk without +/- markers. The `doFlush`
// signature and the `return true;` / `return false;` lines look like the
// pre-change version interleaved with the new `flush` body — confirm against
// the real source file.
template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::doFlush()
WriteResult StreamingDAGResponseWriter<StreamWriterPtr>::flush()
{
// Clear the pending state first; it is set again below if this flush cannot proceed.
has_pending_flush = false;
if (rows_in_blocks > 0)
{
encodeThenWriteBlocks();
return true;
// Only check the writer status when there is actually data to send.
auto wait_res = waitForWritable();
if (wait_res == WaitResult::Ready)
{
encodeThenWriteBlocks();
return WriteResult::Done;
}
// Set has_pending_flush to true since the current flush is not done.
has_pending_flush = true;
// Translate the wait result into the corresponding WriteResult.
return wait_res == WaitResult::WaitForPolling ? WriteResult::NeedWaitForPolling
: WriteResult::NeedWaitForNotify;
}
return false;
// Nothing buffered: the flush is trivially complete.
return WriteResult::Done;
}

template <class StreamWriterPtr>
Expand All @@ -78,14 +87,9 @@ WaitResult StreamingDAGResponseWriter<StreamWriterPtr>::waitForWritable() const
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::notifyNextPipelineWriter()
{
return writer->notifyNextPipelineWriter();
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::doWrite(const Block & block)
WriteResult StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
{
assert(has_pending_flush == false);
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
"Output column size mismatch with field type size");
Expand All @@ -98,10 +102,9 @@ bool StreamingDAGResponseWriter<StreamWriterPtr>::doWrite(const Block & block)

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
{
encodeThenWriteBlocks();
return true;
return flush();
}
return false;
return WriteResult::Done;
}

template <class StreamWriterPtr>
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
DAGContext & dag_context_);
WaitResult waitForWritable() const override;

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override;
WriteResult write(const Block & block) override;
WriteResult flush() override;

private:
void encodeThenWriteBlocks();
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void UnaryDAGResponseWriter::appendWarningsToDAGResponse()
dag_response->set_warning_count(dag_context.getWarningCount());
}

bool UnaryDAGResponseWriter::doFlush()
WriteResult UnaryDAGResponseWriter::flush()
{
if (current_records_num > 0)
{
Expand All @@ -86,11 +86,12 @@ bool UnaryDAGResponseWriter::doFlush()
throw TiFlashException(
"DAG response is too big, please check config about region size or region merge scheduler",
Errors::Coprocessor::Internal);
return true;
return WriteResult::Done;
}

bool UnaryDAGResponseWriter::doWrite(const Block & block)
WriteResult UnaryDAGResponseWriter::write(const Block & block)
{
assert(has_pending_flush == false);
if (block.columns() != dag_context.result_field_types.size())
throw TiFlashException("Output column size mismatch with field type size", Errors::Coprocessor::Internal);
if (records_per_chunk == -1)
Expand All @@ -117,6 +118,6 @@ bool UnaryDAGResponseWriter::doWrite(const Block & block)
row_index = upper;
}
}
return true;
return WriteResult::Done;
}
} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ class UnaryDAGResponseWriter : public DAGResponseWriter

void encodeChunkToDAGResponse();
void appendWarningsToDAGResponse();

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override{};
WriteResult write(const Block & block) override;
WriteResult flush() override;

private:
tipb::SelectResponse * dag_response;
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ struct MockStreamWriter
{}

void write(tipb::SelectResponse & response) { checker(response); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
static WaitResult waitForWritable() { return WaitResult::Ready; }

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ struct MockWriter
queue->push(tracked_packet);
}
static uint16_t getPartitionNum() { return 1; }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
static WaitResult waitForWritable() { return WaitResult::Ready; }

std::vector<tipb::FieldType> result_field_types;

Expand Down
31 changes: 17 additions & 14 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,23 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
}

// Flushes the cached blocks if there are any rows buffered.
// Returns WriteResult::Done when the data was written or nothing was buffered;
// otherwise records a pending flush and returns the wait kind the caller must honor.
//
// NOTE(review): this is a rendered diff hunk without +/- markers. The `doFlush`
// signature and the `return true;` / `return false;` lines look like the
// pre-change version interleaved with the new `flush` body — confirm against
// the real source file.
template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doFlush()
WriteResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
{
// Clear the pending state first; it is set again below if this flush cannot proceed.
has_pending_flush = false;
if (rows_in_blocks > 0)
{
writeBlocks();
return true;
// Only check the writer status when there is actually data to send.
auto wait_res = waitForWritable();
if (wait_res == WaitResult::Ready)
{
writeBlocks();
return WriteResult::Done;
}
// Set has_pending_flush to true since the current flush is not done.
has_pending_flush = true;
// Translate the wait result into the corresponding WriteResult.
return wait_res == WaitResult::WaitForPolling ? WriteResult::NeedWaitForPolling
: WriteResult::NeedWaitForNotify;
}
return false;
// Nothing buffered: the flush is trivially complete.
return WriteResult::Done;
}

template <class ExchangeWriterPtr>
Expand All @@ -83,14 +92,9 @@ WaitResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::waitForWritable() co
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doWrite(const Block & block)
WriteResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
{
assert(has_pending_flush == false);
RUNTIME_CHECK(!block.info.selective);
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
Expand All @@ -104,10 +108,9 @@ bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doWrite(const Block & bloc

if (static_cast<Int64>(rows_in_blocks) >= batch_send_min_limit)
{
writeBlocks();
return true;
return flush();
}
return false;
return WriteResult::Done;
}

template <class ExchangeWriterPtr>
Expand Down
7 changes: 2 additions & 5 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
WaitResult waitForWritable() const override;

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override;
WriteResult write(const Block & block) override;
WriteResult flush() override;

private:
void writeBlocks();
Expand Down
31 changes: 17 additions & 14 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,23 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::prepare(const Block & sample_b
}

// Flushes the cached blocks if there are any rows buffered.
// Returns WriteResult::Done when the data was written or nothing was buffered;
// otherwise records a pending flush and returns the wait kind the caller must honor.
//
// NOTE(review): this is a rendered diff hunk without +/- markers. The `doFlush`
// signature, the `return true;` / `return false;` lines and the
// `notifyNextPipelineWriter` definition look like the pre-change version
// interleaved with the new `flush` body — confirm against the real source file.
template <class ExchangeWriterPtr>
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doFlush()
WriteResult FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
{
// Clear the pending state first; it is set again below if this flush cannot proceed.
has_pending_flush = false;
if (rows_in_blocks > 0)
{
batchWriteFineGrainedShuffle();
return true;
// Only check the writer status when there is actually data to send.
auto wait_res = waitForWritable();
if (wait_res == WaitResult::Ready)
{
batchWriteFineGrainedShuffle();
return WriteResult::Done;
}
// Set has_pending_flush to true since the current flush is not done.
has_pending_flush = true;
// Translate the wait result into the corresponding WriteResult.
return wait_res == WaitResult::WaitForPolling ? WriteResult::NeedWaitForPolling
: WriteResult::NeedWaitForNotify;
}
return false;
}

// Forwards the notification to the underlying writer.
template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
// Nothing buffered: the flush is trivially complete.
return WriteResult::Done;
}

template <class ExchangeWriterPtr>
Expand All @@ -113,8 +116,9 @@ WaitResult FineGrainedShuffleWriter<ExchangeWriterPtr>::waitForWritable() const
}

template <class ExchangeWriterPtr>
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doWrite(const Block & block)
WriteResult FineGrainedShuffleWriter<ExchangeWriterPtr>::write(const Block & block)
{
assert(has_pending_flush == false);
RUNTIME_CHECK_MSG(prepared, "FineGrainedShuffleWriter should be prepared before writing.");
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
Expand All @@ -135,10 +139,9 @@ bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doWrite(const Block & block)
if (blocks.size() == fine_grained_shuffle_stream_count
|| static_cast<UInt64>(rows_in_blocks) >= batch_send_row_limit)
{
batchWriteFineGrainedShuffle();
return true;
return flush();
}
return false;
return WriteResult::Done;
}

template <class ExchangeWriterPtr>
Expand Down
Loading

0 comments on commit eb4d9ea

Please sign in to comment.