
Integrating Multiple LLM APIs for Brand Monitoring: Technical Guide

Learn how to integrate and manage multiple LLM APIs (ChatGPT, Perplexity, Claude, Gemini) for comprehensive brand monitoring. Technical implementation with code examples and best practices.

13 min read
January 12, 2025

Comprehensive brand monitoring requires integrating multiple LLM providers to ensure complete visibility across all AI platforms. Each provider has unique APIs, rate limits, and response formats. This guide provides technical implementation details for building a unified system that manages multiple LLM integrations efficiently.

Why Integrate Multiple LLM Providers?

Integrating multiple LLM providers offers several advantages:

  • Comprehensive Coverage: Different platforms serve different user bases
  • Redundancy: Fallback options if one provider fails
  • Response Diversity: Different models may provide different insights
  • Cost Optimization: Use cheaper providers for simple queries (see the routing sketch after this list)
  • Performance: Parallel queries reduce total response time
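For example, a simple router can implement the cost point by sending short, factual prompts to a cheaper model and reserving a stronger model for longer or comparative queries (a minimal sketch; the model names and length threshold below are illustrative, not benchmarks):

def choose_model(prompt: str) -> str:
    """Route simple prompts to a cheaper model; names and threshold are illustrative."""
    if len(prompt.split()) < 30 and "compare" not in prompt.lower():
        return "gpt-3.5-turbo"  # cheaper model for short, factual queries
    return "gpt-4"  # stronger model for longer or comparative queries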

Individual Provider Integrations

OpenAI ChatGPT
Endpoint: https://api.openai.com/v1/chat/completions

Key Features:

  • GPT-4, GPT-3.5
  • Streaming support
  • Function calling
  • JSON mode

Implementation:

import openai
from typing import List, Dict

class OpenAIClient:
    def __init__(self, api_key: str):
        self.client = openai.AsyncOpenAI(api_key=api_key)  # async client so awaits below don't block
    
    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "gpt-4"
    ) -> List[Dict]:
        """Query OpenAI API for brand mentions"""
        results = []
        for prompt in prompts:
            response = await self.client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )
            results.append({
                'platform': 'chatgpt',
                'prompt': prompt,
                'response': response.choices[0].message.content,
                'model': model,
                'usage': response.usage.model_dump()
            })
        return results

Anthropic Claude
Endpoint: https://api.anthropic.com/v1/messages

Key Features:

  • Claude 3.5 Sonnet
  • Long context (200K tokens)
  • Tool use
  • Constitutional AI

Implementation:

import anthropic
from typing import List, Dict

class ClaudeClient:
    def __init__(self, api_key: str):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)  # async client for non-blocking awaits
    
    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "claude-3-5-sonnet-20241022"
    ) -> List[Dict]:
        """Query Claude API for brand mentions"""
        results = []
        for prompt in prompts:
            message = await self.client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            results.append({
                'platform': 'claude',
                'prompt': prompt,
                'response': message.content[0].text,
                'model': model,
                'usage': {
                    'input_tokens': message.usage.input_tokens,
                    'output_tokens': message.usage.output_tokens
                }
            })
        return results

Perplexity AI
Endpoint: https://api.perplexity.ai/chat/completions

Key Features:

  • Real-time web search
  • Citations
  • Multiple models
  • Streaming

Implementation:

import httpx
from typing import List, Dict

class PerplexityClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.perplexity.ai"
    
    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "llama-3.1-sonar-large-128k-online"
    ) -> List[Dict]:
        """Query Perplexity API with web search"""
        results = []
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        # Use an async HTTP client so this coroutine does not block the event loop
        async with httpx.AsyncClient(timeout=60.0) as client:
            for prompt in prompts:
                response = await client.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json={
                        "model": model,
                        "messages": [
                            {"role": "user", "content": prompt}
                        ],
                        "temperature": 0.7,
                        "max_tokens": 1000
                    }
                )
                response.raise_for_status()
                data = response.json()
                results.append({
                    'platform': 'perplexity',
                    'prompt': prompt,
                    'response': data['choices'][0]['message']['content'],
                    'citations': data.get('citations', []),
                    'model': model
                })
        return results

Google Gemini
Endpoint: https://generativelanguage.googleapis.com/v1beta/models

Key Features:

  • Gemini Pro
  • Multimodal
  • Function calling
  • Safety settings

Implementation:

import google.generativeai as genai
from typing import List, Dict

class GeminiClient:
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-pro')
    
    async def query_brand(
        self,
        brand: str,
        prompts: List[str]
    ) -> List[Dict]:
        """Query Google Gemini API"""
        results = []
        for prompt in prompts:
            response = await self.model.generate_content_async(
                prompt,
                generation_config={
                    "temperature": 0.7,
                    "max_output_tokens": 1000,
                }
            )
            results.append({
                'platform': 'gemini',
                'prompt': prompt,
                'response': response.text,
                'model': 'gemini-pro'
            })
        return results

Unified Client Architecture

Create a unified interface for multiple LLM providers:

from abc import ABC, abstractmethod
from typing import List, Dict, Optional
import asyncio
import os
from enum import Enum

class LLMProvider(Enum):
    OPENAI = "openai"
    CLAUDE = "claude"
    PERPLEXITY = "perplexity"
    GEMINI = "gemini"

class BaseLLMClient(ABC):
    @abstractmethod
    async def query(self, prompt: str, **kwargs) -> Dict:
        """Query the LLM provider"""
        pass
    
    @abstractmethod
    def get_provider_name(self) -> str:
        """Get provider name"""
        pass

class UnifiedLLMClient:
    def __init__(self):
        # API keys are read from environment variables (see Best Practices below)
        self.clients = {
            LLMProvider.OPENAI: OpenAIClient(os.environ["OPENAI_API_KEY"]),
            LLMProvider.CLAUDE: ClaudeClient(os.environ["CLAUDE_API_KEY"]),
            LLMProvider.PERPLEXITY: PerplexityClient(os.environ["PERPLEXITY_API_KEY"]),
            LLMProvider.GEMINI: GeminiClient(os.environ["GEMINI_API_KEY"]),
        }
        self.rate_limiters = {}
        self.circuit_breakers = {}
    
    async def query_all_providers(
        self,
        prompt: str,
        providers: Optional[List[LLMProvider]] = None
    ) -> Dict[str, Dict]:
        """Query all or specified providers concurrently"""
        if providers is None:
            providers = list(LLMProvider)
        
        tasks = []
        active_providers = []
        for provider in providers:
            # Skip providers whose circuit breaker is currently open
            if not self._is_circuit_open(provider):
                active_providers.append(provider)
                tasks.append(self._query_with_fallback(provider, prompt))
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Zip against the providers actually queried so results stay aligned
        return {
            provider.value: result
            for provider, result in zip(active_providers, results)
            if not isinstance(result, Exception)
        }
    
    async def _query_with_fallback(
        self,
        provider: LLMProvider,
        prompt: str
    ) -> Dict:
        """Query with automatic fallback and retry logic"""
        max_retries = 3
        backoff_factor = 2
        
        for attempt in range(max_retries):
            try:
                if not self._check_rate_limit(provider):
                    await asyncio.sleep(backoff_factor ** attempt)
                    continue
                
                # Each provider client is assumed to expose BaseLLMClient.query()
                result = await self.clients[provider].query(prompt)
                self._record_success(provider)
                return result
            
            except Exception as e:
                if attempt == max_retries - 1:
                    self._record_failure(provider)
                    raise
                await asyncio.sleep(backoff_factor ** attempt)
        
        return None
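A minimal usage sketch for the unified client (this assumes the reliability helpers it references, such as _check_rate_limit and _is_circuit_open, are implemented; one possible wiring is sketched in the next section, and run_monitoring is an illustrative name):

import asyncio

async def run_monitoring():
    client = UnifiedLLMClient()
    responses = await client.query_all_providers(
        "What are the best project management tools for startups?"
    )
    for platform, result in responses.items():
        print(platform, result)

asyncio.run(run_monitoring())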

Rate Limiting and Circuit Breakers

Implement rate limiting and circuit breakers for API reliability:

import time
from collections import deque
from typing import Dict
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, don't send requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()
    
    def is_allowed(self) -> bool:
        """Check if request is allowed under rate limit"""
        now = time.time()
        
        # Remove old requests outside time window
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        
        return False
    
    def get_wait_time(self) -> float:
        """Get time to wait before next request"""
        if not self.requests:
            return 0.0
        oldest = self.requests[0]
        return max(0, self.time_window - (time.time() - oldest))

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            raise e
    
    def _record_success(self):
        """Record successful call"""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
    
    def _record_failure(self):
        """Record failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
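One way to wire these pieces into the unified client above is a small subclass that fills in the reliability helpers it references (MonitoredLLMClient is an illustrative name, and the per-provider limits are placeholder defaults; real limits should come from each provider's documentation):

class MonitoredLLMClient(UnifiedLLMClient):
    def __init__(self):
        super().__init__()
        # One rate limiter and circuit breaker per provider (placeholder limits)
        self.rate_limiters = {
            p: RateLimiter(max_requests=60, time_window=60) for p in LLMProvider
        }
        self.circuit_breakers = {p: CircuitBreaker() for p in LLMProvider}
    
    def _check_rate_limit(self, provider: LLMProvider) -> bool:
        return self.rate_limiters[provider].is_allowed()
    
    def _is_circuit_open(self, provider: LLMProvider) -> bool:
        return self.circuit_breakers[provider].state == CircuitState.OPEN
    
    def _record_success(self, provider: LLMProvider) -> None:
        self.circuit_breakers[provider]._record_success()
    
    def _record_failure(self, provider: LLMProvider) -> None:
        self.circuit_breakers[provider]._record_failure()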

Best Practices

1. API Key Management

  • Store API keys in environment variables or secure key management systems
  • Rotate keys regularly for security
  • Use separate keys for development and production
  • Monitor API key usage and set up alerts for unusual activity
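A small helper can enforce the first and third points at startup (load_api_key and the environment variable name are illustrative):

import os

def load_api_key(env_var: str) -> str:
    """Read an API key from the environment and fail fast if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"Missing required API key: set {env_var}")
    return key

# Use separate variables (and keys) per environment, e.g. development vs. production
openai_key = load_api_key("OPENAI_API_KEY")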

2. Error Handling

  • Implement exponential backoff for retries
  • Handle rate limit errors gracefully with wait times
  • Log all errors for debugging and monitoring
  • Implement fallback mechanisms when providers fail
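A generic retry wrapper is one way to apply the backoff and logging points to any provider call (a sketch; with_backoff is an illustrative name, and production code should catch provider-specific rate-limit exceptions rather than bare Exception):

import asyncio
import logging
import random

logger = logging.getLogger("llm_monitoring")

async def with_backoff(call, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception as exc:
            logger.warning("LLM call failed (attempt %d/%d): %s", attempt + 1, max_retries, exc)
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter so retries are not synchronized
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))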

3. Response Normalization

  • Create a unified response format across all providers
  • Extract common fields (text, citations, metadata)
  • Handle provider-specific features (citations, function calls)
  • Normalize error responses for consistent handling
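For example, the per-provider result dictionaries produced by the clients above can be mapped into a single dataclass (NormalizedResponse and normalize are illustrative names):

from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class NormalizedResponse:
    platform: str
    prompt: str
    text: str
    citations: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

def normalize(raw: Dict[str, Any]) -> NormalizedResponse:
    """Map a provider-specific result dict into the unified shape."""
    return NormalizedResponse(
        platform=raw.get('platform', 'unknown'),
        prompt=raw.get('prompt', ''),
        text=raw.get('response', ''),
        citations=raw.get('citations', []),  # only some providers (e.g. Perplexity) return citations
        metadata={k: v for k, v in raw.items()
                  if k not in ('platform', 'prompt', 'response', 'citations')},
    )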

4. Cost Optimization

  • Use cheaper models for simple queries
  • Implement caching to avoid duplicate API calls
  • Batch requests when possible
  • Monitor token usage and optimize prompt lengths
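A simple in-memory cache keyed on provider, model, and prompt illustrates the caching point (a sketch; ResponseCache is an illustrative name, and production systems would typically use a shared store such as Redis with a TTL):

import hashlib
import json
from typing import Dict, Optional

class ResponseCache:
    """Avoid duplicate API calls by caching responses per provider, model, and prompt."""
    def __init__(self):
        self._store: Dict[str, Dict] = {}
    
    def _key(self, provider: str, model: str, prompt: str) -> str:
        raw = json.dumps({"provider": provider, "model": model, "prompt": prompt}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()
    
    def get(self, provider: str, model: str, prompt: str) -> Optional[Dict]:
        return self._store.get(self._key(provider, model, prompt))
    
    def set(self, provider: str, model: str, prompt: str, response: Dict) -> None:
        self._store[self._key(provider, model, prompt)] = response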

5. Monitoring and Observability

  • Track API response times and success rates
  • Monitor rate limit usage and remaining quotas
  • Set up alerts for provider failures
  • Log all API interactions for audit trails
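A minimal per-provider metrics tracker covers the latency and success-rate points (ProviderMetrics is an illustrative name; production setups would usually export these figures to a monitoring system such as Prometheus or Datadog):

from collections import defaultdict
from typing import Dict, List, Tuple

class ProviderMetrics:
    """Track latency and success rate per provider for dashboards and alerting."""
    def __init__(self):
        self._calls: Dict[str, List[Tuple[float, bool]]] = defaultdict(list)
    
    def record(self, provider: str, latency_seconds: float, success: bool) -> None:
        self._calls[provider].append((latency_seconds, success))
    
    def success_rate(self, provider: str) -> float:
        calls = self._calls[provider]
        return sum(1 for _, ok in calls if ok) / len(calls) if calls else 1.0
    
    def avg_latency(self, provider: str) -> float:
        calls = self._calls[provider]
        return sum(lat for lat, _ in calls) / len(calls) if calls else 0.0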

Need Help with LLM Integration?

Elatify's AI Visibility Agent includes pre-built integrations for all major LLM providers with rate limiting, circuit breakers, and unified response handling. Get comprehensive brand monitoring without the integration complexity.

Related Insights

Building an AI Visibility Monitoring System
Technical architecture guide for building a production-ready monitoring system.
AI Visibility Analytics: Real-Time Monitoring
Learn how to implement real-time monitoring and alerting systems.