Coverage for app \ services \ expert_auth_service.py: 60%
195 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 20:58 -0400
2#password hasing + safe comparison + random
3import hashlib
4import hmac
5import os
6import re
7from datetime import datetime, timezone
8from typing import Any, Dict, List, Optional
10from app.services.sqlite_store import get_conn
12#label store in a hash string
13PBKDF2_ALGO = "pbkdf2_sha256"
14PBKDF2_ITERATIONS = 120_000
15#rnadom salt by 16 bytes
16SALT_BYTES = 16
17EXPERT_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9_-]{2,49}$")
20def utc_now_iso() -> str:
21 # UTC ISO timestamps keep storage simple and sortable.
22 return datetime.now(timezone.utc).isoformat()
24#never store plain text passwsowrds
25def hash_password(password: str) -> str:
26 if not isinstance(password, str) or not password:
27 raise ValueError("Password is required")
29 salt = os.urandom(SALT_BYTES)
30 digest = hashlib.pbkdf2_hmac(
31 "sha256",
32 password.encode("utf-8"),
33 salt,
34 PBKDF2_ITERATIONS,
35 )
36 return f"{PBKDF2_ALGO}${PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"
39def verify_password(password: str, stored_hash: str) -> bool:
40 try:
41 algo, iteration_text, salt_hex, digest_hex = stored_hash.split("$", 3)
42 if algo != PBKDF2_ALGO:
43 return False
45 iterations = int(iteration_text)
46 salt = bytes.fromhex(salt_hex)
47 expected_digest = bytes.fromhex(digest_hex)
48 except Exception:
49 return False
51 candidate = hashlib.pbkdf2_hmac(
52 "sha256",
53 password.encode("utf-8"),
54 salt,
55 iterations,
56 )
57 return hmac.compare_digest(candidate, expected_digest)
60def normalize_expert_id(expert_id: str) -> str:
61 return (expert_id or "").strip().lower()
64def is_valid_expert_id(expert_id: str) -> bool:
65 return bool(EXPERT_ID_PATTERN.fullmatch(normalize_expert_id(expert_id)))
68def _row_to_expert(row: Any) -> Dict[str, Any]:
69 return {
70 "expert_id": row["expert_id"],
71 "display_name": row["display_name"],
72 "is_active": bool(row["is_active"]),
73 "created_at": row["created_at"],
74 "updated_at": row["updated_at"],
75 }
78def list_experts() -> List[Dict[str, Any]]:
79 with get_conn() as conn:
80 rows = conn.execute(
81 """
82 SELECT expert_id, display_name, is_active, created_at, updated_at
83 FROM experts
84 ORDER BY expert_id ASC
85 """
86 ).fetchall()
87 return [_row_to_expert(row) for row in rows]
90def get_expert(expert_id: str) -> Optional[Dict[str, Any]]:
91 expert_id = normalize_expert_id(expert_id)
92 if not expert_id:
93 return None
95 with get_conn() as conn:
96 row = conn.execute(
97 """
98 SELECT expert_id, display_name, is_active, created_at, updated_at
99 FROM experts
100 WHERE expert_id = ?
101 """,
102 (expert_id,),
103 ).fetchone()
105 if not row:
106 return None
107 return _row_to_expert(row)
110def create_expert(expert_id: str, display_name: str, password: str) -> Dict[str, Any]:
111 expert_id = normalize_expert_id(expert_id)
112 display_name = (display_name or "").strip()
114 if not is_valid_expert_id(expert_id):
115 raise ValueError("expert_id must be 3-50 chars: lowercase letters, numbers, _ or -")
116 if not display_name:
117 raise ValueError("display_name is required")
118 if not password:
119 raise ValueError("password is required")
121 now = utc_now_iso()
122 password_hash = hash_password(password)
124 try:
125 with get_conn() as conn:
126 conn.execute(
127 """
128 INSERT INTO experts (expert_id, display_name, password_hash, is_active, created_at, updated_at)
129 VALUES (?, ?, ?, 1, ?, ?)
130 """,
131 (expert_id, display_name, password_hash, now, now),
132 )
133 conn.commit()
134 except Exception as exc:
135 if "UNIQUE constraint failed" in str(exc):
136 raise RuntimeError("duplicate_expert_id") from exc
137 raise
139 created = get_expert(expert_id)
140 if not created:
141 raise RuntimeError("create_failed")
142 return created
145def update_expert(
146 expert_id: str,
147 display_name: Optional[str] = None,
148 password: Optional[str] = None,
149 is_active: Optional[bool] = None,
150) -> Optional[Dict[str, Any]]:
151 expert_id = normalize_expert_id(expert_id)
152 if not expert_id:
153 return None
155 updates = []
156 values: List[Any] = []
158 if display_name is not None:
159 cleaned = display_name.strip()
160 if not cleaned:
161 raise ValueError("display_name cannot be empty")
162 updates.append("display_name = ?")
163 values.append(cleaned)
165 if password is not None:
166 if not password:
167 raise ValueError("password cannot be empty")
168 updates.append("password_hash = ?")
169 values.append(hash_password(password))
171 if is_active is not None:
172 updates.append("is_active = ?")
173 values.append(1 if is_active else 0)
175 if not updates:
176 return get_expert(expert_id)
178 updates.append("updated_at = ?")
179 values.append(utc_now_iso())
180 values.append(expert_id)
182 with get_conn() as conn:
183 cur = conn.execute(
184 f"UPDATE experts SET {', '.join(updates)} WHERE expert_id = ?",
185 tuple(values),
186 )
187 conn.commit()
189 if cur.rowcount == 0:
190 return None
191 return get_expert(expert_id)
194def deactivate_expert(expert_id: str) -> Optional[Dict[str, Any]]:
195 return update_expert(expert_id, is_active=False)
197def delete_expert(expert_id: str) -> bool:
198 expert_id = normalize_expert_id(expert_id)
199 with get_conn() as conn:
200 cur = conn.execute("DELETE FROM experts WHERE expert_id = ?", (expert_id,))
201 conn.commit()
202 return cur.rowcount > 0
205def authenticate_expert(expert_id: str, password: str) -> Optional[Dict[str, Any]]:
206 expert_id = normalize_expert_id(expert_id)
207 if not expert_id or not password:
208 return None
210 with get_conn() as conn:
211 row = conn.execute(
212 """
213 SELECT expert_id, display_name, password_hash, is_active
214 FROM experts
215 WHERE expert_id = ?
216 """,
217 (expert_id,),
218 ).fetchone()
220 if not row:
221 return None
222 if not bool(row["is_active"]):
223 return None
224 if not verify_password(password, row["password_hash"]):
225 return None
227 return {
228 "expert_id": row["expert_id"],
229 "display_name": row["display_name"],
230 }
233def ensure_video_assignment_rows(video_ids: List[str]) -> None:
234 # Keeps assignment table in sync with downloads folders.
235 now = utc_now_iso()
236 normalized = sorted({(video_id or "").strip() for video_id in video_ids if (video_id or "").strip()})
237 if not normalized:
238 return
240 with get_conn() as conn:
241 for video_id in normalized:
242 conn.execute(
243 """
244 INSERT OR IGNORE INTO video_assignments (
245 video_id, expert_id, assignment_source, assigned_at, updated_at
246 ) VALUES (?, NULL, 'unassigned', NULL, ?)
247 """,
248 (video_id, now),
249 )
250 conn.commit()
252def add_video_assignment(video_id,expert_id,source : str = "admin"):
253 # Upsert a video-expert pair into the many-to-many table.
254 video_id = (video_id or "").strip()
255 expert_id = normalize_expert_id(expert_id)
257 if not video_id:
258 raise ValueError ("video_id is required")
260 if not expert_id:
261 raise ValueError("expert_id is required")
263 if source not in {"admin", "expert_claim", "unassigned"}:
264 raise ValueError("invalid assignment source")
265 # If the pair already exists, just update the source and timestamp.
266 now = utc_now_iso()
267 with get_conn() as conn:
268 conn.execute("""
269 INSERT INTO video_expert_assignments (video_id, expert_id, assignment_source, assigned_at, updated_at)
270 VALUES (?, ?, ?, ?, ?)
271 ON CONFLICT(video_id, expert_id) DO UPDATE SET
272 assignment_source = excluded.assignment_source,
273 updated_at = excluded.updated_at """,(video_id,expert_id,source,now,now))
274 conn.commit()
276def remove_video_assignment(video_id, expert_id):
277 video_id = (video_id or "").strip()
278 expert_id = (expert_id or "").strip()
280 if not video_id:
281 raise ValueError ("video_id is required")
283 if not expert_id:
284 raise ValueError("expert_id is required")
286 with get_conn() as conn:
287 conn.execute(
288 "DELETE FROM video_expert_assignments WHERE video_id = ? AND expert_id = ?",
289 (video_id, expert_id)
290 )
292 conn.commit()
294def list_experts_for_video(video_id:str):
296 video_id = (video_id or "").strip()
297 if not video_id:
298 raise ValueError("video_id is required")
300 with get_conn() as conn:
301 rows = conn.execute("""
302 SELECT vea.*, e.display_name AS expert_name
303 FROM video_expert_assignments vea
304 JOIN experts e ON e.expert_id = vea.expert_id
305 WHERE vea.video_id = ?
306 """, (video_id,)).fetchall()
307 return [dict(row) for row in rows]
309def list_video_assignments() -> List[Dict[str, Any]]:
310 # Returns all video-expert pairs — one row per pair, not one row per video.
311 with get_conn() as conn:
312 rows = conn.execute(
313 """
314 SELECT vea.video_id, vea.expert_id, vea.assignment_source, vea.assigned_at, vea.updated_at,
315 e.display_name AS expert_name
316 FROM video_expert_assignments vea
317 JOIN experts e ON e.expert_id = vea.expert_id
318 ORDER BY vea.video_id ASC
319 """
320 ).fetchall()
322 return [dict(row) for row in rows]
324def list_video_ids_for_expert(expert_id: str) -> List[str]:
325 expert_id = normalize_expert_id(expert_id)
326 if not expert_id:
327 return []
329 with get_conn() as conn:
330 rows = conn.execute(
331 """
332 SELECT DISTINCT video_id
333 FROM video_expert_assignments
334 WHERE lower(trim(expert_id)) = ?
335 ORDER BY video_id ASC
336 """,
337 (expert_id,),
338 ).fetchall()
340 return [str(row["video_id"]) for row in rows]
342def can_expert_access_video(expert_id: str, video_id: str) -> bool:
343 # Pair-based check — True if this expert has any assignment row for this video.
344 expert_id = normalize_expert_id(expert_id)
345 video_id = (video_id or "").strip().lower()
346 if not expert_id or not video_id:
347 return False
349 with get_conn() as conn:
350 row = conn.execute(
351 """
352 SELECT 1
353 FROM video_expert_assignments
354 WHERE lower(trim(video_id)) = ? AND lower(trim(expert_id)) = ?
355 """,
356 (video_id, expert_id),
357 ).fetchone()
359 return row is not None
361def claim_video_for_expert(expert_id: str, video_id: str) -> None:
362 # Assigned-only mode: confirm existing assignment, do not create new access.
363 expert_id = normalize_expert_id(expert_id)
364 normalized_video_id = (video_id or "").strip().lower()
365 if not expert_id or not normalized_video_id:
366 raise ValueError("expert_id and video_id are required")
368 now = utc_now_iso()
369 with get_conn() as conn:
370 existing = conn.execute(
371 """
372 SELECT 1
373 FROM video_expert_assignments
374 WHERE lower(trim(video_id)) = ? AND lower(trim(expert_id)) = ?
375 """,
376 (normalized_video_id, expert_id),
377 ).fetchone()
378 if not existing:
379 raise RuntimeError("assignment_not_found")
381 conn.execute(
382 """
383 UPDATE video_expert_assignments
384 SET updated_at = ?
385 WHERE lower(trim(video_id)) = ? AND lower(trim(expert_id)) = ?
386 """,
387 (now, normalized_video_id, expert_id),
388 )
389 conn.commit()