6.1 BOB’S RECEIVER

EXERCISE 6.1: BOB’S RECEIVER

Implement the reverse of this transmitter by creating a ReceiverManager. The exact API might vary a little, but you will probably need at least an update and finalize method. You will need to unpack the keys and IV using Bob’s private key and verify the signature using Alice’s public key. Then, you will decrypt data until it’s exhausted, finally verifying the HMAC over all received data.

Remember, the last bytes of the transmission are the HMAC trailer and are not data to be decrypted by AES. But when update is called, you may not yet know whether these are the last bytes or not! Think through it carefully!


import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes , hmac
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# WARNING: This code is NOT secure. DO NOT USE!
class TransmissionManager:
    def __init__(self, send_private_key: rsa.RSAPrivateKey, recv_public_key: rsa.RSAPublicKey):
        self.send_private_key = send_private_key
        self.recv_public_key = recv_public_key
        self.ekey = os.urandom(32)
        self.mkey = os.urandom(32)
        self.iv  = os.urandom(16)

        self.encryptor = Cipher(
            algorithms.AES(self.ekey),
            modes.CTR(self.iv),
            backend=default_backend()).encryptor()
        self.mac = hmac.HMAC(
            self.mkey,
            hashes.SHA256(),
            backend=default_backend())

    def initialize(self):
        data = self.ekey + self.iv + self.mkey
        h = hashes.Hash(hashes.SHA256(), backend=default_backend())
        h.update(data)
        data_digest = h.finalize()
        signature = self.send_private_key.sign(
            data_digest,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256())
        ciphertext = self.recv_public_key.encrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None)) # rarely used. Just leave it 'None'

        ciphertext += signature
        self.mac.update(ciphertext)
        return ciphertext

    def update(self, plaintext):
        ciphertext = self.encryptor.update(plaintext)
        self.mac.update(ciphertext)
        return ciphertext

    def finalize(self):
        return self.mac.finalize()

Now let’s create the ReceiverManager. But before we get into the actual code of this class, let’s first understand the structure of the ciphertext:

  1. The first \(256\) bytes of the ciphertext is the AES encryption key, IV, and the MAC key encrypted under the receiver’s public key. The structure of the plaintext for this part is as follows: AES_ENCRYPTION_KEY + IV + MAC_KEY
    • AES_ENCRYPTION_KEY: \(32\) bytes long
    • IV: \(16\) bytes long
    • MAC_KEY: \(32\) bytes long
  2. The next \(256\) bytes of the ciphertext is a signature over the hash of the AES key, IV, and MAC key by the sender’s private key.
  3. The last \(32\) bytes of the ciphertext is an HMAC over the entire transmission.
class ReceiverManager: 
    def __init__(self, sender_public_key: rsa.RSAPublicKey, receiver_private_key: rsa.RSAPrivateKey): 
        self.sender_public_key = sender_public_key
        self.receiver_private_key = receiver_private_key
        self.data = b'' 
    
    def update(self, ciphertext) -> None: 
        self.data += ciphertext 
    
    def finalize(self) -> bytes: 
        keys_encrypted, sig, body, mac = self.data[:256], self.data[256:512], self.data[512:-32], self.data[-32:]
        
        # first question: has the header been tampered with? 
        header = self.receiver_private_key.decrypt(
            ciphertext=keys_encrypted, 
            padding=padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )

        h = hashes.Hash(hashes.SHA256(), backend=default_backend())
        h.update(header)
        header_digest = h.finalize()

        self.sender_public_key.verify(
            signature=sig, 
            data=header_digest,
            padding=padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH,
            ),
            algorithm=hashes.SHA256(),
        )

        # Answer: No it has not been tampered with. 
        aes_key, iv, mac_key = header[:32], header[32:-32], header[-32:]

        # second question: has any data been tampered with?
        mac_er = hmac.HMAC(
            key=mac_key,
            algorithm=hashes.SHA256(),
            backend=default_backend(),
        )
        mac_er.update(self.data[:-32])
        mac_er.verify(mac)

        # Answer: No it has not been tampered with. 

        decryptor = Cipher(
            algorithm=algorithms.AES(aes_key), 
            mode=modes.CTR(iv), 
            backend=default_backend(),
        ).decryptor() 

        plaintext = decryptor.update(body)
        plaintext += decryptor.finalize() 
        return plaintext 
alice_private_key = rsa.generate_private_key(
    public_exponent=65537, 
    key_size=2048, 
    backend=default_backend(),
)
alice_public_key = alice_private_key.public_key() 

bob_private_key = rsa.generate_private_key(
    public_exponent=65537, 
    key_size=2048, 
    backend=default_backend(),
)
bob_public_key = bob_private_key.public_key() 
tm = TransmissionManager(
    send_private_key=alice_private_key,
    recv_public_key=bob_public_key
)

ciphertext = tm.initialize() 
ciphertext += tm.update(b"Hello bob, this is alice.")
ciphertext += tm.finalize() 

print(f"ciphertext: {ciphertext.hex(' ')}")
print(f"len of ciphertext: {len(ciphertext)}")
ciphertext: 8b c6 a6 f7 e0 19 f2 31 b1 e0 a6 05 d3 22 56 1e fd 1d 23 09 a4 26 2a 76 82 02 01 05 04 04 23 a2 dc 4c 0d 27 57 bc 04 1e e9 de 84 5e e8 25 8a 2d 52 bc 04 54 cf 2b da 6c 7b 49 45 dd 8c 1c ab d7 21 4b fb b6 20 03 dd 47 1b 05 6d e4 4c 5a 2a 01 12 23 05 92 a7 b5 54 c5 95 37 f3 57 d2 0c 88 d8 c0 3f f9 e9 36 28 04 4d 51 23 a4 a1 51 ee f6 36 1f db 73 bb da 81 3a bb d3 fb 60 90 66 99 88 7f ef 47 0c 51 f3 54 fd d7 27 87 d7 93 d5 13 c7 b8 5c 35 5e 40 07 3b ad 3b 21 89 82 07 97 bc ff 0d 97 08 2c ca 67 fe 81 83 a3 3d b3 5c c3 fd 64 a2 ed 41 b4 e4 d2 62 cc df af 00 c8 43 a1 cc 07 5b 5b 2a 6e 2b 14 8f 0d 38 40 72 19 01 9e c4 c8 d0 40 ba 7f 9d 9a ea 42 6d ee 2d e7 30 c6 7f 2a f9 08 56 6d c5 b9 5e f6 00 32 bd 06 1c 89 47 df 1e d3 b0 96 56 13 1d d8 dd d1 73 d7 fc 0c f6 93 eb 61 fc 59 4c eb e0 b8 d5 da 11 55 bb eb 66 78 c4 23 1e 4e 08 f3 22 a3 27 9c 02 e4 bd 59 41 d3 65 a8 2c 45 2f d7 16 fb 7a c2 7d 02 5b 18 7f 38 93 91 54 d1 2f da f8 5b a4 ff ac 39 67 14 a7 a8 21 6b 57 4d bb 7f 98 e2 cf 46 f2 ac 7d c5 7f 6e 80 d7 ba cd 7a 66 e5 b5 12 b0 57 87 dc 18 72 95 dd 36 c2 e1 26 d9 95 a2 bb af 6a b5 c0 e3 fa 07 09 91 ec 44 88 b8 65 2d 90 4f 3f 74 44 03 98 78 b6 0e 8d fb 2c 6b 3e 2e 25 96 63 80 e9 20 7a d4 b0 aa af 04 ed 51 62 4e 77 46 18 9f 0e 79 79 2c af d6 6e e3 73 e2 31 8c c1 0f b5 a4 6f ca 5d 34 ca 18 24 41 12 8d 51 43 76 28 10 47 c6 b6 23 d3 c2 d7 39 87 63 0d f3 06 90 66 45 d1 44 91 30 ed 67 02 d0 a9 da 1d c7 33 47 5f 23 81 e2 b9 09 7e 9f 4d 70 a0 0c ff 90 bf 82 5f cb bd df fb 41 b2 d7 92 bc 3b e4 fa 9f 2d fb 92 df d7 5c 43 25 d3 a6 c2 31 fb c5 f7 78 5b 9a a2 5d e3 d4 0e 0f fa 6e 06 44 37 4b fa a6 50 54 90 5c 70 9c 82 b5 fb d4 6a bf 68 85 d7 b7 60 e2 a8 e9 03 b1 aa 1e b5 e1 3a 17 38 8e 52 aa 86 e1 63
len of ciphertext: 569
rm = ReceiverManager(
    sender_public_key=alice_public_key, 
    receiver_private_key=bob_private_key,
)
rm.update(ciphertext=ciphertext)
plaintext = rm.finalize()

print(f"plaintext: {plaintext.hex(' ')}")
print(f"len of plaintext: {len(plaintext)}")
plaintext: 48 65 6c 6c 6f 20 62 6f 62 2c 20 74 68 69 73 20 69 73 20 61 6c 69 63 65 2e
len of plaintext: 25
print(plaintext)
b'Hello bob, this is alice.'

Which is exactly what we expect.