Coverage for app \ services \ expert_review_service.py: 5%

405 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 20:58 -0400

1from datetime import datetime 

2import json 

3from pathlib import Path 

4from typing import Any, Dict, List, Optional, Tuple 

5 

6from fastapi import HTTPException 

7 

8from app.settings import ( 

9 DOWNLOADS_DIR, 

10 EXPERT_QUESTION_TYPE_LABELS, 

11 EXPERT_QUESTION_TYPE_VALUES, 

12) 

13from app.services.question_generation_service import ( 

14 _maybe_parse_json, 

15 resolve_question_file_param, 

16) 

17from app.services.video_files import find_primary_video_file, list_question_json_files 

18 

19 

# Two normalized segment bounds whose absolute difference is strictly below
# this value are treated as the same bound when matching stored questions.
SEGMENT_MATCH_TOLERANCE = 0.001

21 

22 

def normalize_segment_value(value: Any) -> float:
    """Coerce *value* to a float rounded to three decimal places.

    Args:
        value: Any object; typically a number or numeric string taken from a
            request payload or a stored question entry.

    Returns:
        The rounded float, or ``0.0`` when *value* cannot be converted
        (``None``, non-numeric text, an integer too large for a float) or
        converts to a non-finite number (NaN, +/-infinity).
    """
    import math  # local import: the module does not import math at file level

    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int that exceeds the float range.
        return 0.0
    # NaN/inf would break the tolerance comparisons and sorting done by
    # callers and would be written out as non-standard JSON tokens.
    if not math.isfinite(number):
        return 0.0
    return round(number, 3)

28 

29 

30def _parse_rank_value(value: Any) -> Optional[int]: 

31 if value is None: 

32 return None 

33 if isinstance(value, bool): 

34 return int(value) 

35 if isinstance(value, (int, float)): 

36 try: 

37 return int(value) 

38 except Exception: 

39 return None 

40 try: 

41 text = str(value).strip() 

42 except Exception: 

43 return None 

44 if not text: 

45 return None 

46 try: 

47 return int(text) 

48 except ValueError: 

49 try: 

50 return int(float(text)) 

51 except Exception: 

52 return None 

53 

54 

55def _build_llm_rank_lookup( 

56 video_dir: Path, video_id: str 

57) -> Tuple[Dict[int, Dict[str, Optional[int]]], Dict[Tuple[Any, Any], Dict[str, Optional[int]]]]: 

58 by_index: Dict[int, Dict[str, Optional[int]]] = {} 

59 by_range: Dict[Tuple[Any, Any], Dict[str, Optional[int]]] = {} 

60 json_path = video_dir / "questions" / f"{video_id}.json" 

61 if not json_path.exists(): 

62 return by_index, by_range 

63 

64 try: 

65 data = json.loads(json_path.read_text(encoding="utf-8")) 

66 except Exception: 

67 return by_index, by_range 

68 

69 segments = data.get("segments") 

70 if not isinstance(segments, list): 

71 return by_index, by_range 

72 

73 for idx, seg in enumerate(segments): 

74 if not isinstance(seg, dict): 

75 continue 

76 result = seg.get("result") or {} 

77 questions = result.get("questions") or {} 

78 q_map: Dict[str, Optional[int]] = {} 

79 for qtype, info in questions.items(): 

80 if isinstance(info, dict): 

81 q_map[qtype] = _parse_rank_value(info.get("rank")) 

82 by_index[idx] = q_map 

83 start = seg.get("start") 

84 end = seg.get("end") 

85 if start is not None and end is not None: 

86 by_range[(start, end)] = q_map 

87 

88 return by_index, by_range 

89 

90 

def load_expert_annotations(question_file: Path, video_id: str) -> Dict[str, Any]:
    """Load the expert-annotation sidecar that belongs to *question_file*.

    The sidecar sits next to the question file and is named by appending
    ``.expert.json`` to the question file's name.

    Args:
        question_file: Path of the question JSON file.
        video_id: Identifier stored in the returned payload.

    Returns:
        ``{"path": <sidecar Path>, "data": <payload>}`` where the payload has
        the keys ``video_id``, ``question_file`` (file name only) and
        ``annotations``. ``annotations`` is always a list: it is empty when the
        sidecar is missing, unreadable, not a JSON object, or stores a
        non-list value under ``annotations``.
    """
    annotations_path = question_file.with_suffix(question_file.suffix + ".expert.json")

    annotations: List[Any] = []
    if annotations_path.exists():
        try:
            loaded = json.loads(annotations_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or malformed sidecar: start from an empty list.
            loaded = None
        if isinstance(loaded, dict):
            stored = loaded.get("annotations")
            # Callers append to and sort this value, so it must be a list.
            if isinstance(stored, list):
                annotations = stored

    return {
        "path": annotations_path,
        "data": {
            "video_id": video_id,
            "question_file": question_file.name,
            "annotations": annotations,
        },
    }

113 

114 

def serialize_question_segments(question_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn the ``segments`` of a question file into display-ready records.

    Args:
        question_data: Parsed question JSON. Anything that is not a dict, or
            whose ``segments`` value is not a list, yields an empty result.

    Returns:
        One dict per dict-shaped segment with the keys ``index`` (position in
        the original list), ``start``, ``end`` (ints), ``parsed`` (the parsed
        ``result`` when it is a dict or list, else ``None``) and ``display``
        (a pretty-printed string of the result). Segments that are not dicts
        are skipped; their positions are still counted in ``index``.
    """

    def _as_int(value: Any, fallback: int) -> int:
        # Accept ints, floats and numeric text; anything else uses the fallback
        # instead of aborting the whole serialization.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return fallback

    raw_segments = question_data.get("segments") if isinstance(question_data, dict) else None
    if not isinstance(raw_segments, list):
        return []

    segments: List[Dict[str, Any]] = []
    for idx, seg in enumerate(raw_segments):
        if not isinstance(seg, dict):
            continue

        start = _as_int(seg.get("start"), 0)
        end = _as_int(seg.get("end"), start)

        result_raw = seg.get("result")
        parsed = _maybe_parse_json(result_raw)
        if isinstance(parsed, (dict, list)):
            display_payload = json.dumps(parsed, indent=2, ensure_ascii=False)
            parsed_for_js = parsed
        else:
            # Show raw text verbatim; serialize anything else (None, numbers).
            if isinstance(result_raw, str):
                display_payload = result_raw
            else:
                display_payload = json.dumps(result_raw, indent=2, ensure_ascii=False)
            parsed_for_js = None

        segments.append(
            {
                "index": idx,
                "start": start,
                "end": end,
                "parsed": parsed_for_js,
                "display": display_payload,
            }
        )
    return segments

142 

143 

144def _build_annotations_map(annotations: List[Dict[str, Any]]) -> Dict[str, Any]: 

145 annotations_map: Dict[str, Any] = {} 

146 for entry in annotations: 

147 if not isinstance(entry, dict): 

148 continue 

149 key = f"{entry.get('start')}-{entry.get('end')}" 

150 annotations_map[key] = entry 

151 return annotations_map 

152 

153 

def _downloads_relative_posix(path: Path) -> Optional[str]:
    """Return *path* relative to DOWNLOADS_DIR in POSIX form, or None when it lies outside."""
    try:
        return path.relative_to(DOWNLOADS_DIR).as_posix()
    except ValueError:
        return None


def _dict_annotations(raw: Any) -> List[Dict[str, Any]]:
    """Return the dict entries of *raw*, or an empty list when *raw* is not a list."""
    if not isinstance(raw, list):
        return []
    return [entry for entry in raw if isinstance(entry, dict)]


def build_expert_preview_data(
    file: Optional[str], video: Optional[str], mode: Optional[str]
) -> Dict[str, Any]:
    """Collect everything the expert preview page needs for one selection.

    Args:
        file: Question-file parameter, resolved with
            ``resolve_question_file_param``. When empty (and not in create
            mode) the first listed question file whose ``video_id`` equals
            *video* is used.
        video: Video identifier; in ``"create"`` mode it names the directory
            under DOWNLOADS_DIR.
        mode: ``"create"`` or anything else; a falsy value means ``"review"``.

    Returns:
        A dict with the keys ``question_files``, ``selected_file_rel``,
        ``selected_file_name``, ``selected_video_id``, ``video_url``,
        ``segments``, ``segments_for_js``, ``existing_annotations``,
        ``existing_annotations_map``, ``selected_json_pretty``,
        ``annotations_rel_path``, ``selection_error``, ``question_file_url``
        and ``mode``. Values that could not be determined are ``None`` or
        empty containers.
    """
    mode_value = mode or "review"
    question_files = list_question_json_files()
    selected_file_path: Optional[Path] = None
    selected_file_rel: Optional[str] = None
    selection_error: Optional[str] = None

    if mode_value != "create" and not file and video:
        for item in question_files:
            if item["video_id"] == video:
                file = item["rel_path"]
                break

    if file:
        candidate = resolve_question_file_param(file)
        # A candidate outside DOWNLOADS_DIR cannot be linked to, so it is
        # reported like a missing file instead of raising ValueError.
        candidate_rel = (
            _downloads_relative_posix(candidate)
            if candidate and candidate.exists()
            else None
        )
        if candidate_rel is not None:
            selected_file_path = candidate
            selected_file_rel = candidate_rel
        else:
            selection_error = "Selected question JSON could not be found."

    segments_info: List[Dict[str, Any]] = []
    segments_for_js: List[Dict[str, Any]] = []
    existing_annotations: List[Dict[str, Any]] = []
    existing_annotations_map: Dict[str, Any] = {}
    selected_json_pretty: Optional[str] = None
    video_url: Optional[str] = None
    annotation_rel_path: Optional[str] = None
    selected_video_id: Optional[str] = None
    selected_file_name: Optional[str] = None

    if selected_file_path:
        selected_file_name = selected_file_path.name
        # Layout implied by the path arithmetic: <video dir>/<subdir>/<file>.
        selected_video_dir = selected_file_path.parent.parent
        selected_video_id = selected_video_dir.name
        try:
            raw_data = json.loads(selected_file_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            raw_data = {}

        # Valid JSON need not be an object; only a dict can carry segments.
        question_data = raw_data if isinstance(raw_data, dict) else {}
        segments_info = serialize_question_segments(question_data)
        for segment in segments_info:
            parsed = segment.get("parsed")
            parsed_is_dict = isinstance(parsed, dict)
            segments_for_js.append(
                {
                    "index": segment["index"],
                    "start": segment["start"],
                    "end": segment["end"],
                    "questions": parsed.get("questions") if parsed_is_dict else None,
                    "best_question": parsed.get("best_question") if parsed_is_dict else None,
                }
            )
        selected_json_pretty = json.dumps(raw_data, indent=2, ensure_ascii=False)

        video_candidate = find_primary_video_file(selected_video_dir)
        if video_candidate:
            video_rel = _downloads_relative_posix(video_candidate)
            if video_rel is not None:
                video_url = f"/downloads/{video_rel}"

        annotations_bundle = load_expert_annotations(selected_file_path, selected_video_id)
        existing_annotations = _dict_annotations(
            annotations_bundle["data"].get("annotations", [])
        )
        existing_annotations_map = _build_annotations_map(existing_annotations)
        annotation_rel_path = _downloads_relative_posix(annotations_bundle["path"])
    elif mode_value == "create" and video:
        selected_video_id = video
        video_dir = DOWNLOADS_DIR / video
        if video_dir.exists():
            video_candidate = find_primary_video_file(video_dir)
            if video_candidate:
                video_rel = _downloads_relative_posix(video_candidate)
                if video_rel is not None:
                    video_url = f"/downloads/{video_rel}"

            expert_file = video_dir / "expert_questions" / f"expert_{video}.json"
            if expert_file.exists():
                try:
                    expert_data = json.loads(expert_file.read_text(encoding="utf-8"))
                except (OSError, ValueError):
                    # Unreadable annotations: preview without existing entries.
                    pass
                else:
                    raw_annotations = (
                        expert_data.get("annotations", [])
                        if isinstance(expert_data, dict)
                        else []
                    )
                    existing_annotations = _dict_annotations(raw_annotations)
                    existing_annotations_map = _build_annotations_map(existing_annotations)
                    annotation_rel_path = _downloads_relative_posix(expert_file)

    return {
        "question_files": question_files,
        "selected_file_rel": selected_file_rel,
        "selected_file_name": selected_file_name,
        "selected_video_id": selected_video_id,
        "video_url": video_url,
        "segments": segments_info,
        "segments_for_js": segments_for_js,
        "existing_annotations": existing_annotations,
        "existing_annotations_map": existing_annotations_map,
        "selected_json_pretty": selected_json_pretty,
        "annotations_rel_path": annotation_rel_path,
        "selection_error": selection_error,
        "question_file_url": (
            f"/downloads/{selected_file_rel}" if selected_file_rel else None
        ),
        "mode": mode_value,
    }

285 

286 

def _clean_annotation_text(value: Any) -> str:
    """Return *value* as stripped text; falsy values become an empty string."""
    return str(value or "").strip()


def _normalize_best_question_review(raw: Any) -> Optional[Dict[str, Any]]:
    """Validate the optional ``best_question`` review; return None when nothing was supplied."""
    if not isinstance(raw, dict):
        return None

    question = _clean_annotation_text(raw.get("question"))
    answer = _clean_annotation_text(raw.get("answer"))
    comment = _clean_annotation_text(raw.get("comment"))

    approved_raw = raw.get("approved")
    if isinstance(approved_raw, bool):
        approved: Optional[bool] = approved_raw
    elif isinstance(approved_raw, str):
        approved = approved_raw.lower() in {"true", "1", "yes", "approved"}
    else:
        approved = None

    if approved is False and not comment:
        raise HTTPException(
            status_code=400,
            detail="Provide a comment when disapproving the best question.",
        )

    if not (question or answer or comment or approved is not None):
        return None

    # Content without an explicit verdict counts as an approval.
    if approved is None:
        approved = True

    return {
        "question": question,
        "answer": answer,
        "approved": approved,
        # A comment is only kept for a disapproval.
        "comment": "" if approved else comment,
    }


def _annotation_sort_key(item: Any) -> Tuple[int, Any, Any]:
    """Order annotations by (start, end); malformed entries sort last without raising."""
    if not isinstance(item, dict):
        return (1, 0, 0)
    bounds = [
        # `value == value` is False only for NaN, which cannot be ordered.
        value if isinstance(value, (int, float)) and value == value else 0
        for value in (item.get("start"), item.get("end"))
    ]
    return (0, bounds[0], bounds[1])


def save_expert_annotation_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate one expert annotation and store it in its annotations file.

    In ``"create"`` mode the annotation goes to
    ``DOWNLOADS_DIR/<video_id>/expert_questions/expert_<video_id>.json``.
    Otherwise it goes to the sidecar returned by ``load_expert_annotations``
    for the question file named by ``payload["file"]``. An existing entry with
    the same ``start`` and ``end`` is replaced, otherwise the entry is
    appended; the list is then sorted by bounds.

    Args:
        payload: Request data. Keys read here: ``mode``, ``video_id``,
            ``file``, ``start``, ``end``, ``skip``, ``segment_index``,
            ``question``, ``answer``, ``question_type`` and ``best_question``.

    Returns:
        A dict with ``success``, ``annotation`` (the stored entry),
        ``annotations_file`` (path relative to DOWNLOADS_DIR or ``None``),
        ``updated`` (``True`` when an entry was replaced) and ``mode``.

    Raises:
        HTTPException: 400 for an invalid payload, video id, question file,
            bounds, question type, missing question/answer or a disapproval
            without comment; 500 when the file cannot be written.
    """
    from datetime import timezone  # local import; the module only imports `datetime`

    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid payload.")

    mode = payload.get("mode", "review")

    if mode == "create":
        video_id = payload.get("video_id")
        if not video_id:
            raise HTTPException(
                status_code=400, detail="Missing video_id for create mode."
            )
        video_id = str(video_id)
        # The id comes from the request and is joined onto DOWNLOADS_DIR, so it
        # must be a single directory name: no separators, no "." / "..", no
        # absolute path.
        if Path(video_id).name != video_id or video_id == "..":
            raise HTTPException(status_code=400, detail="Invalid video_id.")

        video_dir = DOWNLOADS_DIR / video_id
        if not video_dir.exists():
            raise HTTPException(status_code=400, detail="Video directory not found.")

        # The directory itself is created right before writing, so a request
        # that fails validation leaves nothing behind on disk.
        expert_file = video_dir / "expert_questions" / f"expert_{video_id}.json"

        expert_data: Dict[str, Any] = {}
        if expert_file.exists():
            try:
                loaded = json.loads(expert_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                loaded = None
            if isinstance(loaded, dict):
                expert_data = loaded

        expert_data.setdefault("video_id", video_id)
        expert_data.setdefault("mode", "create")
    else:
        question_file = resolve_question_file_param(payload.get("file"))
        if not question_file or not question_file.exists():
            raise HTTPException(status_code=400, detail="Invalid question file.")

        video_id = question_file.parent.parent.name

        annotations_bundle = load_expert_annotations(question_file, video_id)
        expert_data = annotations_bundle["data"]
        expert_data["video_id"] = video_id
        expert_data["question_file"] = question_file.name
        expert_file = annotations_bundle["path"]

    # A stored file may hold null or an object under "annotations"; replace it
    # with a list so the append/sort below cannot fail.
    annotations_list = expert_data.get("annotations")
    if not isinstance(annotations_list, list):
        annotations_list = []
        expert_data["annotations"] = annotations_list

    try:
        start = int(payload.get("start"))
        end = int(payload.get("end"))
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(status_code=400, detail="Invalid segment bounds.") from exc

    skip_requested = bool(payload.get("skip"))
    segment_index = payload.get("segment_index")
    try:
        segment_index = int(segment_index) if segment_index is not None else None
    except (TypeError, ValueError, OverflowError):
        segment_index = None

    # Same text as the former utcnow() call: naive UTC, second precision, "Z".
    timestamp = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds")
        + "Z"
    )

    if skip_requested:
        entry: Dict[str, Any] = {
            "segment_index": segment_index,
            "start": start,
            "end": end,
            "question_type": "skip",
            "question_type_label": "Skipped",
            "question": "(skipped)",
            "answer": "",
            "skipped": True,
            "saved_at": timestamp,
            "mode": mode,
        }
    else:
        question = _clean_annotation_text(payload.get("question"))
        answer = _clean_annotation_text(payload.get("answer"))
        question_type_raw = _clean_annotation_text(payload.get("question_type")).lower()

        if not question or not answer:
            raise HTTPException(
                status_code=400, detail="Question and answer are required."
            )
        if question_type_raw not in EXPERT_QUESTION_TYPE_VALUES:
            raise HTTPException(status_code=400, detail="Invalid question type.")

        entry = {
            "segment_index": segment_index,
            "start": start,
            "end": end,
            "question_type": question_type_raw,
            "question_type_label": EXPERT_QUESTION_TYPE_LABELS.get(
                question_type_raw, question_type_raw.title()
            ),
            "question": question,
            "answer": answer,
            "skipped": False,
            "saved_at": timestamp,
            "mode": mode,
        }

        if mode == "review":
            best_question = _normalize_best_question_review(payload.get("best_question"))
            if best_question is not None:
                entry["best_question"] = best_question

    updated = False
    for idx, existing in enumerate(annotations_list):
        if not isinstance(existing, dict):
            continue
        if existing.get("start") != start or existing.get("end") != end:
            continue
        # Keep an earlier best-question review when this save carries none.
        if (
            not skip_requested
            and mode == "review"
            and entry.get("best_question") is None
            and existing.get("best_question") is not None
        ):
            entry["best_question"] = existing.get("best_question")
        annotations_list[idx] = entry
        updated = True
        break

    if not updated:
        annotations_list.append(entry)

    annotations_list.sort(key=_annotation_sort_key)

    try:
        expert_file.parent.mkdir(parents=True, exist_ok=True)
        expert_file.write_text(
            json.dumps(expert_data, indent=2, ensure_ascii=False), encoding="utf-8"
        )
    except (OSError, TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to store annotation: {exc}"
        ) from exc

    try:
        annotation_rel = expert_file.relative_to(DOWNLOADS_DIR).as_posix()
    except ValueError:
        annotation_rel = None

    return {
        "success": True,
        "annotation": entry,
        "annotations_file": annotation_rel,
        "updated": updated,
        "mode": mode,
    }

484 

485 

def get_expert_questions_payload(video_id: str) -> Tuple[Dict[str, Any], int]:
    """Read the stored expert questions of a video.

    Looks for ``DOWNLOADS_DIR/<video_id>/expert_questions/expert_questions.json``.

    Returns:
        ``(body, status)``. A missing directory or file gives an empty
        ``questions`` list with status 200; a file that cannot be read or
        parsed gives ``success: False`` with status 500; otherwise the stored
        ``questions`` list (or an empty list when it is absent or not a list)
        with status 200.
    """
    video_dir = DOWNLOADS_DIR / video_id
    questions_dir = video_dir / "expert_questions"
    file_path = questions_dir / "expert_questions.json"

    questions: Any = []
    if all(location.exists() for location in (video_dir, questions_dir, file_path)):
        try:
            data = json.loads(file_path.read_text(encoding="utf-8"))
        except Exception as exc:
            failure = {
                "success": False,
                "message": f"Unable to read expert questions: {exc}",
                "questions": [],
            }
            return failure, 500

        if isinstance(data, dict):
            questions = data.get("questions")
        if not isinstance(questions, list):
            questions = []

    return {"success": True, "video_id": video_id, "questions": questions}, 200

508 

509 

def save_expert_question_payload(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Store (or mark as skipped) the expert question of one segment.

    The entry is written to
    ``DOWNLOADS_DIR/<video_id>/expert_questions/expert_questions.json``. Any
    stored entry whose normalized bounds are within SEGMENT_MATCH_TOLERANCE of
    the new bounds is replaced, and the list is kept sorted by segment start.

    Args:
        payload: Request data. Keys read here: ``videoId``/``video_id``,
            ``segmentStart``, ``segmentEnd``, ``timestamp``,
            ``skipped``/``skip``/``isSkipped``, ``skipReason``/``skip_reason``,
            ``questionType``/``question_type``, ``question`` and ``answer``.

    Returns:
        ``(body, status)``: 400 for an invalid payload, video id, question
        type or missing text; 404 when the video directory does not exist;
        500 when the file cannot be written; 200 on success.
    """
    from datetime import timezone  # local import; the module only imports `datetime`

    if not isinstance(payload, dict):
        return {"success": False, "message": "Invalid payload"}, 400

    video_id = str(payload.get("videoId") or payload.get("video_id") or "").strip()
    if not video_id:
        return {"success": False, "message": "videoId is required"}, 400
    # The id is joined onto DOWNLOADS_DIR and a file is written below it, so it
    # must be a single directory name (no separators, "." / ".." or absolute
    # path).
    if Path(video_id).name != video_id or video_id == "..":
        return {"success": False, "message": "Invalid videoId"}, 400

    video_dir = DOWNLOADS_DIR / video_id
    if not video_dir.exists():
        return {"success": False, "message": "Video not found"}, 404

    segment_start_value = normalize_segment_value(payload.get("segmentStart"))
    segment_end_value = normalize_segment_value(payload.get("segmentEnd"))
    timestamp_value = normalize_segment_value(
        payload.get("timestamp", segment_end_value)
    )

    skipped = bool(payload.get("skipped") or payload.get("skip") or payload.get("isSkipped"))
    skip_reason = str(payload.get("skipReason") or payload.get("skip_reason") or "").strip()

    # An end before the start collapses to a zero-length segment.
    if segment_end_value <= segment_start_value:
        segment_end_value = segment_start_value

    question_type = (
        str(payload.get("questionType") or payload.get("question_type") or "")
        .strip()
        .lower()
    )
    question_text = str(payload.get("question") or "").strip()
    answer_text = str(payload.get("answer") or "").strip()

    if skipped:
        question_type = ""
        question_text = ""
        answer_text = ""
    else:
        if question_type not in EXPERT_QUESTION_TYPE_VALUES:
            return {"success": False, "message": "Invalid question type"}, 400

        if not question_text or not answer_text:
            return {"success": False, "message": "Question and answer are required"}, 400

    questions_dir = video_dir / "expert_questions"
    file_path = questions_dir / "expert_questions.json"

    stored: Any = {}
    if file_path.exists():
        try:
            stored = json.loads(file_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or malformed store: start over with an empty one.
            stored = {}
    if not isinstance(stored, dict):
        stored = {}

    questions_list = stored.get("questions")
    if not isinstance(questions_list, list):
        questions_list = []

    def matches_existing(entry: Dict[str, Any]) -> bool:
        existing_start = normalize_segment_value(entry.get("segment_start"))
        existing_end = normalize_segment_value(entry.get("segment_end"))
        return (
            abs(existing_start - segment_start_value) < SEGMENT_MATCH_TOLERANCE
            and abs(existing_end - segment_end_value) < SEGMENT_MATCH_TOLERANCE
        )

    # Entries that are not dicts cannot be matched or sorted (both call
    # `.get`), so they are dropped together with the entry being replaced.
    questions_list = [
        q for q in questions_list if isinstance(q, dict) and not matches_existing(q)
    ]

    entry = {
        "segment_start": segment_start_value,
        "segment_end": segment_end_value,
        "timestamp": timestamp_value,
        "question_type": question_type if not skipped else None,
        "question": question_text,
        "answer": answer_text,
        "skipped": skipped,
        "skip_reason": skip_reason,
        # Same text as the former utcnow() call: naive UTC ISO timestamp.
        "updated_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }

    questions_list.append(entry)
    questions_list.sort(key=lambda q: normalize_segment_value(q.get("segment_start")))

    stored["video_id"] = video_id
    stored["questions"] = questions_list

    try:
        questions_dir.mkdir(parents=True, exist_ok=True)
        file_path.write_text(json.dumps(stored, indent=2), encoding="utf-8")
    except (OSError, TypeError, ValueError) as exc:
        return {"success": False, "message": f"Failed to write expert questions: {exc}"}, 500

    message = "Segment marked as skipped." if skipped else "Expert question saved."
    return {
        "success": True,
        "message": message,
        "updatedAt": entry["updated_at"],
        "skipped": skipped,
    }, 200

608 

609 

def save_final_questions_payload(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Normalize the reviewed questions of a video and write them to disk.

    The data is stamped with ``saved_at`` and ``video_id`` and written to
    ``DOWNLOADS_DIR/<video_id>/final_questions/final_questions.json``. For
    every dict in each segment's ``aiQuestions`` list, ``expert_ranking`` is
    parsed from ``expert_ranking`` (or the legacy ``ranking`` key, which is
    removed) and ``llm_ranking`` is taken from the ranks in the video's
    question file, falling back to the value already on the question.

    Args:
        payload: Request data with ``videoId`` and ``data`` (a JSON object
            that is modified in place).

    Returns:
        ``(body, status)``: 400 for an invalid payload, video id or data;
        404 when the video directory does not exist; 500 when processing or
        writing fails; 200 on success.
    """
    from datetime import timezone  # local import; the module only imports `datetime`

    if not isinstance(payload, dict):
        return {"success": False, "message": "Invalid payload"}, 400

    video_id = str(payload.get("videoId") or "").strip()
    if not video_id:
        return {"success": False, "message": "videoId is required"}, 400
    # The id is joined onto DOWNLOADS_DIR and request-supplied content is
    # written below it, so it must be a single directory name (no separators,
    # "." / ".." or absolute path).
    if Path(video_id).name != video_id or video_id == "..":
        return {"success": False, "message": "Invalid videoId"}, 400

    video_dir = DOWNLOADS_DIR / video_id
    if not video_dir.exists():
        return {"success": False, "message": "Video not found"}, 404

    final_data = payload.get("data")
    if not final_data:
        return {"success": False, "message": "No data provided"}, 400
    if not isinstance(final_data, dict):
        # A list or string is a client error, not a server failure.
        return {"success": False, "message": "data must be an object"}, 400

    final_questions_dir = video_dir / "final_questions"
    final_file_path = final_questions_dir / "final_questions.json"

    try:
        # Same text as the former utcnow() call: naive UTC ISO timestamp.
        final_data["saved_at"] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        final_data["video_id"] = video_id

        segments = final_data.get("segments")
        if not isinstance(segments, list):
            segments = []
            final_data["segments"] = segments

        llm_by_index, llm_by_range = _build_llm_rank_lookup(video_dir, video_id)

        for idx, seg in enumerate(segments):
            if not isinstance(seg, dict):
                continue

            try:
                seg_index = int(seg.get("segmentIndex", idx))
            except (TypeError, ValueError, OverflowError):
                seg_index = idx

            # Prefer the positional match; fall back to the exact bounds.
            llm_rankings = llm_by_index.get(seg_index)
            if llm_rankings is None:
                try:
                    llm_rankings = llm_by_range.get((seg.get("start"), seg.get("end")))
                except TypeError:
                    # Unhashable bounds (e.g. a list) cannot match any range.
                    llm_rankings = None
            if llm_rankings is None:
                llm_rankings = {}

            ai_questions = seg.get("aiQuestions")
            if not isinstance(ai_questions, list):
                seg["aiQuestions"] = []
                continue

            for question in ai_questions:
                if not isinstance(question, dict):
                    continue

                raw_expert = question.get("expert_ranking")
                if raw_expert is None:
                    raw_expert = question.get("ranking")
                expert_rank = _parse_rank_value(raw_expert)
                # A trashed question without a rank is stored with rank 0.
                if expert_rank is None and question.get("trashed"):
                    expert_rank = 0
                question["expert_ranking"] = expert_rank
                question.pop("ranking", None)

                llm_rank = None
                q_type = question.get("type")
                if q_type and isinstance(llm_rankings, dict):
                    llm_rank = llm_rankings.get(q_type)
                if llm_rank is None:
                    llm_rank = _parse_rank_value(question.get("llm_ranking"))
                question["llm_ranking"] = llm_rank

        final_questions_dir.mkdir(parents=True, exist_ok=True)
        final_file_path.write_text(json.dumps(final_data, indent=2), encoding="utf-8")

        return {
            "success": True,
            "message": "Final questions saved successfully",
            "filepath": f"downloads/{video_id}/final_questions/final_questions.json",
            "saved_at": final_data["saved_at"],
        }, 200

    except Exception as exc:
        # Boundary handler: any processing or I/O failure becomes a 500 body.
        return {"success": False, "message": f"Failed to save final questions: {exc}"}, 500