mirror of https://github.com/yt-dlp/yt-dlp.git
Compare commits
4 Commits
a74f00e92b
...
6654b09548
Author | SHA1 | Date |
---|---|---|
bashonly | 6654b09548 | |
Simon Sawicki | 64766459e3 | |
bashonly | 8cc031f259 | |
bashonly | 9d37b9e298 |
|
@ -2059,7 +2059,22 @@ Line 1
|
|||
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
|
||||
|
||||
@unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
|
||||
def test_Popen_windows_escaping(self):
|
||||
def test_windows_escaping(self):
|
||||
tests = [
|
||||
'test"&',
|
||||
'%CMDCMDLINE:~-1%&',
|
||||
'a\nb',
|
||||
'"',
|
||||
'\\',
|
||||
'!',
|
||||
'^!',
|
||||
'a \\ b',
|
||||
'a \\" b',
|
||||
'a \\ b\\',
|
||||
# We replace \r with \n
|
||||
('a\r\ra', 'a\n\na'),
|
||||
]
|
||||
|
||||
def run_shell(args):
|
||||
stdout, stderr, error = Popen.run(
|
||||
args, text=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
@ -2067,15 +2082,18 @@ Line 1
|
|||
assert not error
|
||||
return stdout
|
||||
|
||||
# Test escaping
|
||||
assert run_shell(['echo', 'test"&']) == '"test""&"\n'
|
||||
assert run_shell(['echo', '%CMDCMDLINE:~-1%&']) == '"%CMDCMDLINE:~-1%&"\n'
|
||||
assert run_shell(['echo', 'a\nb']) == '"a"\n"b"\n'
|
||||
assert run_shell(['echo', '"']) == '""""\n'
|
||||
assert run_shell(['echo', '\\']) == '\\\n'
|
||||
# Test if delayed expansion is disabled
|
||||
assert run_shell(['echo', '^!']) == '"^!"\n'
|
||||
assert run_shell('echo "^!"') == '"^!"\n'
|
||||
for argument in tests:
|
||||
if isinstance(argument, str):
|
||||
expected = argument
|
||||
else:
|
||||
argument, expected = argument
|
||||
|
||||
args = [sys.executable, '-c', 'import sys; print(end=sys.argv[1])', argument, 'end']
|
||||
assert run_shell(args) == expected
|
||||
|
||||
escaped = shell_quote(argument, shell=True)
|
||||
args = f'{sys.executable} -c "import sys; print(end=sys.argv[1])" {escaped} end'
|
||||
assert run_shell(args) == expected
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -11,7 +11,6 @@ from ..compat import compat_urllib_parse_urlparse
|
|||
from ..networking import HEADRequest
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
LazyList,
|
||||
UnsupportedError,
|
||||
UserNotLive,
|
||||
determine_ext,
|
||||
|
@ -793,102 +792,150 @@ class TikTokIE(TikTokBaseIE):
|
|||
|
||||
class TikTokUserIE(TikTokBaseIE):
|
||||
IE_NAME = 'tiktok:user'
|
||||
_VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])'
|
||||
_WORKING = False
|
||||
_VALID_URL = [
|
||||
r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])',
|
||||
r'tiktokuser:(?P<id>MS4wLjABAAAA[\w-]{64})',
|
||||
]
|
||||
_TESTS = [{
|
||||
'url': 'https://tiktok.com/@corgibobaa?lang=en',
|
||||
'playlist_mincount': 45,
|
||||
'info_dict': {
|
||||
'id': '6935371178089399301',
|
||||
'id': 'MS4wLjABAAAAepiJKgwWhulvCpSuUVsp7sgVVsFJbbNaLeQ6OQ0oAJERGDUIXhb2yxxHZedsItgT',
|
||||
'title': 'corgibobaa',
|
||||
'thumbnail': r're:https://.+_1080x1080\.webp'
|
||||
},
|
||||
'expected_warnings': ['Retrying']
|
||||
}, {
|
||||
'url': 'https://www.tiktok.com/@6820838815978423302',
|
||||
'playlist_mincount': 5,
|
||||
'info_dict': {
|
||||
'id': '6820838815978423302',
|
||||
'id': 'MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W',
|
||||
'title': '6820838815978423302',
|
||||
'thumbnail': r're:https://.+_1080x1080\.webp'
|
||||
},
|
||||
'expected_warnings': ['Retrying']
|
||||
}, {
|
||||
'url': 'https://www.tiktok.com/@meme',
|
||||
'playlist_mincount': 593,
|
||||
'info_dict': {
|
||||
'id': '79005827461758976',
|
||||
'id': 'MS4wLjABAAAAiKfaDWeCsT3IHwY77zqWGtVRIy9v4ws1HbVi7auP1Vx7dJysU_hc5yRiGywojRD6',
|
||||
'title': 'meme',
|
||||
'thumbnail': r're:https://.+_1080x1080\.webp'
|
||||
},
|
||||
'expected_warnings': ['Retrying']
|
||||
}, {
|
||||
'url': 'tiktokuser:MS4wLjABAAAAM3R2BtjzVT-uAtstkl2iugMzC6AtnpkojJbjiOdDDrdsTiTR75-8lyWJCY5VvDrZ',
|
||||
'playlist_mincount': 31,
|
||||
'info_dict': {
|
||||
'id': 'MS4wLjABAAAAM3R2BtjzVT-uAtstkl2iugMzC6AtnpkojJbjiOdDDrdsTiTR75-8lyWJCY5VvDrZ',
|
||||
},
|
||||
}]
|
||||
_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:115.0) Gecko/20100101 Firefox/115.0'
|
||||
_API_BASE_URL = 'https://www.tiktok.com/api/creator/item_list/'
|
||||
|
||||
r''' # TODO: Fix by adding _signature to api_url
|
||||
def _entries(self, webpage, user_id, username):
|
||||
secuid = self._search_regex(r'\"secUid\":\"(?P<secUid>[^\"]+)', webpage, username)
|
||||
verifyfp_cookie = self._get_cookies('https://www.tiktok.com').get('s_v_web_id')
|
||||
if not verifyfp_cookie:
|
||||
raise ExtractorError('Improper cookies (missing s_v_web_id).', expected=True)
|
||||
api_url = f'https://m.tiktok.com/api/post/item_list/?aid=1988&cookie_enabled=true&count=30&verifyFp={verifyfp_cookie.value}&secUid={secuid}&cursor='
|
||||
cursor = '0'
|
||||
for page in itertools.count():
|
||||
data_json = self._download_json(api_url + cursor, username, note='Downloading Page %d' % page)
|
||||
for video in data_json.get('itemList', []):
|
||||
video_id = video['id']
|
||||
video_url = f'https://www.tiktok.com/@{user_id}/video/{video_id}'
|
||||
yield self._url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc')))
|
||||
if not data_json.get('hasMore'):
|
||||
break
|
||||
cursor = data_json['cursor']
|
||||
'''
|
||||
|
||||
def _video_entries_api(self, webpage, user_id, username):
|
||||
query = {
|
||||
'user_id': user_id,
|
||||
'count': 21,
|
||||
'max_cursor': 0,
|
||||
'min_cursor': 0,
|
||||
'retry_type': 'no_retry',
|
||||
'device_id': ''.join(random.choices(string.digits, k=19)), # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api.
|
||||
def _build_web_query(self, sec_uid, cursor):
|
||||
return {
|
||||
'aid': '1988',
|
||||
'app_language': 'en',
|
||||
'app_name': 'tiktok_web',
|
||||
'browser_language': 'en-US',
|
||||
'browser_name': 'Mozilla',
|
||||
'browser_online': 'true',
|
||||
'browser_platform': 'Win32',
|
||||
'browser_version': '5.0 (Windows)',
|
||||
'channel': 'tiktok_web',
|
||||
'cookie_enabled': 'true',
|
||||
'count': '15',
|
||||
'cursor': cursor,
|
||||
'device_id': ''.join(random.choices(string.digits, k=19)),
|
||||
'device_platform': 'web_pc',
|
||||
'focus_state': 'true',
|
||||
'from_page': 'user',
|
||||
'history_len': '2',
|
||||
'is_fullscreen': 'false',
|
||||
'is_page_visible': 'true',
|
||||
'language': 'en',
|
||||
'os': 'windows',
|
||||
'priority_region': '',
|
||||
'referer': '',
|
||||
'region': 'US',
|
||||
'screen_height': '1080',
|
||||
'screen_width': '1920',
|
||||
'secUid': sec_uid,
|
||||
'type': '1', # pagination type: 0 == oldest-to-newest, 1 == newest-to-oldest
|
||||
'tz_name': 'UTC',
|
||||
'verifyFp': 'verify_%s' % ''.join(random.choices(string.hexdigits, k=7)),
|
||||
'webcast_language': 'en',
|
||||
}
|
||||
|
||||
def _entries(self, sec_uid, user_name):
|
||||
cursor = int(time.time() * 1E3)
|
||||
for page in itertools.count(1):
|
||||
for retry in self.RetryManager():
|
||||
try:
|
||||
post_list = self._call_api(
|
||||
'aweme/post', query, username, note=f'Downloading user video list page {page}',
|
||||
errnote='Unable to download user video list')
|
||||
except ExtractorError as e:
|
||||
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
|
||||
retry.error = e
|
||||
continue
|
||||
raise
|
||||
yield from post_list.get('aweme_list', [])
|
||||
if not post_list.get('has_more'):
|
||||
break
|
||||
query['max_cursor'] = post_list['max_cursor']
|
||||
response = self._download_json(
|
||||
self._API_BASE_URL, user_name or sec_uid, f'Downloading page {page}',
|
||||
query=self._build_web_query(sec_uid, cursor), headers={'User-Agent': self._USER_AGENT})
|
||||
|
||||
def _entries_api(self, user_id, videos):
|
||||
for video in videos:
|
||||
yield {
|
||||
**self._parse_aweme_video_app(video),
|
||||
'extractor_key': TikTokIE.ie_key(),
|
||||
'extractor': 'TikTok',
|
||||
'webpage_url': f'https://tiktok.com/@{user_id}/video/{video["aweme_id"]}',
|
||||
}
|
||||
for video in traverse_obj(response, ('itemList', lambda _, v: v['id'])):
|
||||
video_id = video['id']
|
||||
webpage_url = self._create_url(user_name, video_id)
|
||||
info = try_call(
|
||||
lambda: self._parse_aweme_video_web(video, webpage_url, video_id)) or {'id': video_id}
|
||||
info.pop('formats', None)
|
||||
yield self.url_result(webpage_url, TikTokIE, **info)
|
||||
|
||||
old_cursor = cursor
|
||||
cursor = traverse_obj(
|
||||
response, ('itemList', -1, 'createTime', {lambda x: x * 1E3}, {int_or_none}))
|
||||
if not cursor:
|
||||
cursor = old_cursor - 604800000 # jump 1 week back in time
|
||||
if cursor < 1472706000000 or not traverse_obj(response, 'hasMorePrevious'):
|
||||
break
|
||||
|
||||
def _get_sec_uid(self, user_url, user_name, msg):
|
||||
webpage = self._download_webpage(
|
||||
user_url, user_name, fatal=False, headers={'User-Agent': 'Mozilla/5.0'},
|
||||
note=f'Downloading {msg} webpage', errnote=f'Unable to download {msg} webpage') or ''
|
||||
return traverse_obj(
|
||||
self._get_universal_data(webpage, user_name),
|
||||
('webapp.user-detail', 'userInfo', 'user', 'secUid', {str})) or traverse_obj(
|
||||
self._get_sigi_state(webpage, user_name),
|
||||
('LiveRoom', 'liveRoomUserInfo', 'user', 'secUid'),
|
||||
('UserModule', 'users', ..., 'secUid'),
|
||||
get_all=False, expected_type=str)
|
||||
|
||||
def _real_extract(self, url):
|
||||
user_name = self._match_id(url)
|
||||
webpage = self._download_webpage(url, user_name, headers={
|
||||
'User-Agent': 'facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)'
|
||||
})
|
||||
user_id = self._html_search_regex(r'snssdk\d*://user/profile/(\d+)', webpage, 'user ID', default=None) or user_name
|
||||
user_name, sec_uid = None, None
|
||||
if url.startswith('tiktokuser:'):
|
||||
sec_uid = self._match_id(url)
|
||||
else:
|
||||
user_name = self._match_id(url)
|
||||
|
||||
videos = LazyList(self._video_entries_api(webpage, user_id, user_name))
|
||||
thumbnail = traverse_obj(videos, (0, 'author', 'avatar_larger', 'url_list', 0))
|
||||
if not sec_uid:
|
||||
for user_url, msg in (
|
||||
(self._UPLOADER_URL_FORMAT % user_name, 'user'),
|
||||
(self._UPLOADER_URL_FORMAT % f'{user_name}/live', 'live'),
|
||||
):
|
||||
sec_uid = self._get_sec_uid(user_url, user_name, msg)
|
||||
if sec_uid:
|
||||
break
|
||||
|
||||
return self.playlist_result(self._entries_api(user_id, videos), user_id, user_name, thumbnail=thumbnail)
|
||||
if not sec_uid:
|
||||
webpage = self._download_webpage(
|
||||
f'https://www.tiktok.com/embed/@{user_name}', user_name,
|
||||
note='Downloading user embed page', fatal=False) or ''
|
||||
data = traverse_obj(self._search_json(
|
||||
r'<script[^>]+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>',
|
||||
webpage, 'data', user_name, default={}),
|
||||
('source', 'data', f'/embed/@{user_name}', {dict}))
|
||||
|
||||
for aweme_id in traverse_obj(data, ('videoList', ..., 'id')):
|
||||
try:
|
||||
sec_uid = self._extract_aweme_app(aweme_id).get('channel_id')
|
||||
except ExtractorError:
|
||||
continue
|
||||
if sec_uid:
|
||||
break
|
||||
|
||||
if not sec_uid:
|
||||
raise ExtractorError(
|
||||
'Unable to extract secondary user ID. Try using "tiktokuser:CHANNEL_ID" as the '
|
||||
'input URL, replacing "CHANNEL_ID" with the channel_id of the requested user')
|
||||
|
||||
return self.playlist_result(self._entries(sec_uid, user_name), sec_uid, user_name)
|
||||
|
||||
|
||||
class TikTokBaseListIE(TikTokBaseIE): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
|
||||
|
|
|
@ -1638,16 +1638,14 @@ def get_filesystem_encoding():
|
|||
return encoding if encoding is not None else 'utf-8'
|
||||
|
||||
|
||||
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': '\\"', '\\': '\\\\'})
|
||||
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': R'\"'})
|
||||
_CMD_QUOTE_TRANS = str.maketrans({
|
||||
# Keep quotes balanced by replacing them with `""` instead of `\\"`
|
||||
'"': '""',
|
||||
# Requires a variable `=` containing `"^\n\n"` (set in `utils.Popen`)
|
||||
# These require an env-variable `=` containing `"^\n\n"` (set in `utils.Popen`)
|
||||
# `=` should be unique since variables containing `=` cannot be set using cmd
|
||||
'\n': '%=%',
|
||||
# While we are only required to escape backslashes immediately before quotes,
|
||||
# we instead escape all of 'em anyways to be consistent
|
||||
'\\': '\\\\',
|
||||
'\r': '%=%',
|
||||
# Use zero length variable replacement so `%` doesn't get expanded
|
||||
# `cd` is always set as long as extensions are enabled (`/E:ON` in `utils.Popen`)
|
||||
'%': '%%cd:~,%',
|
||||
|
@ -1656,19 +1654,14 @@ _CMD_QUOTE_TRANS = str.maketrans({
|
|||
|
||||
def shell_quote(args, *, shell=False):
|
||||
args = list(variadic(args))
|
||||
if any(isinstance(item, bytes) for item in args):
|
||||
deprecation_warning('Passing bytes to utils.shell_quote is deprecated')
|
||||
encoding = get_filesystem_encoding()
|
||||
for index, item in enumerate(args):
|
||||
if isinstance(item, bytes):
|
||||
args[index] = item.decode(encoding)
|
||||
|
||||
if compat_os_name != 'nt':
|
||||
return shlex.join(args)
|
||||
|
||||
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
|
||||
return ' '.join(
|
||||
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII) else s.translate(trans).join('""')
|
||||
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII)
|
||||
else re.sub(r'(\\+)("|$)', r'\1\1\2', s).translate(trans).join('""')
|
||||
for s in args)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue