diff --git a/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp b/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp index e543302fa0e..9151f180b84 100644 --- a/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp +++ b/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp @@ -33,230 +33,57 @@ #include "processors/LogAttribute.h" #include "unit/SingleProcessorTestController.h" #include "integration/ConnectionCountingServer.h" +#include "unit/TestUtils.h" namespace org::apache::nifi::minifi::test { class TestHTTPServer { public: - TestHTTPServer(); + TestHTTPServer(TestController& test_controller) { + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + + test_plan_ = test_controller.createPlan(); + + listen_http_ = std::dynamic_pointer_cast(test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME)); + log_attribute_ = std::dynamic_pointer_cast(test_plan_->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true)); + REQUIRE(listen_http_); + REQUIRE(log_attribute_); + test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "testytesttest"); + test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::Port, "8681"); + test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex, ".*"); + test_plan_->runProcessor(0); + test_plan_->runProcessor(1); + thread_ = std::thread{[this] { + while (running_) { + if (listen_http_->isWorkAvailable()) { + test_plan_->runProcessor(0); + test_plan_->runProcessor(1); + } + } + }}; + } static constexpr const char* PROCESSOR_NAME = "my_http_server"; static constexpr const char* URL = "http://localhost:8681/testytesttest"; - void trigger() { - LogTestController::getInstance().setDebug(); - LogTestController::getInstance().setDebug(); - test_plan_->reset(); - test_controller_.runSession(test_plan_); + ~TestHTTPServer() { + running_ = false; + thread_.join(); } private: - TestController test_controller_; std::shared_ptr listen_http_; std::shared_ptr log_attribute_; - std::shared_ptr test_plan_ = test_controller_.createPlan(); + std::shared_ptr test_plan_; + std::thread thread_; + std::atomic_bool running_{true}; }; -TestHTTPServer::TestHTTPServer() { - LogTestController::getInstance().setDebug(); - LogTestController::getInstance().setDebug(); - - listen_http_ = std::dynamic_pointer_cast(test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME)); - log_attribute_ = std::dynamic_pointer_cast(test_plan_->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true)); - REQUIRE(listen_http_); - REQUIRE(log_attribute_); - test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "testytesttest"); - test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::Port, "8681"); - test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex, ".*"); - test_controller_.runSession(test_plan_); -} - -TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { - TestController testController; - TestHTTPServer http_server; - - LogTestController::getInstance().setDebug(); - - std::shared_ptr content_repo = std::make_shared(); - std::shared_ptr repo = std::make_shared(); - - std::shared_ptr invokehttp = std::make_shared("invokehttp"); - invokehttp->initialize(); - - minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID(); - REQUIRE(invokehttp_uuid); - - auto node = std::make_shared(invokehttp.get()); - auto context = std::make_shared(node, nullptr, repo, repo, content_repo); - - context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, TestHTTPServer::URL); - - auto session = std::make_shared(context); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - auto factory2 = std::make_shared(context); - invokehttp->onSchedule(*context, *factory2); - invokehttp->onTrigger(*context, *session); - - auto reporter = session->getProvenanceReporter(); - auto records = reporter->getEvents(); - auto record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.empty()); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(*context, *session); - - session->commit(); - records = reporter->getEvents(); - // FIXME(fgerlits): this test is very weak, as `records` is empty - for (const auto& provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME); - } - - REQUIRE(LogTestController::getInstance().contains("Exiting because method is POST")); -} - -TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { - TestController testController; - - LogTestController::getInstance().setDebug(); - - auto repo = std::make_shared(); - - std::shared_ptr listenhttp = std::make_shared("listenhttp"); - listenhttp->initialize(); - - std::shared_ptr invokehttp = std::make_shared("invokehttp"); - invokehttp->initialize(); - - minifi::utils::Identifier processoruuid = listenhttp->getUUID(); - REQUIRE(processoruuid); - - minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID(); - REQUIRE(invokehttp_uuid); - - auto configuration = std::make_shared(); - std::shared_ptr content_repo = std::make_shared(); - content_repo->initialize(configuration); - - auto connection = std::make_shared(repo, content_repo, "getfileCreate2Connection"); - connection->addRelationship(core::Relationship("success", "description")); - - auto connection2 = std::make_shared(repo, content_repo, "listenhttp"); - - connection2->addRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(listenhttp.get()); - connection->setSourceUUID(invokehttp_uuid); - connection->setDestinationUUID(processoruuid); - connection2->setSourceUUID(processoruuid); - - listenhttp->addConnection(connection.get()); - invokehttp->addConnection(connection.get()); - invokehttp->addConnection(connection2.get()); - - auto node = std::make_shared(listenhttp.get()); - auto node2 = std::make_shared(invokehttp.get()); - auto context = std::make_shared(node, nullptr, repo, repo, content_repo); - auto context2 = std::make_shared(node2, nullptr, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest"); - auto session = std::make_shared(context); - auto session2 = std::make_shared(context2); - - REQUIRE(listenhttp->getName() == "listenhttp"); - - auto factory = std::make_shared(context); - - std::shared_ptr record; - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - auto factory2 = std::make_shared(context2); - invokehttp->onSchedule(*context2, *factory2); - invokehttp->onTrigger(*context2, *session2); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(*context, *factory); - listenhttp->onTrigger(*context, *session); - - auto reporter = session->getProvenanceReporter(); - auto records = reporter->getEvents(); - record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.empty()); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(*context, *session); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(*context2, *session2); - - session2->commit(); - records = reporter->getEvents(); - // FIXME(fgerlits): this test is very weak, as `records` is empty - for (const auto& provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); - } - - REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST")); -} - -TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { - TestController testController; - TestHTTPServer http_server; - - LogTestController::getInstance().setDebug(); - - std::shared_ptr plan = testController.createPlan(); - std::shared_ptr invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp"); - - plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL, TestHTTPServer::URL); - testController.runSession(plan); - - auto records = plan->getProvenanceRecords(); - std::shared_ptr record = plan->getCurrentFlowFile(); - REQUIRE(record == nullptr); - REQUIRE(records.empty()); - - plan->reset(); - testController.runSession(plan); - - records = plan->getProvenanceRecords(); - // FIXME(fgerlits): this test is very weak, as `records` is empty - for (const auto& provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME); - } - - REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST")); -} - TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") { using minifi::processors::InvokeHTTP; TestController testController; - TestHTTPServer http_server; + TestHTTPServer http_server(testController); LogTestController::getInstance().setInfo(); @@ -285,11 +112,12 @@ TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") { TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in HTTP headers", "[httptest1]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; - LogTestController::getInstance().setDebug(); auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + + LogTestController::getInstance().setDebug(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL); @@ -305,11 +133,12 @@ TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in H TEST_CASE("InvokeHTTP succeeds when the flow file contains an attribute that would be invalid as an HTTP header, and the policy is FAIL, but the attribute is not matched", "[httptest1][invokehttp][httpheader][attribute]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; - LogTestController::getInstance().setDebug(); auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + + LogTestController::getInstance().setDebug(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL); @@ -320,17 +149,17 @@ TEST_CASE("InvokeHTTP succeeds when the flow file contains an attribute that wou REQUIRE(result.at(InvokeHTTP::RelFailure).empty()); const auto& success_contents = result.at(InvokeHTTP::Success); REQUIRE(success_contents.size() == 1); - http_server.trigger(); + REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:valid-header value:value2")); REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid")); - REQUIRE(LogTestController::getInstance().contains("key:valid-header value:value2")); } TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + LogTestController::getInstance().setTrace(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); @@ -341,17 +170,17 @@ TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") auto file_contents = result.at(InvokeHTTP::Success); REQUIRE(file_contents.size() == 1); REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data"); - http_server.trigger(); - REQUIRE(LogTestController::getInstance().contains("key:invalid-header value:value")); - REQUIRE(LogTestController::getInstance().contains("key:X-MiNiFi-Empty-Attribute-Name value:value2")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:invalid-header value:value")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:X-MiNiFi-Empty-Attribute-Name value:value2")); } TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + LogTestController::getInstance().setTrace(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); @@ -363,17 +192,17 @@ TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]" auto file_contents = result.at(InvokeHTTP::Success); REQUIRE(file_contents.size() == 1); REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data"); - http_server.trigger(); - REQUIRE(LogTestController::getInstance().contains("key:legit-header value:value1")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:legit-header value:value1")); REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s)); } TEST_CASE("InvokeHTTP empty Attributes to Send means no attributes are sent", "[httptest1]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + LogTestController::getInstance().setTrace(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); @@ -385,17 +214,17 @@ TEST_CASE("InvokeHTTP empty Attributes to Send means no attributes are sent", "[ auto file_contents = result.at(InvokeHTTP::Success); REQUIRE(file_contents.size() == 1); REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data"); - http_server.trigger(); REQUIRE_FALSE(LogTestController::getInstance().contains("key:legit-header value:value1")); REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s)); } TEST_CASE("InvokeHTTP DateHeader", "[InvokeHTTP]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; auto invoke_http = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invoke_http}; + TestHTTPServer http_server(test_controller); + LogTestController::getInstance().setTrace(); invoke_http->setProperty(InvokeHTTP::Method, "GET"); @@ -416,16 +245,16 @@ TEST_CASE("InvokeHTTP DateHeader", "[InvokeHTTP]") { auto file_contents = result.at(InvokeHTTP::Success); REQUIRE(file_contents.size() == 1); REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data"); - http_server.trigger(); - REQUIRE(LogTestController::getInstance().contains("key:Date", 0ms) == date_header); + REQUIRE(utils::verifyEventHappenedInPollTime(1s, [&] {return LogTestController::getInstance().contains("key:Date", 0ms) == date_header;})); } TEST_CASE("InvokeHTTP Attributes to Send uses full string matching, not substring", "[httptest1]") { using minifi::processors::InvokeHTTP; - TestHTTPServer http_server; auto invokehttp = std::make_shared("InvokeHTTP"); test::SingleProcessorTestController test_controller{invokehttp}; + TestHTTPServer http_server(test_controller); + LogTestController::getInstance().setTrace(); invokehttp->setProperty(InvokeHTTP::Method, "GET"); @@ -437,9 +266,8 @@ TEST_CASE("InvokeHTTP Attributes to Send uses full string matching, not substrin auto file_contents = result.at(InvokeHTTP::Success); REQUIRE(file_contents.size() == 1); REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data"); - http_server.trigger(); + REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:header value:value2")); REQUIRE_FALSE(LogTestController::getInstance().contains("key:header1 value:value1")); - REQUIRE(LogTestController::getInstance().contains("key:header value:value2")); REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s)); }