[niconico] Back-port extractor from yt-dlp

Add Nico search extractors, fix extraction
pull/30613/head
dirkf 2 years ago
parent 73e1ab6125
commit 91278f4b6b

@ -789,7 +789,14 @@ from .nick import (
NickNightIE, NickNightIE,
NickRuIE, NickRuIE,
) )
from .niconico import NiconicoIE, NiconicoPlaylistIE from .niconico import (
NiconicoIE,
NiconicoPlaylistIE,
NiconicoUserIE,
NicovideoSearchIE,
NicovideoSearchDateIE,
NicovideoSearchURLIE,
)
from .ninecninemedia import NineCNineMediaIE from .ninecninemedia import NineCNineMediaIE
from .ninegag import NineGagIE from .ninegag import NineGagIE
from .ninenow import NineNowIE from .ninenow import NineNowIE

@ -2,25 +2,28 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import datetime import datetime
import functools import itertools
import json import json
import math import re
from .common import InfoExtractor from .common import InfoExtractor, SearchInfoExtractor
from ..postprocessor.ffmpeg import FFmpegPostProcessor
from ..compat import ( from ..compat import (
compat_parse_qs, compat_parse_qs,
compat_str,
compat_urllib_parse_urlparse, compat_urllib_parse_urlparse,
) )
from ..utils import ( from ..utils import (
determine_ext,
dict_get,
ExtractorError, ExtractorError,
dict_get,
float_or_none, float_or_none,
InAdvancePagedList,
int_or_none, int_or_none,
OnDemandPagedList,
parse_duration, parse_duration,
parse_iso8601, parse_iso8601,
PostProcessingError,
remove_start, remove_start,
str_or_none,
try_get, try_get,
unified_timestamp, unified_timestamp,
urlencode_postdata, urlencode_postdata,
@ -34,7 +37,7 @@ class NiconicoIE(InfoExtractor):
_TESTS = [{ _TESTS = [{
'url': 'http://www.nicovideo.jp/watch/sm22312215', 'url': 'http://www.nicovideo.jp/watch/sm22312215',
'md5': 'd1a75c0823e2f629128c43e1212760f9', 'md5': 'a5bad06f1347452102953f323c69da34s',
'info_dict': { 'info_dict': {
'id': 'sm22312215', 'id': 'sm22312215',
'ext': 'mp4', 'ext': 'mp4',
@ -162,6 +165,11 @@ class NiconicoIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.|secure\.|sp\.)?nicovideo\.jp/watch/(?P<id>(?:[a-z]{2})?[0-9]+)' _VALID_URL = r'https?://(?:www\.|secure\.|sp\.)?nicovideo\.jp/watch/(?P<id>(?:[a-z]{2})?[0-9]+)'
_NETRC_MACHINE = 'niconico' _NETRC_MACHINE = 'niconico'
_API_HEADERS = {
'X-Frontend-ID': '6',
'X-Frontend-Version': '0'
}
def _real_initialize(self): def _real_initialize(self):
self._login() self._login()
@ -191,37 +199,89 @@ class NiconicoIE(InfoExtractor):
self._downloader.report_warning('unable to log in: bad username or password') self._downloader.report_warning('unable to log in: bad username or password')
return login_ok return login_ok
def _extract_format_for_quality(self, api_data, video_id, audio_quality, video_quality): def _get_heartbeat_info(self, info_dict):
def yesno(boolean):
return 'yes' if boolean else 'no' video_id, video_src_id, audio_src_id = info_dict['url'].split(':')[1].split('/')
session_api_data = api_data['video']['dmcInfo']['session_api'] api_data = (
session_api_endpoint = session_api_data['urls'][0] info_dict.get('_api_data')
or self._parse_json(
format_id = '-'.join(map(lambda s: remove_start(s['id'], 'archive_'), [video_quality, audio_quality])) self._html_search_regex(
'data-api-data="([^"]+)"',
self._download_webpage('http://www.nicovideo.jp/watch/' + video_id, video_id),
'API data', default='{}'),
video_id))
session_api_data = try_get(api_data, lambda x: x['media']['delivery']['movie']['session'])
session_api_endpoint = try_get(session_api_data, lambda x: x['urls'][0])
def ping():
status = try_get(
self._download_json(
'https://nvapi.nicovideo.jp/v1/2ab0cbaa/watch', video_id,
query={'t': try_get(api_data, lambda x: x['media']['delivery']['trackingId'])},
note='Acquiring permission for downloading video',
headers=self._API_HEADERS),
lambda x: x['meta']['status'])
if status != 200:
self.report_warning('Failed to acquire permission for playing video. The video may not download.')
yesno = lambda x: 'yes' if x else 'no'
# m3u8 (encryption)
if try_get(api_data, lambda x: x['media']['delivery']['encryption']) is not None:
protocol = 'm3u8'
encryption = self._parse_json(session_api_data['token'], video_id)['hls_encryption']
session_api_http_parameters = {
'parameters': {
'hls_parameters': {
'encryption': {
encryption: {
'encrypted_key': try_get(api_data, lambda x: x['media']['delivery']['encryption']['encryptedKey']),
'key_uri': try_get(api_data, lambda x: x['media']['delivery']['encryption']['keyUri'])
}
},
'transfer_preset': '',
'use_ssl': yesno(session_api_endpoint['isSsl']),
'use_well_known_port': yesno(session_api_endpoint['isWellKnownPort']),
'segment_duration': 6000,
}
}
}
# http
else:
protocol = 'http'
session_api_http_parameters = {
'parameters': {
'http_output_download_parameters': {
'use_ssl': yesno(session_api_endpoint['isSsl']),
'use_well_known_port': yesno(session_api_endpoint['isWellKnownPort']),
}
}
}
session_response = self._download_json( session_response = self._download_json(
session_api_endpoint['url'], video_id, session_api_endpoint['url'], video_id,
query={'_format': 'json'}, query={'_format': 'json'},
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
note='Downloading JSON metadata for %s' % format_id, note='Downloading JSON metadata for %s' % info_dict['format_id'],
data=json.dumps({ data=json.dumps({
'session': { 'session': {
'client_info': { 'client_info': {
'player_id': session_api_data['player_id'], 'player_id': session_api_data.get('playerId'),
}, },
'content_auth': { 'content_auth': {
'auth_type': session_api_data['auth_types'][session_api_data['protocols'][0]], 'auth_type': try_get(session_api_data, lambda x: x['authTypes'][session_api_data['protocols'][0]]),
'content_key_timeout': session_api_data['content_key_timeout'], 'content_key_timeout': session_api_data.get('contentKeyTimeout'),
'service_id': 'nicovideo', 'service_id': 'nicovideo',
'service_user_id': session_api_data['service_user_id'] 'service_user_id': session_api_data.get('serviceUserId')
}, },
'content_id': session_api_data['content_id'], 'content_id': session_api_data.get('contentId'),
'content_src_id_sets': [{ 'content_src_id_sets': [{
'content_src_ids': [{ 'content_src_ids': [{
'src_id_to_mux': { 'src_id_to_mux': {
'audio_src_ids': [audio_quality['id']], 'audio_src_ids': [audio_src_id],
'video_src_ids': [video_quality['id']], 'video_src_ids': [video_src_id],
} }
}] }]
}], }],
@ -229,52 +289,81 @@ class NiconicoIE(InfoExtractor):
'content_uri': '', 'content_uri': '',
'keep_method': { 'keep_method': {
'heartbeat': { 'heartbeat': {
'lifetime': session_api_data['heartbeat_lifetime'] 'lifetime': session_api_data.get('heartbeatLifetime')
} }
}, },
'priority': session_api_data['priority'], 'priority': session_api_data.get('priority'),
'protocol': { 'protocol': {
'name': 'http', 'name': 'http',
'parameters': { 'parameters': {
'http_parameters': { 'http_parameters': session_api_http_parameters
'parameters': {
'http_output_download_parameters': {
'use_ssl': yesno(session_api_endpoint['is_ssl']),
'use_well_known_port': yesno(session_api_endpoint['is_well_known_port']),
}
}
}
} }
}, },
'recipe_id': session_api_data['recipe_id'], 'recipe_id': session_api_data.get('recipeId'),
'session_operation_auth': { 'session_operation_auth': {
'session_operation_auth_by_signature': { 'session_operation_auth_by_signature': {
'signature': session_api_data['signature'], 'signature': session_api_data.get('signature'),
'token': session_api_data['token'], 'token': session_api_data.get('token'),
} }
}, },
'timing_constraint': 'unlimited' 'timing_constraint': 'unlimited'
} }
}).encode()) }).encode())
resolution = video_quality.get('resolution', {}) info_dict['url'] = session_response['data']['session']['content_uri']
info_dict['protocol'] = protocol
# get heartbeat info
heartbeat_info_dict = {
'url': session_api_endpoint['url'] + '/' + session_response['data']['session']['id'] + '?_format=json&_method=PUT',
'data': json.dumps(session_response['data']),
# interval, convert milliseconds to seconds, then halve to make a buffer.
'interval': float_or_none(session_api_data.get('heartbeatLifetime'), scale=3000),
'ping': ping
}
return info_dict, heartbeat_info_dict
def _extract_format_for_quality(self, api_data, video_id, audio_quality, video_quality):
def parse_format_id(id_code):
mobj = re.match(r'''(?x)
(?:archive_)?
(?:(?P<codec>[^_]+)_)?
(?:(?P<br>[\d]+)kbps_)?
(?:(?P<res>[\d+]+)p_)?
''', '%s_' % id_code)
return mobj.groupdict() if mobj else {}
protocol = 'niconico_dmc'
format_id = '-'.join(map(lambda s: remove_start(s['id'], 'archive_'), [video_quality, audio_quality]))
vdict = parse_format_id(video_quality['id'])
adict = parse_format_id(audio_quality['id'])
resolution = try_get(video_quality, lambda x: x['metadata']['resolution'], dict) or {'height': vdict.get('res')}
vbr = try_get(video_quality, lambda x: x['metadata']['bitrate'], float)
return { return {
'url': session_response['data']['session']['content_uri'], 'url': '%s:%s/%s/%s' % (protocol, video_id, video_quality['id'], audio_quality['id']),
'format_id': format_id, 'format_id': format_id,
'format_note': 'DMC %s' % try_get(video_quality, lambda x: x['metadata']['label'], compat_str),
'ext': 'mp4', # Session API are used in HTML5, which always serves mp4 'ext': 'mp4', # Session API are used in HTML5, which always serves mp4
'abr': float_or_none(audio_quality.get('bitrate'), 1000), 'vcodec': vdict.get('codec'),
'vbr': float_or_none(video_quality.get('bitrate'), 1000), 'acodec': adict.get('codec'),
'height': resolution.get('height'), 'vbr': float_or_none(vbr, 1000) or float_or_none(vdict.get('br')),
'width': resolution.get('width'), 'abr': float_or_none(audio_quality.get('bitrate'), 1000) or float_or_none(adict.get('br')),
'height': int_or_none(resolution.get('height', vdict.get('res'))),
'width': int_or_none(resolution.get('width')),
'quality': -2 if 'low' in format_id else -1, # Default quality value is -1
'protocol': protocol,
'http_headers': {
'Origin': 'https://www.nicovideo.jp',
'Referer': 'https://www.nicovideo.jp/watch/' + video_id,
}
} }
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) video_id = self._match_id(url)
# Get video webpage. We are not actually interested in it for normal # Get video webpage for API data.
# cases, but need the cookies in order to be able to download the
# info webpage
webpage, handle = self._download_webpage_handle( webpage, handle = self._download_webpage_handle(
'http://www.nicovideo.jp/watch/' + video_id, video_id) 'http://www.nicovideo.jp/watch/' + video_id, video_id)
if video_id.startswith('so'): if video_id.startswith('so'):
@ -284,86 +373,136 @@ class NiconicoIE(InfoExtractor):
'data-api-data="([^"]+)"', webpage, 'data-api-data="([^"]+)"', webpage,
'API data', default='{}'), video_id) 'API data', default='{}'), video_id)
def _format_id_from_url(video_url): def get_video_info_web(items):
return 'economy' if video_real_url.endswith('low') else 'normal' return dict_get(api_data['video'], items)
try: # Get video info
video_real_url = api_data['video']['smileInfo']['url'] video_info_xml = self._download_xml(
except KeyError: # Flash videos 'http://ext.nicovideo.jp/api/getthumbinfo/' + video_id,
# Get flv info video_id, note='Downloading video info page')
flv_info_webpage = self._download_webpage(
'http://flapi.nicovideo.jp/api/getflv/' + video_id + '?as3=1', def get_video_info_xml(items):
video_id, 'Downloading flv info') if not isinstance(items, list):
items = [items]
flv_info = compat_parse_qs(flv_info_webpage) for item in items:
if 'url' not in flv_info: ret = xpath_text(video_info_xml, './/' + item)
if 'deleted' in flv_info: if ret:
raise ExtractorError('The video has been deleted.', return ret
expected=True)
elif 'closed' in flv_info: if get_video_info_xml('error'):
raise ExtractorError('Niconico videos now require logging in', error_code = get_video_info_xml('code')
expected=True)
elif 'error' in flv_info: if error_code == 'DELETED':
raise ExtractorError('%s reports error: %s' % ( raise ExtractorError('The video has been deleted.',
self.IE_NAME, flv_info['error'][0]), expected=True) expected=True)
else: elif error_code == 'NOT_FOUND':
raise ExtractorError('Unable to find video URL') raise ExtractorError('The video is not found.',
expected=True)
video_info_xml = self._download_xml( elif error_code == 'COMMUNITY':
'http://ext.nicovideo.jp/api/getthumbinfo/' + video_id, self.to_screen('%s: The video is community members only.' % video_id)
video_id, note='Downloading video info page') else:
raise ExtractorError('%s reports error: %s' % (self.IE_NAME, error_code))
def get_video_info(items):
if not isinstance(items, list): # Start extracting video formats
items = [items] formats = []
for item in items:
ret = xpath_text(video_info_xml, './/' + item) # Get HTML5 videos info
if ret: quality_info = try_get(api_data, lambda x: x['media']['delivery']['movie'])
return ret if not quality_info:
raise ExtractorError('The video can\'t be downloaded', expected=True)
video_real_url = flv_info['url'][0]
for audio_quality in quality_info.get('audios') or {}:
extension = get_video_info('movie_type') for video_quality in quality_info.get('videos') or {}:
if not extension: if not audio_quality.get('isAvailable') or not video_quality.get('isAvailable'):
extension = determine_ext(video_real_url) continue
formats.append(self._extract_format_for_quality(
formats = [{ api_data, video_id, audio_quality, video_quality))
'url': video_real_url,
'ext': extension, # Get flv/swf info
'format_id': _format_id_from_url(video_real_url), timestamp = None
}] video_real_url = try_get(api_data, lambda x: x['video']['smileInfo']['url'])
else: if video_real_url:
formats = [] is_economy = video_real_url.endswith('low')
dmc_info = api_data['video'].get('dmcInfo') if is_economy:
if dmc_info: # "New" HTML5 videos self.report_warning('Site is currently in economy mode! You will only have access to lower quality streams')
quality_info = dmc_info['quality']
for audio_quality in quality_info['audios']: # Invoking ffprobe to determine resolution
for video_quality in quality_info['videos']: pp = FFmpegPostProcessor(self._downloader)
if not audio_quality['available'] or not video_quality['available']: cookies = self._get_cookies('https://nicovideo.jp').output(header='', sep='; path=/; domain=nicovideo.jp;\n')
continue
formats.append(self._extract_format_for_quality( self.to_screen('%s: %s' % (video_id, 'Checking smile format with ffprobe'))
api_data, video_id, audio_quality, video_quality))
try:
self._sort_formats(formats) metadata = pp.get_metadata_object(video_real_url, ['-cookies', cookies])
else: # "Old" HTML5 videos except PostProcessingError as err:
formats = [{ raise ExtractorError(err.msg, expected=True)
v_stream = a_stream = {}
# Some complex swf files doesn't have video stream (e.g. nm4809023)
for stream in metadata['streams']:
if stream['codec_type'] == 'video':
v_stream = stream
elif stream['codec_type'] == 'audio':
a_stream = stream
# Community restricted videos seem to have issues with the thumb API not returning anything at all
filesize = int(
(get_video_info_xml('size_high') if not is_economy else get_video_info_xml('size_low'))
or metadata['format']['size']
)
extension = (
get_video_info_xml('movie_type')
or 'mp4' if 'mp4' in metadata['format']['format_name'] else metadata['format']['format_name']
)
# 'creation_time' tag on video stream of re-encoded SMILEVIDEO mp4 files are '1970-01-01T00:00:00.000000Z'.
timestamp = (
parse_iso8601(get_video_info_web('first_retrieve'))
or unified_timestamp(get_video_info_web('postedDateTime'))
)
metadata_timestamp = (
parse_iso8601(try_get(v_stream, lambda x: x['tags']['creation_time']))
or timestamp if extension != 'mp4' else 0
)
# According to compconf, smile videos from pre-2017 are always better quality than their DMC counterparts
smile_threshold_timestamp = parse_iso8601('2016-12-08T00:00:00+09:00')
is_source = timestamp < smile_threshold_timestamp or metadata_timestamp > 0
# If movie file size is unstable, old server movie is not source movie.
if filesize > 1:
formats.append({
'url': video_real_url, 'url': video_real_url,
'ext': 'mp4', 'format_id': 'smile' if not is_economy else 'smile_low',
'format_id': _format_id_from_url(video_real_url), 'format_note': 'SMILEVIDEO source' if not is_economy else 'SMILEVIDEO low quality',
}] 'ext': extension,
'container': extension,
def get_video_info(items): 'vcodec': v_stream.get('codec_name'),
return dict_get(api_data['video'], items) 'acodec': a_stream.get('codec_name'),
# Some complex swf files doesn't have total bit rate metadata (e.g. nm6049209)
'tbr': int_or_none(metadata['format'].get('bit_rate'), scale=1000),
'vbr': int_or_none(v_stream.get('bit_rate'), scale=1000),
'abr': int_or_none(a_stream.get('bit_rate'), scale=1000),
'height': int_or_none(v_stream.get('height')),
'width': int_or_none(v_stream.get('width')),
'source_preference': 5 if not is_economy else -2,
'quality': 5 if is_source and not is_economy else None,
'filesize': filesize
})
self._sort_formats(formats)
# Start extracting information # Start extracting information
title = get_video_info('title') title = (
if not title: get_video_info_xml('title') # prefer to get the untranslated original title
title = self._og_search_title(webpage, default=None) or get_video_info_web(['originalTitle', 'title'])
if not title: or self._og_search_title(webpage, default=None)
title = self._html_search_regex( or self._html_search_regex(
r'<span[^>]+class="videoHeaderTitle"[^>]*>([^<]+)</span>', r'<span[^>]+class="videoHeaderTitle"[^>]*>([^<]+)</span>',
webpage, 'video title') webpage, 'video title'))
watch_api_data_string = self._html_search_regex( watch_api_data_string = self._html_search_regex(
r'<div[^>]+id="watchAPIDataContainer"[^>]+>([^<]+)</div>', r'<div[^>]+id="watchAPIDataContainer"[^>]+>([^<]+)</div>',
@ -372,14 +511,15 @@ class NiconicoIE(InfoExtractor):
video_detail = watch_api_data.get('videoDetail', {}) video_detail = watch_api_data.get('videoDetail', {})
thumbnail = ( thumbnail = (
get_video_info(['thumbnail_url', 'thumbnailURL']) self._html_search_regex(r'<meta property="og:image" content="([^"]+)">', webpage, 'thumbnail data', default=None)
or dict_get( # choose highest from 720p to 240p
get_video_info_web('thumbnail'),
['ogp', 'player', 'largeUrl', 'middleUrl', 'url'])
or self._html_search_meta('image', webpage, 'thumbnail', default=None) or self._html_search_meta('image', webpage, 'thumbnail', default=None)
or video_detail.get('thumbnail')) or video_detail.get('thumbnail'))
description = get_video_info('description') description = get_video_info_web('description')
timestamp = (parse_iso8601(get_video_info('first_retrieve'))
or unified_timestamp(get_video_info('postedDateTime')))
if not timestamp: if not timestamp:
match = self._html_search_meta('datePublished', webpage, 'date published', default=None) match = self._html_search_meta('datePublished', webpage, 'date published', default=None)
if match: if match:
@ -388,19 +528,25 @@ class NiconicoIE(InfoExtractor):
timestamp = parse_iso8601( timestamp = parse_iso8601(
video_detail['postedAt'].replace('/', '-'), video_detail['postedAt'].replace('/', '-'),
delimiter=' ', timezone=datetime.timedelta(hours=9)) delimiter=' ', timezone=datetime.timedelta(hours=9))
timestamp = timestamp or try_get(api_data, lambda x: parse_iso8601(x['video']['registeredAt']))
view_count = int_or_none(get_video_info(['view_counter', 'viewCount'])) view_count = int_or_none(get_video_info_web(['view_counter', 'viewCount']))
if not view_count: if not view_count:
match = self._html_search_regex( match = self._html_search_regex(
r'>Views: <strong[^>]*>([^<]+)</strong>', r'>Views: <strong[^>]*>([^<]+)</strong>',
webpage, 'view count', default=None) webpage, 'view count', default=None)
if match: if match:
view_count = int_or_none(match.replace(',', '')) view_count = int_or_none(match.replace(',', ''))
view_count = view_count or video_detail.get('viewCount') view_count = (
view_count
or video_detail.get('viewCount')
or try_get(api_data, lambda x: x['video']['count']['view']))
comment_count = (
int_or_none(get_video_info_web('comment_num'))
or video_detail.get('commentCount')
or try_get(api_data, lambda x: x['video']['count']['comment']))
comment_count = (int_or_none(get_video_info('comment_num'))
or video_detail.get('commentCount')
or try_get(api_data, lambda x: x['thread']['commentCount']))
if not comment_count: if not comment_count:
match = self._html_search_regex( match = self._html_search_regex(
r'>Comments: <strong[^>]*>([^<]+)</strong>', r'>Comments: <strong[^>]*>([^<]+)</strong>',
@ -409,22 +555,41 @@ class NiconicoIE(InfoExtractor):
comment_count = int_or_none(match.replace(',', '')) comment_count = int_or_none(match.replace(',', ''))
duration = (parse_duration( duration = (parse_duration(
get_video_info('length') get_video_info_web('length')
or self._html_search_meta( or self._html_search_meta(
'video:duration', webpage, 'video duration', default=None)) 'video:duration', webpage, 'video duration', default=None))
or video_detail.get('length') or video_detail.get('length')
or get_video_info('duration')) or get_video_info_web('duration'))
webpage_url = get_video_info('watch_url') or url webpage_url = get_video_info_web('watch_url') or url
# for channel movie and community movie
channel_id = try_get(
api_data,
(lambda x: x['channel']['globalId'],
lambda x: x['community']['globalId']))
channel = try_get(
api_data,
(lambda x: x['channel']['name'],
lambda x: x['community']['name']))
# Note: cannot use api_data.get('owner', {}) because owner may be set to "null" # Note: cannot use api_data.get('owner', {}) because owner may be set to "null"
# in the JSON, which will cause None to be returned instead of {}. # in the JSON, which will cause None to be returned instead of {}.
owner = try_get(api_data, lambda x: x.get('owner'), dict) or {} owner = try_get(api_data, lambda x: x.get('owner'), dict) or {}
uploader_id = get_video_info(['ch_id', 'user_id']) or owner.get('id') uploader_id = str_or_none(
uploader = get_video_info(['ch_name', 'user_nickname']) or owner.get('nickname') get_video_info_web(['ch_id', 'user_id'])
or owner.get('id')
or channel_id
)
uploader = (
get_video_info_web(['ch_name', 'user_nickname'])
or owner.get('nickname')
or channel
)
return { return {
'id': video_id, 'id': video_id,
'_api_data': api_data,
'title': title, 'title': title,
'formats': formats, 'formats': formats,
'thumbnail': thumbnail, 'thumbnail': thumbnail,
@ -432,6 +597,8 @@ class NiconicoIE(InfoExtractor):
'uploader': uploader, 'uploader': uploader,
'timestamp': timestamp, 'timestamp': timestamp,
'uploader_id': uploader_id, 'uploader_id': uploader_id,
'channel': channel,
'channel_id': channel_id,
'view_count': view_count, 'view_count': view_count,
'comment_count': comment_count, 'comment_count': comment_count,
'duration': duration, 'duration': duration,
@ -440,7 +607,7 @@ class NiconicoIE(InfoExtractor):
class NiconicoPlaylistIE(InfoExtractor): class NiconicoPlaylistIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?nicovideo\.jp/(?:user/\d+/)?mylist/(?P<id>\d+)' _VALID_URL = r'https?://(?:www\.)?nicovideo\.jp/(?:user/\d+/|my/)?mylist/(?P<id>\d+)'
_TESTS = [{ _TESTS = [{
'url': 'http://www.nicovideo.jp/mylist/27411728', 'url': 'http://www.nicovideo.jp/mylist/27411728',
@ -456,60 +623,185 @@ class NiconicoPlaylistIE(InfoExtractor):
'url': 'https://www.nicovideo.jp/user/805442/mylist/27411728', 'url': 'https://www.nicovideo.jp/user/805442/mylist/27411728',
'only_matching': True, 'only_matching': True,
}] }]
_PAGE_SIZE = 100
def _call_api(self, list_id, resource, query): _API_HEADERS = {
return self._download_json( 'X-Frontend-ID': '6',
'https://nvapi.nicovideo.jp/v2/mylists/' + list_id, list_id, 'X-Frontend-Version': '0'
'Downloading %s JSON metatdata' % resource, query=query, }
headers={'X-Frontend-Id': 6})['data']['mylist']
def _real_extract(self, url):
def _parse_owner(self, item): list_id = self._match_id(url)
owner = item.get('owner') or {}
if owner: def get_page_data(pagenum, pagesize):
return { return self._download_json(
'uploader': owner.get('name'), 'http://nvapi.nicovideo.jp/v2/mylists/' + list_id, list_id,
'uploader_id': owner.get('id'), query={'page': 1 + pagenum, 'pageSize': pagesize},
} headers=self._API_HEADERS).get('data').get('mylist')
return {}
data = get_page_data(0, 1)
def _fetch_page(self, list_id, page): title = data.get('name')
page += 1 description = data.get('description')
items = self._call_api(list_id, 'page %d' % page, { uploader = data.get('owner').get('name')
'page': page, uploader_id = data.get('owner').get('id')
'pageSize': self._PAGE_SIZE,
})['items'] def pagefunc(pagenum):
for item in items: data = get_page_data(pagenum, 25)
video = item.get('video') or {} return ({
video_id = video.get('id')
if not video_id:
continue
count = video.get('count') or {}
get_count = lambda x: int_or_none(count.get(x))
info = {
'_type': 'url', '_type': 'url',
'id': video_id, 'url': 'http://www.nicovideo.jp/watch/' + item.get('watchId'),
'title': video.get('title'), } for item in data.get('items'))
'url': 'https://www.nicovideo.jp/watch/' + video_id,
'description': video.get('shortDescription'), return {
'duration': int_or_none(video.get('duration')), '_type': 'playlist',
'view_count': get_count('view'), 'id': list_id,
'comment_count': get_count('comment'), 'title': title,
'ie_key': NiconicoIE.ie_key(), 'description': description,
} 'uploader': uploader,
info.update(self._parse_owner(video)) 'uploader_id': uploader_id,
yield info 'entries': OnDemandPagedList(pagefunc, 25),
}
class NicovideoSearchBaseIE(InfoExtractor):
_MAX_RESULTS = float('inf')
def _entries(self, url, item_id, query=None, note='Downloading page %(page)s'):
query = query or {}
pages = [query['page']] if 'page' in query else itertools.count(1)
for page_num in pages:
query['page'] = str(page_num)
webpage = self._download_webpage(url, item_id, query=query, note=note % {'page': page_num})
results = re.findall(r'(?<=data-video-id=)["\']?(?P<videoid>.+?)(?=["\'])', webpage)
for item in results:
yield self.url_result('http://www.nicovideo.jp/watch/%s' % item, 'Niconico', item)
if not results:
break
def _get_n_results(self, query, n):
entries = self._entries(self._proto_relative_url('//www.nicovideo.jp/search/%s' % query), query)
if n < self._MAX_RESULTS:
entries = itertools.islice(entries, 0, n)
return self.playlist_result(entries, query, query)
class NicovideoSearchIE(NicovideoSearchBaseIE, SearchInfoExtractor):
IE_DESC = 'Nico video search'
IE_NAME = 'nicovideo:search'
_SEARCH_KEY = 'nicosearch'
def _search_results(self, query):
return self._entries(
self._proto_relative_url('//www.nicovideo.jp/search/%s' % query), query)
class NicovideoSearchURLIE(NicovideoSearchBaseIE):
IE_NAME = '%s_url' % NicovideoSearchIE.IE_NAME
IE_DESC = 'Nico video search URLs'
_VALID_URL = r'https?://(?:www\.)?nicovideo\.jp/search/(?P<id>[^?#&]+)?'
_TESTS = [{
'url': 'http://www.nicovideo.jp/search/sm9',
'info_dict': {
'id': 'sm9',
'title': 'sm9'
},
'playlist_mincount': 40,
}, {
'url': 'https://www.nicovideo.jp/search/sm9?sort=h&order=d&end=2020-12-31&start=2020-01-01',
'info_dict': {
'id': 'sm9',
'title': 'sm9'
},
'playlist_count': 31,
}]
def _real_extract(self, url):
query = self._match_id(url)
return self.playlist_result(self._entries(url, query), query, query)
class NicovideoSearchDateIE(NicovideoSearchBaseIE, SearchInfoExtractor):
IE_DESC = 'Nico video search, newest first'
IE_NAME = '%s:date' % NicovideoSearchIE.IE_NAME
_SEARCH_KEY = 'nicosearchdate'
_TESTS = [{
'url': 'nicosearchdateall:a',
'info_dict': {
'id': 'a',
'title': 'a'
},
'playlist_mincount': 1610,
}]
_START_DATE = datetime.date(2007, 1, 1)
_RESULTS_PER_PAGE = 32
_MAX_PAGES = 50
def _entries(self, url, item_id, start_date=None, end_date=None):
start_date, end_date = start_date or self._START_DATE, end_date or datetime.datetime.now().date()
# If the last page has a full page of videos, we need to break down the query interval further
last_page_len = len(list(self._get_entries_for_date(
url, item_id, start_date, end_date, self._MAX_PAGES,
note='Checking number of videos from {0} to {1}'.format(start_date, end_date))))
if (last_page_len == self._RESULTS_PER_PAGE and start_date != end_date):
midpoint = start_date + ((end_date - start_date) // 2)
for entry in itertools.chain(
iter(self._entries(url, item_id, midpoint, end_date)),
iter(self._entries(url, item_id, start_date, midpoint))):
yield entry
else:
self.to_screen('{0}: Downloading results from {1} to {2}'.format(item_id, start_date, end_date))
for entry in iter(self._get_entries_for_date(
url, item_id, start_date, end_date, note=' Downloading page %(page)s')):
yield entry
def _get_entries_for_date(self, url, item_id, start_date, end_date=None, page_num=None, note=None):
query = {
'start': compat_str(start_date),
'end': compat_str(end_date or start_date),
'sort': 'f',
'order': 'd',
}
if page_num:
query['page'] = compat_str(page_num)
for entry in iter(super(NicovideoSearchDateIE, self)._entries(url, item_id, query=query, note=note)):
yield entry
class NiconicoUserIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?nicovideo\.jp/user/(?P<id>\d+)/?(?:$|[#?])'
_TEST = {
'url': 'https://www.nicovideo.jp/user/419948',
'info_dict': {
'id': '419948',
},
'playlist_mincount': 101,
}
_API_URL = "https://nvapi.nicovideo.jp/v1/users/%s/videos?sortKey=registeredAt&sortOrder=desc&pageSize=%s&page=%s"
_PAGE_SIZE = 100
_API_HEADERS = {
'X-Frontend-ID': '6',
'X-Frontend-Version': '0'
}
def _entries(self, list_id):
total_count = 1
count = page_num = 0
while count < total_count:
json_parsed = self._download_json(
self._API_URL % (list_id, self._PAGE_SIZE, page_num + 1), list_id,
headers=self._API_HEADERS,
note='Downloading JSON metadata%s' % (' page %d' % page_num if page_num else ''))
if not page_num:
total_count = int_or_none(json_parsed['data'].get('totalCount'))
for entry in json_parsed["data"]["items"]:
count += 1
yield self.url_result('https://www.nicovideo.jp/watch/%s' % entry['id'])
page_num += 1
def _real_extract(self, url): def _real_extract(self, url):
list_id = self._match_id(url) list_id = self._match_id(url)
mylist = self._call_api(list_id, 'list', { return self.playlist_result(self._entries(list_id), list_id)
'pageSize': 1,
})
entries = InAdvancePagedList(
functools.partial(self._fetch_page, list_id),
math.ceil(mylist['totalItemCount'] / self._PAGE_SIZE),
self._PAGE_SIZE)
result = self.playlist_result(
entries, list_id, mylist.get('name'), mylist.get('description'))
result.update(self._parse_owner(mylist))
return result

Loading…
Cancel
Save