mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-12-23 02:26:37 -05:00
de20687ee6
Closes #7688, #7675
283 lines
10 KiB
Python
283 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Allow direct execution
|
|
import os
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import contextlib
|
|
import io
|
|
import platform
|
|
import random
|
|
import ssl
|
|
import urllib.error
|
|
import warnings
|
|
|
|
from yt_dlp.cookies import YoutubeDLCookieJar
|
|
from yt_dlp.dependencies import certifi
|
|
from yt_dlp.networking import Response
|
|
from yt_dlp.networking._helper import (
|
|
InstanceStoreMixin,
|
|
add_accept_encoding_header,
|
|
get_redirect_method,
|
|
make_socks_proxy_opts,
|
|
select_proxy,
|
|
ssl_load_certs,
|
|
)
|
|
from yt_dlp.networking.exceptions import (
|
|
HTTPError,
|
|
IncompleteRead,
|
|
_CompatHTTPError,
|
|
)
|
|
from yt_dlp.socks import ProxyType
|
|
from yt_dlp.utils.networking import HTTPHeaderDict
|
|
|
|
# Absolute path of the directory that contains this test module.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
class TestNetworkingUtils:
    """Unit tests for the helpers imported from ``yt_dlp.networking._helper``."""

    def test_select_proxy(self):
        """Check proxy selection by URL scheme, the 'all' fallback and the 'no' list."""
        proxy_map = {
            'all': 'socks5://example.com',
            'http': 'http://example.com:1080',
            'no': 'bypass.example.com,yt-dl.org',
        }
        fallback_proxy = proxy_map['all']
        http_proxy = proxy_map['http']

        # There is no 'https' entry, so the 'all' proxy is the expected result.
        assert select_proxy('https://example.com', proxy_map) == fallback_proxy
        assert select_proxy('http://example.com', proxy_map) == http_proxy

        # Hosts named in the 'no' entry are expected to get no proxy at all.
        assert select_proxy('http://bypass.example.com', proxy_map) is None
        assert select_proxy('https://yt-dl.org', proxy_map) is None

    @pytest.mark.parametrize('socks_proxy,expected', [
        (
            'socks5h://example.com',
            {
                'proxytype': ProxyType.SOCKS5,
                'addr': 'example.com',
                'port': 1080,
                'rdns': True,
                'username': None,
                'password': None,
            },
        ),
        (
            'socks5://user:@example.com:5555',
            {
                'proxytype': ProxyType.SOCKS5,
                'addr': 'example.com',
                'port': 5555,
                'rdns': False,
                'username': 'user',
                'password': '',
            },
        ),
        (
            'socks4://u%40ser:pa%20ss@127.0.0.1:1080',
            {
                'proxytype': ProxyType.SOCKS4,
                'addr': '127.0.0.1',
                'port': 1080,
                'rdns': False,
                'username': 'u@ser',
                'password': 'pa ss',
            },
        ),
        (
            'socks4a://:pa%20ss@127.0.0.1',
            {
                'proxytype': ProxyType.SOCKS4A,
                'addr': '127.0.0.1',
                'port': 1080,
                'rdns': True,
                'username': '',
                'password': 'pa ss',
            },
        ),
    ])
    def test_make_socks_proxy_opts(self, socks_proxy, expected):
        """Each SOCKS proxy URL must be parsed into the expected options dict."""
        parsed_opts = make_socks_proxy_opts(socks_proxy)
        assert parsed_opts == expected

    def test_make_socks_proxy_unknown(self):
        """A bare ``socks://`` scheme must be rejected with a ValueError."""
        expected_message = 'Unknown SOCKS proxy version: socks'
        with pytest.raises(ValueError, match=expected_message):
            make_socks_proxy_opts('socks://127.0.0.1')

    @pytest.mark.skipif(not certifi, reason='certifi is not installed')
    def test_load_certifi(self):
        """Compare ``ssl_load_certs`` against manually loaded certifi/default stores."""

        def new_client_context():
            # Every comparison needs its own, initially empty, client context.
            return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # use_certifi=True must yield the same CA list as loading certifi's bundle.
        certifi_reference = new_client_context()
        certifi_reference.load_verify_locations(cafile=certifi.where())
        loaded_with_certifi = new_client_context()
        ssl_load_certs(loaded_with_certifi, use_certifi=True)
        assert loaded_with_certifi.get_ca_certs() == certifi_reference.get_ca_certs()

        # use_certifi=False must yield the same CA list as load_default_certs().
        default_reference = new_client_context()
        default_reference.load_default_certs()
        loaded_with_defaults = new_client_context()
        ssl_load_certs(loaded_with_defaults, use_certifi=False)
        assert loaded_with_defaults.get_ca_certs() == default_reference.get_ca_certs()

        # When both stores are identical the two checks above cannot tell the
        # code paths apart, so the outcome is reported as skipped.
        if default_reference.get_ca_certs() == certifi_reference.get_ca_certs():
            pytest.skip('System uses certifi as default. The test is not valid')

    @pytest.mark.parametrize('method,status,expected', [
        ('GET', 303, 'GET'),
        ('HEAD', 303, 'HEAD'),
        ('PUT', 303, 'GET'),
        ('POST', 301, 'GET'),
        ('HEAD', 301, 'HEAD'),
        ('POST', 302, 'GET'),
        ('HEAD', 302, 'HEAD'),
        ('PUT', 302, 'PUT'),
        ('POST', 308, 'POST'),
        ('POST', 307, 'POST'),
        ('HEAD', 308, 'HEAD'),
        ('HEAD', 307, 'HEAD'),
    ])
    def test_get_redirect_method(self, method, status, expected):
        """The method used after a redirect must match the table above."""
        redirected_method = get_redirect_method(method, status)
        assert redirected_method == expected

    @pytest.mark.parametrize('headers,supported_encodings,expected', [
        ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
        ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
        (
            {'Content-type': 'application/json'},
            [],
            {'Content-type': 'application/json', 'Accept-Encoding': 'identity'},
        ),
    ])
    def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
        """``add_accept_encoding_header`` must update the header dict in place as expected."""
        header_dict = HTTPHeaderDict(headers)
        add_accept_encoding_header(header_dict, supported_encodings)
        assert header_dict == HTTPHeaderDict(expected)
|
|
|
|
|
|
class TestInstanceStoreMixin:
    """Tests for the instance caching provided by ``InstanceStoreMixin``."""

    class FakeInstanceStoreMixin(InstanceStoreMixin):
        """Minimal concrete mixin whose "instances" are random integers."""

        def _create_instance(self, **kwargs):
            # A fresh random number stands in for a newly created instance, so
            # two separately created instances are (almost surely) unequal.
            return random.randint(0, 1000000)

        def _close_instance(self, instance):
            # The fake instances hold no resources, so there is nothing to close.
            pass

    def test_mixin(self):
        """Equal keyword arguments must reuse an instance; different ones must not."""
        mixin = self.FakeInstanceStoreMixin()

        # NOTE: ``{'d', 4}`` and ``{'e', 4}`` are set literals, not dicts.
        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})

        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})

        # Fixed: the closing parenthesis used to sit after the second call, so
        # the comparison was evaluated inside the ``d=`` argument and the assert
        # only checked that a random integer was truthy.
        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}})

        assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])

        assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])

        cookiejar = YoutubeDLCookieJar()
        assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)

        # A different cookie jar object must not map to the same instance.
        assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())

        # Different order
        assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)

        # After clearing the store, the same arguments must create a new instance.
        m = mixin._get_instance(t=1234)
        assert mixin._get_instance(t=1234) == m
        mixin._clear_instances()
        assert mixin._get_instance(t=1234) != m
|
|
|
|
|
|
class TestNetworkingExceptions:
    """Tests for ``HTTPError``, ``_CompatHTTPError`` and ``IncompleteRead``."""

    @staticmethod
    def create_response(status):
        """Build a ``Response`` with body ``b'test'``, one header and the given status."""
        body = io.BytesIO(b'test')
        return Response(fp=body, url='http://example.com', headers={'tesT': 'test'}, status=status)

    @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
    def test_http_error(self, http_error_class):
        """Both error flavours must expose status, message, reason and response."""
        response = self.create_response(403)
        error = http_error_class(response)
        expected_message = 'HTTP Error 403: Forbidden'

        assert error.status == 403
        assert str(error) == error.msg == expected_message
        assert error.reason == response.reason
        assert error.response is response

        # The response body must still be readable through the error.
        body = error.response.read()
        assert body == b'test'
        assert repr(error) == '<HTTPError 403: Forbidden>'

    @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
    def test_redirect_http_error(self, http_error_class):
        """``redirect_loop=True`` must extend the message but not the reason."""
        response = self.create_response(301)
        error = http_error_class(response, redirect_loop=True)
        expected_message = 'HTTP Error 301: Moved Permanently (redirect loop detected)'

        assert str(error) == error.msg == expected_message
        assert error.reason == 'Moved Permanently'

    def test_compat_http_error(self):
        """Legacy attributes of ``_CompatHTTPError`` must each emit one DeprecationWarning."""
        response = self.create_response(403)
        error = _CompatHTTPError(HTTPError(response))

        # The compat wrapper must satisfy both exception hierarchies.
        assert isinstance(error, HTTPError)
        assert isinstance(error, urllib.error.HTTPError)

        @contextlib.contextmanager
        def expect_single_deprecation_warning():
            """Fail unless the wrapped block emits exactly one DeprecationWarning."""
            with warnings.catch_warnings(record=True) as caught:
                # Record every warning, including ones that were shown before.
                warnings.simplefilter('always')
                yield

                if not caught:
                    pytest.fail('Did not raise DeprecationWarning')
                if len(caught) > 1:
                    pytest.fail(f'Raised multiple warnings: {caught}')

                if not issubclass(caught[-1].category, DeprecationWarning):
                    pytest.fail(f'Expected DeprecationWarning, got {caught[-1].category}')
                caught.clear()

        with expect_single_deprecation_warning():
            assert error.code == 403

        with expect_single_deprecation_warning():
            assert error.getcode() == 403

        with expect_single_deprecation_warning():
            assert error.hdrs is error.response.headers

        with expect_single_deprecation_warning():
            assert error.info() is error.response.headers

        with expect_single_deprecation_warning():
            assert error.headers is error.response.headers

        with expect_single_deprecation_warning():
            assert error.filename == error.response.url

        with expect_single_deprecation_warning():
            assert error.url == error.response.url

        with expect_single_deprecation_warning():
            assert error.geturl() == error.response.url

        # Passthrough file operations
        with expect_single_deprecation_warning():
            assert error.read() == b'test'

        with expect_single_deprecation_warning():
            assert not error.closed

        with expect_single_deprecation_warning():
            # Technically Response operations are also passed through, which should not be used.
            assert error.get_header('test') == 'test'

        # Should not raise a warning
        error.close()

    @pytest.mark.skipif(
        platform.python_implementation() == 'PyPy',
        reason='garbage collector works differently in pypy',
    )
    def test_compat_http_error_autoclose(self):
        """Dropping a ``_CompatHTTPError`` must leave the wrapped response open."""
        # Compat HTTPError should not autoclose response
        response = self.create_response(403)
        # Deliberately not bound to a name: the error object is discarded at once.
        _CompatHTTPError(HTTPError(response))
        assert not response.closed

    def test_incomplete_read_error(self):
        """Check the attributes and text forms of ``IncompleteRead``."""
        with_expected = IncompleteRead(b'test', 3, cause='test')
        assert isinstance(with_expected, IncompleteRead)
        assert repr(with_expected) == '<IncompleteRead: 4 bytes read, 3 more expected>'
        assert str(with_expected) == with_expected.msg == '4 bytes read, 3 more expected'
        assert with_expected.partial == b'test'
        assert with_expected.expected == 3
        assert with_expected.cause == 'test'

        # Without an expected count only the number of bytes read is reported.
        partial_only = IncompleteRead(b'aaa')
        assert repr(partial_only) == '<IncompleteRead: 3 bytes read>'
        assert str(partial_only) == '3 bytes read'
|