Core Component
Data Pipeline
Real-time data retrieval from multiple sources, intelligent normalization, caching strategies, and validation to ensure agents work with high-quality data.
01
Pipeline Overview
Query (user input) → Fetch (API calls) → Merge (combine sources) → Normalize (clean & format) → Validate (quality check) → Cache (store results) → Deliver (to agents)
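The stages above can be sketched as a single async pipeline. This is a minimal illustration: the stage functions here are hypothetical stubs, not the real connectors.

```javascript
// Minimal sketch: run a query through a sequence of async stages,
// each stage transforming the previous stage's output.
async function runPipeline(query, stages) {
  let data = query;
  for (const { fn } of stages) {
    data = await fn(data);
  }
  return data;
}

// Example wiring with stub stages (assumed names, not the real connectors)
const stages = [
  { name: "fetch",     fn: async (q) => [{ source: "openalex", q }] },
  { name: "merge",     fn: async (r) => r },
  { name: "normalize", fn: async (r) => r.map((x) => ({ ...x, clean: true })) },
  { name: "validate",  fn: async (r) => r.filter((x) => x.clean) },
];
```

Each stage only sees the output of the one before it, which keeps the steps independently testable and replaceable.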
02
Data Sources & Connectors
Academic Sources
- OpenAlex (active): api.openalex.org, Rate: 100K/day, Format: JSON
- Semantic Scholar (active): api.semanticscholar.org, Rate: 100/5min, Format: JSON
- ArXiv (active): export.arxiv.org, Rate: 3/sec, Format: Atom/XML
- CrossRef (active): api.crossref.org, Rate: 50/sec, Format: JSON

Business Sources
- Yahoo Finance (active): query1.finance.yahoo.com, Rate: Unlimited, Format: JSON
- SEC EDGAR (active): data.sec.gov, Rate: 10/sec, Format: JSON/XML
- Alpha Vantage (active): alphavantage.co, Rate: 5/min, Format: JSON
- FRED (active): api.stlouisfed.org, Rate: 120/min, Format: JSON
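The per-source rate limits above can be captured in a connector registry so a limiter can be sized from config rather than hard-coded. A sketch with the academic sources only; the field names (`requests`, `perSeconds`) are assumptions for illustration.

```javascript
// Sketch of a connector registry capturing the rate limits listed above.
// Field names are assumptions; limits are expressed as requests per window.
const connectors = {
  openalex:         { host: "api.openalex.org",        requests: 100000, perSeconds: 86400 },
  semantic_scholar: { host: "api.semanticscholar.org", requests: 100,    perSeconds: 300 },
  arxiv:            { host: "export.arxiv.org",        requests: 3,      perSeconds: 1 },
  crossref:         { host: "api.crossref.org",        requests: 50,     perSeconds: 1 },
};

// Average requests per second a connector allows,
// useful for sizing a token bucket's refill rate.
function allowedRps(name) {
  const c = connectors[name];
  return c.requests / c.perSeconds;
}
```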
03
Data Normalization
Raw Input (Multiple Formats)

// OpenAlex format
{
  "id": "W2741809807",
  "display_name": "Attention Is All...",
  "cited_by_count": 98000,
  "publication_year": 2017
}

// Semantic Scholar format
{
  "paperId": "204e3073...",
  "title": "Attention Is All...",
  "citationCount": 97842,
  "year": 2017
}

Normalized Output

// QphiQ unified format
{
  "id": "qphiq_paper_123",
  "title": "Attention Is All...",
  "citations": 98000,
  "year": 2017,
  "sources": [
    "openalex",
    "semantic_scholar"
  ],
  "confidence": 0.98,
  "lastUpdated": "2024-12-30"
}

Normalization Rules
- Title: trim, normalize whitespace, title case
- Citations: take max across sources, flag discrepancies > 5%
- Authors: parse names, deduplicate, link to ORCID
- Dates: convert to ISO 8601, validate ranges
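The citation and title rules can be sketched as a merge function over the two raw formats shown above. Field names follow those examples; `citationDiscrepancy` is a hypothetical output field, and title-casing is omitted for brevity.

```javascript
// Sketch: merge an OpenAlex record and a Semantic Scholar record into
// the unified shape. Citations take the max across sources and flag
// discrepancies > 5%, per the rules above.
function normalizePaper(openalex, semanticScholar) {
  const counts = [openalex.cited_by_count, semanticScholar.citationCount];
  const max = Math.max(...counts);
  const min = Math.min(...counts);
  const discrepancy = max > 0 ? (max - min) / max : 0;
  return {
    // trim and collapse runs of whitespace (title-casing omitted here)
    title: openalex.display_name.trim().replace(/\s+/g, " "),
    citations: max,                           // max across sources
    citationDiscrepancy: discrepancy > 0.05,  // flag discrepancies > 5%
    year: openalex.publication_year,
    sources: ["openalex", "semantic_scholar"],
  };
}
```

With the sample records above (98000 vs 97842 citations), the discrepancy is about 0.16%, well under the 5% threshold, so no flag is raised.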
04
Caching Strategy
L1: Edge Cache (TTL: 5 minutes, Vercel Edge)
Vercel Edge CDN caches API responses globally

L2: Query Cache (TTL: 1 hour, in-memory)
Deduplicates identical queries within a session

L3: Result Cache (TTL: 24 hours, Redis/KV)
Persists analysis results for quick retrieval
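The three tiers behave as a read-through hierarchy: check the nearest cache first, fall back to the loader on a full miss, then populate every tier. A minimal sketch, with in-memory TTL maps standing in for the real Edge and Redis/KV backends.

```javascript
// Simple TTL cache standing in for an Edge or Redis/KV backend.
class TtlCache {
  constructor(ttlMs) { this.ttlMs = ttlMs; this.map = new Map(); }
  get(key) {
    const hit = this.map.get(key);
    if (!hit || Date.now() > hit.expires) return undefined;
    return hit.value;
  }
  set(key, value) {
    this.map.set(key, { value, expires: Date.now() + this.ttlMs });
  }
}

const tiers = [
  new TtlCache(5 * 60 * 1000),       // L1: edge, 5 minutes
  new TtlCache(60 * 60 * 1000),      // L2: query, 1 hour
  new TtlCache(24 * 60 * 60 * 1000), // L3: result, 24 hours
];

// Read-through lookup: nearest tier first, loader only on a full miss.
async function cachedFetch(key, loader) {
  for (const tier of tiers) {
    const value = tier.get(key);
    if (value !== undefined) return value;
  }
  const value = await loader(key);
  tiers.forEach((t) => t.set(key, value)); // populate all tiers on miss
  return value;
}
```

A real deployment would also promote L3 hits back into L1/L2; this sketch populates all tiers only on a loader call.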
05
Rate Limiting & Throttling
Strategies
1. Token Bucket: smooth out burst traffic
2. Request Queue: buffer excess requests
3. Backoff: exponential retry on 429
4. Fallback: switch to a backup source
Implementation
const rateLimiter = new TokenBucket({
  capacity: 100,
  refillRate: 10, // tokens per second
});

const MAX_RETRIES = 5;

async function fetchWithLimit(url, attempt = 0) {
  await rateLimiter.acquire();
  const res = await fetch(url);
  // fetch() resolves (not rejects) on HTTP errors, so check the status
  if (res.status === 429) {
    if (attempt >= MAX_RETRIES) {
      throw new Error(`Rate limited after ${attempt} retries: ${url}`);
    }
    // exponential backoff: 2^attempt * 500ms before retrying
    await new Promise((resolve) => setTimeout(resolve, 2 ** attempt * 500));
    return fetchWithLimit(url, attempt + 1);
  }
  return res;
}
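The `TokenBucket` used above is not defined in this section; a minimal sketch matching the constructor options shown (`capacity`, `refillRate`), with `acquire()` waiting until a token is available.

```javascript
// Minimal token bucket matching the constructor options used above.
// Tokens accrue continuously at refillRate per second, capped at capacity.
class TokenBucket {
  constructor({ capacity, refillRate }) {
    this.capacity = capacity;     // max tokens the bucket can hold
    this.refillRate = refillRate; // tokens added per second
    this.tokens = capacity;       // start full to allow an initial burst
    this.last = Date.now();
  }

  refill() {
    const now = Date.now();
    const added = ((now - this.last) / 1000) * this.refillRate;
    this.tokens = Math.min(this.capacity, this.tokens + added);
    this.last = now;
  }

  // Resolves once a token is available, then consumes it.
  async acquire() {
    this.refill();
    while (this.tokens < 1) {
      // sleep roughly long enough for one token to accrue
      await new Promise((resolve) => setTimeout(resolve, 1000 / this.refillRate));
      this.refill();
    }
    this.tokens -= 1;
  }
}
```

Starting full lets short bursts through immediately, while sustained traffic is paced down to the refill rate.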