Skip to content

Commit

Permalink
Merge pull request #19272 from shamser/issue32946
Browse files Browse the repository at this point in the history
HPCC-32946 Capture and report lookahead timings for hash distributor

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jan 20, 2025
2 parents c9fcc06 + c9ef55c commit b49e7e0
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 19 deletions.
4 changes: 0 additions & 4 deletions common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,6 @@ void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
if (blockedCycles)
builder.addStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles));
}
if (lookAheadCycles)
builder.addStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
}

void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged) const
Expand All @@ -1816,8 +1814,6 @@ void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged
if (blockedCycles)
merged.mergeStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles));
}
if (lookAheadCycles)
merged.mergeStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
}

void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,14 @@ class CDiskGroupAggregateSlave
merging = false;
appendOutputLinked(this);
}

// CSlaveActivity overloaded methods
// Returns the parent's look-ahead cycles plus, when 'distributor' is set,
// the look-ahead cycles reported by that distributor.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    const cycle_t baseCycles = PARENT::queryLookAheadCycles();
    const cycle_t distributorCycles = distributor ? distributor->queryLookAheadCycles() : 0;
    return baseCycles + distributorCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
18 changes: 17 additions & 1 deletion thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
if (distributor)
distributor->abort();
}

// Returns the distributor's look-ahead cycles, or 0 when no distributor is set.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    return distributor ? distributor->queryLookAheadCycles() : 0;
}
// IStopInput
virtual void stopInput()
{
Expand Down Expand Up @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
{
}

// Returns the parent's look-ahead cycles plus those of the current fetch
// stream, if there is one. fetchStreamCS is held for the whole call so that
// fetchStream is read under the same lock used where it is replaced.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    CriticalBlock b(fetchStreamCS);
    const cycle_t baseCycles = PARENT::queryLookAheadCycles();
    const cycle_t streamCycles = fetchStream ? fetchStream->queryLookAheadCycles() : 0;
    return baseCycles + streamCycles;
}

// IThorDataLink impl.
virtual void start() override
{
Expand Down Expand Up @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
OwnedRoxieString fileName = fetchBaseHelper->getFileName();
{
CriticalBlock b(fetchStreamCS);
if (fetchStream)
slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles();
fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp));
}
fetchStreamOut = fetchStream->queryOutput();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/fetch/thfetchslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface
virtual void abort() = 0;
virtual void getStats(CRuntimeStatisticCollection & stats) const = 0;
virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
};

IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
Expand Down
52 changes: 43 additions & 9 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
RelaxedAtomic<stat_type> numLocalRows {0};
RelaxedAtomic<stat_type> numRemoteRows {0};
RelaxedAtomic<size_t> sizeRemoteWrite {0};
RelaxedAtomic<cycle_t> lookAheadCycles {0};

void init()
{
Expand Down Expand Up @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
}
if (aborted)
break;
const void *row = input->ungroupedNextRow();
const void *row;
if (owner.activity->queryTimeActivities())
{
CCycleTimer rowTimer;
row = input->ungroupedNextRow();
lookAheadCycles.fastAdd(rowTimer.elapsedCycles());
}
else
{
row = input->ungroupedNextRow();
}
if (!row)
break;

CTarget *target = nullptr;
if (owner.isAll)
target = targets.item(0);
Expand Down Expand Up @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
stats.setStatistic(StNumRemoteRows, numRemoteRows.load());
stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load());
}
// Returns the current value of the lookAheadCycles counter.
virtual unsigned __int64 queryLookAheadCycles() const
{
    const cycle_t cyclesSoFar = lookAheadCycles.load();
    return cyclesSoFar;
}
// IThreadFactory impl.
virtual IPooledThread *createNew()
{
Expand Down Expand Up @@ -1257,6 +1271,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = NULL;
iCompare = NULL;
}
// Merges the sender's statistics into 'stats', then (holding critPiperd)
// merges the remapped statistics from piperd when piperd is set.
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
    sender.mergeStats(stats);

    CriticalBlock block(critPiperd);
    if (!piperd)
        return;
    mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// Forwards the query to the sender and returns its look-ahead cycle count.
virtual unsigned __int64 queryLookAheadCycles() const
{
    const unsigned __int64 senderCycles = sender.queryLookAheadCycles();
    return senderCycles;
}
virtual void abort()
{
if (!aborted)
Expand Down Expand Up @@ -1451,13 +1476,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void stopRecv() = 0;
virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;

// Merges the sender's statistics into 'stats', then (holding critPiperd)
// merges the remapped statistics from piperd when piperd is set.
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
    sender.mergeStats(stats);

    CriticalBlock block(critPiperd);
    if (!piperd)
        return;
    mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
{
Expand Down Expand Up @@ -4103,6 +4121,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
}
// Returns the parent's look-ahead cycles plus those reported by the LHS and
// RHS distributors, for whichever of them are set.
// Marked 'override' so the compiler checks the signature against the base
// class virtual that PARENT::queryLookAheadCycles() refers to.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
    if (lhsDistributor)
        lookAheadCycles += lhsDistributor->queryLookAheadCycles();
    if (rhsDistributor)
        lookAheadCycles += rhsDistributor->queryLookAheadCycles();
    return lookAheadCycles;
}
};
#ifdef _MSC_VER
#pragma warning(pop)
Expand Down Expand Up @@ -4584,6 +4611,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
info.canStall = true;
// maybe more?
}
// Returns the parent's look-ahead cycles plus, when 'distributor' is set,
// the look-ahead cycles reported by that distributor.
// Marked 'override' to match the other overriding methods of this class and
// to have the compiler verify the signature against the base class.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
    if (distributor)
        lookAheadCycles += distributor->queryLookAheadCycles();
    return lookAheadCycles;
}
// IHThorRowAggregator impl
virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/hashdistrib/thhashdistribslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface
virtual void join()=0;
virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
virtual void abort()=0;
};

Expand Down
7 changes: 7 additions & 0 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
merging = false;
appendOutputLinked(this);
}
// Returns the parent's look-ahead cycles plus, when 'distributor' is set,
// the look-ahead cycles reported by that distributor.
// Marked 'override' so the compiler checks the signature against the base
// class virtual that PARENT::queryLookAheadCycles() refers to.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
    if (distributor)
        lookAheadCycles += distributor->queryLookAheadCycles();
    return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
9 changes: 9 additions & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
PARENT::start();
dbgassertex(isSmart() || (leftITDL->isGrouped() == grouped)); // std. lookup join expects these to match
}
// Returns the parent's look-ahead cycles plus those reported by the RHS and
// LHS distributors, for whichever of them are set.
// Marked 'override' to match the other overriding methods of this class and
// to have the compiler verify the signature against the base class.
virtual unsigned __int64 queryLookAheadCycles() const override
{
    cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
    if (rhsDistributor)
        lookAheadCycles += rhsDistributor->queryLookAheadCycles();
    if (lhsDistributor)
        lookAheadCycles += lhsDistributor->queryLookAheadCycles();
    return lookAheadCycles;
}
virtual void reset() override
{
PARENT::reset();
Expand Down
20 changes: 16 additions & 4 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer
return mb.append(queryInitializationData(slave));
}

unsigned __int64 CSlaveActivity::queryLocalCycles() const
unsigned __int64 CSlaveActivity::queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const
{
unsigned __int64 inputCycles = 0;
if (1 == inputs.ordinality())
Expand Down Expand Up @@ -587,11 +587,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
unsigned __int64 processCycles = totalCycles + lookAheadCycles;
if (processCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (processCycles < blockedCycles)
{
ActPrintLog("CSlaveActivity::queryLocalCycles - process %" I64F "uns < blocked %" I64F "uns", cycle_to_nanosec(processCycles), cycle_to_nanosec(blockedCycles));
Expand All @@ -600,6 +599,11 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
return processCycles-blockedCycles;
}

// Convenience overload: gathers this activity's current total, blocked and
// look-ahead cycle counts and passes them to the three-argument overload.
unsigned __int64 CSlaveActivity::queryLocalCycles() const
{
    const unsigned __int64 totalCycles = queryTotalCycles();
    const unsigned __int64 blockedCycles = queryBlockedCycles();
    const unsigned __int64 lookAheadCycles = queryLookAheadCycles();
    return queryLocalCycles(totalCycles, blockedCycles, lookAheadCycles);
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
{
CriticalBlock b(crit); // JCSMORE not sure what this is protecting..
Expand All @@ -619,7 +623,15 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
queryCodeContext()->gatherStats(serializedStats);

// JCS->GH - should these be serialized as cycles, and a different mapping used on master?
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
//
// Note: Look ahead cycles are not kept up to date in slaveTimerStats, because multiple objects and threads update
// look ahead cycles. At the moment, each thread or object that generates look ahead cycles tracks its own look ahead
// cycles, and the up-to-date total is only available via a call to queryLookAheadCycles(). The code would
// need to be refactored to change this behaviour.
unsigned __int64 lookAheadCycles = queryLookAheadCycles();
unsigned __int64 localCycles = queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), lookAheadCycles);
serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles));
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(localCycles));
slaveTimerStats.addStatistics(serializedStats);
serializedStats.serialize(mb);
ForEachItemIn(i, outputs)
Expand Down
1 change: 1 addition & 0 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;
bool suppressLookAhead() const;
unsigned __int64 queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const;

// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
Expand Down

0 comments on commit b49e7e0

Please sign in to comment.