Skip to content

Commit

Permalink
Encryption script using python based libraries
Browse files Browse the repository at this point in the history
Removed the usage of nrfkms from the script.
Instead, a dedicated python script containing a SuitKMS class
should be used.

Ref: NCSDK-30800

Signed-off-by: Artur Hadasz <[email protected]>
  • Loading branch information
ahasztag authored and robertstypa committed Jan 9, 2025
1 parent 495daca commit ee9c4fb
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 69 deletions.
65 changes: 65 additions & 0 deletions ncs/basic_kms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
#
"""A basic KMS based on keys stored in files on the local drive."""

import os

from pathlib import Path
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from suit_generator.suit_kms_base import SuitKMSBase
import json


class SuitKMS(SuitKMSBase):
"""Implementation of the KMS."""

def parse_context(self, context):
"""Parse the provided context string."""
if context is None:
self.keys_directory = Path(__file__).parent
return None

context_loaded = json.loads(context)
self.keys_directory = Path(context_loaded["keys_directory"])

def init_kms(self, context) -> None:
"""
Initialize the KMS.
:param context: The context to be used
"""
self.parse_context(context)

def encrypt(self, plaintext, key_name, context, aad) -> tuple[bytes, bytes, bytes]:
"""
Encrypt the plainext with an AES key.
:param plaintext: The plaintext to be encrypted.
:param key_name: The name of the key to be used.
:param context: The context to be used
If it is passed, it is used to point to the directory where the keys are stored.
In this case, it must be a JSON string in te format '{ "keys_directory":"<path>" }'.
:param aad: The additional authenticated data to be used.
:return: The nonce, tag and ciphertext.
:rtype: tuple[bytes, bytes, bytes]
"""
key_file_name = key_name + ".bin"
key_file = self.keys_directory / key_file_name

with open(key_file, "rb") as f:
key_data = f.read()
aesgcm = AESGCM(key_data)
nonce = os.urandom(12)
ciphertext_response = aesgcm.encrypt(nonce, plaintext, aad)
ciphertext = ciphertext_response[:-16]
tag = ciphertext_response[-16:]

return nonce, tag, ciphertext


def suit_kms_factory():
"""Get a KMS object."""
return SuitKMS()
114 changes: 45 additions & 69 deletions ncs/encrypt_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@

import os
import cbor2
import importlib.util
import sys
from argparse import ArgumentParser
from argparse import RawTextHelpFormatter
from pathlib import Path
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from enum import Enum, unique
from pynrfkms.kms import KMS
import getpass
from suit_generator.suit_kms_base import SuitKMSBase


@unique
Expand Down Expand Up @@ -73,16 +74,6 @@ def __str__(self):
return self.value


class EncryptionKMSBackends(Enum):
"""KMS backends."""

VAULT = "vault"
LOCAL = "local"

def __str__(self):
return self.value


KEY_IDS = {
SuitDomains.APPLICATION.value: 0x40022000,
SuitDomains.RADIO.value: 0x40032000,
Expand All @@ -91,6 +82,15 @@ def __str__(self):
}


def _import_module_from_path(module_name, file_path):
# Helper function to import a python module from a file path.
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module


class DigestGenerator:
"""Class to generate digests for plaintext files using specified hash algorithms."""

Expand Down Expand Up @@ -136,19 +136,17 @@ def __init__(self, kw_alg: SuitKWAlgorithms):
self.cose_kw_alg = SuitAlgorithms.COSE_ALG_DIRECT.value
pass

def init_kms_backend(self, cli_arguments):
"""Initialize the KMS backend based on command line arguments."""
if cli_arguments.kms_backend == EncryptionKMSBackends.VAULT:
self.kms = KMS(backend="vault", url=cli_arguments.kms_vault_url, token=cli_arguments.kms_token)
elif cli_arguments.kms_backend == EncryptionKMSBackends.LOCAL:
pswd = cli_arguments.kms_local_password
if pswd is None:
pswd = getpass.getpass("Enter password for local KMS backend: ")
self.kms = KMS(backend="local", dir=cli_arguments.kms_dir, password=pswd, encoding="der")
del pswd
def init_kms_backend(self, kms_script, context):
"""Initialize the KMS from the provided script backend based on the passed context."""
module_name = "SuitKMS_module"
kms_module = _import_module_from_path(module_name, kms_script)
self.kms = kms_module.suit_kms_factory()
if not isinstance(self.kms, SuitKMSBase):
raise ValueError(f"Class {type(self.kms)} does not implement the required SuitKMSBase interface")
self.kms.init_kms(context)

def generate_kms_artifacts(self, plaintext_file_path: Path, key_name: str, context: str):
"""Generate encrypted artifacts using KMS.
"""Generate encrypted artifacts using the key management system.
This method reads the plaintext file, encrypts it using the specified key wrap algorithm,
and returns the encrypted asset and encrypted content encryption key (CEK).
Expand All @@ -169,39 +167,31 @@ def generate_kms_artifacts(self, plaintext_file_path: Path, key_name: str, conte
with open(plaintext_file_path, "rb") as plaintext_file:
asset_plaintext = plaintext_file.read()

encrypted_asset = None
nonce = None
tag = None
ciphertext = None
encrypted_cek = None

if self.cose_kw_alg == SuitAlgorithms.COSE_ALG_A256KW.value:
encrypted_asset, encrypted_cek = self.kms.aes_key_wrap(
key_name=key_name,
context=context,
plaintext=asset_plaintext,
aek_type="aes",
aad=enc_structure_encoded,
)
raise ValueError("AES Key Wrap 256 is not supported yet")
elif self.cose_kw_alg == SuitAlgorithms.COSE_ALG_DIRECT.value:
encrypted_asset = self.kms.encrypt(
nonce, tag, ciphertext = self.kms.encrypt(
plaintext=asset_plaintext,
key_name=key_name,
context=context,
aad=enc_structure_encoded,
)

encrypted_asset = nonce + tag + ciphertext

return encrypted_asset, encrypted_cek

def parse_encrypted_assets(self, asset_bytes):
"""Parse the encrypted assets to extract initialization vector, tag, and encrypted content."""
if self.cose_kw_alg == SuitAlgorithms.COSE_ALG_A256KW.value:
init_vector = asset_bytes[:12] # the names init vector and nonce are used interchangeably in this case
# nrfkms wrap returns the encrypted data in format nonce|encrypted_data|tag
encrypted_content = asset_bytes[12:-16]
tag = asset_bytes[-16:]
elif self.cose_kw_alg == SuitAlgorithms.COSE_ALG_DIRECT.value:
# nrfkms encrypt returns the encrypted data in format nonce|tag|encrypted_data
init_vector = asset_bytes[:12]
tag = asset_bytes[12 : 12 + 16]
encrypted_content = asset_bytes[12 + 16 :]
# Encrypted data is returned in format nonce|tag|encrypted_data
init_vector = asset_bytes[:12]
tag = asset_bytes[12 : 12 + 16]
encrypted_content = asset_bytes[12 + 16 :]

return init_vector, tag, encrypted_content

Expand Down Expand Up @@ -268,12 +258,12 @@ def generate_encryption_info_and_encrypted_payload(

def create_encrypt_and_generate_subparser(top_parser):
"""Create a subparser for the 'encrypt-and-generate' command."""
parser = top_parser.add_parser(
"encrypt-and-generate", help="First encrypt the command using nrfkms, then generate the files."
)
parser = top_parser.add_parser("encrypt-and-generate", help="First encrypt the payload, then generate the files.")

parser.add_argument("--firmware", required=True, type=Path, help="Input, plaintext firmware.")
parser.add_argument("--key-name", required=True, type=str, help="Name of the key used to derive the key by nrfkms.")
parser.add_argument(
"--key-name", required=True, type=str, help="Name of the key used by the KMS to identify the key."
)
parser.add_argument(
"--domain",
required=True,
Expand All @@ -283,9 +273,8 @@ def create_encrypt_and_generate_subparser(top_parser):
)
parser.add_argument(
"--context",
required=True,
type=str,
help="Context string used to derive the key. See nrfkms documentation for more information.",
help="Any context information that should be passed to the KMS backend during initialization and encryption.",
)
parser.add_argument("--output-dir", required=True, type=Path, help="Directory to store the output files")
parser.add_argument(
Expand All @@ -303,20 +292,10 @@ def create_encrypt_and_generate_subparser(top_parser):
help="Key wrap algorithm used to wrap the CEK.",
)
parser.add_argument(
"--kms-backend",
required=True,
type=EncryptionKMSBackends,
choices=list(EncryptionKMSBackends),
help="KMS backend to use.",
)
parser.add_argument("--kms-vault-url", type=str, help='URL of the KMS vault - only if kms-backend set to "vault".')
parser.add_argument("--kms-token", type=str, help='KMS token - only if kms-backend set to "vault"')
parser.add_argument("--kms-dir", type=str, help='Local backend directory - only if kms-backend set to "local".')
parser.add_argument(
"--kms-local-password",
type=str,
help='KMS local backend password - only if kms-backend set to "local". If not provided, '
"the script will prompt for it.",
"--kms-script",
# required=True,
default=Path(__file__).parent / "basic_kms.py",
help="Python script containing a SuitKMS class with an encrypt function - used to communicate with a KMS.",
)


Expand All @@ -328,9 +307,7 @@ def create_generate_subparser(top_parser):
"--encrypted-firmware",
required=True,
type=Path,
help="Input, encrypted firmware in form iv|tag|encrypted_firmware for kw-alg=direct "
"(as output from nrfkms encrypt) or iv|encrypted_firmware|tag for kw-alg=aes-kw256 "
"(as output from nrfkms wrap).",
help="Input, encrypted firmware in form iv|tag|encrypted_firmware",
)
parser.add_argument("--encrypted-key", required=True, type=Path, help="Encrypted content/asset encryption key")
parser.add_argument(
Expand Down Expand Up @@ -366,10 +343,9 @@ def create_subparsers(parser):
description="""This script allows to output artifacts needed by a SUIT envelope for encrypted firmware.
It has two modes of operation:
- encrypt-and-generate: First encrypt the command using nrfkms, then generate the files.
- encrypt-and-generate: First encrypt the payload, then generate the files.
- generate: Only generate files based on encrypted firmware and the encrypted content/asset encryption key.
Note the encrypted firmware should match the format generated by nrfkms iv|tag|encrypted_firmware if
kw-alg=direct is used (format output from nrfkms encrypt) or iv|encrypted_firmware|tag if kw-alg=aes-kw256 is used (format output from nrfkms wrap).
Note the encrypted firmware should match the format iv|tag|encrypted_firmware
In both cases the output files are:
encrypted_content.bin - encrypted content of the firmware concatenated with the tag (encrypted firmware|16 byte tag).
Expand All @@ -393,7 +369,7 @@ def create_subparsers(parser):
encryptor = Encryptor(arguments.kw_alg)

if arguments.command == "encrypt-and-generate":
encryptor.init_kms_backend(arguments)
encryptor.init_kms_backend(arguments.kms_script, arguments.context)
digest_generator = DigestGenerator(arguments.hash_alg.value)
digest_generator.generate_digest_size_for_plain_text(arguments.firmware, arguments.output_dir)
encrypted_asset, encrypted_cek = encryptor.generate_kms_artifacts(
Expand Down
35 changes: 35 additions & 0 deletions suit_generator/suit_kms_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
#
"""A base abstract class for any KMS implementations used by the SUIT encrypt/sign scripts."""

from abc import ABC, abstractmethod


class SuitKMSBase(ABC):
"""Base abstract class for the KMS implementations."""

@abstractmethod
def init_kms(self, context) -> None:
"""
Initialize the KMS.
:param context: The context to be used
"""
pass

@abstractmethod
def encrypt(self, plaintext, key_name, context, aad) -> tuple[bytes, bytes, bytes]:
"""
Encrypt the plainext with an AES key.
:param plaintext: The plaintext to be encrypted.
:param key_name: The name of the key to be used.
:param context: The context to be used
:param aad: The additional authenticated data to be used.
:return: The nonce, tag and ciphertext.
:rtype: tuple[bytes, bytes, bytes]
"""
pass

0 comments on commit ee9c4fb

Please sign in to comment.