Coverage for app \ services \ expert_auth_service.py: 60%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 20:58 -0400

1 

2# password hashing (hashlib) + constant-time comparison (hmac) + random salt bytes (os)

3import hashlib 

4import hmac 

5import os 

6import re 

7from datetime import datetime, timezone 

8from typing import Any, Dict, List, Optional 

9 

10from app.services.sqlite_store import get_conn 

11 

# Scheme label written as the first "$"-separated field of a stored hash string.
PBKDF2_ALGO = "pbkdf2_sha256"
# PBKDF2 round count used when hashing; it is also written into the hash string.
PBKDF2_ITERATIONS = 120_000
# Size in bytes of the random salt generated for each password.
SALT_BYTES = 16
# Expert ids are 3-50 characters: a lowercase letter or digit first, then
# lowercase letters, digits, "_" or "-".
EXPERT_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9_-]{2,49}$")

18 

19 

def utc_now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string.

    The datetime is timezone-aware, so the text carries a ``+00:00`` offset.
    UTC ISO timestamps keep storage simple and sortable.
    """
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()

23 

# Never store plain-text passwords: only the salted digest string is returned.
def hash_password(password: str) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 hash string for ``password``.

    Args:
        password: Plain-text password; must be a non-empty ``str``.

    Returns:
        ``"<algo>$<iterations>$<salt hex>$<digest hex>"`` built from
        ``PBKDF2_ALGO``, ``PBKDF2_ITERATIONS``, a fresh ``SALT_BYTES``-long
        random salt and the derived digest.

    Raises:
        ValueError: If ``password`` is not a string or is empty.
    """
    usable = isinstance(password, str) and bool(password)
    if not usable:
        raise ValueError("Password is required")

    salt = os.urandom(SALT_BYTES)
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
    )
    fields = (PBKDF2_ALGO, str(PBKDF2_ITERATIONS), salt.hex(), derived.hex())
    return "$".join(fields)

37 

38 

def verify_password(password: str, stored_hash: str) -> bool:
    """Check a plain-text password against a stored PBKDF2 hash string.

    Args:
        password: Candidate plain-text password.
        stored_hash: Value in the ``algo$iterations$salt_hex$digest_hex``
            layout produced by ``hash_password``.

    Returns:
        ``True`` only when the digest re-derived from ``password`` equals the
        stored digest. Non-string, malformed or unsupported input yields
        ``False`` instead of raising.
    """
    if not isinstance(password, str) or not isinstance(stored_hash, str):
        return False

    try:
        algo, iteration_text, salt_hex, digest_hex = stored_hash.split("$", 3)
        iterations = int(iteration_text)
        salt = bytes.fromhex(salt_hex)
        expected_digest = bytes.fromhex(digest_hex)
    except ValueError:
        # Wrong field count, non-numeric iterations, or non-hex salt/digest.
        return False

    # pbkdf2_hmac rejects a non-positive iteration count, so a corrupt record
    # must be turned away here rather than crash the login path.
    if algo != PBKDF2_ALGO or iterations < 1:
        return False

    try:
        candidate = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            salt,
            iterations,
        )
    except (ValueError, OverflowError):
        # Unencodable password text or an iteration count too large to use.
        return False

    # Constant-time comparison avoids leaking how many leading bytes matched.
    return hmac.compare_digest(candidate, expected_digest)

58 

59 

def normalize_expert_id(expert_id: str) -> str:
    """Return ``expert_id`` stripped of surrounding whitespace and lowercased.

    ``None`` and other falsy values normalize to the empty string.
    """
    if not expert_id:
        return ""
    trimmed = expert_id.strip()
    return trimmed.lower()

62 

63 

def is_valid_expert_id(expert_id: str) -> bool:
    """Report whether the normalized ``expert_id`` matches ``EXPERT_ID_PATTERN``."""
    candidate = normalize_expert_id(expert_id)
    return EXPERT_ID_PATTERN.fullmatch(candidate) is not None

66 

67 

68def _row_to_expert(row: Any) -> Dict[str, Any]: 

69 return { 

70 "expert_id": row["expert_id"], 

71 "display_name": row["display_name"], 

72 "is_active": bool(row["is_active"]), 

73 "created_at": row["created_at"], 

74 "updated_at": row["updated_at"], 

75 } 

76 

77 

def list_experts() -> List[Dict[str, Any]]:
    """Return every row of the ``experts`` table, ordered by ``expert_id``.

    Only the public columns are selected, and each row is converted with
    ``_row_to_expert``.
    """
    query = """
        SELECT expert_id, display_name, is_active, created_at, updated_at
        FROM experts
        ORDER BY expert_id ASC
    """
    with get_conn() as conn:
        cursor = conn.execute(query)
        rows = cursor.fetchall()
    return list(map(_row_to_expert, rows))

88 

89 

def get_expert(expert_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one expert by id.

    Args:
        expert_id: Id to look up; it is normalized before the query.

    Returns:
        The expert as a dict (see ``_row_to_expert``), or ``None`` when the
        id is blank or no row matches.
    """
    lookup_id = normalize_expert_id(expert_id)
    if not lookup_id:
        return None

    query = """
        SELECT expert_id, display_name, is_active, created_at, updated_at
        FROM experts
        WHERE expert_id = ?
    """
    with get_conn() as conn:
        cursor = conn.execute(query, (lookup_id,))
        row = cursor.fetchone()

    return _row_to_expert(row) if row else None

108 

109 

def create_expert(expert_id: str, display_name: str, password: str) -> Dict[str, Any]:
    """Insert a new, active expert and return the stored record.

    Args:
        expert_id: Desired id; normalized, then validated with
            ``is_valid_expert_id``.
        display_name: Human-readable name; surrounding whitespace is removed.
        password: Plain-text password; only its hash is stored.

    Returns:
        The new expert as read back through ``get_expert``.

    Raises:
        ValueError: On an invalid id, a blank display name or an empty
            password.
        RuntimeError: ``"duplicate_expert_id"`` when the insert fails with a
            "UNIQUE constraint failed" message; ``"create_failed"`` when the
            row cannot be read back afterwards.
    """
    new_id = normalize_expert_id(expert_id)
    name = (display_name or "").strip()

    if not is_valid_expert_id(new_id):
        raise ValueError("expert_id must be 3-50 chars: lowercase letters, numbers, _ or -")
    if not name:
        raise ValueError("display_name is required")
    if not password:
        raise ValueError("password is required")

    # created_at and updated_at start out as the same timestamp.
    timestamp = utc_now_iso()
    params = (new_id, name, hash_password(password), timestamp, timestamp)
    insert_sql = """
        INSERT INTO experts (expert_id, display_name, password_hash, is_active, created_at, updated_at)
        VALUES (?, ?, ?, 1, ?, ?)
    """

    try:
        with get_conn() as conn:
            conn.execute(insert_sql, params)
            conn.commit()
    except Exception as exc:
        duplicate = "UNIQUE constraint failed" in str(exc)
        if not duplicate:
            raise
        raise RuntimeError("duplicate_expert_id") from exc

    created = get_expert(new_id)
    if created:
        return created
    raise RuntimeError("create_failed")

143 

144 

def update_expert(
    expert_id: str,
    display_name: Optional[str] = None,
    password: Optional[str] = None,
    is_active: Optional[bool] = None,
) -> Optional[Dict[str, Any]]:
    """Apply a partial update to one expert.

    Only arguments that are not ``None`` are written. When at least one field
    changes, ``updated_at`` is refreshed as well.

    Args:
        expert_id: Id of the expert to update; normalized before use.
        display_name: New display name; stripped, must not be blank.
        password: New plain-text password; stored as a hash, must not be empty.
        is_active: New active flag, stored as 1 or 0.

    Returns:
        The expert as read back through ``get_expert``, or ``None`` when the
        id is blank or no row matched the update.

    Raises:
        ValueError: If ``display_name`` is blank or ``password`` is empty.
    """
    target_id = normalize_expert_id(expert_id)
    if not target_id:
        return None

    # Column -> new value; insertion order fixes the order of the SET clause.
    changes: Dict[str, Any] = {}

    if display_name is not None:
        cleaned = display_name.strip()
        if not cleaned:
            raise ValueError("display_name cannot be empty")
        changes["display_name"] = cleaned

    if password is not None:
        if not password:
            raise ValueError("password cannot be empty")
        changes["password_hash"] = hash_password(password)

    if is_active is not None:
        changes["is_active"] = 1 if is_active else 0

    if not changes:
        # Nothing to write: return the current record without touching it.
        return get_expert(target_id)

    changes["updated_at"] = utc_now_iso()
    set_clause = ", ".join(f"{column} = ?" for column in changes)
    params = (*changes.values(), target_id)

    with get_conn() as conn:
        cursor = conn.execute(
            f"UPDATE experts SET {set_clause} WHERE expert_id = ?",
            params,
        )
        conn.commit()

    if cursor.rowcount == 0:
        return None
    return get_expert(target_id)

192 

193 

def deactivate_expert(expert_id: str) -> Optional[Dict[str, Any]]:
    """Mark an expert inactive by delegating to ``update_expert``.

    Returns:
        Whatever ``update_expert`` returns for ``is_active=False``.
    """
    deactivated = update_expert(expert_id, is_active=False)
    return deactivated

196 

def delete_expert(expert_id: str) -> bool:
    """Delete the expert whose id equals the normalized ``expert_id``.

    Returns:
        ``True`` when at least one row was deleted, ``False`` otherwise.
    """
    target_id = normalize_expert_id(expert_id)
    with get_conn() as conn:
        cursor = conn.execute("DELETE FROM experts WHERE expert_id = ?", (target_id,))
        conn.commit()
    removed = cursor.rowcount
    return removed > 0

203 

204 

def authenticate_expert(expert_id: str, password: str) -> Optional[Dict[str, Any]]:
    """Validate an expert's credentials.

    Args:
        expert_id: Login id; normalized before the lookup.
        password: Plain-text password to check against the stored hash.

    Returns:
        ``{"expert_id": ..., "display_name": ...}`` on success. ``None`` when
        an argument is blank, the expert does not exist, the account is
        inactive, or the password does not verify.
    """
    login_id = normalize_expert_id(expert_id)
    if not (login_id and password):
        return None

    query = """
        SELECT expert_id, display_name, password_hash, is_active
        FROM experts
        WHERE expert_id = ?
    """
    with get_conn() as conn:
        row = conn.execute(query, (login_id,)).fetchone()

    # Short-circuit order matters: a missing row is never indexed, and the
    # password is only verified for accounts that are active.
    accepted = (
        bool(row)
        and bool(row["is_active"])
        and verify_password(password, row["password_hash"])
    )
    if not accepted:
        return None

    return {
        "expert_id": row["expert_id"],
        "display_name": row["display_name"],
    }

231 

232 

def ensure_video_assignment_rows(video_ids: List[str]) -> None:
    """Insert an 'unassigned' placeholder row for each given video id.

    Keeps the assignment table in sync with downloads folders. Ids are
    stripped, blank ones are dropped, duplicates are collapsed, and the
    remainder is inserted in sorted order. ``INSERT OR IGNORE`` skips any
    insert that would conflict with an existing row.

    Args:
        video_ids: Video ids to make sure have a ``video_assignments`` row.
    """
    timestamp = utc_now_iso()

    unique_ids = {(raw_id or "").strip() for raw_id in video_ids}
    unique_ids.discard("")
    if not unique_ids:
        return

    insert_sql = """
        INSERT OR IGNORE INTO video_assignments (
            video_id, expert_id, assignment_source, assigned_at, updated_at
        ) VALUES (?, NULL, 'unassigned', NULL, ?)
    """
    with get_conn() as conn:
        for video_id in sorted(unique_ids):
            conn.execute(insert_sql, (video_id, timestamp))
        conn.commit()

251 

def add_video_assignment(video_id, expert_id, source: str = "admin"):
    """Upsert a video-expert pair into the many-to-many assignment table.

    A new pair is inserted with ``assigned_at`` and ``updated_at`` set to the
    current time. If the pair already exists, only ``assignment_source`` and
    ``updated_at`` are overwritten.

    Args:
        video_id: Video identifier; surrounding whitespace is removed.
        expert_id: Expert identifier; normalized with ``normalize_expert_id``.
        source: One of ``"admin"``, ``"expert_claim"`` or ``"unassigned"``.

    Raises:
        ValueError: If either id is blank or ``source`` is not an allowed value.
    """
    clean_video_id = (video_id or "").strip()
    clean_expert_id = normalize_expert_id(expert_id)

    if not clean_video_id:
        raise ValueError("video_id is required")
    if not clean_expert_id:
        raise ValueError("expert_id is required")

    allowed_sources = {"admin", "expert_claim", "unassigned"}
    if source not in allowed_sources:
        raise ValueError("invalid assignment source")

    timestamp = utc_now_iso()
    upsert_sql = """
        INSERT INTO video_expert_assignments (video_id, expert_id, assignment_source, assigned_at, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(video_id, expert_id) DO UPDATE SET
            assignment_source = excluded.assignment_source,
            updated_at = excluded.updated_at
    """
    params = (clean_video_id, clean_expert_id, source, timestamp, timestamp)
    with get_conn() as conn:
        conn.execute(upsert_sql, params)
        conn.commit()

275 

def remove_video_assignment(video_id, expert_id):
    """Delete the assignment row(s) linking ``video_id`` to ``expert_id``.

    The expert id is normalized with ``normalize_expert_id`` and compared as
    ``lower(trim(expert_id))``, the same comparison ``can_expert_access_video``
    uses. Any row that grants access can therefore also be removed, whatever
    letter case the caller passes or the row was stored with.

    Args:
        video_id: Video identifier; surrounding whitespace is removed.
        expert_id: Expert identifier; stripped and lowercased before matching.

    Raises:
        ValueError: If either id is blank.
    """
    video_id = (video_id or "").strip()
    expert_id = normalize_expert_id(expert_id)

    if not video_id:
        raise ValueError("video_id is required")

    if not expert_id:
        raise ValueError("expert_id is required")

    with get_conn() as conn:
        conn.execute(
            """
            DELETE FROM video_expert_assignments
            WHERE video_id = ? AND lower(trim(expert_id)) = ?
            """,
            (video_id, expert_id),
        )
        conn.commit()

293 

def list_experts_for_video(video_id: str):
    """List the assignment rows of one video, joined with the expert's name.

    Args:
        video_id: Video identifier; surrounding whitespace is removed.

    Returns:
        One dict per assignment row: every ``video_expert_assignments`` column
        plus ``expert_name`` taken from ``experts.display_name``.

    Raises:
        ValueError: If ``video_id`` is blank.
    """
    wanted_video = (video_id or "").strip()
    if not wanted_video:
        raise ValueError("video_id is required")

    query = """
        SELECT vea.*, e.display_name AS expert_name
        FROM video_expert_assignments vea
        JOIN experts e ON e.expert_id = vea.expert_id
        WHERE vea.video_id = ?
    """
    with get_conn() as conn:
        cursor = conn.execute(query, (wanted_video,))
        rows = cursor.fetchall()
    return list(map(dict, rows))

308 

def list_video_assignments() -> List[Dict[str, Any]]:
    """Return all video-expert pairs, ordered by ``video_id``.

    The result has one dict per pair, not one per video. Each dict carries
    the assignment columns plus ``expert_name`` from ``experts.display_name``.
    """
    query = """
        SELECT vea.video_id, vea.expert_id, vea.assignment_source, vea.assigned_at, vea.updated_at,
               e.display_name AS expert_name
        FROM video_expert_assignments vea
        JOIN experts e ON e.expert_id = vea.expert_id
        ORDER BY vea.video_id ASC
    """
    with get_conn() as conn:
        cursor = conn.execute(query)
        pairs = cursor.fetchall()

    return list(map(dict, pairs))

323 

def list_video_ids_for_expert(expert_id: str) -> List[str]:
    """Return the distinct video ids assigned to an expert, sorted ascending.

    Args:
        expert_id: Expert identifier; normalized, then compared against
            ``lower(trim(expert_id))`` of each stored row.

    Returns:
        The video ids as strings; an empty list when ``expert_id`` is blank.
    """
    wanted_expert = normalize_expert_id(expert_id)
    if not wanted_expert:
        return []

    query = """
        SELECT DISTINCT video_id
        FROM video_expert_assignments
        WHERE lower(trim(expert_id)) = ?
        ORDER BY video_id ASC
    """
    with get_conn() as conn:
        cursor = conn.execute(query, (wanted_expert,))
        matches = cursor.fetchall()

    return [str(match["video_id"]) for match in matches]

341 

def can_expert_access_video(expert_id: str, video_id: str) -> bool:
    """Tell whether an expert has any assignment row for a video.

    This is a pair-based check. Both ids are trimmed and lowercased, and the
    stored columns are compared through ``lower(trim(...))``.

    Returns:
        ``True`` when a matching row exists; ``False`` when none does or
        either id is blank.
    """
    wanted_expert = normalize_expert_id(expert_id)
    wanted_video = (video_id or "").strip().lower()
    if not (wanted_expert and wanted_video):
        return False

    query = """
        SELECT 1
        FROM video_expert_assignments
        WHERE lower(trim(video_id)) = ? AND lower(trim(expert_id)) = ?
    """
    with get_conn() as conn:
        match = conn.execute(query, (wanted_video, wanted_expert)).fetchone()

    found = match is not None
    return found

360 

def claim_video_for_expert(expert_id: str, video_id: str) -> None:
    """Confirm an existing assignment by refreshing its ``updated_at``.

    Assigned-only mode: this never creates new access. The pair is matched
    with ``lower(trim(...))`` on both columns.

    Args:
        expert_id: Expert identifier; normalized before matching.
        video_id: Video identifier; trimmed and lowercased before matching.

    Raises:
        ValueError: If either id is blank.
        RuntimeError: ``"assignment_not_found"`` when no row exists for the
            pair.
    """
    expert_id = normalize_expert_id(expert_id)
    normalized_video_id = (video_id or "").strip().lower()
    if not expert_id or not normalized_video_id:
        raise ValueError("expert_id and video_id are required")

    now = utc_now_iso()
    with get_conn() as conn:
        # A single UPDATE both checks for the pair and touches it; rowcount
        # reports how many rows matched, so no separate SELECT is needed.
        cursor = conn.execute(
            """
            UPDATE video_expert_assignments
            SET updated_at = ?
            WHERE lower(trim(video_id)) = ? AND lower(trim(expert_id)) = ?
            """,
            (now, normalized_video_id, expert_id),
        )
        if cursor.rowcount <= 0:
            raise RuntimeError("assignment_not_found")
        conn.commit()