Coverage for app \ services \ frame_service.py: 11%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
1from typing import Any, Dict, List
2import json
4import cv2
6from app.settings import DOWNLOADS_DIR
7from app.services.video_files import find_primary_video_file
10def extract_frames_per_second_for_video(video_id: str) -> Dict[str, Any]:
11 """
12 Extract 1 frame per second from downloads/<video_id> into extracted_frames/.
13 Service-only logic: no route/web dependencies.
14 """
15 folder_path = DOWNLOADS_DIR / video_id
16 if not folder_path.exists():
17 return {
18 "success": False,
19 "message": f"Folder '{video_id}' not found.",
20 "files": [],
21 }
23 video_file = find_primary_video_file(folder_path)
24 if not video_file:
25 return {
26 "success": False,
27 "message": f"No video files found in '{video_id}'.",
28 "files": [],
29 }
31 output_dir = folder_path / "extracted_frames"
32 output_dir.mkdir(exist_ok=True)
34 cap = cv2.VideoCapture(str(video_file))
35 if not cap.isOpened():
36 return {
37 "success": False,
38 "message": f"Error opening video file: {video_file.name}",
39 "files": [],
40 }
42 fps = cap.get(cv2.CAP_PROP_FPS)
43 total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
44 if fps is None or fps <= 0:
45 cap.release()
46 return {"success": False, "message": "Invalid FPS detected.", "files": []}
48 duration = total_video_frames / fps
49 total_seconds = int(duration)
51 frame_data = []
52 for second in range(total_seconds):
53 frame_number = int(second * fps)
54 cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
55 ret, frame = cap.read()
56 if not ret:
57 continue
59 frame_filename = f"frame_{second:04d}s.jpg"
60 frame_path = output_dir / frame_filename
61 ok, encoded = cv2.imencode(".jpg", frame)
62 if not ok:
63 continue
64 encoded.tofile(str(frame_path))
66 frame_info = {
67 "frame_number": second + 1,
68 "timestamp_seconds": second,
69 "timestamp_formatted": f"{second // 60:02d}:{second % 60:02d}",
70 "filename": frame_filename,
71 "file_path": str(frame_path),
72 }
73 frame_data.append(frame_info)
75 cap.release()
77 # Persist machine-readable frame metadata.
78 json_path = output_dir / "frame_data.json"
79 with open(json_path, "w", encoding="utf-8") as f:
80 json.dump(
81 {
82 "video_info": {
83 "filename": video_file.name,
84 "duration_seconds": duration,
85 "total_frames": total_video_frames,
86 "fps": fps,
87 "extracted_frames": len(frame_data),
88 },
89 "frames": frame_data,
90 },
91 f,
92 indent=2,
93 ensure_ascii=False,
94 )
97 # Persist CSV for debugging/manual review.
98 csv_path = output_dir / "frame_data.csv"
99 with open(csv_path, "w", encoding="utf-8") as f:
100 f.write("Frame,Timestamp,Time_Formatted,Filename\n")
101 for fr in frame_data:
102 f.write(
103 f"{fr['frame_number']},{fr['timestamp_seconds']},"
104 f"{fr['timestamp_formatted']},{fr['filename']}\n"
105 )
107 links: List[str] = []
108 if output_dir.exists():
109 for p in sorted(output_dir.iterdir()):
110 if p.is_file():
111 rel = p.relative_to(DOWNLOADS_DIR).as_posix()
112 links.append(f"/downloads/{rel}")
114 return {
115 "success": True,
116 "message": f"Extracted {len(frame_data)} frames to '{output_dir.name}'.",
117 "files": links,
118 "video_id": video_id,
119 "output_dir": f"/downloads/{video_id}/extracted_frames",
120 "count": len(frame_data),
121 }