Integrating Multiple LLM APIs for Brand Monitoring: Technical Guide
Learn how to integrate and manage multiple LLM APIs (ChatGPT, Perplexity, Claude, Gemini) for comprehensive brand monitoring. Technical implementation with code examples and best practices.
Comprehensive brand monitoring requires integrating multiple LLM providers to ensure complete visibility across all AI platforms. Each provider has unique APIs, rate limits, and response formats. This guide provides technical implementation details for building a unified system that manages multiple LLM integrations efficiently.
Why Integrate Multiple LLMs?
Integrating multiple LLM providers offers several advantages:
- Comprehensive Coverage: Different platforms serve different user bases
- Redundancy: Fallback options if one provider fails
- Response Diversity: Different models may provide different insights
- Cost Optimization: Use cheaper providers for simple queries
- Performance: Parallel queries reduce total response time
Individual Provider Integrations
OpenAI (ChatGPT)
Endpoint: https://api.openai.com/v1/chat/completions
Implementation:
import openai
from typing import List, Dict

class OpenAIClient:
    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)

    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "gpt-4"
    ) -> List[Dict]:
        """Query OpenAI API for brand mentions"""
        results = []
        for prompt in prompts:
            response = self.client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )
            results.append({
                'platform': 'chatgpt',
                'prompt': prompt,
                'response': response.choices[0].message.content,
                'model': model,
                # usage is a pydantic model in openai>=1.0; model_dump() converts it to a dict
                'usage': response.usage.model_dump()
            })
        return results

Anthropic Claude
Endpoint: https://api.anthropic.com/v1/messages
Implementation:
import anthropic
from typing import List, Dict

class ClaudeClient:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)

    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "claude-3-5-sonnet-20241022"
    ) -> List[Dict]:
        """Query Claude API for brand mentions"""
        results = []
        for prompt in prompts:
            message = self.client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            results.append({
                'platform': 'claude',
                'prompt': prompt,
                'response': message.content[0].text,
                'model': model,
                'usage': {
                    'input_tokens': message.usage.input_tokens,
                    'output_tokens': message.usage.output_tokens
                }
            })
        return results

Perplexity
Endpoint: https://api.perplexity.ai/chat/completions
Implementation:
import requests
from typing import List, Dict

class PerplexityClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.perplexity.ai"

    async def query_brand(
        self,
        brand: str,
        prompts: List[str],
        model: str = "llama-3.1-sonar-large-128k-online"
    ) -> List[Dict]:
        """Query Perplexity API with web search"""
        results = []
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        for prompt in prompts:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json={
                    "model": model,
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.7,
                    "max_tokens": 1000
                }
            )
            # Surface HTTP errors explicitly instead of failing later on missing keys
            response.raise_for_status()
            data = response.json()
            results.append({
                'platform': 'perplexity',
                'prompt': prompt,
                'response': data['choices'][0]['message']['content'],
                'citations': data.get('citations', []),
                'model': model
            })
        return results

Google Gemini
Endpoint: https://generativelanguage.googleapis.com/v1beta/models
Implementation:
import google.generativeai as genai
from typing import List, Dict

class GeminiClient:
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-pro')

    async def query_brand(
        self,
        brand: str,
        prompts: List[str]
    ) -> List[Dict]:
        """Query Google Gemini API"""
        results = []
        for prompt in prompts:
            response = self.model.generate_content(
                prompt,
                generation_config={
                    "temperature": 0.7,
                    "max_output_tokens": 1000,
                }
            )
            results.append({
                'platform': 'gemini',
                'prompt': prompt,
                'response': response.text,
                'model': 'gemini-pro'
            })
        return results

Unified Client Architecture
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
import asyncio
from enum import Enum

class LLMProvider(Enum):
    OPENAI = "openai"
    CLAUDE = "claude"
    PERPLEXITY = "perplexity"
    GEMINI = "gemini"

class BaseLLMClient(ABC):
    @abstractmethod
    async def query(self, prompt: str, **kwargs) -> Dict:
        """Query the LLM provider"""
        pass

    @abstractmethod
    def get_provider_name(self) -> str:
        """Get provider name"""
        pass

class UnifiedLLMClient:
    def __init__(self):
        # Each client is expected to expose the BaseLLMClient.query interface.
        # API keys (OPENAI_API_KEY, etc.) are assumed to be loaded elsewhere,
        # e.g. from environment variables (see Best Practices below).
        self.clients = {
            LLMProvider.OPENAI: OpenAIClient(OPENAI_API_KEY),
            LLMProvider.CLAUDE: ClaudeClient(CLAUDE_API_KEY),
            LLMProvider.PERPLEXITY: PerplexityClient(PERPLEXITY_API_KEY),
            LLMProvider.GEMINI: GeminiClient(GEMINI_API_KEY),
        }
        self.rate_limiters = {}
        self.circuit_breakers = {}

    async def query_all_providers(
        self,
        prompt: str,
        providers: Optional[List[LLMProvider]] = None
    ) -> Dict[str, Dict]:
        """Query all or specified providers concurrently"""
        if providers is None:
            providers = list(LLMProvider)
        tasks = []
        queried = []
        for provider in providers:
            # Skip providers whose circuit breaker is currently open
            if self._is_circuit_open(provider):
                continue
            queried.append(provider)
            tasks.append(self._query_with_fallback(provider, prompt))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            provider.value: result
            for provider, result in zip(queried, results)
            if not isinstance(result, Exception)
        }

    async def _query_with_fallback(
        self,
        provider: LLMProvider,
        prompt: str
    ) -> Dict:
        """Query with automatic fallback and retry logic"""
        max_retries = 3
        backoff_factor = 2
        for attempt in range(max_retries):
            try:
                if not self._check_rate_limit(provider):
                    await asyncio.sleep(backoff_factor ** attempt)
                    continue
                result = await self.clients[provider].query(prompt)
                self._record_success(provider)
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    self._record_failure(provider)
                    raise
                await asyncio.sleep(backoff_factor ** attempt)
        return None

Rate Limiting and Circuit Breakers
import time
from collections import deque
from typing import Dict
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, don't send requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()

    def is_allowed(self) -> bool:
        """Check if request is allowed under rate limit"""
        now = time.time()
        # Remove old requests outside time window
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        return False

    def get_wait_time(self) -> float:
        """Get time to wait before next request"""
        if not self.requests:
            return 0.0
        oldest = self.requests[0]
        return max(0, self.time_window - (time.time() - oldest))

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            raise e

    def _record_success(self):
        """Record successful call"""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED

    def _record_failure(self):
        """Record failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

Best Practices
1. API Key Management
- Store API keys in environment variables or secure key management systems (a loading sketch follows this list)
- Rotate keys regularly for security
- Use separate keys for development and production
- Monitor API key usage and set up alerts for unusual activity
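As a minimal sketch of the first point, keys can be read from environment variables at startup. The variable names below simply mirror the globals used by UnifiedLLMClient and are an assumed convention, not a requirement.

import os

# Assumed environment variable names; match them to however your deployment stores secrets.
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
CLAUDE_API_KEY = os.environ["CLAUDE_API_KEY"]
PERPLEXITY_API_KEY = os.environ["PERPLEXITY_API_KEY"]
GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]

Loading keys this way keeps credentials out of source control and lets development and production environments supply different keys without code changes.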
2. Error Handling
- Implement exponential backoff for retries
- Handle rate limit errors gracefully with wait times
- Log all errors for debugging and monitoring
- Implement fallback mechanisms when providers fail (a wiring sketch follows this list)
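The UnifiedLLMClient shown earlier calls four helpers that were left undefined (_check_rate_limit, _is_circuit_open, _record_success, _record_failure). Below is one possible way to back them with the RateLimiter and CircuitBreaker classes from the previous section, written as methods to add to UnifiedLLMClient; it assumes import time and those classes are in scope. The 60-requests-per-minute limit is an illustrative placeholder, not any provider's published quota.

def _check_rate_limit(self, provider: LLMProvider) -> bool:
    # Lazily create a per-provider limiter; placeholder limit of 60 requests/minute
    limiter = self.rate_limiters.setdefault(
        provider, RateLimiter(max_requests=60, time_window=60)
    )
    return limiter.is_allowed()

def _is_circuit_open(self, provider: LLMProvider) -> bool:
    breaker = self.circuit_breakers.setdefault(provider, CircuitBreaker())
    if breaker.state == CircuitState.OPEN:
        # After the recovery timeout, allow a trial request (half-open state)
        if time.time() - breaker.last_failure_time > breaker.recovery_timeout:
            breaker.state = CircuitState.HALF_OPEN
            breaker.success_count = 0
            return False
        return True
    return False

def _record_success(self, provider: LLMProvider) -> None:
    self.circuit_breakers.setdefault(provider, CircuitBreaker())._record_success()

def _record_failure(self, provider: LLMProvider) -> None:
    self.circuit_breakers.setdefault(provider, CircuitBreaker())._record_failure()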
3. Response Normalization
- Create a unified response format across all providers (one possible shape is sketched after this list)
- Extract common fields (text, citations, metadata)
- Handle provider-specific features (citations, function calls)
- Normalize error responses for consistent handling
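One possible shape for such a unified record is sketched below; the field names are illustrative rather than a fixed schema, and the adapter function is a hypothetical example for the Perplexity payload shown earlier.

from dataclasses import dataclass, field
from typing import List, Dict, Optional

@dataclass
class NormalizedResponse:
    platform: str                     # e.g. "chatgpt", "claude", "perplexity", "gemini"
    prompt: str
    text: str                         # main completion text
    model: str
    citations: List[str] = field(default_factory=list)   # only some providers return these
    usage: Dict[str, int] = field(default_factory=dict)  # token counts where available
    error: Optional[str] = None       # populated instead of text when a provider fails

def normalize_perplexity(raw: Dict, prompt: str, model: str) -> NormalizedResponse:
    """Example adapter: map a raw Perplexity response dict to the unified format."""
    return NormalizedResponse(
        platform="perplexity",
        prompt=prompt,
        text=raw["choices"][0]["message"]["content"],
        model=model,
        citations=raw.get("citations", []),
    )

A similar small adapter per provider keeps downstream analysis code independent of any single API's response structure.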
4. Cost Optimization
- Use cheaper models for simple queries
- Implement caching to avoid duplicate API calls (a minimal cache is sketched below)
- Batch requests when possible
- Monitor token usage and optimize prompt lengths
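A minimal in-memory cache keyed on provider, model, and prompt can eliminate duplicate calls during a monitoring run; in production a shared store such as Redis with a TTL is more typical. The class below is a sketch under that assumption:

import hashlib
import json
from typing import Dict, Optional

class PromptCache:
    def __init__(self):
        self._store: Dict[str, Dict] = {}

    @staticmethod
    def _key(provider: str, model: str, prompt: str) -> str:
        # Stable hash over the fields that determine the response
        raw = json.dumps({"provider": provider, "model": model, "prompt": prompt}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()

    def get(self, provider: str, model: str, prompt: str) -> Optional[Dict]:
        return self._store.get(self._key(provider, model, prompt))

    def put(self, provider: str, model: str, prompt: str, response: Dict) -> None:
        self._store[self._key(provider, model, prompt)] = response

Check the cache before dispatching a prompt and write the normalized response back after a successful call.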
5. Monitoring and Observability
- Track API response times and success rates (a simple tracker is sketched below)
- Monitor rate limit usage and remaining quotas
- Set up alerts for provider failures
- Log all API interactions for audit trails
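A lightweight starting point is to record latency and outcome per provider in-process and export the aggregates to whatever monitoring stack you already run; the class below is an illustrative sketch, not a replacement for proper metrics tooling.

import time
from collections import defaultdict

class ProviderMetrics:
    def __init__(self):
        self.latencies = defaultdict(list)   # provider -> response times in seconds
        self.successes = defaultdict(int)
        self.failures = defaultdict(int)

    def record(self, provider: str, started_at: float, ok: bool) -> None:
        """Record one call: started_at is a time.time() timestamp taken before the request."""
        self.latencies[provider].append(time.time() - started_at)
        if ok:
            self.successes[provider] += 1
        else:
            self.failures[provider] += 1

    def summary(self, provider: str) -> dict:
        calls = self.successes[provider] + self.failures[provider]
        lats = self.latencies[provider]
        return {
            "calls": calls,
            "success_rate": self.successes[provider] / calls if calls else None,
            "avg_latency_s": sum(lats) / len(lats) if lats else None,
        }

Periodically flushing these summaries to logs or a dashboard makes provider degradation visible before the circuit breakers start tripping.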
Need Help with LLM Integration?
Elatify's AI Visibility Agent includes pre-built integrations for all major LLM providers with rate limiting, circuit breakers, and unified response handling. Get comprehensive brand monitoring without the integration complexity.
