Coverage for app \ services \ children_service.py: 79%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 20:58 -0400

1from datetime import datetime, timezone 

2import secrets 

3from typing import Any, Dict, List, Optional 

4 

5from app.services.expert_auth_service import normalize_expert_id 

6from app.services.sqlite_store import get_conn 

7 

8# Fixed icon picker options allowed by backend/API validation. 

9ALLOWED_CHILD_ICON_KEYS = ( 

10 "pig", "fox", "owl", "cat", "bear", "rabbit", "lion", "penguin", 

11 "simba", "nemo", "walle", "moana", "elsa", "spiderman", "mickey", 

12 "pooh", "chase", "spongebob", "turtle", "bluey", "hellokitty", 

13 "mlp", "peppa", "mario", "dino", "empoleon", 

14) 

15 

16MAX_CHILD_ID_RETRIES = 60 

17 

18 

19def utc_now_iso() -> str: 

20 return datetime.now(timezone.utc).isoformat() 

21 

22 

23def normalize_child_id(child_id: str) -> str: 

24 return (child_id or "").strip() 

25 

26 

27def normalize_name(value: str) -> str: 

28 return " ".join((value or "").strip().split()) 

29 

30 

31def normalize_icon_key(icon_key: str) -> str: 

32 return (icon_key or "").strip().lower() 

33 

34 

35def _row_to_child(row: Any) -> Dict[str, Any]: 

36 payload: Dict[str, Any] = { 

37 "child_id": row["child_id"], 

38 "expert_id": row["expert_id"], 

39 "first_name": row["first_name"], 

40 "last_name": row["last_name"], 

41 "icon_key": row["icon_key"], 

42 "interaction_mode": row["interaction_mode"] or "flexible", 

43 "is_active": bool(row["is_active"]), 

44 "created_at": row["created_at"], 

45 "updated_at": row["updated_at"], 

46 } 

47 if "expert_name" in row.keys(): 

48 payload["expert_name"] = row["expert_name"] 

49 return payload 

50 

51 

52def _ensure_expert_exists(expert_id: str) -> None: 

53 with get_conn() as conn: 

54 row = conn.execute( 

55 "SELECT 1 FROM experts WHERE expert_id = ?", 

56 (expert_id,), 

57 ).fetchone() 

58 if not row: 

59 raise ValueError("expert_id not found") 

60 

61 

62def _child_id_exists(child_id: str) -> bool: 

63 with get_conn() as conn: 

64 row = conn.execute( 

65 "SELECT 1 FROM children WHERE child_id = ?", 

66 (child_id,), 

67 ).fetchone() 

68 return row is not None 

69 

70# Generate a 6-digit child ID and retry on rare collisions. 

71def generate_child_id() -> str: 

72 for _ in range(MAX_CHILD_ID_RETRIES): 

73 candidate = f"{secrets.randbelow(1_000_000):06d}" 

74 if not _child_id_exists(candidate): 

75 return candidate 

76 raise RuntimeError("child_id_generation_failed") 

77 

78 

79def get_child(child_id: str, include_inactive: bool = True) -> Optional[Dict[str, Any]]: 

80 child_id = normalize_child_id(child_id) 

81 if not child_id: 

82 return None 

83 

84 query = """ 

85 SELECT c.child_id, c.expert_id, c.first_name, c.last_name, c.icon_key, 

86 c.interaction_mode, c.is_active, c.created_at, c.updated_at, e.display_name AS expert_name 

87 FROM children c 

88 LEFT JOIN experts e ON e.expert_id = c.expert_id 

89 WHERE c.child_id = ? 

90 """ 

91 params: List[Any] = [child_id] 

92 if not include_inactive: 

93 query += " AND c.is_active = 1" 

94 

95 with get_conn() as conn: 

96 row = conn.execute(query, tuple(params)).fetchone() 

97 

98 if not row: 

99 return None 

100 return _row_to_child(row) 

101 

102 

103def list_children( 

104 expert_id: Optional[str] = None, 

105 include_inactive: bool = False, 

106) -> List[Dict[str, Any]]: 

107 filters: List[str] = [] 

108 values: List[Any] = [] 

109 

110 normalized_expert_id = None 

111 if expert_id is not None: 

112 normalized_expert_id = normalize_expert_id(expert_id) 

113 if not normalized_expert_id: 

114 return [] 

115 filters.append("lower(trim(c.expert_id)) = ?") 

116 values.append(normalized_expert_id) 

117 

118 if not include_inactive: 

119 filters.append("c.is_active = 1") 

120 

121 where_sql = "" 

122 if filters: 

123 where_sql = "WHERE " + " AND ".join(filters) 

124 

125 with get_conn() as conn: 

126 rows = conn.execute( 

127 f""" 

128 SELECT c.child_id, c.expert_id, c.first_name, c.last_name, c.icon_key, c.interaction_mode, 

129 c.is_active, c.created_at, c.updated_at, e.display_name AS expert_name 

130 FROM children c 

131 LEFT JOIN experts e ON e.expert_id = c.expert_id 

132 {where_sql} 

133 ORDER BY lower(c.expert_id), lower(c.last_name), lower(c.first_name), c.child_id 

134 """, 

135 tuple(values), 

136 ).fetchall() 

137 

138 return [_row_to_child(row) for row in rows] 

139 

140 

141# Each child must belong to an existing expert. 

142def create_child( 

143 expert_id: str, 

144 first_name: str, 

145 last_name: str, 

146 icon_key: str, 

147 interaction_mode: str = "flexible", 

148) -> Dict[str, Any]: 

149 expert_id = normalize_expert_id(expert_id) 

150 first_name = normalize_name(first_name) 

151 last_name = normalize_name(last_name) 

152 icon_key = normalize_icon_key(icon_key) 

153 interaction_mode = (interaction_mode or "flexible").strip().lower() 

154 

155 if not expert_id: 

156 raise ValueError("expert_id is required") 

157 if not first_name: 

158 raise ValueError("first_name is required") 

159 if not last_name: 

160 raise ValueError("last_name is required") 

161 if icon_key not in ALLOWED_CHILD_ICON_KEYS: 

162 raise ValueError("icon_key is invalid") 

163 

164 _ensure_expert_exists(expert_id) 

165 

166 now = utc_now_iso() 

167 child_id = generate_child_id() 

168 

169# DB unique index prevents duplicate first+last name under the same expert. 

170 try: 

171 with get_conn() as conn: 

172 conn.execute( 

173 """ 

174 INSERT INTO children ( 

175 child_id, expert_id, first_name, last_name, icon_key, interaction_mode, is_active, created_at, updated_at 

176 ) 

177 VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?) 

178 """, 

179 (child_id, expert_id, first_name, last_name, icon_key, interaction_mode, now, now), 

180 ) 

181 conn.commit() 

182 except Exception as exc: 

183 message = str(exc) 

184 if "idx_children_unique_profile_per_expert" in message: 

185 raise RuntimeError("duplicate_child_profile") from exc 

186 if "UNIQUE constraint failed: children.child_id" in message: 

187 raise RuntimeError("duplicate_child_id") from exc 

188 raise 

189 

190 created = get_child(child_id) 

191 if not created: 

192 raise RuntimeError("create_failed") 

193 return created 

194 

195 

196def update_child( 

197 child_id: str, 

198 first_name: Optional[str] = None, 

199 last_name: Optional[str] = None, 

200 icon_key: Optional[str] = None, 

201 interaction_mode: Optional[str] = None, 

202 is_active: Optional[bool] = None, 

203 expert_id: Optional[str] = None, 

204) -> Optional[Dict[str, Any]]: 

205 child_id = normalize_child_id(child_id) 

206 if not child_id: 

207 return None 

208 

209 updates: List[str] = [] 

210 values: List[Any] = [] 

211 

212 if expert_id is not None: 

213 if expert_id == "": 

214 # empty string means unlink 

215 updates.append("expert_id = ?") 

216 values.append(None) 

217 else: 

218 cleaned_expert = normalize_expert_id(expert_id) 

219 _ensure_expert_exists(cleaned_expert) 

220 updates.append("expert_id = ?") 

221 values.append(cleaned_expert) 

222 

223 if first_name is not None: 

224 cleaned_first = normalize_name(first_name) 

225 if not cleaned_first: 

226 raise ValueError("first_name cannot be empty") 

227 updates.append("first_name = ?") 

228 values.append(cleaned_first) 

229 

230 if last_name is not None: 

231 cleaned_last = normalize_name(last_name) 

232 if not cleaned_last: 

233 raise ValueError("last_name cannot be empty") 

234 updates.append("last_name = ?") 

235 values.append(cleaned_last) 

236 

237 if icon_key is not None: 

238 cleaned_icon = normalize_icon_key(icon_key) 

239 if cleaned_icon not in ALLOWED_CHILD_ICON_KEYS: 

240 raise ValueError("icon_key is invalid") 

241 updates.append("icon_key = ?") 

242 values.append(cleaned_icon) 

243 

244 if interaction_mode is not None: 

245 updates.append("interaction_mode = ?") 

246 values.append(interaction_mode.strip().lower()) 

247 

248 if is_active is not None: 

249 if not isinstance(is_active, bool): 

250 raise ValueError("is_active must be true or false") 

251 updates.append("is_active = ?") 

252 values.append(1 if is_active else 0) 

253 

254 if not updates: 

255 return get_child(child_id) 

256 

257 updates.append("updated_at = ?") 

258 values.append(utc_now_iso()) 

259 values.append(child_id) 

260 

261 try: 

262 with get_conn() as conn: 

263 cur = conn.execute( 

264 f"UPDATE children SET {', '.join(updates)} WHERE child_id = ?", 

265 tuple(values), 

266 ) 

267 conn.commit() 

268 except Exception as exc: 

269 message = str(exc) 

270 if "idx_children_unique_profile_per_expert" in message: 

271 raise RuntimeError("duplicate_child_profile") from exc 

272 raise 

273 

274 if cur.rowcount == 0: 

275 return None 

276 return get_child(child_id) 

277 

278# Soft-delete behavior: mark inactive instead of deleting. (might be changed later since we are hard coding it to 60 students/learners) 

279def deactivate_child(child_id: str) -> Optional[Dict[str, Any]]: 

280 return update_child(child_id, is_active=False) 

281 

282 

283def delete_child(child_id: str) -> bool: 

284 child_id = normalize_child_id(child_id) 

285 if not child_id: 

286 return False 

287 with get_conn() as conn: 

288 cur = conn.execute("DELETE FROM children WHERE child_id = ?", (child_id,)) 

289 conn.commit() 

290 return cur.rowcount > 0