Coverage for app \ services \ frame_service.py: 11%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 20:58 -0400

1from typing import Any, Dict, List 

2import json 

3 

4import cv2 

5 

6from app.settings import DOWNLOADS_DIR 

7from app.services.video_files import find_primary_video_file 

8 

9 

def extract_frames_per_second_for_video(video_id: str) -> Dict[str, Any]:
    """
    Extract 1 frame per second from downloads/<video_id> into extracted_frames/.
    Service-only logic: no route/web dependencies.

    The primary video file inside ``DOWNLOADS_DIR / video_id`` is opened with
    OpenCV and, for every whole second of its duration, the frame at
    ``int(second * fps)`` is written as ``frame_<SSSS>s.jpg``. Two metadata
    files, ``frame_data.json`` and ``frame_data.csv``, are written next to
    the images.

    Args:
        video_id: Name of the sub-folder of ``DOWNLOADS_DIR`` holding the video.

    Returns:
        A dict that always contains ``success`` (bool), ``message`` (str) and
        ``files`` (list of ``/downloads/...`` links). On success it also
        contains ``video_id``, ``output_dir`` and ``count`` (number of frames
        written during this call). ``files`` lists every regular file found
        in the output directory, which includes the two metadata files and
        anything that was already there before this call.

    Note:
        Seconds whose frame cannot be read or JPEG-encoded are skipped
        silently, so ``frame_number`` values in the metadata may have gaps.
    """
    folder_path = DOWNLOADS_DIR / video_id
    if not folder_path.exists():
        return {
            "success": False,
            "message": f"Folder '{video_id}' not found.",
            "files": [],
        }

    video_file = find_primary_video_file(folder_path)
    if not video_file:
        return {
            "success": False,
            "message": f"No video files found in '{video_id}'.",
            "files": [],
        }

    output_dir = folder_path / "extracted_frames"
    output_dir.mkdir(exist_ok=True)

    frame_data: List[Dict[str, Any]] = []
    cap = cv2.VideoCapture(str(video_file))
    # try/finally guarantees the capture handle is released on every exit
    # path: early error returns as well as exceptions raised while decoding
    # or while writing frames to disk.
    try:
        if not cap.isOpened():
            return {
                "success": False,
                "message": f"Error opening video file: {video_file.name}",
                "files": [],
            }

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Written as `not fps > 0` (rather than `fps <= 0`) so that NaN is
        # rejected too: every comparison against NaN evaluates to False.
        if fps is None or not fps > 0:
            return {"success": False, "message": "Invalid FPS detected.", "files": []}

        duration = total_video_frames / fps
        # Only whole seconds are sampled; a trailing partial second is ignored.
        total_seconds = int(duration)

        for second in range(total_seconds):
            # Seek to the first frame of this second, then decode it.
            frame_number = int(second * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            if not ret:
                continue

            frame_filename = f"frame_{second:04d}s.jpg"
            frame_path = output_dir / frame_filename
            # Encode in memory and write the bytes out via tofile() instead
            # of cv2.imwrite().
            # NOTE(review): presumably chosen to cope with non-ASCII paths;
            # confirm before changing.
            ok, encoded = cv2.imencode(".jpg", frame)
            if not ok:
                continue
            encoded.tofile(str(frame_path))

            frame_data.append(
                {
                    # 1-based index of the sampled second, not the position
                    # of the frame in the source video.
                    "frame_number": second + 1,
                    "timestamp_seconds": second,
                    # MM:SS; minutes are not wrapped at 60.
                    "timestamp_formatted": f"{second // 60:02d}:{second % 60:02d}",
                    "filename": frame_filename,
                    "file_path": str(frame_path),
                }
            )
    finally:
        cap.release()

    # Persist machine-readable frame metadata.
    json_path = output_dir / "frame_data.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "video_info": {
                    "filename": video_file.name,
                    "duration_seconds": duration,
                    "total_frames": total_video_frames,
                    "fps": fps,
                    "extracted_frames": len(frame_data),
                },
                "frames": frame_data,
            },
            f,
            indent=2,
            ensure_ascii=False,
        )

    # Persist CSV for debugging/manual review.
    csv_path = output_dir / "frame_data.csv"
    with open(csv_path, "w", encoding="utf-8") as f:
        f.write("Frame,Timestamp,Time_Formatted,Filename\n")
        for fr in frame_data:
            f.write(
                f"{fr['frame_number']},{fr['timestamp_seconds']},"
                f"{fr['timestamp_formatted']},{fr['filename']}\n"
            )

    # Build download links for everything in the output directory, sorted by
    # path so the listing is stable between calls.
    links: List[str] = []
    if output_dir.exists():
        for p in sorted(output_dir.iterdir()):
            if p.is_file():
                rel = p.relative_to(DOWNLOADS_DIR).as_posix()
                links.append(f"/downloads/{rel}")

    return {
        "success": True,
        "message": f"Extracted {len(frame_data)} frames to '{output_dir.name}'.",
        "files": links,
        "video_id": video_id,
        "output_dir": f"/downloads/{video_id}/extracted_frames",
        "count": len(frame_data),
    }