pull/31187/merge
u-n-k-n-o-w-n 5 months ago committed by GitHub
commit 73abdd44cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -206,7 +206,10 @@ def t_factory(name, sig_func, url_pattern):
test_id = m.group('id')
def test_func(self):
basename = 'player-{0}-{1}.js'.format(name, test_id)
tn = name
if name.endswith('_wd'):
tn = name[:-3]
basename = 'player-{0}-{1}.js'.format(tn, test_id)
fn = os.path.join(self.TESTDATA_DIR, basename)
if not os.path.exists(fn):
@ -233,6 +236,10 @@ def n_sig(jscode, sig_input):
return JSInterpreter(jscode).call_function(funcname, sig_input)
def n_sig_wd(jscode, sig_input):
return YoutubeIE(FakeYDL())._call_n_function_with_webdriver('chrome', jscode, sig_input)
make_sig_test = t_factory(
'signature', signature, re.compile(r'.*-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.[a-z]+$'))
for test_spec in _SIG_TESTS:
@ -243,6 +250,17 @@ make_nsig_test = t_factory(
for test_spec in _NSIG_TESTS:
make_nsig_test(*test_spec)
test_wd = False
for arg in sys.argv:
if arg == '--test_wd':
test_wd = True
break
if test_wd:
sys.argv = [arg for arg in sys.argv if arg != '--test_wd']
make_nsig_wd_test = t_factory(
'nsig_wd', n_sig_wd, re.compile(r'.+/player/(?P<id>[a-zA-Z0-9_-]+)/.+.js$'))
for test_spec in _NSIG_TESTS:
make_nsig_wd_test(*test_spec)
if __name__ == '__main__':
unittest.main()

@ -415,6 +415,7 @@ def _real_main(argv=None):
'call_home': opts.call_home,
'sleep_interval': opts.sleep_interval,
'max_sleep_interval': opts.max_sleep_interval,
'webdriver': opts.webdriver,
'external_downloader': opts.external_downloader,
'list_thumbnails': opts.list_thumbnails,
'playlist_items': opts.playlist_items,

@ -2444,6 +2444,11 @@ try:
except ImportError:
import BaseHTTPServer as compat_http_server
try:
from urllib.parse import quote as compat_urllib_quote
except ImportError: # Python 2
from urllib import quote as compat_urllib_quote
try:
from urllib.parse import unquote_to_bytes as compat_urllib_parse_unquote_to_bytes
from urllib.parse import unquote as compat_urllib_parse_unquote
@ -3294,6 +3299,7 @@ __all__ = [
'compat_tokenize_tokenize',
'compat_urllib_error',
'compat_urllib_parse',
'compat_urllib_quote',
'compat_urllib_request',
'compat_urllib_request_DataHandler',
'compat_urllib_response',

@ -8,6 +8,7 @@ import os.path
import random
import re
import traceback
import importlib
from .common import InfoExtractor, SearchInfoExtractor
from ..compat import (
@ -18,12 +19,14 @@ from ..compat import (
compat_urllib_parse,
compat_urllib_parse_parse_qs as compat_parse_qs,
compat_urllib_parse_unquote_plus,
compat_urllib_quote,
compat_urllib_parse_urlparse,
compat_zip as zip,
)
from ..jsinterp import JSInterpreter
from ..utils import (
ExtractorError,
check_executable,
clean_html,
dict_get,
error_to_compat_str,
@ -1459,6 +1462,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
super(YoutubeIE, self).__init__(*args, **kwargs)
self._code_cache = {}
self._player_cache = {}
self._webdriver_wrapper = None
def _signature_cache_id(self, example_sig):
""" Return a string representation of a signature """
@ -1652,6 +1656,27 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return lambda s: jsi.extract_function_from_code(*func_code)([s])
def _call_n_function_with_webdriver(self, webdriver_type, jscode, n_param):
if self._webdriver_wrapper is None:
self._webdriver_wrapper = WebDriverJSWrapper(webdriver_type)
self._webdriver_wrapper.get('about:blank')
funcname = self._extract_n_function_name(jscode)
alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
dummyfunc = ''.join(random.choice(alphabet) for _ in range(8))
f = ('return ((e) => {{'
'const d = decodeURIComponent(e);'
'const p = d.lastIndexOf("}}");'
'const th = d.substring(0, p);'
'const bh = d.substring(p);'
'const m = "var {0};" + th + ";{0} = {1};" + bh;'
'const s = document.createElement("script");'
's.innerHTML = m;'
'document.body.append(s);'
'return {0}("{2}");'
'}})("{3}");').format(dummyfunc, funcname, n_param, compat_urllib_quote(jscode))
n = self._webdriver_wrapper.executeJS(f)
return n
def _n_descramble(self, n_param, player_url, video_id):
"""Compute the response to YT's "n" parameter challenge,
or None
@ -1668,14 +1693,19 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return self._player_cache[sig_id]
try:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
ret = func(n_param)
if ret.startswith('enhanced_except_'):
raise ExtractorError('Unhandled exception in decode')
self._player_cache[sig_id] = ret
webdriver_type = self._downloader.params.get('webdriver', None)
if webdriver_type is not None:
jscode = self._get_player_code(video_id, player_url)
self._player_cache[sig_id] = self._call_n_function_with_webdriver(webdriver_type, jscode, n_param)
else:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
ret = func(n_param)
if ret.startswith('enhanced_except_'):
raise ExtractorError('Unhandled exception in decode')
self._player_cache[sig_id] = ret
if self._downloader.params.get('verbose', False):
self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
return self._player_cache[sig_id]
@ -3699,3 +3729,75 @@ class YoutubeTruncatedIDIE(InfoExtractor):
raise ExtractorError(
'Incomplete YouTube ID %s. URL %s looks truncated.' % (video_id, url),
expected=True)
class WebDriverJSWrapper(object):
"""WebDriver Wrapper class"""
def __init__(self, webdriver_type, pageload_timeout=10, script_timeout=5):
self._webdriver = None
try:
wd = importlib.import_module('selenium.webdriver')
except ImportError as e:
self._raise_exception('Failed to import module "selenium.webdriver"', cause=e)
if webdriver_type == 'firefox': # geckodriver
if not check_executable('geckodriver', ['--version']):
self._raise_exception('geckodriver not found in PATH')
o = wd.FirefoxOptions()
o.headless = True
s = wd.firefox.service.Service(log_path=os.path.devnull)
self._webdriver = wd.Firefox(options=o, service=s)
elif webdriver_type == 'chrome': # chromedriver
if not check_executable('chromedriver', ['--version']):
self._raise_exception('chromedriver not found in PATH')
o = wd.ChromeOptions()
o.headless = True
"""
If you are using the snap version of the chromium, chromedriver is included in the snap package.
You should use that driver.
$ cd /snap/bin && sudo ln -s -T chromium.chromedriver chromedriver
or
s = wd.chrome.service.Service(executable_path='chromium.chromedriver')
self._webdriver = wd.Chrome(options=o, service=s)
"""
self._webdriver = wd.Chrome(options=o)
elif webdriver_type == 'edge': # msedgedriver
if not check_executable('msedgedriver', ['--version']):
self._raise_exception('msedgedriver not found in PATH')
o = wd.EdgeOptions()
o.headless = True
self._webdriver = wd.Edge(options=o)
elif webdriver_type == 'safari': # safaridriver
if not check_executable('safaridriver', ['--version']):
self._raise_exception('safaridriver not found in PATH')
"""
safaridriver does not have headless-mode. :(
But macOS includes safaridriver by default.
To enable automation on safaridriver, run the following command once from the admin terminal.
# safaridriver --enable
"""
self._webdriver = wd.Safari()
else:
self._raise_exception('unsupported type: %s' % (webdriver_type))
self._webdriver.set_page_load_timeout(pageload_timeout)
self._webdriver.set_script_timeout(script_timeout)
def __del__(self):
if self._webdriver is not None:
self._webdriver.quit()
def _raise_exception(self, msg, cause=None):
raise ExtractorError('[WebDriverJSWrapper] %s' % (msg), cause=cause)
def get(self, url):
"""Loads a web page in the current browser session"""
self._webdriver.get(url)
def executeJS(self, jscode):
"""Execute JS and return value"""
try:
ret = self._webdriver.execute_script(jscode)
except Exception as e:
self._raise_exception('Failed to execute JS', cause=e)
return ret

@ -572,6 +572,9 @@ def parseOpts(overrideArguments=None):
'Upper bound of a range for randomized sleep before each download '
'(maximum possible number of seconds to sleep). Must only be used '
'along with --min-sleep-interval.'))
workarounds.add_option(
'--webdriver', metavar='TYPE', dest='webdriver', default=None,
help='Specify webdriver type when you want to use Selenium to execute YouTube\'s "n_function" in order to avoid throttling: "firefox", "chrome", "edge", or "safari"')
verbosity = optparse.OptionGroup(parser, 'Verbosity / Simulation Options')
verbosity.add_option(

Loading…
Cancel
Save