From 1d8b4210ceaaa7ad2e2794aaf86b2e11da36d00c Mon Sep 17 00:00:00 2001 From: HanumathRao Date: Thu, 18 Apr 2019 15:44:30 -0700 Subject: [PATCH] DRILL-7193: Integration changes of the Distributed RM queue configuration with Simple Parallelizer. Changes to set the memory allocation per operator in query profile. Addressing an memory minimization logic was not considering non-buffered operators. Handling error cases when memory requirements for buffered or non-buffered cannot be reduced. --- .../client/src/protobuf/BitControl.pb.cc | 231 +++++++++++------- .../client/src/protobuf/BitControl.pb.h | 156 +++++++++--- .../client/src/protobuf/UserBitShared.pb.cc | 177 ++++++++------ .../client/src/protobuf/UserBitShared.pb.h | 34 +++ .../apache/drill/exec/ops/OpProfileDef.java | 4 +- .../drill/exec/ops/OperatorContextImpl.java | 2 +- .../apache/drill/exec/ops/OperatorStats.java | 10 +- .../exec/physical/base/AbstractGroupScan.java | 12 - .../exec/physical/impl/BaseRootExec.java | 2 +- .../DistributedQueueParallelizer.java | 116 ++++++--- .../drill/exec/planner/fragment/Fragment.java | 10 +- .../exec/planner/fragment/Materializer.java | 10 +- .../planner/fragment/MemoryCalculator.java | 24 +- .../planner/fragment/ZKQueueParallelizer.java | 1 - .../drill/exec/planner/sql/DirectPlan.java | 15 +- .../sql/handlers/ShowFilesHandler.java | 2 +- .../RMConsistentBlobStoreManager.java | 2 +- .../drill/exec/work/foreman/Foreman.java | 2 +- .../rm/DistributedResourceManager.java | 1 - .../drill/exec/memory/TestAllocators.java | 6 +- .../managed/TestExternalSortInternals.java | 2 +- .../exec/planner/rm/TestMemoryCalculator.java | 55 +++-- .../drill/exec/record/TestRecordIterator.java | 4 +- .../exec/store/dfs/TestDrillFileSystem.java | 2 +- .../apache/drill/test/OperatorFixture.java | 2 +- .../drill/exec/proto/SchemaUserBitShared.java | 7 + .../drill/exec/proto/UserBitShared.java | 197 +++++++++++---- .../exec/proto/beans/OperatorProfile.java | 22 ++ .../src/main/protobuf/UserBitShared.proto | 1 + 29 files changed, 756 insertions(+), 353 deletions(-) diff --git a/contrib/native/client/src/protobuf/BitControl.pb.cc b/contrib/native/client/src/protobuf/BitControl.pb.cc index 3bf9db5b560..e31c7265a31 100644 --- a/contrib/native/client/src/protobuf/BitControl.pb.cc +++ b/contrib/native/client/src/protobuf/BitControl.pb.cc @@ -326,21 +326,23 @@ const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUT GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::PlanFragment, options_json_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::PlanFragment, context_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::PlanFragment, collector_), - 2, - 7, + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::PlanFragment, endpointuuid_), + 3, 8, 9, 10, - 0, 11, - 3, - 4, + 0, 12, - 13, + 4, 5, - 1, + 13, + 14, 6, + 1, + 7, ~0u, + 2, GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, _has_bits_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, _internal_metadata_), ~0u, // no _extensions_ @@ -394,11 +396,11 @@ static const ::google::protobuf::internal::MigrationSchema schemas[] GOOGLE_PROT { 20, 27, sizeof(::exec::bit::control::FragmentStatus)}, { 29, 35, sizeof(::exec::bit::control::InitializeFragments)}, { 36, 43, sizeof(::exec::bit::control::CustomMessage)}, - { 45, 65, sizeof(::exec::bit::control::PlanFragment)}, - { 80, 89, sizeof(::exec::bit::control::Collector)}, - { 93, 102, sizeof(::exec::bit::control::QueryContextInformation)}, - { 106, 114, sizeof(::exec::bit::control::WorkQueueStatus)}, - { 117, 124, sizeof(::exec::bit::control::FinishedReceiver)}, + { 45, 66, sizeof(::exec::bit::control::PlanFragment)}, + { 82, 91, sizeof(::exec::bit::control::Collector)}, + { 95, 104, sizeof(::exec::bit::control::QueryContextInformation)}, + { 108, 116, sizeof(::exec::bit::control::WorkQueueStatus)}, + { 119, 126, sizeof(::exec::bit::control::FinishedReceiver)}, }; static ::google::protobuf::Message const * const file_default_instances[] = { @@ -449,7 +451,7 @@ void AddDescriptorsImpl() { "c.bit.FragmentHandle\"G\n\023InitializeFragme" "nts\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.control" ".PlanFragment\".\n\rCustomMessage\022\014\n\004type\030\001" - " \001(\005\022\017\n\007message\030\002 \001(\014\"\374\003\n\014PlanFragment\022(" + " \001(\005\022\017\n\007message\030\002 \001(\014\"\222\004\n\014PlanFragment\022(" "\n\006handle\030\001 \001(\0132\030.exec.bit.FragmentHandle" "\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002" "\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002" @@ -462,32 +464,33 @@ void AddDescriptorsImpl() { "ls\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(" "\0132).exec.bit.control.QueryContextInforma" "tion\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.contr" - "ol.Collector\"\210\001\n\tCollector\022\"\n\032opposite_m" - "ajor_fragment_id\030\001 \001(\005\022#\n\027incoming_minor" - "_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_o" - "rder\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"w\n\027Query" - "ContextInformation\022\030\n\020query_start_time\030\001" - " \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schem" - "a_name\030\003 \001(\t\022\022\n\nsession_id\030\004 \001(\t\"f\n\017Work" - "QueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.exec.Dri" - "llbitEndpoint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013r" - "eport_time\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010" - "receiver\030\001 \001(\0132\030.exec.bit.FragmentHandle" - "\022(\n\006sender\030\002 \001(\0132\030.exec.bit.FragmentHand" - "le*\206\003\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013" - "\n\007GOODBYE\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020" - "\003\022\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIV" - "ER_FINISHED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022" - "\n\016REQ_BIT_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n" - "\022\024\n\020REQ_QUERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FR" - "AGMENT\020\020\022\016\n\nREQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMEN" - "T_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017" - "RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016" - "\022\017\n\013RESP_CUSTOM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033o" - "rg.apache.drill.exec.protoB\nBitControlH\001" + "ol.Collector\022\024\n\014endpointUUID\030\022 \001(\t\"\210\001\n\tC" + "ollector\022\"\n\032opposite_major_fragment_id\030\001" + " \001(\005\022#\n\027incoming_minor_fragment\030\002 \003(\005B\002\020" + "\001\022\035\n\025supports_out_of_order\030\003 \001(\010\022\023\n\013is_s" + "pooling\030\004 \001(\010\"w\n\027QueryContextInformation" + "\022\030\n\020query_start_time\030\001 \001(\003\022\021\n\ttime_zone\030" + "\002 \001(\005\022\033\n\023default_schema_name\030\003 \001(\t\022\022\n\nse" + "ssion_id\030\004 \001(\t\"f\n\017WorkQueueStatus\022(\n\010end" + "point\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014q" + "ueue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(\003\"h" + "\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(\0132\030.e" + "xec.bit.FragmentHandle\022(\n\006sender\030\002 \001(\0132\030" + ".exec.bit.FragmentHandle*\206\003\n\007RpcType\022\r\n\t" + "HANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n\030REQ" + "_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCEL_FR" + "AGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022\027\n\023R" + "EQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STATUS\020\t" + "\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_CANC" + "EL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nREQ_CU" + "STOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP" + "_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022" + "\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUSTOM\020\022\022" + "\020\n\014SASL_MESSAGE\020\023B+\n\033org.apache.drill.ex" + "ec.protoB\nBitControlH\001" }; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - descriptor, 2000); + descriptor, 2022); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "BitControl.proto", &protobuf_RegisterTypes); ::protobuf_ExecutionProtos_2eproto::AddDescriptors(); @@ -2028,6 +2031,7 @@ const int PlanFragment::kCredentialsFieldNumber; const int PlanFragment::kOptionsJsonFieldNumber; const int PlanFragment::kContextFieldNumber; const int PlanFragment::kCollectorFieldNumber; +const int PlanFragment::kEndpointUUIDFieldNumber; #endif // !defined(_MSC_VER) || _MSC_VER >= 1900 PlanFragment::PlanFragment() @@ -2051,6 +2055,10 @@ PlanFragment::PlanFragment(const PlanFragment& from) if (from.has_options_json()) { options_json_.AssignWithDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), from.options_json_); } + endpointuuid_.UnsafeSetDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); + if (from.has_endpointuuid()) { + endpointuuid_.AssignWithDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), from.endpointuuid_); + } if (from.has_handle()) { handle_ = new ::exec::bit::FragmentHandle(*from.handle_); } else { @@ -2085,6 +2093,7 @@ PlanFragment::PlanFragment(const PlanFragment& from) void PlanFragment::SharedCtor() { fragment_json_.UnsafeSetDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); options_json_.UnsafeSetDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); + endpointuuid_.UnsafeSetDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); ::memset(&handle_, 0, static_cast( reinterpret_cast(&leaf_fragment_) - reinterpret_cast(&handle_)) + sizeof(leaf_fragment_)); @@ -2100,6 +2109,7 @@ PlanFragment::~PlanFragment() { void PlanFragment::SharedDtor() { fragment_json_.DestroyNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); options_json_.DestroyNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); + endpointuuid_.DestroyNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); if (this != internal_default_instance()) delete handle_; if (this != internal_default_instance()) delete assignment_; if (this != internal_default_instance()) delete foreman_; @@ -2129,7 +2139,7 @@ void PlanFragment::Clear() { collector_.Clear(); cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 127u) { + if (cached_has_bits & 255u) { if (cached_has_bits & 0x00000001u) { fragment_json_.ClearNonDefaultToEmptyNoArena(); } @@ -2137,31 +2147,33 @@ void PlanFragment::Clear() { options_json_.ClearNonDefaultToEmptyNoArena(); } if (cached_has_bits & 0x00000004u) { + endpointuuid_.ClearNonDefaultToEmptyNoArena(); + } + if (cached_has_bits & 0x00000008u) { GOOGLE_DCHECK(handle_ != NULL); handle_->Clear(); } - if (cached_has_bits & 0x00000008u) { + if (cached_has_bits & 0x00000010u) { GOOGLE_DCHECK(assignment_ != NULL); assignment_->Clear(); } - if (cached_has_bits & 0x00000010u) { + if (cached_has_bits & 0x00000020u) { GOOGLE_DCHECK(foreman_ != NULL); foreman_->Clear(); } - if (cached_has_bits & 0x00000020u) { + if (cached_has_bits & 0x00000040u) { GOOGLE_DCHECK(credentials_ != NULL); credentials_->Clear(); } - if (cached_has_bits & 0x00000040u) { + if (cached_has_bits & 0x00000080u) { GOOGLE_DCHECK(context_ != NULL); context_->Clear(); } } - network_cost_ = 0; - if (cached_has_bits & 16128u) { - ::memset(&cpu_cost_, 0, static_cast( + if (cached_has_bits & 32512u) { + ::memset(&network_cost_, 0, static_cast( reinterpret_cast(&leaf_fragment_) - - reinterpret_cast(&cpu_cost_)) + sizeof(leaf_fragment_)); + reinterpret_cast(&network_cost_)) + sizeof(leaf_fragment_)); mem_initial_ = GOOGLE_LONGLONG(20000000); mem_max_ = GOOGLE_LONGLONG(2000000000); } @@ -2381,6 +2393,22 @@ bool PlanFragment::MergePartialFromCodedStream( break; } + // optional string endpointUUID = 18; + case 18: { + if (static_cast< ::google::protobuf::uint8>(tag) == + static_cast< ::google::protobuf::uint8>(146u /* 146 & 0xFF */)) { + DO_(::google::protobuf::internal::WireFormatLite::ReadString( + input, this->mutable_endpointuuid())); + ::google::protobuf::internal::WireFormat::VerifyUTF8StringNamedField( + this->endpointuuid().data(), static_cast(this->endpointuuid().length()), + ::google::protobuf::internal::WireFormat::PARSE, + "exec.bit.control.PlanFragment.endpointUUID"); + } else { + goto handle_unusual; + } + break; + } + default: { handle_unusual: if (tag == 0) { @@ -2409,28 +2437,28 @@ void PlanFragment::SerializeWithCachedSizes( cached_has_bits = _has_bits_[0]; // optional .exec.bit.FragmentHandle handle = 1; - if (cached_has_bits & 0x00000004u) { + if (cached_has_bits & 0x00000008u) { ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( 1, this->_internal_handle(), output); } // optional float network_cost = 4; - if (cached_has_bits & 0x00000080u) { + if (cached_has_bits & 0x00000100u) { ::google::protobuf::internal::WireFormatLite::WriteFloat(4, this->network_cost(), output); } // optional float cpu_cost = 5; - if (cached_has_bits & 0x00000100u) { + if (cached_has_bits & 0x00000200u) { ::google::protobuf::internal::WireFormatLite::WriteFloat(5, this->cpu_cost(), output); } // optional float disk_cost = 6; - if (cached_has_bits & 0x00000200u) { + if (cached_has_bits & 0x00000400u) { ::google::protobuf::internal::WireFormatLite::WriteFloat(6, this->disk_cost(), output); } // optional float memory_cost = 7; - if (cached_has_bits & 0x00000400u) { + if (cached_has_bits & 0x00000800u) { ::google::protobuf::internal::WireFormatLite::WriteFloat(7, this->memory_cost(), output); } @@ -2445,34 +2473,34 @@ void PlanFragment::SerializeWithCachedSizes( } // optional bool leaf_fragment = 9; - if (cached_has_bits & 0x00000800u) { + if (cached_has_bits & 0x00001000u) { ::google::protobuf::internal::WireFormatLite::WriteBool(9, this->leaf_fragment(), output); } // optional .exec.DrillbitEndpoint assignment = 10; - if (cached_has_bits & 0x00000008u) { + if (cached_has_bits & 0x00000010u) { ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( 10, this->_internal_assignment(), output); } // optional .exec.DrillbitEndpoint foreman = 11; - if (cached_has_bits & 0x00000010u) { + if (cached_has_bits & 0x00000020u) { ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( 11, this->_internal_foreman(), output); } // optional int64 mem_initial = 12 [default = 20000000]; - if (cached_has_bits & 0x00001000u) { + if (cached_has_bits & 0x00002000u) { ::google::protobuf::internal::WireFormatLite::WriteInt64(12, this->mem_initial(), output); } // optional int64 mem_max = 13 [default = 2000000000]; - if (cached_has_bits & 0x00002000u) { + if (cached_has_bits & 0x00004000u) { ::google::protobuf::internal::WireFormatLite::WriteInt64(13, this->mem_max(), output); } // optional .exec.shared.UserCredentials credentials = 14; - if (cached_has_bits & 0x00000020u) { + if (cached_has_bits & 0x00000040u) { ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( 14, this->_internal_credentials(), output); } @@ -2488,7 +2516,7 @@ void PlanFragment::SerializeWithCachedSizes( } // optional .exec.bit.control.QueryContextInformation context = 16; - if (cached_has_bits & 0x00000040u) { + if (cached_has_bits & 0x00000080u) { ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( 16, this->_internal_context(), output); } @@ -2502,6 +2530,16 @@ void PlanFragment::SerializeWithCachedSizes( output); } + // optional string endpointUUID = 18; + if (cached_has_bits & 0x00000004u) { + ::google::protobuf::internal::WireFormat::VerifyUTF8StringNamedField( + this->endpointuuid().data(), static_cast(this->endpointuuid().length()), + ::google::protobuf::internal::WireFormat::SERIALIZE, + "exec.bit.control.PlanFragment.endpointUUID"); + ::google::protobuf::internal::WireFormatLite::WriteStringMaybeAliased( + 18, this->endpointuuid(), output); + } + if (_internal_metadata_.have_unknown_fields()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( _internal_metadata_.unknown_fields(), output); @@ -2518,29 +2556,29 @@ ::google::protobuf::uint8* PlanFragment::InternalSerializeWithCachedSizesToArray cached_has_bits = _has_bits_[0]; // optional .exec.bit.FragmentHandle handle = 1; - if (cached_has_bits & 0x00000004u) { + if (cached_has_bits & 0x00000008u) { target = ::google::protobuf::internal::WireFormatLite:: InternalWriteMessageToArray( 1, this->_internal_handle(), deterministic, target); } // optional float network_cost = 4; - if (cached_has_bits & 0x00000080u) { + if (cached_has_bits & 0x00000100u) { target = ::google::protobuf::internal::WireFormatLite::WriteFloatToArray(4, this->network_cost(), target); } // optional float cpu_cost = 5; - if (cached_has_bits & 0x00000100u) { + if (cached_has_bits & 0x00000200u) { target = ::google::protobuf::internal::WireFormatLite::WriteFloatToArray(5, this->cpu_cost(), target); } // optional float disk_cost = 6; - if (cached_has_bits & 0x00000200u) { + if (cached_has_bits & 0x00000400u) { target = ::google::protobuf::internal::WireFormatLite::WriteFloatToArray(6, this->disk_cost(), target); } // optional float memory_cost = 7; - if (cached_has_bits & 0x00000400u) { + if (cached_has_bits & 0x00000800u) { target = ::google::protobuf::internal::WireFormatLite::WriteFloatToArray(7, this->memory_cost(), target); } @@ -2556,36 +2594,36 @@ ::google::protobuf::uint8* PlanFragment::InternalSerializeWithCachedSizesToArray } // optional bool leaf_fragment = 9; - if (cached_has_bits & 0x00000800u) { + if (cached_has_bits & 0x00001000u) { target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(9, this->leaf_fragment(), target); } // optional .exec.DrillbitEndpoint assignment = 10; - if (cached_has_bits & 0x00000008u) { + if (cached_has_bits & 0x00000010u) { target = ::google::protobuf::internal::WireFormatLite:: InternalWriteMessageToArray( 10, this->_internal_assignment(), deterministic, target); } // optional .exec.DrillbitEndpoint foreman = 11; - if (cached_has_bits & 0x00000010u) { + if (cached_has_bits & 0x00000020u) { target = ::google::protobuf::internal::WireFormatLite:: InternalWriteMessageToArray( 11, this->_internal_foreman(), deterministic, target); } // optional int64 mem_initial = 12 [default = 20000000]; - if (cached_has_bits & 0x00001000u) { + if (cached_has_bits & 0x00002000u) { target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(12, this->mem_initial(), target); } // optional int64 mem_max = 13 [default = 2000000000]; - if (cached_has_bits & 0x00002000u) { + if (cached_has_bits & 0x00004000u) { target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(13, this->mem_max(), target); } // optional .exec.shared.UserCredentials credentials = 14; - if (cached_has_bits & 0x00000020u) { + if (cached_has_bits & 0x00000040u) { target = ::google::protobuf::internal::WireFormatLite:: InternalWriteMessageToArray( 14, this->_internal_credentials(), deterministic, target); @@ -2603,7 +2641,7 @@ ::google::protobuf::uint8* PlanFragment::InternalSerializeWithCachedSizesToArray } // optional .exec.bit.control.QueryContextInformation context = 16; - if (cached_has_bits & 0x00000040u) { + if (cached_has_bits & 0x00000080u) { target = ::google::protobuf::internal::WireFormatLite:: InternalWriteMessageToArray( 16, this->_internal_context(), deterministic, target); @@ -2617,6 +2655,17 @@ ::google::protobuf::uint8* PlanFragment::InternalSerializeWithCachedSizesToArray 17, this->collector(static_cast(i)), deterministic, target); } + // optional string endpointUUID = 18; + if (cached_has_bits & 0x00000004u) { + ::google::protobuf::internal::WireFormat::VerifyUTF8StringNamedField( + this->endpointuuid().data(), static_cast(this->endpointuuid().length()), + ::google::protobuf::internal::WireFormat::SERIALIZE, + "exec.bit.control.PlanFragment.endpointUUID"); + target = + ::google::protobuf::internal::WireFormatLite::WriteStringToArray( + 18, this->endpointuuid(), target); + } + if (_internal_metadata_.have_unknown_fields()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target); @@ -2660,6 +2709,13 @@ size_t PlanFragment::ByteSizeLong() const { this->options_json()); } + // optional string endpointUUID = 18; + if (has_endpointuuid()) { + total_size += 2 + + ::google::protobuf::internal::WireFormatLite::StringSize( + this->endpointuuid()); + } + // optional .exec.bit.FragmentHandle handle = 1; if (has_handle()) { total_size += 1 + @@ -2695,13 +2751,13 @@ size_t PlanFragment::ByteSizeLong() const { *context_); } + } + if (_has_bits_[8 / 32] & 32512u) { // optional float network_cost = 4; if (has_network_cost()) { total_size += 1 + 4; } - } - if (_has_bits_[8 / 32] & 16128u) { // optional float cpu_cost = 5; if (has_cpu_cost()) { total_size += 1 + 4; @@ -2776,42 +2832,45 @@ void PlanFragment::MergeFrom(const PlanFragment& from) { options_json_.AssignWithDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), from.options_json_); } if (cached_has_bits & 0x00000004u) { - mutable_handle()->::exec::bit::FragmentHandle::MergeFrom(from.handle()); + set_has_endpointuuid(); + endpointuuid_.AssignWithDefault(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), from.endpointuuid_); } if (cached_has_bits & 0x00000008u) { - mutable_assignment()->::exec::DrillbitEndpoint::MergeFrom(from.assignment()); + mutable_handle()->::exec::bit::FragmentHandle::MergeFrom(from.handle()); } if (cached_has_bits & 0x00000010u) { - mutable_foreman()->::exec::DrillbitEndpoint::MergeFrom(from.foreman()); + mutable_assignment()->::exec::DrillbitEndpoint::MergeFrom(from.assignment()); } if (cached_has_bits & 0x00000020u) { - mutable_credentials()->::exec::shared::UserCredentials::MergeFrom(from.credentials()); + mutable_foreman()->::exec::DrillbitEndpoint::MergeFrom(from.foreman()); } if (cached_has_bits & 0x00000040u) { - mutable_context()->::exec::bit::control::QueryContextInformation::MergeFrom(from.context()); + mutable_credentials()->::exec::shared::UserCredentials::MergeFrom(from.credentials()); } if (cached_has_bits & 0x00000080u) { - network_cost_ = from.network_cost_; + mutable_context()->::exec::bit::control::QueryContextInformation::MergeFrom(from.context()); } - _has_bits_[0] |= cached_has_bits; } - if (cached_has_bits & 16128u) { + if (cached_has_bits & 32512u) { if (cached_has_bits & 0x00000100u) { - cpu_cost_ = from.cpu_cost_; + network_cost_ = from.network_cost_; } if (cached_has_bits & 0x00000200u) { - disk_cost_ = from.disk_cost_; + cpu_cost_ = from.cpu_cost_; } if (cached_has_bits & 0x00000400u) { - memory_cost_ = from.memory_cost_; + disk_cost_ = from.disk_cost_; } if (cached_has_bits & 0x00000800u) { - leaf_fragment_ = from.leaf_fragment_; + memory_cost_ = from.memory_cost_; } if (cached_has_bits & 0x00001000u) { - mem_initial_ = from.mem_initial_; + leaf_fragment_ = from.leaf_fragment_; } if (cached_has_bits & 0x00002000u) { + mem_initial_ = from.mem_initial_; + } + if (cached_has_bits & 0x00004000u) { mem_max_ = from.mem_max_; } _has_bits_[0] |= cached_has_bits; @@ -2847,6 +2906,8 @@ void PlanFragment::InternalSwap(PlanFragment* other) { GetArenaNoVirtual()); options_json_.Swap(&other->options_json_, &::google::protobuf::internal::GetEmptyStringAlreadyInited(), GetArenaNoVirtual()); + endpointuuid_.Swap(&other->endpointuuid_, &::google::protobuf::internal::GetEmptyStringAlreadyInited(), + GetArenaNoVirtual()); swap(handle_, other->handle_); swap(assignment_, other->assignment_); swap(foreman_, other->foreman_); diff --git a/contrib/native/client/src/protobuf/BitControl.pb.h b/contrib/native/client/src/protobuf/BitControl.pb.h index abfda10d619..58e4889946f 100644 --- a/contrib/native/client/src/protobuf/BitControl.pb.h +++ b/contrib/native/client/src/protobuf/BitControl.pb.h @@ -938,6 +938,21 @@ class PlanFragment : public ::google::protobuf::Message /* @@protoc_insertion_po ::std::string* release_options_json(); void set_allocated_options_json(::std::string* options_json); + // optional string endpointUUID = 18; + bool has_endpointuuid() const; + void clear_endpointuuid(); + static const int kEndpointUUIDFieldNumber = 18; + const ::std::string& endpointuuid() const; + void set_endpointuuid(const ::std::string& value); + #if LANG_CXX11 + void set_endpointuuid(::std::string&& value); + #endif + void set_endpointuuid(const char* value); + void set_endpointuuid(const char* value, size_t size); + ::std::string* mutable_endpointuuid(); + ::std::string* release_endpointuuid(); + void set_allocated_endpointuuid(::std::string* endpointuuid); + // optional .exec.bit.FragmentHandle handle = 1; bool has_handle() const; void clear_handle(); @@ -1077,6 +1092,8 @@ class PlanFragment : public ::google::protobuf::Message /* @@protoc_insertion_po void clear_has_options_json(); void set_has_context(); void clear_has_context(); + void set_has_endpointuuid(); + void clear_has_endpointuuid(); ::google::protobuf::internal::InternalMetadataWithArena _internal_metadata_; ::google::protobuf::internal::HasBits<1> _has_bits_; @@ -1084,6 +1101,7 @@ class PlanFragment : public ::google::protobuf::Message /* @@protoc_insertion_po ::google::protobuf::RepeatedPtrField< ::exec::bit::control::Collector > collector_; ::google::protobuf::internal::ArenaStringPtr fragment_json_; ::google::protobuf::internal::ArenaStringPtr options_json_; + ::google::protobuf::internal::ArenaStringPtr endpointuuid_; ::exec::bit::FragmentHandle* handle_; ::exec::DrillbitEndpoint* assignment_; ::exec::DrillbitEndpoint* foreman_; @@ -2142,13 +2160,13 @@ inline void CustomMessage::set_allocated_message(::std::string* message) { // optional .exec.bit.FragmentHandle handle = 1; inline bool PlanFragment::has_handle() const { - return (_has_bits_[0] & 0x00000004u) != 0; + return (_has_bits_[0] & 0x00000008u) != 0; } inline void PlanFragment::set_has_handle() { - _has_bits_[0] |= 0x00000004u; + _has_bits_[0] |= 0x00000008u; } inline void PlanFragment::clear_has_handle() { - _has_bits_[0] &= ~0x00000004u; + _has_bits_[0] &= ~0x00000008u; } inline const ::exec::bit::FragmentHandle& PlanFragment::_internal_handle() const { return *handle_; @@ -2196,13 +2214,13 @@ inline void PlanFragment::set_allocated_handle(::exec::bit::FragmentHandle* hand // optional float network_cost = 4; inline bool PlanFragment::has_network_cost() const { - return (_has_bits_[0] & 0x00000080u) != 0; + return (_has_bits_[0] & 0x00000100u) != 0; } inline void PlanFragment::set_has_network_cost() { - _has_bits_[0] |= 0x00000080u; + _has_bits_[0] |= 0x00000100u; } inline void PlanFragment::clear_has_network_cost() { - _has_bits_[0] &= ~0x00000080u; + _has_bits_[0] &= ~0x00000100u; } inline void PlanFragment::clear_network_cost() { network_cost_ = 0; @@ -2220,13 +2238,13 @@ inline void PlanFragment::set_network_cost(float value) { // optional float cpu_cost = 5; inline bool PlanFragment::has_cpu_cost() const { - return (_has_bits_[0] & 0x00000100u) != 0; + return (_has_bits_[0] & 0x00000200u) != 0; } inline void PlanFragment::set_has_cpu_cost() { - _has_bits_[0] |= 0x00000100u; + _has_bits_[0] |= 0x00000200u; } inline void PlanFragment::clear_has_cpu_cost() { - _has_bits_[0] &= ~0x00000100u; + _has_bits_[0] &= ~0x00000200u; } inline void PlanFragment::clear_cpu_cost() { cpu_cost_ = 0; @@ -2244,13 +2262,13 @@ inline void PlanFragment::set_cpu_cost(float value) { // optional float disk_cost = 6; inline bool PlanFragment::has_disk_cost() const { - return (_has_bits_[0] & 0x00000200u) != 0; + return (_has_bits_[0] & 0x00000400u) != 0; } inline void PlanFragment::set_has_disk_cost() { - _has_bits_[0] |= 0x00000200u; + _has_bits_[0] |= 0x00000400u; } inline void PlanFragment::clear_has_disk_cost() { - _has_bits_[0] &= ~0x00000200u; + _has_bits_[0] &= ~0x00000400u; } inline void PlanFragment::clear_disk_cost() { disk_cost_ = 0; @@ -2268,13 +2286,13 @@ inline void PlanFragment::set_disk_cost(float value) { // optional float memory_cost = 7; inline bool PlanFragment::has_memory_cost() const { - return (_has_bits_[0] & 0x00000400u) != 0; + return (_has_bits_[0] & 0x00000800u) != 0; } inline void PlanFragment::set_has_memory_cost() { - _has_bits_[0] |= 0x00000400u; + _has_bits_[0] |= 0x00000800u; } inline void PlanFragment::clear_has_memory_cost() { - _has_bits_[0] &= ~0x00000400u; + _has_bits_[0] &= ~0x00000800u; } inline void PlanFragment::clear_memory_cost() { memory_cost_ = 0; @@ -2358,13 +2376,13 @@ inline void PlanFragment::set_allocated_fragment_json(::std::string* fragment_js // optional bool leaf_fragment = 9; inline bool PlanFragment::has_leaf_fragment() const { - return (_has_bits_[0] & 0x00000800u) != 0; + return (_has_bits_[0] & 0x00001000u) != 0; } inline void PlanFragment::set_has_leaf_fragment() { - _has_bits_[0] |= 0x00000800u; + _has_bits_[0] |= 0x00001000u; } inline void PlanFragment::clear_has_leaf_fragment() { - _has_bits_[0] &= ~0x00000800u; + _has_bits_[0] &= ~0x00001000u; } inline void PlanFragment::clear_leaf_fragment() { leaf_fragment_ = false; @@ -2382,13 +2400,13 @@ inline void PlanFragment::set_leaf_fragment(bool value) { // optional .exec.DrillbitEndpoint assignment = 10; inline bool PlanFragment::has_assignment() const { - return (_has_bits_[0] & 0x00000008u) != 0; + return (_has_bits_[0] & 0x00000010u) != 0; } inline void PlanFragment::set_has_assignment() { - _has_bits_[0] |= 0x00000008u; + _has_bits_[0] |= 0x00000010u; } inline void PlanFragment::clear_has_assignment() { - _has_bits_[0] &= ~0x00000008u; + _has_bits_[0] &= ~0x00000010u; } inline const ::exec::DrillbitEndpoint& PlanFragment::_internal_assignment() const { return *assignment_; @@ -2436,13 +2454,13 @@ inline void PlanFragment::set_allocated_assignment(::exec::DrillbitEndpoint* ass // optional .exec.DrillbitEndpoint foreman = 11; inline bool PlanFragment::has_foreman() const { - return (_has_bits_[0] & 0x00000010u) != 0; + return (_has_bits_[0] & 0x00000020u) != 0; } inline void PlanFragment::set_has_foreman() { - _has_bits_[0] |= 0x00000010u; + _has_bits_[0] |= 0x00000020u; } inline void PlanFragment::clear_has_foreman() { - _has_bits_[0] &= ~0x00000010u; + _has_bits_[0] &= ~0x00000020u; } inline const ::exec::DrillbitEndpoint& PlanFragment::_internal_foreman() const { return *foreman_; @@ -2490,13 +2508,13 @@ inline void PlanFragment::set_allocated_foreman(::exec::DrillbitEndpoint* forema // optional int64 mem_initial = 12 [default = 20000000]; inline bool PlanFragment::has_mem_initial() const { - return (_has_bits_[0] & 0x00001000u) != 0; + return (_has_bits_[0] & 0x00002000u) != 0; } inline void PlanFragment::set_has_mem_initial() { - _has_bits_[0] |= 0x00001000u; + _has_bits_[0] |= 0x00002000u; } inline void PlanFragment::clear_has_mem_initial() { - _has_bits_[0] &= ~0x00001000u; + _has_bits_[0] &= ~0x00002000u; } inline void PlanFragment::clear_mem_initial() { mem_initial_ = GOOGLE_LONGLONG(20000000); @@ -2514,13 +2532,13 @@ inline void PlanFragment::set_mem_initial(::google::protobuf::int64 value) { // optional int64 mem_max = 13 [default = 2000000000]; inline bool PlanFragment::has_mem_max() const { - return (_has_bits_[0] & 0x00002000u) != 0; + return (_has_bits_[0] & 0x00004000u) != 0; } inline void PlanFragment::set_has_mem_max() { - _has_bits_[0] |= 0x00002000u; + _has_bits_[0] |= 0x00004000u; } inline void PlanFragment::clear_has_mem_max() { - _has_bits_[0] &= ~0x00002000u; + _has_bits_[0] &= ~0x00004000u; } inline void PlanFragment::clear_mem_max() { mem_max_ = GOOGLE_LONGLONG(2000000000); @@ -2538,13 +2556,13 @@ inline void PlanFragment::set_mem_max(::google::protobuf::int64 value) { // optional .exec.shared.UserCredentials credentials = 14; inline bool PlanFragment::has_credentials() const { - return (_has_bits_[0] & 0x00000020u) != 0; + return (_has_bits_[0] & 0x00000040u) != 0; } inline void PlanFragment::set_has_credentials() { - _has_bits_[0] |= 0x00000020u; + _has_bits_[0] |= 0x00000040u; } inline void PlanFragment::clear_has_credentials() { - _has_bits_[0] &= ~0x00000020u; + _has_bits_[0] &= ~0x00000040u; } inline const ::exec::shared::UserCredentials& PlanFragment::_internal_credentials() const { return *credentials_; @@ -2658,13 +2676,13 @@ inline void PlanFragment::set_allocated_options_json(::std::string* options_json // optional .exec.bit.control.QueryContextInformation context = 16; inline bool PlanFragment::has_context() const { - return (_has_bits_[0] & 0x00000040u) != 0; + return (_has_bits_[0] & 0x00000080u) != 0; } inline void PlanFragment::set_has_context() { - _has_bits_[0] |= 0x00000040u; + _has_bits_[0] |= 0x00000080u; } inline void PlanFragment::clear_has_context() { - _has_bits_[0] &= ~0x00000040u; + _has_bits_[0] &= ~0x00000080u; } inline void PlanFragment::clear_context() { if (context_ != NULL) context_->Clear(); @@ -2744,6 +2762,72 @@ PlanFragment::collector() const { return collector_; } +// optional string endpointUUID = 18; +inline bool PlanFragment::has_endpointuuid() const { + return (_has_bits_[0] & 0x00000004u) != 0; +} +inline void PlanFragment::set_has_endpointuuid() { + _has_bits_[0] |= 0x00000004u; +} +inline void PlanFragment::clear_has_endpointuuid() { + _has_bits_[0] &= ~0x00000004u; +} +inline void PlanFragment::clear_endpointuuid() { + endpointuuid_.ClearToEmptyNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); + clear_has_endpointuuid(); +} +inline const ::std::string& PlanFragment::endpointuuid() const { + // @@protoc_insertion_point(field_get:exec.bit.control.PlanFragment.endpointUUID) + return endpointuuid_.GetNoArena(); +} +inline void PlanFragment::set_endpointuuid(const ::std::string& value) { + set_has_endpointuuid(); + endpointuuid_.SetNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), value); + // @@protoc_insertion_point(field_set:exec.bit.control.PlanFragment.endpointUUID) +} +#if LANG_CXX11 +inline void PlanFragment::set_endpointuuid(::std::string&& value) { + set_has_endpointuuid(); + endpointuuid_.SetNoArena( + &::google::protobuf::internal::GetEmptyStringAlreadyInited(), ::std::move(value)); + // @@protoc_insertion_point(field_set_rvalue:exec.bit.control.PlanFragment.endpointUUID) +} +#endif +inline void PlanFragment::set_endpointuuid(const char* value) { + GOOGLE_DCHECK(value != NULL); + set_has_endpointuuid(); + endpointuuid_.SetNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), ::std::string(value)); + // @@protoc_insertion_point(field_set_char:exec.bit.control.PlanFragment.endpointUUID) +} +inline void PlanFragment::set_endpointuuid(const char* value, size_t size) { + set_has_endpointuuid(); + endpointuuid_.SetNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), + ::std::string(reinterpret_cast(value), size)); + // @@protoc_insertion_point(field_set_pointer:exec.bit.control.PlanFragment.endpointUUID) +} +inline ::std::string* PlanFragment::mutable_endpointuuid() { + set_has_endpointuuid(); + // @@protoc_insertion_point(field_mutable:exec.bit.control.PlanFragment.endpointUUID) + return endpointuuid_.MutableNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); +} +inline ::std::string* PlanFragment::release_endpointuuid() { + // @@protoc_insertion_point(field_release:exec.bit.control.PlanFragment.endpointUUID) + if (!has_endpointuuid()) { + return NULL; + } + clear_has_endpointuuid(); + return endpointuuid_.ReleaseNonDefaultNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); +} +inline void PlanFragment::set_allocated_endpointuuid(::std::string* endpointuuid) { + if (endpointuuid != NULL) { + set_has_endpointuuid(); + } else { + clear_has_endpointuuid(); + } + endpointuuid_.SetAllocatedNoArena(&::google::protobuf::internal::GetEmptyStringAlreadyInited(), endpointuuid); + // @@protoc_insertion_point(field_set_allocated:exec.bit.control.PlanFragment.endpointUUID) +} + // ------------------------------------------------------------------- // Collector diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index 0996fca902a..80e728a8834 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -800,6 +800,7 @@ const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUT GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::OperatorProfile, peak_local_memory_allocated_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::OperatorProfile, metric_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::OperatorProfile, wait_nanos_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::OperatorProfile, optimal_mem_allocation_), ~0u, 0, 1, @@ -808,6 +809,7 @@ const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUT 4, ~0u, 5, + 6, GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::StreamProfile, _has_bits_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::shared::StreamProfile, _internal_metadata_), ~0u, // no _extensions_ @@ -875,12 +877,12 @@ static const ::google::protobuf::internal::MigrationSchema schemas[] GOOGLE_PROT { 169, 197, sizeof(::exec::shared::QueryProfile)}, { 220, 227, sizeof(::exec::shared::MajorFragmentProfile)}, { 229, 245, sizeof(::exec::shared::MinorFragmentProfile)}, - { 256, 269, sizeof(::exec::shared::OperatorProfile)}, - { 277, 285, sizeof(::exec::shared::StreamProfile)}, - { 288, 296, sizeof(::exec::shared::MetricValue)}, - { 299, 305, sizeof(::exec::shared::Registry)}, - { 306, 313, sizeof(::exec::shared::Jar)}, - { 315, 323, sizeof(::exec::shared::SaslMessage)}, + { 256, 270, sizeof(::exec::shared::OperatorProfile)}, + { 279, 287, sizeof(::exec::shared::StreamProfile)}, + { 290, 298, sizeof(::exec::shared::MetricValue)}, + { 301, 307, sizeof(::exec::shared::Registry)}, + { 308, 315, sizeof(::exec::shared::Jar)}, + { 317, 325, sizeof(::exec::shared::SaslMessage)}, }; static ::google::protobuf::Message const * const file_default_instances[] = { @@ -1013,67 +1015,68 @@ void AddDescriptorsImpl() { "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 " - "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" + "\001(\003\"\237\002\n\017OperatorProfile\0221\n\rinput_profile" "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" - "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" - "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" - "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " - "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030" - "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" - "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" - "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" - "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" - "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" - "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" - "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" - "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" - "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" - "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" - "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" - "\032\n\026CANCELLATION_REQUESTED\020\006*\374\t\n\020CoreOper" - "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" - "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" - "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" - "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" - "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" - "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" - "\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022" - "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" - "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN" - "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" - "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" - "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" - "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" - "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" - "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" - "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" - "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" - "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" - "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFK" - "A_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATT" - "EN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HI" - "VE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020+" - "\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017M" - "APRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013" - "KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n" - "\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022" - "\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\020" - "5\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCA" - "N\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209" - "\022\023\n\017SYSLOG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGR" - "EGATE\020;\022\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_" - "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus" - "\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA" - "SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" - "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" - "toB\rUserBitSharedH\001" + "e\022\022\n\nwait_nanos\030\t \001(\003\022\036\n\026optimal_mem_all" + "ocation\030\n \001(\003\"B\n\rStreamProfile\022\017\n\007record" + "s\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(" + "\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nl" + "ong_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n" + "\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar" + "\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signat" + "ure\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 " + "\001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec" + ".shared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_" + "CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQue" + "ryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL" + "\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020" + "\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAI" + "TING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISH" + "ED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCE" + "LLATION_REQUESTED\020\006*\374\t\n\020CoreOperatorType" + "\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020" + "\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHAS" + "H_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITI" + "ON_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIV" + "ER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PR" + "OJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\032\n\026RANGE" + "_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELEC" + "TION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGRE" + "GATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020" + "\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n" + "\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SC" + "AN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_" + "SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB" + "_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCA" + "N\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SU" + "B_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCE" + "R_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIND" + "OW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_S" + "CAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SC" + "AN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014" + "LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL" + "_NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC" + "_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SU" + "B_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRI" + "TER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WR" + "ITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_" + "SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PAR" + "TITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016R" + "UNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSL" + "OG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022" + "\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_MERGE\020=\022" + "\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus\022\020\n\014SASL" + "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" + "OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" + "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" + "BitSharedH\001" }; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - descriptor, 5659); + descriptor, 5691); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); ::protobuf_Types_2eproto::AddDescriptors(); @@ -8684,6 +8687,7 @@ const int OperatorProfile::kProcessNanosFieldNumber; const int OperatorProfile::kPeakLocalMemoryAllocatedFieldNumber; const int OperatorProfile::kMetricFieldNumber; const int OperatorProfile::kWaitNanosFieldNumber; +const int OperatorProfile::kOptimalMemAllocationFieldNumber; #endif // !defined(_MSC_VER) || _MSC_VER >= 1900 OperatorProfile::OperatorProfile() @@ -8701,15 +8705,15 @@ OperatorProfile::OperatorProfile(const OperatorProfile& from) metric_(from.metric_) { _internal_metadata_.MergeFrom(from._internal_metadata_); ::memcpy(&operator_id_, &from.operator_id_, - static_cast(reinterpret_cast(&wait_nanos_) - - reinterpret_cast(&operator_id_)) + sizeof(wait_nanos_)); + static_cast(reinterpret_cast(&optimal_mem_allocation_) - + reinterpret_cast(&operator_id_)) + sizeof(optimal_mem_allocation_)); // @@protoc_insertion_point(copy_constructor:exec.shared.OperatorProfile) } void OperatorProfile::SharedCtor() { ::memset(&operator_id_, 0, static_cast( - reinterpret_cast(&wait_nanos_) - - reinterpret_cast(&operator_id_)) + sizeof(wait_nanos_)); + reinterpret_cast(&optimal_mem_allocation_) - + reinterpret_cast(&operator_id_)) + sizeof(optimal_mem_allocation_)); } OperatorProfile::~OperatorProfile() { @@ -8743,10 +8747,10 @@ void OperatorProfile::Clear() { input_profile_.Clear(); metric_.Clear(); cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 63u) { + if (cached_has_bits & 127u) { ::memset(&operator_id_, 0, static_cast( - reinterpret_cast(&wait_nanos_) - - reinterpret_cast(&operator_id_)) + sizeof(wait_nanos_)); + reinterpret_cast(&optimal_mem_allocation_) - + reinterpret_cast(&operator_id_)) + sizeof(optimal_mem_allocation_)); } _has_bits_.Clear(); _internal_metadata_.Clear(); @@ -8870,6 +8874,20 @@ bool OperatorProfile::MergePartialFromCodedStream( break; } + // optional int64 optimal_mem_allocation = 10; + case 10: { + if (static_cast< ::google::protobuf::uint8>(tag) == + static_cast< ::google::protobuf::uint8>(80u /* 80 & 0xFF */)) { + set_has_optimal_mem_allocation(); + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int64, ::google::protobuf::internal::WireFormatLite::TYPE_INT64>( + input, &optimal_mem_allocation_))); + } else { + goto handle_unusual; + } + break; + } + default: { handle_unusual: if (tag == 0) { @@ -8945,6 +8963,11 @@ void OperatorProfile::SerializeWithCachedSizes( ::google::protobuf::internal::WireFormatLite::WriteInt64(9, this->wait_nanos(), output); } + // optional int64 optimal_mem_allocation = 10; + if (cached_has_bits & 0x00000040u) { + ::google::protobuf::internal::WireFormatLite::WriteInt64(10, this->optimal_mem_allocation(), output); + } + if (_internal_metadata_.have_unknown_fields()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( _internal_metadata_.unknown_fields(), output); @@ -9006,6 +9029,11 @@ ::google::protobuf::uint8* OperatorProfile::InternalSerializeWithCachedSizesToAr target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(9, this->wait_nanos(), target); } + // optional int64 optimal_mem_allocation = 10; + if (cached_has_bits & 0x00000040u) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(10, this->optimal_mem_allocation(), target); + } + if (_internal_metadata_.have_unknown_fields()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target); @@ -9045,7 +9073,7 @@ size_t OperatorProfile::ByteSizeLong() const { } } - if (_has_bits_[0 / 32] & 63u) { + if (_has_bits_[0 / 32] & 127u) { // optional int32 operator_id = 3; if (has_operator_id()) { total_size += 1 + @@ -9088,6 +9116,13 @@ size_t OperatorProfile::ByteSizeLong() const { this->wait_nanos()); } + // optional int64 optimal_mem_allocation = 10; + if (has_optimal_mem_allocation()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int64Size( + this->optimal_mem_allocation()); + } + } int cached_size = ::google::protobuf::internal::ToCachedSize(total_size); SetCachedSize(cached_size); @@ -9119,7 +9154,7 @@ void OperatorProfile::MergeFrom(const OperatorProfile& from) { input_profile_.MergeFrom(from.input_profile_); metric_.MergeFrom(from.metric_); cached_has_bits = from._has_bits_[0]; - if (cached_has_bits & 63u) { + if (cached_has_bits & 127u) { if (cached_has_bits & 0x00000001u) { operator_id_ = from.operator_id_; } @@ -9138,6 +9173,9 @@ void OperatorProfile::MergeFrom(const OperatorProfile& from) { if (cached_has_bits & 0x00000020u) { wait_nanos_ = from.wait_nanos_; } + if (cached_has_bits & 0x00000040u) { + optimal_mem_allocation_ = from.optimal_mem_allocation_; + } _has_bits_[0] |= cached_has_bits; } } @@ -9174,6 +9212,7 @@ void OperatorProfile::InternalSwap(OperatorProfile* other) { swap(process_nanos_, other->process_nanos_); swap(peak_local_memory_allocated_, other->peak_local_memory_allocated_); swap(wait_nanos_, other->wait_nanos_); + swap(optimal_mem_allocation_, other->optimal_mem_allocation_); swap(_has_bits_[0], other->_has_bits_[0]); _internal_metadata_.Swap(&other->_internal_metadata_); } diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index e186f138061..1d1e2ffddb1 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -3504,6 +3504,13 @@ class OperatorProfile : public ::google::protobuf::Message /* @@protoc_insertion ::google::protobuf::int64 wait_nanos() const; void set_wait_nanos(::google::protobuf::int64 value); + // optional int64 optimal_mem_allocation = 10; + bool has_optimal_mem_allocation() const; + void clear_optimal_mem_allocation(); + static const int kOptimalMemAllocationFieldNumber = 10; + ::google::protobuf::int64 optimal_mem_allocation() const; + void set_optimal_mem_allocation(::google::protobuf::int64 value); + // @@protoc_insertion_point(class_scope:exec.shared.OperatorProfile) private: void set_has_operator_id(); @@ -3518,6 +3525,8 @@ class OperatorProfile : public ::google::protobuf::Message /* @@protoc_insertion void clear_has_peak_local_memory_allocated(); void set_has_wait_nanos(); void clear_has_wait_nanos(); + void set_has_optimal_mem_allocation(); + void clear_has_optimal_mem_allocation(); ::google::protobuf::internal::InternalMetadataWithArena _internal_metadata_; ::google::protobuf::internal::HasBits<1> _has_bits_; @@ -3530,6 +3539,7 @@ class OperatorProfile : public ::google::protobuf::Message /* @@protoc_insertion ::google::protobuf::int64 process_nanos_; ::google::protobuf::int64 peak_local_memory_allocated_; ::google::protobuf::int64 wait_nanos_; + ::google::protobuf::int64 optimal_mem_allocation_; friend struct ::protobuf_UserBitShared_2eproto::TableStruct; }; // ------------------------------------------------------------------- @@ -8080,6 +8090,30 @@ inline void OperatorProfile::set_wait_nanos(::google::protobuf::int64 value) { // @@protoc_insertion_point(field_set:exec.shared.OperatorProfile.wait_nanos) } +// optional int64 optimal_mem_allocation = 10; +inline bool OperatorProfile::has_optimal_mem_allocation() const { + return (_has_bits_[0] & 0x00000040u) != 0; +} +inline void OperatorProfile::set_has_optimal_mem_allocation() { + _has_bits_[0] |= 0x00000040u; +} +inline void OperatorProfile::clear_has_optimal_mem_allocation() { + _has_bits_[0] &= ~0x00000040u; +} +inline void OperatorProfile::clear_optimal_mem_allocation() { + optimal_mem_allocation_ = GOOGLE_LONGLONG(0); + clear_has_optimal_mem_allocation(); +} +inline ::google::protobuf::int64 OperatorProfile::optimal_mem_allocation() const { + // @@protoc_insertion_point(field_get:exec.shared.OperatorProfile.optimal_mem_allocation) + return optimal_mem_allocation_; +} +inline void OperatorProfile::set_optimal_mem_allocation(::google::protobuf::int64 value) { + set_has_optimal_mem_allocation(); + optimal_mem_allocation_ = value; + // @@protoc_insertion_point(field_set:exec.shared.OperatorProfile.optimal_mem_allocation) +} + // ------------------------------------------------------------------- // StreamProfile diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java index 8768eb34ce1..d9746dcd6e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java @@ -22,11 +22,13 @@ public class OpProfileDef { public int operatorId; public int operatorType; public int incomingCount; + public long optimalMemoryAllocation; - public OpProfileDef(int operatorId, int operatorType, int incomingCount) { + public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) { this.operatorId = operatorId; this.operatorType = operatorType; this.incomingCount = incomingCount; + this.optimalMemoryAllocation = optimalMemoryAllocation; } public int getOperatorId(){ return operatorId; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index d47e8d9c00f..c512959dc79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -57,7 +57,7 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl conte } else { OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), - OperatorUtilities.getChildCount(popConfig)); + OperatorUtilities.getChildCount(popConfig), popConfig.getMaxAllocation()); this.stats = context.getStats().newOperatorStats(def, allocator); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 67a8b80f280..f682104aa1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -45,7 +45,7 @@ public class OperatorStats { public long[] recordsReceivedByInput; public long[] batchesReceivedByInput; private long[] schemaCountByInput; - + private long optimalMemoryAllocation; private boolean inProcessing = false; private boolean inSetup = false; @@ -62,7 +62,7 @@ public class OperatorStats { private int inputCount; public OperatorStats(OpProfileDef def, BufferAllocator allocator){ - this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator); + this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator, def.optimalMemoryAllocation); } /** @@ -74,7 +74,7 @@ public OperatorStats(OpProfileDef def, BufferAllocator allocator){ */ public OperatorStats(OperatorStats original, boolean isClean) { - this(original.operatorId, original.operatorType, original.inputCount, original.allocator); + this(original.operatorId, original.operatorType, original.inputCount, original.allocator, original.optimalMemoryAllocation); if ( !isClean ) { inProcessing = original.inProcessing; @@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) { } @VisibleForTesting - public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { + public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) { super(); this.allocator = allocator; this.operatorId = operatorId; @@ -97,6 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll this.recordsReceivedByInput = new long[inputCount]; this.batchesReceivedByInput = new long[inputCount]; this.schemaCountByInput = new long[inputCount]; + this.optimalMemoryAllocation = initialAllocation; } private String assertionError(String msg){ @@ -207,6 +208,7 @@ public OperatorProfile getProfile() { .setOperatorId(operatorId) // .setSetupNanos(setupNanos) // .setProcessNanos(processingNanos) + .setOptimalMemAllocation(optimalMemoryAllocation) .setWaitNanos(waitNanos); if (allocator != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 126ad0751d4..07693c4c9c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -101,18 +101,6 @@ public boolean enforceWidth() { return getMinParallelizationWidth() > 1; } - @Override - @JsonIgnore - public long getInitialAllocation() { - return 0; - } - - @Override - @JsonIgnore - public long getMaxAllocation() { - return 0; - } - @Override @JsonIgnore public boolean canPushdownProjects(List columns) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 95a1235017a..9c1ba591435 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -54,7 +54,7 @@ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorCon } //Creating new stat for appending to list stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), - config.getOperatorType(), OperatorUtilities.getChildCount(config)), + config.getOperatorType(), OperatorUtilities.getChildCount(config), config.getMaxAllocation()), this.oContext.getAllocator()); fragmentContext.getStats().addOperatorStats(this.stats); this.fragmentContext = fragmentContext; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java index fecea5e7f6e..541130b1670 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.planner.fragment; +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.function.CheckedConsumer; @@ -29,7 +30,7 @@ import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig; import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; - +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -46,6 +47,7 @@ * fragment is based on the cluster state and provided queue configuration. */ public class DistributedQueueParallelizer extends SimpleParallelizer { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class); private final boolean planHasMemory; private final QueryContext queryContext; private final QueryResourceManager rm; @@ -65,9 +67,13 @@ public BiFunction getMemory() { if (!planHasMemory) { final DrillNode drillEndpointNode = DrillNode.create(endpoint); if (operator.isBufferedOperator(queryContext)) { - return operators.get(drillEndpointNode).get(operator); + Long operatorsMemory = operators.get(drillEndpointNode).get(operator); + logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory); + return operatorsMemory; } else { - return operator.getMaxAllocation(); + Long nonBufferedMemory = (long)operator.getCost().getMemoryCost(); + logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory); + return nonBufferedMemory; } } else { @@ -92,10 +98,11 @@ public BiFunction getMemory() { */ public void adjustMemory(PlanningSet planningSet, Set roots, Map onlineEndpointUUIDs) throws ExecutionSetupException { - if (planHasMemory) { + logger.debug(" Plan already has memory settings. Adjustment of the memory is skipped"); return; } + logger.info(" Memory adjustment phase triggered"); final Map onlineDrillNodeUUIDs = onlineEndpointUUIDs.entrySet().stream() .collect(Collectors.toMap(x -> DrillNode.create(x.getKey()), x -> x.getValue())); @@ -112,7 +119,7 @@ public void adjustMemory(PlanningSet planningSet, Set roots, for (Wrapper wrapper : roots) { traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> { - MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext); + MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext, rm.minimumOperatorMemory()); fragment.getNode().getRoot().accept(calculator, fragment); NodeResources.merge(totalNodeResources, fragment.getResourceMap()); operators.entrySet() @@ -122,6 +129,10 @@ public void adjustMemory(PlanningSet planningSet, Set roots, })); } + if (logger.isDebugEnabled()) { + logger.debug(" Total node resource requirements for the plan is {}", getJSONFromResourcesMap(totalNodeResources)); + } + final QueryQueueConfig queueConfig; try { queueConfig = this.rm.selectQueue(max(totalNodeResources.values())); @@ -130,8 +141,10 @@ public void adjustMemory(PlanningSet planningSet, Set roots, } Map>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources, - queueConfig.getMaxQueryMemoryInMBPerNode()); + List>> memoryAdjustedOperators = + ensureOperatorMemoryWithinLimits(operators, totalNodeResources, + convertMBToBytes(Math.min(queueConfig.getMaxQueryMemoryInMBPerNode(), + queueConfig.getQueueTotalMemoryInMB(onlineEndpointUUIDs.size())))); memoryAdjustedOperators.entrySet().stream().forEach((x) -> { Map memoryPerOperator = x.getValue().stream() .collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(), @@ -140,9 +153,17 @@ public void adjustMemory(PlanningSet planningSet, Set roots, this.operators.put(x.getKey(), memoryPerOperator); }); + if (logger.isDebugEnabled()) { + logger.debug(" Total node resource requirements after adjustment {}", getJSONFromResourcesMap(totalNodeResources)); + } + this.rm.setCost(convertToUUID(totalNodeResources, onlineDrillNodeUUIDs)); } + private long convertMBToBytes(long value) { + return value * 1024 * 1024; + } + private Map convertToUUID(Map nodeResourcesMap, Map onlineDrillNodeUUIDs) { Map nodeResourcesPerUUID = new HashMap<>(); @@ -172,50 +193,81 @@ private NodeResources max(Collection resources) { */ private Map>> ensureOperatorMemoryWithinLimits(Map>> memoryPerOperator, - Map nodeResourceMap, long nodeLimit) { + Map nodeResourceMap, long nodeLimit) throws ExecutionSetupException { // Get the physical operators which are above the node memory limit. - Map>> onlyMemoryAboveLimitOperators = new HashMap<>(); - memoryPerOperator.entrySet().stream().forEach((entry) -> { - onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>()); - if (nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit) { - onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue()); - } - }); - + Map>> onlyMemoryAboveLimitOperators = memoryPerOperator.entrySet() + .stream() + .filter(entry -> nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); // Compute the total memory required by the physical operators on the drillbits which are above node limit. // Then use the total memory to adjust the memory requirement based on the permissible node limit. Map>> memoryAdjustedDrillbits = new HashMap<>(); onlyMemoryAboveLimitOperators.entrySet().stream().forEach( - entry -> { - Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum(); - List> adjustedMemory = entry.getValue().stream().map(operatorMemory -> { + CheckedConsumer.throwingConsumerWrapper(entry -> { + Long totalBufferedOperatorsMemoryReq = entry.getValue().stream().mapToLong(Pair::getValue).sum(); + Long nonBufferedOperatorsMemoryReq = nodeResourceMap.get(entry.getKey()).getMemoryInBytes() - totalBufferedOperatorsMemoryReq; + Long bufferedOperatorsMemoryLimit = nodeLimit - nonBufferedOperatorsMemoryReq; + if (bufferedOperatorsMemoryLimit < 0 || nonBufferedOperatorsMemoryReq < 0) { + logger.error(" Operator memory requirements for buffered operators {} or non buffered operators {} is negative", bufferedOperatorsMemoryLimit, + nonBufferedOperatorsMemoryReq); + throw new ExecutionSetupException("Operator memory requirements for buffered operators " + bufferedOperatorsMemoryLimit + " or non buffered operators " + + nonBufferedOperatorsMemoryReq + " is less than zero"); + } + List> adjustedMemory = entry.getValue().stream().map(operatorAndMemory -> { // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit. - return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit)); + return Pair.of(operatorAndMemory.getKey(), + Math.max(this.rm.minimumOperatorMemory(), + (long) Math.ceil(operatorAndMemory.getValue()/totalBufferedOperatorsMemoryReq * bufferedOperatorsMemoryLimit))); }).collect(Collectors.toList()); memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory); NodeResources nodeResources = nodeResourceMap.get(entry.getKey()); - nodeResources.setMemoryInBytes(adjustedMemory.stream().mapToLong(Pair::getValue).sum()); - } + nodeResources.setMemoryInBytes(nonBufferedOperatorsMemoryReq + adjustedMemory.stream().mapToLong(Pair::getValue).sum()); + }) ); + checkIfWithinLimit(nodeResourceMap, nodeLimit); + // Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not // adjusted for memory. - Map>> allDrillbits = new HashMap<>(); - memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach( - operatorMemory -> { - allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()); - } - ); + Map>> allDrillbits = memoryPerOperator.entrySet() + .stream() + .filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); memoryAdjustedDrillbits.entrySet().stream().forEach( - operatorMemory -> { - allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()); - } - ); + operatorMemory -> allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue())); // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and // the ratio of their requirements. return allDrillbits; } + + private void checkIfWithinLimit(Map nodeResourcesMap, long nodeLimit) throws ExecutionSetupException { + for (Map.Entry entry : nodeResourcesMap.entrySet()) { + if (entry.getValue().getMemoryInBytes() > nodeLimit) { + logger.error(" Memory requirement for the query cannot be adjusted." + + " Memory requirement {} (in bytes) for a node {} is greater than limit {}", entry.getValue() + .getMemoryInBytes(), entry.getKey(), nodeLimit); + throw new ExecutionSetupException("Minimum memory requirement " + + entry.getValue().getMemoryInBytes() + " for a node " + entry.getKey() + " is greater than limit: " + nodeLimit); + } + } + } + + private String getJSONFromResourcesMap(Map resourcesMap) { + String json = ""; + try { + json = new ObjectMapper().writeValueAsString(resourcesMap.entrySet() + .stream() + .collect(Collectors.toMap(entry -> entry.getKey() + .toString(), Map.Entry::getValue))); + } catch (JsonProcessingException exception) { + logger.error(" Cannot convert the Node resources map to json "); + } + + return json; + } } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index 0212e088211..3d28067d563 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -17,19 +17,17 @@ */ package org.apache.drill.exec.planner.fragment; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities; import org.apache.drill.exec.work.foreman.ForemanSetupException; - import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + public class Fragment implements Iterable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index fdfa95c393a..d806b52e20a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -55,6 +55,7 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child); materializedSender.setOperatorId(0); materializedSender.setCost(exchange.getCost()); + materializedSender.setMaxAllocation(exchange.getMaxAllocation()); // logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child); return materializedSender; @@ -62,6 +63,7 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo // receiving exchange. PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId()); materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId()); + materializedReceiver.setMaxAllocation(exchange.getMaxAllocation()); // logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver); materializedReceiver.setCost(exchange.getCost()); return materializedReceiver; @@ -70,8 +72,10 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo @Override public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException { + iNode.addAllocation(groupScan); SubScan child = groupScan.getSpecificScan(iNode.getMinorFragmentId()); child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId()); + child.setMaxAllocation(groupScan.getMaxAllocation()); // remember the subscan for future use iNode.addSubScan(child); return child; @@ -89,11 +93,11 @@ public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) @Override public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws ExecutionSetupException { PhysicalOperator child = store.getChild().accept(this, iNode); - iNode.addAllocation(store); try { PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId()); + o.setMaxAllocation(store.getMaxAllocation()); o.setOperatorId(Short.MAX_VALUE & store.getOperatorId()); // logger.debug("New materialized store node {} with child {}", o, child); return o; @@ -112,6 +116,7 @@ public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) } PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); + newOp.setMaxAllocation(op.getMaxAllocation()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); return newOp; } @@ -128,6 +133,7 @@ public PhysicalOperator visitLateralJoin(LateralJoinPOP op, IndexedFragmentNode PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); + newOp.setMaxAllocation(op.getMaxAllocation()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); ((LateralJoinPOP) newOp).setUnnestForLateralJoin(unnestForThisLateral); @@ -138,6 +144,7 @@ public PhysicalOperator visitLateralJoin(LateralJoinPOP op, IndexedFragmentNode public PhysicalOperator visitUnnest(UnnestPOP unnest, IndexedFragmentNode value) throws ExecutionSetupException { PhysicalOperator newOp = visitOp(unnest, value); value.addUnnest((UnnestPOP) newOp); + newOp.setMaxAllocation(unnest.getMaxAllocation()); return newOp; } @@ -157,6 +164,7 @@ public PhysicalOperator visitRowKeyJoin(RowKeyJoinPOP op, IndexedFragmentNode iN PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); + newOp.setMaxAllocation(op.getMaxAllocation()); ((RowKeyJoinPOP)newOp).setSubScanForRowKeyJoin(subScanInLeftInput); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java index d3d759ca437..4593c55f6ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java @@ -49,11 +49,13 @@ public class MemoryCalculator extends AbstractOpWrapperVisitor>> bufferedOperators; private final QueryContext queryContext; + private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS; - public MemoryCalculator(PlanningSet planningSet, QueryContext context) { + public MemoryCalculator(PlanningSet planningSet, QueryContext context, long minMemory) { this.planningSet = planningSet; this.bufferedOperators = new HashMap<>(); this.queryContext = context; + this.MINIMUM_MEMORY_FOR_BUFFER_OPERS = minMemory; } // Helper method to compute the minor fragment count per drillbit. This method returns @@ -86,7 +88,7 @@ public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws Run getMinorFragCountPerDrillbit(fragment), // get the memory requirements for the sender operator. (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue())); - return visitOp(exchange, fragment); + return visit(exchange, fragment); } @Override @@ -117,19 +119,26 @@ public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws R return null; } + private Void visit(PhysicalOperator op, Wrapper fragment) { + for (PhysicalOperator child : op) { + child.accept(this, fragment); + } + return null; + } + public List> getBufferedOperators(DrillNode endpoint) { return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>()); } @Override public Void visitOp(PhysicalOperator op, Wrapper fragment) { - long memoryCost = (int)Math.ceil(op.getCost().getMemoryCost()); + long memoryCost = (long)Math.ceil(op.getCost().getMemoryCost()); if (op.isBufferedOperator(queryContext)) { // If the operator is a buffered operator then get the memory estimates of the optimizer. // The memory estimates of the optimizer are for the whole operator spread across all the // minor fragments. Divide this memory estimation by fragment width to get the memory // requirement per minor fragment. - long memoryCostPerMinorFrag = (int)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()); + long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), MINIMUM_MEMORY_FOR_BUFFER_OPERS); Map drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment); Map x.getKey(), (x) -> Pair.of(op, - memoryCostPerMinorFrag * x.getValue()))); + memoryCostPerMinorFrag))); bufferedOperatorsPerDrillbit.entrySet().forEach((x) -> { bufferedOperators.putIfAbsent(x.getKey(), new ArrayList<>()); bufferedOperators.get(x.getKey()).add(x.getValue()); @@ -153,10 +162,7 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) { getMinorFragCountPerDrillbit(fragment), (x) -> memoryCost * x.getValue()); } - for (PhysicalOperator child : op) { - child.accept(this, fragment); - } - return null; + return visit(op, fragment); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java index 6e529224b9d..98fb2b32343 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java @@ -64,7 +64,6 @@ public void adjustMemory(PlanningSet planningSet, Set roots, } endpointMap = collector.getNodeMap(); - ZKQueueMemoryAllocationUtilities.planMemory(queryContext, this.resourceManager, endpointMap); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java index 3e1d6c79832..71fb70ebb18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java @@ -21,15 +21,18 @@ import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder; import org.apache.drill.common.logical.PlanProperties.PlanType; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; import org.apache.drill.exec.planner.sql.handlers.SimpleCommandResult; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.direct.DirectGroupScan; import org.apache.drill.exec.store.pojo.PojoRecordReader; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -43,20 +46,24 @@ public static PhysicalPlan createDirectPlan(QueryContext context, boolean result @SuppressWarnings("unchecked") public static PhysicalPlan createDirectPlan(QueryContext context, T obj){ - return createDirectPlan(context.getCurrentEndpoint(), Collections.singletonList(obj), (Class) obj.getClass()); + return createDirectPlan(context, Collections.singletonList(obj), (Class) obj.getClass()); } - public static PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, List records, Class clazz){ + public static PhysicalPlan createDirectPlan(QueryContext context, List records, Class clazz){ PojoRecordReader reader = new PojoRecordReader<>(clazz, records); DirectGroupScan scan = new DirectGroupScan(reader); - Screen screen = new Screen(scan, endpoint); + Screen screen = new Screen(scan, context.getCurrentEndpoint()); PlanPropertiesBuilder propsBuilder = PlanProperties.builder(); propsBuilder.type(PlanType.APACHE_DRILL_PHYSICAL); propsBuilder.version(1); propsBuilder.resultMode(ResultMode.EXEC); propsBuilder.generator(DirectPlan.class.getSimpleName(), ""); + Collection pops = DefaultSqlHandler.getPops(screen); + for (PhysicalOperator pop : pops) { + pop.setCost(new PrelCostEstimates(context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE), 0)); + } return new PhysicalPlan(propsBuilder.build(), DefaultSqlHandler.getPops(screen)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java index 1318e6f47de..337fbfd76b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java @@ -92,7 +92,7 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException { .map(fileStatus -> new ShowFilesCommandResult(new Records.File(wsSchema.getFullSchemaName(), wsSchema, fileStatus))) .collect(Collectors.toList()); - return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), records, ShowFilesCommandResult.class); + return DirectPlan.createDirectPlan(context, records, ShowFilesCommandResult.class); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/rmblobmgr/RMConsistentBlobStoreManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/rmblobmgr/RMConsistentBlobStoreManager.java index befa4bce838..b140abd2a26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/rmblobmgr/RMConsistentBlobStoreManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/rmblobmgr/RMConsistentBlobStoreManager.java @@ -404,7 +404,7 @@ private String updateBlobs(Map resourcesMap, QueryQueueCo throw new RMBlobUpdateException(String.format("Failed to update the cluster state blob and queue blob in a " + "transaction. [Details: %s]", exceptionStringBuilder.toString())); } - logger.debug("Successfully updated the blobs in a transaction. [Details: %s]", exceptionStringBuilder.toString()); + logger.debug("Successfully updated the blobs in a transaction. [Details: {}]", exceptionStringBuilder.toString()); // Reset the exceptionStringBuilder for next event exceptionStringBuilder.delete(0, exceptionStringBuilder.length()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index aad01c0eb16..4899715e674 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -425,6 +425,7 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) throws ExecutionSetupException { validatePlan(plan); + queryRM.setCost(plan.totalCost()); final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); if (enableRuntimeFilter) { runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); @@ -433,7 +434,6 @@ private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) if (textPlan != null) { queryManager.setPlanText(textPlan.value); } - queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java index c0cdefe21aa..96b06912dbb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java @@ -273,7 +273,6 @@ public boolean hasQueue() { @Override public void setCost(double cost) { - throw new UnsupportedOperationException("DistributedQueryRM doesn't support cost in double format"); } public void setCost(Map costOnAssignedEndpoints) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index 4fb5a8f4417..d49cfd4f0ff 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -220,7 +220,7 @@ public void testAllocators() throws Exception { // Use some bogus operator type to create a new operator context. def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorUtilities.getChildCount(physicalOperator1)); + OperatorUtilities.getChildCount(physicalOperator1), physicalOperator1.getMaxAllocation()); stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator()); // Add a couple of Operator Contexts @@ -234,7 +234,7 @@ public void testAllocators() throws Exception { OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3); def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, - OperatorUtilities.getChildCount(physicalOperator4)); + OperatorUtilities.getChildCount(physicalOperator4), physicalOperator4.getMaxAllocation()); stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator()); OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats); DrillBuf b22 = oContext22.getAllocator().buffer(2000000); @@ -248,7 +248,7 @@ public void testAllocators() throws Exception { // New fragment starts an operator that allocates an amount within the limit def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, - OperatorUtilities.getChildCount(physicalOperator5)); + OperatorUtilities.getChildCount(physicalOperator5), physicalOperator5.getMaxAllocation()); stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator()); OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java index 9c2d5d8feca..b212e76b8dd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java @@ -655,7 +655,7 @@ public void testMergeLimit() { @Test public void testMetrics() { - OperatorStats stats = new OperatorStats(100, 101, 0, fixture.allocator()); + OperatorStats stats = new OperatorStats(100, 101, 0, fixture.allocator(), 0); SortMetrics metrics = new SortMetrics(stats); // Input stats diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java index cd6b0a9a280..577014eaafe 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java @@ -27,7 +27,7 @@ import org.apache.drill.exec.planner.fragment.PlanningSet; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.Wrapper; -import org.apache.drill.exec.planner.fragment.common.DrillNode; +import org.apache.drill.common.DrillNode; import org.apache.drill.exec.pop.PopUnitTestBase; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared; @@ -44,9 +44,10 @@ import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterFixtureBuilder; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.HashMap; @@ -59,6 +60,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -90,7 +93,7 @@ private static final DrillbitEndpoint newDrillbitEndpoint(String address, int po UserBitShared.QueryId.getDefaultInstance()); private static Map onlineEndpoints; - private Map resources; + private Map totalResources; @AfterClass public static void close() throws Exception { @@ -102,7 +105,16 @@ private QueryResourceManager mockResourceManager() throws QueueSelectionExceptio final QueryQueueConfig queueConfig = mock(QueryQueueConfig.class); when(queueConfig.getMaxQueryMemoryInMBPerNode()).thenReturn(10L); + when(queueConfig.getQueueTotalMemoryInMB(anyInt())).thenReturn(100L); when(mockRM.selectQueue(any(NodeResources.class))).thenReturn(queueConfig); + when(mockRM.minimumOperatorMemory()).thenReturn(40L); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + totalResources = (Map) invocation.getArguments()[0]; + return null; + } + }).when(mockRM).setCost(any(Map.class)); return mockRM; } @@ -116,7 +128,7 @@ private final Wrapper mockWrapper(Wrapper rootFragment, List mockdependencies = new ArrayList<>(); for (Wrapper dependency : rootFragment.getFragmentDependencies()) { - mockdependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper)); + mockdependencies.add(mockWrapper(dependency, getNodeResources(), endpoints, originalToMockWrapper)); } when(mockWrapper.getNode()).thenReturn(rootFragment.getNode()); @@ -129,11 +141,9 @@ private final Wrapper mockWrapper(Wrapper rootFragment, } private final PlanningSet mockPlanningSet(PlanningSet planningSet, - Map resourceMap, List endpoints) { Map wrapperToMockWrapper = new HashMap<>(); - Wrapper rootFragment = mockWrapper( planningSet.getRootWrapper(), resourceMap, - endpoints, wrapperToMockWrapper); + Wrapper rootFragment = mockWrapper(planningSet.getRootWrapper(), getNodeResources(), endpoints, wrapperToMockWrapper); PlanningSet mockPlanningSet = mock(PlanningSet.class); when(mockPlanningSet.getRootWrapper()).thenReturn(rootFragment); when(mockPlanningSet.get(any(Fragment.class))).thenAnswer(invocation -> { @@ -196,10 +206,13 @@ private Fragment getRootFragmentFromPlan(DrillbitContext context, } private PlanningSet preparePlanningSet(List activeEndpoints, long slice_target, - Map resources, String sql, - SimpleParallelizer parallelizer) throws Exception { + String sql, SimpleParallelizer parallelizer) throws Exception { Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, slice_target)); - return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints); + return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), activeEndpoints); + } + + private Map getNodeResources() { + return onlineEndpoints.keySet().stream().collect(Collectors.toMap(x -> DrillNode.create(x), x -> NodeResources.create())); } @BeforeClass @@ -207,21 +220,14 @@ public static void setupForAllTests() { onlineEndpoints = getEndpoints(2, new HashSet<>()); } - @Before - public void setupForEachTest() { - // Have to create separately for each test since it is updated my MemoryCalculator during merge - resources = onlineEndpoints.keySet().stream().collect(Collectors.toMap(x -> DrillNode.create(x), - x -> NodeResources.create())); - } - @Test public void TestSingleMajorFragmentWithProjectAndScan() throws Exception { String sql = "SELECT * from cp.`tpch/nation.parquet`"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 30)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 30)); } @@ -230,21 +236,20 @@ public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception String sql = "SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 529570)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 529570)); } @Test - public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception { + public void TestTwoMajorFragmentWithSortProjectAndScan() throws Exception { String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), 2, resources, sql, - parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), 2, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 481490)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 481460)); } @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java index 01c06a4a664..37eb3556d14 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java @@ -77,7 +77,7 @@ public void testSimpleIterator() throws Throwable { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorUtilities.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop), dummyPop.getMaxAllocation()); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false, null); int totalRecords = 0; @@ -133,7 +133,7 @@ public void testMarkResetIterator() throws Throwable { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorUtilities.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop), dummyPop.getMaxAllocation()); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, null); List vectors; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java index 5504382dc09..b5ca7351d4e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java @@ -67,7 +67,7 @@ public void testIOStats() throws Exception { InputStream is = null; Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); - OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/); + OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/, 0 /*optimalMemoryAllocation*/); OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/); // start wait time method in OperatorStats expects the OperatorStats state to be in "processing" diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index 3d9190894ac..bc72bd8b781 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -438,7 +438,7 @@ public MockOperatorContext(FragmentContext fragContext, BufferAllocator allocator, PhysicalOperator config) { super(fragContext, allocator, config); - this.operatorStats = new OperatorStats(new OpProfileDef(0, 0, 100), allocator); + this.operatorStats = new OperatorStats(new OpProfileDef(0, 0, 100, 0), allocator); } @Override diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java index 932872c5163..4babfb18148 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java @@ -2377,6 +2377,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex if(message.hasWaitNanos()) output.writeInt64(9, message.getWaitNanos(), false); + if(message.hasOptimalMemAllocation()) + output.writeInt64(10, message.getOptimalMemAllocation(), false); } public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.OperatorProfile message) { @@ -2442,6 +2444,9 @@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex case 9: builder.setWaitNanos(input.readInt64()); break; + case 10: + builder.setOptimalMemAllocation(input.readInt64()); + break; default: input.handleUnknownField(number, this); } @@ -2490,6 +2495,7 @@ public static java.lang.String getFieldName(int number) case 7: return "peakLocalMemoryAllocated"; case 8: return "metric"; case 9: return "waitNanos"; + case 10: return "optimalMemAllocation"; default: return null; } } @@ -2509,6 +2515,7 @@ public static int getFieldNumber(java.lang.String name) fieldMap.put("peakLocalMemoryAllocated", 7); fieldMap.put("metric", 8); fieldMap.put("waitNanos", 9); + fieldMap.put("optimalMemAllocation", 10); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 5f30015ef4f..452d90e3bda 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -22363,6 +22363,15 @@ org.apache.drill.exec.proto.UserBitShared.MetricValueOrBuilder getMetricOrBuilde * optional int64 wait_nanos = 9; */ long getWaitNanos(); + + /** + * optional int64 optimal_mem_allocation = 10; + */ + boolean hasOptimalMemAllocation(); + /** + * optional int64 optimal_mem_allocation = 10; + */ + long getOptimalMemAllocation(); } /** * Protobuf type {@code exec.shared.OperatorProfile} @@ -22385,6 +22394,7 @@ private OperatorProfile() { peakLocalMemoryAllocated_ = 0L; metric_ = java.util.Collections.emptyList(); waitNanos_ = 0L; + optimalMemAllocation_ = 0L; } @java.lang.Override @@ -22459,6 +22469,11 @@ private OperatorProfile( waitNanos_ = input.readInt64(); break; } + case 80: { + bitField0_ |= 0x00000040; + optimalMemAllocation_ = input.readInt64(); + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -22658,6 +22673,21 @@ public long getWaitNanos() { return waitNanos_; } + public static final int OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER = 10; + private long optimalMemAllocation_; + /** + * optional int64 optimal_mem_allocation = 10; + */ + public boolean hasOptimalMemAllocation() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public long getOptimalMemAllocation() { + return optimalMemAllocation_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -22696,6 +22726,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeInt64(9, waitNanos_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt64(10, optimalMemAllocation_); + } unknownFields.writeTo(output); } @@ -22737,6 +22770,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt64Size(9, waitNanos_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(10, optimalMemAllocation_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -22787,6 +22824,11 @@ public boolean equals(final java.lang.Object obj) { result = result && (getWaitNanos() == other.getWaitNanos()); } + result = result && (hasOptimalMemAllocation() == other.hasOptimalMemAllocation()); + if (hasOptimalMemAllocation()) { + result = result && (getOptimalMemAllocation() + == other.getOptimalMemAllocation()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -22834,6 +22876,11 @@ public int hashCode() { hash = (53 * hash) + com.google.protobuf.Internal.hashLong( getWaitNanos()); } + if (hasOptimalMemAllocation()) { + hash = (37 * hash) + OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + getOptimalMemAllocation()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -22993,6 +23040,8 @@ public Builder clear() { } waitNanos_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); + optimalMemAllocation_ = 0L; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -23063,6 +23112,10 @@ public org.apache.drill.exec.proto.UserBitShared.OperatorProfile buildPartial() to_bitField0_ |= 0x00000020; } result.waitNanos_ = waitNanos_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000040; + } + result.optimalMemAllocation_ = optimalMemAllocation_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -23182,6 +23235,9 @@ public Builder mergeFrom(org.apache.drill.exec.proto.UserBitShared.OperatorProfi if (other.hasWaitNanos()) { setWaitNanos(other.getWaitNanos()); } + if (other.hasOptimalMemAllocation()) { + setOptimalMemAllocation(other.getOptimalMemAllocation()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -23883,6 +23939,38 @@ public Builder clearWaitNanos() { onChanged(); return this; } + + private long optimalMemAllocation_ ; + /** + * optional int64 optimal_mem_allocation = 10; + */ + public boolean hasOptimalMemAllocation() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public long getOptimalMemAllocation() { + return optimalMemAllocation_; + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public Builder setOptimalMemAllocation(long value) { + bitField0_ |= 0x00000100; + optimalMemAllocation_ = value; + onChanged(); + return this; + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public Builder clearOptimalMemAllocation() { + bitField0_ = (bitField0_ & ~0x00000100); + optimalMemAllocation_ = 0L; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -27858,64 +27946,65 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" + "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" + "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 " + - "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" + + "\001(\003\"\237\002\n\017OperatorProfile\0221\n\rinput_profile" + "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" + "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" + "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" + "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" + "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" + - "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" + - "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" + - "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " + - "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030" + - "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" + - "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" + - "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" + - "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" + - "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" + - "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" + - "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" + - "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" + - "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" + - "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" + - "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + - "\032\n\026CANCELLATION_REQUESTED\020\006*\374\t\n\020CoreOper" + - "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + - "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + - "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + - "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" + - "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" + - "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" + - "\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022" + - "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" + - "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN" + - "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" + - "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" + - "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" + - "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" + - "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" + - "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" + - "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" + - "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" + - "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" + - "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFK" + - "A_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATT" + - "EN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HI" + - "VE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020+" + - "\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017M" + - "APRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013" + - "KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n" + - "\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022" + - "\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\020" + - "5\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCA" + - "N\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209" + - "\022\023\n\017SYSLOG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGR" + - "EGATE\020;\022\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_" + - "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus" + - "\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA" + - "SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" + - "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" + - "toB\rUserBitSharedH\001" + "e\022\022\n\nwait_nanos\030\t \001(\003\022\036\n\026optimal_mem_all" + + "ocation\030\n \001(\003\"B\n\rStreamProfile\022\017\n\007record" + + "s\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(" + + "\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nl" + + "ong_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n" + + "\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar" + + "\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signat" + + "ure\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 " + + "\001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec" + + ".shared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_" + + "CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQue" + + "ryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL" + + "\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020" + + "\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAI" + + "TING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISH" + + "ED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCE" + + "LLATION_REQUESTED\020\006*\374\t\n\020CoreOperatorType" + + "\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020" + + "\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHAS" + + "H_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITI" + + "ON_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIV" + + "ER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PR" + + "OJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\032\n\026RANGE" + + "_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELEC" + + "TION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGRE" + + "GATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020" + + "\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n" + + "\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SC" + + "AN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_" + + "SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB" + + "_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCA" + + "N\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SU" + + "B_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCE" + + "R_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIND" + + "OW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_S" + + "CAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SC" + + "AN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014" + + "LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL" + + "_NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC" + + "_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SU" + + "B_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRI" + + "TER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WR" + + "ITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_" + + "SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PAR" + + "TITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016R" + + "UNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSL" + + "OG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022" + + "\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_MERGE\020=\022" + + "\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus\022\020\n\014SASL" + + "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" + + "OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" + + "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" + + "BitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -28033,7 +28122,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_exec_shared_OperatorProfile_descriptor, - new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", }); + new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", "OptimalMemAllocation", }); internal_static_exec_shared_StreamProfile_descriptor = getDescriptor().getMessageTypes().get(17); internal_static_exec_shared_StreamProfile_fieldAccessorTable = new diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java index d6275fa26c3..224214a69dd 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java @@ -57,6 +57,7 @@ public static OperatorProfile getDefaultInstance() private long peakLocalMemoryAllocated; private List metric; private long waitNanos; + private long optimalMemAllocation; public OperatorProfile() { @@ -169,6 +170,19 @@ public OperatorProfile setWaitNanos(long waitNanos) return this; } + // optimalMemAllocation + + public long getOptimalMemAllocation() + { + return optimalMemAllocation; + } + + public OperatorProfile setOptimalMemAllocation(long optimalMemAllocation) + { + this.optimalMemAllocation = optimalMemAllocation; + return this; + } + // java serialization public void readExternal(ObjectInput in) throws IOException @@ -253,6 +267,9 @@ public void mergeFrom(Input input, OperatorProfile message) throws IOException case 9: message.waitNanos = input.readInt64(); break; + case 10: + message.optimalMemAllocation = input.readInt64(); + break; default: input.handleUnknownField(number, this); } @@ -299,6 +316,9 @@ public void writeTo(Output output, OperatorProfile message) throws IOException if(message.waitNanos != 0) output.writeInt64(9, message.waitNanos, false); + + if(message.optimalMemAllocation != 0) + output.writeInt64(10, message.optimalMemAllocation, false); } public String getFieldName(int number) @@ -313,6 +333,7 @@ public String getFieldName(int number) case 7: return "peakLocalMemoryAllocated"; case 8: return "metric"; case 9: return "waitNanos"; + case 10: return "optimalMemAllocation"; default: return null; } } @@ -334,6 +355,7 @@ public int getFieldNumber(String name) __fieldMap.put("peakLocalMemoryAllocated", 7); __fieldMap.put("metric", 8); __fieldMap.put("waitNanos", 9); + __fieldMap.put("optimalMemAllocation", 10); } } diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 4d057d59e3a..46d28eb4146 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -268,6 +268,7 @@ message OperatorProfile { optional int64 peak_local_memory_allocated = 7; repeated MetricValue metric = 8; optional int64 wait_nanos = 9; + optional int64 optimal_mem_allocation = 10; } message StreamProfile {