diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index 19da74998720b8..e05e94e2d344c3 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -325,6 +325,11 @@ bool PipelineTask::is_blockable() const { _sink->is_blockable(_state); } +void PipelineTask::_stop_accepting_submit() { + std::unique_lock lock(_blockable_check_lock); + _accept_submit = false; +} + bool PipelineTask::_is_blocked() { // `_dry_run = true` means we do not need data from source operator. if (!_dry_run) { @@ -881,6 +886,7 @@ Status PipelineTask::finalize() { return Status::OK(); } SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker()); + _stop_accepting_submit(); RETURN_IF_ERROR(_state_transition(State::FINALIZED)); std::unique_lock lc(_dependency_lock); _sink_shared_state.reset(); @@ -894,6 +900,9 @@ Status PipelineTask::finalize() { } Status PipelineTask::close(Status exec_status, bool close_sink) { + if (close_sink) { + _stop_accepting_submit(); + } int64_t close_ns = 0; Status s; { diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h index c5404daaf02338..68d2195e559792 100644 --- a/be/src/exec/pipeline/pipeline_task.h +++ b/be/src/exec/pipeline/pipeline_task.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -183,6 +184,10 @@ class PipelineTask : public std::enable_shared_from_this { PipelineTask() : _index(0) {} private: + friend class HybridTaskScheduler; + + void _stop_accepting_submit(); + // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters) bool _wait_to_start(); // Whether this task is blocked during execution (read dependency, write dependency) @@ -268,6 +273,10 @@ class PipelineTask : public std::enable_shared_from_this { Dependency* _memory_sufficient_dependency; std::mutex _dependency_lock; + // Guards _accept_submit and keeps HybridTaskScheduler::submit() from reading _sink/_operators + // in is_blockable() while terminal close/finalize is closing the submit gate. + std::mutex _blockable_check_lock; + bool _accept_submit = true; std::atomic _running {false}; std::atomic _eos {false}; diff --git a/be/src/exec/pipeline/task_scheduler.cpp b/be/src/exec/pipeline/task_scheduler.cpp index 20931c03aa05cd..fb6cf4e80f5076 100644 --- a/be/src/exec/pipeline/task_scheduler.cpp +++ b/be/src/exec/pipeline/task_scheduler.cpp @@ -189,7 +189,15 @@ void TaskScheduler::stop() { } Status HybridTaskScheduler::submit(PipelineTaskSPtr task) { - if (task->is_blockable()) { + bool blockable = false; + { + std::unique_lock blockable_check_lock(task->_blockable_check_lock); + if (!task->_accept_submit) { + return Status::OK(); + } + blockable = task->is_blockable(); + } + if (blockable) { return _blocking_scheduler.submit(task); } else { return _simple_scheduler.submit(task); diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index 08d84f4f8e2e75..29555a8d2ae96c 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -18,6 +18,10 @@ #include #include +#include +#include +#include + #include "common/config.h" #include "common/status.h" #include "exec/operator/operator.h" @@ -26,6 +30,7 @@ #include "exec/pipeline/dummy_task_queue.h" #include "exec/pipeline/pipeline.h" #include "exec/pipeline/pipeline_fragment_context.h" +#include "exec/pipeline/task_scheduler.h" #include "exec/pipeline/thrift_builder.h" #include "exec/spill/spill_file.h" #include "runtime/exec_env.h" @@ -94,6 +99,24 @@ class PipelineTaskTest : public testing::Test { template class OperatorX; template class DataSinkOperatorX; +class CountingBlockableSinkOperator final : public DataSinkOperatorX { +public: + CountingBlockableSinkOperator(int op_id, int node_id, int dest_id, + std::atomic* blockable_checks) + : DataSinkOperatorX(op_id, node_id, dest_id), + _blockable_checks(blockable_checks) {} + + Status sink(RuntimeState* state, Block* in_block, bool eos) override { return Status::OK(); } + + bool is_blockable(RuntimeState* state) const override { + _blockable_checks->fetch_add(1, std::memory_order_relaxed); + return DataSinkOperatorX::is_blockable(state); + } + +private: + std::atomic* _blockable_checks; +}; + TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) { auto num_instances = 1; auto pip_id = 0; @@ -519,6 +542,87 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) { } } +TEST_F(PipelineTaskTest, TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + std::atomic blockable_checks = 0; + DataSinkOperatorPtr sink_op; + sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id, &blockable_checks)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_TRUE(task->close(Status::OK()).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED); + EXPECT_NE(task->_sink, nullptr); + EXPECT_FALSE(task->_operators.empty()); + EXPECT_FALSE(task->_accept_submit); + + HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr); + EXPECT_TRUE(scheduler.submit(task).ok()); + EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0); + scheduler.stop(); + EXPECT_TRUE(task->finalize().ok()); +} + +TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_TRUE(task->close(Status::OK()).ok()); + EXPECT_TRUE(task->finalize().ok()); + EXPECT_EQ(task->_sink, nullptr); + + HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr); + EXPECT_TRUE(scheduler.submit(task).ok()); + scheduler.stop(); +} + TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) { auto num_instances = 1; auto pip_id = 0;