AI Visibility Analytics: Real-Time Monitoring and Alerting Systems

Learn how to implement real-time monitoring, alerting, and analytics dashboards for AI visibility tracking. Technical guide with implementation details for streaming data pipelines, alert engines, and dashboard APIs.

11 min read
January 11, 2025

Real-time monitoring and alerting are essential for proactive brand visibility management. When your brand's visibility changes significantly, you need to know immediately—not days later. This guide provides technical implementation details for building real-time analytics systems that track brand mentions, detect anomalies, and send alerts across multiple channels.

Real-Time Monitoring Architecture

A real-time monitoring system consists of several key components:

  1. Stream Processing: Process LLM responses as they arrive
  2. Analytics Engine: Calculate metrics in real-time
  3. Alert Engine: Detect significant changes and trigger alerts
  4. Time-Series Storage: Store metrics for historical analysis
  5. Dashboard API: Serve real-time data to dashboards
  6. Notification System: Deliver alerts via multiple channels

Core Components

Real-Time Data Pipeline
Stream processing architecture for real-time brand mention tracking
from kafka import KafkaConsumer, KafkaProducer
import json
import asyncio
from typing import Dict, List

class RealTimeMonitoringPipeline:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'llm-responses',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.alert_engine = AlertEngine()
        self.analytics_engine = AnalyticsEngine()
    
    async def process_stream(self):
        """Process incoming LLM responses in real-time"""
        # Note: kafka-python's consumer iteration is blocking; a fully async
        # pipeline would use aiokafka's AIOKafkaConsumer instead
        for message in self.consumer:
            response_data = message.value
            
            # Extract brand mentions
            mentions = self._extract_mentions(response_data)
            
            # Update analytics in real-time
            await self.analytics_engine.update_metrics(mentions)
            
            # Check for alert conditions
            alerts = await self.alert_engine.check_alerts(mentions)
            
            # Send alerts if needed
            if alerts:
                await self._send_alerts(alerts)
            
            # Store in time-series database
            await self._store_metrics(mentions)
    
    def _extract_mentions(self, response_data: Dict) -> List[Dict]:
        """Extract brand mentions from LLM response"""
        # Left as an integration point; see the extraction sketch below.
        # Returning an empty list keeps the pipeline runnable in the meantime.
        return []
    
    async def _send_alerts(self, alerts: List[Dict]):
        """Publish alerts for downstream consumers (topic name is illustrative)"""
        for alert in alerts:
            # datetime objects are not JSON-serializable; convert before publishing
            self.producer.send('visibility-alerts', {**alert, 'timestamp': alert['timestamp'].isoformat()})
    
    async def _store_metrics(self, mentions: List[Dict]):
        """Persist mentions to the time-series store (see TimeSeriesAnalytics below)"""
        ...
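
The `_extract_mentions` step above is left as an integration point. A minimal sketch, assuming responses arrive with `response_text` and `platform` fields (hypothetical payload keys) and that brands are matched against a configured list; production systems typically add alias handling and entity resolution:

import re
from typing import Dict, List

BRAND_PATTERNS = {
    # Illustrative brand list; in practice this comes from configuration
    brand: re.compile(rf"\b{re.escape(brand)}\b", re.IGNORECASE)
    for brand in ["Elatify", "CompetitorA", "CompetitorB"]
}

def extract_mentions(response_data: Dict) -> List[Dict]:
    """Find brand mentions in an LLM response payload"""
    text = response_data.get('response_text', '')
    mentions = []
    for brand, pattern in BRAND_PATTERNS.items():
        count = len(pattern.findall(text))
        if count > 0:
            mentions.append({
                'brand': brand,
                'count': count,
                'platform': response_data.get('platform', 'unknown'),
            })
    return mentions
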
Alert Engine
Intelligent alerting system for significant brand visibility changes
from typing import Dict, List, Optional
from datetime import datetime, timedelta

class AlertEngine:
    def __init__(self):
        self.alert_rules = {
            'sov_drop': {
                'threshold': -0.10,  # 10% drop
                'time_window': timedelta(hours=24),
                'min_confidence': 0.95
            },
            'competitor_surge': {
                'threshold': 0.15,  # 15% increase
                'time_window': timedelta(hours=24),
                'competitor': None  # Any competitor
            },
            'new_mention_source': {
                'threshold': 1,  # At least 1 new source
                'time_window': timedelta(hours=1)
            }
        }
    
    async def check_alerts(
        self,
        current_mentions: Dict,
        historical_data: Optional[Dict] = None
    ) -> List[Dict]:
        """Check if current data triggers any alerts"""
        alerts = []
        
        # Check SOV drop
        if self._check_sov_drop(current_mentions, historical_data):
            alerts.append({
                'type': 'sov_drop',
                'severity': 'high',
                'message': f"Share of Voice dropped by {current_mentions.get('sov_change', 0):.2f}%",
                'timestamp': datetime.utcnow()
            })
        
        # Check competitor surge
        competitor_alerts = self._check_competitor_surge(current_mentions, historical_data)
        alerts.extend(competitor_alerts)
        
        # Check new mention sources
        if self._check_new_sources(current_mentions):
            alerts.append({
                'type': 'new_source',
                'severity': 'info',
                'message': 'New citation source detected',
                'timestamp': datetime.utcnow()
            })
        
        return alerts
    
    def _check_sov_drop(
        self,
        current: Dict,
        historical: Optional[Dict]
    ) -> bool:
        """Check if SOV dropped significantly"""
        if not historical:
            return False
        
        current_sov = current.get('sov', 0)
        historical_sov = historical.get('sov', 0)
        
        if historical_sov == 0:
            return False
        
        change = (current_sov - historical_sov) / historical_sov
        rule = self.alert_rules['sov_drop']
        
        return change <= rule['threshold']
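
`check_alerts` above calls `_check_competitor_surge` and `_check_new_sources`, which the listing leaves undefined. A minimal sketch of both as AlertEngine methods, assuming metrics carry per-competitor SOV under a `competitor_sov` mapping and citation sources under a `sources` list (hypothetical field names):

    def _check_competitor_surge(
        self,
        current: Dict,
        historical: Optional[Dict]
    ) -> List[Dict]:
        """Flag competitors whose SOV rose past the configured threshold"""
        if not historical:
            return []
        
        alerts = []
        rule = self.alert_rules['competitor_surge']
        for competitor, current_sov in current.get('competitor_sov', {}).items():
            previous_sov = historical.get('competitor_sov', {}).get(competitor, 0)
            if previous_sov == 0:
                continue
            change = (current_sov - previous_sov) / previous_sov
            if change >= rule['threshold']:
                alerts.append({
                    'type': 'competitor_surge',
                    'severity': 'high',
                    'message': f"{competitor} share of voice rose by {change:.1%}",
                    'timestamp': datetime.utcnow()
                })
        return alerts
    
    def _check_new_sources(self, current: Dict) -> bool:
        """Detect citation sources not seen before (seen-set persistence omitted)"""
        known = getattr(self, '_known_sources', set())
        new_sources = set(current.get('sources', [])) - known
        self._known_sources = known | new_sources
        return len(new_sources) >= self.alert_rules['new_mention_source']['threshold']
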
Time-Series Analytics
Efficient storage and querying of time-series brand visibility data
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List
import numpy as np

class TimeSeriesAnalytics:
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def store_metrics(self, metrics: Dict):
        """Store metrics in time-series database"""
        query = """
        INSERT INTO brand_metrics (
            timestamp, brand, platform, sov, mentions_count,
            sentiment_score, citations_count
        ) VALUES ($1, $2, $3, $4, $5, $6, $7)
        """
        await self.db.execute(
            query,
            datetime.utcnow(),
            metrics['brand'],
            metrics['platform'],
            metrics['sov'],
            metrics['mentions_count'],
            metrics['sentiment_score'],
            metrics['citations_count']
        )
    
    async def get_trending_metrics(
        self,
        brand: str,
        time_range: timedelta = timedelta(days=30)
    ) -> pd.DataFrame:
        """Get trending metrics for a brand"""
        query = """
        SELECT 
            timestamp,
            platform,
            sov,
            mentions_count,
            sentiment_score
        FROM brand_metrics
        WHERE brand = $1
        AND timestamp >= $2
        ORDER BY timestamp ASC
        """
        results = await self.db.fetch(
            query,
            brand,
            datetime.utcnow() - time_range
        )
        
        # asyncpg returns Record objects; convert to dicts so column names survive
        df = pd.DataFrame([dict(r) for r in results])
        
        # Calculate moving averages (the window counts rows, so this assumes one
        # data point per platform per day; resample first if that doesn't hold)
        df['sov_ma_7d'] = df.groupby('platform')['sov'].transform(
            lambda x: x.rolling(window=7, min_periods=1).mean()
        )
        df['sov_ma_30d'] = df.groupby('platform')['sov'].transform(
            lambda x: x.rolling(window=30, min_periods=1).mean()
        )
        
        # Calculate trends
        df['sov_trend'] = df.groupby('platform')['sov'].transform(
            lambda x: x.diff()
        )
        
        return df
    
    def detect_anomalies(self, df: pd.DataFrame) -> List[Dict]:
        """Detect anomalies in time-series data"""
        anomalies = []
        
        for platform in df['platform'].unique():
            platform_data = df[df['platform'] == platform]['sov']
            
            # Use Z-score for anomaly detection
            mean = platform_data.mean()
            std = platform_data.std()
            
            if std > 0:
                z_scores = np.abs((platform_data - mean) / std)
                anomaly_indices = z_scores[z_scores > 3].index
                
                for idx in anomaly_indices:
                    anomalies.append({
                        'platform': platform,
                        'timestamp': df.loc[idx, 'timestamp'],
                        'sov': df.loc[idx, 'sov'],
                        'z_score': z_scores.loc[idx]
                    })
        
        return anomalies
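
A usage sketch tying the two methods together, assuming `db` is an asyncpg connection pool for the database that holds `brand_metrics` (connection details are illustrative):

import asyncio
import asyncpg

async def main():
    db = await asyncpg.create_pool(dsn="postgresql://localhost/visibility")
    analytics = TimeSeriesAnalytics(db)
    
    df = await analytics.get_trending_metrics("Elatify")
    for anomaly in analytics.detect_anomalies(df):
        print(f"{anomaly['platform']}: SOV {anomaly['sov']:.2f} "
              f"(z-score {anomaly['z_score']:.1f}) at {anomaly['timestamp']}")

asyncio.run(main())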

Dashboard and Notification Components

Real-Time Dashboard API
RESTful API for serving real-time analytics data
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from datetime import timedelta
from typing import Dict, Optional
import asyncio

app = FastAPI()
active_connections = set()

# Routes are plain module-level functions; FastAPI registers them via the decorators

@app.get("/api/v1/analytics/current")
async def get_current_metrics(brand: str) -> Dict:
    """Get current brand visibility metrics"""
    # AnalyticsEngine is the shared analytics component from the pipeline above
    analytics = AnalyticsEngine()
    return await analytics.get_current_metrics(brand)

@app.get("/api/v1/analytics/trends")
async def get_trends(
    brand: str,
    days: int = 30,
    platform: Optional[str] = None
) -> Dict:
    """Get trending metrics over time"""
    # `db` is the shared connection pool created at application startup
    analytics = TimeSeriesAnalytics(db)
    df = await analytics.get_trending_metrics(brand, timedelta(days=days))
    
    if platform:
        df = df[df['platform'] == platform]
    
    # Guard against an empty result before indexing into the series
    if len(df) == 0:
        return {'data': [], 'summary': None}
    
    return {
        'data': df.to_dict('records'),
        'summary': {
            'current_sov': df['sov'].iloc[-1],
            'avg_sov': df['sov'].mean(),
            'trend': 'up' if df['sov'].iloc[-1] > df['sov'].iloc[0] else 'down'
        }
    }

@app.websocket("/ws/analytics")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time updates"""
    await websocket.accept()
    active_connections.add(websocket)
    
    try:
        while True:
            # Push a snapshot every 5 seconds; get_latest_metrics() is the
            # integration point to the analytics engine
            metrics = await get_latest_metrics()
            await websocket.send_json(metrics)
            await asyncio.sleep(5)
    except WebSocketDisconnect:
        active_connections.discard(websocket)
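
The WebSocket loop above polls on a fixed interval per connection. To push alerts to every open dashboard the moment they fire, a broadcast helper along these lines (a sketch) can be called from the alert path:

async def broadcast(message: Dict):
    """Push a message to every connected dashboard client"""
    stale = set()
    for connection in active_connections:
        try:
            await connection.send_json(message)
        except Exception:
            # Client disconnected mid-send; drop it from the pool
            stale.add(connection)
    active_connections -= stale
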
Alert Notification System
Multi-channel alert delivery (Slack, Email, Webhook)
import aiohttp
import asyncio
import smtplib
from email.mime.text import MIMEText
from typing import Dict, List

class AlertNotifier:
    def __init__(self):
        # These constants are expected to come from configuration or the environment
        self.slack_webhook = SLACK_WEBHOOK_URL
        self.email_config = EMAIL_CONFIG
        self.webhook_urls = WEBHOOK_URLS
    
    async def send_alert(self, alert: Dict, channels: List[str]):
        """Send alert through specified channels"""
        tasks = []
        
        if 'slack' in channels:
            tasks.append(self._send_slack_alert(alert))
        
        if 'email' in channels:
            tasks.append(self._send_email_alert(alert))
        
        if 'webhook' in channels:
            tasks.append(self._send_webhook_alert(alert))
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _send_slack_alert(self, alert: Dict):
        """Send alert to Slack"""
        async with aiohttp.ClientSession() as session:
            payload = {
                'text': f"🚨 Brand Visibility Alert: {alert['message']}",
                'attachments': [{
                    'color': 'danger' if alert['severity'] == 'high' else 'warning',
                    'fields': [
                        {'title': 'Type', 'value': alert['type'], 'short': True},
                        {'title': 'Severity', 'value': alert['severity'], 'short': True},
                        {'title': 'Time', 'value': alert['timestamp'].isoformat(), 'short': False}
                    ]
                }]
            }
            async with session.post(self.slack_webhook, json=payload) as resp:
                return await resp.json()
    
    async def _send_email_alert(self, alert: Dict):
        """Send alert via email"""
        msg = MIMEText(f"Brand Visibility Alert\n\n{alert['message']}")
        msg['Subject'] = f"Alert: {alert['type']}"
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']
        
        # smtplib is blocking; run delivery in a worker thread so the
        # event loop stays free for other alerts
        await asyncio.to_thread(self._deliver_email, msg)
    
    def _deliver_email(self, msg: MIMEText):
        with smtplib.SMTP(self.email_config['smtp_server']) as server:
            server.send_message(msg)
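
`send_alert` above also references `_send_webhook_alert`, which the listing omits. A minimal sketch as an AlertNotifier method, POSTing the alert to each configured URL:

    async def _send_webhook_alert(self, alert: Dict):
        """POST the alert payload to each configured webhook URL"""
        payload = {**alert, 'timestamp': alert['timestamp'].isoformat()}
        async with aiohttp.ClientSession() as session:
            for url in self.webhook_urls:
                async with session.post(url, json=payload) as resp:
                    resp.raise_for_status()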

Best Practices

1. Stream Processing

  • Use Kafka or similar message queue for reliable stream processing
  • Implement idempotent processing to handle duplicate messages (see the dedup sketch after this list)
  • Use consumer groups for parallel processing and scalability
  • Monitor consumer lag to ensure real-time processing
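
A minimal idempotency sketch, assuming each Kafka message carries a unique `message_id` (a hypothetical field) and using Redis SET NX EX as the dedup store; key prefix and TTL are illustrative:

import redis

dedup_store = redis.Redis(host='localhost', port=6379)

def is_duplicate(message_id: str, ttl_seconds: int = 3600) -> bool:
    """Return True if this message was already processed within the TTL window"""
    # SET with nx=True atomically claims the key only if it doesn't exist yet
    claimed = dedup_store.set(f"seen:{message_id}", 1, nx=True, ex=ttl_seconds)
    return not claimed

# In the consumer loop:
# if is_duplicate(response_data['message_id']):
#     continue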

2. Alert Tuning

  • Avoid alert fatigue by setting appropriate thresholds
  • Use confidence intervals to reduce false positives
  • Implement alert deduplication to prevent spam (a cooldown sketch follows this list)
  • Create alert escalation policies for critical issues
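
One simple deduplication scheme is a per-alert cooldown: suppress repeats of the same alert key within a window. A sketch; the one-hour cooldown is illustrative:

from datetime import datetime, timedelta

class AlertDeduplicator:
    def __init__(self, cooldown: timedelta = timedelta(hours=1)):
        self.cooldown = cooldown
        self.last_sent = {}  # (alert_type, brand) -> last delivery time
    
    def should_send(self, alert_type: str, brand: str) -> bool:
        """Allow an alert only if its cooldown window has elapsed"""
        key = (alert_type, brand)
        now = datetime.utcnow()
        previous = self.last_sent.get(key)
        if previous and now - previous < self.cooldown:
            return False
        self.last_sent[key] = now
        return True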

3. Time-Series Optimization

  • Use specialized time-series databases (InfluxDB, TimescaleDB)
  • Implement data retention policies to manage storage
  • Use downsampling for long-term historical data (see the TimescaleDB sketch below)
  • Index timestamp and brand columns for fast queries
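
With TimescaleDB, downsampling and retention are declarative. A sketch, assuming `brand_metrics` has already been converted to a hypertable (intervals are illustrative):

# Daily rollup; continuous aggregates must be created outside an explicit transaction
DOWNSAMPLE_SQL = """
CREATE MATERIALIZED VIEW brand_metrics_daily
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', timestamp) AS day,
    brand,
    platform,
    avg(sov) AS avg_sov,
    sum(mentions_count) AS mentions_count
FROM brand_metrics
GROUP BY day, brand, platform;
"""

# Raw rows expire after 90 days; the daily rollup stays queryable
RETENTION_SQL = "SELECT add_retention_policy('brand_metrics', INTERVAL '90 days');"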

4. Dashboard Performance

  • Use WebSockets for real-time updates instead of polling
  • Implement data aggregation for faster dashboard loads
  • Cache frequently accessed metrics (a TTL-cache sketch follows this list)
  • Use pagination for large datasets
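
A small TTL cache in front of the current-metrics lookup avoids a database round trip on every dashboard refresh. A minimal sketch reusing the AnalyticsEngine integration point; the 30-second TTL is illustrative:

import time
from typing import Dict

_cache: Dict[str, tuple] = {}  # brand -> (expires_at, metrics)

async def get_cached_metrics(brand: str, ttl_seconds: int = 30) -> Dict:
    """Serve metrics from cache while fresh; refresh on expiry"""
    now = time.monotonic()
    entry = _cache.get(brand)
    if entry and entry[0] > now:
        return entry[1]
    metrics = await AnalyticsEngine().get_current_metrics(brand)
    _cache[brand] = (now + ttl_seconds, metrics)
    return metrics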

Alert Types and Thresholds

High Priority Alerts
  • SOV drop > 10% in 24 hours
  • Competitor surge > 15%
  • Negative sentiment spike
  • Complete absence of mentions

Medium Priority Alerts
  • SOV change > 5% in 24 hours
  • New citation source detected
  • Platform-specific visibility change
  • Trend reversal detected
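
Expressed as configuration for the AlertEngine above (a sketch; rule names and severity routing are illustrative, thresholds come from the lists):

from datetime import timedelta

ALERT_RULES = {
    # High priority
    'sov_drop':         {'threshold': -0.10, 'window': timedelta(hours=24), 'severity': 'high'},
    'competitor_surge': {'threshold': 0.15,  'window': timedelta(hours=24), 'severity': 'high'},
    # Medium priority
    'sov_change':       {'threshold': 0.05,  'window': timedelta(hours=24), 'severity': 'medium'},
    'new_source':       {'threshold': 1,     'window': timedelta(hours=1),  'severity': 'medium'},
}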

Need Real-Time Brand Monitoring?

Elatify's AI Visibility Agent includes real-time monitoring, intelligent alerting, and comprehensive dashboards. Get instant notifications when your brand visibility changes and track trends in real-time.

Related Insights

Building an AI Visibility Monitoring System
Technical architecture guide for building a production-ready monitoring system.
LLM Brand Monitoring: Share of Voice Calculation
Learn how to calculate share of voice with statistical reliability.