python-pkcs7/pkcs7.py

655 lines
22 KiB
Python

import os
import subprocess
import shutil
import tempfile
import enum
import asn1crypto.x509
import asn1crypto.cms
from cryptography.exceptions import InvalidKey
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.hashes import Hash, SHA1
from cryptography.hazmat.primitives.padding import PKCS7 as PKCS7Padding
from cryptography.hazmat.primitives.cmac import CMAC
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES, AES
from cryptography.hazmat.primitives.ciphers.modes import (
ModeWithInitializationVector, ModeWithTweak, ModeWithNonce,
ECB, CBC, OFB, CFB
)
from cryptography_x509_verify import (
VerifyAspect, Verifier, VerifyFailure,
get_sig_algo_from_oid, get_hash_algo_from_asn,
)
def to_cms_cert(cert):
return asn1crypto.x509.Certificate.load(
cert.public_bytes(Encoding.DER)
)
def to_cms_cert_id(cert):
return asn1crypto.cms.IssuerAndSerialNumber({
'issuer': asn1crypto.x509.Name.build({
a.oid.dotted_string: a.value
for a in cert.issuer
}),
'serial_number': cert.serial_number,
})
def to_cms_hash_algo(algo):
return {
'algorithm': algo.name.lower(),
}
def from_cms_hash_algo(desc):
algo, *params = desc['algorithm'].native.split('_')
return get_hash_algo_from_asn(algo)
def to_cms_asym_pad(pad):
return {
'EMSA-PKCS1-v1_5': 'pkcs1v15',
}[pad.name]
def to_cms_asym_algo(key, pad):
if isinstance(key, RSAPublicKey):
algo = 'rsaes_{}'.format(to_cms_asym_pad(pad))
else:
raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__))
return {
'algorithm': algo,
}
def to_cms_sign_algo(key, pad):
if isinstance(key, RSAPublicKey):
algo = 'rsassa_{}'.format(to_cms_asym_pad(pad))
else:
raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__))
return {
'algorithm': algo,
}
def from_cms_asym_algo(desc):
return get_sig_algo_from_oid(desc['algorithm'].dotted)
def to_cms_sym_mode(mode):
return mode.__class__.__name__.lower()
def to_cms_sym_algo(algo, mode):
desc = {
'algorithm': '{}{}_{}'.format(algo.__class__.__name__.lower(), algo.key_size, to_cms_sym_mode(mode)),
}
if isinstance(mode, ModeWithInitializationVector):
desc['parameters'] = mode.initialization_vector
elif isinstance(mode, ModeWithTweak):
desc['parameters'] = mode.tweak
elif isinstance(mode, ModeWithNonce):
desc['parameters'] = mode.nonce
return desc
def from_cms_sym_mode(desc: asn1crypto.algos.EncryptionAlgorithm):
return {
'ecb': ECB(),
'cbc': CBC(desc.encryption_iv),
'ofb': OFB(desc.encryption_iv),
'cfb': CFB(desc.encryption_iv),
}[desc.encryption_mode]
def from_cms_sym_algo(desc: asn1crypto.cms.EncryptionAlgorithm, key: bytes):
cipher = {
'aes': AES,
'des': TripleDES,
'tripledes': TripleDES,
}[desc.encryption_cipher]
return cipher(key), from_cms_sym_mode(desc)
def from_cms_mac_algo(desc: asn1crypto.cms.HmacAlgorithm, key: bytes):
algo, *params = desc['algorithm'].native.split('_')
if algo == 'des':
return CMAC(TripleDES(key))
algo = from_cms_hash_algo(desc)
return HMAC(key, algo)
def match_x509_name(left, right):
return left.native == right.native
def match_cms_cert_id(recip, cms_cert):
return match_cms_cert_issuer_serial(recip, cms_cert.issuer, cms_cert.serial_number)
def match_cms_cert_issuer_serial(recip, issuer, serial):
return (isinstance(recip, asn1crypto.cms.IssuerAndSerialNumber)
and match_x509_name(issuer, recip['issuer']) and recip['serial_number'].native == serial)
def encode_sym_enc_params(algo, mode, key):
desc = to_cms_sym_algo(algo, mode)
params = {
'key': key,
'algo': {
AES: 'aes',
TripleDES: 'des' if len(key) <= 8 else 'tripledes',
}[type(algo)],
'mode': {
ECB: 'ecb',
CBC: 'cbc',
OFB: 'ofb',
CFB: 'cfb',
}[type(mode)],
}
if 'parameters' in desc:
params['parameters'] = desc['parameters']
return params
def decode_sym_enc_params(params):
algo_cls = {
'aes': AES,
'des': TripleDES,
'tripledes': TripleDES,
}[params['algo']]
mode_cls = {
'ecb': ECB,
'cbc': CBC,
'ofb': OFB,
'cfb': CFB,
}[params['mode']]
if 'parameters' in params:
mode = mode_cls(params['parameters'])
else:
mode = mode_cls()
key = params['key']
return algo_cls(key), mode, key
def p7data_hash(hash_algo, attrs, content):
if attrs:
content = asn1crypto.cms.CMSAttributes(attrs).dump()
h = Hash(hash_algo)
h.update(content)
return h.finalize()
def p7data_verify(key, pad_algo, hash_algo, attrs, content_type, content, sig):
if attrs:
# Have to do our own verification here
for attr in attrs:
attr_type = attr['type']
attr_vals = attr['values']
if attr_type == 'content_type':
if attr_vals[0] != content_type:
return False
elif attr_type == 'message_digest':
if p7data_hash(hash_algo, None, content) != attr_vals[0]:
return False
else:
# Unknown attribute, so we can't verify it
return False
content = p7data_hash(hash_algo, attrs, content)
return key.recover_data_from_signature(sig, pad_algo, hash_algo) == content
def p7data_sign(key, pad_algo, hash_algo, content_type, content, use_attrs=False):
attrs = []
if use_attrs:
attrs.append(asn1crypto.cms.CMSAttribute({
'type': 'content_type',
'values': [content_type],
}))
attrs.append(asn1crypto.cms.CMSAttribute({
'type': 'message_digest',
'values': [p7data_hash(hash_algo, None, content)],
}))
content = p7data_hash(hash_algo, attrs, content)
return key.sign(content, pad_algo, Prehashed(hash_algo)), attrs
def p7pe_sign(data, cert, key, chain=[], algo=SHA1):
with tempfile.NamedTemporaryFile() as certfile, \
tempfile.NamedTemporaryFile() as keyfile, \
tempfile.NamedTemporaryFile() as chainfile, \
tempfile.NamedTemporaryFile() as infile:
certfile.write(cert.public_bytes(Encoding.PEM))
certfile.flush()
keyfile.write(key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption()
))
keyfile.flush()
infile.write(data)
infile.flush()
if chain:
chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in chain))
chainfile.flush()
chainargs = ['-ac', chainfile.name]
else:
chainargs = []
# has to be mktemp() because osslsigncode expects target files to not exist
temp = tempfile.mktemp()
outfile = tempfile.mktemp()
try:
try:
subprocess.check_call([
'osslsigncode', 'remove-signature',
infile.name, temp,
])#, stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError:
shutil.copyfile(infile.name, temp)
subprocess.check_call([
'osslsigncode', 'sign',
'-h', algo.name.lower(), *chainargs, '-certs', certfile.name, '-key', keyfile.name,
temp, outfile
])#, stdout=subprocess.DEVNULL)
return open(outfile, 'rb').read()
finally:
if os.path.isfile(temp):
os.unlink(temp)
if os.path.isfile(outfile):
os.unlink(outfile)
def p7pe_verify(data, certs) -> bool:
with tempfile.NamedTemporaryFile() as infile, \
tempfile.NamedTemporaryFile() as cafile:
infile.write(data)
infile.flush()
cafile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs))
cafile.flush()
res = subprocess.run([
'osslsigncode', 'verify',
'-in', infile.name, '-CAfile', cafile.name
], capture_output=True)
if res.returncode != 0:
return False
else:
return True
class PKCS7Type(enum.Flag):
Compressed = enum.auto()
Digested = enum.auto()
TargetDigested = enum.auto() # 'authenticated'
Signed = enum.auto()
Encrypted = enum.auto()
TargetEncrypted = enum.auto() # 'enveloped'
def from_cms_type(type):
t = PKCS7Type(0)
if type in ('digested_data',):
t |= PKCS7Type.Digested
if type in ('compressed_data',):
t |= PKCS7Type.Compressed
if type in ('signed_data', 'signed_and_enveloped_data'):
t |= PKCS7Type.Signed | PKCS7Type.Digested
if type in ('authenticated_data', 'authenticated_enveloped_data'):
t |= PKCS7Type.TargetDigested
if type in ('encrypted_data',):
t |= PKCS7Type.Encrypted
if type in ('enveloped_data', 'signed_and_enveloped_data', 'authenticated_enveloped_data'):
t |= PKCS7Type.TargetEncrypted
return t
class PKCS7Signer:
def __init__(self, msg):
self.msg = msg
def matches(self, cert):
return match_cms_cert_id(self.msg['sid'].chosen, to_cms_cert(cert))
def get_hash_algo(self):
return from_cms_hash_algo(self.msg['digest_algorithm'])
def get_sig_algo(self):
return from_cms_asym_algo(self.msg['signature_algorithm'])
def verify(self, cert, content_type, content: bytes) -> bool:
hash_algo = self.get_hash_algo()
sig_algo, _, sig_pad = self.get_sig_algo()
if not isinstance(cert.public_key(), sig_algo):
return None
return p7data_verify(
cert.public_key(),
sig_pad, hash_algo,
self.msg['signed_attrs'].native, content_type,
content, self.msg['signature'].native,
)
class PKCS7Recipient:
def __init__(self, msg):
self.msg = msg
def matches(self, cert):
return match_cms_cert_id(self.msg['rid'].chosen, to_cms_cert(cert))
def get_key_algo(self):
return from_cms_asym_algo(self.msg['key_encryption_algorithm'])
def decrypt_key(self, cert_key):
key_algo, _, key_pad = self.get_key_algo()
if isinstance(self.msg, asn1crypto.cms.KeyTransRecipientInfo):
key_enc = self.msg['encrypted_key'].native
else:
raise ValueError('recipient type not supported: %s', recip.__class__.__name__)
if not isinstance(cert_key.public_key(), key_algo):
return None
return cert_key.decrypt(key_enc, key_pad)
class PKCS7:
def __init__(self, msg):
self.msg = msg
@classmethod
def load(cls, buf):
return cls(asn1crypto.cms.ContentInfo.load(buf))
def dump(self):
return self.msg.dump()
@classmethod
def create_signed(cls, data=None, signers=[], certs=[], algo=SHA1, pad=PKCS1v15):
cert, key = signers[0]
with tempfile.NamedTemporaryFile() as certfile, \
tempfile.NamedTemporaryFile() as keyfile, \
tempfile.NamedTemporaryFile() as chainfile, \
tempfile.NamedTemporaryFile() as infile, \
tempfile.NamedTemporaryFile('rb') as outfile:
certfile.write(cert.public_bytes(Encoding.PEM))
certfile.flush()
keyfile.write(key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption()
))
keyfile.flush()
infile.write(data)
infile.flush()
if certs:
chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs))
chainfile.flush()
chainargs = ['-certfile', chainfile.name]
else:
chainargs = []
subprocess.check_call([
'openssl', 'smime', '-sign', '-binary', '-nodetach', '-noattr',
*chainargs, '-signer', certfile.name, '-inkey', keyfile.name,
'-in', infile.name, '-out', outfile.name, '-outform', 'der'
])
return cls.load(outfile.read())
@classmethod
def sign_for_cert(cls, data, cert, key, algo, pad):
signature, attrs = p7data_sign(key, pad, algo, 'data', data, use_attrs=True)
return asn1crypto.cms.SignerInfo({
'version': 'v1',
'sid': to_cms_cert_id(cert),
'digest_algorithm': to_cms_hash_algo(algo),
'signed_attrs': asn1crypto.cms.CMSAttributes(attrs),
'signature_algorithm': to_cms_sign_algo(key.public_key(), pad),
'signature': signature,
})
@classmethod
def create_store(cls, certificates, crls=[]):
der_certs = [c.public_bytes(Encoding.DER) for c in certificates]
return cls(asn1crypto.cms.ContentInfo({
'content_type': 'signed_data',
'content': {
'version': 'v1',
'digest_algorithms': [],
'encap_content_info': {
'content_type': 'data'
},
'certificates': [
asn1crypto.x509.Certificate.load(c) for c in der_certs
],
'crls': [],
'signer_infos': []
}
}))
@classmethod
def create_encrypted_with_params(cls, data, recipients, params, key_pad=PKCS1v15):
algo, mode, key = decode_sym_enc_params(params)
return cls(asn1crypto.cms.ContentInfo({
'content_type': 'enveloped_data',
'content': {
'version': 'v0',
'recipient_infos': [
cls.encrypt_key_for_cert(key, cert, key_pad())
for cert in recipients
],
'encrypted_content_info': cls.encrypt_content_with_params(data, params),
}
}))
@classmethod
def create_encrypted(cls, data, recipients, key_pad=PKCS1v15, algo=AES, mode=CBC, keylength=128, key=None, iv=None):
if key:
keylength = len(key) * 8
if not key:
key = os.urandom(keylength // 8)
if not iv:
iv = os.urandom(algo.block_size // 8)
return cls(asn1crypto.cms.ContentInfo({
'content_type': 'enveloped_data',
'content': {
'version': 'v0',
'recipient_infos': [
cls.encrypt_key_for_cert(key, cert, key_pad())
for cert in recipients
],
'encrypted_content_info': cls.encrypt_content(data, algo(key), mode(iv)),
}
}))
@classmethod
def encrypt_content_with_params(cls, data, params):
algo, mode, key = decode_sym_enc_params(params)
return cls.encrypt_content(data, algo, mode)
@classmethod
def encrypt_content(cls, data, algo, mode):
padder = PKCS7Padding(algo.block_size).padder()
pad_data = padder.update(data) + padder.finalize()
cipher = Cipher(algo, mode).encryptor()
enc_data = cipher.update(pad_data) + cipher.finalize()
return asn1crypto.cms.EncryptedContentInfo({
'content_type': 'data',
'content_encryption_algorithm': to_cms_sym_algo(algo, mode),
'encrypted_content': enc_data,
})
@classmethod
def encrypt_key_for_cert(cls, data, cert, pad):
return asn1crypto.cms.KeyTransRecipientInfo({
'version': 'v0',
'rid': to_cms_cert_id(cert),
'key_encryption_algorithm': to_cms_asym_algo(cert.public_key(), pad),
'encrypted_key': cert.public_key().encrypt(data, pad),
})
def get_type(self):
return from_cms_type(self.msg['content_type'].native)
def get_encrypted_info(self):
type = self.get_type()
if type & PKCS7Type.TargetEncrypted and type & PKCS7Type.TargetDigested:
return self.msg['content']['auth_encrypted_content_info']
elif type & (PKCS7Type.TargetEncrypted | PKCS7Type.Encrypted):
return self.msg['content']['encrypted_content_info']
return None
def get_content_info(self):
type = self.get_type()
if type & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
return self.get_encrypted_info()
elif type & (PKCS7Type.Signed | PKCS7Type.Digested | PKCS7Type.Compressed):
return self.msg['content']['encap_content_info']
else:
return None
def get_certificates(self):
type = self.get_type()
certs = []
if type & PKCS7Type.Signed:
certs.extend(c.dump() for c in self.msg['content']['certificates'])
elif type & (PKCS7Type.TargetDigested | PKCS7Type.TargetEncrypted):
certs.extend(c.dump() for c in self.msg['content']['originator_info'])
return [load_der_x509_certificate(c) for c in certs]
def get_recipients(self):
if self.get_type() & (PKCS7Type.TargetEncrypted | PKCS7Type.TargetDigested):
return [PKCS7Recipient(r.chosen) for r in self.msg['content']['recipient_infos']]
return []
def get_signers(self):
if self.get_type() & PKCS7Type.Signed:
return [PKCS7Signer(s) for s in self.msg['content']['signer_infos']]
return []
def get_hash_algo(self):
if self.get_type() & (PKCS7Type.Digested | PKCS7Type.TargetDigested):
return from_cms_hash_algo(self.msg['content']['digest_algorithm'])
def get_hash(self):
if self.get_type() & PKCS7Type.Digested:
return self.msg['content']['digest'].native
if self.get_type() & PKCS7Type.TargetDigested:
return self.msg['content']['mac'].native
def get_encryption_algo(self, key):
if self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
return from_cms_sym_algo(self.get_content_info()['content_encryption_algorithm'], key)
if self.get_type() & PKCS7Type.TargetDigested:
return from_cms_hmac_algo(self.msg['content']['mac_algorithm'], key)
return None
def get_verify_content(self):
if self.get_type() & PKCS7Type.Signed:
return self.get_content_info()['content'].native
def get_content(self):
info = self.get_content_info()
if not info:
info = self.msg
if self.get_type() & PKCS7Type.Compressed:
return self.msg.decompressed
elif self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
return info['encrypted_content'].native
elif self.get_type() & PKCS7Type.Signed:
return self.get_verify_content()
else:
return info['content'].native
def decrypt_for_password(self, password: str, content=None):
...
return self.decrypt_content(key, content)
def decrypt_for_cert(self, cert, key, content=None):
for recip in self.get_recipients():
if recip.matches(cert):
dec, parameters = self.decrypt_for_recipient(recip, key, content)
return dec, (recip, parameters)
raise InvalidKey('no matching recipient found')
def decrypt_for_recipient(self, recip, key, content=None):
key_dec = recip.decrypt_key(key)
return self.decrypt_content(key_dec, content)
def decrypt_content(self, key: bytes, content=None):
algo, mode = self.get_encryption_algo(key)
if not content:
content = self.get_content()
cipher = Cipher(algo, mode).decryptor()
content_pad = cipher.update(content) + cipher.finalize()
unpadder = PKCS7Padding(algo.block_size).unpadder()
content_dec = unpadder.update(content_pad) + unpadder.finalize()
return content_dec, encode_sym_enc_params(algo, mode, key)
def verify_content(self, content=None):
if not content:
content = self.get_verify_content()
return self.get_hash() == p7data_hash(
self.get_hash_algo(),
None,
content,
)
def verify_for_cert(self, cert, key, content=None):
for recip in self.get_recipients():
if recip.matches(cert):
return self.verify_for_recipient(recip, key, content)
return False
def verify_for_recipient(self, recip, key, content=None):
key_dec = recip.decrypt_key(key)
return self.verify_enc_content(key_dec, content)
def verify_enc_content(self, key: bytes, content=None):
if not content:
content = self.get_content()
algo = self.get_encryption_algo(key)
return algo.decrypt(self.get_hash()) == p7data_hash(
self.get_hash_algo(),
self.msg['content']['auth_attrs'].native,
content,
)
def verify_for_certs(self, root_certs, content=None, any=True, aspects=None):
success = False
chain = self.get_certificates()
all_certs = chain + root_certs
verifier = Verifier(aspects)
for signer in self.get_signers():
for cert in all_certs:
if not signer.matches(cert):
continue
try:
verifier.verify_chain(cert, root_certs, chain)
except VerifyFailure:
continue
if self.verify_for_signer(signer, cert, content):
# Verified signature found!
success = True
break
else:
# No verified signature found for this signer
if not any:
success = False
break
if any and success:
break
return success
def verify_for_signer(self, signer, cert, content=None):
if not content:
content = self.get_verify_content()
return signer.verify(cert, self.get_content_info()['content_type'].native, content)