Coverage for app \ services \ children_service.py: 79%
165 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
1from datetime import datetime, timezone
2import secrets
3from typing import Any, Dict, List, Optional
5from app.services.expert_auth_service import normalize_expert_id
6from app.services.sqlite_store import get_conn
8# Fixed icon picker options allowed by backend/API validation.
9ALLOWED_CHILD_ICON_KEYS = (
10 "pig", "fox", "owl", "cat", "bear", "rabbit", "lion", "penguin",
11 "simba", "nemo", "walle", "moana", "elsa", "spiderman", "mickey",
12 "pooh", "chase", "spongebob", "turtle", "bluey", "hellokitty",
13 "mlp", "peppa", "mario", "dino", "empoleon",
14)
16MAX_CHILD_ID_RETRIES = 60
19def utc_now_iso() -> str:
20 return datetime.now(timezone.utc).isoformat()
23def normalize_child_id(child_id: str) -> str:
24 return (child_id or "").strip()
27def normalize_name(value: str) -> str:
28 return " ".join((value or "").strip().split())
31def normalize_icon_key(icon_key: str) -> str:
32 return (icon_key or "").strip().lower()
35def _row_to_child(row: Any) -> Dict[str, Any]:
36 payload: Dict[str, Any] = {
37 "child_id": row["child_id"],
38 "expert_id": row["expert_id"],
39 "first_name": row["first_name"],
40 "last_name": row["last_name"],
41 "icon_key": row["icon_key"],
42 "interaction_mode": row["interaction_mode"] or "flexible",
43 "is_active": bool(row["is_active"]),
44 "created_at": row["created_at"],
45 "updated_at": row["updated_at"],
46 }
47 if "expert_name" in row.keys():
48 payload["expert_name"] = row["expert_name"]
49 return payload
52def _ensure_expert_exists(expert_id: str) -> None:
53 with get_conn() as conn:
54 row = conn.execute(
55 "SELECT 1 FROM experts WHERE expert_id = ?",
56 (expert_id,),
57 ).fetchone()
58 if not row:
59 raise ValueError("expert_id not found")
62def _child_id_exists(child_id: str) -> bool:
63 with get_conn() as conn:
64 row = conn.execute(
65 "SELECT 1 FROM children WHERE child_id = ?",
66 (child_id,),
67 ).fetchone()
68 return row is not None
70# Generate a 6-digit child ID and retry on rare collisions.
71def generate_child_id() -> str:
72 for _ in range(MAX_CHILD_ID_RETRIES):
73 candidate = f"{secrets.randbelow(1_000_000):06d}"
74 if not _child_id_exists(candidate):
75 return candidate
76 raise RuntimeError("child_id_generation_failed")
79def get_child(child_id: str, include_inactive: bool = True) -> Optional[Dict[str, Any]]:
80 child_id = normalize_child_id(child_id)
81 if not child_id:
82 return None
84 query = """
85 SELECT c.child_id, c.expert_id, c.first_name, c.last_name, c.icon_key,
86 c.interaction_mode, c.is_active, c.created_at, c.updated_at, e.display_name AS expert_name
87 FROM children c
88 LEFT JOIN experts e ON e.expert_id = c.expert_id
89 WHERE c.child_id = ?
90 """
91 params: List[Any] = [child_id]
92 if not include_inactive:
93 query += " AND c.is_active = 1"
95 with get_conn() as conn:
96 row = conn.execute(query, tuple(params)).fetchone()
98 if not row:
99 return None
100 return _row_to_child(row)
103def list_children(
104 expert_id: Optional[str] = None,
105 include_inactive: bool = False,
106) -> List[Dict[str, Any]]:
107 filters: List[str] = []
108 values: List[Any] = []
110 normalized_expert_id = None
111 if expert_id is not None:
112 normalized_expert_id = normalize_expert_id(expert_id)
113 if not normalized_expert_id:
114 return []
115 filters.append("lower(trim(c.expert_id)) = ?")
116 values.append(normalized_expert_id)
118 if not include_inactive:
119 filters.append("c.is_active = 1")
121 where_sql = ""
122 if filters:
123 where_sql = "WHERE " + " AND ".join(filters)
125 with get_conn() as conn:
126 rows = conn.execute(
127 f"""
128 SELECT c.child_id, c.expert_id, c.first_name, c.last_name, c.icon_key, c.interaction_mode,
129 c.is_active, c.created_at, c.updated_at, e.display_name AS expert_name
130 FROM children c
131 LEFT JOIN experts e ON e.expert_id = c.expert_id
132 {where_sql}
133 ORDER BY lower(c.expert_id), lower(c.last_name), lower(c.first_name), c.child_id
134 """,
135 tuple(values),
136 ).fetchall()
138 return [_row_to_child(row) for row in rows]
141# Each child must belong to an existing expert.
142def create_child(
143 expert_id: str,
144 first_name: str,
145 last_name: str,
146 icon_key: str,
147 interaction_mode: str = "flexible",
148) -> Dict[str, Any]:
149 expert_id = normalize_expert_id(expert_id)
150 first_name = normalize_name(first_name)
151 last_name = normalize_name(last_name)
152 icon_key = normalize_icon_key(icon_key)
153 interaction_mode = (interaction_mode or "flexible").strip().lower()
155 if not expert_id:
156 raise ValueError("expert_id is required")
157 if not first_name:
158 raise ValueError("first_name is required")
159 if not last_name:
160 raise ValueError("last_name is required")
161 if icon_key not in ALLOWED_CHILD_ICON_KEYS:
162 raise ValueError("icon_key is invalid")
164 _ensure_expert_exists(expert_id)
166 now = utc_now_iso()
167 child_id = generate_child_id()
169# DB unique index prevents duplicate first+last name under the same expert.
170 try:
171 with get_conn() as conn:
172 conn.execute(
173 """
174 INSERT INTO children (
175 child_id, expert_id, first_name, last_name, icon_key, interaction_mode, is_active, created_at, updated_at
176 )
177 VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)
178 """,
179 (child_id, expert_id, first_name, last_name, icon_key, interaction_mode, now, now),
180 )
181 conn.commit()
182 except Exception as exc:
183 message = str(exc)
184 if "idx_children_unique_profile_per_expert" in message:
185 raise RuntimeError("duplicate_child_profile") from exc
186 if "UNIQUE constraint failed: children.child_id" in message:
187 raise RuntimeError("duplicate_child_id") from exc
188 raise
190 created = get_child(child_id)
191 if not created:
192 raise RuntimeError("create_failed")
193 return created
196def update_child(
197 child_id: str,
198 first_name: Optional[str] = None,
199 last_name: Optional[str] = None,
200 icon_key: Optional[str] = None,
201 interaction_mode: Optional[str] = None,
202 is_active: Optional[bool] = None,
203 expert_id: Optional[str] = None,
204) -> Optional[Dict[str, Any]]:
205 child_id = normalize_child_id(child_id)
206 if not child_id:
207 return None
209 updates: List[str] = []
210 values: List[Any] = []
212 if expert_id is not None:
213 if expert_id == "":
214 # empty string means unlink
215 updates.append("expert_id = ?")
216 values.append(None)
217 else:
218 cleaned_expert = normalize_expert_id(expert_id)
219 _ensure_expert_exists(cleaned_expert)
220 updates.append("expert_id = ?")
221 values.append(cleaned_expert)
223 if first_name is not None:
224 cleaned_first = normalize_name(first_name)
225 if not cleaned_first:
226 raise ValueError("first_name cannot be empty")
227 updates.append("first_name = ?")
228 values.append(cleaned_first)
230 if last_name is not None:
231 cleaned_last = normalize_name(last_name)
232 if not cleaned_last:
233 raise ValueError("last_name cannot be empty")
234 updates.append("last_name = ?")
235 values.append(cleaned_last)
237 if icon_key is not None:
238 cleaned_icon = normalize_icon_key(icon_key)
239 if cleaned_icon not in ALLOWED_CHILD_ICON_KEYS:
240 raise ValueError("icon_key is invalid")
241 updates.append("icon_key = ?")
242 values.append(cleaned_icon)
244 if interaction_mode is not None:
245 updates.append("interaction_mode = ?")
246 values.append(interaction_mode.strip().lower())
248 if is_active is not None:
249 if not isinstance(is_active, bool):
250 raise ValueError("is_active must be true or false")
251 updates.append("is_active = ?")
252 values.append(1 if is_active else 0)
254 if not updates:
255 return get_child(child_id)
257 updates.append("updated_at = ?")
258 values.append(utc_now_iso())
259 values.append(child_id)
261 try:
262 with get_conn() as conn:
263 cur = conn.execute(
264 f"UPDATE children SET {', '.join(updates)} WHERE child_id = ?",
265 tuple(values),
266 )
267 conn.commit()
268 except Exception as exc:
269 message = str(exc)
270 if "idx_children_unique_profile_per_expert" in message:
271 raise RuntimeError("duplicate_child_profile") from exc
272 raise
274 if cur.rowcount == 0:
275 return None
276 return get_child(child_id)
278# Soft-delete behavior: mark inactive instead of deleting. (might be changed later since we are hard coding it to 60 students/learners)
279def deactivate_child(child_id: str) -> Optional[Dict[str, Any]]:
280 return update_child(child_id, is_active=False)
283def delete_child(child_id: str) -> bool:
284 child_id = normalize_child_id(child_id)
285 if not child_id:
286 return False
287 with get_conn() as conn:
288 cur = conn.execute("DELETE FROM children WHERE child_id = ?", (child_id,))
289 conn.commit()
290 return cur.rowcount > 0