Speed up OSM parsing with multiprocessing in Python

OpenStreetMap .pbf extraction pipelines routinely hit CPU-bound bottlenecks during tag deserialization, geometry validation, and attribute mapping. Single-threaded parsers stream binary protobuf data efficiently, but they use a single core, and the Python Global Interpreter Lock (GIL) keeps threads from parallelizing heavy regex normalization, cross-referencing, or graph conversion logic. A robust multiprocessing architecture requires precise worker isolation, chunk-aware memory budgeting, and deterministic error propagation. This guide details production-grade multiprocessing patterns for OSM ETL pipelines, focusing on edge-case resilience, diagnostic profiling, and exact API configurations.

Process Pool Architecture & Memory Isolation

flowchart LR
    G["chunk_generator<br/>(main process)"] --> S{submit}
    S --> F1["Future · chunk 1"] --> W1["Worker process 1<br/>parse_chunk()"]
    S --> F2["Future · chunk 2"] --> W2["Worker process 2<br/>parse_chunk()"]
    S --> Fn["Future · chunk N"] --> Wn["Worker process N<br/>parse_chunk()"]
    W1 --> AC["as_completed iterator"]
    W2 --> AC
    Wn --> AC
    AC --> Y["yield normalized chunk"]

Naive multiprocessing.Pool instantiations frequently trigger worker OOM kills or pickle serialization failures when passing large OSM feature dictionaries. A more robust approach isolates heavy I/O from CPU-bound transformations using concurrent.futures.ProcessPoolExecutor with an explicit max_tasks_per_child limit (Python 3.11+; the multiprocessing.Pool equivalent is maxtasksperchild). Recycling workers this way returns leaked or fragmented memory to the OS during long-running jobs.

python
import os
import gc
import logging
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Iterator, Dict, List, Any
import psutil

logger = logging.getLogger(__name__)

def worker_initializer() -> None:
    """Disable automatic garbage collection in workers to avoid GC pauses mid-chunk; parse_chunk collects manually."""
    gc.disable()

def parse_chunk(chunk_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    CPU-bound transformation: geometry validation + tag normalization.
    chunk_data contains pre-filtered OSM elements from a single bounding box or feature class.
    """
    normalized = []
    errors = []
    for idx, elem in enumerate(chunk_data):
        try:
            if not elem.get("tags"):
                continue
            normalized.append({
                "osm_id": elem["id"],
                "geometry": elem.get("geometry"),
                "tags": elem["tags"],
                "worker_pid": os.getpid()
            })
        except Exception as e:
            errors.append({"index": idx, "osm_id": elem.get("id"), "error": str(e)})
    
    # Manual collection only at chunk boundaries to stabilize RSS
    gc.collect()
    return {"normalized": normalized, "errors": errors}

def run_parallel_pipeline(
    chunk_generator: Iterator[List[Dict[str, Any]]],
    max_workers: int | None = None,
) -> Iterator[Dict[str, Any]]:
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, 8)

    # Surface available RAM for observability; consumers can use it to size chunks.
    available_mb = psutil.virtual_memory().available // (1024 ** 2)
    logger.info("Spawning %d workers; %d MB RAM available.", max_workers, available_mb)
    os.environ["OMP_NUM_THREADS"] = "1"  # Disable OpenMP thread spawning in underlying C libs.

    with ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=worker_initializer,
        mp_context=mp.get_context("spawn"),
        max_tasks_per_child=50,  # Recycle workers to mitigate C-extension memory leaks (Python 3.11+).
    ) as executor:
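        # Note: this comprehension drains chunk_generator immediately, so every chunk is queued
        # (and held in memory) at once. Cap the number of in-flight futures for large extracts.
        futures = {executor.submit(parse_chunk, chunk): i for i, chunk in enumerate(chunk_generator)}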
        for future in as_completed(futures):
            chunk_idx = futures[future]
            try:
                yield future.result()
            except Exception as e:
                logger.error("Chunk %s failed with unrecoverable error: %s", chunk_idx, e)
                continue
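
The pipeline above submits every chunk up front, which holds the whole generator's output in memory as pending tasks. A minimal sketch of bounded submission follows, assuming a hypothetical helper named bounded_map and an illustrative max_in_flight parameter (neither is part of the original code). It accepts any concurrent.futures Executor, so it should work with the ProcessPoolExecutor configured above.

```python
from concurrent.futures import Executor, FIRST_COMPLETED, Future, wait
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Set


def bounded_map(
    executor: Executor,
    fn: Callable[[Any], Any],
    chunks: Iterable[Any],
    max_in_flight: int = 4,  # illustrative default, tune against worker count and chunk size
) -> Iterator[Any]:
    """Yield fn(chunk) results in completion order with at most max_in_flight pending tasks."""
    chunk_iter = iter(chunks)
    # Prime the pool with the first few chunks only.
    pending: Set[Future] = {
        executor.submit(fn, chunk) for chunk in islice(chunk_iter, max_in_flight)
    }
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        # Refill first so workers stay busy while the consumer handles results.
        for chunk in islice(chunk_iter, len(done)):
            pending.add(executor.submit(fn, chunk))
        for future in done:
            # result() re-raises worker exceptions; wrap in try/except to keep going.
            yield future.result()
```

The generator is only advanced when a task finishes, so at most max_in_flight chunks are pending at any time, plus those whose results have not yet been consumed.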

Memory-Efficient Chunk Processing & Generator Pipelines

Loading entire continental extracts into memory is unsustainable for production ETL. Instead, pipelines should use generator-based streaming that yields bounded feature lists. When integrating with Parsing & Tag Normalization Workflows, chunk sizing should be calculated dynamically against available system memory rather than hardcoded. A reasonable starting point for pyrosm==0.6.2 or osmium==3.5.0 is 1,500 elements per chunk for dense urban extracts, scaling down to 500 for relation-heavy rural datasets.

Implement a sliding window buffer that flushes to disk or downstream queues once chunk_size_mb > 128. This keeps the pool from serializing multi-gigabyte payloads across IPC boundaries. A worker that is killed while handling an oversized payload (for example by the Linux OOM killer) surfaces in the parent process as a BrokenProcessPool exception.
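
One way to sketch such a buffer is a generator that closes a chunk when either an element count or a byte budget is reached. The function name budgeted_chunks and both constants are assumptions for illustration; the limits simply reuse the 1,500-element and 128 MB figures mentioned above. Pickled length is used as a rough proxy for what the pool sends over IPC.

```python
import pickle
from typing import Any, Dict, Iterable, Iterator, List

MAX_CHUNK_ELEMENTS = 1500            # urban baseline quoted in the text
MAX_CHUNK_BYTES = 128 * 1024 * 1024  # 128 MB budget quoted in the text


def budgeted_chunks(
    elements: Iterable[Dict[str, Any]],
    max_elements: int = MAX_CHUNK_ELEMENTS,
    max_bytes: int = MAX_CHUNK_BYTES,
) -> Iterator[List[Dict[str, Any]]]:
    """Group a stream of elements into chunks bounded by count and pickled size."""
    buffer: List[Dict[str, Any]] = []
    buffered_bytes = 0
    for elem in elements:
        # Approximate the IPC cost of this element by its pickled length.
        elem_bytes = len(pickle.dumps(elem, protocol=pickle.HIGHEST_PROTOCOL))
        over_budget = buffered_bytes + elem_bytes > max_bytes
        if buffer and (len(buffer) >= max_elements or over_budget):
            yield buffer
            buffer, buffered_bytes = [], 0
        buffer.append(elem)
        buffered_bytes += elem_bytes
    if buffer:
        yield buffer  # flush the final partial chunk
```

Pickling every element just to measure it costs CPU in the main process; sampling every Nth element and extrapolating would be a cheaper variant if the measurement itself shows up in profiles.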

CPU-Bound Transformations: Regex Cleaning & Batch Attribute Mapping

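Tag deserialization and value standardization represent the heaviest CPU load in OSM parsing. Compiling regex patterns per element or per chunk incurs redundant overhead. Pre-compile patterns at the module level, so each worker builds them once when it imports the module, or construct them in the pool initializer. Compiled patterns can also be bound to a task with functools.partial, since they pickle as their source string and flags, but shared ctypes objects such as multiprocessing.Value cannot hold them.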
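
A minimal sketch of module-level pre-compilation follows. The pattern names, the normalize_tags function, and the maxspeed rule are illustrative assumptions rather than part of the original pipeline; the only OSM convention relied on is that a maxspeed value without a unit is read as km/h.

```python
import re
from typing import Dict

# Compiled once per process at import time. With the "spawn" start method,
# each worker re-imports this module and builds its own copy exactly once.
_WHITESPACE = re.compile(r"\s+")
_MAXSPEED = re.compile(r"^(\d+)\s*(mph|km/h|kmh)?$", re.IGNORECASE)


def normalize_tags(tags: Dict[str, str]) -> Dict[str, str]:
    """Collapse whitespace in values and rewrite maxspeed as 'N km/h' or 'N mph'."""
    cleaned = {k.strip(): _WHITESPACE.sub(" ", v).strip() for k, v in tags.items()}
    match = _MAXSPEED.match(cleaned.get("maxspeed", ""))
    if match:
        # A missing unit falls through to km/h.
        unit = "mph" if (match.group(2) or "").lower() == "mph" else "km/h"
        cleaned["maxspeed"] = f"{match.group(1)} {unit}"
    return cleaned
```

Values that do not match the pattern, such as maxspeed=walk, pass through unchanged so they can be routed to an error or review path instead of being silently rewritten.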

For batch attribute mapping, avoid dictionary lookups inside tight loops. Convert mapping tables to numpy arrays or pandas categoricals before distribution. When normalizing highway classifications, surface materials, or access restrictions, apply vectorized string operations post-chunk rather than per-element. This aligns with Async PBF Parsing with Pyrosm paradigms where I/O and CPU stages are decoupled, allowing the parser to stream while workers apply deterministic tag harmonization.

Cross-Region Tag Harmonization & Deterministic Error Handling

OSM tagging conventions vary significantly across regions (e.g., highway=trunk vs highway=motorway, or localized name:* keys). Multiprocessing workers should receive a region-specific configuration dictionary injected via executor.submit. This enables parallel application of localized normalization rules without conditional branching overhead.
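
As a rough sketch of this injection, the rule dictionary can travel as an ordinary argument to executor.submit. Everything named here (REGION_RULES, harmonize_chunk, submit_region_chunk) is hypothetical, and the rule values are placeholders for illustration, not real tagging guidance.

```python
from concurrent.futures import Executor, Future
from typing import Any, Dict, List

# Placeholder per-region rules; real mappings would come from project config.
REGION_RULES: Dict[str, Dict[str, Any]] = {
    "de": {"name_keys": ["name:de", "name"], "highway_aliases": {}},
    "us": {"name_keys": ["name:en", "name"], "highway_aliases": {"motorway_link": "motorway"}},
}


def harmonize_chunk(chunk: List[Dict[str, Any]], rules: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Apply one region's rules to every element without mutating the input."""
    out = []
    for elem in chunk:
        tags = dict(elem.get("tags", {}))
        # Use the first name key present, in the region's preference order.
        for key in rules["name_keys"]:
            if key in tags:
                tags["name"] = tags[key]
                break
        highway = tags.get("highway")
        if highway in rules["highway_aliases"]:
            tags["highway"] = rules["highway_aliases"][highway]
        out.append({**elem, "tags": tags})
    return out


def submit_region_chunk(executor: Executor, region: str, chunk: List[Dict[str, Any]]) -> Future:
    """Pass the region's rule dictionary as a plain argument to executor.submit."""
    return executor.submit(harmonize_chunk, chunk, REGION_RULES[region])
```

Because the rules are pickled with every task, keeping them small matters; a large rule set could instead be loaded once per worker in the pool initializer and looked up by region key.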

Error handling must be deterministic. Instead of raising exceptions that terminate the pool, workers should return structured error payloads containing osm_id, chunk_index, and sanitized stack traces. Implement a dead-letter queue (DLQ) that logs malformed geometries or invalid tag combinations to a separate Parquet partition. This allows GIS analysts to audit edge cases without halting the primary pipeline. Reference the official Python concurrent.futures documentation for robust as_completed error trapping patterns.
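
The error-payload idea can be sketched as follows. The helper names safe_transform and append_to_dlq are assumptions, and the dead-letter queue is written as JSON lines purely to keep the example dependency-free; a Parquet writer would replace that function in a real pipeline.

```python
import json
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List


def safe_transform(
    chunk_index: int,
    chunk: List[Dict[str, Any]],
    transform: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Run transform per element and capture failures as data instead of raising."""
    normalized, errors = [], []
    for elem in chunk:
        try:
            normalized.append(transform(elem))
        except Exception as exc:
            errors.append({
                "chunk_index": chunk_index,
                "osm_id": elem.get("id"),
                "error_type": type(exc).__name__,
                # Exception line only, so file paths and locals are not recorded.
                "trace": "".join(traceback.format_exception_only(type(exc), exc)).strip(),
            })
    return {"normalized": normalized, "errors": errors}


def append_to_dlq(errors: List[Dict[str, Any]], path: Path) -> None:
    """Append error records as JSON lines (a stand-in for the Parquet partition)."""
    with path.open("a", encoding="utf-8") as fh:
        for record in errors:
            fh.write(json.dumps(record) + "\n")
```

Writing to the dead-letter file from the parent process, after each future resolves, avoids several workers appending to the same file concurrently.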

Integration with Graph Conversion & Async PBF Workflows

Once normalized, OSM features often feed into osmnx==1.8.1 for topological graph construction. Graph conversion is inherently memory-intensive because networkx stores nodes, edges, and their attributes in nested dictionaries. Parallelize this by partitioning the normalized dataset into spatial tiles (e.g., H3 resolution 7 or S2 level 10), processing each tile independently, and merging edge lists post-conversion.
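
The partition-and-merge step can be sketched with a plain longitude/latitude grid standing in for H3 or S2 cells, which keeps the example free of third-party dependencies. The function names, the tile_deg parameter, and the assumption that each feature carries "lon" and "lat" keys for a representative point are all illustrative.

```python
import math
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

TileKey = Tuple[int, int]
Edge = Tuple[int, int]


def tile_key(lon: float, lat: float, tile_deg: float = 0.1) -> TileKey:
    """Map a coordinate to a square lon/lat grid cell (a stand-in for an H3 or S2 cell id)."""
    return (math.floor(lon / tile_deg), math.floor(lat / tile_deg))


def partition_by_tile(
    features: Iterable[Dict[str, Any]], tile_deg: float = 0.1
) -> Dict[TileKey, List[Dict[str, Any]]]:
    """Group features by the grid cell containing their representative point."""
    tiles: Dict[TileKey, List[Dict[str, Any]]] = defaultdict(list)
    for feat in features:
        tiles[tile_key(feat["lon"], feat["lat"], tile_deg)].append(feat)
    return dict(tiles)


def merge_edge_lists(per_tile_edges: Iterable[Iterable[Edge]]) -> List[Edge]:
    """Concatenate per-tile edge lists, dropping edges reported by more than one tile."""
    seen = set()
    merged: List[Edge] = []
    for edges in per_tile_edges:
        for edge in edges:
            if edge not in seen:
                seen.add(edge)
                merged.append(edge)
    return merged
```

Ways that cross a tile boundary need a policy of their own, such as assigning them to every tile they touch and relying on the de-duplication in merge_edge_lists; this sketch does not decide that for you.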

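When combining multiprocessing with async I/O, pass the ProcessPoolExecutor to loop.run_in_executor() so that CPU-bound calls are awaited rather than blocking; reserve asyncio.to_thread() for blocking calls such as future.result(). This prevents the event loop from stalling during heavy tag validation or coordinate reprojection. Pickle protocol 5 (available in Python 3.8+) supports out-of-band buffers, but these only benefit buffer-backed data such as numpy coordinate arrays. networkx graphs are nested dictionaries and gain little from it, so return compact edge lists or arrays from workers to reduce IPC latency.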
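
A minimal sketch of the event-loop handoff follows, assuming a hypothetical coroutine named process_chunks_async. It accepts any concurrent.futures Executor; with a ProcessPoolExecutor, fn must be a picklable top-level function.

```python
import asyncio
from concurrent.futures import Executor
from typing import Any, Callable, Iterable, List


async def process_chunks_async(
    executor: Executor,
    fn: Callable[[Any], Any],
    chunks: Iterable[Any],
) -> List[Any]:
    """Run fn over chunks in the executor without blocking the event loop."""
    loop = asyncio.get_running_loop()
    # run_in_executor returns awaitable futures; gather keeps results in input order.
    tasks = [loop.run_in_executor(executor, fn, chunk) for chunk in chunks]
    return await asyncio.gather(*tasks)
```

This version schedules every chunk immediately, so it suits a bounded list of chunks; for a long stream, an asyncio.Semaphore around each submission would cap how many are in flight.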

Emergency Pipeline Scaling & Production Diagnostics

When extraction jobs stall or encounter sudden memory spikes, emergency scaling strategies must be pre-configured:

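  1. Dynamic Worker Adjustment: Monitor each worker's RSS with psutil.Process(pid).memory_info().rss (memory_percent() reports usage relative to total system memory, not a per-worker budget). If RSS exceeds 85% of the allocated threshold, gracefully drain the queue and recreate the pool with max_tasks_per_child=1.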
  2. Checkpointing: Write chunk offsets to a lightweight SQLite WAL file. On failure, resume from the last committed offset rather than restarting the full .pbf stream.
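  3. Diagnostic Profiling: Use py-spy to see where workers spend CPU time and tracemalloc to track Python-level allocations. tracemalloc does not see memory allocated natively inside C extensions such as the osmium or shapely bindings, so watch per-worker RSS growth to catch those leaks. Pin shapely==2.0.4 to avoid known GEOS serialization regressions that cause silent worker crashes.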
  4. GIL Contention Mitigation: Where a worker also runs threads, route heavy math through numpy or through numba functions compiled with nogil=True. The OSM PBF format stores data in independently decodable blocks, so decoding parallelizes cleanly when each worker handles its own blocks, but pure-Python tag string handling still holds the GIL within a process.
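
The checkpointing item in the list above can be sketched with the standard sqlite3 module. The table layout, the job_id key, and the three function names are assumptions for illustration only.

```python
import sqlite3
from typing import Optional


def open_checkpoint_db(path: str) -> sqlite3.Connection:
    """Open (or create) the checkpoint store with write-ahead logging enabled."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "job_id TEXT PRIMARY KEY, last_offset INTEGER NOT NULL)"
    )
    conn.commit()
    return conn


def commit_offset(conn: sqlite3.Connection, job_id: str, offset: int) -> None:
    """Record the latest fully processed chunk offset for a job."""
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (job_id, last_offset) VALUES (?, ?)",
            (job_id, offset),
        )


def last_offset(conn: sqlite3.Connection, job_id: str) -> Optional[int]:
    """Return the last committed offset, or None if the job has no checkpoint yet."""
    row = conn.execute(
        "SELECT last_offset FROM checkpoints WHERE job_id = ?", (job_id,)
    ).fetchone()
    return row[0] if row else None
```

Because as_completed yields chunks out of order, a resumable pipeline should commit only the highest offset below which every chunk has finished, otherwise a restart could skip a chunk that was still in flight.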

By enforcing strict memory budgets, isolating CPU-bound transformations, and implementing deterministic error routing, mapping engineers can reliably parse multi-gigabyte OSM extracts at scale. These patterns form the backbone of resilient GIS ETL architectures capable of handling daily planet diffs and regional emergency response data pipelines.