Core Component

Data Pipeline

Real-time data retrieval from multiple sources, intelligent normalization, caching strategies, and validation to ensure agents work with high-quality data.

01

Pipeline Overview

Query (user input) → Fetch (API calls) → Merge (combine sources) → Normalize (clean & format) → Validate (quality check) → Cache (store results) → Deliver (to agents)
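The seven stages above can be sketched as one async function. Every name here (runPipeline, the source/cache/deliver shapes) is an illustrative placeholder, not part of an actual QphiQ API:

```javascript
// Sketch of the pipeline stages; all names are hypothetical placeholders.
async function runPipeline(query, { sources, cache, deliver }) {
  const cached = cache.get(query);                 // cache check
  if (cached) return deliver(cached);              // Deliver: to agents

  const raw = await Promise.all(                   // Fetch: API calls
    sources.map((s) => s.fetch(query))
  );
  const merged = raw.flat();                       // Merge: combine sources
  const normalized = merged.map((r) => ({          // Normalize: clean & format
    ...r,
    title: r.title.trim(),
  }));
  const valid = normalized.filter((r) => r.title); // Validate: quality check

  cache.set(query, valid);                         // Cache: store results
  return deliver(valid);                           // Deliver: to agents
}
```

A repeated query short-circuits at the cache check, so the fetch/merge/normalize work runs at most once per cached query.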
02

Data Sources & Connectors

Academic Sources

OpenAlex (Active): api.openalex.org · Rate: 100K/day · Format: JSON
Semantic Scholar (Active): api.semanticscholar.org · Rate: 100/5min · Format: JSON
ArXiv (Active): export.arxiv.org · Rate: 3/sec · Format: Atom/XML
CrossRef (Active): api.crossref.org · Rate: 50/sec · Format: JSON
๐Ÿข

Business Sources

Yahoo Finance (Active): query1.finance.yahoo.com · Rate: Unlimited · Format: JSON
SEC EDGAR (Active): data.sec.gov · Rate: 10/sec · Format: JSON/XML
Alpha Vantage (Active): alphavantage.co · Rate: 5/min · Format: JSON
FRED (Active): api.stlouisfed.org · Rate: 120/min · Format: JSON
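The eight connectors above can be captured in a uniform registry. The hosts, rate limits, and formats come straight from the lists; the object shape itself is an assumed convention, not a documented QphiQ structure:

```javascript
// Connector registry; values mirror the source lists above.
// The registry shape is an illustrative convention.
const connectors = {
  openalex:        { host: "api.openalex.org",         limit: "100K/day",  format: "JSON" },
  semanticScholar: { host: "api.semanticscholar.org",  limit: "100/5min",  format: "JSON" },
  arxiv:           { host: "export.arxiv.org",         limit: "3/sec",     format: "Atom/XML" },
  crossref:        { host: "api.crossref.org",         limit: "50/sec",    format: "JSON" },
  yahooFinance:    { host: "query1.finance.yahoo.com", limit: "unlimited", format: "JSON" },
  secEdgar:        { host: "data.sec.gov",             limit: "10/sec",    format: "JSON/XML" },
  alphaVantage:    { host: "alphavantage.co",          limit: "5/min",     format: "JSON" },
  fred:            { host: "api.stlouisfed.org",       limit: "120/min",   format: "JSON" },
};
```

Keeping per-source limits in one place lets the rate limiter in section 05 be configured per connector instead of hard-coded.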
03

Data Normalization

Raw Input (Multiple Formats)

// OpenAlex format
{
  "id": "W2741809807",
  "display_name": "Attention Is All...",
  "cited_by_count": 98000,
  "publication_year": 2017
}

// Semantic Scholar format  
{
  "paperId": "204e3073...",
  "title": "Attention Is All...",
  "citationCount": 97842,
  "year": 2017
}

Normalized Output

// QphiQ unified format
{
  "id": "qphiq_paper_123",
  "title": "Attention Is All...",
  "citations": 98000,
  "year": 2017,
  "sources": [
    "openalex",
    "semantic_scholar"
  ],
  "confidence": 0.98,
  "lastUpdated": "2024-12-30"
}

Normalization Rules

Title: trim, normalize whitespace, title case
Citations: take max across sources, flag discrepancies > 5%
Authors: parse names, deduplicate, link to ORCID
Dates: convert to ISO 8601, validate ranges
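The title and citation rules can be sketched against the two raw formats shown above. The field mappings follow the examples; the function name and discrepancy flag are illustrative, not the actual normalizer:

```javascript
// Sketch of the merge/normalize step for OpenAlex + Semantic Scholar
// records. Field names follow the raw examples; the rest is hypothetical.
function normalizePaper(openalex, semanticScholar) {
  const counts = [openalex.cited_by_count, semanticScholar.citationCount];
  const max = Math.max(...counts);
  const min = Math.min(...counts);
  return {
    title: openalex.display_name.trim().replace(/\s+/g, " "), // trim, normalize whitespace
    citations: max,                                 // take max across sources
    citationDiscrepancy: (max - min) / max > 0.05,  // flag discrepancies > 5%
    year: openalex.publication_year,
    sources: ["openalex", "semantic_scholar"],
  };
}
```

For the example records, 98,000 vs. 97,842 citations is a ~0.16% difference, so the record keeps the max without being flagged.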
04

Caching Strategy

L1: Edge Cache (TTL: 5 minutes, Vercel Edge)
Vercel Edge CDN caches API responses globally

L2: Query Cache (TTL: 1 hour, in-memory)
Deduplicates identical queries within a session

L3: Result Cache (TTL: 24 hours, Redis/KV)
Persists analysis results for quick retrieval
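A read-through lookup across the three tiers can be sketched as follows. The tier API (get/set) and the backfill behavior are assumptions; only the layer ordering and TTLs come from the description above:

```javascript
// Read-through lookup across cache tiers, fastest first (L1 → L2 → L3).
// The tier get/set API and backfill-on-miss behavior are illustrative.
const TTL_SECONDS = { edge: 5 * 60, query: 60 * 60, result: 24 * 60 * 60 };

async function cachedLookup(key, tiers, compute) {
  for (const tier of tiers) {
    const hit = await tier.get(key);      // first tier to answer wins
    if (hit !== undefined) return hit;
  }
  const value = await compute(key);       // miss everywhere: compute once
  for (const tier of tiers) {
    await tier.set(key, value);           // backfill every tier
  }
  return value;
}
```

Backfilling all tiers on a miss means the next identical query is served from L1 rather than recomputed.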

05

Rate Limiting & Throttling

Strategies

1. Token Bucket: smooth out burst traffic
2. Request Queue: buffer excess requests
3. Backoff: exponential retry on 429
4. Fallback: switch to backup source

Implementation

const rateLimiter = new TokenBucket({
  capacity: 100,
  refillRate: 10, // tokens per second
});

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function fetchWithLimit(url, attempt = 0) {
  await rateLimiter.acquire();

  const response = await fetch(url);

  // fetch() resolves on HTTP errors rather than throwing,
  // so check the status code explicitly.
  if (response.status === 429 && attempt < 5) {
    await sleep(2 ** attempt * 1000); // exponential backoff: 1s, 2s, 4s, ...
    return fetchWithLimit(url, attempt + 1);
  }

  return response;
}