Coverage for app \ services \ expert_review_service.py: 5%
405 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
1from datetime import datetime
2import json
3from pathlib import Path
4from typing import Any, Dict, List, Optional, Tuple
6from fastapi import HTTPException
8from app.settings import (
9 DOWNLOADS_DIR,
10 EXPERT_QUESTION_TYPE_LABELS,
11 EXPERT_QUESTION_TYPE_VALUES,
12)
13from app.services.question_generation_service import (
14 _maybe_parse_json,
15 resolve_question_file_param,
16)
17from app.services.video_files import find_primary_video_file, list_question_json_files
# Two normalized segment boundaries whose absolute difference is below this
# value are treated as the same boundary when matching stored entries.
SEGMENT_MATCH_TOLERANCE: float = 1e-3
def normalize_segment_value(value: Any) -> float:
    """Coerce a segment boundary to a float rounded to three decimals.

    Anything that cannot be converted to a finite float -- ``None``,
    non-numeric text, an integer too large for a float, NaN or an infinity --
    yields ``0.0`` instead of raising.

    Args:
        value: Raw boundary value (number, numeric string, or anything else).

    Returns:
        The value as a float rounded to 3 decimal places, or ``0.0``.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int too large to be represented.
        return 0.0
    # NaN fails every comparison and the infinities sit on the bounds, so the
    # chained test is False for every non-finite value.  Those are rejected
    # because NaN can never satisfy a tolerance comparison and neither NaN
    # nor infinity is valid JSON.
    if not (float("-inf") < number < float("inf")):
        return 0.0
    return round(number, 3)
def _parse_rank_value(value: Any) -> Optional[int]:
    """Best-effort conversion of a rank value to ``int``.

    Numbers (including booleans) are truncated with ``int()``.  Any other
    value is converted to text, stripped, and parsed first as an integer and
    then as a float that is truncated.

    Args:
        value: Raw rank as found in a payload or stored file.

    Returns:
        The rank as an ``int``, or ``None`` when the value is ``None``,
        blank, non-numeric, NaN or infinite.
    """
    if value is None:
        return None

    if isinstance(value, (int, float)):
        # bool is an int subclass, so True/False become 1/0 here as well.
        try:
            return int(value)
        except (ValueError, OverflowError):
            # int(nan) raises ValueError, int(+/-inf) raises OverflowError.
            return None

    try:
        text = str(value).strip()
    except Exception:
        # An arbitrary object may have a failing __str__; treat it as unranked.
        return None
    if not text:
        return None

    try:
        return int(text)
    except ValueError:
        pass

    try:
        # Accept decimal text such as "2.0" by truncating the parsed float.
        return int(float(text))
    except (ValueError, OverflowError):
        return None
def _build_llm_rank_lookup(
    video_dir: Path, video_id: str
) -> Tuple[Dict[int, Dict[str, Optional[int]]], Dict[Tuple[Any, Any], Dict[str, Optional[int]]]]:
    """Load the question ranks stored in a video's question JSON file.

    Reads ``<video_dir>/questions/<video_id>.json`` and builds two lookups
    over its ``segments`` list.  Each lookup value maps a question type to the
    rank parsed by ``_parse_rank_value`` from that question's ``rank`` field.

    Args:
        video_dir: Directory of the video.
        video_id: Identifier used as the JSON file's stem.

    Returns:
        ``(by_index, by_range)`` where ``by_index`` is keyed by the segment's
        position in the list and ``by_range`` by its ``(start, end)`` pair.
        Both are empty when the file is missing, unreadable, or not shaped as
        expected; malformed segments are skipped instead of raising.
    """
    by_index: Dict[int, Dict[str, Optional[int]]] = {}
    by_range: Dict[Tuple[Any, Any], Dict[str, Optional[int]]] = {}

    json_path = video_dir / "questions" / f"{video_id}.json"
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing/unreadable file (OSError) or invalid JSON/encoding (ValueError).
        return by_index, by_range

    segments = data.get("segments") if isinstance(data, dict) else None
    if not isinstance(segments, list):
        return by_index, by_range

    for idx, seg in enumerate(segments):
        if not isinstance(seg, dict):
            continue

        result = seg.get("result")
        if isinstance(result, str):
            # serialize_question_segments treats "result" as possibly being
            # JSON text and decodes it with this helper; do the same here.
            result = _maybe_parse_json(result)
        questions = result.get("questions") if isinstance(result, dict) else None

        q_map: Dict[str, Optional[int]] = {}
        if isinstance(questions, dict):
            for qtype, info in questions.items():
                if isinstance(info, dict):
                    q_map[qtype] = _parse_rank_value(info.get("rank"))
        by_index[idx] = q_map

        start = seg.get("start")
        end = seg.get("end")
        if start is not None and end is not None:
            try:
                by_range[(start, end)] = q_map
            except TypeError:
                # Unhashable bounds (e.g. a list) cannot serve as a key.
                pass

    return by_index, by_range
def load_expert_annotations(question_file: Path, video_id: str) -> Dict[str, Any]:
    """Load the expert annotations stored next to a question JSON file.

    The annotations live in a sibling file whose name is the question file's
    name with ``.expert.json`` appended.

    Args:
        question_file: Path of the question JSON file.
        video_id: Identifier recorded in the returned payload.

    Returns:
        A dict with ``"path"`` (the annotations file path, whether or not it
        exists) and ``"data"`` (a payload with ``video_id``, ``question_file``
        and ``annotations``).  ``annotations`` is always a list: it is empty
        when the file is missing, unreadable, not a JSON object, or stores a
        non-list value, so callers can append to it safely.
    """
    annotations_path = question_file.with_suffix(question_file.suffix + ".expert.json")

    annotations: List[Any] = []
    try:
        loaded = json.loads(annotations_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON: fall back to no annotations.
        loaded = None

    if isinstance(loaded, dict):
        stored = loaded.get("annotations")
        if isinstance(stored, list):
            annotations = stored

    return {
        "path": annotations_path,
        "data": {
            "video_id": video_id,
            "question_file": question_file.name,
            "annotations": annotations,
        },
    }
def _segment_bound_as_int(value: Any, default: int = 0) -> int:
    """Coerce a stored segment bound to ``int``, returning *default* on failure."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        # Accept numeric text such as "12.5", which int() alone rejects.
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def serialize_question_segments(question_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn the ``segments`` of a question JSON document into display records.

    Args:
        question_data: Decoded question JSON document.

    Returns:
        One dict per well-formed segment with:

        * ``index``   - position of the segment in the stored list,
        * ``start`` / ``end`` - integer bounds (``end`` falls back to ``start``),
        * ``parsed``  - the decoded ``result`` when it is a dict or list,
          otherwise ``None``,
        * ``display`` - pretty-printed JSON of the decoded result, or the raw
          ``result`` text when it could not be decoded.

        An empty list is returned when the document is not a dict or has no
        ``segments`` list.  Entries that are not dicts are skipped.
    """
    raw_segments = question_data.get("segments") if isinstance(question_data, dict) else None
    if not isinstance(raw_segments, list):
        return []

    segments: List[Dict[str, Any]] = []
    for idx, seg in enumerate(raw_segments):
        if not isinstance(seg, dict):
            # Skip junk but keep idx tied to the position in the stored list.
            continue

        start = _segment_bound_as_int(seg.get("start", 0))
        end = _segment_bound_as_int(seg.get("end", start), default=start)

        result_raw = seg.get("result")
        parsed = _maybe_parse_json(result_raw)
        if isinstance(parsed, (dict, list)):
            display_payload = json.dumps(parsed, indent=2, ensure_ascii=False)
            parsed_for_js = parsed
        else:
            if isinstance(result_raw, str):
                display_payload = result_raw
            else:
                display_payload = json.dumps(result_raw, indent=2, ensure_ascii=False)
            parsed_for_js = None

        segments.append(
            {
                "index": idx,
                "start": start,
                "end": end,
                "parsed": parsed_for_js,
                "display": display_payload,
            }
        )
    return segments
def _build_annotations_map(annotations: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Index annotation entries by a ``"<start>-<end>"`` key.

    Entries that are not dicts are ignored.  When several entries share the
    same key, the last one in the list wins.
    """
    return {
        f"{record.get('start')}-{record.get('end')}": record
        for record in annotations
        if isinstance(record, dict)
    }
def _downloads_relative_posix(path: Path) -> Optional[str]:
    """Return *path* relative to ``DOWNLOADS_DIR`` in POSIX form, or ``None`` if outside it."""
    try:
        return path.relative_to(DOWNLOADS_DIR).as_posix()
    except ValueError:
        return None


def _primary_video_url(video_dir: Path) -> Optional[str]:
    """Return the ``/downloads/...`` URL of the primary video in *video_dir*, if there is one."""
    video_file = find_primary_video_file(video_dir)
    if not video_file:
        return None
    rel_path = _downloads_relative_posix(video_file)
    return f"/downloads/{rel_path}" if rel_path is not None else None


def build_expert_preview_data(
    file: Optional[str], video: Optional[str], mode: Optional[str]
) -> Dict[str, Any]:
    """Collect everything the expert review/creation page needs to render.

    Two modes are supported (``mode`` defaults to ``"review"``):

    * review - a question JSON file is selected through ``file`` or, when only
      ``video`` is given, through the first listed question file of that
      video.  Its segments, pretty-printed JSON, video URL and existing
      annotations are returned.
    * create - no question file is needed; the video URL and the annotations
      stored in ``<video>/expert_questions/expert_<video>.json`` are returned.

    Args:
        file: Question file reference understood by
            ``resolve_question_file_param``.
        video: Video identifier (a directory name under ``DOWNLOADS_DIR``).
        mode: ``"review"``, ``"create"`` or ``None``.

    Returns:
        A dict of template values.  Missing or unreadable inputs lead to
        empty/``None`` values (and ``selection_error`` for an unknown file)
        rather than an exception.
    """
    mode_value = mode or "review"
    question_files = list_question_json_files()
    selected_file_path: Optional[Path] = None
    selected_file_rel: Optional[str] = None
    selection_error: Optional[str] = None

    if mode_value != "create" and not file and video:
        # Only a video was given: fall back to its first listed question file.
        file = next(
            (item["rel_path"] for item in question_files if item["video_id"] == video),
            None,
        )

    if file:
        candidate = resolve_question_file_param(file)
        if candidate and candidate.exists():
            selected_file_path = candidate
            selected_file_rel = _downloads_relative_posix(candidate)
        else:
            selection_error = "Selected question JSON could not be found."

    segments_info: List[Dict[str, Any]] = []
    segments_for_js: List[Dict[str, Any]] = []
    existing_annotations: List[Dict[str, Any]] = []
    existing_annotations_map: Dict[str, Any] = {}
    selected_json_pretty: Optional[str] = None
    video_url: Optional[str] = None
    annotation_rel_path: Optional[str] = None
    selected_video_id: Optional[str] = None
    selected_file_name: Optional[str] = None

    if selected_file_path:
        selected_file_name = selected_file_path.name
        # The video directory is the grandparent of the question file.
        selected_video_dir = selected_file_path.parent.parent
        selected_video_id = selected_video_dir.name
        try:
            raw_data = json.loads(selected_file_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            raw_data = {}

        # Only a JSON object can carry segments; other roots are still shown
        # in the pretty-printed view below.
        segments_info = serialize_question_segments(
            raw_data if isinstance(raw_data, dict) else {}
        )
        for segment in segments_info:
            parsed = segment.get("parsed")
            parsed_is_dict = isinstance(parsed, dict)
            segments_for_js.append(
                {
                    "index": segment["index"],
                    "start": segment["start"],
                    "end": segment["end"],
                    "questions": parsed.get("questions") if parsed_is_dict else None,
                    "best_question": parsed.get("best_question") if parsed_is_dict else None,
                }
            )
        selected_json_pretty = json.dumps(raw_data, indent=2, ensure_ascii=False)

        video_url = _primary_video_url(selected_video_dir)

        annotations_bundle = load_expert_annotations(selected_file_path, selected_video_id)
        stored_annotations = annotations_bundle["data"].get("annotations", [])
        if isinstance(stored_annotations, list):
            existing_annotations = [
                entry for entry in stored_annotations if isinstance(entry, dict)
            ]
        existing_annotations_map = _build_annotations_map(existing_annotations)
        annotation_rel_path = _downloads_relative_posix(annotations_bundle["path"])
    elif mode_value == "create" and video:
        selected_video_id = video
        video_ref = Path(video)
        # Refuse absolute paths and ".." components so the request cannot
        # read files outside DOWNLOADS_DIR.
        if not video_ref.anchor and ".." not in video_ref.parts:
            video_dir = DOWNLOADS_DIR / video
            if video_dir.exists():
                video_url = _primary_video_url(video_dir)

                expert_file = video_dir / "expert_questions" / f"expert_{video}.json"
                if expert_file.exists():
                    try:
                        expert_data = json.loads(expert_file.read_text(encoding="utf-8"))
                    except (OSError, ValueError):
                        # Unreadable file: render the page without annotations.
                        pass
                    else:
                        stored_annotations = (
                            expert_data.get("annotations", [])
                            if isinstance(expert_data, dict)
                            else []
                        )
                        if isinstance(stored_annotations, list):
                            existing_annotations = [
                                entry
                                for entry in stored_annotations
                                if isinstance(entry, dict)
                            ]
                        existing_annotations_map = _build_annotations_map(existing_annotations)
                        annotation_rel_path = _downloads_relative_posix(expert_file)

    return {
        "question_files": question_files,
        "selected_file_rel": selected_file_rel,
        "selected_file_name": selected_file_name,
        "selected_video_id": selected_video_id,
        "video_url": video_url,
        "segments": segments_info,
        "segments_for_js": segments_for_js,
        "existing_annotations": existing_annotations,
        "existing_annotations_map": existing_annotations_map,
        "selected_json_pretty": selected_json_pretty,
        "annotations_rel_path": annotation_rel_path,
        "selection_error": selection_error,
        "question_file_url": (
            f"/downloads/{selected_file_rel}" if selected_file_rel else None
        ),
        "mode": mode_value,
    }
def _annotation_text(value: Any) -> str:
    """Return *value* as stripped text; ``None`` and other falsy values become ``""``."""
    return str(value or "").strip()


def _annotation_sort_key(item: Any) -> Tuple[float, float]:
    """Sort key ``(start, end)`` that tolerates malformed stored entries (treated as 0)."""
    if not isinstance(item, dict):
        return (0.0, 0.0)

    def as_number(raw: Any) -> float:
        try:
            return float(raw)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    return (as_number(item.get("start", 0)), as_number(item.get("end", 0)))


def _normalize_best_question_review(review: Any) -> Optional[Dict[str, Any]]:
    """Validate the ``best_question`` part of a review payload.

    Returns the record to store, or ``None`` when *review* is not a dict or
    carries nothing worth recording.  ``approved`` defaults to ``True`` when
    other content is present but no verdict was given.

    Raises:
        HTTPException: 400 when the question is disapproved without a comment.
    """
    if not isinstance(review, dict):
        return None

    question = _annotation_text(review.get("question"))
    answer = _annotation_text(review.get("answer"))
    comment = _annotation_text(review.get("comment"))

    approved_raw = review.get("approved")
    approved: Optional[bool]
    if isinstance(approved_raw, bool):
        approved = approved_raw
    elif isinstance(approved_raw, str):
        approved = approved_raw.lower() in {"true", "1", "yes", "approved"}
    else:
        approved = None

    if approved is False and not comment:
        raise HTTPException(
            status_code=400,
            detail="Provide a comment when disapproving the best question.",
        )

    if not (question or answer or approved is not None or comment):
        return None
    if approved is None:
        approved = True

    return {
        "question": question,
        "answer": answer,
        "approved": approved,
        # A comment is only kept for a disapproval.
        "comment": comment if not approved else "",
    }


def save_expert_annotation_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Store (or replace) one expert annotation for a segment.

    In ``"create"`` mode the annotation is written to
    ``<video_id>/expert_questions/expert_<video_id>.json`` under
    ``DOWNLOADS_DIR``.  In any other mode (default ``"review"``) it is written
    to the ``.expert.json`` file that belongs to the question file named by
    ``payload["file"]``.  An existing annotation with the same ``start`` and
    ``end`` is replaced; otherwise the new one is added.  The list is kept
    sorted by ``(start, end)``.

    Args:
        payload: Request body.  Reads ``mode``, ``video_id`` or ``file``,
            ``start``, ``end``, ``segment_index``, ``skip``, ``question``,
            ``answer``, ``question_type`` and, in review mode,
            ``best_question``.

    Returns:
        A dict with ``success``, the stored ``annotation``, the
        ``annotations_file`` path relative to ``DOWNLOADS_DIR`` (``None`` when
        it lies outside), ``updated`` (whether an entry was replaced) and
        ``mode``.

    Raises:
        HTTPException: 400 for invalid input, 500 when the file cannot be
            written.
    """
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid payload.")

    mode = payload.get("mode", "review")

    if mode == "create":
        video_id = payload.get("video_id")
        if not video_id:
            raise HTTPException(
                status_code=400, detail="Missing video_id for create mode."
            )
        video_id = str(video_id)

        # Refuse absolute paths and ".." components so the request cannot
        # write outside DOWNLOADS_DIR.
        video_ref = Path(video_id)
        if video_ref.anchor or ".." in video_ref.parts:
            raise HTTPException(status_code=400, detail="Video directory not found.")

        video_dir = DOWNLOADS_DIR / video_id
        if not video_dir.exists():
            raise HTTPException(status_code=400, detail="Video directory not found.")

        expert_file = video_dir / "expert_questions" / f"expert_{video_id}.json"
        try:
            expert_data = json.loads(expert_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing or unreadable file: start a new document.
            expert_data = {}
        if not isinstance(expert_data, dict):
            expert_data = {}

        expert_data.setdefault("video_id", video_id)
        expert_data.setdefault("mode", "create")
    else:
        question_file = resolve_question_file_param(payload.get("file"))
        if not question_file or not question_file.exists():
            raise HTTPException(status_code=400, detail="Invalid question file.")

        # The video directory is the grandparent of the question file.
        video_dir = question_file.parent.parent
        video_id = video_dir.name

        annotations_bundle = load_expert_annotations(question_file, video_id)
        expert_data = annotations_bundle["data"]
        expert_data["video_id"] = video_id
        expert_data["question_file"] = question_file.name
        expert_file = annotations_bundle["path"]

    annotations_list = expert_data.get("annotations")
    if not isinstance(annotations_list, list):
        # A malformed stored value must not block saving; start a fresh list.
        annotations_list = []
        expert_data["annotations"] = annotations_list

    try:
        start = int(payload.get("start"))
        end = int(payload.get("end"))
    except (TypeError, ValueError, OverflowError):
        raise HTTPException(status_code=400, detail="Invalid segment bounds.") from None

    skip_requested = bool(payload.get("skip"))
    segment_index = payload.get("segment_index")
    try:
        segment_index = int(segment_index) if segment_index is not None else None
    except (TypeError, ValueError, OverflowError):
        segment_index = None

    timestamp = datetime.utcnow().isoformat(timespec="seconds") + "Z"

    if skip_requested:
        entry: Dict[str, Any] = {
            "segment_index": segment_index,
            "start": start,
            "end": end,
            "question_type": "skip",
            "question_type_label": "Skipped",
            "question": "(skipped)",
            "answer": "",
            "skipped": True,
            "saved_at": timestamp,
            "mode": mode,
        }
    else:
        question = _annotation_text(payload.get("question"))
        answer = _annotation_text(payload.get("answer"))
        question_type_raw = _annotation_text(payload.get("question_type")).lower()

        if not question or not answer:
            raise HTTPException(
                status_code=400, detail="Question and answer are required."
            )
        if question_type_raw not in EXPERT_QUESTION_TYPE_VALUES:
            raise HTTPException(status_code=400, detail="Invalid question type.")

        entry = {
            "segment_index": segment_index,
            "start": start,
            "end": end,
            "question_type": question_type_raw,
            "question_type_label": EXPERT_QUESTION_TYPE_LABELS.get(
                question_type_raw, question_type_raw.title()
            ),
            "question": question,
            "answer": answer,
            "skipped": False,
            "saved_at": timestamp,
            "mode": mode,
        }

        if mode == "review":
            best_question = _normalize_best_question_review(payload.get("best_question"))
            if best_question is not None:
                entry["best_question"] = best_question

    updated = False
    for idx, existing in enumerate(annotations_list):
        same_segment = (
            isinstance(existing, dict)
            and existing.get("start") == start
            and existing.get("end") == end
        )
        if not same_segment:
            continue
        if (
            not skip_requested
            and mode == "review"
            and entry.get("best_question") is None
            and existing.get("best_question") is not None
        ):
            # Keep the earlier best-question verdict when this save has none.
            entry["best_question"] = existing.get("best_question")
        annotations_list[idx] = entry
        updated = True
        break

    if not updated:
        annotations_list.append(entry)

    annotations_list.sort(key=_annotation_sort_key)

    try:
        expert_file.parent.mkdir(parents=True, exist_ok=True)
        expert_file.write_text(
            json.dumps(expert_data, indent=2, ensure_ascii=False), encoding="utf-8"
        )
    except Exception as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to store annotation: {exc}"
        ) from exc

    try:
        annotation_rel = expert_file.relative_to(DOWNLOADS_DIR).as_posix()
    except ValueError:
        annotation_rel = None

    return {
        "success": True,
        "annotation": entry,
        "annotations_file": annotation_rel,
        "updated": updated,
        "mode": mode,
    }
def get_expert_questions_payload(video_id: str) -> Tuple[Dict[str, Any], int]:
    """Return the expert questions stored for a video.

    Reads ``<video_id>/expert_questions/expert_questions.json`` under
    ``DOWNLOADS_DIR``.

    Args:
        video_id: Video identifier (a directory name under ``DOWNLOADS_DIR``).

    Returns:
        ``(body, status)``.  A missing file - or a ``video_id`` that is an
        absolute path or contains ``..`` - yields a successful response with
        an empty ``questions`` list.  An unreadable file yields status 500.
    """
    # Refuse absolute paths and ".." components so the request cannot read
    # files outside DOWNLOADS_DIR; such ids are reported like unknown videos.
    video_ref = Path(video_id)
    if video_ref.anchor or ".." in video_ref.parts:
        return {"success": True, "video_id": video_id, "questions": []}, 200

    # The file can only exist when its parent directories exist as well.
    file_path = DOWNLOADS_DIR / video_id / "expert_questions" / "expert_questions.json"
    if not file_path.exists():
        return {"success": True, "video_id": video_id, "questions": []}, 200

    try:
        data = json.loads(file_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        return {
            "success": False,
            "message": f"Unable to read expert questions: {exc}",
            "questions": [],
        }, 500

    questions = data.get("questions") if isinstance(data, dict) else []
    if not isinstance(questions, list):
        questions = []

    return {"success": True, "video_id": video_id, "questions": questions}, 200
def save_expert_question_payload(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Store an expert question (or a skip marker) for one video segment.

    The entry is written to
    ``<video_id>/expert_questions/expert_questions.json`` under
    ``DOWNLOADS_DIR``.  Any stored entry whose normalized bounds match the new
    segment within ``SEGMENT_MATCH_TOLERANCE`` is replaced, and the list is
    kept sorted by segment start.  Stored entries that are not dicts are
    dropped.

    Args:
        payload: Request body.  Reads ``videoId``/``video_id``,
            ``segmentStart``, ``segmentEnd``, ``timestamp``,
            ``skipped``/``skip``/``isSkipped``, ``skipReason``/``skip_reason``,
            ``questionType``/``question_type``, ``question`` and ``answer``.

    Returns:
        ``(body, status)``: 200 on success, 400 for invalid input, 404 when
        the video directory is unknown, 500 when the file cannot be written.
    """
    if not isinstance(payload, dict):
        return {"success": False, "message": "Invalid payload"}, 400

    video_id = str(payload.get("videoId") or payload.get("video_id") or "").strip()
    if not video_id:
        return {"success": False, "message": "videoId is required"}, 400

    # Refuse absolute paths and ".." components so the request cannot write
    # outside DOWNLOADS_DIR; such ids are reported like unknown videos.
    video_ref = Path(video_id)
    if video_ref.anchor or ".." in video_ref.parts:
        return {"success": False, "message": "Video not found"}, 404

    video_dir = DOWNLOADS_DIR / video_id
    if not video_dir.exists():
        return {"success": False, "message": "Video not found"}, 404

    segment_start_value = normalize_segment_value(payload.get("segmentStart"))
    segment_end_value = normalize_segment_value(payload.get("segmentEnd"))

    # A missing or null timestamp falls back to the segment end as received.
    raw_timestamp = payload.get("timestamp")
    if raw_timestamp is None:
        raw_timestamp = segment_end_value
    timestamp_value = normalize_segment_value(raw_timestamp)

    skipped = bool(payload.get("skipped") or payload.get("skip") or payload.get("isSkipped"))
    skip_reason = str(payload.get("skipReason") or payload.get("skip_reason") or "").strip()

    # The end is never allowed to precede the start.
    if segment_end_value <= segment_start_value:
        segment_end_value = segment_start_value

    question_type = (
        str(payload.get("questionType") or payload.get("question_type") or "")
        .strip()
        .lower()
    )
    question_text = str(payload.get("question") or "").strip()
    answer_text = str(payload.get("answer") or "").strip()

    if skipped:
        question_type = ""
        question_text = ""
        answer_text = ""
    else:
        if question_type not in EXPERT_QUESTION_TYPE_VALUES:
            return {"success": False, "message": "Invalid question type"}, 400
        if not question_text or not answer_text:
            return {"success": False, "message": "Question and answer are required"}, 400

    file_path = video_dir / "expert_questions" / "expert_questions.json"

    try:
        stored = json.loads(file_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing or unreadable file: start a new document.
        stored = {}
    if not isinstance(stored, dict):
        stored = {}

    previous_questions = stored.get("questions")
    if not isinstance(previous_questions, list):
        previous_questions = []

    def matches_existing(candidate: Dict[str, Any]) -> bool:
        existing_start = normalize_segment_value(candidate.get("segment_start"))
        existing_end = normalize_segment_value(candidate.get("segment_end"))
        return (
            abs(existing_start - segment_start_value) < SEGMENT_MATCH_TOLERANCE
            and abs(existing_end - segment_end_value) < SEGMENT_MATCH_TOLERANCE
        )

    # Keep every well-formed entry that belongs to a different segment.
    questions_list = [
        q for q in previous_questions if isinstance(q, dict) and not matches_existing(q)
    ]

    entry = {
        "segment_start": segment_start_value,
        "segment_end": segment_end_value,
        "timestamp": timestamp_value,
        "question_type": question_type if not skipped else None,
        "question": question_text,
        "answer": answer_text,
        "skipped": skipped,
        "skip_reason": skip_reason,
        "updated_at": datetime.utcnow().isoformat(),
    }

    questions_list.append(entry)
    questions_list.sort(key=lambda q: normalize_segment_value(q.get("segment_start")))

    stored["video_id"] = video_id
    stored["questions"] = questions_list

    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(json.dumps(stored, indent=2), encoding="utf-8")
    except (OSError, TypeError, ValueError) as exc:
        return {"success": False, "message": f"Failed to write expert questions: {exc}"}, 500

    message = "Segment marked as skipped." if skipped else "Expert question saved."
    return {
        "success": True,
        "message": message,
        "updatedAt": entry["updated_at"],
        "skipped": skipped,
    }, 200
def save_final_questions_payload(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Normalize and store the final question set of a video.

    ``payload["data"]`` is stamped with ``saved_at`` and ``video_id`` and
    written to ``<video_id>/final_questions/final_questions.json`` under
    ``DOWNLOADS_DIR``.  For every dict in each segment's ``aiQuestions``:

    * ``expert_ranking`` is parsed from ``expert_ranking`` (or the legacy
      ``ranking`` key, which is removed); a trashed question without a rank
      gets ``0``;
    * ``llm_ranking`` is taken from the stored question file (looked up by
      segment index, then by ``(start, end)``) and falls back to the value
      already present on the question.

    The ``data`` dict is modified in place.

    Args:
        payload: Request body with ``videoId`` and ``data``.

    Returns:
        ``(body, status)``: 200 on success, 400 for invalid input, 404 when
        the video directory is unknown, 500 when saving fails.
    """
    if not isinstance(payload, dict):
        return {"success": False, "message": "Invalid payload"}, 400

    video_id = str(payload.get("videoId") or "").strip()
    if not video_id:
        return {"success": False, "message": "videoId is required"}, 400

    # Refuse absolute paths and ".." components so the request cannot write
    # outside DOWNLOADS_DIR; such ids are reported like unknown videos.
    video_ref = Path(video_id)
    if video_ref.anchor or ".." in video_ref.parts:
        return {"success": False, "message": "Video not found"}, 404

    video_dir = DOWNLOADS_DIR / video_id
    if not video_dir.exists():
        return {"success": False, "message": "Video not found"}, 404

    final_data = payload.get("data")
    if not final_data:
        return {"success": False, "message": "No data provided"}, 400
    if not isinstance(final_data, dict):
        # Anything but a JSON object is a client error, not a server failure.
        return {"success": False, "message": "data must be an object"}, 400

    final_questions_dir = video_dir / "final_questions"
    final_file_path = final_questions_dir / "final_questions.json"

    try:
        final_data["saved_at"] = datetime.utcnow().isoformat()
        final_data["video_id"] = video_id

        segments = final_data.get("segments")
        if not isinstance(segments, list):
            segments = []
            final_data["segments"] = segments

        llm_by_index, llm_by_range = _build_llm_rank_lookup(video_dir, video_id)

        for idx, seg in enumerate(segments):
            if not isinstance(seg, dict):
                continue

            try:
                seg_index = int(seg.get("segmentIndex", idx))
            except (TypeError, ValueError, OverflowError):
                seg_index = idx

            llm_rankings = llm_by_index.get(seg_index)
            if llm_rankings is None:
                try:
                    llm_rankings = llm_by_range.get((seg.get("start"), seg.get("end")))
                except TypeError:
                    # Unhashable bounds cannot match any stored range.
                    llm_rankings = None
            if llm_rankings is None:
                llm_rankings = {}

            ai_questions = seg.get("aiQuestions")
            if not isinstance(ai_questions, list):
                seg["aiQuestions"] = []
                continue

            for question in ai_questions:
                if not isinstance(question, dict):
                    continue

                raw_expert = question.get("expert_ranking")
                if raw_expert is None:
                    raw_expert = question.get("ranking")
                expert_rank = _parse_rank_value(raw_expert)
                if expert_rank is None and question.get("trashed"):
                    expert_rank = 0
                question["expert_ranking"] = expert_rank
                # The legacy key is replaced by "expert_ranking".
                question.pop("ranking", None)

                llm_rank = None
                q_type = question.get("type")
                if q_type and isinstance(q_type, str) and isinstance(llm_rankings, dict):
                    llm_rank = llm_rankings.get(q_type)
                if llm_rank is None:
                    llm_rank = _parse_rank_value(question.get("llm_ranking"))
                question["llm_ranking"] = llm_rank

        final_questions_dir.mkdir(parents=True, exist_ok=True)
        final_file_path.write_text(json.dumps(final_data, indent=2), encoding="utf-8")

        return {
            "success": True,
            "message": "Final questions saved successfully",
            "filepath": f"downloads/{video_id}/final_questions/final_questions.json",
            "saved_at": final_data["saved_at"],
        }, 200

    except Exception as exc:
        # Boundary handler: report any failure as a 500 response body.
        return {"success": False, "message": f"Failed to save final questions: {exc}"}, 500