AI Visibility Analytics: Real-Time Monitoring and Alerting Systems
Learn how to implement real-time monitoring, alerting, and analytics dashboards for AI visibility tracking. Technical guide with implementation details for streaming data pipelines, alert engines, and dashboard APIs.
Real-time monitoring and alerting are essential for proactive brand visibility management. When your brand's visibility changes significantly, you need to know immediately—not days later. This guide provides technical implementation details for building real-time analytics systems that track brand mentions, detect anomalies, and send alerts across multiple channels.
Real-Time Monitoring Architecture
A real-time monitoring system consists of several key components:
- Stream Processing: Process LLM responses as they arrive
- Analytics Engine: Calculate metrics in real-time
- Alert Engine: Detect significant changes and trigger alerts
- Time-Series Storage: Store metrics for historical analysis
- Dashboard API: Serve real-time data to dashboards
- Notification System: Deliver alerts via multiple channels
Core Components
from kafka import KafkaConsumer, KafkaProducer
import json
import asyncio
from typing import Dict, List
class RealTimeMonitoringPipeline:
    """Consume LLM responses from Kafka and drive analytics, alerting and storage.

    Every message read from the ``llm-responses`` topic is decoded from UTF-8
    JSON, turned into brand mentions, and then passed to the analytics engine,
    the alert engine and the metrics store, in that order.

    Mention extraction, alert delivery and metrics storage are extension
    points: each raises ``NotImplementedError`` until it is implemented, so an
    unfinished pipeline fails with a clear message instead of an
    ``AttributeError`` or a silent ``None``.
    """

    def __init__(self):
        # Inbound stream of raw LLM responses; each value is JSON-decoded.
        self.consumer = KafkaConsumer(
            'llm-responses',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )
        # JSON-encoding producer. Nothing in this class publishes with it yet.
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )
        self.alert_engine = AlertEngine()
        self.analytics_engine = AnalyticsEngine()

    async def process_stream(self):
        """Process incoming LLM responses in real-time.

        Runs until the consumer iterator ends or a step raises.
        """
        # NOTE(review): this is a plain ``for`` over the consumer, so the
        # coroutine does not yield to the event loop while waiting for the
        # next message - confirm that is acceptable for the hosting app.
        for message in self.consumer:
            response_data = message.value

            # Extract brand mentions
            mentions = self._extract_mentions(response_data)

            # Update analytics in real-time
            await self.analytics_engine.update_metrics(mentions)

            # Check for alert conditions
            alerts = await self.alert_engine.check_alerts(mentions)

            # Send alerts if needed
            if alerts:
                await self._send_alerts(alerts)

            # Store in time-series database
            await self._store_metrics(mentions)

    def _extract_mentions(self, response_data: Dict) -> List[Dict]:
        """Extract brand mentions from one decoded LLM response.

        Raises:
            NotImplementedError: always, until mention extraction is written.
        """
        # Implementation for mention extraction
        raise NotImplementedError(
            "_extract_mentions must be implemented to turn an LLM response "
            "into brand mentions"
        )

    async def _send_alerts(self, alerts: List[Dict]) -> None:
        """Deliver the alerts produced for the current message.

        Raises:
            NotImplementedError: always, until alert delivery is written.
        """
        # Implementation for alert delivery
        raise NotImplementedError(
            "_send_alerts must be implemented to deliver triggered alerts"
        )

    async def _store_metrics(self, mentions: List[Dict]) -> None:
        """Persist the mentions extracted from the current message.

        Raises:
            NotImplementedError: always, until metrics storage is written.
        """
        # Implementation for time-series storage
        raise NotImplementedError(
            "_store_metrics must be implemented to persist extracted mentions"
        )


from typing import Dict, List, Optional
from datetime import datetime, timedelta
import statistics
class AlertEngine:
def __init__(self):
self.alert_rules = {
'sov_drop': {
'threshold': -0.10, # 10% drop
'time_window': timedelta(hours=24),
'min_confidence': 0.95
},
'competitor_surge': {
'threshold': 0.15, # 15% increase
'time_window': timedelta(hours=24),
'competitor': None # Any competitor
},
'new_mention_source': {
'threshold': 1, # At least 1 new source
'time_window': timedelta(hours=1)
}
}
async def check_alerts(
self,
current_mentions: Dict,
historical_data: Optional[Dict] = None
) -> List[Dict]:
"""Check if current data triggers any alerts"""
alerts = []
# Check SOV drop
if self._check_sov_drop(current_mentions, historical_data):
alerts.append({
'type': 'sov_drop',
'severity': 'high',
'message': f"Share of Voice dropped by {current_mentions.get('sov_change', 0):.2f}%",
'timestamp': datetime.utcnow()
})
# Check competitor surge
competitor_alerts = self._check_competitor_surge(current_mentions, historical_data)
alerts.extend(competitor_alerts)
# Check new mention sources
if self._check_new_sources(current_mentions):
alerts.append({
'type': 'new_source',
'severity': 'info',
'message': 'New citation source detected',
'timestamp': datetime.utcnow()
})
return alerts
def _check_sov_drop(
self,
current: Dict,
historical: Optional[Dict]
) -> bool:
"""Check if SOV dropped significantly"""
if not historical:
return False
current_sov = current.get('sov', 0)
historical_sov = historical.get('sov', 0)
if historical_sov == 0:
return False
change = (current_sov - historical_sov) / historical_sov
rule = self.alert_rules['sov_drop']
return change <= rule['threshold']import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List
import numpy as np
class TimeSeriesAnalytics:
    """Persist brand metrics and derive trends and anomalies from them.

    ``db_connection`` must provide awaitable ``execute(query, *args)`` and
    ``fetch(query, *args)`` methods that accept ``$n`` placeholders.
    """

    # Columns selected by get_trending_metrics, in SELECT order.
    _TREND_COLUMNS = ('timestamp', 'platform', 'sov', 'mentions_count', 'sentiment_score')

    def __init__(self, db_connection):
        self.db = db_connection

    async def store_metrics(self, metrics: Dict):
        """Store metrics in time-series database.

        Inserts one ``brand_metrics`` row stamped with the current UTC time.
        ``metrics`` must contain ``brand``, ``platform``, ``sov``,
        ``mentions_count``, ``sentiment_score`` and ``citations_count``.

        Raises:
            KeyError: if any of those keys is missing.
        """
        query = """
            INSERT INTO brand_metrics (
                timestamp, brand, platform, sov, mentions_count,
                sentiment_score, citations_count
            ) VALUES ($1, $2, $3, $4, $5, $6, $7)
        """

        await self.db.execute(
            query,
            datetime.utcnow(),
            metrics['brand'],
            metrics['platform'],
            metrics['sov'],
            metrics['mentions_count'],
            metrics['sentiment_score'],
            metrics['citations_count']
        )

    async def get_trending_metrics(
        self,
        brand: str,
        time_range: timedelta = timedelta(days=30)
    ) -> pd.DataFrame:
        """Get trending metrics for a brand.

        Returns the brand's rows from the last ``time_range``, oldest first,
        with three derived per-platform columns: ``sov_ma_7d`` and
        ``sov_ma_30d`` (means over the last 7 and 30 rows) and ``sov_trend``
        (difference from the previous row). The windows count rows, so they
        equal days only with one sample per platform per day. When no rows
        match, an empty frame with all columns is returned.
        """
        query = """
            SELECT
                timestamp,
                platform,
                sov,
                mentions_count,
                sentiment_score
            FROM brand_metrics
            WHERE brand = $1
            AND timestamp >= $2
            ORDER BY timestamp ASC
        """

        rows = await self.db.fetch(
            query,
            brand,
            datetime.utcnow() - time_range
        )

        # Convert each row to a dict and name the columns explicitly, so the
        # frame does not depend on how pandas interprets the driver's row
        # objects and an empty result still has the expected columns.
        df = pd.DataFrame(
            [dict(row) for row in rows],
            columns=list(self._TREND_COLUMNS),
        )

        if df.empty:
            # Nothing to group; return the same schema with empty derived columns.
            for derived in ('sov_ma_7d', 'sov_ma_30d', 'sov_trend'):
                df[derived] = pd.Series(dtype='float64')
            return df

        sov_by_platform = df.groupby('platform')['sov']

        # Calculate moving averages
        df['sov_ma_7d'] = sov_by_platform.transform(
            lambda s: s.rolling(window=7, min_periods=1).mean()
        )
        df['sov_ma_30d'] = sov_by_platform.transform(
            lambda s: s.rolling(window=30, min_periods=1).mean()
        )

        # Calculate trends
        df['sov_trend'] = sov_by_platform.diff()

        return df

    def detect_anomalies(self, df: pd.DataFrame, threshold: float = 3.0) -> List[Dict]:
        """Detect anomalies in time-series data.

        For each platform, flags rows whose ``sov`` lies more than
        ``threshold`` standard deviations from that platform's mean.

        Args:
            df: Frame with ``platform``, ``timestamp`` and ``sov`` columns.
            threshold: Absolute z-score above which a row is an anomaly.

        Returns:
            One dict per anomaly with ``platform``, ``timestamp``, ``sov``
            and ``z_score``. Platforms whose standard deviation is zero or
            undefined contribute nothing.
        """
        anomalies = []

        # sort=False keeps platforms in order of first appearance.
        for platform, group in df.groupby('platform', sort=False):
            sov = group['sov']
            std = sov.std()
            # Also false when std is NaN (fewer than two samples).
            if not std > 0:
                continue

            # Use Z-score for anomaly detection
            z_scores = ((sov - sov.mean()) / std).abs()
            # Select by position so duplicate index labels cannot turn a
            # scalar lookup into a Series.
            flagged = (z_scores > threshold).to_numpy()
            hits = group[flagged]

            for timestamp, value, score in zip(
                hits['timestamp'], hits['sov'], z_scores[flagged]
            ):
                anomalies.append({
                    'platform': platform,
                    'timestamp': timestamp,
                    'sov': value,
                    'z_score': score
                })

        return anomalies


# Dashboard and Notification Components
from fastapi import FastAPI, WebSocket
from typing import List, Dict
import asyncio
import json
# Application instance; the route decorators further down register on it.
app = FastAPI()
# WebSocket connections that the /ws/analytics handler has accepted.
active_connections = set()
class DashboardAPI:
    """Namespace grouping the dashboard's HTTP and WebSocket handlers.

    The decorators register each handler on ``app`` while the class body
    executes. The handlers take no ``self`` and nothing here instantiates
    the class.
    """

    @app.get("/api/v1/analytics/current")
    async def get_current_metrics(brand: str) -> Dict:
        """Get current brand visibility metrics"""
        analytics = AnalyticsEngine()
        return await analytics.get_current_metrics(brand)

    @app.get("/api/v1/analytics/trends")
    async def get_trends(
        brand: str,
        days: int = 30,
        platform: str = None
    ) -> Dict:
        """Get trending metrics over time.

        Returns ``data`` (one record per row, missing values as ``None``) and
        a ``summary`` with ``current_sov``, ``avg_sov`` and ``trend``
        (``'up'``, ``'down'`` or ``'flat'``). With no rows the summary is
        ``0``, ``0`` and ``'flat'``.
        """
        analytics = TimeSeriesAnalytics(db)
        df = await analytics.get_trending_metrics(brand, timedelta(days=days))

        if platform:
            df = df[df['platform'] == platform]

        # NaN cannot be encoded as strict JSON, and sov_trend is NaN on the
        # first row of every platform, so map missing values to None.
        records = df.astype(object).where(df.notna(), None).to_dict('records')

        if len(df) > 0:
            first_sov = df['sov'].iloc[0]
            last_sov = df['sov'].iloc[-1]
            current_sov = float(last_sov)
            avg_sov = float(df['sov'].mean())
            if last_sov > first_sov:
                trend = 'up'
            elif last_sov < first_sov:
                trend = 'down'
            else:
                trend = 'flat'
        else:
            # No rows: indexing would raise and the mean would be NaN.
            current_sov = 0
            avg_sov = 0
            trend = 'flat'

        return {
            'data': records,
            'summary': {
                'current_sov': current_sov,
                'avg_sov': avg_sov,
                'trend': trend
            }
        }

    @app.websocket("/ws/analytics")
    async def websocket_endpoint(websocket: WebSocket):
        """WebSocket endpoint for real-time updates.

        Pushes the latest metrics every 5 seconds until the client
        disconnects. The connection is always removed from
        ``active_connections`` on exit; errors other than a disconnect
        propagate instead of being silently swallowed.
        """
        # Local import: the file's fastapi import line does not include it.
        from fastapi import WebSocketDisconnect

        await websocket.accept()
        active_connections.add(websocket)

        try:
            while True:
                # Send updates every 5 seconds
                metrics = await get_latest_metrics()
                await websocket.send_json(metrics)
                await asyncio.sleep(5)
        except WebSocketDisconnect:
            # Client went away; nothing more to send.
            pass
        finally:
            # discard() is safe even if the socket was already removed.
            active_connections.discard(websocket)


import aiohttp
import smtplib
from email.mime.text import MIMEText
from typing import Dict, List
class AlertNotifier:
    """Deliver alerts over Slack, email and generic webhooks.

    Each constructor argument falls back to the matching module-level
    setting (``SLACK_WEBHOOK_URL``, ``EMAIL_CONFIG``, ``WEBHOOK_URLS``)
    when it is left as ``None``.
    """

    def __init__(self, slack_webhook=None, email_config=None, webhook_urls=None):
        self.slack_webhook = SLACK_WEBHOOK_URL if slack_webhook is None else slack_webhook
        self.email_config = EMAIL_CONFIG if email_config is None else email_config
        self.webhook_urls = WEBHOOK_URLS if webhook_urls is None else webhook_urls

    async def send_alert(self, alert: Dict, channels: List[str]):
        """Send alert through specified channels.

        Args:
            alert: Alert with ``type``, ``severity``, ``message`` and
                ``timestamp`` keys.
            channels: Any of ``'slack'``, ``'email'``, ``'webhook'``; other
                names are ignored.

        Returns:
            Dict mapping each attempted channel to its sender's return value,
            or to the exception it raised. One failing channel does not stop
            the others, and the caller can see which ones failed.
        """
        senders = {
            'slack': self._send_slack_alert,
            'email': self._send_email_alert,
            'webhook': self._send_webhook_alert,
        }
        selected = [name for name in senders if name in channels]

        outcomes = await asyncio.gather(
            *(senders[name](alert) for name in selected),
            return_exceptions=True,
        )
        return dict(zip(selected, outcomes))

    async def _send_slack_alert(self, alert: Dict):
        """Send alert to Slack and return the response body as text.

        Raises on a non-2xx status rather than trying to parse the reply
        as JSON.
        """
        payload = {
            'text': f"🚨 Brand Visibility Alert: {alert['message']}",
            'attachments': [{
                'color': 'danger' if alert['severity'] == 'high' else 'warning',
                'fields': [
                    {'title': 'Type', 'value': alert['type'], 'short': True},
                    {'title': 'Severity', 'value': alert['severity'], 'short': True},
                    {'title': 'Time', 'value': alert['timestamp'].isoformat(), 'short': False}
                ]
            }]
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.slack_webhook, json=payload) as resp:
                resp.raise_for_status()
                return await resp.text()

    async def _send_email_alert(self, alert: Dict):
        """Send alert via email.

        The SMTP exchange is blocking, so it runs in the default executor
        to keep the event loop free.
        """
        msg = MIMEText(f"Brand Visibility Alert\n\n{alert['message']}")
        msg['Subject'] = f"Alert: {alert['type']}"
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._deliver_email, msg)

    def _deliver_email(self, msg):
        """Blocking helper: hand ``msg`` to the configured SMTP server."""
        with smtplib.SMTP(self.email_config['smtp_server']) as server:
            server.send_message(msg)

    async def _send_webhook_alert(self, alert: Dict):
        """POST the alert as JSON to every configured webhook URL.

        Returns the HTTP status of each POST, in URL order. URLs are tried
        one after another and a failing POST raises, skipping the rest.
        """
        if not self.webhook_urls:
            return []

        payload = dict(alert)
        timestamp = payload.get('timestamp')
        if hasattr(timestamp, 'isoformat'):
            # datetime objects are not JSON serializable.
            payload['timestamp'] = timestamp.isoformat()

        statuses = []
        async with aiohttp.ClientSession() as session:
            for url in self.webhook_urls:
                async with session.post(url, json=payload) as resp:
                    resp.raise_for_status()
                    statuses.append(resp.status)
        return statuses


# Best Practices
1. Stream Processing
- Use Kafka or similar message queue for reliable stream processing
- Implement idempotent processing to handle duplicate messages
- Use consumer groups for parallel processing and scalability
- Monitor consumer lag to ensure real-time processing
2. Alert Tuning
- Avoid alert fatigue by setting appropriate thresholds
- Use confidence intervals to reduce false positives
- Implement alert deduplication to prevent spam
- Create alert escalation policies for critical issues
3. Time-Series Optimization
- Use specialized time-series databases (InfluxDB, TimescaleDB)
- Implement data retention policies to manage storage
- Use downsampling for long-term historical data
- Index timestamp and brand columns for fast queries
4. Dashboard Performance
- Use WebSockets for real-time updates instead of polling
- Implement data aggregation for faster dashboard loads
- Cache frequently accessed metrics
- Use pagination for large datasets
Alert Types and Thresholds
- SOV drop > 10% in 24 hours
- Competitor surge > 15% in 24 hours
- Negative sentiment spike
- Complete absence of mentions
- SOV change > 5% in 24 hours
- New citation source detected
- Platform-specific visibility change
- Trend reversal detected
Need Real-Time Brand Monitoring?
Elatify's AI Visibility Agent includes real-time monitoring, intelligent alerting, and comprehensive dashboards. Get instant notifications when your brand visibility changes and track trends in real-time.
