Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ bool PipelineTask::is_blockable() const {
_sink->is_blockable(_state);
}

void PipelineTask::_stop_accepting_submit() {
std::unique_lock<std::mutex> lock(_blockable_check_lock);
_accept_submit = false;
}

bool PipelineTask::_is_blocked() {
// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
Expand Down Expand Up @@ -754,6 +759,7 @@ Status PipelineTask::finalize() {
return Status::OK();
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
_stop_accepting_submit();
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
std::unique_lock<std::mutex> lc(_dependency_lock);
_sink_shared_state.reset();
Expand All @@ -767,6 +773,9 @@ Status PipelineTask::finalize() {
}

Status PipelineTask::close(Status exec_status, bool close_sink) {
if (close_sink) {
_stop_accepting_submit();
}
int64_t close_ns = 0;
Status s;
{
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -185,6 +186,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
PipelineTask() : _index(0) {}

private:
friend class HybridTaskScheduler;

void _stop_accepting_submit();

// Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
bool _wait_to_start();
// Whether this task is blocked during execution (read dependency, write dependency)
Expand Down Expand Up @@ -268,6 +273,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {

Dependency* _memory_sufficient_dependency;
std::mutex _dependency_lock;
// Guards _accept_submit and keeps HybridTaskScheduler::submit() from reading _sink/_operators
// in is_blockable() while terminal close/finalize is closing the submit gate.
std::mutex _blockable_check_lock;
bool _accept_submit = true;

std::atomic<bool> _running {false};
std::atomic<bool> _eos {false};
Expand Down
10 changes: 9 additions & 1 deletion be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ void TaskScheduler::stop() {
}

Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
if (task->is_blockable()) {
bool blockable = false;
{
std::unique_lock<std::mutex> blockable_check_lock(task->_blockable_check_lock);
if (!task->_accept_submit) {
return Status::OK();
}
blockable = task->is_blockable();
}
if (blockable) {
return _blocking_scheduler.submit(task);
} else {
return _simple_scheduler.submit(task);
Expand Down
105 changes: 105 additions & 0 deletions be/test/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <thread>

#include "common/config.h"
#include "common/status.h"
#include "dummy_task_queue.h"
Expand Down Expand Up @@ -95,6 +99,26 @@ class PipelineTaskTest : public testing::Test {
template class OperatorX<DummyOperatorLocalState>;
template class DataSinkOperatorX<DummySinkLocalState>;

class CountingBlockableSinkOperator final : public DataSinkOperatorX<DummySinkLocalState> {
public:
CountingBlockableSinkOperator(int op_id, int node_id, int dest_id,
std::atomic<int>* blockable_checks)
: DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id),
_blockable_checks(blockable_checks) {}

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
return Status::OK();
}

bool is_blockable(RuntimeState* state) const override {
_blockable_checks->fetch_add(1, std::memory_order_relaxed);
return DataSinkOperatorX<DummySinkLocalState>::is_blockable(state);
}

private:
std::atomic<int>* _blockable_checks;
};

TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down Expand Up @@ -520,6 +544,87 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
}
}

TEST_F(PipelineTaskTest, TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
std::atomic<int> blockable_checks = 0;
DataSinkOperatorPtr sink_op;
sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id, &blockable_checks));
EXPECT_TRUE(pip->set_sink(sink_op).ok());

auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;

std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_TRUE(task->close(Status::OK()).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
EXPECT_NE(task->_sink, nullptr);
EXPECT_FALSE(task->_operators.empty());
EXPECT_FALSE(task->_accept_submit);

HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
EXPECT_TRUE(scheduler.submit(task).ok());
EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0);
scheduler.stop();
EXPECT_TRUE(task->finalize().ok());
}

TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());

auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;

std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_TRUE(task->close(Status::OK()).ok());
EXPECT_TRUE(task->finalize().ok());
EXPECT_EQ(task->_sink, nullptr);

HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
EXPECT_TRUE(scheduler.submit(task).ok());
scheduler.stop();
}

TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down
Loading