Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32248 Add tracing to rowservice #19314

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions fs/dafilesrv/dafilesrv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ version: 1.0
detail: 100
)!!";


int main(int argc, const char* argv[])
{
InitModuleObjects();
Expand All @@ -386,7 +385,21 @@ int main(int argc, const char* argv[])
StringBuffer componentName;

// NB: bare-metal dafilesrv does not have a component specific xml
Owned<IPropertyTree> config = loadConfiguration(defaultYaml, argv, "dafilesrv", "DAFILESRV", nullptr, nullptr);
Owned<IPropertyTree> extractedGlobalConfig = createPTree("dafilesrv");

#ifndef _CONTAINERIZED
Owned<IPropertyTree> env = getHPCCEnvironment();
IPropertyTree* globalTracing = env->getPropTree("Software/tracing");
if (globalTracing != nullptr)
extractedGlobalConfig->addPropTree("tracing", globalTracing);
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
#endif

const char* componentTag = "dafilesrv";
Owned<IPropertyTree> defaultConfig = createPTreeFromYAMLString(defaultYaml, 0, ptr_ignoreWhiteSpace, nullptr);
Owned<IPropertyTree> componentDefault(defaultConfig->getPropTree(componentTag));

// NB: bare-metal dafilesrv does not have a component specific xml, extracting relevant global configuration instead
Owned<IPropertyTree> config = loadConfiguration(componentDefault, extractedGlobalConfig, argv, componentTag, "DAFILESRV", nullptr, nullptr);

Owned<IPropertyTree> keyPairInfo; // NB: not used in containerized mode
// Get SSL Settings
Expand Down Expand Up @@ -513,7 +526,6 @@ int main(int argc, const char* argv[])

IPropertyTree *dafileSrvInstance = nullptr;
#ifndef _CONTAINERIZED
Owned<IPropertyTree> env = getHPCCEnvironment();
Owned<IPropertyTree> _dafileSrvInstance;
if (env)
{
Expand Down
81 changes: 77 additions & 4 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,9 @@ class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
MemoryBuffer expandMb;
Owned<IXmlWriterExt> responseWriter; // for xml or json response

OwnedSpanLifetime requestSpan;
std::string requestTraceParent;

bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz)
{
size32_t sz = inMb.length()-inPos;
Expand Down Expand Up @@ -1092,13 +1095,58 @@ class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
}
}

~CRemoteRequest()
{
if (requestSpan != nullptr)
{
requestSpan->setSpanStatusSuccess(true);
}
}

OutputFormat queryFormat() const { return format; }
unsigned __int64 queryReplyLimit() const { return replyLimit; }
IRemoteActivity *queryActivity() const { return activity; }
ICompressor *queryCompressor() const { return compressor; }

void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb, CClientStats &stats)
{
bool traceParentChanged = false;
const char* fullTraceContext = requestTree->queryProp("_trace/traceparent");
if (fullTraceContext != nullptr)
{
// We only want to compare the trace-id & span-id, so ignore the last sampling group after the '-'
const char* lastHyphen = strchr(fullTraceContext, '-');
if (lastHyphen != nullptr)
{
size_t lastHyphenIdx = lastHyphen - fullTraceContext;
traceParentChanged = strncmp(fullTraceContext, requestTraceParent.c_str(), lastHyphenIdx) != 0;
}
}

if (traceParentChanged)
{
// Check to see if we have an existing span that needs to be marked successful before close
if (requestSpan != nullptr)
ghalliday marked this conversation as resolved.
Show resolved Hide resolved
{
requestSpan->setSpanStatusSuccess(true);
}

Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", fullTraceContext);

const char* requestSpanName = nullptr;
if (activity->queryIsReadActivity())
requestSpanName = "ReadRequest";
else
requestSpanName = "WriteRequest";

requestSpan.setown(queryTraceManager().createServerSpan(requestSpanName, traceHeaders));
requestTraceParent = fullTraceContext;
}

ActiveSpanScope activeSpan(requestSpan.query());

if (requestTree->hasProp("replyLimit"))
replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;

Expand Down Expand Up @@ -3027,12 +3075,12 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
else
{
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
break; // wait for rest via subsequent notifySelected's
}
}
else if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
// to be here, implies handled full message, loop around to see if more on the wire.
// will break out if nothing/partial.
}
Expand Down Expand Up @@ -4818,7 +4866,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
* }
* }
* }
*
*
* fetch continuation:
* {
* "format" : "binary",
Expand Down Expand Up @@ -4960,8 +5008,23 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
}
case StreamCmd::CLOSE:
{
OwnedActiveSpanScope closeSpan;
const char* traceParent = requestTree->queryProp("_trace/traceparent");
if (traceParent != nullptr)
{
Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", traceParent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also have the sampling suffix removed?


closeSpan.setown(queryTraceManager().createServerSpan("CloseRequest", traceHeaders));
}

if (0 == cursorHandle)
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command");
{
IDAFS_Exception* exception = createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command");
closeSpan->recordException(exception);
throw exception;
}

IFileIO *dummy;
checkFileIOHandle(cursorHandle, dummy, true);
break;
Expand Down Expand Up @@ -4990,6 +5053,16 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
{
case StreamCmd::VERSION:
{
OwnedActiveSpanScope versionSpan;
const char* traceParent = requestTree->queryProp("_trace/traceparent");
if (traceParent != nullptr)
{
Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", traceParent);

versionSpan.setown(queryTraceManager().createServerSpan("VersionRequest", traceHeaders));
}

if (outFmt_Binary == outputFormat)
reply.append(DAFILESRV_VERSIONSTRING);
else
Expand Down
Loading