[aes] Add aes_gcm_decrypt_and_verify (#1020)

Authored by: sulyi, pukkandan
This commit is contained in:
Ákos Sülyi 2021-09-19 14:22:31 +02:00 committed by GitHub
parent a63d9bd0b0
commit 09906f554d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 214 additions and 69 deletions

View File

@ -7,7 +7,19 @@ import sys
import unittest import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from yt_dlp.aes import aes_decrypt, aes_encrypt, aes_cbc_decrypt, aes_cbc_encrypt, aes_decrypt_text from yt_dlp.aes import (
aes_decrypt,
aes_encrypt,
aes_cbc_decrypt,
aes_cbc_decrypt_bytes,
aes_cbc_encrypt,
aes_ctr_decrypt,
aes_ctr_encrypt,
aes_gcm_decrypt_and_verify,
aes_gcm_decrypt_and_verify_bytes,
aes_decrypt_text
)
from yt_dlp.compat import compat_pycrypto_AES
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
import base64 import base64
@ -27,18 +39,43 @@ class TestAES(unittest.TestCase):
self.assertEqual(decrypted, msg) self.assertEqual(decrypted, msg)
def test_cbc_decrypt(self): def test_cbc_decrypt(self):
data = bytes_to_intlist( data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd" decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
)
decrypted = intlist_to_bytes(aes_cbc_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if compat_pycrypto_AES:
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_cbc_encrypt(self): def test_cbc_encrypt(self):
data = bytes_to_intlist(self.secret_msg) data = bytes_to_intlist(self.secret_msg)
encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv)) encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv))
self.assertEqual( self.assertEqual(
encrypted, encrypted,
b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd") b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd')
def test_ctr_decrypt(self):
data = bytes_to_intlist(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
decrypted = intlist_to_bytes(aes_ctr_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_ctr_encrypt(self):
data = bytes_to_intlist(self.secret_msg)
encrypted = intlist_to_bytes(aes_ctr_encrypt(data, self.key, self.iv))
self.assertEqual(
encrypted,
b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
def test_gcm_decrypt(self):
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f.\x08\xb4T\xe4/\x17\xbd'
authentication_tag = b'\xe8&I\x80rI\x07\x9d}YWuU@:e'
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if compat_pycrypto_AES:
decrypted = aes_gcm_decrypt_and_verify_bytes(
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_decrypt_text(self): def test_decrypt_text(self):
password = intlist_to_bytes(self.key).decode('utf-8') password = intlist_to_bytes(self.key).decode('utf-8')

View File

@ -2,7 +2,6 @@ import unittest
from datetime import datetime, timezone from datetime import datetime, timezone
from yt_dlp import cookies from yt_dlp import cookies
from yt_dlp.compat import compat_pycrypto_AES
from yt_dlp.cookies import ( from yt_dlp.cookies import (
LinuxChromeCookieDecryptor, LinuxChromeCookieDecryptor,
MacChromeCookieDecryptor, MacChromeCookieDecryptor,
@ -53,7 +52,6 @@ class TestCookies(unittest.TestCase):
decryptor = LinuxChromeCookieDecryptor('Chrome', YDLLogger()) decryptor = LinuxChromeCookieDecryptor('Chrome', YDLLogger())
self.assertEqual(decryptor.decrypt(encrypted_value), value) self.assertEqual(decryptor.decrypt(encrypted_value), value)
@unittest.skipIf(not compat_pycrypto_AES, 'cryptography library not available')
def test_chrome_cookie_decryptor_windows_v10(self): def test_chrome_cookie_decryptor_windows_v10(self):
with MonkeyPatch(cookies, { with MonkeyPatch(cookies, {
'_get_windows_v10_key': lambda *args, **kwargs: b'Y\xef\xad\xad\xeerp\xf0Y\xe6\x9b\x12\xc2<z\x16]\n\xbb\xb8\xcb\xd7\x9bA\xc3\x14e\x99{\xd6\xf4&' '_get_windows_v10_key': lambda *args, **kwargs: b'Y\xef\xad\xad\xeerp\xf0Y\xe6\x9b\x12\xc2<z\x16]\n\xbb\xb8\xcb\xd7\x9bA\xc3\x14e\x99{\xd6\xf4&'

View File

@ -11,39 +11,59 @@ if compat_pycrypto_AES:
""" Decrypt bytes with AES-CBC using pycryptodome """ """ Decrypt bytes with AES-CBC using pycryptodome """
return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_CBC, iv).decrypt(data) return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_CBC, iv).decrypt(data)
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
""" Decrypt bytes with AES-GCM using pycryptodome """
return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
else: else:
def aes_cbc_decrypt_bytes(data, key, iv): def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """ """ Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv)))) return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce))))
BLOCK_SIZE_BYTES = 16 BLOCK_SIZE_BYTES = 16
def aes_ctr_decrypt(data, key, counter): def aes_ctr_decrypt(data, key, iv):
""" """
Decrypt with aes in counter mode Decrypt with aes in counter mode
@param {int[]} data cipher @param {int[]} data cipher
@param {int[]} key 16/24/32-Byte cipher key @param {int[]} key 16/24/32-Byte cipher key
@param {instance} counter Instance whose next_value function (@returns {int[]} 16-Byte block) @param {int[]} iv 16-Byte initialization vector
returns the next counter block
@returns {int[]} decrypted data @returns {int[]} decrypted data
""" """
return aes_ctr_encrypt(data, key, iv)
def aes_ctr_encrypt(data, key, iv):
"""
Encrypt with aes in counter mode
@param {int[]} data cleartext
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv 16-Byte initialization vector
@returns {int[]} encrypted data
"""
expanded_key = key_expansion(key) expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES)) block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
counter = iter_vector(iv)
decrypted_data = [] encrypted_data = []
for i in range(block_count): for i in range(block_count):
counter_block = counter.next_value() counter_block = next(counter)
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES] block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
block += [0] * (BLOCK_SIZE_BYTES - len(block)) block += [0] * (BLOCK_SIZE_BYTES - len(block))
cipher_counter_block = aes_encrypt(counter_block, expanded_key) cipher_counter_block = aes_encrypt(counter_block, expanded_key)
decrypted_data += xor(block, cipher_counter_block) encrypted_data += xor(block, cipher_counter_block)
decrypted_data = decrypted_data[:len(data)] encrypted_data = encrypted_data[:len(data)]
return decrypted_data return encrypted_data
def aes_cbc_decrypt(data, key, iv): def aes_cbc_decrypt(data, key, iv):
@ -100,39 +120,47 @@ def aes_cbc_encrypt(data, key, iv):
return encrypted_data return encrypted_data
def key_expansion(data): def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
""" """
Generate key schedule Decrypt with aes in GBM mode and checks authenticity using tag
@param {int[]} data 16/24/32-Byte cipher key @param {int[]} data cipher
@returns {int[]} 176/208/240-Byte expanded key @param {int[]} key 16-Byte cipher key
@param {int[]} tag authentication tag
@param {int[]} nonce IV (recommended 12-Byte)
@returns {int[]} decrypted data
""" """
data = data[:] # copy
rcon_iteration = 1
key_size_bytes = len(data)
expanded_key_size_bytes = (key_size_bytes // 4 + 7) * BLOCK_SIZE_BYTES
while len(data) < expanded_key_size_bytes: # XXX: check aes, gcm param
temp = data[-4:]
temp = key_schedule_core(temp, rcon_iteration)
rcon_iteration += 1
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
for _ in range(3): hash_subkey = aes_encrypt([0] * BLOCK_SIZE_BYTES, key_expansion(key))
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
if key_size_bytes == 32: if len(nonce) == 12:
temp = data[-4:] j0 = nonce + [0, 0, 0, 1]
temp = sub_bytes(temp) else:
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
ghash_in = nonce + [0] * fill + bytes_to_intlist((8 * len(nonce)).to_bytes(8, 'big'))
j0 = ghash(hash_subkey, ghash_in)
for _ in range(3 if key_size_bytes == 32 else 2 if key_size_bytes == 24 else 0): # TODO: add nonce support to aes_ctr_decrypt
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
data = data[:expanded_key_size_bytes]
return data # nonce_ctr = j0[:12]
iv_ctr = inc(j0)
decrypted_data = aes_ctr_decrypt(data, key, iv_ctr + [0] * (BLOCK_SIZE_BYTES - len(iv_ctr)))
pad_len = len(data) // 16 * 16
s_tag = ghash(
hash_subkey,
data
+ [0] * (BLOCK_SIZE_BYTES - len(data) + pad_len) # pad
+ bytes_to_intlist((0 * 8).to_bytes(8, 'big') # length of associated data
+ ((len(data) * 8).to_bytes(8, 'big'))) # length of data
)
if tag != aes_ctr_encrypt(s_tag, key, j0):
raise ValueError("Mismatching authentication tag")
return decrypted_data
def aes_encrypt(data, expanded_key): def aes_encrypt(data, expanded_key):
@ -201,15 +229,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
nonce = data[:NONCE_LENGTH_BYTES] nonce = data[:NONCE_LENGTH_BYTES]
cipher = data[NONCE_LENGTH_BYTES:] cipher = data[NONCE_LENGTH_BYTES:]
class Counter(object): decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
__value = nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES)
def next_value(self):
temp = self.__value
self.__value = inc(self.__value)
return temp
decrypted_data = aes_ctr_decrypt(cipher, key, Counter())
plaintext = intlist_to_bytes(decrypted_data) plaintext = intlist_to_bytes(decrypted_data)
return plaintext return plaintext
@ -290,6 +310,47 @@ RIJNDAEL_LOG_TABLE = (0x00, 0x00, 0x19, 0x01, 0x32, 0x02, 0x1a, 0xc6, 0x4b, 0xc7
0x67, 0x4a, 0xed, 0xde, 0xc5, 0x31, 0xfe, 0x18, 0x0d, 0x63, 0x8c, 0x80, 0xc0, 0xf7, 0x70, 0x07) 0x67, 0x4a, 0xed, 0xde, 0xc5, 0x31, 0xfe, 0x18, 0x0d, 0x63, 0x8c, 0x80, 0xc0, 0xf7, 0x70, 0x07)
def key_expansion(data):
"""
Generate key schedule
@param {int[]} data 16/24/32-Byte cipher key
@returns {int[]} 176/208/240-Byte expanded key
"""
data = data[:] # copy
rcon_iteration = 1
key_size_bytes = len(data)
expanded_key_size_bytes = (key_size_bytes // 4 + 7) * BLOCK_SIZE_BYTES
while len(data) < expanded_key_size_bytes:
temp = data[-4:]
temp = key_schedule_core(temp, rcon_iteration)
rcon_iteration += 1
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
for _ in range(3):
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
if key_size_bytes == 32:
temp = data[-4:]
temp = sub_bytes(temp)
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
for _ in range(3 if key_size_bytes == 32 else 2 if key_size_bytes == 24 else 0):
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
data = data[:expanded_key_size_bytes]
return data
def iter_vector(iv):
while True:
yield iv
iv = inc(iv)
def sub_bytes(data): def sub_bytes(data):
return [SBOX[x] for x in data] return [SBOX[x] for x in data]
@ -315,7 +376,7 @@ def xor(data1, data2):
def rijndael_mul(a, b): def rijndael_mul(a, b):
if(a == 0 or b == 0): if a == 0 or b == 0:
return 0 return 0
return RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[a] + RIJNDAEL_LOG_TABLE[b]) % 0xFF] return RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[a] + RIJNDAEL_LOG_TABLE[b]) % 0xFF]
@ -359,6 +420,20 @@ def shift_rows_inv(data):
return data_shifted return data_shifted
def shift_block(data):
data_shifted = []
bit = 0
for n in data:
if bit:
n |= 0x100
bit = n & 1
n >>= 1
data_shifted.append(n)
return data_shifted
def inc(data): def inc(data):
data = data[:] # copy data = data[:] # copy
for i in range(len(data) - 1, -1, -1): for i in range(len(data) - 1, -1, -1):
@ -370,4 +445,50 @@ def inc(data):
return data return data
__all__ = ['aes_encrypt', 'key_expansion', 'aes_ctr_decrypt', 'aes_cbc_decrypt', 'aes_decrypt_text'] def block_product(block_x, block_y):
# NIST SP 800-38D, Algorithm 1
if len(block_x) != BLOCK_SIZE_BYTES or len(block_y) != BLOCK_SIZE_BYTES:
raise ValueError("Length of blocks need to be %d bytes" % BLOCK_SIZE_BYTES)
block_r = [0xE1] + [0] * (BLOCK_SIZE_BYTES - 1)
block_v = block_y[:]
block_z = [0] * BLOCK_SIZE_BYTES
for i in block_x:
for bit in range(7, -1, -1):
if i & (1 << bit):
block_z = xor(block_z, block_v)
do_xor = block_v[-1] & 1
block_v = shift_block(block_v)
if do_xor:
block_v = xor(block_v, block_r)
return block_z
def ghash(subkey, data):
# NIST SP 800-38D, Algorithm 2
if len(data) % BLOCK_SIZE_BYTES:
raise ValueError("Length of data should be %d bytes" % BLOCK_SIZE_BYTES)
last_y = [0] * BLOCK_SIZE_BYTES
for i in range(0, len(data), BLOCK_SIZE_BYTES):
block = data[i : i + BLOCK_SIZE_BYTES] # noqa: E203
last_y = block_product(xor(last_y, block), subkey)
return last_y
__all__ = [
'aes_ctr_decrypt',
'aes_cbc_decrypt',
'aes_cbc_decrypt_bytes',
'aes_decrypt_text',
'aes_encrypt',
'aes_gcm_decrypt_and_verify',
'aes_gcm_decrypt_and_verify_bytes',
'key_expansion'
]

View File

@ -9,17 +9,14 @@ import tempfile
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from hashlib import pbkdf2_hmac from hashlib import pbkdf2_hmac
from yt_dlp.aes import aes_cbc_decrypt from .aes import aes_cbc_decrypt_bytes, aes_gcm_decrypt_and_verify_bytes
from yt_dlp.compat import ( from .compat import (
compat_b64decode, compat_b64decode,
compat_cookiejar_Cookie, compat_cookiejar_Cookie,
compat_pycrypto_AES
) )
from yt_dlp.utils import ( from .utils import (
bug_reports_message, bug_reports_message,
bytes_to_intlist,
expand_path, expand_path,
intlist_to_bytes,
process_communicate_or_kill, process_communicate_or_kill,
YoutubeDLCookieJar, YoutubeDLCookieJar,
) )
@ -395,11 +392,6 @@ class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
if self._v10_key is None: if self._v10_key is None:
self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True) self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
return None return None
elif not compat_pycrypto_AES:
self._logger.warning('cannot decrypt cookie as the `pycryptodome` module is not installed. '
'Please install by running `python3 -m pip install pycryptodome`',
only_once=True)
return None
# https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc # https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
# kNonceLength # kNonceLength
@ -643,21 +635,18 @@ def pbkdf2_sha1(password, salt, iterations, key_length):
def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16): def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16):
plaintext = aes_cbc_decrypt(bytes_to_intlist(ciphertext), plaintext = aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector)
bytes_to_intlist(key),
bytes_to_intlist(initialization_vector))
padding_length = plaintext[-1] padding_length = plaintext[-1]
try: try:
return intlist_to_bytes(plaintext[:-padding_length]).decode('utf-8') return plaintext[:-padding_length].decode('utf-8')
except UnicodeDecodeError: except UnicodeDecodeError:
logger.warning('failed to decrypt cookie because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True) logger.warning('failed to decrypt cookie because UTF-8 decoding failed. Possibly the key is wrong?', only_once=True)
return None return None
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger): def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger):
cipher = compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_GCM, nonce)
try: try:
plaintext = cipher.decrypt_and_verify(ciphertext, authentication_tag) plaintext = aes_gcm_decrypt_and_verify_bytes(ciphertext, key, authentication_tag, nonce)
except ValueError: except ValueError:
logger.warning('failed to decrypt cookie because the MAC check failed. Possibly the key is wrong?', only_once=True) logger.warning('failed to decrypt cookie because the MAC check failed. Possibly the key is wrong?', only_once=True)
return None return None