Building AI Pipelines That Don't Break at Scale
The Notebook-to-Production Gap
A data scientist built an AI pipeline in a Jupyter notebook. It ingested customer data, generated embeddings, clustered users into segments, and produced recommendations. It worked beautifully on 10,000 records. In production with 2 million records, it ran out of memory. When an API call failed mid-pipeline, there was no retry logic. When the model produced bad outputs, nobody knew for hours.
The gap between "works in a notebook" and "works in production" is where most AI projects die.
The Pipeline Architecture
┌─────────────┐    ┌──────────────┐    ┌───────────────┐
│    Data     │───▶│  Transform   │───▶│   AI Model    │
│  Ingestion  │    │  & Validate  │    │   Inference   │
└─────────────┘    └──────────────┘    └───────────────┘
       │                  │                    │
       ▼                  ▼                    ▼
  ┌─────────┐        ┌──────────┐        ┌───────────┐
  │  Dead   │        │  Schema  │        │  Output   │
  │ Letter  │        │  Errors  │        │ Validation│
  │  Queue  │        │   Log    │        │  & Store  │
  └─────────┘        └──────────┘        └───────────┘
Step 1: Idempotent Ingestion
// Every pipeline step should be safely re-runnable
/**
 * Ingests one batch from `source` and writes it to `ingested/<batchId>.parquet`.
 * The step's lifecycle (in_progress / completed / failed) is recorded in
 * `db.pipelineRuns`, so calling this again after a completed run is a no-op.
 *
 * @param batchId - Identifier of the batch; also used to build the storage key.
 * @param source - Data source the batch is fetched from.
 * @returns The path returned by `storage.write`, or the `outputPath` recorded
 *   by an earlier completed run.
 * @throws Re-throws the original error from fetching, writing, or recording
 *   completion, after (best-effort) marking the run as "failed".
 */
async function ingestBatch(batchId: string, source: DataSource) {
  // Composite unique key for this batch's ingest step, shared by every query below.
  // `as const` keeps `step` as the literal "ingest" (as it was when written inline).
  const runKey = { batchId_step: { batchId, step: "ingest" } } as const;

  // Check if this batch was already processed
  const existing = await db.pipelineRuns.findUnique({ where: runKey });
  if (existing?.status === "completed") {
    console.log(`Batch ${batchId} already ingested, skipping`);
    return existing.outputPath;
  }

  // Mark as in-progress
  await db.pipelineRuns.upsert({
    where: runKey,
    create: { batchId, step: "ingest", status: "in_progress", startedAt: new Date() },
    update: { status: "in_progress", startedAt: new Date() },
  });

  try {
    const data = await source.fetchBatch(batchId);
    const outputPath = await storage.write(`ingested/${batchId}.parquet`, data);
    await db.pipelineRuns.update({
      where: runKey,
      data: { status: "completed", outputPath, completedAt: new Date() },
    });
    return outputPath;
  } catch (error: unknown) {
    // Catch variables are `unknown` under strict mode, and non-Error throwables
    // have no `.message`.
    const message = error instanceof Error ? error.message : String(error);
    try {
      await db.pipelineRuns.update({
        where: runKey,
        data: { status: "failed", error: message },
      });
    } catch (statusError: unknown) {
      // Recording the failure is best-effort: a DB problem here must not
      // replace the original, more useful error.
      console.error(`Failed to record ingest failure for batch ${batchId}`, statusError);
    }
    throw error;
  }
}
Step 2: Schema Validation
import { z } from "zod";
/** One line item in a customer's purchase history. */
const PurchaseEntrySchema = z.object({
  productId: z.string(),
  // Strictly greater than zero.
  amount: z.number().positive(),
  // Must pass zod's `datetime()` string check.
  date: z.string().datetime(),
});

/**
 * Runtime schema for a single incoming customer record. `validateBatch`
 * parses each raw record with it and reports mismatches as invalid instead
 * of passing them downstream.
 */
const CustomerRecordSchema = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  purchaseHistory: PurchaseEntrySchema.array(),
  createdAt: z.string().datetime(),
});
/**
 * Validates raw records against `CustomerRecordSchema` and splits them into
 * parsed valid records and invalid records annotated with readable error paths.
 * Sends a warning alert when more than 5% of the batch fails validation.
 *
 * @param records - Untrusted input records (not mutated).
 * @returns The valid records, the invalid records with their errors, and the
 *   fraction of records that failed validation (0 for an empty batch).
 */
async function validateBatch(records: readonly unknown[]): Promise<ValidationResult> {
  const valid: CustomerRecord[] = [];
  const invalid: { record: unknown; errors: string[] }[] = [];

  for (const record of records) {
    const result = CustomerRecordSchema.safeParse(record);
    if (result.success) {
      valid.push(result.data);
    } else {
      invalid.push({
        record,
        // `issues` is ZodError's primary field; `errors` is only an alias of it in zod 3.
        errors: result.error.issues.map(e => `${e.path.join(".")}: ${e.message}`),
      });
    }
  }

  // An empty batch has no failures; avoid 0 / 0 = NaN leaking into the result.
  const errorRate = records.length === 0 ? 0 : invalid.length / records.length;

  // Alert if error rate exceeds threshold
  if (errorRate > 0.05) {
    await alerting.send({
      severity: "warning",
      message: `Validation error rate ${(errorRate * 100).toFixed(1)}% exceeds 5% threshold`,
      details: { totalRecords: records.length, invalidCount: invalid.length },
    });
  }
  return { valid, invalid, errorRate };
}
Step 3: Batched AI Inference
// Process in chunks to avoid OOM and enable progress tracking
/**
 * Runs `aiModel.predict` over `records` in chunks of `batchSize`, retrying each
 * chunk through `withRetry`. Outputs rejected by `isValidOutput`, and chunks
 * whose inference still fails after retries, go to the dead-letter queue
 * instead of aborting the run.
 *
 * @param records - Records to score (not mutated).
 * @param batchSize - Records per model call; must be a positive integer.
 * @returns The accepted inference results, in chunk order.
 * @throws RangeError if `batchSize` is not a positive integer.
 */
async function runInference(
  records: CustomerRecord[],
  batchSize: number = 100
): Promise<InferenceResult[]> {
  // A zero/negative step never advances (infinite loop) and a fractional one
  // makes `slice` re-read records, so reject both up front.
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }

  const results: InferenceResult[] = [];
  const totalBatches = Math.ceil(records.length / batchSize);

  // Progress tracking is best-effort: a failed update must neither abort the
  // run nor cause already-accepted results to be dead-lettered.
  const reportProgress = async (batchNum: number): Promise<void> => {
    try {
      await updateProgress(batchNum, totalBatches);
    } catch (progressError: unknown) {
      console.warn(`Progress update failed for batch ${batchNum}/${totalBatches}`, progressError);
    }
  };

  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);
    const batchNum = Math.floor(i / batchSize) + 1;

    // Only the model call is guarded here. Previously any later failure in the
    // batch (DLQ push, progress update) dead-lettered the whole batch as
    // "inference_failed" even though some of its results were already kept.
    let batchResults;
    try {
      batchResults = await withRetry(
        () => aiModel.predict(batch),
        { maxRetries: 3, backoff: "exponential", initialDelay: 1000 }
      );
    } catch (error: unknown) {
      // Log failed batch, continue with remaining
      const message = error instanceof Error ? error.message : String(error);
      await deadLetterQueue.pushBatch(
        batch.map(r => ({ record: r, reason: "inference_failed", error: message }))
      );
      await reportProgress(batchNum);
      continue;
    }

    // Validate model outputs
    for (const result of batchResults) {
      if (!isValidOutput(result)) {
        // NOTE(review): assumes `result.index` is the position within `batch` — confirm
        // against aiModel.predict.
        await deadLetterQueue.push({ record: batch[result.index], reason: "invalid_output" });
        continue;
      }
      results.push(result);
    }

    await reportProgress(batchNum);
  }
  return results;
}
Step 4: Output Validation
// Don't blindly trust model outputs
/**
 * Filters model recommendations down to those that satisfy business rules and
 * basic sanity checks.
 *
 * A recommendation is kept only if:
 * - its product is in `constraints.inStockProducts`;
 * - its confidence is at least `constraints.minConfidence`;
 * - its product is not in `constraints.purchasedProducts`;
 * - its `predictedValue` lies within `valueRange` (inclusive).
 *
 * Recommendations dropped by the value check are logged via `logger.warn`.
 * NaN confidences and values are rejected.
 *
 * @param recommendations - Raw model outputs (not mutated).
 * @param constraints - Business rules to enforce.
 * @param valueRange - Inclusive bounds for `predictedValue`; defaults to 0–10000.
 * @returns The recommendations that passed every check, in input order.
 */
function validateRecommendations(
  recommendations: Recommendation[],
  constraints: BusinessRules,
  valueRange: { min: number; max: number } = { min: 0, max: 10000 }
): ValidatedRecommendation[] {
  return recommendations.filter(rec => {
    // Business rule: don't recommend out-of-stock products
    if (!constraints.inStockProducts.has(rec.productId)) return false;
    // Business rule: confidence threshold. Written as `!(x >= min)` rather than
    // `x < min` so that a NaN confidence (every NaN comparison is false) is rejected.
    if (!(rec.confidence >= constraints.minConfidence)) return false;
    // Business rule: don't recommend products customer already bought
    if (constraints.purchasedProducts.has(rec.productId)) return false;
    // Sanity check: predicted value must lie within the expected range
    // (NaN fails both comparisons and is rejected too).
    if (!(rec.predictedValue >= valueRange.min && rec.predictedValue <= valueRange.max)) {
      logger.warn("Suspicious predicted value", { rec });
      return false;
    }
    return true;
  });
}
Monitoring and Alerting
What to monitor in AI pipelines:
Input quality:
→ Schema validation error rate (alert if > 5%)
→ Data freshness (alert if source data is stale)
→ Record count anomalies (alert if ±30% from expected)
Model performance:
→ Inference latency (p50, p95, p99)
→ Prediction confidence distribution
→ Output distribution shift (are outputs changing unexpectedly?)
→ Error rate per batch
Pipeline health:
→ End-to-end runtime
→ Step-level success/failure rates
→ Dead letter queue size (growing = problem)
→ Memory and CPU utilization per step
AI pipelines aren't special — they're data pipelines with a model in the middle. Apply the same engineering discipline you'd use for any production system: idempotency, validation, retries, monitoring, and graceful failure handling. The model is the easy part. The pipeline around it is what makes it production-ready.