Most teams enrich their contact data the same way: export a CSV, upload it to an enrichment tool, wait for results, then import them back. It works, but it's slow, error-prone, and the records start decaying the moment you import them.
A better approach is to enrich records the moment they enter your system. When a new lead fills out a form, when a contact syncs from a partner integration, when a user signs up, the enrichment happens inline, before the record is even fully committed to your CRM.
This article walks through how to build a lightweight, event-driven enrichment pipeline using a person enrichment API. The same architecture works for company enrichment (firmographics like employee count, industry, and growth trends). Just swap the endpoint. The examples use the Data Legion API, but the pattern applies to any enrichment provider.
The Architecture
The pipeline has three components:
- Trigger. An event that fires when a new contact enters the system (webhook, form submission, database insert).
- Enrichment. An API call that takes a sparse record and returns a full profile.
- Write-back. The enriched data gets written to your CRM, database, or downstream system.
```
New Lead ──→ Your Server ──→ Person Enrich API ──→ CRM / Database
                  ↓                    ↓
            (email, name)    (full profile returned)
```

The key design decision is whether to enrich synchronously (block until the API responds) or asynchronously (fire the request and process the result via callback). For most use cases, synchronous enrichment is simpler and fast enough: enrichment APIs typically respond in under a second.
Step 1: The Enrichment Call
Here's the core enrichment call. You send whatever identifiers you have. An email is the strongest match signal, but name, phone, LinkedIn URL, and other fields all work.
```python
import os
import requests

response = requests.post(
    "https://api.datalegion.ai/person/enrich",
    headers={"API-Key": os.environ["DATA_LEGION_API_KEY"]},
    json={"email": "jane.doe@techcompany.com"},
)
data = response.json()
```

The API returns a `matches` array with enriched person records. Each record includes name, job title, company, seniority level, location, phone numbers, emails, work history, education, and social profiles, each with a confidence score.
```json
{
  "matches": [
    {
      "person": {
        "full_name": "jane marie doe",
        "first_name": "jane",
        "last_name": "doe",
        "job_title": "senior product manager",
        "company_name": "tech company",
        "company_domain": "techcompany.com",
        "company_industry": "technology, information and internet",
        "company_size": "1001-5000",
        "seniority_level": "senior",
        "job_function": "product",
        "city": "san francisco",
        "state": "california",
        "years_of_experience": 12,
        "linkedin_url": "https://www.linkedin.com/in/janedoe",
        "work_email": "jane.doe@techcompany.com",
        "mobile_phone": "+15551234567",
        "emails": [
          {
            "address": "jane.doe@techcompany.com",
            "type": "professional",
            "current": true,
            "validated": true,
            "confidence": "high"
          }
        ],
        "phones": [
          {
            "type": "mobile",
            "number": "+15551234567",
            "current": true,
            "confidence": "high"
          }
        ]
      },
      "match_metadata": {
        "matched_on": ["email"],
        "match_type": "exact",
        "match_confidence": "high"
      }
    }
  ],
  "total": 1
}
```

A 404 means no match was found.
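The per-field confidence scores are worth using rather than blindly trusting the first value. For instance, a small helper can pick the most trustworthy current email from a match. A sketch, assuming a `high`/`medium`/`low` confidence scale (only `high` appears in the sample above; the other tiers are an assumption):

```python
# Assumed confidence tiers; only "high" is confirmed by the sample response.
CONFIDENCE_RANK = {"high": 3, "medium": 2, "low": 1}

def best_email(person):
    """Pick the most trustworthy current email from an enriched person record."""
    candidates = [e for e in person.get("emails", []) if e.get("current")]
    if not candidates:
        return None
    # Prefer higher confidence, then validated addresses.
    candidates.sort(
        key=lambda e: (
            CONFIDENCE_RANK.get(e.get("confidence"), 0),
            e.get("validated", False),
        ),
        reverse=True,
    )
    return candidates[0]["address"]
```

The same pattern applies to the `phones` array.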
Step 2: Wiring It Into a Webhook Handler
In practice, the enrichment call sits inside a webhook handler or API route that fires when a new contact arrives. Here's a complete example using a FastAPI endpoint:
```python
from fastapi import FastAPI
import os
import requests

app = FastAPI()


@app.post("/webhook/new-lead")
def handle_new_lead(lead: dict):
    # Step 1: Enrich the lead
    person = enrich_person(lead["email"])
    if not person:
        # No match, save the raw lead as-is
        save_to_database(lead)
        return {"enriched": False}

    # Step 2: Merge enriched data with the original lead
    merged = {
        "email": lead["email"],
        "first_name": person.get("first_name") or lead.get("first_name"),
        "last_name": person.get("last_name") or lead.get("last_name"),
        "job_title": person.get("job_title"),
        "company_name": person.get("company_name"),
        "company_domain": person.get("company_domain"),
        "seniority_level": person.get("seniority_level"),
        "city": person.get("city"),
        "state": person.get("state"),
        "linkedin_url": person.get("linkedin_url"),
        "phone": person.get("mobile_phone"),
    }

    # Step 3: Upsert into your database or CRM
    # (save_to_database is your own write-back into the CRM or database)
    save_to_database(merged)
    return {"enriched": True}


def enrich_person(email):
    response = requests.post(
        "https://api.datalegion.ai/person/enrich",
        headers={"API-Key": os.environ["DATA_LEGION_API_KEY"]},
        json={"email": email},
    )
    if response.status_code == 404:
        return None
    response.raise_for_status()
    data = response.json()
    matches = data.get("matches", [])
    return matches[0]["person"] if matches else None
```

The upsert pattern is important: use email as the primary key and update existing records rather than creating duplicates. Most CRMs support this natively: Salesforce checks Leads then Contacts; HubSpot uses a unified contacts table.
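In a plain database, the same upsert is a single statement. A minimal SQLite sketch keyed on email, covering just three of the merged fields for brevity:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # swap for your real database connection
conn.execute(
    "CREATE TABLE IF NOT EXISTS contacts "
    "(email TEXT PRIMARY KEY, first_name TEXT, job_title TEXT)"
)

def save_to_database(record):
    """Upsert keyed on email: update existing rows instead of duplicating.

    `record` must carry email, first_name, and job_title keys.
    """
    conn.execute(
        """
        INSERT INTO contacts (email, first_name, job_title)
        VALUES (:email, :first_name, :job_title)
        ON CONFLICT(email) DO UPDATE SET
            first_name = excluded.first_name,
            job_title = excluded.job_title
        """,
        record,
    )
    conn.commit()
```

Re-enriching the same contact then simply refreshes the row in place.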
Step 3: Handling Rate Limits and Errors
The Data Legion API returns rate limit headers on every response:
| Header | Description |
|---|---|
| `RateLimit-Policy` | Active rate limit policy (e.g. `100/min`) |
| `RateLimit-Limit` | Your rate limit ceiling |
| `RateLimit-Remaining` | Requests remaining in the current window |
| `RateLimit-Reset` | Unix timestamp when the limit resets |
When you hit the limit, the API returns HTTP 429 with a `Retry-After` header. A resilient client handles this with exponential backoff:
```python
import os
import random
import time

import requests


def enrich_with_retry(email, max_retries=3):
    delay = 1.0
    for attempt in range(max_retries + 1):
        response = requests.post(
            "https://api.datalegion.ai/person/enrich",
            headers={"API-Key": os.environ["DATA_LEGION_API_KEY"]},
            json={"email": email},
        )
        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After")
            wait_time = float(retry_after) if retry_after else delay
            # Add jitter to prevent thundering herd
            jitter = random.random() * 0.5
            time.sleep(wait_time + jitter)
            delay *= 2
            continue
        if response.status_code == 404:
            return None
        response.raise_for_status()
        data = response.json()
        matches = data.get("matches", [])
        return matches[0]["person"] if matches else None
    raise RuntimeError("Max retries exceeded")
```

Two things to note:
- Check rate limit headers proactively, not just on 429s. If `RateLimit-Remaining` is low, slow down before you hit the wall.
- Add jitter to your backoff. Without it, multiple workers that hit the limit at the same time will all retry at the same time, causing another spike.
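The proactive check can be a small helper run against every successful response. A sketch, assuming the `RateLimit-Remaining` and `RateLimit-Reset` headers described above; the `floor` threshold is an arbitrary choice:

```python
import time

def throttle_if_needed(response, floor=5):
    """Sleep until the window resets when few requests remain.

    `floor` is the remaining-request threshold below which we back off.
    """
    remaining = response.headers.get("RateLimit-Remaining")
    reset = response.headers.get("RateLimit-Reset")
    if remaining is not None and int(remaining) <= floor and reset:
        # RateLimit-Reset is a Unix timestamp; wait out the difference.
        wait = max(0.0, float(reset) - time.time())
        time.sleep(wait)
```

Calling this after each enrichment smooths traffic instead of slamming into 429s at the window boundary.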
Step 4: Caching to Avoid Redundant Lookups
If the same email comes through multiple systems (your marketing platform, your CRM, your support tool) you don't want to enrich it three times. A simple cache prevents duplicate API calls:
```python
import time

_enrichment_cache = {}
CACHE_TTL = 86400  # 24 hours in seconds


def enrich_with_cache(email):
    normalized = email.lower().strip()
    cached = _enrichment_cache.get(normalized)
    if cached and cached["expiry"] > time.time():
        return cached["data"]
    result = enrich_with_retry(normalized)
    _enrichment_cache[normalized] = {
        "data": result,
        "expiry": time.time() + CACHE_TTL,
    }
    return result
```

For production systems, swap the in-memory dict for Redis or another shared cache. The TTL should match your freshness requirements; 24 hours is a reasonable default for contact data that doesn't change minute to minute.
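One way to make that swap painless is to code against the small get/set-with-TTL surface Redis already exposes (`get` and `setex` in redis-py have this shape). A sketch with an in-process stand-in backend; the `DictBackend` class is illustrative, not a real library:

```python
import json
import time

class DictBackend:
    """In-process stand-in for a shared cache. A redis.Redis client exposes
    get/setex with the same shape, so swapping it in is a one-line change."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        value, expiry = self._store.get(key, (None, 0))
        return value if expiry > time.time() else None

    def setex(self, key, ttl, value):
        self._store[key] = (value, time.time() + ttl)

cache = DictBackend()  # e.g. swap for redis.Redis(host="localhost")

def cached_enrich(email, enrich_fn, ttl=86400):
    """Look up the cache first; serialize results as JSON like Redis would."""
    key = "enrich:" + email.lower().strip()
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)
    result = enrich_fn(email)
    cache.setex(key, ttl, json.dumps(result))
    return result
```

Serializing to JSON up front means the in-memory and Redis paths behave identically.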
Batch Enrichment for Historical Data
Real-time enrichment handles the flow of new records. But you likely have thousands of existing records that were never enriched, or were enriched months ago and have gone stale.
For backfills, process records in batches with controlled concurrency to stay within rate limits:
```python
import time
from concurrent.futures import ThreadPoolExecutor


def enrich_batch(emails, concurrency=5):
    results = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        for i in range(0, len(emails), concurrency):
            batch = emails[i : i + concurrency]
            batch_results = list(pool.map(enrich_with_cache, batch))
            results.extend(batch_results)
            # Pause between batches to respect rate limits
            if i + concurrency < len(emails):
                time.sleep(1.5)
    return results
```

The hybrid approach is the current best practice: run a batch enrichment job to clean up historical data, then keep real-time enrichment running so every new record arrives fully enriched. This way your database progressively improves without any manual export/import cycles.
Choosing the Right Fields
The enrichment API returns a broad set of fields per person, but you probably don't need all of them for every use case. Use the `include_fields` body parameter to request only what you need; it reduces response size and makes your pipeline easier to maintain:
```shell
curl -X POST "https://api.datalegion.ai/person/enrich" \
  -H "Content-Type: application/json" \
  -H "API-Key: YOUR_API_KEY" \
  -d '{
    "email": "jane.doe@techcompany.com",
    "include_fields": "full_name,job_title,company_name,seniority_level,emails,phones"
  }'
```

Common field selections by use case:
- Sales prospecting: `job_title`, `seniority_level`, `company_name`, `company_size`, `emails`, `phones`
- Lead scoring: `seniority_level`, `job_function`, `company_industry`, `company_size`, `years_of_experience`
- Data hygiene: `full_name`, `emails`, `phones`, `locations`, `current_jobs_last_confirmed`
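These selections can live in one lookup so every pipeline requests a consistent field set. A sketch; the use-case keys are just labels for this example, not API values:

```python
# Field sets from the use cases above, keyed by an internal label.
FIELD_SETS = {
    "sales_prospecting": "job_title,seniority_level,company_name,company_size,emails,phones",
    "lead_scoring": "seniority_level,job_function,company_industry,company_size,years_of_experience",
    "data_hygiene": "full_name,emails,phones,locations,current_jobs_last_confirmed",
}

def build_enrich_payload(email, use_case):
    """Build the request body, attaching include_fields when a set is defined."""
    payload = {"email": email}
    fields = FIELD_SETS.get(use_case)
    if fields:
        payload["include_fields"] = fields
    return payload
```

Unknown labels fall back to the full response, which is a safe default.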
The Payoff
A real-time enrichment pipeline changes the dynamics of your contact data:
- New leads arrive complete. Your sales team sees a full profile (title, company, seniority, phone number) within seconds of a form submission, not hours or days later.
- No more CSV round-trips. The manual export-enrich-import cycle disappears entirely.
- Data stays fresh. Because enrichment happens on every new record, your database doesn't accumulate stale, incomplete contacts.
- Enrichment scales with your pipeline. You only enrich records that actually enter your system, rather than re-enriching an entire database on a schedule.
The total implementation is a few hundred lines of code: an endpoint, a retry wrapper, and a cache. From there, every new contact that enters your system arrives with the full picture attached.
The same pipeline pattern works for company enrichment. Instead of triggering on new contacts, trigger on new accounts or domains. Use the company enrich API to fill in firmographics (employee count, industry classification, growth trends, workforce distributions) before routing the account to sales. Many teams run both in parallel: person enrichment for contact intelligence and company enrichment for account intelligence.