Coverage for app \ services \ question_generation_service.py: 15%
300 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
1from typing import Any, Dict, List, Optional
2from pathlib import Path
3import base64
4import io
5import json
6import os
7import random
8import time
10import pandas as pd
11from PIL import Image
13from app.settings import DOWNLOADS_DIR, GEMINI_API_KEY ,ANTHROPIC_API_KEY
14from app.services.clients import get_openai_client, genai
16import anthropic
18# -----------------------------
19# Question generation helpers
20# -----------------------------
def encode_image_to_base64(image_path, max_size=(512, 512)):
    """Encode an image file as a base64 JPEG string, downscaling it first.

    The image is converted to RGB when needed, bounded to ``max_size`` while
    keeping its aspect ratio, and re-encoded as JPEG (quality 85) before being
    base64-encoded. Returns ``None`` if the file cannot be read or encoded.
    """
    try:
        with Image.open(image_path) as source:
            rgb = source if source.mode == "RGB" else source.convert("RGB")
            rgb.thumbnail(max_size, Image.Resampling.LANCZOS)
            sink = io.BytesIO()
            rgb.save(sink, format="JPEG", quality=85)
            return base64.b64encode(sink.getvalue()).decode("utf-8")
    except Exception as e:
        print(f"Error encoding image {image_path}: {e}")
        return None
def time_to_seconds(time_str):
    """Convert a time string to an integer number of seconds.

    Accepted formats:
      * ``"HH:MM:SS"`` -> hours * 3600 + minutes * 60 + seconds
      * ``"MM:SS"``    -> minutes * 60 + seconds
      * ``"SS"``       -> seconds

    Returns:
        int: The total seconds, or 0 for malformed or non-string input
        (e.g. ``None`` or NaN cells coming out of a pandas column), so the
        function is safe to use in bulk ``Series.apply`` transforms.
    """
    try:
        parts = time_str.split(":")
        if len(parts) == 3:  # HH:MM:SS
            hours, minutes, seconds = map(int, parts)
            return hours * 3600 + minutes * 60 + seconds
        elif len(parts) == 2:  # MM:SS
            minutes, seconds = map(int, parts)
            return minutes * 60 + seconds
        else:  # Just seconds
            return int(parts[0])
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers non-string values (None, floats/NaN) that
        # have no .split(); previously these crashed instead of returning 0.
        return 0
def read_frame_data_from_csv(folder_name, start_time, end_time):
    """Load frame metadata for a time window from extracted_frames/frame_data.csv.

    Returns a tuple ``(frames, transcript)`` where ``frames`` is a list of
    dicts (``image_path``, ``subtitle_text``, ``time_seconds``,
    ``time_formatted``) for rows whose time falls within
    [start_time, end_time], and ``transcript`` joins the time-labelled
    subtitle lines of those rows. Returns ``([], "")`` when the CSV is
    missing, unreadable, or has no rows in range.
    """
    frames_dir = Path(folder_name) / "extracted_frames"
    csv_path = frames_dir / "frame_data.csv"
    if not csv_path.exists():
        return [], ""

    try:
        df = pd.read_csv(csv_path)

        # Derive a numeric Time_Seconds column used for window filtering.
        if "Time_Formatted" in df.columns:
            df["Time_Seconds"] = df["Time_Formatted"].apply(time_to_seconds)
        elif "Time_Seconds" not in df.columns:
            df["Time_Seconds"] = df.index  # fallback

        in_window = df[
            (df["Time_Seconds"] >= start_time) & (df["Time_Seconds"] <= end_time)
        ]
        if len(in_window) == 0:
            return [], ""

        placeholder_subtitles = (
            "No transcript available",
            "No subtitle at this time",
            "No subtitles available",
        )
        frames = []
        transcript_lines = []

        for _, row in in_window.iterrows():
            frames.append(
                {
                    "image_path": frames_dir / row["Filename"],
                    "subtitle_text": row.get("Subtitle_Text", "No transcript available"),
                    "time_seconds": row.get("Time_Seconds", 0),
                    "time_formatted": row.get("Time_Formatted", ""),
                }
            )

            subtitle = row.get("Subtitle_Text", "")
            if subtitle and subtitle not in placeholder_subtitles:
                label = row.get("Time_Formatted", f"{row.get('Time_Seconds', 0)}s")
                transcript_lines.append(f"[{label}] {subtitle}")

        if transcript_lines:
            transcript = "\n".join(transcript_lines)
        else:
            transcript = "No transcript available for this video segment."
        return frames, transcript

    except Exception as e:
        print(f"Error reading CSV: {e}")
        return [], ""
def generate_questions_for_segment(
    video_id: str, start_time: int, end_time: int, polite_first: bool = False, provider: Optional[str] = None
) -> Optional[str]:
    """
    Analyze frames + transcript for a time window and return JSON text with the questions.

    Uses env-provided OPENAI_API_KEY only. Optimized for rate limits with retry logic.
    When polite_first is True, the polite prompt is attempted before the standard prompt.

    Args:
        video_id: Folder name under DOWNLOADS_DIR holding the extracted frames.
        start_time: Segment start in seconds (inclusive).
        end_time: Segment end in seconds (inclusive).
        polite_first: Try the "polite" prompt before the standard one.
        provider: "openai" (default), "gemini", or "claude"; unknown values
            fall back to "openai".

    Returns:
        A JSON string — either the generated questions or an
        ``{"error": {...}}`` payload — or None if the OpenAI client cannot
        be created.
    """
    folder_name = str(DOWNLOADS_DIR / video_id)
    # NOTE(review): the OpenAI client is created unconditionally, even when
    # provider is gemini/claude — confirm this is intended.
    try:
        client = get_openai_client()
    except Exception as e:
        print(f"Error creating OpenAI client: {e}")
        return None

    frame_data, complete_transcript = read_frame_data_from_csv(
        folder_name, start_time, end_time
    )
    if not frame_data:
        # Nothing to analyze in this window; structured, non-retryable error.
        return json.dumps(
            {
                "error": {
                    "reason": "no_frames_in_segment",
                    "retryable": False,
                }
            }
        )

    duration = end_time - start_time + 1  # inclusive window
    # Provider is request-scoped so an admin can switch the model backend
    # without changing the calling flow.
    provider_name = (provider or "openai").strip().lower()
    if provider_name not in {"openai", "gemini", "claude"}:
        provider_name = "openai"

    # System message applied to every provider to constrain tone and safety.
    system_message = (
        "You are a safe, child-focused educational assistant. "
        "The content is a children's educational video. "
        "Follow all safety policies and avoid disallowed content. "
        "Provide age-appropriate, neutral, factual responses only."
    )

    # First attempt with standard prompt
    base_prompt = f"""You are an early childhood educator designing comprehension questions for children ages 6–8.
Analyze the video content using both the visual frames and the complete transcript provided below.

COMPLETE TRANSCRIPT:
==========================================
{complete_transcript}
==========================================

TASK:
I am providing you with {len(frame_data)} sequential frames from a {duration}-second segment ({start_time}s to {end_time}s) of a video,
along with the complete transcript above.

Please do the following:

1. Provide ONE short, child-friendly comprehension question for EACH of the following categories:
   - Character
   - Setting
   - Feeling
   - Action
   - Causal Relationship
   - Outcome
   - Prediction

2. After creating the questions, rank the questions based on how relevant and good it is to test comprehension and active viewing, the best question will be ranked 1

3. Return JSON only (no extra text) in this structure:
{{
  "questions": {{
    "character": {{ "q": "...", "a": "...", "rank":"" }},
    "setting": {{ "q": "...", "a": "...", "rank":"" }},
    "feeling": {{ "q": "...", "a": "...", "rank":"" }},
    "action": {{ "q": "...", "a": "...", "rank":"" }},
    "causal": {{ "q": "...", "a": "...", "rank":"" }},
    "outcome": {{ "q": "...", "a": "...", "rank":"" }},
    "prediction": {{ "q": "...", "a": "...", "rank":"" }}
  }},
  "best_question": "..."
}}
"""

    # Second attempt with more persuasive prompt
    polite_prompt = f"""You are helping create educational questions for young children. This is a children's educational video with no violence or inappropriate content, designed to teach kids in a safe, age-appropriate way.

COMPLETE TRANSCRIPT:
==========================================
{complete_transcript}
==========================================

I am providing you with {len(frame_data)} sequential frames from a {duration}-second segment ({start_time}s to {end_time}s) of this educational children's video, along with the complete transcript above.

Please create ONE short, child-friendly comprehension question for EACH of the following categories:
- Character
- Setting
- Feeling
- Action
- Causal Relationship
- Outcome
- Prediction

After creating the questions, please rank the questions based on how relevant and good it is to test comprehension and active viewing, the best question will be ranked 1

Return JSON only (no extra text) in this structure:
{{
  "questions": {{
    "character": {{ "q": "...", "a": "...", "rank":"" }},
    "setting": {{ "q": "...", "a": "...", "rank":"" }},
    "feeling": {{ "q": "...", "a": "...", "rank":"" }},
    "action": {{ "q": "...", "a": "...", "rank":"" }},
    "causal": {{ "q": "...", "a": "...", "rank":"" }},
    "outcome": {{ "q": "...", "a": "...", "rank":"" }},
    "prediction": {{ "q": "...", "a": "...", "rank":"" }}
  }},
  "best_question": "..."
}}
"""

    content = []

    # Sample frames to stay under token limits (max 5 frames)
    max_frames = 5
    if len(frame_data) > max_frames:
        step = len(frame_data) // max_frames
        sampled_frames = [frame_data[i] for i in range(0, len(frame_data), step)][
            :max_frames
        ]
    else:
        sampled_frames = frame_data

    # Add sampled frames as low-detail inline images
    successful_frames = 0
    for fr in sampled_frames:
        b64 = encode_image_to_base64(fr["image_path"])
        if b64:
            content.append(
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{b64}",
                        "detail": "low",
                    },
                }
            )
            successful_frames += 1

    if successful_frames == 0:
        # Every frame failed to encode (missing/corrupt files); non-retryable.
        return json.dumps(
            {
                "error": {
                    "reason": "frame_encoding_failed",
                    "retryable": False,
                }
            }
        )

    # Try both prompts with retry logic. Reorder to emphasize polite tone after early failures.
    prompt_sequence = [
        ("standard", base_prompt),
        ("polite", polite_prompt),
    ]
    if polite_first:
        prompt_sequence = [
            ("polite", polite_prompt),
            ("standard", base_prompt),
        ]

    # Closure state: the most recent failure reason, reported to the caller
    # if every attempt fails.
    last_error_payload: Optional[Dict[str, Any]] = None

    def _call_llm(content_with_prompt: List[Dict[str, Any]]) -> Optional[str]:
        # Dispatch one generation request to the selected provider, retrying
        # up to three times on rate-limit errors. Returns a normalized JSON
        # string on success; on failure records the reason in
        # last_error_payload and returns None.
        nonlocal last_error_payload
        max_retries = 3
        for attempt in range(max_retries):
            try:
                if provider_name == "openai":  # OpenAI
                    resp = client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[
                            {"role": "system", "content": system_message},
                            {"role": "user", "content": content_with_prompt},
                        ],  # type: ignore
                        max_tokens=1500,
                        temperature=0.3,
                        response_format={"type": "json_object"},
                    )
                    result_content = resp.choices[0].message.content
                    finish_reason = resp.choices[0].finish_reason
                elif provider_name == "gemini":  # Gemini
                    if not GEMINI_API_KEY:
                        last_error_payload = {
                            "reason": "gemini_key_missing",
                            "retryable": False,
                        }
                        return None
                    gemini_model_name = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
                    gemini_model = genai.GenerativeModel(gemini_model_name)

                    # Keep the provider output contract identical (JSON), so
                    # the downstream flow does not care which backend ran.
                    # Gemini takes raw text + PIL images rather than the
                    # OpenAI-style content list, so extract the text parts.
                    prompt_text = ""
                    for item in content_with_prompt:
                        if isinstance(item, dict) and item.get("type") == "text":
                            prompt_text += str(item.get("text", "")) + "\n\n"

                    gemini_parts = [system_message + "\n\n" + prompt_text]
                    for fr in sampled_frames:
                        try:
                            with Image.open(fr["image_path"]) as img:
                                # .copy() so the image survives the file close.
                                gemini_parts.append(img.convert("RGB").copy())
                        except Exception:
                            continue

                    gemini_resp = gemini_model.generate_content(
                        gemini_parts,
                        generation_config={
                            "temperature": 0.3,
                            "response_mime_type": "application/json",
                        },
                    )
                    result_content = getattr(gemini_resp, "text", None)
                    finish_reason = None
                else:  # Claude
                    # Check for Claude API key
                    if not ANTHROPIC_API_KEY:
                        last_error_payload = {
                            "reason": "anthropic key missing",
                            "retryable": False
                        }
                        return None

                    # anthropic.Anthropic() reads ANTHROPIC_API_KEY from the
                    # environment — presumably the same value checked above.
                    claude_client = anthropic.Anthropic()
                    model_name = os.getenv("ANTHROPIC_MODEL", "claude-haiku-4-5-20251001")

                    # Claude has no JSON response mode here, so the prompt
                    # itself demands raw JSON output.
                    prompt_text = "\n\nIMPORTANT: Return only raw JSON with no markdown, no code fences, no explanation."
                    for item in content_with_prompt:
                        if isinstance(item, dict) and item.get("type") == "text":
                            prompt_text += str(item.get("text", "")) + "\n\n"
                    image_parts = []
                    # Load frames
                    for frame in sampled_frames:
                        try:
                            image_parts.append({
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    # NOTE(review): encode_image_to_base64 can
                                    # return None; a None "data" field would be
                                    # rejected by the API, not by this try.
                                    "data": encode_image_to_base64(frame["image_path"])
                                }
                            })
                        except Exception:
                            continue

                    parts = [
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt_text.strip()},
                                *image_parts
                            ]
                        }
                    ]

                    # Get Claude response
                    resp = claude_client.messages.create(
                        model=model_name,
                        max_tokens=1024,
                        system=system_message,
                        messages=parts
                    )
                    if resp.content:
                        result_content = resp.content[0].text
                    else:
                        result_content = None
                    finish_reason = None

                # Only the OpenAI branch sets finish_reason; a content-filter
                # stop means retrying the same prompt cannot help.
                if finish_reason == "content_filter":
                    last_error_payload = {
                        "reason": "model_refusal",
                        "retryable": False,
                    }
                    return None

                if result_content:
                    try:
                        parsed = _maybe_parse_json(result_content)
                        if isinstance(parsed, (dict, list)):
                            return json.dumps(parsed)  # normalize to clean JSON string
                        # Parsed to a plain string => model returned non-JSON;
                        # fall through to the next retry attempt.
                        last_error_payload = {
                            "reason": "invalid_json",
                            "retryable": True,
                            "raw_preview": str(result_content)[:300],
                        }
                    except Exception:
                        last_error_payload = {
                            "reason": "invalid_json",
                            "retryable": True,
                            "raw_preview": str(result_content)[:300],
                        }
                else:
                    last_error_payload = {
                        "reason": "empty_response",
                        "retryable": True,
                    }
            except Exception as e:
                if "rate_limit_exceeded" in str(e) and attempt < max_retries - 1:
                    # Exponential backoff with jitter before the next attempt.
                    wait_time = (2**attempt) + random.uniform(0, 1)
                    print(f"[QGEN ERROR] video={video_id} segment={start_time}-{end_time} err={e}")
                    time.sleep(wait_time)
                    last_error_payload = {
                        "reason": "rate_limit_exceeded",
                        "retryable": True,
                        "message": str(e),
                    }
                    continue
                print(f"[QGEN ERROR] video={video_id} segment={start_time}-{end_time} err={e}")
                # NOTE(review): the reason is labeled "openai_error" even when
                # the failing provider is gemini or claude.
                last_error_payload = {
                    "reason": "openai_error",
                    "retryable": True,
                    "message": str(e),
                }
                return None
        return None

    tried_transcript_only = False

    for attempt_round, (prompt_label, prompt) in enumerate(prompt_sequence):
        # Prompt text goes first, followed by the pre-encoded image parts.
        content_with_prompt = [{"type": "text", "text": prompt}] + content

        result_content = _call_llm(content_with_prompt)
        if result_content:
            return result_content

        # If refusal or empty, fall back to transcript-only once.
        if (
            not tried_transcript_only
            and last_error_payload
            and last_error_payload.get("reason") in {"model_refusal", "empty_response"}
        ):
            tried_transcript_only = True
            transcript_only_prompt = (
                prompt
                + "\n\nIf visuals are unavailable, answer using the transcript only."
            )
            transcript_only_content = [{"type": "text", "text": transcript_only_prompt}]
            result_content = _call_llm(transcript_only_content)
            if result_content:
                return result_content

        # If first prompt failed, try second prompt
        if attempt_round == 0 and len(prompt_sequence) > 1:
            next_label = prompt_sequence[1][0]
            print(
                f"{prompt_label.capitalize()} prompt attempt failed for segment {start_time}-{end_time}s, trying {next_label} prompt next"
            )

    print(f"Both prompt attempts failed for segment {start_time}-{end_time}s")
    if last_error_payload is None:
        last_error_payload = {"reason": "generation_failed", "retryable": True}
    return json.dumps({"error": last_error_payload})
def generate_questions_for_segment_with_retry(
    video_id: str, start_time: int, end_time: int, max_attempts: int = 10, provider: Optional[str] = None
) -> Optional[str]:
    """Generate questions for a segment, retrying until success or exhaustion.

    Calls generate_questions_for_segment up to ``max_attempts`` times. From
    the third attempt onward the polite prompt is tried first, and a random
    1–3 second pause separates consecutive attempts. A result is accepted
    immediately unless it is a JSON error payload marked retryable.

    Returns the accepted JSON string, or the last (possibly error/None)
    result if every attempt fails.
    """
    last_result: Optional[str] = None

    for attempt in range(1, max_attempts + 1):
        if attempt > 1:
            print(
                f"Retrying segment {start_time}-{end_time}s (attempt {attempt}/{max_attempts})"
            )

        result_text = generate_questions_for_segment(
            video_id,
            start_time,
            end_time,
            polite_first=attempt > 2,
            provider=provider,
        )
        if result_text:
            parsed = None
            try:
                parsed = json.loads(result_text)
            except Exception:
                pass
            is_error = isinstance(parsed, dict) and "error" in parsed
            # Accept anything that is not a retryable error payload.
            if not is_error or not bool(parsed.get("error", {}).get("retryable")):
                return result_text

        last_result = result_text
        if attempt < max_attempts:
            wait_time = random.uniform(1, 3)
            print(
                f"Attempt {attempt} failed for segment {start_time}-{end_time}s; waiting {wait_time:.1f}s before retrying"
            )
            time.sleep(wait_time)

    print(
        f"All {max_attempts} attempts exhausted for segment {start_time}-{end_time}s without a successful generation"
    )
    return last_result
def build_segments_from_duration(
    duration_seconds: int, interval_seconds: int, start_offset: int = 0
) -> List[tuple]:
    """
    Build inclusive (start, end) segments covering [start_offset, duration_seconds].

    Each segment spans ``interval_seconds`` seconds inclusively: with an
    interval of 60 the result is (0, 59), (60, 119), ..., with the final
    segment clamped to ``duration_seconds``. (The previous docstring's
    example "(0, 60), (61, 120)" did not match the implementation.)

    Args:
        duration_seconds: Total duration; the last segment ends here at most.
        interval_seconds: Desired segment length in seconds (clamped to >= 1).
        start_offset: Start of the first segment (clamped to >= 0).

    Returns:
        A list of (start, end) tuples; empty if duration_seconds < start.
    """
    segments = []
    start = max(0, int(start_offset))
    step = max(1, int(interval_seconds))
    while start <= duration_seconds:
        end = min(start + step - 1, duration_seconds)
        segments.append((start, end))
        if end >= duration_seconds:
            break
        start = end + 1
    return segments
545# -----------------------------
546# WebSocket endpoint for streaming interval results
547# -----------------------------
548def _maybe_parse_json(text: Optional[str]):
549 if text is None:
550 return None
551 if isinstance(text, (dict, list)):
552 return text
553 if not isinstance(text, str):
554 return text
555 cleaned = text.strip()
556 if cleaned.startswith("```"):
557 cleaned = cleaned[3:].lstrip()
558 if cleaned.lower().startswith("json"):
559 cleaned = cleaned[4:].lstrip()
560 if cleaned.endswith("```"):
561 cleaned = cleaned[:-3].rstrip()
562 try:
563 return json.loads(cleaned)
564 except Exception:
565 return text # return raw text if not valid JSON
def persist_segment_questions_json(
    video_id: str, start: int, end: int, payload: Any
) -> Optional[str]:
    """Write one segment's questions JSON under <DOWNLOADS_DIR>/<video_id>/questions.

    ``payload`` may be a dict/list or a JSON string; anything else — or an
    unparsable string — is rejected. Returns the "/downloads/..." URL of the
    written file, or None on any failure.
    """
    if payload is None:
        return None

    if isinstance(payload, (dict, list)):
        data = payload
    elif isinstance(payload, str):
        try:
            data = json.loads(payload)
        except Exception:
            return None
    else:
        return None

    def _as_int(value):
        # Tolerate non-numeric bounds; the filename falls back to raw values.
        try:
            return int(value)
        except Exception:
            return None

    start_int = _as_int(start)
    end_int = _as_int(end)

    if start_int is not None and end_int is not None:
        # Zero-padded so lexicographic and chronological order agree.
        filename = f"questions_{start_int:05d}-{end_int:05d}.json"
    else:
        filename = f"questions_{start}-{end}.json"

    questions_dir = DOWNLOADS_DIR / video_id / "questions"
    questions_dir.mkdir(parents=True, exist_ok=True)
    out_path = questions_dir / filename

    try:
        out_path.write_text(
            json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
        )
    except Exception as exc:
        print(f"Failed to write questions JSON for {video_id} {start}-{end}: {exc}")
        return None

    return f"/downloads/{out_path.relative_to(DOWNLOADS_DIR).as_posix()}"
def resolve_question_file_param(value: Optional[str]) -> Optional[Path]:
    """Resolve a user-supplied questions-file parameter to a safe path.

    Accepts values like "/downloads/<vid>/questions/x.json" (with or without
    the leading slash and "downloads/" prefix) and maps them to a file under
    DOWNLOADS_DIR. Rejects absolute paths, ".." traversal, non-.json files,
    and anything that escapes DOWNLOADS_DIR. Returns the Path or None.
    """
    if not value:
        return None

    trimmed = value.strip()
    if trimmed.startswith("/"):
        trimmed = trimmed.lstrip("/")
    if trimmed.startswith("downloads/"):
        trimmed = trimmed[len("downloads/") :]

    relative = Path(trimmed)
    if relative.is_absolute() or ".." in relative.parts:
        return None

    candidate = DOWNLOADS_DIR / relative
    if not candidate.is_file() or candidate.suffix.lower() != ".json":
        return None
    # Belt-and-braces containment check on the resolved candidate.
    try:
        candidate.relative_to(DOWNLOADS_DIR)
    except ValueError:
        return None
    return candidate
# NOTE(review): the block below is dead code — a disabled (string-literal)
# draft of generate_persona_variants kept for reference. It is never executed;
# consider deleting it or moving it to version control history.
'''
def generate_persona_variants(
    questions: Dict[str, Any], best_question_text: Optional[str] = None
) -> Dict[str, Any]:
    """
    Take AI questions {type: {q, a, ...}} and rephrase them into 3 child-friendly
    personas: bunny (warm/gentle), alligator (blunt/direct), pig (excited).
    Returns {"success": bool, "variants": {bunny: {type: {q, a}}, ...}}.
    """
    if not questions or not isinstance(questions, dict):
        return {"success": False, "message": "No questions provided"}

    questions_text = "\n".join(
        f"- Type: {qtype.upper()}, Question: {data.get('q', '')}, Answer: {data.get('a', '')}"
        for qtype, data in questions.items()
        if isinstance(data, dict) and data.get("q")
    )
    if not questions_text.strip():
        return {"success": False, "message": "No valid questions to rephrase"}

    prompt = (
        "You are helping rephrase reading comprehension questions for young children "
        "into 3 different character personas. Keep the meaning and correct answers "
        "exactly the same — only change the wording/tone of the QUESTIONS.\n\n"
        "PERSONAS:\n"
        "- bunny: Warm, gentle, nurturing. Uses 'dear friend' or 'sweetie'. Asks with soft openers "
        "like 'Can you remember...?' or 'Do you know...?'. Max 12 words. Always feels cozy and caring.\n"
        "- alligator: Blunt, direct, zero fluff. Max 8 words. Imperative style. No greetings or filler. "
        "One idea per sentence. Example style: 'Who is the hero? Answer.'\n"
        "- pig: Wildly enthusiastic! Starts with 'Ooh!' or 'Wow!'. Uses CAPS on 1-2 key words. "
        "Repeats phrases for excitement like 'tell me tell me!'. Always ends with '?!'\n\n"
        f"Original questions:\n{questions_text}\n\n"
        "Return ONLY a valid JSON object with this exact structure:\n"
        "{\n"
        ' "bunny": {"TYPE": {"q": "rephrased question", "a": "same original answer"}, ...},\n'
        ' "alligator": {"TYPE": {"q": "rephrased question", "a": "same original answer"}, ...},\n'
        ' "pig": {"TYPE": {"q": "rephrased question", "a": "same original answer"}, ...}\n'
        "}\n"
        "Use lowercase keys for question types. Return only the JSON, no explanation."
    )

    try:
        response = get_openai_client().chat.completions.create(
            model="gpt-4o-mini",
            max_tokens=2000,
            messages=[{"role": "user", "content": prompt}],
        )
        text = response.choices[0].message.content.strip() if response.choices else ""
        parsed = _maybe_parse_json(text)
        if not isinstance(parsed, dict):
            return {"success": False, "message": "Invalid AI response format"}

        # Mark the best question in each persona variant
        if best_question_text:
            for persona_key, persona_qs in parsed.items():
                if not isinstance(persona_qs, dict):
                    continue
                for qtype, data in persona_qs.items():
                    if not isinstance(data, dict):
                        continue
                    orig = questions.get(qtype)
                    if isinstance(orig, dict) and orig.get("q") == best_question_text:
                        data["is_best"] = True

        return {"success": True, "variants": parsed}
    except Exception as exc:
        return {"success": False, "message": f"Persona generation failed: {exc}"}
'''