Long-running AI pipelines fail. This is not a possibility to plan for; it is a certainty to design around. API rate limits hit. Provider outages occur. Processes get killed. Machines restart. Network connections drop. And when your pipeline has been running for 45 minutes processing a large dataset, the last thing you want is to start over from the beginning.

The checkpoint/resume pattern solves this: save progress incrementally so that when a failure occurs, the pipeline resumes from where it left off rather than from scratch. This sounds straightforward, and the core concept is. The implementation details, however, involve real trade-offs that affect cost, reliability, and complexity.

We learned this building PlanOpticon. A full meeting analysis involves transcription, frame extraction, frame classification, diagram analysis, content synthesis, and knowledge graph construction. Processing a two-hour recording can take 30-60 minutes and make hundreds of API calls. Without checkpoint/resume, a single failure at minute 55 means losing all that work. With it, a failure at any point means losing at most one step.

This article covers the pattern in detail: implementation strategies, trade-offs, and practical code in Python.

The Core Pattern

At its simplest, checkpoint/resume involves three operations:

  1. Save state after each meaningful unit of work completes.
  2. Check for existing state before starting each unit of work.
  3. Resume from saved state when restarting after a failure.

import json
from pathlib import Path


class CheckpointManager:
    def __init__(self, pipeline_id: str, checkpoint_dir: str = ".checkpoints"):
        self.pipeline_id = pipeline_id
        self.checkpoint_dir = Path(checkpoint_dir) / pipeline_id
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save(self, step_id: str, data: dict):
        path = self.checkpoint_dir / f"{step_id}.json"
        # Write to a temp file first, then atomically rename over the target;
        # Path.replace overwrites an existing file where rename would fail on Windows
        temp_path = path.with_suffix(".tmp")
        with open(temp_path, "w") as f:
            json.dump(data, f)
        temp_path.replace(path)

    def load(self, step_id: str) -> dict | None:
        path = self.checkpoint_dir / f"{step_id}.json"
        if path.exists():
            with open(path) as f:
                return json.load(f)
        return None

    def has_checkpoint(self, step_id: str) -> bool:
        return (self.checkpoint_dir / f"{step_id}.json").exists()

The atomic write (write to a temp file, then rename it over the final path) is critical. If the process crashes during a write, you get either the complete old checkpoint or the complete new checkpoint, never a corrupted partial write. This is a small detail that prevents a large category of bugs.

Granularity: The First Trade-Off

The most important design decision is checkpoint granularity: how often do you save state?

Coarse-grained checkpointing saves state after major pipeline stages complete. A 10-stage pipeline has 10 checkpoints. This is simple to implement and has minimal overhead, but a failure within a stage means replaying the entire stage.

Fine-grained checkpointing saves state after each individual operation. A stage that processes 1,000 items has 1,000 checkpoints within that stage. This minimizes rework after failure but adds I/O overhead and complexity.

Adaptive checkpointing adjusts granularity based on cost and duration. Cheap, fast operations use coarse checkpointing. Expensive, slow operations use fine checkpointing. This is the approach we recommend.

class AdaptiveCheckpointer:
    def __init__(self, manager: CheckpointManager):
        self.manager = manager

    def should_checkpoint(self, step_id: str, cost_so_far: float,
                          time_so_far: float) -> bool:
        # Always checkpoint after expensive operations
        if cost_so_far > 0.10:  # More than $0.10 since last checkpoint
            return True
        # Always checkpoint after long operations
        if time_so_far > 60:  # More than 60 seconds since last checkpoint
            return True
        # Don't checkpoint trivial operations
        return False

In PlanOpticon, transcription is a single expensive step that checkpoints once on completion. Frame classification processes hundreds of frames and checkpoints every 50 frames. The granularity matches the cost structure: if classification fails at frame 449, we replay from frame 401, not from frame 1.
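
A simplified sketch of that batch-level granularity follows; the step id, batch size, and the `vision_api` client are illustrative rather than PlanOpticon's actual code.

def classify_frames(frames: list[dict], manager: CheckpointManager,
                    batch_size: int = 50) -> dict:
    """Classify frames, saving a checkpoint at every batch boundary."""
    # Resume from the last saved batch, if any
    state = manager.load("classify_frames") or {"results": {}, "next_index": 0}

    for i in range(state["next_index"], len(frames)):
        frame = frames[i]
        state["results"][frame["id"]] = vision_api.classify(frame["image"])

        # Save at batch boundaries so a failure replays at most one batch
        if (i + 1) % batch_size == 0:
            state["next_index"] = i + 1
            manager.save("classify_frames", state)

    state["next_index"] = len(frames)
    manager.save("classify_frames", state)
    return state["results"]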

Storage Strategies

Where you store checkpoints affects reliability, performance, and portability.

Local filesystem. The simplest option. Fast writes, no network dependency. But checkpoints are lost if the machine fails or the directory is cleaned up. Suitable for pipelines that run on a single machine where the failure mode is process-level (crashes, OOM) rather than machine-level (hardware failure).

Cloud object storage (S3, GCS). Durable and accessible from any machine. Higher write latency, but for checkpoints that happen every 30-60 seconds, the latency is acceptable. Enables resuming on a different machine than the one that failed, which is valuable for cloud-native workloads.

Database. PostgreSQL, Redis, or DynamoDB. Provides durability with lower latency than object storage. Supports atomic operations and querying across pipeline runs. More operational overhead than file-based storage.

Hybrid. Write checkpoints to local filesystem for speed, and asynchronously replicate to cloud storage for durability. This gives you the performance of local storage with the durability of cloud storage, at the cost of slight complexity.

from concurrent.futures import ThreadPoolExecutor


class HybridCheckpointManager:
    def __init__(self, pipeline_id: str, local_dir: str, s3_bucket: str):
        # Local and S3 managers are assumed to expose the same save/load
        # interface as CheckpointManager above
        self.local = LocalCheckpointManager(pipeline_id, local_dir)
        self.remote = S3CheckpointManager(pipeline_id, s3_bucket)
        self._replicator = ThreadPoolExecutor(max_workers=2)

    def save(self, step_id: str, data: dict):
        # Write locally first (fast)
        self.local.save(step_id, data)
        # Replicate to S3 in the background (durable)
        self._replicator.submit(self.remote.save, step_id, data)

    def load(self, step_id: str) -> dict | None:
        # Try local first
        data = self.local.load(step_id)
        if data is not None:
            return data
        # Fall back to remote
        data = self.remote.load(step_id)
        if data is not None:
            # Populate the local cache for subsequent loads
            self.local.save(step_id, data)
        return data

What to Checkpoint

Not all state is worth checkpointing. Saving too much wastes storage and slows writes. Saving too little means checkpoints are not useful for resumption.

Always checkpoint:

  • Results of expensive API calls (LLM responses, external service results)
  • Results of operations that consume rate-limited resources
  • Intermediate results that took significant time to compute
  • Metadata about pipeline progress (what steps completed, timestamps, costs)

Usually do not checkpoint:

  • Raw input data (it should be available from the original source)
  • Easily recomputable derived data (hash values, simple transformations)
  • Large binary data that can be re-fetched (images, audio files)

The serialization question. Checkpoint data needs to be serializable. For simple data (dicts, lists, strings, numbers), JSON works. For complex objects (numpy arrays, dataframes, model outputs), you need either a richer serialization format (pickle, msgpack) or a conversion step that transforms complex objects into JSON-serializable form before checkpointing.

We strongly recommend JSON over pickle for checkpoints. Pickle is fragile – a change to your class definitions can make old pickles unloadable. JSON is human-readable, debuggable, and version-agnostic. If you need to checkpoint binary data, store it as a separate file and reference it from the JSON checkpoint.
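
A minimal sketch of that approach, reusing the CheckpointManager above; the helper names are ours, not a fixed API.

def save_with_blob(manager: CheckpointManager, step_id: str,
                   metadata: dict, blob: bytes) -> None:
    # Write the binary artifact next to the JSON checkpoint and store only
    # a reference to it in the checkpoint itself
    blob_path = manager.checkpoint_dir / f"{step_id}.bin"
    blob_path.write_bytes(blob)
    manager.save(step_id, {**metadata, "blob_file": blob_path.name})


def load_with_blob(manager: CheckpointManager, step_id: str) -> tuple | None:
    data = manager.load(step_id)
    if data is None:
        return None
    blob = (manager.checkpoint_dir / data["blob_file"]).read_bytes()
    return data, blob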

Pipeline Integration Patterns

There are several ways to integrate checkpoint/resume into a pipeline architecture:

Decorator pattern. Wrap individual pipeline steps with a checkpoint decorator that handles the save/load logic transparently:

import hashlib
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def checkpointed(step_id_fn=None):
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Pass self through so the step-id lambdas below receive the
            # same arguments as the decorated method
            step_id = step_id_fn(self, *args, **kwargs) if step_id_fn else func.__name__

            # Check for existing checkpoint
            cached = self.checkpoint_manager.load(step_id)
            if cached is not None:
                logger.info(f"Resuming from checkpoint: {step_id}")
                return cached

            # Execute the step
            result = func(self, *args, **kwargs)

            # Save checkpoint
            self.checkpoint_manager.save(step_id, result)
            return result
        return wrapper
    return decorator


class AnalysisPipeline:
    def __init__(self, pipeline_id: str):
        self.checkpoint_manager = CheckpointManager(pipeline_id)

    # Use a stable content hash, not the built-in hash(), which is salted
    # per process and would break resumption across restarts
    @checkpointed(lambda self, audio_path:
                  f"transcribe_{hashlib.sha256(audio_path.encode()).hexdigest()[:16]}")
    def transcribe(self, audio_path: str) -> dict:
        return whisper_api.transcribe(audio_path)

    @checkpointed(lambda self, frame_id, image: f"classify_{frame_id}")
    def classify_frame(self, frame_id: str, image: bytes) -> dict:
        return vision_api.classify(image)

This pattern keeps the checkpoint logic separate from the business logic. Each step function contains only its own work: it takes inputs and returns outputs. The decorator handles checking for cached results and saving new ones.
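
In use, a repeated call is served from the checkpoint rather than the API; the identifiers below are illustrative.

pipeline = AnalysisPipeline("meeting-2024-06-01")
transcript = pipeline.transcribe("recordings/standup.wav")  # calls the API, saves a checkpoint
transcript = pipeline.transcribe("recordings/standup.wav")  # loads the checkpoint, no API call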

Pipeline runner pattern. A separate orchestrator manages the pipeline execution and checkpointing:

class PipelineError(Exception):
    """Raised when a step fails; carries the failing step and the partial results."""
    def __init__(self, step_id: str, cause: Exception, partial_results: dict):
        super().__init__(f"Step {step_id} failed: {cause}")
        self.step_id = step_id
        self.cause = cause
        self.partial_results = partial_results


class PipelineRunner:
    def __init__(self, pipeline_id: str, steps: list):
        self.checkpoint_manager = CheckpointManager(pipeline_id)
        self.steps = steps

    def run(self):
        results = {}
        for step in self.steps:
            step_id = step.id

            cached = self.checkpoint_manager.load(step_id)
            if cached is not None:
                logger.info(f"Skipping completed step: {step_id}")
                results[step_id] = cached
                continue

            logger.info(f"Executing step: {step_id}")
            try:
                result = step.execute(results)
                self.checkpoint_manager.save(step_id, result)
                results[step_id] = result
            except Exception as e:
                logger.error(f"Step {step_id} failed: {e}")
                raise PipelineError(step_id, e, results) from e

        return results

This pattern gives you more control over the execution flow, including error handling, progress reporting, and conditional step execution.
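
The runner above assumes each step exposes an id attribute and an execute(results) method. A minimal step wrapper along those lines might look like this; the step functions and the llm_api client are placeholders.

from dataclasses import dataclass
from typing import Callable


@dataclass
class Step:
    id: str
    fn: Callable[[dict], dict]

    def execute(self, results: dict) -> dict:
        # Each step receives the results of earlier steps and returns its own
        return self.fn(results)


runner = PipelineRunner("meeting-2024-06-01", steps=[
    Step("transcribe", lambda results: whisper_api.transcribe("recordings/standup.wav")),
    Step("summarize", lambda results: llm_api.summarize(results["transcribe"])),
])
results = runner.run()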

Handling Schema Evolution

Over time, your pipeline changes. Steps are added, removed, or modified. Checkpoint data from an older version of the pipeline might not be compatible with the current version. This is the schema evolution problem.

Strategies:

Version your checkpoints. Include a version number in each checkpoint. When loading, check whether the checkpoint version matches the current step version. If not, either migrate the data or invalidate the checkpoint.

def save(self, step_id: str, data: dict, version: int):
    checkpoint = {
        "version": version,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data
    }
    # ... write to storage

def load(self, step_id: str, current_version: int) -> dict | None:
    checkpoint = self._read(step_id)
    if checkpoint is None:
        return None
    if checkpoint["version"] != current_version:
        logger.warning(f"Checkpoint version mismatch for {step_id}: "
                       f"found v{checkpoint['version']}, need v{current_version}")
        return None  # Invalidate stale checkpoint
    return checkpoint["data"]

Content-addressed checkpoints. Hash the step definition (parameters, model version, prompt template) and use the hash as part of the checkpoint key. If anything about the step changes, the hash changes, and old checkpoints are automatically bypassed.
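
A sketch of how such a key might be derived; the step-definition fields are illustrative.

import hashlib
import json


def step_checkpoint_key(step_name: str, step_definition: dict) -> str:
    # Hash the full step definition; any change to it produces a new key,
    # so checkpoints written under the old definition are simply never found
    digest = hashlib.sha256(
        json.dumps(step_definition, sort_keys=True).encode()
    ).hexdigest()[:16]
    return f"{step_name}_{digest}"


key = step_checkpoint_key("classify_frame", {
    "model": "vision-model-2024-05",
    "prompt_template": "Classify this frame as slide, diagram, or speaker.",
    "temperature": 0.0,
})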

Migration functions. For expensive steps where invalidation is costly, write migration functions that transform old checkpoint data to the new format. This is more complex but preserves work that would otherwise need to be redone.
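
One way to structure this, assuming the versioned checkpoint format shown above; the example migration itself is hypothetical.

# Migration functions keyed by (step_id, source_version); each one upgrades
# checkpoint data by a single version
MIGRATIONS = {
    ("transcribe", 1): lambda data: {**data, "language": "en"},  # v1 -> v2: add a default field
}


def migrate(step_id: str, checkpoint: dict, current_version: int) -> dict | None:
    version, data = checkpoint["version"], checkpoint["data"]
    while version < current_version:
        step_up = MIGRATIONS.get((step_id, version))
        if step_up is None:
            return None  # no migration path; fall back to invalidating the checkpoint
        data, version = step_up(data), version + 1
    return data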

Cleanup and Lifecycle

Checkpoints accumulate. Without a cleanup strategy, your checkpoint storage grows indefinitely.

Time-based cleanup. Delete checkpoints older than N days. Simple and effective for most use cases.

Completion-based cleanup. Delete all checkpoints for a pipeline run when the run completes successfully. Keep checkpoints for failed runs so they can be resumed.

Space-based cleanup. Monitor total checkpoint storage and delete the oldest checkpoints when a threshold is exceeded. This prevents runaway storage costs.

Manual cleanup. Provide a CLI command to list and delete checkpoints. This is the fallback when automated cleanup is not appropriate.

from datetime import datetime, timedelta


class CheckpointCleaner:
    def __init__(self, manager: CheckpointManager):
        # Assumes the manager also exposes delete_all, list_all, and delete
        self.manager = manager

    def clean_completed(self, pipeline_id: str):
        """Remove all checkpoints for a successfully completed pipeline."""
        self.manager.delete_all(pipeline_id)

    def clean_old(self, max_age_days: int = 7):
        """Remove checkpoints older than max_age_days."""
        cutoff = datetime.utcnow() - timedelta(days=max_age_days)
        for checkpoint in self.manager.list_all():
            if checkpoint.timestamp < cutoff:
                self.manager.delete(checkpoint.id)

Real-World Lessons

From building checkpoint/resume into PlanOpticon and several client systems:

Checkpoints should be idempotent to load. Loading a checkpoint and then immediately saving it should produce an identical checkpoint. If loading mutates the data (converting types, adding defaults), you get checkpoint drift where each load/save cycle changes the data slightly.
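
A quick drift check, sketched against the CheckpointManager above:

def assert_no_checkpoint_drift(manager: CheckpointManager, step_id: str) -> None:
    # A load/save round trip should leave the stored checkpoint unchanged
    original = manager.load(step_id)
    manager.save(step_id, original)
    assert manager.load(step_id) == original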

Test the resume path explicitly. It is common to build checkpointing, test it by running the full pipeline successfully, and never actually test the resume path. Kill your pipeline at each stage and verify it resumes correctly. Automate these tests.
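
One way to automate this, sketched with pytest; run_pipeline and its fail_at hook are hypothetical test seams, not part of the code above.

import pytest


@pytest.mark.parametrize("fail_at_step", ["transcribe", "classify_frames", "synthesize"])
def test_pipeline_resumes_after_failure(tmp_path, fail_at_step):
    # First run: inject a failure at the chosen step and expect it to surface
    with pytest.raises(PipelineError):
        run_pipeline("test-run", checkpoint_dir=tmp_path, fail_at=fail_at_step)

    # Second run: no injected failure; earlier steps must load from checkpoints
    results = run_pipeline("test-run", checkpoint_dir=tmp_path)
    assert set(results) == {"transcribe", "classify_frames", "synthesize"}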

Log checkpoint hits and misses. When a pipeline runs, log which steps used cached checkpoints and which executed fresh. This tells you whether your checkpointing is working and how much time/money it is saving on resumed runs.

Handle partial step completion carefully. If a step processes 100 items and fails at item 73, the checkpoint needs to record which items completed. On resume, only the remaining items should be processed. This requires item-level tracking within a step, which adds complexity but prevents duplicate processing.
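
A sketch of item-level tracking within a single step; the item shape and expensive_call are placeholders.

def process_items(items: list[dict], manager: CheckpointManager) -> dict:
    # The checkpoint records results keyed by item id, so a resume skips
    # anything already completed
    state = manager.load("process_items") or {"done": {}}

    for item in items:
        if item["id"] in state["done"]:
            continue  # completed in a previous run
        state["done"][item["id"]] = expensive_call(item)
        manager.save("process_items", state)  # fine-grained: save after each item

    return state["done"]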

Do not trust checkpoints blindly in development. During development, stale checkpoints from a previous code version can mask bugs in the current code. Add a “force re-run” flag that bypasses checkpoints for development and testing.

The checkpoint/resume pattern is not glamorous. It does not make demos more impressive or pitch decks more exciting. But it is the difference between an AI pipeline that works in a demo and one that works in production. Every long-running pipeline that processes real data at real scale needs it. The only question is whether you build it before or after your first catastrophic mid-pipeline failure.

We recommend before.