Skip to content

Commit

Permalink
da_revocation: function out generation of revocation set from the crl (
Browse files Browse the repository at this point in the history
…#36933)

* da_revocation: function out generation of revocation set from the crl

* da_revocation: fix extracting certificate issuer from the crl entry

Also, add the ability to generate the revocation set from the crl file

* Restyled by autopep8

* replace the hyphen with underscore to make script importable

* sample revocation set from the sample direct CRLs

* added unit tests for verifying the revocation set generation against
present test data

* add revocation set generation tests to ci

* Restyled by whitespace

* Restyled by prettier-json

* Restyled by autopep8

* Restyled by isort

* run tests in build env

* add click_option_group to requirements

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
shubhamdp and restyled-commits authored Jan 17, 2025
1 parent 2d57f8c commit d7e30d0
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ jobs:
python -m ensurepip --upgrade
python -m pip install -r scripts/setup/requirements.setuppayload.txt
python3 src/setup_payload/tests/run_python_setup_payload_test.py out/chip-tool
- name: Run revocation set generation tests
run: scripts/run_in_build_env.sh 'python3 -m unittest -v credentials/generate_revocation_set.py'

build_linux_python_lighting_device:
name: Build on Linux (python lighting-app)
Expand Down
251 changes: 202 additions & 49 deletions credentials/generate-revocation-set.py → credentials/generate_revocation_set.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import base64
import json
import logging
import os
import subprocess
import sys
import unittest
from enum import Enum
from typing import Optional

Expand Down Expand Up @@ -199,6 +201,98 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList
logging.error('Failed to fetch a valid CRL')


def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList,
crl_signer_certificate: x509.Certificate,
certificate_authority_name_b64: str,
certificate_akid_hex: str,
crl_signer_delegator_cert: x509.Certificate) -> dict:
"""Generate a revocation set from a CRL file.
Args:
crl_file: The CRL object containing revoked certificates
crl_signer_certificate: The certificate object used to sign the CRL
certificate_authority_name_b64: Base64 encoded issuer name
certificate_akid_hex: Hex encoded Authority Key Identifier
crl_signer_delegator_cert: crl signer delegator certificate object
Returns:
dict: A dictionary containing the revocation set data with fields:
- type: "revocation_set"
- issuer_subject_key_id: Authority Key Identifier (hex)
- issuer_name: Issuer name (base64)
- revoked_serial_numbers: List of revoked serial numbers
- crl_signer_cert: CRL signer certificate (base64 DER)
- crl_signer_delegator: Optional delegator certificate (base64 DER)
"""
serialnumber_list = []

for revoked_cert in crl_file:
try:
cert_issuer_entry_ext = revoked_cert.extensions.get_extension_for_oid(x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER)
revoked_cert_issuer = cert_issuer_entry_ext.value.get_values_for_type(x509.DirectoryName)[0].public_bytes()
revoked_cert_issuer_b64 = base64.b64encode(revoked_cert_issuer).decode('utf-8')

if revoked_cert_issuer_b64 is not None:
# check if this really are the same thing
if revoked_cert_issuer_b64 != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8')

return entry


# This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction
def get_certificate_authority_details(crl_signer_certificate: x509.Certificate,
crl_signer_delegator_cert: x509.Certificate,
paa_certificate_object: x509.Certificate,
is_paa: bool) -> tuple[str, str]:
"""Get certificate authority name and AKID based on certificate hierarchy.
Args:
crl_signer_certificate: The CRL signer certificate
crl_signer_delegator_cert: Optional delegator certificate
paa_certificate_object: Optional PAA certificate
is_paa: Whether this is a PAA certificate
Returns:
tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex)
"""
if is_paa and not is_self_signed_certificate(crl_signer_certificate):
cert_for_details = paa_certificate_object
logging.debug("Using PAA certificate for details")
elif crl_signer_delegator_cert:
cert_for_details = crl_signer_delegator_cert
logging.debug("Using CRL Signer Delegator certificate for details")
else:
cert_for_details = crl_signer_certificate
logging.debug("Using CRL Signer certificate for details")

certificate_authority_name_b64 = get_subject_b64(cert_for_details)
certificate_akid = get_skid(cert_for_details)
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")

return certificate_authority_name_b64, certificate_akid_hex


class DCLDClient:
'''
A client for interacting with DCLD using either the REST API or command line interface (CLI).
Expand Down Expand Up @@ -325,7 +419,7 @@ def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:

return issuer_certificate_object

def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
def get_revocations_points_by_skid(self, issuer_subject_key_id: str) -> list[dict]:
'''
Get revocation points by subject key ID
Expand All @@ -349,7 +443,12 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
return response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]


@click.command()
@click.group()
def cli():
pass


@cli.command('from-dcl')
@click.help_option('-h', '--help')
@optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup)
@optgroup.option('--use-main-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring MainNet.")
Expand All @@ -362,9 +461,8 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
@optgroup.option('--log-level', default='INFO', show_default=True, type=click.Choice(__LOG_LEVELS__.keys(),
case_sensitive=False), callback=lambda c, p, v: __LOG_LEVELS__[v],
help='Determines the verbosity of script output')
def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, output: str, log_level: str):
"""Tool to construct revocation set from DCL"""

def from_dcl(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, output, log_level):
"""Generate revocation set from DCL"""
logging.basicConfig(
level=log_level,
format='%(asctime)s %(name)s %(levelname)-7s %(message)s',
Expand Down Expand Up @@ -467,65 +565,120 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280

# 9. decide on certificate authority name and AKID
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
certificate_akid = get_skid(paa_certificate_object)
elif crl_signer_delegator_cert:
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
certificate_akid = get_skid(crl_signer_delegator_cert)
else:
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
certificate_akid = get_skid(crl_signer_certificate)
certificate_authority_name_b64, certificate_akid_hex = get_certificate_authority_details(
crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point["isPAA"])

# validate issuer skid matchces with the one in revocation points
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")

if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
logging.warning("certificateIssuer entry extension not found in CRL")
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate,
certificate_authority_name_b64, certificate_akid_hex, crl_signer_delegator_cert)
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)

with open(output, 'w+') as outfile:
json.dump(revocation_set, outfile, indent=4)


@cli.command('from-crl')
@click.option('--crl', required=True, type=click.File('rb'), help='Path to the CRL file')
@click.option('--crl-signer', required=True, type=click.File('rb'), help='Path to the signer certificate')
@click.option('--delegator', type=click.File('rb'), help='Path to the delegator certificate (optional)')
@click.option('--paa', type=click.File('rb'), help='Path to the PAA certificate (optional)')
@click.option('--output', default='revocation_set.json', type=click.File('w'), help='Output filename (default: revocation_set.json)')
@click.option('--is-paa', default=False, is_flag=True, help='Indicates if the CRL issuer is the PAA')
def from_crl(crl, crl_signer, delegator, paa, output, is_paa):
"""Generate revocation set from a single CRL file"""
crl = x509.load_pem_x509_crl(crl.read())
crl_signer = x509.load_pem_x509_certificate(crl_signer.read())
delegator = x509.load_pem_x509_certificate(delegator.read()) if delegator else None
paa = x509.load_pem_x509_certificate(paa.read()) if paa else None

ca_name_b64, ca_akid_hex = get_certificate_authority_details(crl_signer, delegator, paa, is_paa)
revocation_set = generate_revocation_set_from_crl(crl, crl_signer, ca_name_b64, ca_akid_hex, delegator)
output.write(json.dumps([revocation_set], indent=4))


class TestRevocationSetGeneration(unittest.TestCase):
"""Test class for revocation set generation"""

def setUp(self):
# Get the directory containing this file
self.test_base_dir = os.path.dirname(os.path.abspath(__file__))

def get_test_file_path(self, filename):
return os.path.join(self.test_base_dir, 'test', filename)

def compare_revocation_sets(self, generated_set, expected_file):
with open(os.path.join(self.test_base_dir, expected_file), 'r') as f:
expected_set = json.load(f)

# Compare the contents
self.assertEqual(len([generated_set]), len(expected_set))
expected = expected_set[0]

# Compare required fields
self.assertEqual(generated_set['type'], expected['type'])
self.assertEqual(generated_set['issuer_subject_key_id'], expected['issuer_subject_key_id'])
self.assertEqual(generated_set['issuer_name'], expected['issuer_name'])
self.assertEqual(set(generated_set['revoked_serial_numbers']), set(expected['revoked_serial_numbers']))
self.assertEqual(generated_set['crl_signer_cert'], expected['crl_signer_cert'])

# Compare optional fields if present in either set
if 'crl_signer_delegator' in generated_set and 'crl_signer_delegator' in expected:
self.assertEqual(generated_set['crl_signer_delegator'], expected['crl_signer_delegator'],
"CRL signer delegator certificates do not match")
elif 'crl_signer_delegator' in generated_set or 'crl_signer_delegator' in expected:
self.fail("CRL signer delegator certificate is missing in one of the sets")

def test_paa_revocation_set(self):
"""Test generation of PAA revocation set"""
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-CRL.pem'), 'rb') as f:
crl = x509.load_pem_x509_crl(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f:
crl_signer = x509.load_pem_x509_certificate(f.read())

ca_name_b64, ca_akid_hex = get_certificate_authority_details(
crl_signer, None, None, True)
revocation_set = generate_revocation_set_from_crl(
crl, crl_signer, ca_name_b64, ca_akid_hex, None)

self.compare_revocation_sets(
revocation_set,
'test/revoked-attestation-certificates/revocation-sets/revocation-set-for-paa.json'
)

def test_pai_revocation_set(self):
"""Test generation of PAI revocation set"""
with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-CRL.pem'), 'rb') as f:
crl = x509.load_pem_x509_crl(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-Cert.pem'), 'rb') as f:
crl_signer = x509.load_pem_x509_certificate(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f:
paa = x509.load_pem_x509_certificate(f.read())

ca_name_b64, ca_akid_hex = get_certificate_authority_details(
crl_signer, None, paa, False)
revocation_set = generate_revocation_set_from_crl(
crl, crl_signer, ca_name_b64, ca_akid_hex, None)

self.compare_revocation_sets(
revocation_set,
'test/revoked-attestation-certificates/revocation-sets/revocation-set-for-pai.json'
)


if __name__ == "__main__":
if len(sys.argv) == 1:
main.main(['--help'])
if len(sys.argv) > 1 and sys.argv[1] == 'test':
# Remove the 'test' argument and run tests
sys.argv.pop(1)
unittest.main()
elif len(sys.argv) == 1:
cli.main(['--help'])
else:
main()
cli()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[
{
"type": "revocation_set",
"issuer_subject_key_id": "6AFD22771F511FECBF1641976710DCDC31A1717E",
"issuer_name": "MDAxGDAWBgNVBAMMD01hdHRlciBUZXN0IFBBQTEUMBIGCisGAQQBgqJ8AgEMBEZGRjE=",
"revoked_serial_numbers": ["302664392B8A3F2A"],
"crl_signer_cert": "MIIBvTCCAWSgAwIBAgIITqjoMYLUHBwwCgYIKoZIzj0EAwIwMDEYMBYGA1UEAwwPTWF0dGVyIFRlc3QgUEFBMRQwEgYKKwYBBAGConwCAQwERkZGMTAgFw0yMTA2MjgxNDIzNDNaGA85OTk5MTIzMTIzNTk1OVowMDEYMBYGA1UEAwwPTWF0dGVyIFRlc3QgUEFBMRQwEgYKKwYBBAGConwCAQwERkZGMTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABLbLY3KIfyko9brIGqnZOuJDHK2p154kL2UXfvnO2TKijs0Duq9qj8oYShpQNUKWDUU/MD8fGUIddR6Pjxqam3WjZjBkMBIGA1UdEwEB/wQIMAYBAf8CAQEwDgYDVR0PAQH/BAQDAgEGMB0GA1UdDgQWBBRq/SJ3H1Ef7L8WQZdnENzcMaFxfjAfBgNVHSMEGDAWgBRq/SJ3H1Ef7L8WQZdnENzcMaFxfjAKBggqhkjOPQQDAgNHADBEAiBQqoAC9NkyqaAFOPZTaK0P/8jvu8m+t9pWmDXPmqdRDgIgI7rI/g8j51RFtlM5CBpHmUkpxyqvChVI1A0DTVFLJd4="
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[
{
"type": "revocation_set",
"issuer_subject_key_id": "63540E47F64B1C38D13884A462D16C195D8FFB3C",
"issuer_name": "MD0xJTAjBgNVBAMMHE1hdHRlciBEZXYgUEFJIDB4RkZGMSBubyBQSUQxFDASBgorBgEEAYKifAIBDARGRkYx",
"revoked_serial_numbers": [
"AB042494323FE54",
"19367D978EAC533A",
"2569383D24BB36EA"
],
"crl_signer_cert": "MIIByzCCAXGgAwIBAgIIVq2CIq2UW2QwCgYIKoZIzj0EAwIwMDEYMBYGA1UEAwwPTWF0dGVyIFRlc3QgUEFBMRQwEgYKKwYBBAGConwCAQwERkZGMTAgFw0yMjAyMDUwMDAwMDBaGA85OTk5MTIzMTIzNTk1OVowPTElMCMGA1UEAwwcTWF0dGVyIERldiBQQUkgMHhGRkYxIG5vIFBJRDEUMBIGCisGAQQBgqJ8AgEMBEZGRjEwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARBmpMVwhc+DIyHbQPM/JRIUmR/f+xeUIL0BZko7KiUxZQVEwmsYx5MsDOSr2hLC6+35ls7gWLC9Sv5MbjneqqCo2YwZDASBgNVHRMBAf8ECDAGAQH/AgEAMA4GA1UdDwEB/wQEAwIBBjAdBgNVHQ4EFgQUY1QOR/ZLHDjROISkYtFsGV2P+zwwHwYDVR0jBBgwFoAUav0idx9RH+y/FkGXZxDc3DGhcX4wCgYIKoZIzj0EAwIDSAAwRQIhALLvJ/Sa6bUPuR7qyUxNC9u415KcbLiPrOUpNo0SBUwMAiBlXckrhr2QmIKmxiF3uCXX0F7b58Ivn+pxIg5+pwP4kQ=="
}
]
1 change: 1 addition & 0 deletions scripts/setup/requirements.build.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

# scripts/build
click
click-option-group

# scripts/py_matter_idl/matter_idl
jinja2
Expand Down

0 comments on commit d7e30d0

Please sign in to comment.