Coverage for app \ services \ download_service.py: 41%
394 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
1# YouTube download helper
3import copy
4import json
5import os
6import platform
7import re
8import shutil
9import subprocess
10from pathlib import Path
11from typing import Any, Callable, Dict, Optional, Sequence, Tuple
13import yt_dlp
15from app.services.video_files import find_primary_video_file
16from app.settings import DOWNLOADS_DIR
# Browsers probed for a signed-in session, in preference order.
# _preferred_browser_order returns WINDOWS_BROWSERS for every platform other
# than macOS ("Darwin").
WINDOWS_BROWSERS = ("chrome", "firefox", "edge")
MACOS_BROWSERS = ("chrome", "firefox")
# yt-dlp YouTube ``player_client`` combinations that
# _run_ytdlp_with_auth_fallback tries, in this order, after the default
# attempt fails with an error classified as retryable.
PROTECTED_PLAYER_CLIENTS = (
    ("tv_downgraded", "web_safari"),
    ("web_creator", "tv_downgraded"),
    ("web_embedded", "android", "web"),
    ("android", "web"),
)
def download_youtube(url: str) -> Dict[str, Any]:
    """Download a YouTube video (and English subtitles) into DOWNLOADS_DIR.

    Args:
        url: URL supplied by the caller; validated by _normalize_youtube_url.

    Returns:
        A result dict that always carries ``success``, ``message``,
        ``video_id``, ``title``, ``thumbnail`` and ``files``. Failures add
        ``error_code`` and ``auth_source`` (plus ``recovery_hint`` when the
        error classifier provides one). Success adds ``duration``,
        ``local_path`` and, when applicable, ``used_player_client`` and
        ``subtitle_warning``.

    Exceptions raised inside the metadata/download pipeline are caught and
    reported through the result dict rather than propagated.
    """
    # Pessimistic defaults; overwritten as each stage succeeds.
    result = {
        "success": False,
        "message": "Download error",
        "video_id": None,
        "title": None,
        "thumbnail": None,
        "files": [],
    }

    normalized_url = _normalize_youtube_url(url)
    if not normalized_url:
        result["message"] = "Please provide a valid YouTube URL."
        result["error_code"] = "invalid_url"
        result["auth_source"] = "none"
        return result

    # Probe the environment once; the values are shared by every stage below.
    system_name = platform.system()
    auth_profile = _select_auth_profile(system_name=system_name)
    ffmpeg_path = _resolve_ffmpeg_path()
    has_ffmpeg = ffmpeg_path is not None
    has_node = shutil.which("node") is not None
    # Blank or whitespace-only YTDLP_USER_AGENT is treated as "not set".
    user_agent = (os.getenv("YTDLP_USER_AGENT") or "").strip() or None
    result["auth_source"] = auth_profile["source"]

    try:
        # Stage 1: metadata only (no media download).
        info = _extract_metadata(
            normalized_url,
            auth_profile=auth_profile,
            has_node=has_node,
            user_agent=user_agent,
            system_name=system_name,
        )
        video_id = info.get("id", "unknown")
        title = info.get("title", "Untitled Video")
        thumbnail = info.get("thumbnail", "")
        duration = info.get("duration", 0)

        result["video_id"] = video_id
        result["title"] = title
        result["thumbnail"] = thumbnail

        # Each video gets its own directory named after its id.
        video_dir = DOWNLOADS_DIR / video_id
        video_dir.mkdir(parents=True, exist_ok=True)
        # Drop a previous invalid <id>.mp4 (and its partial-download files).
        _remove_stale_invalid_mp4(video_dir, video_id)

        # Stage 2: the media download; reports which player client worked.
        used_player_client = _download_video(
            normalized_url,
            video_dir=video_dir,
            video_id=video_id,
            auth_profile=auth_profile,
            has_ffmpeg=has_ffmpeg,
            ffmpeg_path=ffmpeg_path,
            has_node=has_node,
            user_agent=user_agent,
            system_name=system_name,
        )

        # Stage 3: subtitles are best-effort; a failure becomes a warning.
        subtitle_warning = _download_subtitles(
            normalized_url,
            video_dir=video_dir,
            video_id=video_id,
            auth_profile=auth_profile,
            has_node=has_node,
            used_player_client=used_player_client,
            user_agent=user_agent,
        )

        created = _collect_created_files(video_dir)
        video_path = find_primary_video_file(video_dir)

        if not video_path:
            result["message"] = "Download completed but no video file was found."
            result["error_code"] = "video_file_missing"
            result["files"] = created
            if used_player_client:
                result["used_player_client"] = used_player_client
            return result

        # A ".mp4" without an "ftyp" marker gets one remux attempt before
        # the download is reported as an invalid container.
        if video_path.suffix.lower() == ".mp4" and not _looks_like_mp4(video_path):
            repaired = _repair_invalid_mp4(video_path, ffmpeg_path)
            if repaired:
                created = _collect_created_files(video_dir)
                video_path = repaired
            if video_path.suffix.lower() == ".mp4" and not _looks_like_mp4(video_path):
                result["message"] = (
                    "Downloaded file is not a valid MP4 container. "
                    "Restart the app after installing ffmpeg or retry with a non-SABR format."
                )
                result["error_code"] = "invalid_container"
                result["files"] = created
                if used_player_client:
                    result["used_player_client"] = used_player_client
                return result

        # Persist a small metadata sidecar next to the video.
        meta = {
            "video_id": video_id,
            "title": title,
            "thumbnail": thumbnail,
            "duration": duration,
            "local_path": f"/downloads/{video_path.relative_to(DOWNLOADS_DIR).as_posix()}",
        }

        meta_path = video_dir / "meta.json"
        meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")

        result.update(
            {
                "success": True,
                "message": "Video downloaded successfully.",
                "files": created,
                "duration": duration,
                "local_path": meta["local_path"],
                "auth_source": auth_profile["source"],
            }
        )
        if used_player_client:
            result["used_player_client"] = used_player_client
        if subtitle_warning:
            result["subtitle_warning"] = subtitle_warning
        return result

    except yt_dlp.utils.DownloadError as exc:  # type: ignore[attr-defined]
        # yt-dlp failures are mapped to an error code and recovery hint.
        _apply_download_error(
            result,
            message=str(exc),
            auth_profile=auth_profile,
            system_name=system_name,
            user_agent_set=bool(user_agent),
        )
        return result
    except Exception as exc:
        # Boundary handler: report anything else instead of raising.
        result["message"] = f"Unexpected error: {exc}"
        result["error_code"] = "unexpected_error"
        result["auth_source"] = auth_profile["source"]
        return result
def _normalize_youtube_url(url: str) -> Optional[str]:
    """Return the trimmed URL if it points at YouTube, otherwise ``None``.

    The URL must use the ``http`` or ``https`` scheme (case-insensitive) and
    its host must be ``youtube.com``, ``youtu.be`` or a subdomain of either.
    The host is checked on the parsed URL so that a URL which only mentions
    "youtube.com" in its path or query string is rejected.

    Args:
        url: Raw user input; ``None`` and blank strings are tolerated.

    Returns:
        The whitespace-stripped URL, or ``None`` when it is not acceptable.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    candidate = (url or "").strip()
    if not candidate:
        return None
    try:
        parts = urlsplit(candidate)
        host = (parts.hostname or "").lower()
    except ValueError:
        # urlsplit rejects malformed authorities such as unbalanced brackets.
        return None
    if parts.scheme.lower() not in ("http", "https"):
        return None
    allowed_domains = ("youtube.com", "youtu.be")
    if not any(host == domain or host.endswith("." + domain) for domain in allowed_domains):
        return None
    return candidate
def _extract_metadata(
    url: str,
    auth_profile: Dict[str, Any],
    has_node: bool,
    user_agent: Optional[str],
    system_name: str,
) -> Dict[str, Any]:
    """Fetch the video's metadata without downloading any media.

    The lookup goes through _run_ytdlp_with_auth_fallback, so retryable
    failures are re-attempted with alternative player clients. The player
    client that ended up working is discarded here.
    """

    def _probe(attempt_opts: Dict[str, Any]) -> Dict[str, Any]:
        return _run_extract_info(url, attempt_opts)

    metadata, _client = _run_ytdlp_with_auth_fallback(
        url,
        base_opts=_build_metadata_opts(auth_profile=auth_profile, has_node=has_node),
        auth_profile=auth_profile,
        user_agent=user_agent,
        system_name=system_name,
        operation=_probe,
    )
    return metadata
def _download_video(
    url: str,
    video_dir: Path,
    video_id: str,
    auth_profile: Dict[str, Any],
    has_ffmpeg: bool,
    ffmpeg_path: Optional[str],
    has_node: bool,
    user_agent: Optional[str],
    system_name: str,
) -> Optional[Sequence[str]]:
    """Download the media for ``url`` into ``video_dir``.

    Returns:
        The player-client list that succeeded, or ``None`` when the attempt
        that succeeded did not pin a player client.
    """
    download_opts = _build_download_opts(
        video_dir=video_dir,
        video_id=video_id,
        auth_profile=auth_profile,
        has_ffmpeg=has_ffmpeg,
        ffmpeg_path=ffmpeg_path,
        has_node=has_node,
    )

    def _attempt(attempt_opts: Dict[str, Any]) -> None:
        return _download_with_format_fallback(
            url,
            opts=attempt_opts,
            has_ffmpeg=has_ffmpeg,
        )

    outcome = _run_ytdlp_with_auth_fallback(
        url,
        base_opts=download_opts,
        auth_profile=auth_profile,
        user_agent=user_agent,
        system_name=system_name,
        operation=_attempt,
    )
    # outcome is (operation result, player client); only the client matters.
    return outcome[1]
def _download_subtitles(
    url: str,
    video_dir: Path,
    video_id: str,
    auth_profile: Dict[str, Any],
    has_node: bool,
    used_player_client: Optional[Sequence[str]],
    user_agent: Optional[str],
) -> Optional[str]:
    """Fetch subtitles for ``url`` on a best-effort basis.

    Returns:
        ``None`` on success, or the cleaned yt-dlp error text when the
        subtitle download raised ``DownloadError``.
    """
    warning: Optional[str] = None
    opts = _build_subtitle_opts(
        video_dir=video_dir,
        video_id=video_id,
        auth_profile=auth_profile,
        has_node=has_node,
        used_player_client=used_player_client,
        user_agent=user_agent,
    )
    try:
        with yt_dlp.YoutubeDL(opts) as session:  # type: ignore[arg-type]
            session.download([url])
    except yt_dlp.utils.DownloadError as failure:  # type: ignore[attr-defined]
        warning = _clean_ytdlp_error(str(failure))
    return warning
def _run_ytdlp_with_auth_fallback(
    url: str,
    base_opts: Dict[str, Any],
    auth_profile: Dict[str, Any],
    user_agent: Optional[str],
    system_name: str,
    operation: Callable[[Dict[str, Any]], Any],
) -> Tuple[Any, Optional[Sequence[str]]]:
    """Run ``operation`` with progressively different yt-dlp options.

    Attempt order:
      1. ``base_opts`` unchanged (no user-agent override).
      2. Each entry of PROTECTED_PLAYER_CLIENTS as the YouTube
         ``player_client``, with the user agent applied when one is set.
      3. If the last retryable failure was ``format_unavailable``, one more
         try with ``missing_pot`` added to the YouTube ``formats`` args.

    Args:
        url: Only used in the message of the final fallback error.
        base_opts: Options template; every attempt works on a deep copy.
        operation: Callable that receives the options for one attempt.

    Returns:
        ``(operation result, player client list)``; the client is ``None``
        for attempts 1 and 3.

    Raises:
        yt_dlp.utils.DownloadError: immediately when an attempt fails with
            an error classified as not retryable, otherwise the last error
            seen once every attempt has failed.
    """
    last_error: Optional[Exception] = None
    last_classification: Optional[Dict[str, Any]] = None
    try:
        return operation(copy.deepcopy(base_opts)), None
    except yt_dlp.utils.DownloadError as exc:  # type: ignore[attr-defined]
        classification = _classify_ytdlp_error(
            str(exc),
            auth_profile=auth_profile,
            system_name=system_name,
            user_agent_set=bool(user_agent),
        )
        # Non-retryable errors (e.g. rate limiting) are surfaced unchanged.
        if not classification["retry_other_clients"]:
            raise
        last_error = exc
        last_classification = classification

    for player_client in PROTECTED_PLAYER_CLIENTS:
        attempt_opts = copy.deepcopy(base_opts)
        # Replaces any extractor_args that base_opts carried.
        attempt_opts["extractor_args"] = {
            "youtube": {"player_client": list(player_client)}
        }
        if user_agent:
            _apply_user_agent(attempt_opts, user_agent)
        try:
            return operation(attempt_opts), list(player_client)
        except yt_dlp.utils.DownloadError as exc:  # type: ignore[attr-defined]
            classification = _classify_ytdlp_error(
                str(exc),
                auth_profile=auth_profile,
                system_name=system_name,
                user_agent_set=bool(user_agent),
            )
            if classification["retry_other_clients"]:
                last_error = exc
                last_classification = classification
                continue
            raise

    # Last resort for format errors: also request "missing_pot" formats.
    if last_classification and last_classification["error_code"] == "format_unavailable":
        missing_pot_opts = _with_missing_pot_formats(base_opts)
        if user_agent:
            _apply_user_agent(missing_pot_opts, user_agent)
        try:
            return operation(missing_pot_opts), None
        except yt_dlp.utils.DownloadError as exc:  # type: ignore[attr-defined]
            last_error = exc

    if last_error is not None:
        raise last_error
    # Defensive fallback; last_error is always set when control gets here.
    raise yt_dlp.utils.DownloadError(f"Download failed for {url}")  # type: ignore[attr-defined]
def _run_extract_info(url: str, opts: Dict[str, Any]) -> Dict[str, Any]:
    """Return yt-dlp's metadata for ``url`` without downloading media."""
    with yt_dlp.YoutubeDL(opts) as session:  # type: ignore[arg-type]
        metadata = session.extract_info(url, download=False)
    return metadata
def _download_with_format_fallback(url: str, opts: Dict[str, Any], has_ffmpeg: bool) -> None:
    """Download ``url``, relaxing the format selection when it is unavailable.

    Up to three attempts are made, each only after the previous one failed
    with "Requested format is not available":
      1. ``opts`` as given.
      2. A generic format string in place of the preferred one.
      3. The options from attempt 2 without ``extractor_args``.

    Raises:
        yt_dlp.utils.DownloadError: any other failure, or the failure of the
            final attempt.
    """
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:  # type: ignore[arg-type]
            ydl.download([url])
    except yt_dlp.utils.DownloadError as exc:  # type: ignore[attr-defined]
        message = str(exc)
        # Only format errors are retried; everything else propagates.
        if "Requested format is not available" not in message:
            raise
        fallback_opts = copy.deepcopy(opts)
        if has_ffmpeg:
            fallback_opts["format"] = "bestvideo+bestaudio/best"
        else:
            # NOTE(review): the original indentation was lost; the two pops
            # below are assumed to belong to this no-ffmpeg branch. Confirm
            # against the repository copy.
            fallback_opts["format"] = "best"
            fallback_opts.pop("merge_output_format", None)
            fallback_opts.pop("postprocessors", None)
        try:
            with yt_dlp.YoutubeDL(fallback_opts) as ydl:  # type: ignore[arg-type]
                ydl.download([url])
        except yt_dlp.utils.DownloadError as fallback_exc:  # type: ignore[attr-defined]
            fallback_message = str(fallback_exc)
            if "Requested format is not available" not in fallback_message:
                raise
            # Final attempt: drop any pinned extractor arguments.
            second_fallback_opts = copy.deepcopy(fallback_opts)
            second_fallback_opts.pop("extractor_args", None)
            with yt_dlp.YoutubeDL(second_fallback_opts) as ydl:  # type: ignore[arg-type]
                ydl.download([url])
def _build_metadata_opts(auth_profile: Dict[str, Any], has_node: bool) -> Dict[str, Any]:
    """Build the quiet, single-video yt-dlp options used for metadata lookup."""
    silent_flags = ("quiet", "no_warnings", "noprogress", "noplaylist")
    metadata_opts: Dict[str, Any] = dict.fromkeys(silent_flags, True)
    _apply_runtime_options(metadata_opts, has_node)
    _apply_auth_options(metadata_opts, auth_profile)
    return metadata_opts
def _build_download_opts(
    video_dir: Path,
    video_id: str,
    auth_profile: Dict[str, Any],
    has_ffmpeg: bool,
    ffmpeg_path: Optional[str],
    has_node: bool,
) -> Dict[str, Any]:
    """Build the yt-dlp options for the media download.

    Output files are named ``<video_id>.<ext>`` inside ``video_dir``; the
    thumbnail and info JSON are written alongside the video.
    """
    output_template = str(video_dir / f"{video_id}.%(ext)s")
    remux_to_mp4 = {"key": "FFmpegVideoRemuxer", "preferedformat": "mp4"}

    # NOTE(review): merge/remux settings are applied even when ffmpeg is
    # missing, and allow_unplayable_formats may affect merging in yt-dlp.
    # Confirm against the yt-dlp documentation.
    download_opts: Dict[str, Any] = dict(
        format=_preferred_download_format(has_ffmpeg),
        merge_output_format="mp4",
        allow_unplayable_formats=True,
        outtmpl=output_template,
        quiet=False,
        no_warnings=True,
        noprogress=True,
        noplaylist=True,
        writethumbnail=True,
        writeinfojson=True,
        prefer_ffmpeg=True,
        postprocessors=[remux_to_mp4],
    )
    _apply_runtime_options(download_opts, has_node)
    _apply_auth_options(download_opts, auth_profile)
    if ffmpeg_path:
        download_opts["ffmpeg_location"] = ffmpeg_path
    if not has_ffmpeg:
        download_opts["compat_opts"] = ["no-sabr"]
    return download_opts
def _preferred_download_format(has_ffmpeg: bool) -> str:
    """Return the yt-dlp format selector, most preferred alternative first.

    With ffmpeg, separate video+audio streams (merged afterwards) are allowed;
    without it only pre-muxed single-file formats are requested.
    """
    if not has_ffmpeg:
        premuxed_only = (
            "b[ext=mp4][height<=?720]",
            "b[height<=?720]",
            "best[ext=mp4]",
            "best",
        )
        return "/".join(premuxed_only)

    with_merging = (
        "bv*[vcodec^=avc1][ext=mp4][height<=?720]+ba[acodec^=mp4a]",
        "b[ext=mp4][height<=?720]",
        "bv*[height<=?720]+ba",
        "b[height<=?720]",
        "bestvideo+bestaudio",
        "best",
    )
    return "/".join(with_merging)
def _build_subtitle_opts(
    video_dir: Path,
    video_id: str,
    auth_profile: Dict[str, Any],
    has_node: bool,
    used_player_client: Optional[Sequence[str]],
    user_agent: Optional[str],
) -> Dict[str, Any]:
    """Build yt-dlp options that fetch English VTT subtitles only.

    When the video download needed a specific player client, the same client
    (and the user agent, if set) is reused for the subtitle request.
    """
    subtitle_opts: Dict[str, Any] = dict(
        outtmpl=str(video_dir / f"{video_id}.%(ext)s"),
        writesubtitles=True,
        writeautomaticsub=True,
        subtitleslangs=["en"],
        subtitlesformat="vtt",
        skip_download=True,
        quiet=True,
        no_warnings=True,
        noprogress=True,
        noplaylist=True,
    )
    _apply_runtime_options(subtitle_opts, has_node)
    _apply_auth_options(subtitle_opts, auth_profile)
    if used_player_client:
        pinned_clients = list(used_player_client)
        subtitle_opts["extractor_args"] = {"youtube": {"player_client": pinned_clients}}
        if user_agent:
            _apply_user_agent(subtitle_opts, user_agent)
    return subtitle_opts
def _with_missing_pot_formats(opts: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of ``opts`` that also requests ``missing_pot`` formats.

    ``"missing_pot"`` is appended to ``extractor_args["youtube"]["formats"]``
    unless already present; the input mapping is never modified.
    """
    result = copy.deepcopy(opts)
    per_extractor = copy.deepcopy(result.get("extractor_args") or {})
    youtube = copy.deepcopy(per_extractor.get("youtube") or {})

    requested = [*(youtube.get("formats") or [])]
    if "missing_pot" not in requested:
        requested += ["missing_pot"]

    youtube["formats"] = requested
    per_extractor["youtube"] = youtube
    result["extractor_args"] = per_extractor
    return result
def _apply_runtime_options(opts: Dict[str, Any], has_node: bool) -> None:
    """Add the JS-runtime related yt-dlp options to ``opts`` in place.

    ``remote_components`` is always set; ``js_runtimes`` only when Node.js
    was found.
    """
    opts["remote_components"] = ["ejs:github"]
    if has_node:
        opts["js_runtimes"] = {"node": {}}
def _apply_auth_options(opts: Dict[str, Any], auth_profile: Dict[str, Any]) -> None:
    """Copy the cookie source chosen in ``auth_profile`` into ``opts`` in place.

    A profile whose source is neither ``"browser"`` nor ``"cookiefile"``, or
    whose matching value is empty, leaves ``opts`` untouched.
    """
    kind = auth_profile.get("source")
    if kind == "browser":
        browser_name = auth_profile.get("browser")
        if browser_name:
            opts["cookiesfrombrowser"] = (browser_name,)
    elif kind == "cookiefile":
        cookie_path = auth_profile.get("cookiefile")
        if cookie_path:
            opts["cookiefile"] = cookie_path
def _apply_user_agent(opts: Dict[str, Any], user_agent: str) -> None:
    """Set the ``User-Agent`` request header in ``opts`` in place.

    The header mapping is rebuilt rather than mutated, so a mapping shared
    with other option dicts is left unchanged.
    """
    existing = opts.get("http_headers") or {}
    opts["http_headers"] = {**existing, "User-Agent": user_agent}
def _select_auth_profile(
    system_name: Optional[str] = None,
    auth_mode: Optional[str] = None,
    cookiefile: Optional[str] = None,
    browser_probe: Optional[Callable[[str], bool]] = None,
) -> Dict[str, Any]:
    """Choose where yt-dlp should take its YouTube cookies from.

    Selection rules:
      * ``auto``/``browser`` mode: the first browser (in platform preference
        order) for which the probe succeeds.
      * ``auto`` mode with no usable browser, or ``file`` mode: the resolved
        cookies file, if there is one.
      * otherwise: no authentication (``source == "none"``).

    Args:
        browser_probe: Optional replacement for _browser_session_available.

    Returns:
        A dict with ``source``, ``browser``, ``cookiefile``, ``auth_mode`` and
        ``browser_order``.
    """
    mode = _resolve_auth_mode(auth_mode)
    cookie_path = _resolve_cookiefile(cookiefile)
    is_available = browser_probe or _browser_session_available
    candidates = list(_preferred_browser_order(system_name))

    def _profile(source: str, browser: Optional[str], cookies: Optional[str]) -> Dict[str, Any]:
        return {
            "source": source,
            "browser": browser,
            "cookiefile": cookies,
            "auth_mode": mode,
            "browser_order": candidates,
        }

    if mode in {"auto", "browser"}:
        for name in candidates:
            if is_available(name):
                return _profile("browser", name, None)

    # Reached in "file" mode, or in "auto" mode once every browser failed.
    if cookie_path and mode in {"auto", "file"}:
        return _profile("cookiefile", None, cookie_path)

    return _profile("none", None, cookie_path)
def _resolve_auth_mode(auth_mode: Optional[str] = None) -> str:
    """Return the effective auth mode: ``"auto"``, ``"browser"`` or ``"file"``.

    The explicit argument wins over the ``YTDLP_AUTH_MODE`` environment
    variable; unknown values fall back to ``"auto"``.
    """
    requested = auth_mode or os.getenv("YTDLP_AUTH_MODE") or "auto"
    mode = requested.strip().lower()
    return mode if mode in {"auto", "browser", "file"} else "auto"
def _resolve_cookiefile(cookiefile: Optional[str] = None) -> Optional[str]:
    """Return the configured cookies file path if it exists, else ``None``.

    The explicit argument is used first, then ``YTDLP_COOKIEFILE``, then
    ``YTDLP_COOKIES_FILE``. ``~`` is expanded before the existence check.
    """
    sources = (
        cookiefile,
        os.getenv("YTDLP_COOKIEFILE"),
        os.getenv("YTDLP_COOKIES_FILE"),
    )
    configured = next((value for value in sources if value), "").strip()
    if not configured:
        return None
    try:
        location = Path(configured).expanduser()
        return str(location) if location.exists() else None
    except OSError:
        return None
def _preferred_browser_order(system_name: Optional[str] = None) -> Tuple[str, ...]:
    """Return the browsers to probe, most preferred first.

    macOS ("Darwin") gets MACOS_BROWSERS; Windows and every other platform
    get WINDOWS_BROWSERS. The running platform is used when ``system_name``
    is empty.
    """
    platform_name = system_name or platform.system()
    return MACOS_BROWSERS if platform_name == "Darwin" else WINDOWS_BROWSERS
def _browser_session_available(browser: str) -> bool:
    """Report whether yt-dlp can use ``browser``'s cookies.

    The probe runs a metadata extraction of the YouTube home page with the
    browser's cookies; any exception counts as "not available".
    """
    probe_opts = dict(
        quiet=True,
        no_warnings=True,
        noplaylist=True,
        cookiesfrombrowser=(browser,),
    )
    try:
        with yt_dlp.YoutubeDL(probe_opts) as session:  # type: ignore[arg-type]
            session.extract_info("https://www.youtube.com", download=False)
    except Exception:
        return False
    return True
def _classify_ytdlp_error(
    message: str,
    auth_profile: Dict[str, Any],
    system_name: str,
    user_agent_set: bool,
) -> Dict[str, Any]:
    """Map a raw yt-dlp error message to an error code and recovery advice.

    Categories are checked in a fixed order: format unavailable, age
    verification, sign-in required, rate limited, HTTP 403, then a generic
    fallback.

    Args:
        message: Raw yt-dlp error text (ANSI colours and the ``ERROR:``
            prefix are stripped first).
        auth_profile: Profile from _select_auth_profile, used for the hint.
        system_name: ``platform.system()`` value, used for the hint.
        user_agent_set: Whether a custom user agent is configured.

    Returns:
        A dict with ``error_code``, ``message``, ``recovery_hint`` (may be
        ``None``), ``is_auth_error`` and ``retry_other_clients``.
    """
    clean = _clean_ytdlp_error(message)
    lowered = clean.lower()

    def mentions_any(tokens: Sequence[str]) -> bool:
        return any(token in lowered for token in tokens)

    def mentions_status(code: str) -> bool:
        # Match the status code only as a standalone token. A plain substring
        # test also fires on digits embedded in a video id or another number
        # (e.g. "[youtube] ab403cd: ..."), misclassifying unrelated errors.
        pattern = rf"(?<![0-9a-z_-]){code}(?![0-9a-z_-])"
        return re.search(pattern, lowered) is not None

    def auth_hint(age_verified: bool = False) -> str:
        return _build_auth_recovery_hint(
            auth_profile=auth_profile,
            system_name=system_name,
            user_agent_set=user_agent_set,
            age_verified=age_verified,
        )

    if "requested format is not available" in lowered:
        return {
            "error_code": "format_unavailable",
            "message": "Requested video format is not available for this video.",
            "recovery_hint": "Retry the download or install ffmpeg to allow broader format fallbacks.",
            "is_auth_error": False,
            "retry_other_clients": True,
        }

    if mentions_any(
        (
            "age-restricted",
            "confirm your age",
            "age verification",
            "age-verification",
            "inappropriate for some users",
        )
    ):
        return {
            "error_code": "age_verification_required",
            "message": "YouTube requires an age-verified account for this video.",
            "recovery_hint": auth_hint(age_verified=True),
            "is_auth_error": True,
            "retry_other_clients": True,
        }

    if mentions_any(
        (
            "not a bot",
            "sign in to confirm",
            "use --cookies-from-browser",
            "cookies for the authentication",
            "requires account credentials",
            "login required",
        )
    ):
        return {
            "error_code": "auth_required",
            "message": "YouTube requires a signed-in browser session for this video.",
            "recovery_hint": auth_hint(),
            "is_auth_error": True,
            "retry_other_clients": True,
        }

    if (
        mentions_any(
            (
                "too many requests",
                "rate limit",
                "try again later",
                "temporarily unavailable",
            )
        )
        or mentions_status("429")
    ):
        # Switching player clients does not help against rate limiting.
        return {
            "error_code": "rate_limited",
            "message": "YouTube temporarily rate-limited this session.",
            "recovery_hint": _build_rate_limit_hint(system_name),
            "is_auth_error": False,
            "retry_other_clients": False,
        }

    if mentions_status("403"):
        return {
            "error_code": "forbidden",
            "message": "YouTube rejected the download request.",
            "recovery_hint": auth_hint(),
            "is_auth_error": True,
            "retry_other_clients": True,
        }

    return {
        "error_code": "download_failed",
        "message": f"Download error: {clean}",
        "recovery_hint": None,
        "is_auth_error": False,
        "retry_other_clients": False,
    }
def _build_auth_recovery_hint(
    auth_profile: Dict[str, Any],
    system_name: str,
    user_agent_set: bool,
    age_verified: bool = False,
) -> str:
    """Compose user-facing advice for fixing an authentication failure.

    The first sentence depends on the cookie source in use; further
    sentences are appended for age-restricted videos, for macOS, and when no
    custom user agent is configured.
    """
    browser_text = _browser_help_text(system_name)
    source = auth_profile.get("source")
    browser = _browser_label(auth_profile.get("browser"))

    if source == "browser" and browser:
        opening = f"Open the video in the same signed-in {browser} session on this machine, refresh YouTube, and retry."
    elif source == "cookiefile":
        opening = (
            "The configured cookies file may be stale. Export a fresh Netscape cookies file "
            f"or retry with a signed-in {browser_text} session on this machine."
        )
    else:
        opening = f"Sign in to YouTube in {browser_text} on this machine, open the video once, and retry."

    sentences = [opening]
    if age_verified:
        sentences.append("Use an adult account that can view age-restricted content.")
    if system_name == "Darwin":
        sentences.append("Safari is not guaranteed for protected downloads.")
    if not user_agent_set:
        sentences.append("If it still fails, set YTDLP_USER_AGENT to that browser user agent.")
    return " ".join(sentences).strip()
def _build_rate_limit_hint(system_name: str) -> str:
    """Compose user-facing advice for a rate-limited session."""
    supported = _browser_help_text(system_name)
    sentences = [f"Wait a few minutes, open YouTube in {supported} on this machine, then retry."]
    if system_name == "Darwin":
        sentences.append("Safari is not guaranteed for protected downloads.")
    return " ".join(sentences)
def _browser_help_text(system_name: str) -> str:
    """Return a human-readable list of supported browsers for the platform."""
    per_platform = {
        "Windows": "Chrome, Firefox, or Edge",
        "Darwin": "Chrome or Firefox",
    }
    return per_platform.get(system_name, "a supported browser")
def _browser_label(browser: Optional[str]) -> Optional[str]:
    """Return the display name for a browser key, or ``None`` if it is empty.

    Unknown keys are title-cased.
    """
    if not browser:
        return None
    display_names = {
        "chrome": "Chrome",
        "firefox": "Firefox",
        "edge": "Edge",
    }
    if browser in display_names:
        return display_names[browser]
    return browser.title()
def _apply_download_error(
    result: Dict[str, Any],
    message: str,
    auth_profile: Dict[str, Any],
    system_name: str,
    user_agent_set: bool,
) -> None:
    """Write the classified form of a yt-dlp error into ``result`` in place.

    Sets ``message``, ``error_code`` and ``auth_source``; ``recovery_hint``
    is added only when the classifier produced one.
    """
    verdict = _classify_ytdlp_error(
        message,
        auth_profile=auth_profile,
        system_name=system_name,
        user_agent_set=user_agent_set,
    )
    for field in ("message", "error_code"):
        result[field] = verdict[field]
    result["auth_source"] = auth_profile["source"]
    hint = verdict["recovery_hint"]
    if hint:
        result["recovery_hint"] = hint
def _collect_created_files(video_dir: Path) -> Sequence[str]:
    """List the regular files directly inside ``video_dir``, sorted by path.

    Paths are returned relative to DOWNLOADS_DIR with forward slashes.
    """
    return [
        entry.relative_to(DOWNLOADS_DIR).as_posix()
        for entry in sorted(video_dir.iterdir())
        if entry.is_file()
    ]
def _resolve_ffmpeg_path() -> Optional[str]:
    """Locate the ffmpeg executable, or return ``None`` if it is not found.

    ``PATH`` is consulted first. On Windows the WinGet install locations
    under ``%LOCALAPPDATA%`` are searched as a fallback: the ``Links`` shim
    first, then the ``Packages`` directory.
    """
    on_path = shutil.which("ffmpeg")
    if on_path:
        return on_path
    if platform.system() != "Windows":
        return None

    appdata = os.getenv("LOCALAPPDATA") or ""
    if not appdata:
        return None

    winget_root = Path(appdata) / "Microsoft" / "WinGet"
    shim = winget_root / "Links" / "ffmpeg.exe"
    if shim.exists():
        return str(shim)

    packages = winget_root / "Packages"
    if not packages.is_dir():
        return None

    # Most specific package-name pattern first; within a pattern,
    # directories are visited in reverse sorted order.
    for pattern in ("Gyan.FFmpeg.Essentials_*", "Gyan.FFmpeg_*", "*FFmpeg*"):
        for package_dir in sorted(packages.glob(pattern), reverse=True):
            for executable in package_dir.rglob("ffmpeg.exe"):
                return str(executable)
    return None
def _remove_stale_invalid_mp4(video_dir: Path, video_id: str) -> None:
    """Delete a leftover ``<video_id>.mp4`` that is not a valid MP4.

    When the existing file fails the MP4 check, it and its ``.part`` and
    ``.ytdl`` companions are removed. Deletion errors are ignored.
    """
    leftover = video_dir / f"{video_id}.mp4"
    if leftover.exists() and not _looks_like_mp4(leftover):
        for suffix in ("", ".part", ".ytdl"):
            target = video_dir / f"{video_id}.mp4{suffix}"
            try:
                if target.exists():
                    target.unlink()
            except OSError:
                # Best effort: a file that cannot be removed is left behind.
                pass
def _clean_ytdlp_error(message: str) -> str:
    """Strip ANSI colour codes, outer whitespace and a leading ``ERROR:``."""
    without_colours = re.sub(r"\x1b\[[0-9;]*m", "", message)
    trimmed = without_colours.strip()
    return re.sub(r"^ERROR:\s*", "", trimmed)
def _repair_invalid_mp4(video_path: Path, ffmpeg_path: Optional[str]) -> Optional[Path]:
    """Try to turn an invalid ``.mp4`` into a real MP4 by remuxing with ffmpeg.

    The streams are copied (no re-encoding) into ``<stem>.remux.mp4``. If
    ffmpeg succeeds and the result passes the MP4 check, it replaces the
    original file.

    Args:
        video_path: The downloaded file to repair.
        ffmpeg_path: ffmpeg executable, or ``None`` when unavailable.

    Returns:
        ``video_path`` when the file was repaired in place, otherwise
        ``None``. On failure the original file is left untouched.
    """
    if not ffmpeg_path or not video_path.exists():
        return None

    repaired_path = video_path.with_name(f"{video_path.stem}.remux.mp4")
    command = [
        ffmpeg_path,
        "-y",
        "-i",
        str(video_path),
        "-map",
        "0",
        "-c",
        "copy",
        str(repaired_path),
    ]

    try:
        if repaired_path.exists():
            repaired_path.unlink()
        # ffmpeg's output is never inspected, so discard it instead of
        # capturing and decoding it: decoding non-UTF-8 output as text could
        # raise UnicodeDecodeError, which the handler below does not catch.
        completed = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=180,
            check=False,
        )
        if completed.returncode != 0 or not _looks_like_mp4(repaired_path):
            return None
        # replace() overwrites the target, so the original is not deleted
        # first; if the rename fails the download is still on disk.
        repaired_path.replace(video_path)
        return video_path
    except (OSError, subprocess.SubprocessError):
        return None
    finally:
        # Remove a leftover remux file on every failure path (a no-op after
        # a successful replace).
        try:
            if repaired_path.exists():
                repaired_path.unlink()
        except OSError:
            pass
def _looks_like_mp4(path: Path) -> bool:
    """Report whether the first 64 bytes of ``path`` contain ``ftyp``.

    Any error while opening or reading the file counts as "not an MP4".
    """
    try:
        with path.open("rb") as stream:
            return stream.read(64).find(b"ftyp") != -1
    except Exception:
        return False