This commit is contained in:
Shiz 2022-09-28 01:25:16 +02:00
commit 55fea3b84c
6 changed files with 569 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/build
*.egg-info

13
LICENSE Normal file
View File

@ -0,0 +1,13 @@
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.

537
pkcs7.py Normal file
View File

@ -0,0 +1,537 @@
import os
import subprocess
import shutil
import tempfile
import enum
import asn1crypto.x509
import asn1crypto.cms
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.hashes import Hash, MD5, SHA1
from cryptography.hazmat.primitives.padding import PKCS7 as PKCS7Padding
from cryptography.hazmat.primitives.cmac import CMAC
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES, AES
from cryptography.hazmat.primitives.ciphers.modes import ECB, CBC, OFB, CFB
from cryptography_verify import VerifyAspect, Verifier, VerifyFailure, get_sig_algo_from_oid
def to_cms_cert(cert):
return asn1crypto.x509.Certificate.load(
cert.public_bytes(Encoding.DER)
)
def to_cms_cert_id(cert):
return asn1crypto.cms.IssuerAndSerialNumber({
'issuer': asn1crypto.x509.Name.build({
a.oid.dotted_string: a.value
for a in cert.issuer
}),
'serial_number': cert.serial_number,
})
def to_cms_hash_algo(algo):
return algo.name.lower()
def from_cms_hash_algo(desc):
algo, *params = desc['algorithm'].native.split('_')
return {
'md5': MD5(),
'sha1': SHA1(),
}[algo]
def to_cms_asym_pad(pad):
return pad.name.lower()
def to_cms_asym_algo(key, pad):
if isinstance(key, rsa.RSAPublicKey):
algo = 'rsaes_{}'.format(to_cms_asym_pad(pad))
else:
raise ValueError('unsupported asymmetric PKCS#7 key: {}'.format(key.__class__.__name__))
return {
'algorithm': algo
}
def from_cms_asym_algo(desc):
return get_sig_algo_from_oid(desc['algorithm'].dotted)
def to_cms_sym_mode(mode):
return mode.__name__.lower()
def to_cms_sym_algo(algo, mode):
desc = {
'algorithm': '{}{}_{}'.format(algo.__name__.lower(), algo.key_size, to_cms_sym_mode(mode)),
}
if isinstance(mode, CBC, OFB, CFB):
desc['parameter'] = mode.initialization_vector
return desc
def from_cms_sym_mode(desc: asn1crypto.cms.EncryptionAlgorithm):
return {
'ecb': ECB(),
'cbc': CBC(desc.encryption_iv),
'ofb': OFB(desc.encryption_iv),
'cfb': CFB(desc.encryption_iv),
}[desc['encryption_mode'].native]
def from_cms_sym_algo(desc: asn1crypto.cms.EncryptionAlgorithm, key: bytes):
cipher = {
'aes': AES,
}[desc['encryption_algorithm'].native]
keylength = desc.key_length * 8
return cipher(key), from_cms_sym_mode(desc)
def from_cms_mac_algo(desc: asn1crypto.cms.HmacAlgorithm, key: bytes):
algo, *params = desc['algorithm'].native.split('_')
if algo == 'des':
return cmac.CMAC(TripleDES(key))
algo = from_cms_hash_algo(desc)
return hmac.HMAC(key, algo)
def match_x509_name(left, right):
return left.native == right.native
def match_cms_cert_id(recip, cms_cert):
return match_cms_cert_issuer_serial(recip, cms_cert.issuer, cms_cert.serial_number)
def match_cms_cert_issuer_serial(recip, issuer, serial):
issuer_name = asn1crypto.x509.Name.load(issuer.public_bytes())
return (isinstance(recip, asn1crypto.cms.IssuerAndSerialNumber)
and match_x509_name(issuer_name, recip['issuer']) and recip['serial_number'].native == serial)
def p7data_hash(hash_algo, attrs, content):
if attrs:
content = asn1crypto.cms.CMSAttributes(attrs).dump()
h = Hash(hash_algo)
h.update(content)
return h.finalize()
def p7data_verify(key, pad_algo, hash_algo, attrs, content_type, content, sig):
if attrs:
# Have to do our own verification here
for attr in attrs:
attr_type = attr['type']
attr_vals = attr['values']
if attr_type == 'content_type':
if attr_vals[0] != content_type:
return False
elif attr_type == 'message_digest':
if p7data_hash(hash_algo, None, content) != attr_vals[0]:
return False
else:
# Unknown attribute, so we can't verify it
return False
content = p7data_hash(hash_algo, attrs, content)
return key.recover_data_from_signature(sig, pad_algo, hash_algo) == content
def p7data_sign(key, pad_algo, hash_algo, content_type, content, use_attrs=False):
attrs = []
if use_attrs:
attrs.append(asn1crypto.cms.CMSAttribute({
'type': 'content_type',
'values': [content_type],
}))
attrs.append(asn1crypto.cms.CMSAttribute({
'type': 'message_digest',
'values': [p7data_hash(hash_algo, None, content)],
}))
content = p7data_hash(hash_algo, attrs, content)
return key.sign(content, pad_algo, Prehashed(hash_algo))
def p7pe_sign(data, cert, key, chain=[], algo=SHA1):
with tempfile.NamedTemporaryFile() as certfile, \
tempfile.NamedTemporaryFile() as keyfile, \
tempfile.NamedTemporaryFile() as chainfile, \
tempfile.NamedTemporaryFile() as infile:
certfile.write(cert.public_bytes(Encoding.PEM))
certfile.flush()
keyfile.write(key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption()
))
keyfile.flush()
infile.write(data)
infile.flush()
if chain:
chainfile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in chain))
chainfile.flush()
chainargs = ['-ac', chainfile.name]
else:
chainargs = []
# has to be mktemp() because osslsigncode expects target files to not exist
temp = tempfile.mktemp()
outfile = tempfile.mktemp()
try:
try:
subprocess.check_call([
'osslsigncode', 'remove-signature',
infile.name, temp,
])#, stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError:
shutil.copyfile(infile.name, temp)
subprocess.check_call([
'osslsigncode', 'sign',
'-h', algo.name.lower(), *chainargs, '-certs', certfile.name, '-key', keyfile.name,
temp, outfile
])#, stdout=subprocess.DEVNULL)
return open(outfile, 'rb').read()
finally:
if os.path.isfile(temp):
os.unlink(temp)
if os.path.isfile(outfile):
os.unlink(outfile)
def p7pe_verify(data, certs) -> bool:
with tempfile.NamedTemporaryFile() as infile, \
tempfile.NamedTemporaryFile() as cafile:
infile.write(data)
infile.flush()
cafile.write(b'\n'.join(c.public_bytes(Encoding.PEM) for c in certs))
cafile.flush()
res = subprocess.run([
'osslsigncode', 'verify',
'-in', infile.name, '-CAfile', cafile.name
], capture_output=True)
if res.returncode != 0:
print(res.stdout.decode('utf-8'), file=sys.stderr)
return False
else:
return True
class PKCS7Type(enum.Flag):
Compressed = enum.auto()
Digested = enum.auto()
TargetDigested = enum.auto() # 'authenticated'
Signed = enum.auto()
Encrypted = enum.auto()
TargetEncrypted = enum.auto() # 'enveloped'
def from_cms_type(type):
t = PKCS7Type(0)
if type in ('digested_data',):
t |= PKCS7Type.Digested
if type in ('compressed_data',):
t |= PKCS7Type.Compressed
if type in ('signed_data', 'signed_and_enveloped_data'):
t |= PKCS7Type.Signed | PKCS7Type.Digested
if type in ('authenticated_data', 'authenticated_enveloped_data'):
t |= PKCS7Type.TargetDigested
if type in ('encrypted_data',):
t |= PKCS7Type.Encrypted
if type in ('enveloped_data', 'signed_and_enveloped_data', 'authenticated_enveloped_data'):
t |= PKCS7Type.TargetEncrypted
return t
class PKCS7Signer:
def __init__(self, msg):
self.msg = msg
def matches(self, cert):
return match_cms_cert_id(self.msg['sid'].chosen, cert)
def get_hash_algo(self):
return from_cms_hash_algo(self.msg['digest_algorithm'])
def get_sig_algo(self):
return from_cms_asym_algo(self.msg['signature_algorithm'])
def verify(self, cert, content_type, content: bytes) -> bool:
hash_algo = self.get_hash_algo()
sig_algo, _, sig_pad = self.get_sig_algo()
if not isinstance(cert.public_key(), sig_algo):
return None
return p7data_verify(
cert.public_key(),
sig_pad, hash_algo,
self.msg['signed_attrs'].native, content_type,
content, self.msg['signature'].native,
)
class PKCS7Recipient:
def __init__(self, msg):
self.msg = msg
def matches(self, cert):
return match_cms_cert_id(self.msg['rid'].chosen, cert)
def get_key_algo(self):
return from_cms_asym_algo(self.msg['key_encryption_algorithm'])
def decrypt_key(self, cert_key):
key_algo, _, key_pad = self.get_key_algo()
if isinstance(self.msg, asn1crypto.cms.KeyTransRecipientInfo):
key_enc = self.msg['encrypted_key'].native
else:
raise ValueError('recipient type not supported: %s', recip.__class__.__name__)
if not isinstance(cert_key, key_algo):
return None
return cert_key.decrypt(key_enc, key_pad)
class PKCS7:
def __init__(self, msg):
self.msg = msg
@classmethod
def load(cls, buf):
return cls(asn1crypto.cms.ContentInfo.load(buf))
def dump(self):
return self.msg.dump()
@classmethod
def create_signed(cls, data=None, signers=[], certs=[], algo=SHA1, pad=PKCS1v15):
signatures = [cls.sign_for_cert(data, cert, key, algo(), pad()) for (cert, key) in signers]
return cls(asn1crypto.cms.ContentInfo({
'content_type': 'signed_data',
'content': {
'version': 'v1',
'digest_algorithms': list(set(s['signature_algorithm'] for s in signatures)),
'encap_content_info': {
'content_type': 'data',
**({'content': data} if data is not None else {}),
},
'certificates': [to_cms_cert(c) for c in certs + [cert for cert, key in signers]],
'signer_infos': signatures,
}
}))
@classmethod
def sign_for_cert(cls, data, cert, key, algo, pad):
return asn1crypto.cms.SignerInfo({
'version': 'v1',
'sid': to_cms_cert_id(cert),
'digest_algorithm': to_cms_hash_algo(algo),
'signature_algorithm': to_cms_asym_algo(key.public_key(), pad),
'signature': p7data_sign(key, pad, algo, 'data', data),
})
@classmethod
def create_store(cls, certificates):
return cls.create_signed(certs=certificates)
@classmethod
def create_encrypted(cls, data, recipients, key_pad=PKCS1v15, algo=AES, mode=CBC, keylength=128, key=None, iv=None):
if not key:
key = os.urandom(keylength // 8)
if not iv:
iv = os.urandom(algo.block_size // 8)
return cls(asn1crypto.cms.ContentInfo({
'content_type': 'enveloped_data',
'content': {
'version': 'v0',
'recipient_infos': [
cls.encrypt_key_for_cert(key, cert, key_pad())
for cert in recipients
],
'encrypted_content_info': cls.encrypt_content(data, algo(key), mode(iv)),
}
}))
@classmethod
def encrypt_content(cls, data, algo, mode):
padder = PKCS7Padding(algo.block_size).padder()
pad_data = padder.update(data) + padder.finalize()
cipher = Cipher(algo, mode).encryptor()
enc_data = cipher.update(pad_data) + cipher.finalize()
return asn1crypto.cms.EncryptedContentInfo({
'content_type': 'data',
'content_encryption_algorithm': to_cms_sym_algo(algo, mode),
'encrypted_content': enc_data,
})
@classmethod
def encrypt_key_for_cert(cls, data, cert, pad):
return asn1crypto.cms.KeyTransRecipientInfo({
'version': 'v0',
'rid': to_cms_cert_id(cert),
'key_encryption_algorithm': to_cms_asym_algo(r.public_key(), pad),
'encrypted_key': cert.public_key().encrypt(data, pad),
})
def get_type(self):
return from_cms_type(self.msg['content_type'].native)
def get_encrypted_info(self):
type = self.get_type()
if type & PKCS7Type.TargetEncrypted and type & PKCS7Type.TargetSigned:
return self.msg['content']['auth_encrypted_content_info']
elif type & (PKCS7Type.TargetEncrypted | PKCS7Type.Encrypted):
return self.msg['content']['encrypted_content_info']
return None
def get_content_info(self):
type = self.get_type()
if type & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
return self.get_encrypted_info()
elif type & (PKCS7Type.Digested | PKCS7Type.Compressed):
return self.msg['content']['encap_content_info']
else:
return None
def get_certificates(self):
type = self.get_type()
certs = []
if type & PKCS7Type.Signed:
certs.extend(c.dump() for c in self.msg['content']['certificates'])
elif type & (PKCS7Type.TargetDigested | PKCS7Type.TargetEncrypted):
certs.extend(c.dump() for c in self.msg['content']['originator_info'])
return [load_der_x509_certificate(c) for c in certs]
def get_recipients(self):
if self.get_type() & (PKCS7Type.TargetEncrypted | PKCS7Type.TargetDigested):
return [PKCS7Recipient(r) for r in self.msg['content']['recipient_infos']]
return []
def get_signers(self):
if self.get_type() & PKCS7Type.Signed:
return [PKCS7Signer(s) for s in self.msg['content']['signer_infos']]
return []
def get_hash_algo(self):
if self.get_type() & (PKCS7Type.Digested | PKCS7Type.TargetDigested):
return from_cms_hash_algo(self.msg['content']['digest_algorithm'])
def get_hash(self):
if self.get_type() & PKCS7Type.Digested:
return self.msg['content']['digest'].native
if self.get_type() & PKCS7Type.TargetDigested:
return self.msg['content']['mac'].native
def get_encryption_algo(self, key):
if self.get_type() & (PKCS7Type.Encrypted | PKCS7Type.TargetEncrypted):
return from_cms_sym_algo(self.get_content_info()['content_encryption_algorithm'], key)
if self.get_type() & PKCS7Type.TargetDigested:
return from_cms_hmac_algo(self.msg['content']['mac_algorithm'], key)
return None
def get_verify_content(self):
if self.get_type() & PKCS7Type.Signed:
return self.get_content_info()['content'].contents
def get_content(self):
info = self.get_content_info()
if not info:
info = self.msg
if self.get_type() & PKCS7Type.Compressed:
return self.msg.decompressed
else:
return info['content'].native
def decrypt_for_password(self, password: str, content=None):
...
return self.decrypt_content(key, content)
def decrypt_for_cert(self, cert, key, content=None):
cms_cert = to_cms_cert(cert)
for recip in self.get_recipients():
if recip.matches(recip, cms_cert):
return self.decrypt_for_recipient(recip, key, content)
return None
def decrypt_for_recipient(self, recip, key, content=None):
key_dec = recip.decrypt_key(key)
return self.decrypt_content(key_dec, content)
def decrypt_content(self, key: bytes, content=None) -> bytes:
algo, mode = self.get_encryption_algo(key)
if not content:
content = self.get_content()
cipher = Cipher(algo, mode).decryptor()
content_pad = cipher.update(content) + cipher.finalize()
unpadder = PKCS7Padding(algo.block_size).unpadder()
content_dec = unpadder.update(content_pad) + unpadder.finalize()
return content_dec
def verify_content(self, content=None):
if not content:
content = self.get_verify_content()
return self.get_hash() == p7data_hash(
self.get_hash_algo(),
None,
content,
)
def verify_for_cert(self, cert, key, content=None):
cms_cert = to_cms_cert(cert)
for recip in self.get_recipients():
if recip.matches(recip, cms_cert):
return self.verify_for_recipient(recip, key, content)
return False
def verify_for_recipient(self, recip, key, content=None):
key_dec = recip.decrypt_key(key)
return self.verify_enc_content(key_dec, content)
def verify_enc_content(self, key: bytes, content=None):
if not content:
content = self.get_content()
algo = self.get_encryption_algo(key)
return algo.decrypt(self.get_hash()) == p7data_hash(
self.get_hash_algo(),
self.msg['content']['auth_attrs'].native,
content,
)
def verify_for_certs(self, root_certs, content=None, any=True, aspects=None):
success = False
chain = self.get_certificates()
all_certs = chain + root_certs
verifier = Verifier(aspects)
for signer in self.get_signers():
for cert in all_certs:
if not signer.matches(cert):
continue
try:
verifier.verify_chain(cert, root_certs, chain)
except VerifyFailure:
continue
if self.verify_for_signer(signer, cert, content):
# Verified signature found!
success = True
break
else:
# No verified signature found for this signer
if not any:
success = False
break
if any and success:
break
return success
def verify_for_signer(self, signer, cert, content=None):
if not content:
content = self.get_verify_content()
return signer.verify(cert, self.get_content_info()['content_type'].native, content)

3
pyproject.toml Normal file
View File

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools >= 40.9.0"]
build-backend = "setuptools.build_meta"

10
setup.cfg Normal file
View File

@ -0,0 +1,10 @@
[metadata]
name = python-pkcs7
version = 0.1.0
[options]
py_modules = pkcs7
install_requires =
asn1crypto >= 1.5.0
cryptography >= 3.0
cryptography_x509_verify

4
setup.py Normal file
View File

@ -0,0 +1,4 @@
# for legacy compatibility
from setuptools import setup
setup()