diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py b/docker/test/integration/cluster/DockerTestDirectoryBindings.py index 87cf2b4ec6..25add90ae7 100644 --- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py +++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py @@ -18,7 +18,7 @@ import hashlib import subprocess import OpenSSL.crypto -from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert +from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert, make_client_cert class DockerTestDirectoryBindings: @@ -214,3 +214,11 @@ def create_cert_files(self): os.path.join(base, "root_ca.crt"), ] subprocess.run(cmd, check=True) + + clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=self.root_ca_cert, ca_key=self.root_ca_key) + self.put_test_resource('clientuser.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=clientuser_cert)) + self.put_test_resource('clientuser.key', + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=clientuser_key)) diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index 9baf01f088..92e0be61b2 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -12,13 +12,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os +import OpenSSL.crypto +import tempfile +import docker +import requests +import logging +from requests.auth import HTTPBasicAuth from .Container import Container from utils import retry_check +from ssl_utils.SSL_cert_utils import make_server_cert class CouchbaseServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command) + def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False): + self.ssl = ssl + engine = "couchbase-server" if not ssl else "couchbase-server-ssl" + super().__init__(feature_context, name, engine, vols, network, image_store, command) + couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) + + self.root_ca_file = tempfile.NamedTemporaryFile(delete=False) + self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert)) + self.root_ca_file.close() + os.chmod(self.root_ca_file.name, 0o700) + + self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False) + self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert)) + self.couchbase_cert_file.close() + os.chmod(self.couchbase_cert_file.name, 0o700) + + self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False) + self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key)) + self.couchbase_key_file.close() + os.chmod(self.couchbase_key_file.name, 0o700) def get_startup_finished_log_entry(self): # after startup the logs are only available in the container, only this message is shown @@ -33,12 +59,28 @@ def run_post_startup_commands(self): ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024", "--max-ttl", "36000"] + "--bucket-ramsize", "1024", "--max-ttl", "36000"], + ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123", + "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"], + ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''], + ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json'] ] + for command in commands: (code, _) = self.client.containers.get(self.name).exec_run(command) if code != 0: return False + + response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) + if response.status_code != 200: + logging.error("Failed to load CA certificates, with status code: {response.status_code}") + return False + + response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123")) + if response.status_code != 200: + logging.error("Failed to reload certificates, with status code: {response.status_code}") + return False + self.post_startup_commands_finished = True return True @@ -46,10 +88,26 @@ def deploy(self): if not self.set_deployed(): return + mounts = [ + docker.types.Mount( + type='bind', + source=self.couchbase_key_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'), + docker.types.Mount( + type='bind', + source=self.couchbase_cert_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'), + docker.types.Mount( + type='bind', + source=self.root_ca_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt') + ] + self.docker_container = self.client.containers.run( "couchbase:enterprise-7.2.5", detach=True, name=self.name, network=self.network.name, - ports={'11210/tcp': 11210}, - entrypoint=self.command) + ports={'8091/tcp': 8091, '11210/tcp': 11210}, + entrypoint=self.command, + mounts=mounts) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 9b3ae5e4fb..32fa522682 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -86,8 +86,9 @@ def start_minifi_c2_server(self, context): self.cluster.deploy_container('minifi-c2-server') assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output() - def start_couchbase_server(self, context): - self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server') + def start_couchbase_server(self, context, ssl=False): + engine = 'couchbase-server-ssl' if ssl else 'couchbase-server' + self.cluster.acquire_container(context=context, name='couchbase-server', engine=engine) self.cluster.deploy_container('couchbase-server') assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output() diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature index 646eaacd97..a9f2b25f01 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/docker/test/integration/features/couchbase.feature @@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And all instances start up Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using SSL connection + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And the scheduling period of the GetFile processor is set to "20 seconds" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up with SSL connection with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication + Given a MiNiFi CPP server with yaml config + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index a6909926ee..c79e36ecc1 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1373,6 +1373,34 @@ def step_impl(context, service_name): container.add_controller(couchbase_cluster_controller_service) +@given("a CouchbaseClusterService is setup up with SSL connection with the name \"{service_name}\"") +def step_impl(context, service_name): + ssl_context_service = SSLContextService(name="SSLContextService", + ca_cert='/tmp/resources/root_ca.crt') + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(ssl_context_service) + couchbase_cluster_controller_service = CouchbaseClusterService( + name=service_name, + connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), + ssl_context_service=ssl_context_service) + container.add_controller(couchbase_cluster_controller_service) + + @then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase") def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str): context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type) + + +@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"") +def step_impl(context, service_name): + ssl_context_service = SSLContextService(name="SSLContextService", + cert='/tmp/resources/clientuser.crt', + key='/tmp/resources/clientuser.key', + ca_cert='/tmp/resources/root_ca.crt') + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(ssl_context_service) + couchbase_cluster_controller_service = CouchbaseClusterService( + name=service_name, + connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), + ssl_context_service=ssl_context_service) + container.add_controller(couchbase_cluster_controller_service) diff --git a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py index e06c1e6ffb..94494fe175 100644 --- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py +++ b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py @@ -18,10 +18,13 @@ class CouchbaseClusterService(ControllerService): - def __init__(self, name, connection_string): + def __init__(self, name, connection_string, ssl_context_service=None): super(CouchbaseClusterService, self).__init__(name=name) self.service_class = 'CouchbaseClusterService' self.properties['Connection String'] = connection_string - self.properties['User Name'] = "Administrator" - self.properties['User Password'] = "password123" + if ssl_context_service: + self.linked_services.append(ssl_context_service) + if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties: + self.properties['User Name'] = "Administrator" + self.properties['User Password'] = "password123" diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py index f295131393..02d32e4a77 100644 --- a/docker/test/integration/minifi/core/ControllerService.py +++ b/docker/test/integration/minifi/core/ControllerService.py @@ -34,3 +34,4 @@ def __init__(self, name=None, properties=None): properties = {} self.properties = properties + self.linked_services = [] diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index c1043d5067..bc6e1ab066 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -117,13 +117,7 @@ def serialize_node(self, connectable, root, visited): if svc in visited: continue - visited.append(svc) - root['controllerServices'].append({ - 'name': svc.name, - 'identifier': svc.id, - 'type': svc.service_class, - 'properties': svc.properties - }) + self.serialize_controller(svc, root) if isinstance(connectable, Funnel): root['funnels'].append({ @@ -159,3 +153,9 @@ def serialize_controller(self, controller, root): 'type': controller.service_class, 'properties': controller.properties }) + + if controller.linked_services: + if len(controller.linked_services) == 1: + root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name + else: + root['controllerServices'][-1]['properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services] diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index dc912b1224..14cbabc980 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -119,12 +119,7 @@ def serialize_node(self, connectable, res=None, visited=None): continue visited.append(svc) - res['Controller Services'].append({ - 'name': svc.name, - 'id': svc.id, - 'class': svc.service_class, - 'Properties': svc.properties - }) + self._add_controller_service_node(svc, res) if isinstance(connectable, Funnel): res['Funnels'].append({ @@ -160,6 +155,25 @@ def serialize_node(self, connectable, res=None, visited=None): return (res, visited) + def _add_controller_service_node(self, controller, parent): + if hasattr(controller, 'name'): + connectable_name = controller.name + else: + connectable_name = str(controller.uuid) + + parent['Controller Services'].append({ + 'name': connectable_name, + 'id': controller.id, + 'class': controller.service_class, + 'Properties': controller.properties + }) + + if controller.linked_services: + if len(controller.linked_services) == 1: + parent['Controller Services'][-1]['Properties']['Linked Services'] = controller.linked_services[0].name + else: + parent['Controller Services'][-1]['Properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services] + def serialize_controller(self, controller, root=None): if root is None: res = { @@ -175,16 +189,6 @@ def serialize_controller(self, controller, root=None): else: res = root - if hasattr(controller, 'name'): - connectable_name = controller.name - else: - connectable_name = str(controller.uuid) - - res['Controller Services'].append({ - 'name': connectable_name, - 'id': controller.id, - 'class': controller.service_class, - 'Properties': controller.properties - }) + self._add_controller_service_node(controller, res) return res diff --git a/examples/couchbase_mtls_authentication.json b/examples/couchbase_mtls_authentication.json new file mode 100644 index 0000000000..7ac14df254 --- /dev/null +++ b/examples/couchbase_mtls_authentication.json @@ -0,0 +1,74 @@ +{ + "parameterContexts": [], + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [ + { + "name": "Get Couchbase document file from local directory", + "identifier": "21b1e56e-e8d5-4543-9f6b-be148f91fb02", + "type": "org.apache.nifi.processors.standard.GetFile", + "schedulingStrategy": "TIMER_DRIVEN", + "schedulingPeriod": "2 sec", + "penaltyDuration": "30 sec", + "properties": { + "Input Directory": "/home/user/couchbase/input" + }, + "autoTerminatedRelationships": [], + "concurrentlySchedulableTaskCount": 1 + }, + { + "name": "Insert Couchbase document", + "identifier": "df762d53-0f94-4611-be01-e689b8992573", + "type": "org.apache.nifi.processors.standard.PutCouchbaseKey", + "schedulingStrategy": "EVENT_DRIVEN", + "penaltyDuration": "30 sec", + "properties": { + "Bucket Name": "test_bucket", + "Document Id": "test_doc_id", + "Couchbase Cluster Controller Service": "CouchbaseClusterService for mTLS authentication" + }, + "autoTerminatedRelationships": [ + "success", + "failure", + "retry" + ] + } + ], + "controllerServices": [ + { + "name": "SSLContextService for Couchbase", + "identifier": "33e03d54-9917-494e-8ba0-8caeb3fdf4de", + "type": "SSLContextService", + "properties": { + "Client Certificate": "/home/user/couchbase/certs/clientuser.crt", + "Private Key": "/home/user/couchbase/certs/clientuser.key", + "CA Certificate": "/home/user/couchbase/certsroot_ca.crt" + } + }, + { + "name": "CouchbaseClusterService for mTLS authentication", + "identifier": "747bae3c-e68e-40af-8933-02179bd6cf85", + "type": "CouchbaseClusterService", + "properties": { + "Connection String": "couchbases://couchbase-server-hLkYUYq55djwrW5A26XNJD", + "Linked Services": "SSLContextService for Couchbase" + } + } + ], + "connections": [ + { + "identifier": "94fdd7b1-7857-44c3-8cf2-d373a5578420", + "name": "GetFile/success/PutCouchbaseKey", + "source": { + "id": "21b1e56e-e8d5-4543-9f6b-be148f91fb02" + }, + "destination": { + "id": "df762d53-0f94-4611-be01-e689b8992573" + }, + "selectedRelationships": [ + "success" + ] + } + ], + } +} diff --git a/examples/couchbase_mtls_authentication.yml b/examples/couchbase_mtls_authentication.yml new file mode 100644 index 0000000000..7ef1033e70 --- /dev/null +++ b/examples/couchbase_mtls_authentication.yml @@ -0,0 +1,45 @@ +Flow Controller: + name: MiNiFi Flow +Processors: +- id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02 + name: Get Couchbase document file from local directory + class: org.apache.nifi.processors.standard.GetFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 2 sec + penalization period: 30 sec + Properties: + Input Directory: /tmp/input + auto-terminated relationships list: [] +- id: df762d53-0f94-4611-be01-e689b8992573 + name: Insert Couchbase document + class: org.apache.nifi.processors.standard.PutCouchbaseKey + scheduling strategy: EVENT_DRIVEN + penalization period: 30 sec + Properties: + Bucket Name: test_bucket + Couchbase Cluster Controller Service: CouchbaseClusterService for mTLS authentication + Document Id: test_doc_id + auto-terminated relationships list: + - success + - failure + - retry +Controller Services: +- id: 33e03d54-9917-494e-8ba0-8caeb3fdf4de + name: SSLContextService for Couchbase + class: SSLContextService + Properties: + CA Certificate: /tmp/resources/root_ca.crt + Client Certificate: /tmp/resources/clientuser.crt + Private Key: /tmp/resources/clientuser.key +- id: 747bae3c-e68e-40af-8933-02179bd6cf85 + name: CouchbaseClusterService for mTLS authentication + class: CouchbaseClusterService + Properties: + Connection String: couchbases://couchbase-server-VPQDsPD2pj35q5WzHNt9ER + Linked Services: SSLContextService for Couchbase +Connections: +- id: 94fdd7b1-7857-44c3-8cf2-d373a5578420 + destination id: df762d53-0f94-4611-be01-e689b8992573 + name: GetFile/success/PutCouchbaseKey + source id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02 + source relationship name: success diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index bcab7b90f8..02f66797ac 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -26,6 +26,50 @@ namespace org::apache::nifi::minifi::couchbase { +CouchbaseClient::CouchbaseClient(std::string connection_string, std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service, + const std::shared_ptr& logger) + : connection_string_(std::move(connection_string)), logger_(logger), cluster_options_(buildClusterOptions(std::move(username), std::move(password), ssl_context_service)) { +} + +::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service) { + if (username.empty() && (!ssl_context_service || (ssl_context_service && ssl_context_service->getCertificateFile().empty()))) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Neither username and password nor SSLContextService is provided for Couchbase authentication"); + } + + if (!username.empty() && ssl_context_service && !ssl_context_service->getCertificateFile().empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Username and password authentication or mTLS authentication using certificate defined in SSLConextService " + "linked service should be provided exclusively for Couchbase"); + } + + if (!username.empty()) { + logger_->log_debug("Using username and password authentication for Couchbase server"); + if (password.empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Password missing for Couchbase server authentication"); + } + ::couchbase::cluster_options cluster_options(std::move(username), std::move(password)); + if (ssl_context_service && !ssl_context_service->getCACertificate().empty()) { + logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string()); + cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string()); + } + return cluster_options; + } + + logger_->log_debug("Using mTLS authentication for Couchbase server"); + logger_->log_debug("Setting Couchbase client SSL key file path to '{}'", ssl_context_service->getPrivateKeyFile().string()); + logger_->log_debug("Setting Couchbase client certificate file path to '{}'", ssl_context_service->getCertificateFile().string()); + if (ssl_context_service->getPrivateKeyFile().empty() || ssl_context_service->getCertificateFile().empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Couchbase client private key path or client certificate path is empty"); + } + + ::couchbase::cluster_options cluster_options(::couchbase::certificate_authenticator(ssl_context_service->getCertificateFile().string(), ssl_context_service->getPrivateKeyFile().string())); + if (!ssl_context_service->getCACertificate().empty()) { + logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string()); + cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string()); + } + cluster_options.security().tls_verify(::couchbase::tls_verify_mode::peer); + return cluster_options; +} + CouchbaseErrorType CouchbaseClient::getErrorType(const std::error_code& error_code) { for (const auto& temporary_error : temporary_connection_errors) { if (static_cast(temporary_error) == error_code.value()) { @@ -136,8 +180,7 @@ nonstd::expected CouchbaseClient::establishConnection( return {}; } - auto options = ::couchbase::cluster_options(username_, password_); - auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get(); + auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, cluster_options_).get(); if (connect_err.ec()) { logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message()); return nonstd::make_unexpected(getErrorType(connect_err.ec())); @@ -159,11 +202,24 @@ void CouchbaseClusterService::onEnable() { getProperty(UserName, username); std::string password; getProperty(UserPassword, password); - if (connection_string.empty() || username.empty() || password.empty()) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string, username or password"); + if (connection_string.empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string"); + } + + if ((username.empty() || password.empty()) && linked_services_.size() == 0) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing username and password or SSLConextService as a linked service"); + } + + minifi::controllers::SSLContextService* ssl_context_service_ptr = nullptr; + if (linked_services_.size() > 0) { + auto ssl_context_service = std::dynamic_pointer_cast(linked_services_[0]); + if (!ssl_context_service) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Linked service is not an SSLContextService"); + } + ssl_context_service_ptr = ssl_context_service.get(); } + client_ = std::make_unique(connection_string, username, password, ssl_context_service_ptr, logger_); - client_ = std::make_unique(connection_string, username, password, logger_); auto result = client_->establishConnection(); if (!result) { if (result.error() == CouchbaseErrorType::FATAL) { diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index 1a065d518a..7f6f481df1 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -30,6 +30,7 @@ #include "couchbase/cluster.hxx" #include "core/ProcessContext.h" #include "core/logging/LoggerConfiguration.h" +#include "controllers/SSLContextService.h" namespace org::apache::nifi::minifi::couchbase { @@ -68,9 +69,8 @@ enum class CouchbaseErrorType { class CouchbaseClient { public: - CouchbaseClient(std::string connection_string, std::string username, std::string password, const std::shared_ptr& logger) - : connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) { - } + CouchbaseClient(std::string connection_string, std::string username, std::string password, controllers::SSLContextService* ssl_context_service, + const std::shared_ptr& logger); nonstd::expected upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id, const std::vector& buffer, const ::couchbase::upsert_options& options); @@ -91,13 +91,13 @@ class CouchbaseClient { }; static CouchbaseErrorType getErrorType(const std::error_code& error_code); + ::couchbase::cluster_options buildClusterOptions(std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service); nonstd::expected<::couchbase::collection, CouchbaseErrorType> getCollection(const CouchbaseCollection& collection); std::string connection_string_; - std::string username_; - std::string password_; - std::optional<::couchbase::cluster> cluster_; std::shared_ptr logger_; + ::couchbase::cluster_options cluster_options_; + std::optional<::couchbase::cluster> cluster_; }; namespace controllers { diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 653b792fb6..d704429f7d 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -23,11 +23,13 @@ namespace org::apache::nifi::minifi::core::controller { bool StandardControllerServiceNode::enable() { - Property property("Linked Services", "Referenced Controller Services"); - controller_service_->setState(ENABLED); logger_->log_trace("Enabling CSN {}", getName()); + if (active) { + logger_->log_debug("CSN {} is already enabled", getName()); + return true; + } + Property property("Linked Services", "Referenced Controller Services"); if (getProperty(property.getName(), property)) { - active = true; for (const auto& linked_service : property.getValues()) { ControllerServiceNode* csNode = provider->getControllerServiceNode(linked_service); if (nullptr != csNode) { @@ -40,13 +42,20 @@ bool StandardControllerServiceNode::enable() { if (nullptr != impl) { std::lock_guard lock(mutex_); std::vector> services; + std::vector service_nodes; services.reserve(linked_controller_services_.size()); for (const auto& service : linked_controller_services_) { services.push_back(service->getControllerServiceImplementation()); + if (!service->enable()) { + logger_->log_debug("Linked Service '{}' could not be enabled", service->getName()); + return false; + } } impl->setLinkedControllerServices(services); impl->onEnable(); } + active = true; + controller_service_->setState(ENABLED); return true; }