
Long-running AI pipelines fail. This is not a possibility to plan for; it is a certainty to design around. API rate limits hit. Provider outages occur. Processes get killed. Machines restart. Network connections drop. And when your pipeline has been running for 45 minutes processing a large dataset, the last thing you want is to start over from the beginning.
The checkpoint/resume pattern solves this: save progress incrementally so that when a failure occurs, the pipeline resumes from where it left off rather than from scratch. This sounds straightforward, and the core concept is. The implementation details, however, involve real trade-offs that affect cost, reliability, and complexity.
We learned this building PlanOpticon. A full meeting analysis involves transcription, frame extraction, frame classification, diagram analysis, content synthesis, and knowledge graph construction. Processing a two-hour recording can take 30-60 minutes and make hundreds of API calls. Without checkpoint/resume, a single failure at minute 55 means losing all that work. With it, a failure at any point means losing at most one step.
This article covers the pattern in detail: implementation strategies, trade-offs, and practical code in Python.
At its simplest, checkpoint/resume involves three operations:
import json
from pathlib import Path


class CheckpointManager:
    def __init__(self, pipeline_id: str, checkpoint_dir: str = ".checkpoints"):
        self.pipeline_id = pipeline_id
        self.checkpoint_dir = Path(checkpoint_dir) / pipeline_id
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save(self, step_id: str, data: dict):
        path = self.checkpoint_dir / f"{step_id}.json"
        # Write to a temp file first, then atomic rename
        temp_path = path.with_suffix(".tmp")
        with open(temp_path, "w") as f:
            json.dump(data, f)
        temp_path.rename(path)

    def load(self, step_id: str) -> dict | None:
        path = self.checkpoint_dir / f"{step_id}.json"
        if path.exists():
            with open(path) as f:
                return json.load(f)
        return None

    def has_checkpoint(self, step_id: str) -> bool:
        return (self.checkpoint_dir / f"{step_id}.json").exists()
The atomic write (write to temp file, then rename) is critical. If the process crashes during a write, you get either the complete old checkpoint or the complete new checkpoint, never a corrupted partial write. This is a small detail that prevents a large category of bugs.
The most important design decision is checkpoint granularity: how often do you save state?
Coarse-grained checkpointing saves state after major pipeline stages complete. A 10-stage pipeline has 10 checkpoints. This is simple to implement and has minimal overhead, but a failure within a stage means replaying the entire stage.
Fine-grained checkpointing saves state after each individual operation. A stage that processes 1,000 items has 1,000 checkpoints within that stage. This minimizes rework after failure but adds I/O overhead and complexity.
Adaptive checkpointing adjusts granularity based on cost and duration. Cheap, fast operations use coarse checkpointing. Expensive, slow operations use fine checkpointing. This is the approach we recommend.
class AdaptiveCheckpointer:
    def __init__(self, manager: CheckpointManager):
        self.manager = manager
        self.cost_tracker = CostTracker()

    def should_checkpoint(self, step_id: str, cost_so_far: float,
                          time_so_far: float) -> bool:
        # Always checkpoint after expensive operations
        if cost_so_far > 0.10:  # More than $0.10 since last checkpoint
            return True
        # Always checkpoint after long operations
        if time_so_far > 60:  # More than 60 seconds since last checkpoint
            return True
        # Don't checkpoint trivial operations
        return False
In PlanOpticon, transcription is a single expensive step that checkpoints once on completion. Frame classification processes hundreds of frames and checkpoints every 50 frames. The granularity matches the cost structure: if classification fails at frame 449, we replay from frame 401, not from frame 1.
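As a rough sketch of what per-batch checkpointing inside a stage can look like (the batch size, the checkpoint key, and the classify_frame call are placeholders, not PlanOpticon's actual code):

def classify_frames(frames: list, manager: CheckpointManager, batch_size: int = 50):
    results = manager.load("classification_progress") or {}
    for i, frame in enumerate(frames):
        key = str(i)
        if key in results:
            continue  # Already classified on a previous run
        results[key] = classify_frame(frame)  # Expensive vision API call (placeholder)
        # Persist at batch boundaries so a failure costs at most batch_size frames
        if (i + 1) % batch_size == 0:
            manager.save("classification_progress", results)
    manager.save("classification_progress", results)
    return results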
Where you store checkpoints affects reliability, performance, and portability.
Local filesystem. The simplest option. Fast writes, no network dependency. But checkpoints are lost if the machine fails or the directory is cleaned up. Suitable for pipelines that run on a single machine where the failure mode is process-level (crashes, OOM) rather than machine-level (hardware failure).
Cloud object storage (S3, GCS). Durable and accessible from any machine. Higher write latency, but for checkpoints that happen every 30-60 seconds, the latency is acceptable. Enables resuming on a different machine than the one that failed, which is valuable for cloud-native workloads.
Database. PostgreSQL, Redis, or DynamoDB. Provides durability with lower latency than object storage. Supports atomic operations and querying across pipeline runs. More operational overhead than file-based storage.
Hybrid. Write checkpoints to the local filesystem for speed and asynchronously replicate them to cloud storage for durability. This gives you the performance of local storage with the durability of cloud storage, at the cost of some added complexity.
class HybridCheckpointManager:
    def __init__(self, pipeline_id: str, local_dir: str, s3_bucket: str):
        self.local = LocalCheckpointManager(pipeline_id, local_dir)
        self.remote = S3CheckpointManager(pipeline_id, s3_bucket)

    def save(self, step_id: str, data: dict):
        # Write locally first (fast)
        self.local.save(step_id, data)
        # Async replicate to S3 (durable)
        self._async_replicate(step_id, data)

    def load(self, step_id: str) -> dict | None:
        # Try local first
        data = self.local.load(step_id)
        if data is not None:
            return data
        # Fall back to remote
        data = self.remote.load(step_id)
        if data is not None:
            # Populate local cache
            self.local.save(step_id, data)
        return data
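For reference, a minimal S3-backed manager might look like the following sketch, assuming boto3 and a per-pipeline key prefix (error handling is simplified):

import json
import boto3
from botocore.exceptions import ClientError

class S3CheckpointManager:
    def __init__(self, pipeline_id: str, bucket: str):
        self.bucket = bucket
        self.prefix = f"checkpoints/{pipeline_id}"
        self.s3 = boto3.client("s3")

    def save(self, step_id: str, data: dict):
        # An S3 PUT is atomic: readers see either the old or the new object
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/{step_id}.json",
            Body=json.dumps(data).encode("utf-8"),
        )

    def load(self, step_id: str) -> dict | None:
        try:
            obj = self.s3.get_object(
                Bucket=self.bucket, Key=f"{self.prefix}/{step_id}.json"
            )
            return json.loads(obj["Body"].read())
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return None
            raise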
Not all state is worth checkpointing. Saving too much wastes storage and slows writes. Saving too little means checkpoints are not useful for resumption.
Always checkpoint the outputs of expensive or slow operations: API responses, transcripts, classification results, anything that costs real money or real time to regenerate.
Usually do not checkpoint state that is cheap to recompute, or anything you can derive directly from data you have already checkpointed.
The serialization question. Checkpoint data needs to be serializable. For simple data (dicts, lists, strings, numbers), JSON works. For complex objects (numpy arrays, dataframes, model outputs), you need either a richer serialization format (pickle, msgpack) or a conversion step that transforms complex objects into JSON-serializable form before checkpointing.
We strongly recommend JSON over pickle for checkpoints. Pickle is fragile – a change to your class definitions can make old pickles unloadable. JSON is human-readable, debuggable, and version-agnostic. If you need to checkpoint binary data, store it as a separate file and reference it from the JSON checkpoint.
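A minimal sketch of that approach (the helper name and file layout are illustrative):

import hashlib
import json
from pathlib import Path

def save_with_blob(checkpoint_dir: Path, step_id: str, metadata: dict, blob: bytes):
    # Store the binary payload as its own content-addressed file...
    blob_name = hashlib.sha256(blob).hexdigest()[:16] + ".bin"
    (checkpoint_dir / blob_name).write_bytes(blob)
    # ...and keep only a reference to it in the JSON checkpoint
    checkpoint = {**metadata, "blob_ref": blob_name}
    (checkpoint_dir / f"{step_id}.json").write_text(json.dumps(checkpoint))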
There are several ways to integrate checkpoint/resume into a pipeline architecture:
Decorator pattern. Wrap individual pipeline steps with a checkpoint decorator that handles the save/load logic transparently:
import hashlib
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def checkpointed(step_id_fn=None):
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Pass self through so the step-ID lambdas share the method signature
            step_id = step_id_fn(self, *args, **kwargs) if step_id_fn else func.__name__
            # Check for existing checkpoint
            cached = self.checkpoint_manager.load(step_id)
            if cached is not None:
                logger.info(f"Resuming from checkpoint: {step_id}")
                return cached
            # Execute the step
            result = func(self, *args, **kwargs)
            # Save checkpoint
            self.checkpoint_manager.save(step_id, result)
            return result
        return wrapper
    return decorator


class AnalysisPipeline:
    def __init__(self, pipeline_id: str):
        self.checkpoint_manager = CheckpointManager(pipeline_id)

    # Use a stable content hash for the key: Python's built-in hash() is
    # randomized per process, so its keys would never match on a resumed run
    @checkpointed(lambda self, audio_path:
                  f"transcribe_{hashlib.sha1(audio_path.encode()).hexdigest()[:12]}")
    def transcribe(self, audio_path: str) -> dict:
        return whisper_api.transcribe(audio_path)

    @checkpointed(lambda self, frame_id, image: f"classify_{frame_id}")
    def classify_frame(self, frame_id: str, image: bytes) -> dict:
        return vision_api.classify(image)
This pattern keeps the checkpoint logic separate from the business logic. Each step function is pure: it takes inputs and returns outputs. The decorator handles checking for cached results and saving new results.
Pipeline runner pattern. A separate orchestrator manages the pipeline execution and checkpointing:
class PipelineRunner:
    def __init__(self, pipeline_id: str, steps: list):
        self.checkpoint_manager = CheckpointManager(pipeline_id)
        self.steps = steps

    def run(self):
        results = {}
        for step in self.steps:
            step_id = step.id
            cached = self.checkpoint_manager.load(step_id)
            if cached is not None:
                logger.info(f"Skipping completed step: {step_id}")
                results[step_id] = cached
                continue
            logger.info(f"Executing step: {step_id}")
            try:
                result = step.execute(results)
                self.checkpoint_manager.save(step_id, result)
                results[step_id] = result
            except Exception as e:
                logger.error(f"Step {step_id} failed: {e}")
                raise PipelineError(step_id, e, results)
        return results
This pattern gives you more control over the execution flow, including error handling, progress reporting, and conditional step execution.
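As a rough usage sketch, assuming a minimal Step type with an id and an execute callable (llm_api is a placeholder, like whisper_api earlier):

from dataclasses import dataclass
from typing import Callable

@dataclass
class Step:
    id: str
    execute: Callable[[dict], dict]  # Receives the results of earlier steps

steps = [
    Step("transcribe", lambda results: whisper_api.transcribe("meeting.mp4")),
    Step("synthesize", lambda results: llm_api.summarize(results["transcribe"])),
]
runner = PipelineRunner(pipeline_id="run-2024-001", steps=steps)
results = runner.run()  # Re-running after a failure skips the completed steps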
Over time, your pipeline changes. Steps are added, removed, or modified. Checkpoint data from an older version of the pipeline might not be compatible with the current version. This is the schema evolution problem.
Strategies:
Version your checkpoints. Include a version number in each checkpoint. When loading, check whether the checkpoint version matches the current step version. If not, either migrate the data or invalidate the checkpoint.
from datetime import datetime

# Version-aware save/load on the checkpoint manager:
def save(self, step_id: str, data: dict, version: int):
    checkpoint = {
        "version": version,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    }
    # ... write to storage

def load(self, step_id: str, current_version: int) -> dict | None:
    checkpoint = self._read(step_id)
    if checkpoint is None:
        return None
    if checkpoint["version"] != current_version:
        logger.warning(f"Checkpoint version mismatch for {step_id}: "
                       f"found v{checkpoint['version']}, need v{current_version}")
        return None  # Invalidate stale checkpoint
    return checkpoint["data"]
Content-addressed checkpoints. Hash the step definition (parameters, model version, prompt template) and use the hash as part of the checkpoint key. If anything about the step changes, the hash changes, and old checkpoints are automatically bypassed.
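A sketch of how such a key might be derived (the parameter names are illustrative):

import hashlib
import json

def step_checkpoint_key(step_name: str, params: dict, prompt_template: str,
                        model_version: str) -> str:
    # Hash everything that defines the step's behavior; any change produces
    # a new key, so stale checkpoints are bypassed automatically
    definition = json.dumps(
        {"params": params, "prompt": prompt_template, "model": model_version},
        sort_keys=True,
    )
    return f"{step_name}_{hashlib.sha256(definition.encode()).hexdigest()[:12]}"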
Migration functions. For expensive steps where invalidation is costly, write migration functions that transform old checkpoint data to the new format. This is more complex but preserves work that would otherwise need to be redone.
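A minimal sketch of one way to organize this, assuming each migration upgrades checkpoint data by exactly one version (the fields being migrated are invented for illustration):

def _rename_chunks_to_segments(data: dict) -> dict:
    migrated = dict(data)
    migrated["segments"] = migrated.pop("chunks", [])
    return migrated

MIGRATIONS = {
    1: lambda data: {**data, "language": "en"},  # v1 -> v2: add a default field
    2: _rename_chunks_to_segments,               # v2 -> v3: rename a field
}

def migrate(data: dict, found_version: int, current_version: int) -> dict:
    # Apply each single-version migration in order until current_version
    for version in range(found_version, current_version):
        data = MIGRATIONS[version](data)
    return data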
Checkpoints accumulate. Without a cleanup strategy, your checkpoint storage grows indefinitely.
Time-based cleanup. Delete checkpoints older than N days. Simple and effective for most use cases.
Completion-based cleanup. Delete all checkpoints for a pipeline run when the run completes successfully. Keep checkpoints for failed runs so they can be resumed.
Space-based cleanup. Monitor total checkpoint storage and delete the oldest checkpoints when a threshold is exceeded. This prevents runaway storage costs.
Manual cleanup. Provide a CLI command to list and delete checkpoints. This is the fallback when automated cleanup is not appropriate.
from datetime import datetime, timedelta

class CheckpointCleaner:
    # Assumes the manager also exposes list_all/delete/delete_all operations
    def __init__(self, manager: CheckpointManager):
        self.manager = manager

    def clean_completed(self, pipeline_id: str):
        """Remove all checkpoints for a successfully completed pipeline."""
        self.manager.delete_all(pipeline_id)

    def clean_old(self, max_age_days: int = 7):
        """Remove checkpoints older than max_age_days."""
        cutoff = datetime.utcnow() - timedelta(days=max_age_days)
        for checkpoint in self.manager.list_all():
            if checkpoint.timestamp < cutoff:
                self.manager.delete(checkpoint.id)
From building checkpoint/resume into PlanOpticon and several client systems:
Checkpoints should be idempotent to load. Loading a checkpoint and then immediately saving it should produce an identical checkpoint. If loading mutates the data (converting types, adding defaults), you get checkpoint drift where each load/save cycle changes the data slightly.
Test the resume path explicitly. It is common to build checkpointing, test it by running the full pipeline successfully, and never actually test the resume path. Kill your pipeline at each stage and verify it resumes correctly. Automate these tests.
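A sketch of what such a test can look like with pytest, reusing the hypothetical Step and PipelineRunner wiring from earlier; a real test would also point the CheckpointManager at a temporary directory:

import pytest

def test_pipeline_resumes_after_failure():
    calls = {"transcribe": 0, "synthesize": 0}

    def transcribe(results):
        calls["transcribe"] += 1
        return {"text": "hello"}

    def synthesize(results):
        calls["synthesize"] += 1
        if calls["synthesize"] == 1:
            raise RuntimeError("simulated provider outage")
        return {"summary": "ok"}

    steps = [Step("transcribe", transcribe), Step("synthesize", synthesize)]

    with pytest.raises(Exception):
        PipelineRunner("resume-test", steps).run()   # First run dies mid-pipeline

    results = PipelineRunner("resume-test", steps).run()  # Second run resumes
    assert calls["transcribe"] == 1    # The completed step was not re-executed
    assert "synthesize" in results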
Log checkpoint hits and misses. When a pipeline runs, log which steps used cached checkpoints and which executed fresh. This tells you whether your checkpointing is working and how much time/money it is saving on resumed runs.
Handle partial step completion carefully. If a step processes 100 items and fails at item 73, the checkpoint needs to record which items completed. On resume, only the remaining items should be processed. This requires item-level tracking within a step, which adds complexity but prevents duplicate processing.
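One possible shape for that tracking, as a sketch (the process call and checkpoint layout are illustrative):

def process_items(items: dict, manager: CheckpointManager, step_id: str):
    state = manager.load(step_id) or {"completed": [], "results": {}}
    done = set(state["completed"])
    for item_id, item in items.items():
        if item_id in done:
            continue  # Finished before the failure; skip it on resume
        state["results"][item_id] = process(item)  # The expensive per-item work (placeholder)
        state["completed"].append(item_id)
        manager.save(step_id, state)  # Item-level checkpoint
    return state["results"]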
Do not trust checkpoints blindly in development. During development, stale checkpoints from a previous code version can mask bugs in the current code. Add a “force re-run” flag that bypasses checkpoints for development and testing.
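One lightweight way to implement that flag, assuming the CheckpointManager from earlier: a subclass that never reports a cache hit but still writes fresh checkpoints.

import argparse

class ForceRerunCheckpointManager(CheckpointManager):
    # Every load misses, so every step re-executes, but fresh checkpoints
    # are still written for the steps that complete
    def load(self, step_id: str) -> dict | None:
        return None

parser = argparse.ArgumentParser()
parser.add_argument("--force-rerun", action="store_true",
                    help="Ignore existing checkpoints and re-execute every step")
args = parser.parse_args()

manager_cls = ForceRerunCheckpointManager if args.force_rerun else CheckpointManager
checkpoint_manager = manager_cls("dev-run")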
The checkpoint/resume pattern is not glamorous. It does not make demos more impressive or pitch decks more exciting. But it is the difference between an AI pipeline that works in a demo and one that works in production. Every long-running pipeline that processes real data at real scale needs it. The only question is whether you build it before or after your first catastrophic mid-pipeline failure.
We recommend before.