Skip to content

Commit

Permalink
HPCC-32958 Roxie dynamic priority
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Nov 23, 2024
1 parent 3d4c3b3 commit d3f5445
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 22 deletions.
5 changes: 3 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ extern StringArray allQuerySetNames;
extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern unsigned dynPriorityAdjustTime;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down Expand Up @@ -333,8 +334,8 @@ extern unsigned memoryStatsInterval;
extern unsigned pingInterval;
extern unsigned socketCheckInterval;
extern memsize_t defaultMemoryLimit;
extern unsigned defaultTimeLimit[4];
extern unsigned defaultWarnTimeLimit[4];
extern unsigned defaultTimeLimit[3];
extern unsigned defaultWarnTimeLimit[3];
extern unsigned defaultThorConnectTimeout;
extern bool pretendAllOpt;
extern ClientCertificate clientCert;
Expand Down
5 changes: 5 additions & 0 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
return options;
}

virtual unsigned queryElapsedMs() const
{
return elapsedTimer.elapsedMs();
}

const char *queryAuthToken()
{
return authToken.str();
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdcontext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger
virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0;
virtual roxiemem::IRowManager &queryRowManager() = 0;
virtual const QueryOptions &queryOptions() const = 0;
virtual unsigned queryElapsedMs() const = 0;
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0;
virtual const char *queryAuthToken() = 0;
virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0;
Expand Down
8 changes: 4 additions & 4 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned dynPriorityAdjustTime = 0; // default off (0)
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
Expand Down Expand Up @@ -164,8 +165,8 @@ int backgroundCopyPrio = 0;

unsigned memoryStatsInterval = 0;
memsize_t defaultMemoryLimit;
unsigned defaultTimeLimit[4] = {0, 0, 0, 0};
unsigned defaultWarnTimeLimit[4] = {0, 5000, 5000, 10000};
unsigned defaultTimeLimit[3] = {0, 0, 0};
unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
unsigned defaultThorConnectTimeout;

unsigned defaultParallelJoinPreload = 0;
Expand Down Expand Up @@ -1007,6 +1008,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
dynPriorityAdjustTime = topology->getPropInt("@dynPriorityAdjustTime", 0);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down Expand Up @@ -1169,11 +1171,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
defaultTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeLimit", 0);
defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
defaultWarnTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeWarning", 0);
defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024);

Expand Down
33 changes: 31 additions & 2 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
QueryOptions::QueryOptions()
{
priority = 0;
dynPriority = 0;
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];

Expand Down Expand Up @@ -358,6 +359,7 @@ QueryOptions::QueryOptions()
QueryOptions::QueryOptions(const QueryOptions &other)
{
priority = other.priority;
dynPriority = other.dynPriority;
timeLimit = other.timeLimit;
warnTimeLimit = other.warnTimeLimit;

Expand Down Expand Up @@ -400,10 +402,12 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
dynPriority = priority;
if ((int)priority < 0)
{
timeLimit = defaultTimeLimit[3];
warnTimeLimit = defaultWarnTimeLimit[3];
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
Expand Down Expand Up @@ -494,6 +498,31 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
if (ctx)
{
// Note: priority cannot be set at context level
// b/c this is after activities have been created, but we could
// dynamically adj priority in the header activityId before sending
int tmpPriority;
updateFromContext(tmpPriority, ctx, "@priority", "_Priority");
if (tmpPriority > 1)
tmpPriority = 1;
if (tmpPriority < -1)
tmpPriority = -1;

if (tmpPriority < (int)priority)
{
dynPriority = tmpPriority;
if (dynPriority < 0)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[dynPriority];
warnTimeLimit = defaultWarnTimeLimit[dynPriority];
}
}

updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdquery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class QueryOptions
void setFromContext(const IPropertyTree *ctx);
void setFromAgentLoggingFlags(unsigned loggingFlags);


unsigned priority;
mutable int dynPriority;
unsigned timeLimit;
unsigned warnTimeLimit;
unsigned traceLimit;
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1936,7 +1936,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
}

virtual void stop()
Expand Down
48 changes: 46 additions & 2 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface
{
return ctx->queryOptions();
}
virtual unsigned queryElapsedMs() const override
{
return ctx->queryElapsedMs();
}
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) override
{
ctx->addAgentsReplyLen(len, duplicates, resends);
Expand Down Expand Up @@ -4557,9 +4561,38 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// But this could still cause too many reply packets on the fastlane
// (higher priority output Q), which may cause the activities on the
// low priority output Q to not get service on time.

int origPriority = (int)ctx->queryOptions().priority;
int dynPriority = ctx->queryOptions().dynPriority;
if (dynPriority < origPriority)
{
unsigned newPri = ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY;
switch (dynPriority)
{
case 1:
newPri = ROXIE_HIGH_PRIORITY;
break;
case 0:
newPri = ROXIE_LOW_PRIORITY;
break;
}
p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK;
p->queryHeader().activityId |= newPri;
}

// TODO: perhaps check elapsed every Nth msg ?
if ( (dynPriorityAdjustTime > 0) && (origPriority == 0) && (dynPriority == 0) &&
(ctx->queryElapsedMs() > dynPriorityAdjustTime) )
{
ctx->queryOptions().dynPriority = -1;
UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynPriorityAdjustTime);
p->queryHeader().activityId |= (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY);
// TODO: what to do about still running activities' continuation/ack priorities ?
}

unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
(pmask && (pmask != ROXIE_PRIORITY_MASK)) &&
(pmask && (pmask != (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY))) &&
(p->queryHeader().overflowSequence == 0) &&
(p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0)
p->queryHeader().retries |= ROXIE_FASTLANE;
Expand Down Expand Up @@ -5014,7 +5047,18 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
mu.clear();
SimpleActivityTimer t(unpackerWaitCycles, timeActivities);
unsigned ctxTraceLevel = activity.queryLogCtx().queryTraceLevel();
unsigned timeout = remoteId.isSLAPriority() ? slaTimeout : (remoteId.isHighPriority() ? highTimeout : lowTimeout);

unsigned timeout = lowTimeout;
switch (activity.queryContext()->queryOptions().dynPriority)
{
case 2:
timeout = slaTimeout;
break;
case 1:
timeout = highTimeout;
break;
}

unsigned checkInterval = activity.queryContext()->checkInterval();
if (checkInterval > timeout)
checkInterval = timeout;
Expand Down
10 changes: 0 additions & 10 deletions roxie/ccd/ccdstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2439,16 +2439,6 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeLimit")==0)
{
defaultTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeLimit", defaultTimeLimit[3]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeWarning")==0)
{
defaultWarnTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeWarning", defaultWarnTimeLimit[3]);
}
else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
{
UNIMPLEMENTED;
Expand Down

0 comments on commit d3f5445

Please sign in to comment.