You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
552 lines
19 KiB
552 lines
19 KiB
import os
|
|
import subprocess
|
|
import shutil
|
|
import tempfile
|
|
import enum
|
|
|
|
import asn1crypto.x509
|
|
import asn1crypto.cms
|
|
|
|
from cryptography.x509 import load_der_x509_certificate
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
|
|
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
|
|
from cryptography.hazmat.primitives.ciphers import Cipher
|
|
from cryptography.hazmat.primitives.hashes import Hash, SHA1
|
|
|
|
from cryptography.hazmat.primitives.padding import PKCS7 as PKCS7Padding
|
|
from cryptography.hazmat.primitives.cmac import CMAC
|
|
from cryptography.hazmat.primitives.hmac import HMAC
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
|
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
|
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES, AES
|
|
from cryptography.hazmat.primitives.ciphers.modes import ECB, CBC, OFB, CFB
|
|
|
|
from cryptography_x509_verify import (
|
|
VerifyAspect, Verifier, VerifyFailure,
|
|
get_sig_algo_from_oid, get_hash_algo_from_asn,
|
|
)
|
|
|
|
|
|
def to_cms_cert(cert):
|
|
return asn1crypto.x509.Certificate.load(
|
|
cert.public_bytes(Encoding.DER)
|
|
)
|
|
|
|
def to_cms_cert_id(cert):
|
|
return asn1crypto.cms.IssuerAndSerialNumber({
|
|
'issuer': asn1crypto.x509.Name.build({
|
|
a.oid.dotted_string: a.value
|
|
for a in cert.issuer
|
|
}),
|
|
'serial_number': cert.serial_number,
|
|
})
|
|
|
|
def to_cms_hash_algo(algo):
|
|
return {
|
|
'algorithm': algo.name.lower(),
|
|
}
|
|
|
|
def from_cms_hash_algo(desc):
|
|
algo, *params = desc['algorithm'].native.split('_')
|
|
return get_hash_algo_from_asn(algo)
|
|
|
|
def to_cms_asym_pad(pad):
|
|
return {
|
|
'EMSA-PKCS1-v1_5': 'pkcs1v15',
|
|
}[pad.name]
|
|
|
|
def to_cms_asym_algo(key, pad):
|
|
if isinstance(key, RSAPublicKey):
|
|
algo = 'rsaes_{}'.format(to_cms_asym_pad(pad))
|
|
else:
|
|
raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__))
|
|
return {
|
|
'algorithm': algo,
|
|
}
|
|
|
|
def to_cms_sign_algo(key, pad):
|
|
if isinstance(key, RSAPublicKey):
|
|
algo = 'rsassa_{}'.format(to_cms_asym_pad(pad))
|
|
else:
|
|
raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__))
|
|
return {
|
|
'algorithm': algo,
|
|
}
|
|
|
|
def from_cms_asym_algo(desc):
|
|
return get_sig_algo_from_oid(desc['algorithm'].dotted)
|
|
|
|
def to_cms_sym_mode(mode):
|
|
return mode.__class__.__name__.lower()
|
|
|
|
def to_cms_sym_algo(algo, mode):
|
|
desc = {
|
|
'algorithm': '{}{}_{}'.format(algo.__class__.__name__.lower(), algo.key_size, to_cms_sym_mode(mode)),
|
|
}
|
|
if isinstance(mode, (CBC, OFB, CFB)):
|
|
desc['parameters'] = mode.initialization_vector
|
|
return desc
|
|
|
|
def from_cms_sym_mode(desc: asn1crypto.algos.EncryptionAlgorithm):
|
|
return {
|
|
'ecb': ECB(),
|
|
'cbc': CBC(desc.encryption_iv),
|
|
'ofb': OFB(desc.encryption_iv),
|
|
'cfb': CFB(desc.encryption_iv),
|
|
}[desc.encryption_mode]
|
|
|
|
def from_cms_sym_algo(desc: asn1crypto.cms.EncryptionAlgorithm, key: bytes):
|
|
cipher = {
|
|
'aes': AES,
|
|
'des': TripleDES,
|
|
'tripledes': TripleDES,
|
|
}[desc.encryption_cipher]
|
|
return cipher(key), from_cms_sym_mode(desc)
|
|
|
|
def from_cms_mac_algo(desc: asn1crypto.cms.HmacAlgorithm, key: bytes):
|
|
algo, *params = desc['algorithm'].native.split('_')
|
|
if algo == 'des':
|
|
return CMAC(TripleDES(key))
|
|
algo = from_cms_hash_algo(desc)
|
|
return HMAC(key, algo)
|
|
|
|
|
|
def match_x509_name(left, right):
|
|
return left.native == right.native
|
|
|
|
def match_cms_cert_id(recip, cms_cert):
|
|
return match_cms_cert_issuer_serial(recip, cms_cert.issuer, cms_cert.serial_number)
|
|
|
|
def match_cms_cert_issuer_serial(recip, issuer, serial):
|
|
return (isinstance(recip, asn1crypto.cms.IssuerAndSerialNumber)
|
|
and match_x509_name(issuer, recip['issuer']) and recip['serial_number'].native == serial)
|
|
|
|
|
|
def p7data_hash(hash_algo, attrs, content):
|
|
if attrs:
|
|
content = asn1crypto.cms.CMSAttributes(attrs).dump()
|
|
h = Hash(hash_algo)
|
|
h.update(content)
|
|
return h.finalize()
|
|
|
|
def p7data_verify(key, pad_algo, hash_algo, attrs, content_type, content, sig):
|
|
if attrs:
|
|
# Have to do our own verification here
|
|
for attr in attrs:
|
|
attr_type = attr['type']
|
|
attr_vals = attr['values']
|
|
if attr_type == 'content_type':
|
|
if attr_vals[0] != content_type:
|
|
return False
|
|
elif attr_type == 'message_digest':
|
|
if p7data_hash(hash_algo, None, content) != attr_vals[0]:
|
|
return False
|
|
else:
|
|
# Unknown attribute, so we can't verify it
|
|
return False
|
|
|
|
content = p7data_hash(hash_algo, attrs, content)
|
|
return key.recover_data_from_signature(sig, pad_algo, hash_algo) == content
|
|
|
|
def p7data_sign(key, pad_algo, hash_algo, content_type, content, use_attrs=False):
|
|
attrs = []
|
|
if use_attrs:
|
|
attrs.append(asn1crypto.cms.CMSAttribute({
|
|
'type': 'content_type',
|
|
'values': [content_type],
|
|
}))
|
|
attrs.append(asn1crypto.cms.CMSAttribute({
|
|
'type': 'message_digest',
|
|
'values': [p7data_hash(hash_algo, None, content)],
|
|
}))
|
|
content = p7data_hash(hash_algo, attrs, content)
|
|
return key.sign(content, pad_algo, Prehashed(hash_algo))
|
|
|
|
def p7pe_sign(data, cert, key, chain=[], algo=SHA1):
|
|
with tempfile.NamedTemporaryFile() as certfile, \
|
|
tempfile.NamedTemporaryFile() as keyfile, \
|
|
tempfile.NamedTemporaryFile() as chainfile, \
|
|
tempfile.NamedTemporaryFile() as infile:
|
|
|
|
certfile.write(cert.public_bytes(Encoding.PEM))
|
|
certfile.flush()
|
|
keyfile.write(key.private_bytes(
|
|
encoding=Encoding.PEM,
|
|
format=PrivateFormat.PKCS8,
|
|
encryption_algorithm=NoEncryption()
|
|
))
|
|
keyfile.flush()
|
|
infile.write(data)
|
|
infile.flush()
|
|
|
|
if chain:
|
|
chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in chain))
|
|
chainfile.flush()
|
|
chainargs = ['-ac', chainfile.name]
|
|
else:
|
|
chainargs = []
|
|
|
|
# has to be mktemp() because osslsigncode expects target files to not exist
|
|
temp = tempfile.mktemp()
|
|
outfile = tempfile.mktemp()
|
|
try:
|
|
try:
|
|
subprocess.check_call([
|
|
'osslsigncode', 'remove-signature',
|
|
infile.name, temp,
|
|
])#, stdout=subprocess.DEVNULL)
|
|
except subprocess.CalledProcessError:
|
|
shutil.copyfile(infile.name, temp)
|
|
subprocess.check_call([
|
|
'osslsigncode', 'sign',
|
|
'-h', algo.name.lower(), *chainargs, '-certs', certfile.name, '-key', keyfile.name,
|
|
temp, outfile
|
|
])#, stdout=subprocess.DEVNULL)
|
|
return open(outfile, 'rb').read()
|
|
finally:
|
|
if os.path.isfile(temp):
|
|
os.unlink(temp)
|
|
if os.path.isfile(outfile):
|
|
os.unlink(outfile)
|
|
|
|
def p7pe_verify(data, certs) -> bool:
|
|
with tempfile.NamedTemporaryFile() as infile, \
|
|
tempfile.NamedTemporaryFile() as cafile:
|
|
|
|
infile.write(data)
|
|
infile.flush()
|
|
cafile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs))
|
|
cafile.flush()
|
|
|
|
res = subprocess.run([
|
|
'osslsigncode', 'verify',
|
|
'-in', infile.name, '-CAfile', cafile.name
|
|
], capture_output=True)
|
|
|
|
if res.returncode != 0:
|
|
print(res.stdout.decode('utf-8'), file=sys.stderr)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
class PKCS7Type(enum.Flag):
|
|
Compressed = enum.auto()
|
|
Digested = enum.auto()
|
|
TargetDigested = enum.auto() # 'authenticated'
|
|
Signed = enum.auto()
|
|
Encrypted = enum.auto()
|
|
TargetEncrypted = enum.auto() # 'enveloped'
|
|
|
|
def from_cms_type(type):
|
|
t = PKCS7Type(0)
|
|
if type in ('digested_data',):
|
|
t |= PKCS7Type.Digested
|
|
if type in ('compressed_data',):
|
|
t |= PKCS7Type.Compressed
|
|
if type in ('signed_data', 'signed_and_enveloped_data'):
|
|
t |= PKCS7Type.Signed | PKCS7Type.Digested
|
|
if type in ('authenticated_data', 'authenticated_enveloped_data'):
|
|
t |= PKCS7Type.TargetDigested
|
|
if type in ('encrypted_data',):
|
|
t |= PKCS7Type.Encrypted
|
|
if type in ('enveloped_data', 'signed_and_enveloped_data', 'authenticated_enveloped_data'):
|
|
t |= PKCS7Type.TargetEncrypted
|
|
return t
|
|
|
|
|
|
class PKCS7Signer:
|
|
def __init__(self, msg):
|
|
self.msg = msg
|
|
|
|
def matches(self, cert):
|
|
return match_cms_cert_id(self.msg['sid'].chosen, to_cms_cert(cert))
|
|
|
|
def get_hash_algo(self):
|
|
return from_cms_hash_algo(self.msg['digest_algorithm'])
|
|
|
|
def get_sig_algo(self):
|
|
return from_cms_asym_algo(self.msg['signature_algorithm'])
|
|
|
|
def verify(self, cert, content_type, content: bytes) -> bool:
|
|
hash_algo = self.get_hash_algo()
|
|
sig_algo, _, sig_pad = self.get_sig_algo()
|
|
if not isinstance(cert.public_key(), sig_algo):
|
|
return None
|
|
|
|
return p7data_verify(
|
|
cert.public_key(),
|
|
sig_pad, hash_algo,
|
|
self.msg['signed_attrs'].native, content_type,
|
|
content, self.msg['signature'].native,
|
|
)
|
|
|
|
class PKCS7Recipient:
|
|
def __init__(self, msg):
|
|
self.msg = msg
|
|
|
|
def matches(self, cert):
|
|
return match_cms_cert_id(self.msg['rid'].chosen, to_cms_cert(cert))
|
|
|
|
def get_key_algo(self):
|
|
return from_cms_asym_algo(self.msg['key_encryption_algorithm'])
|
|
|
|
def decrypt_key(self, cert_key):
|
|
key_algo, _, key_pad = self.get_key_algo()
|
|
if isinstance(self.msg, asn1crypto.cms.KeyTransRecipientInfo):
|
|
key_enc = self.msg['encrypted_key'].native
|
|
else:
|
|
raise ValueError('recipient type not supported: %s', recip.__class__.__name__)
|
|
|
|
if not isinstance(cert_key.public_key(), key_algo):
|
|
return None
|
|
return cert_key.decrypt(key_enc, key_pad)
|
|
|
|
class PKCS7:
|
|
def __init__(self, msg):
|
|
self.msg = msg
|
|
|
|
@classmethod
|
|
def load(cls, buf):
|
|
return cls(asn1crypto.cms.ContentInfo.load(buf))
|
|
|
|
def dump(self):
|
|
return self.msg.dump()
|
|
|
|
@classmethod
|
|
def create_signed(cls, data=None, signers=[], certs=[], algo=SHA1, pad=PKCS1v15):
|
|
signatures = [cls.sign_for_cert(data, cert, key, algo(), pad()) for (cert, key) in signers]
|
|
return cls(asn1crypto.cms.ContentInfo({
|
|
'content_type': 'signed_data',
|
|
'content': {
|
|
'version': 'v1',
|
|
'digest_algorithms': list(set(s['digest_algorithm'] for s in signatures)),
|
|
'encap_content_info': {
|
|
'content_type': 'data',
|
|
**({'content': data} if data is not None else {}),
|
|
},
|
|
'certificates': [to_cms_cert(c) for c in certs + [cert for cert, key in signers]],
|
|
'signer_infos': signatures,
|
|
}
|
|
}))
|
|
|
|
@classmethod
|
|
def sign_for_cert(cls, data, cert, key, algo, pad):
|
|
return asn1crypto.cms.SignerInfo({
|
|
'version': 'v1',
|
|
'sid': to_cms_cert_id(cert),
|
|
'digest_algorithm': to_cms_hash_algo(algo),
|
|
'signature_algorithm': to_cms_sign_algo(key.public_key(), pad),
|
|
'signature': p7data_sign(key, pad, algo, 'data', data, use_attrs=True),
|
|
})
|
|
|
|
@classmethod
|
|
def create_store(cls, certificates):
|
|
return cls.create_signed(certs=certificates)
|
|
|
|
@classmethod
|
|
def create_encrypted(cls, data, recipients, key_pad=PKCS1v15, algo=AES, mode=CBC, keylength=128, key=None, iv=None):
|
|
if key:
|
|
keylength = len(key) * 8
|
|
if not key:
|
|
key = os.urandom(keylength // 8)
|
|
if not iv:
|
|
iv = os.urandom(algo.block_size // 8)
|
|
|
|
return cls(asn1crypto.cms.ContentInfo({
|
|
'content_type': 'enveloped_data',
|
|
'content': {
|
|
'version': 'v0',
|
|
'recipient_infos': [
|
|
cls.encrypt_key_for_cert(key, cert, key_pad())
|
|
for cert in recipients
|
|
],
|
|
'encrypted_content_info': cls.encrypt_content(data, algo(key), mode(iv)),
|
|
}
|
|
}))
|
|
|
|
@classmethod
|
|
def encrypt_content(cls, data, algo, mode):
|
|
padder = PKCS7Padding(algo.block_size).padder()
|
|
pad_data = padder.update(data) + padder.finalize()
|
|
cipher = Cipher(algo, mode).encryptor()
|
|
enc_data = cipher.update(pad_data) + cipher.finalize()
|
|
return asn1crypto.cms.EncryptedContentInfo({
|
|
'content_type': 'data',
|
|
'content_encryption_algorithm': to_cms_sym_algo(algo, mode),
|
|
'encrypted_content': enc_data,
|
|
})
|
|
|
|
@classmethod
|
|
def encrypt_key_for_cert(cls, data, cert, pad):
|
|
return asn1crypto.cms.KeyTransRecipientInfo({
|
|
'version': 'v0',
|
|
'rid': to_cms_cert_id(cert),
|
|
'key_encryption_algorithm': to_cms_asym_algo(cert.public_key(), pad),
|
|
'encrypted_key': cert.public_key().encrypt(data, pad),
|
|
})
|
|
|
|
|
|
def get_type(self):
|
|
return from_cms_type(self.msg['content_type'].native)
|
|
|
|
def get_encrypted_info(self):
|
|
type = self.get_type()
|
|
if type & PKCS7Type.TargetEncrypted and type & PKCS7Type.TargetDigested:
|
|
return self.msg['content']['auth_encrypted_content_info']
|
|
elif type & (PKCS7Type.TargetEncrypted | PKCS7Type.Encrypted):
|
|
return self.msg['content']['encrypted_content_info']
|
|
return None
|
|
|
|
def get_content_info(self):
|
|
type = self.get_type()
|
|
|
|
if type & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
|
|
return self.get_encrypted_info()
|
|
elif type & (PKCS7Type.Digested | PKCS7Type.Compressed):
|
|
return self.msg['content']['encap_content_info']
|
|
else:
|
|
return None
|
|
|
|
def get_certificates(self):
|
|
type = self.get_type()
|
|
certs = []
|
|
if type & PKCS7Type.Signed:
|
|
certs.extend(c.dump() for c in self.msg['content']['certificates'])
|
|
elif type & (PKCS7Type.TargetDigested | PKCS7Type.TargetEncrypted):
|
|
certs.extend(c.dump() for c in self.msg['content']['originator_info'])
|
|
|
|
return [load_der_x509_certificate(c) for c in certs]
|
|
|
|
def get_recipients(self):
|
|
if self.get_type() & (PKCS7Type.TargetEncrypted | PKCS7Type.TargetDigested):
|
|
return [PKCS7Recipient(r.chosen) for r in self.msg['content']['recipient_infos']]
|
|
return []
|
|
|
|
def get_signers(self):
|
|
if self.get_type() & PKCS7Type.Signed:
|
|
return [PKCS7Signer(s) for s in self.msg['content']['signer_infos']]
|
|
return []
|
|
|
|
def get_hash_algo(self):
|
|
if self.get_type() & (PKCS7Type.Digested | PKCS7Type.TargetDigested):
|
|
return from_cms_hash_algo(self.msg['content']['digest_algorithm'])
|
|
|
|
def get_hash(self):
|
|
if self.get_type() & PKCS7Type.Digested:
|
|
return self.msg['content']['digest'].native
|
|
if self.get_type() & PKCS7Type.TargetDigested:
|
|
return self.msg['content']['mac'].native
|
|
|
|
def get_encryption_algo(self, key):
|
|
if self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
|
|
return from_cms_sym_algo(self.get_content_info()['content_encryption_algorithm'], key)
|
|
if self.get_type() & PKCS7Type.TargetDigested:
|
|
return from_cms_hmac_algo(self.msg['content']['mac_algorithm'], key)
|
|
return None
|
|
|
|
def get_verify_content(self):
|
|
if self.get_type() & PKCS7Type.Signed:
|
|
return self.get_content_info()['content'].contents
|
|
|
|
def get_content(self):
|
|
info = self.get_content_info()
|
|
if not info:
|
|
info = self.msg
|
|
if self.get_type() & PKCS7Type.Compressed:
|
|
return self.msg.decompressed
|
|
elif self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
|
|
return info['encrypted_content'].native
|
|
else:
|
|
return info['content'].native
|
|
|
|
|
|
def decrypt_for_password(self, password: str, content=None):
|
|
...
|
|
return self.decrypt_content(key, content)
|
|
|
|
def decrypt_for_cert(self, cert, key, content=None):
|
|
for recip in self.get_recipients():
|
|
if recip.matches(cert):
|
|
return self.decrypt_for_recipient(recip, key, content)
|
|
return None, None
|
|
|
|
def decrypt_for_recipient(self, recip, key, content=None):
|
|
key_dec = recip.decrypt_key(key)
|
|
return self.decrypt_content(key_dec, content), recip
|
|
|
|
def decrypt_content(self, key: bytes, content=None) -> bytes:
|
|
algo, mode = self.get_encryption_algo(key)
|
|
if not content:
|
|
content = self.get_content()
|
|
|
|
cipher = Cipher(algo, mode).decryptor()
|
|
content_pad = cipher.update(content) + cipher.finalize()
|
|
unpadder = PKCS7Padding(algo.block_size).unpadder()
|
|
content_dec = unpadder.update(content_pad) + unpadder.finalize()
|
|
return content_dec
|
|
|
|
def verify_content(self, content=None):
|
|
if not content:
|
|
content = self.get_verify_content()
|
|
|
|
return self.get_hash() == p7data_hash(
|
|
self.get_hash_algo(),
|
|
None,
|
|
content,
|
|
)
|
|
|
|
def verify_for_cert(self, cert, key, content=None):
|
|
for recip in self.get_recipients():
|
|
if recip.matches(cert):
|
|
return self.verify_for_recipient(recip, key, content)
|
|
return False
|
|
|
|
def verify_for_recipient(self, recip, key, content=None):
|
|
key_dec = recip.decrypt_key(key)
|
|
return self.verify_enc_content(key_dec, content)
|
|
|
|
def verify_enc_content(self, key: bytes, content=None):
|
|
if not content:
|
|
content = self.get_content()
|
|
|
|
algo = self.get_encryption_algo(key)
|
|
return algo.decrypt(self.get_hash()) == p7data_hash(
|
|
self.get_hash_algo(),
|
|
self.msg['content']['auth_attrs'].native,
|
|
content,
|
|
)
|
|
|
|
def verify_for_certs(self, root_certs, content=None, any=True, aspects=None):
|
|
success = False
|
|
chain = self.get_certificates()
|
|
all_certs = chain + root_certs
|
|
|
|
verifier = Verifier(aspects)
|
|
|
|
for signer in self.get_signers():
|
|
for cert in all_certs:
|
|
if not signer.matches(cert):
|
|
continue
|
|
try:
|
|
verifier.verify_chain(cert, root_certs, chain)
|
|
except VerifyFailure:
|
|
continue
|
|
if self.verify_for_signer(signer, cert, content):
|
|
# Verified signature found!
|
|
success = True
|
|
break
|
|
else:
|
|
# No verified signature found for this signer
|
|
if not any:
|
|
success = False
|
|
break
|
|
if any and success:
|
|
break
|
|
|
|
return success
|
|
|
|
def verify_for_signer(self, signer, cert, content=None):
|
|
if not content:
|
|
content = self.get_verify_content()
|
|
return signer.verify(cert, self.get_content_info()['content_type'].native, content)
|
|
|