Compare commits

...

4 Commits

Author SHA1 Message Date
bashonly 6654b09548
Merge 8cc031f259 into 64766459e3 2024-04-28 02:45:07 +05:30
Simon Sawicki 64766459e3
[core/windows] Improve shell quoting and tests (#9802)
Authored by: Grub4K
2024-04-27 10:37:26 +02:00
bashonly 8cc031f259
add prefix url test
Authored by: bashonly
2024-04-10 10:30:38 -05:00
bashonly 9d37b9e298
[ie/tiktok:user] Fix extractor
Authored by: bashonly
2024-04-10 10:26:03 -05:00
3 changed files with 149 additions and 91 deletions

View File

@ -2059,7 +2059,22 @@ Line 1
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
@unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
def test_Popen_windows_escaping(self):
def test_windows_escaping(self):
tests = [
'test"&',
'%CMDCMDLINE:~-1%&',
'a\nb',
'"',
'\\',
'!',
'^!',
'a \\ b',
'a \\" b',
'a \\ b\\',
# We replace \r with \n
('a\r\ra', 'a\n\na'),
]
def run_shell(args):
stdout, stderr, error = Popen.run(
args, text=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -2067,15 +2082,18 @@ Line 1
assert not error
return stdout
# Test escaping
assert run_shell(['echo', 'test"&']) == '"test""&"\n'
assert run_shell(['echo', '%CMDCMDLINE:~-1%&']) == '"%CMDCMDLINE:~-1%&"\n'
assert run_shell(['echo', 'a\nb']) == '"a"\n"b"\n'
assert run_shell(['echo', '"']) == '""""\n'
assert run_shell(['echo', '\\']) == '\\\n'
# Test if delayed expansion is disabled
assert run_shell(['echo', '^!']) == '"^!"\n'
assert run_shell('echo "^!"') == '"^!"\n'
for argument in tests:
if isinstance(argument, str):
expected = argument
else:
argument, expected = argument
args = [sys.executable, '-c', 'import sys; print(end=sys.argv[1])', argument, 'end']
assert run_shell(args) == expected
escaped = shell_quote(argument, shell=True)
args = f'{sys.executable} -c "import sys; print(end=sys.argv[1])" {escaped} end'
assert run_shell(args) == expected
if __name__ == '__main__':

View File

@ -11,7 +11,6 @@ from ..compat import compat_urllib_parse_urlparse
from ..networking import HEADRequest
from ..utils import (
ExtractorError,
LazyList,
UnsupportedError,
UserNotLive,
determine_ext,
@ -793,102 +792,150 @@ class TikTokIE(TikTokBaseIE):
class TikTokUserIE(TikTokBaseIE):
IE_NAME = 'tiktok:user'
_VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])'
_WORKING = False
_VALID_URL = [
r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])',
r'tiktokuser:(?P<id>MS4wLjABAAAA[\w-]{64})',
]
_TESTS = [{
'url': 'https://tiktok.com/@corgibobaa?lang=en',
'playlist_mincount': 45,
'info_dict': {
'id': '6935371178089399301',
'id': 'MS4wLjABAAAAepiJKgwWhulvCpSuUVsp7sgVVsFJbbNaLeQ6OQ0oAJERGDUIXhb2yxxHZedsItgT',
'title': 'corgibobaa',
'thumbnail': r're:https://.+_1080x1080\.webp'
},
'expected_warnings': ['Retrying']
}, {
'url': 'https://www.tiktok.com/@6820838815978423302',
'playlist_mincount': 5,
'info_dict': {
'id': '6820838815978423302',
'id': 'MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W',
'title': '6820838815978423302',
'thumbnail': r're:https://.+_1080x1080\.webp'
},
'expected_warnings': ['Retrying']
}, {
'url': 'https://www.tiktok.com/@meme',
'playlist_mincount': 593,
'info_dict': {
'id': '79005827461758976',
'id': 'MS4wLjABAAAAiKfaDWeCsT3IHwY77zqWGtVRIy9v4ws1HbVi7auP1Vx7dJysU_hc5yRiGywojRD6',
'title': 'meme',
'thumbnail': r're:https://.+_1080x1080\.webp'
},
'expected_warnings': ['Retrying']
}, {
'url': 'tiktokuser:MS4wLjABAAAAM3R2BtjzVT-uAtstkl2iugMzC6AtnpkojJbjiOdDDrdsTiTR75-8lyWJCY5VvDrZ',
'playlist_mincount': 31,
'info_dict': {
'id': 'MS4wLjABAAAAM3R2BtjzVT-uAtstkl2iugMzC6AtnpkojJbjiOdDDrdsTiTR75-8lyWJCY5VvDrZ',
},
}]
_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:115.0) Gecko/20100101 Firefox/115.0'
_API_BASE_URL = 'https://www.tiktok.com/api/creator/item_list/'
r''' # TODO: Fix by adding _signature to api_url
def _entries(self, webpage, user_id, username):
secuid = self._search_regex(r'\"secUid\":\"(?P<secUid>[^\"]+)', webpage, username)
verifyfp_cookie = self._get_cookies('https://www.tiktok.com').get('s_v_web_id')
if not verifyfp_cookie:
raise ExtractorError('Improper cookies (missing s_v_web_id).', expected=True)
api_url = f'https://m.tiktok.com/api/post/item_list/?aid=1988&cookie_enabled=true&count=30&verifyFp={verifyfp_cookie.value}&secUid={secuid}&cursor='
cursor = '0'
for page in itertools.count():
data_json = self._download_json(api_url + cursor, username, note='Downloading Page %d' % page)
for video in data_json.get('itemList', []):
video_id = video['id']
video_url = f'https://www.tiktok.com/@{user_id}/video/{video_id}'
yield self._url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc')))
if not data_json.get('hasMore'):
break
cursor = data_json['cursor']
'''
def _video_entries_api(self, webpage, user_id, username):
query = {
'user_id': user_id,
'count': 21,
'max_cursor': 0,
'min_cursor': 0,
'retry_type': 'no_retry',
'device_id': ''.join(random.choices(string.digits, k=19)), # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api.
def _build_web_query(self, sec_uid, cursor):
return {
'aid': '1988',
'app_language': 'en',
'app_name': 'tiktok_web',
'browser_language': 'en-US',
'browser_name': 'Mozilla',
'browser_online': 'true',
'browser_platform': 'Win32',
'browser_version': '5.0 (Windows)',
'channel': 'tiktok_web',
'cookie_enabled': 'true',
'count': '15',
'cursor': cursor,
'device_id': ''.join(random.choices(string.digits, k=19)),
'device_platform': 'web_pc',
'focus_state': 'true',
'from_page': 'user',
'history_len': '2',
'is_fullscreen': 'false',
'is_page_visible': 'true',
'language': 'en',
'os': 'windows',
'priority_region': '',
'referer': '',
'region': 'US',
'screen_height': '1080',
'screen_width': '1920',
'secUid': sec_uid,
'type': '1', # pagination type: 0 == oldest-to-newest, 1 == newest-to-oldest
'tz_name': 'UTC',
'verifyFp': 'verify_%s' % ''.join(random.choices(string.hexdigits, k=7)),
'webcast_language': 'en',
}
def _entries(self, sec_uid, user_name):
cursor = int(time.time() * 1E3)
for page in itertools.count(1):
for retry in self.RetryManager():
try:
post_list = self._call_api(
'aweme/post', query, username, note=f'Downloading user video list page {page}',
errnote='Unable to download user video list')
except ExtractorError as e:
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
retry.error = e
continue
raise
yield from post_list.get('aweme_list', [])
if not post_list.get('has_more'):
break
query['max_cursor'] = post_list['max_cursor']
response = self._download_json(
self._API_BASE_URL, user_name or sec_uid, f'Downloading page {page}',
query=self._build_web_query(sec_uid, cursor), headers={'User-Agent': self._USER_AGENT})
def _entries_api(self, user_id, videos):
for video in videos:
yield {
**self._parse_aweme_video_app(video),
'extractor_key': TikTokIE.ie_key(),
'extractor': 'TikTok',
'webpage_url': f'https://tiktok.com/@{user_id}/video/{video["aweme_id"]}',
}
for video in traverse_obj(response, ('itemList', lambda _, v: v['id'])):
video_id = video['id']
webpage_url = self._create_url(user_name, video_id)
info = try_call(
lambda: self._parse_aweme_video_web(video, webpage_url, video_id)) or {'id': video_id}
info.pop('formats', None)
yield self.url_result(webpage_url, TikTokIE, **info)
old_cursor = cursor
cursor = traverse_obj(
response, ('itemList', -1, 'createTime', {lambda x: x * 1E3}, {int_or_none}))
if not cursor:
cursor = old_cursor - 604800000 # jump 1 week back in time
if cursor < 1472706000000 or not traverse_obj(response, 'hasMorePrevious'):
break
def _get_sec_uid(self, user_url, user_name, msg):
webpage = self._download_webpage(
user_url, user_name, fatal=False, headers={'User-Agent': 'Mozilla/5.0'},
note=f'Downloading {msg} webpage', errnote=f'Unable to download {msg} webpage') or ''
return traverse_obj(
self._get_universal_data(webpage, user_name),
('webapp.user-detail', 'userInfo', 'user', 'secUid', {str})) or traverse_obj(
self._get_sigi_state(webpage, user_name),
('LiveRoom', 'liveRoomUserInfo', 'user', 'secUid'),
('UserModule', 'users', ..., 'secUid'),
get_all=False, expected_type=str)
def _real_extract(self, url):
user_name = self._match_id(url)
webpage = self._download_webpage(url, user_name, headers={
'User-Agent': 'facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)'
})
user_id = self._html_search_regex(r'snssdk\d*://user/profile/(\d+)', webpage, 'user ID', default=None) or user_name
user_name, sec_uid = None, None
if url.startswith('tiktokuser:'):
sec_uid = self._match_id(url)
else:
user_name = self._match_id(url)
videos = LazyList(self._video_entries_api(webpage, user_id, user_name))
thumbnail = traverse_obj(videos, (0, 'author', 'avatar_larger', 'url_list', 0))
if not sec_uid:
for user_url, msg in (
(self._UPLOADER_URL_FORMAT % user_name, 'user'),
(self._UPLOADER_URL_FORMAT % f'{user_name}/live', 'live'),
):
sec_uid = self._get_sec_uid(user_url, user_name, msg)
if sec_uid:
break
return self.playlist_result(self._entries_api(user_id, videos), user_id, user_name, thumbnail=thumbnail)
if not sec_uid:
webpage = self._download_webpage(
f'https://www.tiktok.com/embed/@{user_name}', user_name,
note='Downloading user embed page', fatal=False) or ''
data = traverse_obj(self._search_json(
r'<script[^>]+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>',
webpage, 'data', user_name, default={}),
('source', 'data', f'/embed/@{user_name}', {dict}))
for aweme_id in traverse_obj(data, ('videoList', ..., 'id')):
try:
sec_uid = self._extract_aweme_app(aweme_id).get('channel_id')
except ExtractorError:
continue
if sec_uid:
break
if not sec_uid:
raise ExtractorError(
'Unable to extract secondary user ID. Try using "tiktokuser:CHANNEL_ID" as the '
'input URL, replacing "CHANNEL_ID" with the channel_id of the requested user')
return self.playlist_result(self._entries(sec_uid, user_name), sec_uid, user_name)
class TikTokBaseListIE(TikTokBaseIE): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor

View File

@ -1638,16 +1638,14 @@ def get_filesystem_encoding():
return encoding if encoding is not None else 'utf-8'
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': '\\"', '\\': '\\\\'})
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': R'\"'})
_CMD_QUOTE_TRANS = str.maketrans({
# Keep quotes balanced by replacing them with `""` instead of `\\"`
'"': '""',
# Requires a variable `=` containing `"^\n\n"` (set in `utils.Popen`)
# These require an env-variable `=` containing `"^\n\n"` (set in `utils.Popen`)
# `=` should be unique since variables containing `=` cannot be set using cmd
'\n': '%=%',
# While we are only required to escape backslashes immediately before quotes,
# we instead escape all of 'em anyways to be consistent
'\\': '\\\\',
'\r': '%=%',
# Use zero length variable replacement so `%` doesn't get expanded
# `cd` is always set as long as extensions are enabled (`/E:ON` in `utils.Popen`)
'%': '%%cd:~,%',
@ -1656,19 +1654,14 @@ _CMD_QUOTE_TRANS = str.maketrans({
def shell_quote(args, *, shell=False):
args = list(variadic(args))
if any(isinstance(item, bytes) for item in args):
deprecation_warning('Passing bytes to utils.shell_quote is deprecated')
encoding = get_filesystem_encoding()
for index, item in enumerate(args):
if isinstance(item, bytes):
args[index] = item.decode(encoding)
if compat_os_name != 'nt':
return shlex.join(args)
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
return ' '.join(
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII) else s.translate(trans).join('""')
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII)
else re.sub(r'(\\+)("|$)', r'\1\1\2', s).translate(trans).join('""')
for s in args)