# structured_logger.py
import os
import random
from datetime import datetime
from typing import Any, Dict, Optional
from pymongo import MongoClient
from services.mongo_service import get_tracker
from services.llm_service import get_reaction_emoji, get_reply_message
[docs]
class SlackNameCache:
"""
Caches user/channel lookups to avoid wasting resources.
"""
def __init__(self, client, ttl_seconds: int = 3600):
self.client = client
self.ttl = ttl_seconds
self._users: Dict[str, Dict[str, Any]] = {}
self._channels: Dict[str, Dict[str, Any]] = {}
def _fresh(self, rec: Dict[str, Any]) -> bool:
return (datetime.utcnow().timestamp() - rec["cached_at"]) < self.ttl
[docs]
def user_name(self, user_id: Optional[str]) -> Optional[str]:
if not user_id:
return None
rec = self._users.get(user_id)
if rec and self._fresh(rec):
return rec["name"]
try:
info = self.client.users_info(user=user_id)
profile = info["user"].get("profile", {})
name = (
profile.get("display_name")
or profile.get("real_name")
or info["user"].get("name")
or user_id
)
self._users[user_id] = {"name": name, "cached_at": datetime.utcnow().timestamp()}
return name
except Exception:
self._users[user_id] = {"name": user_id, "cached_at": datetime.utcnow().timestamp()}
return user_id
[docs]
def channel_name(self, channel_id: Optional[str]) -> Optional[str]:
if not channel_id:
return None
rec = self._channels.get(channel_id)
if rec and self._fresh(rec):
return rec["name"]
try:
info = self.client.conversations_info(channel=channel_id)
name = info["channel"].get("name") or channel_id
self._channels[channel_id] = {"name": name, "cached_at": datetime.utcnow().timestamp()}
return name
except Exception:
self._channels[channel_id] = {"name": channel_id, "cached_at": datetime.utcnow().timestamp()}
return channel_id
[docs]
def install_structured_message_logging(app, client, cfg=None, log_file: str = None):
"""
Installs a Bolt event handler that saves messages to MongoDB,
organized by workspace and channel. Also optionally adds emoji reactions via LLM.
"""
mongo_client = MongoClient(os.getenv("MONGO_URI"))
db = mongo_client["vibecheck"]
installations_col = db["installations"]
cache = SlackNameCache(client)
@app.event("message")
def _on_message(event, body):
team_id = (
body.get("team_id")
or (body.get("authorizations") or [{}])[0].get("team_id")
or None
)
channel_id = event.get("channel")
user_id = event.get("user")
# Look up team name to use as collection name
team_name = None
if team_id:
record = installations_col.find_one({"team_id": team_id})
if record:
team_name = record.get("team_name")
collection_name = f"messages_{team_name}" if team_name else f"messages_{team_id or 'unknown'}"
messages_col = db[collection_name]
row = {
"ingested_at_utc": datetime.utcnow().isoformat(timespec="seconds") + "Z",
"team_id": team_id,
"channel_id": channel_id,
"channel_name": cache.channel_name(channel_id),
"user_id": user_id,
"user_name": cache.user_name(user_id),
"ts": event.get("ts"),
"thread_ts": event.get("thread_ts"),
"subtype": event.get("subtype"),
"text": event.get("text"),
}
try:
messages_col.insert_one(row)
except Exception as e:
print(f"[LOGGER] Failed to save message to MongoDB: {e}")
# Count this as a response to the active prompt in this channel,
# but only for real user messages (not bot posts or subtypes).
# Exclude bot_id to skip reactions on messages from bots (including this bot)
# Allow "file_share" subtype so image posts still get reactions/replies.
if user_id and (not event.get("subtype") or event.get("subtype") == "file_share") and not event.get("bot_id"):
tracker = get_tracker()
if tracker and team_id:
tracker.record_response(channel_id, team_id)
# Add LLM-generated emoji reaction (if enabled and probabilistically)
if cfg and cfg.llm_reactions_enabled:
if random.random() < cfg.llm_reactions_probability:
text = event.get("text") or ""
timestamp = event.get("ts")
# Extract image URLs from files attached to the message
image_urls = [
f["url_private"]
for f in event.get("files", [])
if f.get("mimetype", "").startswith("image/") and f.get("url_private")
]
# Resolve the workspace bot token up front (needed for image download + reactions)
reaction_token = None
if team_id:
record = installations_col.find_one({"team_id": team_id})
if record:
reaction_token = record.get("bot_token")
# Validate we have content (text or images) and a timestamp before calling LLM
if (text or image_urls) and timestamp:
emoji = get_reaction_emoji(
text,
image_urls=image_urls,
slack_token=reaction_token or cfg.token,
)
if emoji:
try:
from slack_sdk import WebClient as _WebClient
reaction_client = _WebClient(token=reaction_token or cfg.token)
reaction_client.reactions_add(
channel=channel_id,
timestamp=timestamp,
name=emoji
)
print(f"[REACTION] Added emoji :{emoji}: to message in {cache.channel_name(channel_id)}")
except Exception as e:
print(f"[REACTION] Failed to add emoji reaction ({emoji}): {e}")
# Send an LLM-generated reply (if enabled and probabilistically)
if cfg and cfg.llm_replies_enabled:
if random.random() < cfg.llm_replies_probability:
text = event.get("text") or ""
timestamp = event.get("ts")
# Extract image URLs from files attached to the message
image_urls = [
f["url_private"]
for f in event.get("files", [])
if f.get("mimetype", "").startswith("image/") and f.get("url_private")
]
# Resolve the workspace bot token
reply_token = None
if team_id:
record = installations_col.find_one({"team_id": team_id})
if record:
reply_token = record.get("bot_token")
if (text or image_urls) and timestamp:
reply_text = get_reply_message(
text,
image_urls=image_urls,
slack_token=reply_token or cfg.token,
)
if reply_text:
try:
from slack_sdk import WebClient as _WebClient
reply_client = _WebClient(token=reply_token or cfg.token)
reply_client.chat_postMessage(
channel=channel_id,
text=reply_text,
)
print(f"[REPLY] Sent reply in {cache.channel_name(channel_id)}")
except Exception as e:
print(f"[REPLY] Failed to send reply: {e}")
# check + announce streaks for real user messages
if user_id and not event.get("bot_id") and channel_id:
try:
from services.streak_service import check_and_announce_streak
check_and_announce_streak(user_id, cache.user_name(user_id) or user_id, client, channel_id)
except Exception as e:
print(f"[streaks] {e}")