Coverage for app \ services \ download_service.py: 41%

394 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 20:58 -0400

1# Youtube download helper 

2 

3import copy 

4import json 

5import os 

6import platform 

7import re 

8import shutil 

9import subprocess 

10from pathlib import Path 

11from typing import Any, Callable, Dict, Optional, Sequence, Tuple 

12 

13import yt_dlp 

14 

15from app.services.video_files import find_primary_video_file 

16from app.settings import DOWNLOADS_DIR 

17 

18WINDOWS_BROWSERS = ("chrome", "firefox", "edge") 

19MACOS_BROWSERS = ("chrome", "firefox") 

20PROTECTED_PLAYER_CLIENTS = ( 

21 ("tv_downgraded", "web_safari"), 

22 ("web_creator", "tv_downgraded"), 

23 ("web_embedded", "android", "web"), 

24 ("android", "web"), 

25) 

26 

27 

28def download_youtube(url: str) -> Dict[str, Any]: 

29 result = { 

30 "success": False, 

31 "message": "Download error", 

32 "video_id": None, 

33 "title": None, 

34 "thumbnail": None, 

35 "files": [], 

36 } 

37 

38 normalized_url = _normalize_youtube_url(url) 

39 if not normalized_url: 

40 result["message"] = "Please provide a valid YouTube URL." 

41 result["error_code"] = "invalid_url" 

42 result["auth_source"] = "none" 

43 return result 

44 

45 system_name = platform.system() 

46 auth_profile = _select_auth_profile(system_name=system_name) 

47 ffmpeg_path = _resolve_ffmpeg_path() 

48 has_ffmpeg = ffmpeg_path is not None 

49 has_node = shutil.which("node") is not None 

50 user_agent = (os.getenv("YTDLP_USER_AGENT") or "").strip() or None 

51 result["auth_source"] = auth_profile["source"] 

52 

53 try: 

54 info = _extract_metadata( 

55 normalized_url, 

56 auth_profile=auth_profile, 

57 has_node=has_node, 

58 user_agent=user_agent, 

59 system_name=system_name, 

60 ) 

61 video_id = info.get("id", "unknown") 

62 title = info.get("title", "Untitled Video") 

63 thumbnail = info.get("thumbnail", "") 

64 duration = info.get("duration", 0) 

65 

66 result["video_id"] = video_id 

67 result["title"] = title 

68 result["thumbnail"] = thumbnail 

69 

70 video_dir = DOWNLOADS_DIR / video_id 

71 video_dir.mkdir(parents=True, exist_ok=True) 

72 _remove_stale_invalid_mp4(video_dir, video_id) 

73 

74 used_player_client = _download_video( 

75 normalized_url, 

76 video_dir=video_dir, 

77 video_id=video_id, 

78 auth_profile=auth_profile, 

79 has_ffmpeg=has_ffmpeg, 

80 ffmpeg_path=ffmpeg_path, 

81 has_node=has_node, 

82 user_agent=user_agent, 

83 system_name=system_name, 

84 ) 

85 

86 subtitle_warning = _download_subtitles( 

87 normalized_url, 

88 video_dir=video_dir, 

89 video_id=video_id, 

90 auth_profile=auth_profile, 

91 has_node=has_node, 

92 used_player_client=used_player_client, 

93 user_agent=user_agent, 

94 ) 

95 

96 created = _collect_created_files(video_dir) 

97 video_path = find_primary_video_file(video_dir) 

98 

99 if not video_path: 

100 result["message"] = "Download completed but no video file was found." 

101 result["error_code"] = "video_file_missing" 

102 result["files"] = created 

103 if used_player_client: 

104 result["used_player_client"] = used_player_client 

105 return result 

106 

107 if video_path.suffix.lower() == ".mp4" and not _looks_like_mp4(video_path): 

108 repaired = _repair_invalid_mp4(video_path, ffmpeg_path) 

109 if repaired: 

110 created = _collect_created_files(video_dir) 

111 video_path = repaired 

112 if video_path.suffix.lower() == ".mp4" and not _looks_like_mp4(video_path): 

113 result["message"] = ( 

114 "Downloaded file is not a valid MP4 container. " 

115 "Restart the app after installing ffmpeg or retry with a non-SABR format." 

116 ) 

117 result["error_code"] = "invalid_container" 

118 result["files"] = created 

119 if used_player_client: 

120 result["used_player_client"] = used_player_client 

121 return result 

122 

123 meta = { 

124 "video_id": video_id, 

125 "title": title, 

126 "thumbnail": thumbnail, 

127 "duration": duration, 

128 "local_path": f"/downloads/{video_path.relative_to(DOWNLOADS_DIR).as_posix()}", 

129 } 

130 

131 meta_path = video_dir / "meta.json" 

132 meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8") 

133 

134 result.update( 

135 { 

136 "success": True, 

137 "message": "Video downloaded successfully.", 

138 "files": created, 

139 "duration": duration, 

140 "local_path": meta["local_path"], 

141 "auth_source": auth_profile["source"], 

142 } 

143 ) 

144 if used_player_client: 

145 result["used_player_client"] = used_player_client 

146 if subtitle_warning: 

147 result["subtitle_warning"] = subtitle_warning 

148 return result 

149 

150 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

151 _apply_download_error( 

152 result, 

153 message=str(exc), 

154 auth_profile=auth_profile, 

155 system_name=system_name, 

156 user_agent_set=bool(user_agent), 

157 ) 

158 return result 

159 except Exception as exc: 

160 result["message"] = f"Unexpected error: {exc}" 

161 result["error_code"] = "unexpected_error" 

162 result["auth_source"] = auth_profile["source"] 

163 return result 

164 

165 

166def _normalize_youtube_url(url: str) -> Optional[str]: 

167 candidate = (url or "").strip() 

168 if not candidate: 

169 return None 

170 lowered = candidate.lower() 

171 if not candidate.startswith("http"): 

172 return None 

173 if "youtube.com" not in lowered and "youtu.be" not in lowered: 

174 return None 

175 return candidate 

176 

177 

178def _extract_metadata( 

179 url: str, 

180 auth_profile: Dict[str, Any], 

181 has_node: bool, 

182 user_agent: Optional[str], 

183 system_name: str, 

184) -> Dict[str, Any]: 

185 opts = _build_metadata_opts(auth_profile=auth_profile, has_node=has_node) 

186 info, _ = _run_ytdlp_with_auth_fallback( 

187 url, 

188 base_opts=opts, 

189 auth_profile=auth_profile, 

190 user_agent=user_agent, 

191 system_name=system_name, 

192 operation=lambda attempt_opts: _run_extract_info(url, attempt_opts), 

193 ) 

194 return info 

195 

196 

197def _download_video( 

198 url: str, 

199 video_dir: Path, 

200 video_id: str, 

201 auth_profile: Dict[str, Any], 

202 has_ffmpeg: bool, 

203 ffmpeg_path: Optional[str], 

204 has_node: bool, 

205 user_agent: Optional[str], 

206 system_name: str, 

207) -> Optional[Sequence[str]]: 

208 opts = _build_download_opts( 

209 video_dir=video_dir, 

210 video_id=video_id, 

211 auth_profile=auth_profile, 

212 has_ffmpeg=has_ffmpeg, 

213 ffmpeg_path=ffmpeg_path, 

214 has_node=has_node, 

215 ) 

216 _, used_player_client = _run_ytdlp_with_auth_fallback( 

217 url, 

218 base_opts=opts, 

219 auth_profile=auth_profile, 

220 user_agent=user_agent, 

221 system_name=system_name, 

222 operation=lambda attempt_opts: _download_with_format_fallback( 

223 url, 

224 opts=attempt_opts, 

225 has_ffmpeg=has_ffmpeg, 

226 ), 

227 ) 

228 return used_player_client 

229 

230 

231def _download_subtitles( 

232 url: str, 

233 video_dir: Path, 

234 video_id: str, 

235 auth_profile: Dict[str, Any], 

236 has_node: bool, 

237 used_player_client: Optional[Sequence[str]], 

238 user_agent: Optional[str], 

239) -> Optional[str]: 

240 subtitle_opts = _build_subtitle_opts( 

241 video_dir=video_dir, 

242 video_id=video_id, 

243 auth_profile=auth_profile, 

244 has_node=has_node, 

245 used_player_client=used_player_client, 

246 user_agent=user_agent, 

247 ) 

248 try: 

249 with yt_dlp.YoutubeDL(subtitle_opts) as ydl: # type: ignore[arg-type] 

250 ydl.download([url]) 

251 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

252 return _clean_ytdlp_error(str(exc)) 

253 return None 

254 

255 

256def _run_ytdlp_with_auth_fallback( 

257 url: str, 

258 base_opts: Dict[str, Any], 

259 auth_profile: Dict[str, Any], 

260 user_agent: Optional[str], 

261 system_name: str, 

262 operation: Callable[[Dict[str, Any]], Any], 

263) -> Tuple[Any, Optional[Sequence[str]]]: 

264 last_error: Optional[Exception] = None 

265 last_classification: Optional[Dict[str, Any]] = None 

266 try: 

267 return operation(copy.deepcopy(base_opts)), None 

268 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

269 classification = _classify_ytdlp_error( 

270 str(exc), 

271 auth_profile=auth_profile, 

272 system_name=system_name, 

273 user_agent_set=bool(user_agent), 

274 ) 

275 if not classification["retry_other_clients"]: 

276 raise 

277 last_error = exc 

278 last_classification = classification 

279 

280 for player_client in PROTECTED_PLAYER_CLIENTS: 

281 attempt_opts = copy.deepcopy(base_opts) 

282 attempt_opts["extractor_args"] = { 

283 "youtube": {"player_client": list(player_client)} 

284 } 

285 if user_agent: 

286 _apply_user_agent(attempt_opts, user_agent) 

287 try: 

288 return operation(attempt_opts), list(player_client) 

289 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

290 classification = _classify_ytdlp_error( 

291 str(exc), 

292 auth_profile=auth_profile, 

293 system_name=system_name, 

294 user_agent_set=bool(user_agent), 

295 ) 

296 if classification["retry_other_clients"]: 

297 last_error = exc 

298 last_classification = classification 

299 continue 

300 raise 

301 

302 if last_classification and last_classification["error_code"] == "format_unavailable": 

303 missing_pot_opts = _with_missing_pot_formats(base_opts) 

304 if user_agent: 

305 _apply_user_agent(missing_pot_opts, user_agent) 

306 try: 

307 return operation(missing_pot_opts), None 

308 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

309 last_error = exc 

310 

311 if last_error is not None: 

312 raise last_error 

313 raise yt_dlp.utils.DownloadError(f"Download failed for {url}") # type: ignore[attr-defined] 

314 

315 

316def _run_extract_info(url: str, opts: Dict[str, Any]) -> Dict[str, Any]: 

317 with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore[arg-type] 

318 return ydl.extract_info(url, download=False) 

319 

320 

321def _download_with_format_fallback(url: str, opts: Dict[str, Any], has_ffmpeg: bool) -> None: 

322 try: 

323 with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore[arg-type] 

324 ydl.download([url]) 

325 except yt_dlp.utils.DownloadError as exc: # type: ignore[attr-defined] 

326 message = str(exc) 

327 if "Requested format is not available" not in message: 

328 raise 

329 fallback_opts = copy.deepcopy(opts) 

330 if has_ffmpeg: 

331 fallback_opts["format"] = "bestvideo+bestaudio/best" 

332 else: 

333 fallback_opts["format"] = "best" 

334 fallback_opts.pop("merge_output_format", None) 

335 fallback_opts.pop("postprocessors", None) 

336 try: 

337 with yt_dlp.YoutubeDL(fallback_opts) as ydl: # type: ignore[arg-type] 

338 ydl.download([url]) 

339 except yt_dlp.utils.DownloadError as fallback_exc: # type: ignore[attr-defined] 

340 fallback_message = str(fallback_exc) 

341 if "Requested format is not available" not in fallback_message: 

342 raise 

343 second_fallback_opts = copy.deepcopy(fallback_opts) 

344 second_fallback_opts.pop("extractor_args", None) 

345 with yt_dlp.YoutubeDL(second_fallback_opts) as ydl: # type: ignore[arg-type] 

346 ydl.download([url]) 

347 

348 

349def _build_metadata_opts(auth_profile: Dict[str, Any], has_node: bool) -> Dict[str, Any]: 

350 opts: Dict[str, Any] = { 

351 "quiet": True, 

352 "no_warnings": True, 

353 "noprogress": True, 

354 "noplaylist": True, 

355 } 

356 _apply_runtime_options(opts, has_node) 

357 _apply_auth_options(opts, auth_profile) 

358 return opts 

359 

360 

361def _build_download_opts( 

362 video_dir: Path, 

363 video_id: str, 

364 auth_profile: Dict[str, Any], 

365 has_ffmpeg: bool, 

366 ffmpeg_path: Optional[str], 

367 has_node: bool, 

368) -> Dict[str, Any]: 

369 opts: Dict[str, Any] = { 

370 "format": _preferred_download_format(has_ffmpeg), 

371 "merge_output_format": "mp4", 

372 "allow_unplayable_formats": True, 

373 "outtmpl": str(video_dir / f"{video_id}.%(ext)s"), 

374 "quiet": False, 

375 "no_warnings": True, 

376 "noprogress": True, 

377 "noplaylist": True, 

378 "writethumbnail": True, 

379 "writeinfojson": True, 

380 "prefer_ffmpeg": True, 

381 "postprocessors": [{"key": "FFmpegVideoRemuxer", "preferedformat": "mp4"}], 

382 } 

383 _apply_runtime_options(opts, has_node) 

384 _apply_auth_options(opts, auth_profile) 

385 if ffmpeg_path: 

386 opts["ffmpeg_location"] = ffmpeg_path 

387 if not has_ffmpeg: 

388 opts["compat_opts"] = ["no-sabr"] 

389 return opts 

390 

391 

392def _preferred_download_format(has_ffmpeg: bool) -> str: 

393 if has_ffmpeg: 

394 return ( 

395 "bv*[vcodec^=avc1][ext=mp4][height<=?720]+ba[acodec^=mp4a]/" 

396 "b[ext=mp4][height<=?720]/" 

397 "bv*[height<=?720]+ba/" 

398 "b[height<=?720]/" 

399 "bestvideo+bestaudio/best" 

400 ) 

401 return "b[ext=mp4][height<=?720]/b[height<=?720]/best[ext=mp4]/best" 

402 

403 

404def _build_subtitle_opts( 

405 video_dir: Path, 

406 video_id: str, 

407 auth_profile: Dict[str, Any], 

408 has_node: bool, 

409 used_player_client: Optional[Sequence[str]], 

410 user_agent: Optional[str], 

411) -> Dict[str, Any]: 

412 opts: Dict[str, Any] = { 

413 "outtmpl": str(video_dir / f"{video_id}.%(ext)s"), 

414 "writesubtitles": True, 

415 "writeautomaticsub": True, 

416 "subtitleslangs": ["en"], 

417 "subtitlesformat": "vtt", 

418 "skip_download": True, 

419 "quiet": True, 

420 "no_warnings": True, 

421 "noprogress": True, 

422 "noplaylist": True, 

423 } 

424 _apply_runtime_options(opts, has_node) 

425 _apply_auth_options(opts, auth_profile) 

426 if used_player_client: 

427 opts["extractor_args"] = { 

428 "youtube": {"player_client": list(used_player_client)} 

429 } 

430 if used_player_client and user_agent: 

431 _apply_user_agent(opts, user_agent) 

432 return opts 

433 

434 

435def _with_missing_pot_formats(opts: Dict[str, Any]) -> Dict[str, Any]: 

436 updated = copy.deepcopy(opts) 

437 extractor_args = copy.deepcopy(updated.get("extractor_args") or {}) 

438 youtube_args = copy.deepcopy(extractor_args.get("youtube") or {}) 

439 formats = list(youtube_args.get("formats") or []) 

440 if "missing_pot" not in formats: 

441 formats.append("missing_pot") 

442 youtube_args["formats"] = formats 

443 extractor_args["youtube"] = youtube_args 

444 updated["extractor_args"] = extractor_args 

445 return updated 

446 

447 

448def _apply_runtime_options(opts: Dict[str, Any], has_node: bool) -> None: 

449 opts["remote_components"] = ["ejs:github"] 

450 if not has_node: 

451 return 

452 opts["js_runtimes"] = {"node": {}} 

453 

454 

455def _apply_auth_options(opts: Dict[str, Any], auth_profile: Dict[str, Any]) -> None: 

456 source = auth_profile.get("source") 

457 if source == "browser" and auth_profile.get("browser"): 

458 opts["cookiesfrombrowser"] = (auth_profile["browser"],) 

459 elif source == "cookiefile" and auth_profile.get("cookiefile"): 

460 opts["cookiefile"] = auth_profile["cookiefile"] 

461 

462 

463def _apply_user_agent(opts: Dict[str, Any], user_agent: str) -> None: 

464 headers = dict(opts.get("http_headers") or {}) 

465 headers["User-Agent"] = user_agent 

466 opts["http_headers"] = headers 

467 

468 

469def _select_auth_profile( 

470 system_name: Optional[str] = None, 

471 auth_mode: Optional[str] = None, 

472 cookiefile: Optional[str] = None, 

473 browser_probe: Optional[Callable[[str], bool]] = None, 

474) -> Dict[str, Any]: 

475 resolved_mode = _resolve_auth_mode(auth_mode) 

476 resolved_cookiefile = _resolve_cookiefile(cookiefile) 

477 probe = browser_probe or _browser_session_available 

478 browser_order = list(_preferred_browser_order(system_name)) 

479 

480 if resolved_mode in {"auto", "browser"}: 

481 for browser in browser_order: 

482 if probe(browser): 

483 return { 

484 "source": "browser", 

485 "browser": browser, 

486 "cookiefile": None, 

487 "auth_mode": resolved_mode, 

488 "browser_order": browser_order, 

489 } 

490 if resolved_mode == "auto" and resolved_cookiefile: 

491 return { 

492 "source": "cookiefile", 

493 "browser": None, 

494 "cookiefile": resolved_cookiefile, 

495 "auth_mode": resolved_mode, 

496 "browser_order": browser_order, 

497 } 

498 

499 if resolved_mode == "file" and resolved_cookiefile: 

500 return { 

501 "source": "cookiefile", 

502 "browser": None, 

503 "cookiefile": resolved_cookiefile, 

504 "auth_mode": resolved_mode, 

505 "browser_order": browser_order, 

506 } 

507 

508 return { 

509 "source": "none", 

510 "browser": None, 

511 "cookiefile": resolved_cookiefile, 

512 "auth_mode": resolved_mode, 

513 "browser_order": browser_order, 

514 } 

515 

516 

517def _resolve_auth_mode(auth_mode: Optional[str] = None) -> str: 

518 candidate = (auth_mode or os.getenv("YTDLP_AUTH_MODE") or "auto").strip().lower() 

519 if candidate not in {"auto", "browser", "file"}: 

520 return "auto" 

521 return candidate 

522 

523 

524def _resolve_cookiefile(cookiefile: Optional[str] = None) -> Optional[str]: 

525 raw_value = ( 

526 cookiefile 

527 or os.getenv("YTDLP_COOKIEFILE") 

528 or os.getenv("YTDLP_COOKIES_FILE") 

529 or "" 

530 ).strip() 

531 if not raw_value: 

532 return None 

533 try: 

534 resolved = Path(raw_value).expanduser() 

535 if resolved.exists(): 

536 return str(resolved) 

537 except OSError: 

538 return None 

539 return None 

540 

541 

542def _preferred_browser_order(system_name: Optional[str] = None) -> Tuple[str, ...]: 

543 current_system = system_name or platform.system() 

544 if current_system == "Windows": 

545 return WINDOWS_BROWSERS 

546 if current_system == "Darwin": 

547 return MACOS_BROWSERS 

548 return WINDOWS_BROWSERS 

549 

550 

551def _browser_session_available(browser: str) -> bool: 

552 try: 

553 opts = { 

554 "quiet": True, 

555 "no_warnings": True, 

556 "noplaylist": True, 

557 "cookiesfrombrowser": (browser,), 

558 } 

559 with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore[arg-type] 

560 ydl.extract_info("https://www.youtube.com", download=False) 

561 return True 

562 except Exception: 

563 return False 

564 

565 

566def _classify_ytdlp_error( 

567 message: str, 

568 auth_profile: Dict[str, Any], 

569 system_name: str, 

570 user_agent_set: bool, 

571) -> Dict[str, Any]: 

572 clean = _clean_ytdlp_error(message) 

573 lowered = clean.lower() 

574 

575 if "requested format is not available" in lowered: 

576 return { 

577 "error_code": "format_unavailable", 

578 "message": "Requested video format is not available for this video.", 

579 "recovery_hint": "Retry the download or install ffmpeg to allow broader format fallbacks.", 

580 "is_auth_error": False, 

581 "retry_other_clients": True, 

582 } 

583 

584 if any( 

585 token in lowered 

586 for token in ( 

587 "age-restricted", 

588 "confirm your age", 

589 "age verification", 

590 "age-verification", 

591 "inappropriate for some users", 

592 ) 

593 ): 

594 return { 

595 "error_code": "age_verification_required", 

596 "message": "YouTube requires an age-verified account for this video.", 

597 "recovery_hint": _build_auth_recovery_hint( 

598 auth_profile=auth_profile, 

599 system_name=system_name, 

600 user_agent_set=user_agent_set, 

601 age_verified=True, 

602 ), 

603 "is_auth_error": True, 

604 "retry_other_clients": True, 

605 } 

606 

607 if any( 

608 token in lowered 

609 for token in ( 

610 "not a bot", 

611 "sign in to confirm", 

612 "use --cookies-from-browser", 

613 "cookies for the authentication", 

614 "requires account credentials", 

615 "login required", 

616 ) 

617 ): 

618 return { 

619 "error_code": "auth_required", 

620 "message": "YouTube requires a signed-in browser session for this video.", 

621 "recovery_hint": _build_auth_recovery_hint( 

622 auth_profile=auth_profile, 

623 system_name=system_name, 

624 user_agent_set=user_agent_set, 

625 ), 

626 "is_auth_error": True, 

627 "retry_other_clients": True, 

628 } 

629 

630 if any( 

631 token in lowered 

632 for token in ( 

633 "too many requests", 

634 "rate limit", 

635 "try again later", 

636 "temporarily unavailable", 

637 "429", 

638 ) 

639 ): 

640 return { 

641 "error_code": "rate_limited", 

642 "message": "YouTube temporarily rate-limited this session.", 

643 "recovery_hint": _build_rate_limit_hint(system_name), 

644 "is_auth_error": False, 

645 "retry_other_clients": False, 

646 } 

647 

648 if "403" in lowered: 

649 return { 

650 "error_code": "forbidden", 

651 "message": "YouTube rejected the download request.", 

652 "recovery_hint": _build_auth_recovery_hint( 

653 auth_profile=auth_profile, 

654 system_name=system_name, 

655 user_agent_set=user_agent_set, 

656 ), 

657 "is_auth_error": True, 

658 "retry_other_clients": True, 

659 } 

660 

661 return { 

662 "error_code": "download_failed", 

663 "message": f"Download error: {clean}", 

664 "recovery_hint": None, 

665 "is_auth_error": False, 

666 "retry_other_clients": False, 

667 } 

668 

669 

670def _build_auth_recovery_hint( 

671 auth_profile: Dict[str, Any], 

672 system_name: str, 

673 user_agent_set: bool, 

674 age_verified: bool = False, 

675) -> str: 

676 browser_text = _browser_help_text(system_name) 

677 source = auth_profile.get("source") 

678 browser = _browser_label(auth_profile.get("browser")) 

679 

680 if source == "browser" and browser: 

681 hint = f"Open the video in the same signed-in {browser} session on this machine, refresh YouTube, and retry." 

682 elif source == "cookiefile": 

683 hint = ( 

684 "The configured cookies file may be stale. Export a fresh Netscape cookies file " 

685 f"or retry with a signed-in {browser_text} session on this machine." 

686 ) 

687 else: 

688 hint = f"Sign in to YouTube in {browser_text} on this machine, open the video once, and retry." 

689 

690 if age_verified: 

691 hint = f"{hint} Use an adult account that can view age-restricted content." 

692 

693 if system_name == "Darwin": 

694 hint = f"{hint} Safari is not guaranteed for protected downloads." 

695 

696 if not user_agent_set: 

697 hint = f"{hint} If it still fails, set YTDLP_USER_AGENT to that browser user agent." 

698 

699 return hint.strip() 

700 

701 

702def _build_rate_limit_hint(system_name: str) -> str: 

703 browser_text = _browser_help_text(system_name) 

704 hint = f"Wait a few minutes, open YouTube in {browser_text} on this machine, then retry." 

705 if system_name == "Darwin": 

706 hint = f"{hint} Safari is not guaranteed for protected downloads." 

707 return hint 

708 

709 

710def _browser_help_text(system_name: str) -> str: 

711 if system_name == "Windows": 

712 return "Chrome, Firefox, or Edge" 

713 if system_name == "Darwin": 

714 return "Chrome or Firefox" 

715 return "a supported browser" 

716 

717 

718def _browser_label(browser: Optional[str]) -> Optional[str]: 

719 if not browser: 

720 return None 

721 return { 

722 "chrome": "Chrome", 

723 "firefox": "Firefox", 

724 "edge": "Edge", 

725 }.get(browser, browser.title()) 

726 

727 

728def _apply_download_error( 

729 result: Dict[str, Any], 

730 message: str, 

731 auth_profile: Dict[str, Any], 

732 system_name: str, 

733 user_agent_set: bool, 

734) -> None: 

735 classification = _classify_ytdlp_error( 

736 message, 

737 auth_profile=auth_profile, 

738 system_name=system_name, 

739 user_agent_set=user_agent_set, 

740 ) 

741 result["message"] = classification["message"] 

742 result["error_code"] = classification["error_code"] 

743 result["auth_source"] = auth_profile["source"] 

744 if classification["recovery_hint"]: 

745 result["recovery_hint"] = classification["recovery_hint"] 

746 

747 

748def _collect_created_files(video_dir: Path) -> Sequence[str]: 

749 created = [] 

750 for path in sorted(video_dir.iterdir()): 

751 if path.is_file(): 

752 created.append(path.relative_to(DOWNLOADS_DIR).as_posix()) 

753 return created 

754 

755 

756def _resolve_ffmpeg_path() -> Optional[str]: 

757 resolved = shutil.which("ffmpeg") 

758 if resolved: 

759 return resolved 

760 if platform.system() != "Windows": 

761 return None 

762 

763 local_appdata = os.getenv("LOCALAPPDATA") or "" 

764 if not local_appdata: 

765 return None 

766 

767 winget_link = Path(local_appdata) / "Microsoft" / "WinGet" / "Links" / "ffmpeg.exe" 

768 if winget_link.exists(): 

769 return str(winget_link) 

770 

771 packages_dir = Path(local_appdata) / "Microsoft" / "WinGet" / "Packages" 

772 if not packages_dir.is_dir(): 

773 return None 

774 

775 for pattern in ("Gyan.FFmpeg.Essentials_*", "Gyan.FFmpeg_*", "*FFmpeg*"): 

776 for package_dir in sorted(packages_dir.glob(pattern), reverse=True): 

777 binary = next(package_dir.rglob("ffmpeg.exe"), None) 

778 if binary: 

779 return str(binary) 

780 return None 

781 

782 

783def _remove_stale_invalid_mp4(video_dir: Path, video_id: str) -> None: 

784 stale_mp4 = video_dir / f"{video_id}.mp4" 

785 if not stale_mp4.exists() or _looks_like_mp4(stale_mp4): 

786 return 

787 for suffix in ("", ".part", ".ytdl"): 

788 candidate = video_dir / f"{video_id}.mp4{suffix}" 

789 try: 

790 if candidate.exists(): 

791 candidate.unlink() 

792 except OSError: 

793 continue 

794 

795 

796def _clean_ytdlp_error(message: str) -> str: 

797 clean = re.sub(r"\x1b\[[0-9;]*m", "", message).strip() 

798 return re.sub(r"^ERROR:\s*", "", clean) 

799 

800 

801def _repair_invalid_mp4(video_path: Path, ffmpeg_path: Optional[str]) -> Optional[Path]: 

802 if not ffmpeg_path or not video_path.exists(): 

803 return None 

804 

805 repaired_path = video_path.with_name(f"{video_path.stem}.remux.mp4") 

806 command = [ 

807 ffmpeg_path, 

808 "-y", 

809 "-i", 

810 str(video_path), 

811 "-map", 

812 "0", 

813 "-c", 

814 "copy", 

815 str(repaired_path), 

816 ] 

817 

818 try: 

819 if repaired_path.exists(): 

820 repaired_path.unlink() 

821 completed = subprocess.run( 

822 command, 

823 capture_output=True, 

824 text=True, 

825 timeout=180, 

826 check=False, 

827 ) 

828 if completed.returncode != 0 or not _looks_like_mp4(repaired_path): 

829 if repaired_path.exists(): 

830 repaired_path.unlink() 

831 return None 

832 video_path.unlink() 

833 repaired_path.replace(video_path) 

834 return video_path 

835 except (OSError, subprocess.SubprocessError): 

836 if repaired_path.exists(): 

837 try: 

838 repaired_path.unlink() 

839 except OSError: 

840 pass 

841 return None 

842 

843 

844def _looks_like_mp4(path: Path) -> bool: 

845 try: 

846 with path.open("rb") as handle: 

847 header = handle.read(64) 

848 return b"ftyp" in header 

849 except Exception: 

850 return False