Skip to content

Commit

Permalink
MINIFICPP-2470 Add SSL and mTLS authentication support to CouchbaseCl…
Browse files Browse the repository at this point in the history
…usterService
  • Loading branch information
lordgamez committed Oct 21, 2024
1 parent 57f8862 commit 6bb70f7
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 49 deletions.
10 changes: 9 additions & 1 deletion docker/test/integration/cluster/DockerTestDirectoryBindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import hashlib
import subprocess
import OpenSSL.crypto
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert, make_client_cert


class DockerTestDirectoryBindings:
Expand Down Expand Up @@ -214,3 +214,11 @@ def create_cert_files(self):
os.path.join(base, "root_ca.crt"),
]
subprocess.run(cmd, check=True)

clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=self.root_ca_cert, ca_key=self.root_ca_key)
self.put_test_resource('clientuser.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=clientuser_cert))
self.put_test_resource('clientuser.key',
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=clientuser_key))
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import OpenSSL.crypto
import tempfile
import docker
import requests
import logging
from requests.auth import HTTPBasicAuth
from .Container import Container
from utils import retry_check
from ssl_utils.SSL_cert_utils import make_server_cert


class CouchbaseServerContainer(Container):
def __init__(self, feature_context, name, vols, network, image_store, command=None):
super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command)
def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False):
self.ssl = ssl
engine = "couchbase-server" if not ssl else "couchbase-server-ssl"
super().__init__(feature_context, name, engine, vols, network, image_store, command)
couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key)

self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert))
self.root_ca_file.close()
os.chmod(self.root_ca_file.name, 0o700)

self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False)
self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert))
self.couchbase_cert_file.close()
os.chmod(self.couchbase_cert_file.name, 0o700)

self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False)
self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key))
self.couchbase_key_file.close()
os.chmod(self.couchbase_key_file.name, 0o700)

def get_startup_finished_log_entry(self):
# after startup the logs are only available in the container, only this message is shown
Expand All @@ -33,23 +59,55 @@ def run_post_startup_commands(self):
["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
"--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
"--bucket-ramsize", "1024", "--max-ttl", "36000"]
"--bucket-ramsize", "1024", "--max-ttl", "36000"],
["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123",
"--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"],
["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
]

for command in commands:
(code, _) = self.client.containers.get(self.name).exec_run(command)
if code != 0:
return False

response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123"))
if response.status_code != 200:
logging.error("Failed to load CA certificates, with status code: {response.status_code}")
return False

response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123"))
if response.status_code != 200:
logging.error("Failed to reload certificates, with status code: {response.status_code}")
return False

self.post_startup_commands_finished = True
return True

def deploy(self):
if not self.set_deployed():
return

mounts = [
docker.types.Mount(
type='bind',
source=self.couchbase_key_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'),
docker.types.Mount(
type='bind',
source=self.couchbase_cert_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'),
docker.types.Mount(
type='bind',
source=self.root_ca_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt')
]

self.docker_container = self.client.containers.run(
"couchbase:enterprise-7.2.5",
detach=True,
name=self.name,
network=self.network.name,
ports={'11210/tcp': 11210},
entrypoint=self.command)
ports={'8091/tcp': 8091, '11210/tcp': 11210},
entrypoint=self.command,
mounts=mounts)
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ def start_minifi_c2_server(self, context):
self.cluster.deploy_container('minifi-c2-server')
assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output()

def start_couchbase_server(self, context):
self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server')
def start_couchbase_server(self, context, ssl=False):
engine = 'couchbase-server-ssl' if ssl else 'couchbase-server'
self.cluster.acquire_container(context=context, name='couchbase-server', engine=engine)
self.cluster.deploy_container('couchbase-server')
assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output()

Expand Down
57 changes: 57 additions & 0 deletions docker/test/integration/features/couchbase.feature
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++
And all instances start up

Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using SSL connection
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And the "Keep Source File" property of the GetFile processor is set to "true"
And the scheduling period of the GetFile processor is set to "20 seconds"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up with SSL connection with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication
Given a MiNiFi CPP server with yaml config
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
28 changes: 28 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,34 @@ def step_impl(context, service_name):
container.add_controller(couchbase_cluster_controller_service)


@given("a CouchbaseClusterService is setup up with SSL connection with the name \"{service_name}\"")
def step_impl(context, service_name):
ssl_context_service = SSLContextService(name="SSLContextService",
ca_cert='/tmp/resources/root_ca.crt')
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(ssl_context_service)
couchbase_cluster_controller_service = CouchbaseClusterService(
name=service_name,
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
ssl_context_service=ssl_context_service)
container.add_controller(couchbase_cluster_controller_service)


@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)


@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"")
def step_impl(context, service_name):
ssl_context_service = SSLContextService(name="SSLContextService",
cert='/tmp/resources/clientuser.crt',
key='/tmp/resources/clientuser.key',
ca_cert='/tmp/resources/root_ca.crt')
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(ssl_context_service)
couchbase_cluster_controller_service = CouchbaseClusterService(
name=service_name,
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
ssl_context_service=ssl_context_service)
container.add_controller(couchbase_cluster_controller_service)
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@


class CouchbaseClusterService(ControllerService):
def __init__(self, name, connection_string):
def __init__(self, name, connection_string, ssl_context_service=None):
super(CouchbaseClusterService, self).__init__(name=name)

self.service_class = 'CouchbaseClusterService'
self.properties['Connection String'] = connection_string
self.properties['User Name'] = "Administrator"
self.properties['User Password'] = "password123"
if ssl_context_service:
self.linked_services.append(ssl_context_service)
if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties:
self.properties['User Name'] = "Administrator"
self.properties['User Password'] = "password123"
1 change: 1 addition & 0 deletions docker/test/integration/minifi/core/ControllerService.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ def __init__(self, name=None, properties=None):
properties = {}

self.properties = properties
self.linked_services = []
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,7 @@ def serialize_node(self, connectable, root, visited):
if svc in visited:
continue

visited.append(svc)
root['controllerServices'].append({
'name': svc.name,
'identifier': svc.id,
'type': svc.service_class,
'properties': svc.properties
})
self.serialize_controller(svc, root)

if isinstance(connectable, Funnel):
root['funnels'].append({
Expand Down Expand Up @@ -159,3 +153,9 @@ def serialize_controller(self, controller, root):
'type': controller.service_class,
'properties': controller.properties
})

if controller.linked_services:
if len(controller.linked_services) == 1:
root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name
else:
root['controllerServices'][-1]['properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services]
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ def serialize_node(self, connectable, res=None, visited=None):
continue

visited.append(svc)
res['Controller Services'].append({
'name': svc.name,
'id': svc.id,
'class': svc.service_class,
'Properties': svc.properties
})
self._add_controller_service_node(svc, res)

if isinstance(connectable, Funnel):
res['Funnels'].append({
Expand Down Expand Up @@ -160,6 +155,25 @@ def serialize_node(self, connectable, res=None, visited=None):

return (res, visited)

def _add_controller_service_node(self, controller, parent):
if hasattr(controller, 'name'):
connectable_name = controller.name
else:
connectable_name = str(controller.uuid)

parent['Controller Services'].append({
'name': connectable_name,
'id': controller.id,
'class': controller.service_class,
'Properties': controller.properties
})

if controller.linked_services:
if len(controller.linked_services) == 1:
parent['Controller Services'][-1]['Properties']['Linked Services'] = controller.linked_services[0].name
else:
parent['Controller Services'][-1]['Properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services]

def serialize_controller(self, controller, root=None):
if root is None:
res = {
Expand All @@ -175,16 +189,6 @@ def serialize_controller(self, controller, root=None):
else:
res = root

if hasattr(controller, 'name'):
connectable_name = controller.name
else:
connectable_name = str(controller.uuid)

res['Controller Services'].append({
'name': connectable_name,
'id': controller.id,
'class': controller.service_class,
'Properties': controller.properties
})
self._add_controller_service_node(controller, res)

return res
Loading

0 comments on commit 6bb70f7

Please sign in to comment.