import os import subprocess import shutil import tempfile import enum import asn1crypto.x509 import asn1crypto.cms from cryptography.exceptions import InvalidKey from cryptography.x509 import load_der_x509_certificate from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.asymmetric.utils import Prehashed from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.hashes import Hash, SHA1 from cryptography.hazmat.primitives.padding import PKCS7 as PKCS7Padding from cryptography.hazmat.primitives.cmac import CMAC from cryptography.hazmat.primitives.hmac import HMAC from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES, AES from cryptography.hazmat.primitives.ciphers.modes import ( ModeWithInitializationVector, ModeWithTweak, ModeWithNonce, ECB, CBC, OFB, CFB ) from cryptography_x509_verify import ( VerifyAspect, Verifier, VerifyFailure, get_sig_algo_from_oid, get_hash_algo_from_asn, ) def to_cms_cert(cert): return asn1crypto.x509.Certificate.load( cert.public_bytes(Encoding.DER) ) def to_cms_cert_id(cert): return asn1crypto.cms.IssuerAndSerialNumber({ 'issuer': asn1crypto.x509.Name.build({ a.oid.dotted_string: a.value for a in cert.issuer }), 'serial_number': cert.serial_number, }) def to_cms_hash_algo(algo): return { 'algorithm': algo.name.lower(), } def from_cms_hash_algo(desc): algo, *params = desc['algorithm'].native.split('_') return get_hash_algo_from_asn(algo) def to_cms_asym_pad(pad): return { 'EMSA-PKCS1-v1_5': 'pkcs1v15', }[pad.name] def to_cms_asym_algo(key, pad): if isinstance(key, RSAPublicKey): algo = 'rsaes_{}'.format(to_cms_asym_pad(pad)) else: raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__)) return { 'algorithm': algo, } def to_cms_sign_algo(key, pad): if isinstance(key, RSAPublicKey): algo = 'rsassa_{}'.format(to_cms_asym_pad(pad)) else: raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__)) return { 'algorithm': algo, } def from_cms_asym_algo(desc): return get_sig_algo_from_oid(desc['algorithm'].dotted) def to_cms_sym_mode(mode): return mode.__class__.__name__.lower() def to_cms_sym_algo(algo, mode): desc = { 'algorithm': '{}{}_{}'.format(algo.__class__.__name__.lower(), algo.key_size, to_cms_sym_mode(mode)), } if isinstance(mode, ModeWithInitializationVector): desc['parameters'] = mode.initialization_vector elif isinstance(mode, ModeWithTweak): desc['parameters'] = mode.tweak elif isinstance(mode, ModeWithNonce): desc['parameters'] = mode.nonce return desc def from_cms_sym_mode(desc: asn1crypto.algos.EncryptionAlgorithm): return { 'ecb': ECB(), 'cbc': CBC(desc.encryption_iv), 'ofb': OFB(desc.encryption_iv), 'cfb': CFB(desc.encryption_iv), }[desc.encryption_mode] def from_cms_sym_algo(desc: asn1crypto.cms.EncryptionAlgorithm, key: bytes): cipher = { 'aes': AES, 'des': TripleDES, 'tripledes': TripleDES, }[desc.encryption_cipher] return cipher(key), from_cms_sym_mode(desc) def from_cms_mac_algo(desc: asn1crypto.cms.HmacAlgorithm, key: bytes): algo, *params = desc['algorithm'].native.split('_') if algo == 'des': return CMAC(TripleDES(key)) algo = from_cms_hash_algo(desc) return HMAC(key, algo) def match_x509_name(left, right): return left.native == right.native def match_cms_cert_id(recip, cms_cert): return match_cms_cert_issuer_serial(recip, cms_cert.issuer, cms_cert.serial_number) def match_cms_cert_issuer_serial(recip, issuer, serial): return (isinstance(recip, asn1crypto.cms.IssuerAndSerialNumber) and match_x509_name(issuer, recip['issuer']) and recip['serial_number'].native == serial) def encode_sym_enc_params(algo, mode, key): desc = to_cms_sym_algo(algo, mode) params = { 'key': key, 'algo': { AES: 'aes', TripleDES: 'des' if len(key) <= 8 else 'tripledes', }[type(algo)], 'mode': { ECB: 'ecb', CBC: 'cbc', OFB: 'ofb', CFB: 'cfb', }[type(mode)], } if 'parameters' in desc: params['parameters'] = desc['parameters'] return params def decode_sym_enc_params(params): algo_cls = { 'aes': AES, 'des': TripleDES, 'tripledes': TripleDES, }[params['algo']] mode_cls = { 'ecb': ECB, 'cbc': CBC, 'ofb': OFB, 'cfb': CFB, }[params['mode']] if 'parameters' in params: mode = mode_cls(params['parameters']) else: mode = mode_cls() key = params['key'] return algo_cls(key), mode, key def p7data_hash(hash_algo, attrs, content): if attrs: content = asn1crypto.cms.CMSAttributes(attrs).dump() h = Hash(hash_algo) h.update(content) return h.finalize() def p7data_verify(key, pad_algo, hash_algo, attrs, content_type, content, sig): if attrs: # Have to do our own verification here for attr in attrs: attr_type = attr['type'] attr_vals = attr['values'] if attr_type == 'content_type': if attr_vals[0] != content_type: return False elif attr_type == 'message_digest': if p7data_hash(hash_algo, None, content) != attr_vals[0]: return False else: # Unknown attribute, so we can't verify it return False content = p7data_hash(hash_algo, attrs, content) return key.recover_data_from_signature(sig, pad_algo, hash_algo) == content def p7data_sign(key, pad_algo, hash_algo, content_type, content, use_attrs=False): attrs = [] if use_attrs: attrs.append(asn1crypto.cms.CMSAttribute({ 'type': 'content_type', 'values': [content_type], })) attrs.append(asn1crypto.cms.CMSAttribute({ 'type': 'message_digest', 'values': [p7data_hash(hash_algo, None, content)], })) content = p7data_hash(hash_algo, attrs, content) return key.sign(content, pad_algo, Prehashed(hash_algo)), attrs def p7pe_sign(data, cert, key, chain=[], algo=SHA1): with tempfile.NamedTemporaryFile() as certfile, \ tempfile.NamedTemporaryFile() as keyfile, \ tempfile.NamedTemporaryFile() as chainfile, \ tempfile.NamedTemporaryFile() as infile: certfile.write(cert.public_bytes(Encoding.PEM)) certfile.flush() keyfile.write(key.private_bytes( encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=NoEncryption() )) keyfile.flush() infile.write(data) infile.flush() if chain: chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in chain)) chainfile.flush() chainargs = ['-ac', chainfile.name] else: chainargs = [] # has to be mktemp() because osslsigncode expects target files to not exist temp = tempfile.mktemp() outfile = tempfile.mktemp() try: try: subprocess.check_call([ 'osslsigncode', 'remove-signature', infile.name, temp, ])#, stdout=subprocess.DEVNULL) except subprocess.CalledProcessError: shutil.copyfile(infile.name, temp) subprocess.check_call([ 'osslsigncode', 'sign', '-h', algo.name.lower(), *chainargs, '-certs', certfile.name, '-key', keyfile.name, temp, outfile ])#, stdout=subprocess.DEVNULL) return open(outfile, 'rb').read() finally: if os.path.isfile(temp): os.unlink(temp) if os.path.isfile(outfile): os.unlink(outfile) def p7pe_verify(data, certs) -> bool: with tempfile.NamedTemporaryFile() as infile, \ tempfile.NamedTemporaryFile() as cafile: infile.write(data) infile.flush() cafile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs)) cafile.flush() res = subprocess.run([ 'osslsigncode', 'verify', '-in', infile.name, '-CAfile', cafile.name ], capture_output=True) if res.returncode != 0: return False else: return True class PKCS7Type(enum.Flag): Compressed = enum.auto() Digested = enum.auto() TargetDigested = enum.auto() # 'authenticated' Signed = enum.auto() Encrypted = enum.auto() TargetEncrypted = enum.auto() # 'enveloped' def from_cms_type(type): t = PKCS7Type(0) if type in ('digested_data',): t |= PKCS7Type.Digested if type in ('compressed_data',): t |= PKCS7Type.Compressed if type in ('signed_data', 'signed_and_enveloped_data'): t |= PKCS7Type.Signed | PKCS7Type.Digested if type in ('authenticated_data', 'authenticated_enveloped_data'): t |= PKCS7Type.TargetDigested if type in ('encrypted_data',): t |= PKCS7Type.Encrypted if type in ('enveloped_data', 'signed_and_enveloped_data', 'authenticated_enveloped_data'): t |= PKCS7Type.TargetEncrypted return t class PKCS7Signer: def __init__(self, msg): self.msg = msg def matches(self, cert): return match_cms_cert_id(self.msg['sid'].chosen, to_cms_cert(cert)) def get_hash_algo(self): return from_cms_hash_algo(self.msg['digest_algorithm']) def get_sig_algo(self): return from_cms_asym_algo(self.msg['signature_algorithm']) def verify(self, cert, content_type, content: bytes) -> bool: hash_algo = self.get_hash_algo() sig_algo, _, sig_pad = self.get_sig_algo() if not isinstance(cert.public_key(), sig_algo): return None return p7data_verify( cert.public_key(), sig_pad, hash_algo, self.msg['signed_attrs'].native, content_type, content, self.msg['signature'].native, ) class PKCS7Recipient: def __init__(self, msg): self.msg = msg def matches(self, cert): return match_cms_cert_id(self.msg['rid'].chosen, to_cms_cert(cert)) def get_key_algo(self): return from_cms_asym_algo(self.msg['key_encryption_algorithm']) def decrypt_key(self, cert_key): key_algo, _, key_pad = self.get_key_algo() if isinstance(self.msg, asn1crypto.cms.KeyTransRecipientInfo): key_enc = self.msg['encrypted_key'].native else: raise ValueError('recipient type not supported: %s', recip.__class__.__name__) if not isinstance(cert_key.public_key(), key_algo): return None return cert_key.decrypt(key_enc, key_pad) class PKCS7: def __init__(self, msg): self.msg = msg @classmethod def load(cls, buf): return cls(asn1crypto.cms.ContentInfo.load(buf)) def dump(self): return self.msg.dump() @classmethod def create_signed(cls, data=None, signers=[], certs=[], algo=SHA1, pad=PKCS1v15): cert, key = signers[0] with tempfile.NamedTemporaryFile() as certfile, \ tempfile.NamedTemporaryFile() as keyfile, \ tempfile.NamedTemporaryFile() as chainfile, \ tempfile.NamedTemporaryFile() as infile, \ tempfile.NamedTemporaryFile('rb') as outfile: certfile.write(cert.public_bytes(Encoding.PEM)) certfile.flush() keyfile.write(key.private_bytes( encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=NoEncryption() )) keyfile.flush() infile.write(data) infile.flush() if certs: chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs)) chainfile.flush() chainargs = ['-certfile', chainfile.name] else: chainargs = [] subprocess.check_call([ 'openssl', 'smime', '-sign', '-binary', '-nodetach', '-noattr', *chainargs, '-signer', certfile.name, '-inkey', keyfile.name, '-in', infile.name, '-out', outfile.name, '-outform', 'der' ]) return cls.load(outfile.read()) @classmethod def sign_for_cert(cls, data, cert, key, algo, pad): signature, attrs = p7data_sign(key, pad, algo, 'data', data, use_attrs=True) return asn1crypto.cms.SignerInfo({ 'version': 'v1', 'sid': to_cms_cert_id(cert), 'digest_algorithm': to_cms_hash_algo(algo), 'signed_attrs': asn1crypto.cms.CMSAttributes(attrs), 'signature_algorithm': to_cms_sign_algo(key.public_key(), pad), 'signature': signature, }) @classmethod def create_store(cls, certificates, crls=[]): der_certs = [c.public_bytes(Encoding.DER) for c in certificates] return cls(asn1crypto.cms.ContentInfo({ 'content_type': 'signed_data', 'content': { 'version': 'v1', 'digest_algorithms': [], 'encap_content_info': { 'content_type': 'data' }, 'certificates': [ asn1crypto.x509.Certificate.load(c) for c in der_certs ], 'crls': [], 'signer_infos': [] } })) @classmethod def create_encrypted_with_params(cls, data, recipients, params, key_pad=PKCS1v15): algo, mode, key = decode_sym_enc_params(params) return cls(asn1crypto.cms.ContentInfo({ 'content_type': 'enveloped_data', 'content': { 'version': 'v0', 'recipient_infos': [ cls.encrypt_key_for_cert(key, cert, key_pad()) for cert in recipients ], 'encrypted_content_info': cls.encrypt_content_with_params(data, params), } })) @classmethod def create_encrypted(cls, data, recipients, key_pad=PKCS1v15, algo=AES, mode=CBC, keylength=128, key=None, iv=None): if key: keylength = len(key) * 8 if not key: key = os.urandom(keylength // 8) if not iv: iv = os.urandom(algo.block_size // 8) return cls(asn1crypto.cms.ContentInfo({ 'content_type': 'enveloped_data', 'content': { 'version': 'v0', 'recipient_infos': [ cls.encrypt_key_for_cert(key, cert, key_pad()) for cert in recipients ], 'encrypted_content_info': cls.encrypt_content(data, algo(key), mode(iv)), } })) @classmethod def encrypt_content_with_params(cls, data, params): algo, mode, key = decode_sym_enc_params(params) return cls.encrypt_content(data, algo, mode) @classmethod def encrypt_content(cls, data, algo, mode): padder = PKCS7Padding(algo.block_size).padder() pad_data = padder.update(data) + padder.finalize() cipher = Cipher(algo, mode).encryptor() enc_data = cipher.update(pad_data) + cipher.finalize() return asn1crypto.cms.EncryptedContentInfo({ 'content_type': 'data', 'content_encryption_algorithm': to_cms_sym_algo(algo, mode), 'encrypted_content': enc_data, }) @classmethod def encrypt_key_for_cert(cls, data, cert, pad): return asn1crypto.cms.KeyTransRecipientInfo({ 'version': 'v0', 'rid': to_cms_cert_id(cert), 'key_encryption_algorithm': to_cms_asym_algo(cert.public_key(), pad), 'encrypted_key': cert.public_key().encrypt(data, pad), }) def get_type(self): return from_cms_type(self.msg['content_type'].native) def get_encrypted_info(self): type = self.get_type() if type & PKCS7Type.TargetEncrypted and type & PKCS7Type.TargetDigested: return self.msg['content']['auth_encrypted_content_info'] elif type & (PKCS7Type.TargetEncrypted | PKCS7Type.Encrypted): return self.msg['content']['encrypted_content_info'] return None def get_content_info(self): type = self.get_type() if type & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted): return self.get_encrypted_info() elif type & (PKCS7Type.Signed | PKCS7Type.Digested | PKCS7Type.Compressed): return self.msg['content']['encap_content_info'] else: return None def get_certificates(self): type = self.get_type() certs = [] if type & PKCS7Type.Signed: certs.extend(c.dump() for c in self.msg['content']['certificates']) elif type & (PKCS7Type.TargetDigested | PKCS7Type.TargetEncrypted): certs.extend(c.dump() for c in self.msg['content']['originator_info']) return [load_der_x509_certificate(c) for c in certs] def get_recipients(self): if self.get_type() & (PKCS7Type.TargetEncrypted | PKCS7Type.TargetDigested): return [PKCS7Recipient(r.chosen) for r in self.msg['content']['recipient_infos']] return [] def get_signers(self): if self.get_type() & PKCS7Type.Signed: return [PKCS7Signer(s) for s in self.msg['content']['signer_infos']] return [] def get_hash_algo(self): if self.get_type() & (PKCS7Type.Digested | PKCS7Type.TargetDigested): return from_cms_hash_algo(self.msg['content']['digest_algorithm']) def get_hash(self): if self.get_type() & PKCS7Type.Digested: return self.msg['content']['digest'].native if self.get_type() & PKCS7Type.TargetDigested: return self.msg['content']['mac'].native def get_encryption_algo(self, key): if self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted): return from_cms_sym_algo(self.get_content_info()['content_encryption_algorithm'], key) if self.get_type() & PKCS7Type.TargetDigested: return from_cms_hmac_algo(self.msg['content']['mac_algorithm'], key) return None def get_verify_content(self): if self.get_type() & PKCS7Type.Signed: return self.get_content_info()['content'].native def get_content(self): info = self.get_content_info() if not info: info = self.msg if self.get_type() & PKCS7Type.Compressed: return self.msg.decompressed elif self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted): return info['encrypted_content'].native elif self.get_type() & PKCS7Type.Signed: return self.get_verify_content() else: return info['content'].native def decrypt_for_password(self, password: str, content=None): ... return self.decrypt_content(key, content) def decrypt_for_cert(self, cert, key, content=None): for recip in self.get_recipients(): if recip.matches(cert): dec, parameters = self.decrypt_for_recipient(recip, key, content) return dec, (recip, parameters) raise InvalidKey('no matching recipient found') def decrypt_for_recipient(self, recip, key, content=None): key_dec = recip.decrypt_key(key) return self.decrypt_content(key_dec, content) def decrypt_content(self, key: bytes, content=None): algo, mode = self.get_encryption_algo(key) if not content: content = self.get_content() cipher = Cipher(algo, mode).decryptor() content_pad = cipher.update(content) + cipher.finalize() unpadder = PKCS7Padding(algo.block_size).unpadder() content_dec = unpadder.update(content_pad) + unpadder.finalize() return content_dec, encode_sym_enc_params(algo, mode, key) def verify_content(self, content=None): if not content: content = self.get_verify_content() return self.get_hash() == p7data_hash( self.get_hash_algo(), None, content, ) def verify_for_cert(self, cert, key, content=None): for recip in self.get_recipients(): if recip.matches(cert): return self.verify_for_recipient(recip, key, content) return False def verify_for_recipient(self, recip, key, content=None): key_dec = recip.decrypt_key(key) return self.verify_enc_content(key_dec, content) def verify_enc_content(self, key: bytes, content=None): if not content: content = self.get_content() algo = self.get_encryption_algo(key) return algo.decrypt(self.get_hash()) == p7data_hash( self.get_hash_algo(), self.msg['content']['auth_attrs'].native, content, ) def verify_for_certs(self, root_certs, content=None, any=True, aspects=None): success = False chain = self.get_certificates() all_certs = chain + root_certs verifier = Verifier(aspects) for signer in self.get_signers(): for cert in all_certs: if not signer.matches(cert): continue try: verifier.verify_chain(cert, root_certs, chain) except VerifyFailure: continue if self.verify_for_signer(signer, cert, content): # Verified signature found! success = True break else: # No verified signature found for this signer if not any: success = False break if any and success: break return success def verify_for_signer(self, signer, cert, content=None): if not content: content = self.get_verify_content() return signer.verify(cert, self.get_content_info()['content_type'].native, content)