Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32946 Capture and report lookahead timings for hash distributor #19272

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,6 @@ void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
if (blockedCycles)
builder.addStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles));
}
if (lookAheadCycles)
builder.addStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
}

void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged) const
Expand All @@ -1816,8 +1814,6 @@ void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged
if (blockedCycles)
merged.mergeStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles));
}
if (lookAheadCycles)
merged.mergeStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
}

void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,14 @@ class CDiskGroupAggregateSlave
merging = false;
appendOutputLinked(this);
}

// CSlaveActivity overloaded methods
virtual unsigned __int64 queryLookAheadCycles() const override
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
18 changes: 17 additions & 1 deletion thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
if (distributor)
distributor->abort();
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
if (distributor)
return distributor->queryLookAheadCycles();
return 0;
}
// IStopInput
virtual void stopInput()
{
Expand Down Expand Up @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
{
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
CriticalBlock b(fetchStreamCS);
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (fetchStream)
lookAheadCycles += fetchStream->queryLookAheadCycles();
return lookAheadCycles;
}

// IThorDataLink impl.
virtual void start() override
{
Expand Down Expand Up @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
OwnedRoxieString fileName = fetchBaseHelper->getFileName();
{
CriticalBlock b(fetchStreamCS);
if (fetchStream)
slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles();
fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp));
}
fetchStreamOut = fetchStream->queryOutput();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/fetch/thfetchslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface
virtual void abort() = 0;
virtual void getStats(CRuntimeStatisticCollection & stats) const = 0;
virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
};

IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
Expand Down
52 changes: 43 additions & 9 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
RelaxedAtomic<stat_type> numLocalRows {0};
RelaxedAtomic<stat_type> numRemoteRows {0};
RelaxedAtomic<size_t> sizeRemoteWrite {0};
RelaxedAtomic<cycle_t> lookAheadCycles {0};

void init()
{
Expand Down Expand Up @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
}
if (aborted)
break;
const void *row = input->ungroupedNextRow();
const void *row;
if (owner.activity->queryTimeActivities())
{
CCycleTimer rowTimer;
row = input->ungroupedNextRow();
lookAheadCycles.fastAdd(rowTimer.elapsedCycles());
}
else
{
row = input->ungroupedNextRow();
}
if (!row)
break;

CTarget *target = nullptr;
if (owner.isAll)
target = targets.item(0);
Expand Down Expand Up @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
stats.setStatistic(StNumRemoteRows, numRemoteRows.load());
stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load());
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return lookAheadCycles.load();
}
// IThreadFactory impl.
virtual IPooledThread *createNew()
{
Expand Down Expand Up @@ -1257,6 +1271,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = NULL;
iCompare = NULL;
}
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return sender.queryLookAheadCycles();
}
virtual void abort()
{
if (!aborted)
Expand Down Expand Up @@ -1451,13 +1476,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void stopRecv() = 0;
virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;

virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
{
Expand Down Expand Up @@ -4103,6 +4121,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
};
#ifdef _MSC_VER
#pragma warning(pop)
Expand Down Expand Up @@ -4584,6 +4611,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
info.canStall = true;
// maybe more?
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorRowAggregator impl
virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/hashdistrib/thhashdistribslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface
virtual void join()=0;
virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
virtual void abort()=0;
};

Expand Down
7 changes: 7 additions & 0 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
merging = false;
appendOutputLinked(this);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
9 changes: 9 additions & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
PARENT::start();
dbgassertex(isSmart() || (leftITDL->isGrouped() == grouped)); // std. lookup join expects these to match
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
virtual void reset() override
{
PARENT::reset();
Expand Down
20 changes: 16 additions & 4 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer
return mb.append(queryInitializationData(slave));
}

unsigned __int64 CSlaveActivity::queryLocalCycles() const
unsigned __int64 CSlaveActivity::queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const
{
unsigned __int64 inputCycles = 0;
if (1 == inputs.ordinality())
Expand Down Expand Up @@ -587,11 +587,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
unsigned __int64 processCycles = totalCycles + lookAheadCycles;
if (processCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (processCycles < blockedCycles)
{
ActPrintLog("CSlaveActivity::queryLocalCycles - process %" I64F "uns < blocked %" I64F "uns", cycle_to_nanosec(processCycles), cycle_to_nanosec(blockedCycles));
Expand All @@ -600,6 +599,11 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
return processCycles-blockedCycles;
}

unsigned __int64 CSlaveActivity::queryLocalCycles() const
{
return queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), queryLookAheadCycles());
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
{
CriticalBlock b(crit); // JCSMORE not sure what this is protecting..
Expand All @@ -619,7 +623,15 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
queryCodeContext()->gatherStats(serializedStats);

// JCS->GH - should these be serialized as cycles, and a different mapping used on master?
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
//
// Note: Look ahead cycles are not being kept up to date in slaverStats as multiple objects and threads are updating
// look ahead cycles. At the moment, each thread and objects that generate look ahead cycles, track its own look ahead
// cycles and the up to date lookahead cycles is only available with a call to queryLookAheadCycles(). The code would
// need to be refactored to change this behaviour.
unsigned __int64 lookAheadCycles = queryLookAheadCycles();
unsigned __int64 localCycles = queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), lookAheadCycles);
serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(localCycles));
slaveTimerStats.addStatistics(serializedStats);
serializedStats.serialize(mb);
ForEachItemIn(i, outputs)
Expand Down
1 change: 1 addition & 0 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;
bool suppressLookAhead() const;
unsigned __int64 queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const;

// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
Expand Down
Loading