diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 03268d603c0..4b9529c29d3 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -313,6 +313,8 @@ extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern unsigned packetAcknowledgeTimeout; extern cycle_t dynPriorityAdjustCycles; +extern bool traceThreadStartDelay; +extern int adjustBGThreadNiceValue; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; extern bool lockSuperFiles; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 1f8476cec72..f975c5409d8 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -76,6 +76,8 @@ bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; cycle_t dynPriorityAdjustCycles = 0; // default off (0) +bool traceThreadStartDelay = true; +int adjustBGThreadNiceValue = 5; unsigned headRegionSize; unsigned ccdMulticastPort; bool enableHeartBeat = true; @@ -1011,6 +1013,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0); if (dynAdjustMsec) dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL); + traceThreadStartDelay = topology->getPropBool("@traceThreadStartDelay", traceThreadStartDelay); + adjustBGThreadNiceValue = topology->getPropInt("@adjustBGThreadNiceValue", adjustBGThreadNiceValue); + if (adjustBGThreadNiceValue < 0) + adjustBGThreadNiceValue = 0; + if (adjustBGThreadNiceValue > 19) + adjustBGThreadNiceValue = 19; ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0)); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5d0d52c0d6c..5897b28f939 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1175,6 +1175,13 @@ class RoxieQueue : public CInterface, implements IThreadFactory if (qname && *qname) tname.appendf(" (%s)", qname); workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers)); + if (traceThreadStartDelay) + workers->setStartDelayTracing(60); + if (qname && *qname) + { + if (streq(qname, "BG")) + workers->setNiceValue(adjustBGThreadNiceValue); + } started = 0; idle = 0; if (IBYTIbufferSize) @@ -1914,7 +1921,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers, "BG"), numWorkers(_numWorkers) { } @@ -1936,7 +1943,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); - bgQueue.start(); // consider nice(+3) BG threads + bgQueue.start(); // NB BG thread priority can be adjusted } virtual void stop() diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 503e57c2d99..06e92240020 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4596,6 +4596,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE; unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles(); UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec); + p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; p->queryHeader().activityId |= ROXIE_BG_PRIORITY; // TODO: what to do about still running activities' continuation/ack priorities ? } diff --git a/system/jlib/jthread.cpp b/system/jlib/jthread.cpp index 8545d2bd641..769f04e9595 100644 --- a/system/jlib/jthread.cpp +++ b/system/jlib/jthread.cpp @@ -995,6 +995,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter unsigned stacksize; unsigned timeoutOnRelease; unsigned traceStartDelayPeriod = 0; + int niceValue = 0; unsigned startsInPeriod = 0; cycle_t startDelayInPeriod = 0; CCycleTimer overAllTimer; @@ -1114,6 +1115,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew()); if (stacksize) ret.setStackSize(stacksize); + if (niceValue) + ret.setNice(niceValue); ret.start(false); threadwrappers.append(ret); return ret; @@ -1281,6 +1284,10 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter { traceStartDelayPeriod = secs; } + void setNiceValue(int value) + { + niceValue = value; + } bool waitAvailable(unsigned timeout) { if (!defaultmax) diff --git a/system/jlib/jthread.hpp b/system/jlib/jthread.hpp index 5d312aa9d2f..b48dff635a1 100644 --- a/system/jlib/jthread.hpp +++ b/system/jlib/jthread.hpp @@ -289,6 +289,7 @@ interface IThreadPool : extends IInterface virtual unsigned runningCount()=0; // number of currently running threads virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period + virtual void setNiceValue(int value) = 0; // set priority for thread virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available };