The GIL and Threading in Pyvorin Edge

Understand how the GIL interacts with compiled kernels, keep Pipeline.run() single-threaded, and safely use ThreadPoolExecutor and ProcessPoolExecutor for I/O and CPU parallelism.

Published Jun 2, 2026

Introduction

Pyvorin Edge runs on CPython, so on standard (non-free-threaded) builds the Global Interpreter Lock (GIL) is always present. Even when your code is compiled to a native .so, the GIL can become a bottleneck if the kernel does not explicitly release it. This article covers:

  • How the GIL affects compiled kernels loaded through ModuleLoader.
  • Why Pipeline.run() is single-threaded and should stay that way.
  • Releasing the GIL inside native kernels with Py_BEGIN_ALLOW_THREADS.
  • Using ThreadPoolExecutor for I/O-bound adapters.
  • Using ProcessPoolExecutor for CPU-bound work.

The GIL and Compiled Code

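When ModuleLoader.call() invokes a symbol from a .so, the calling thread keeps holding the GIL until the function returns. This matches the behavior of ctypes.PyDLL; a library loaded with plain ctypes.CDLL would instead have the GIL released around each foreign call. For short scalar calls, holding the GIL is fine. For long-running vectorized kernels, it serializes code that could otherwise run in parallel.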

The Problem

import ctypes
import numpy as np
from pyv_edge_agent.module_host.abi import ABIContract
from pyv_edge_agent.module_host.loader import ModuleLoader

abi = ABIContract(
    function_name="heavy_compute",
    arg_types=[ctypes.POINTER(ctypes.c_double), ctypes.c_size_t],
    return_type=None,
)
loader = ModuleLoader()
loader.load("./libheavy.so", abi)

# Example input buffer for the kernel
arr = np.zeros(1_000_000, dtype=np.float64)
n = arr.size

# This call holds the GIL for the entire duration
loader.call("heavy_compute", arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), n)

If another Python thread wants to process MQTT messages or HTTP callbacks while the kernel runs, it must wait.
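
Whether a ctypes call holds the GIL depends on how the library was loaded: ctypes.PyDLL keeps the GIL for the whole call, while ctypes.CDLL releases it around the call. The sketch below makes the difference visible, with libc's usleep standing in for a long-running kernel. It assumes a POSIX system and a standard GIL-enabled CPython build, and the helper name ticks_during_call is made up for illustration. How ModuleLoader loads libraries is not shown in this article, so treat this as a way to observe the behavior rather than a statement about its internals.

```python
import ctypes
import threading
import time


def ticks_during_call(lib, seconds: float = 0.2) -> int:
    """Count how often a background thread runs while lib.usleep() blocks in C."""
    lib.usleep.argtypes = [ctypes.c_uint]
    lib.usleep.restype = ctypes.c_int

    count = 0
    stop = threading.Event()

    def ticker() -> None:
        nonlocal count
        while not stop.is_set():
            count += 1
            time.sleep(0.001)  # must reacquire the GIL after every sleep

    thread = threading.Thread(target=ticker, daemon=True)
    thread.start()
    time.sleep(0.05)  # let the ticker get going

    before = count
    lib.usleep(int(seconds * 1_000_000))  # stand-in for a long native kernel
    after = count

    stop.set()
    thread.join()
    return after - before


# None asks ctypes for the symbols already loaded into the process (POSIX only).
held = ticks_during_call(ctypes.PyDLL(None))     # GIL held during the call
released = ticks_during_call(ctypes.CDLL(None))  # GIL released during the call
print(f"PyDLL: {held} ticks, CDLL: {released} ticks")
```

On a GIL-enabled build the PyDLL count should stay at or near zero, because the ticker thread cannot reacquire the GIL until usleep returns, while the CDLL count grows with the length of the call.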

Releasing the GIL in Native Kernels

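To allow true parallelism, wrap the compute-intensive section of your C kernel in the Python C-API macros Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS. The first releases the GIL and the second reacquires it, so other Python threads can run while the loop executes. Two conditions apply: the function must be entered with the GIL held, and the code between the macros must not call any Python C-API function or touch Python objects.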

#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <stddef.h>

void heavy_compute(double *restrict data, size_t n) {
    Py_BEGIN_ALLOW_THREADS
    for (size_t i = 0; i < n; i++) {
        data[i] = data[i] * data[i] + 1.0;
    }
    Py_END_ALLOW_THREADS
}

Build Command with Python Headers

gcc -shared -fPIC -O3 -march=armv8-a+fp+simd \
    $(python3-config --includes) \
    -o libheavy.so heavy_compute.c \
    $(python3-config --ldflags)
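
Once a kernel releases the GIL, ordinary Python threads can run it on disjoint slices of one array at the same time. The sketch below shows that calling pattern. Because libheavy.so is not available here, square_plus_one is a NumPy stand-in for heavy_compute (NumPy generally releases the GIL inside its numeric loops). run_threaded and its parameter names are illustrative and not part of Pyvorin Edge.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

import numpy as np


def square_plus_one(chunk: np.ndarray) -> None:
    """Stand-in for heavy_compute: x*x + 1, written in place into the slice."""
    np.multiply(chunk, chunk, out=chunk)
    np.add(chunk, 1.0, out=chunk)


def run_threaded(
    data: np.ndarray,
    kernel: Callable[[np.ndarray], None],
    workers: int = 4,
) -> None:
    """Apply kernel to `workers` disjoint slices of a 1-D array, one task per slice."""
    bounds = np.linspace(0, data.size, workers + 1, dtype=np.intp)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(kernel, data[lo:hi])  # slices are views, not copies
            for lo, hi in zip(bounds[:-1], bounds[1:])
        ]
        for fut in futures:
            fut.result()  # re-raise any exception from a worker


data = np.arange(1_000_000, dtype=np.float64)
expected = data * data + 1.0
run_threaded(data, square_plus_one)
```

With a real kernel, each task would pass its slice's pointer and length to the native function instead. Whether ModuleLoader.call() may be invoked from several threads at once is not covered here and would need to be checked first.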

Thread Safety in Pipeline

Pipeline.run() in edge_sdk/pyvorin_edge/pipeline.py is not thread-safe. It mutates internal lists (_window_objects, _events, _last_rule_fires) without locks. Calling run() from multiple threads simultaneously will corrupt state.

from concurrent.futures import ThreadPoolExecutor

from pyvorin_edge.pipeline import Pipeline

pipeline = Pipeline("unsafe")

# WRONG: two threads calling run() concurrently.
# readings_batch_a and readings_batch_b are two lists of SensorReading objects.
with ThreadPoolExecutor(max_workers=2) as ex:
    ex.submit(pipeline.run, readings_batch_a)
    ex.submit(pipeline.run, readings_batch_b)   # race condition!

The correct pattern is a single event loop that serializes pipeline execution, while external threads handle I/O.
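
One way to enforce that rule is to route every run() call through a single worker thread, so callers on any thread can submit batches but only one batch executes at a time. This is a minimal sketch: SerializedRunner and FakePipeline are hypothetical names, and FakePipeline only stands in for an object whose run() method is not thread-safe.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, List


class FakePipeline:
    """Hypothetical stand-in for an object whose run() is not thread-safe."""

    def __init__(self) -> None:
        self.active = 0        # calls currently inside run()
        self.max_active = 0    # highest overlap ever observed
        self.processed: List[Any] = []

    def run(self, batch: List[Any]) -> int:
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        time.sleep(0.001)      # widen the window in which overlap could occur
        self.processed.extend(batch)
        self.active -= 1
        return len(batch)


class SerializedRunner:
    """Funnels all run() calls through one worker thread."""

    def __init__(self, pipeline: Any) -> None:
        self._pipeline = pipeline
        self._executor = ThreadPoolExecutor(max_workers=1)

    def submit(self, batch: List[Any]) -> "Future[int]":
        # Safe to call from any thread; batches execute one at a time.
        return self._executor.submit(self._pipeline.run, batch)

    def close(self) -> None:
        self._executor.shutdown(wait=True)


fake = FakePipeline()
runner = SerializedRunner(fake)


def producer(start: int) -> None:
    for i in range(start, start + 10):
        runner.submit([i]).result()


threads = [threading.Thread(target=producer, args=(k * 10,)) for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
runner.close()
print(f"max concurrent run() calls: {fake.max_active}")
```

Four producer threads submit batches, yet run() never overlaps with itself because the executor has exactly one worker.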

ThreadPoolExecutor for I/O-Bound Adapters

HTTP polling, MQTT subscriptions, and file tailing are I/O-bound. Run them in a ThreadPoolExecutor and feed results into a thread-safe queue consumed by the single pipeline thread.

import queue
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

import requests

from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorReading, SensorType


class HTTPAdapter:
    def __init__(self, url: str, interval: float = 5.0):
        self.url = url
        self.interval = interval

    def poll(self, q: queue.Queue[SensorReading]) -> None:
        while True:
            try:
                resp = requests.get(self.url, timeout=10)
                resp.raise_for_status()
                data = resp.json()
                q.put(
                    SensorReading(
                        sensor_name="http_metric",
                        timestamp=time.time(),
                        value=float(data["value"]),
                        unit=data.get("unit", ""),
                    )
                )
            except Exception as exc:
                print(f"Poll error: {exc}")
            time.sleep(self.interval)


def main() -> None:
    q: queue.Queue[SensorReading] = queue.Queue(maxsize=1000)
    adapter = HTTPAdapter("https://api.example.com/metric")

    pipeline = Pipeline("http_pipeline")
    pipeline.add_sensor(Sensor("http_metric", SensorType.GENERIC, "count"))
    pipeline.add_rule(
        RuleConfig(
            name="spike",
            condition_expr="ctx.value > 1000.0",
            severity="warning",
        )
    )

    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(adapter.poll, q)

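        # Single pipeline thread
        while True:
            batch: List[SensorReading] = []
            # Monotonic clock: the deadline is immune to wall-clock adjustments
            deadline = time.monotonic() + 1.0
            while time.monotonic() < deadline and len(batch) < 100:
                try:
                    batch.append(q.get(timeout=0.05))
                except queue.Empty:
                    # Queue went quiet: flush what we have instead of waiting
                    break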
            if batch:
                result = pipeline.run(batch)
                for ev in result.events:
                    print(f"Event: {ev.rule_name} @ {ev.timestamp}")


if __name__ == "__main__":
    main()
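
The loop above runs until the process is killed. A common refinement is a threading.Event that tells the adapter to stop, plus a sentinel object on the queue so the consumer knows no more readings are coming. The sketch below uses a fake producer and plain floats instead of requests and SensorReading so that it runs anywhere. poll_until_stopped, drain, and _DONE are illustrative names, and the batching here is by count only, without the time limit used above.

```python
import queue
import threading
from typing import List

_DONE = object()  # sentinel: the producer has finished


def poll_until_stopped(
    q: "queue.Queue[object]",
    stop: threading.Event,
    interval: float = 0.01,
) -> None:
    """Fake adapter: emits increasing values until stop is set, then the sentinel."""
    value = 0.0
    try:
        while not stop.is_set():
            q.put(value)
            value += 1.0
            stop.wait(interval)  # like sleep(), but wakes early when stop is set
    finally:
        q.put(_DONE)


def drain(q: "queue.Queue[object]", batch_size: int = 100) -> List[List[float]]:
    """Single consumer: group readings into batches until the sentinel arrives."""
    batches: List[List[float]] = []
    batch: List[float] = []
    while True:
        item = q.get()
        if item is _DONE:
            break
        batch.append(item)
        if len(batch) >= batch_size:
            batches.append(batch)  # a real loop would call pipeline.run(batch) here
            batch = []
    if batch:
        batches.append(batch)      # flush the final partial batch
    return batches


q: "queue.Queue[object]" = queue.Queue(maxsize=1000)
stop = threading.Event()
producer = threading.Thread(target=poll_until_stopped, args=(q, stop))
producer.start()
threading.Timer(0.2, stop.set).start()  # request shutdown after 200 ms
batches = drain(q, batch_size=5)
producer.join()
print(f"received {sum(len(b) for b in batches)} readings in {len(batches)} batches")
```

Putting the sentinel in a finally block means the consumer is released even if the producer fails with an exception.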

ProcessPoolExecutor for CPU-Bound Work

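If you need CPU parallelism and the kernel does not release the GIL, run it in separate processes. Each process has its own interpreter and its own GIL, so a ProcessPoolExecutor lets multiple cores work on different chunks simultaneously. The cost is that inputs and results are pickled and copied between processes, so this pays off only when the compute time per chunk outweighs the transfer time.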

import ctypes
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from pyv_edge_agent.module_host.abi import ABIContract
from pyv_edge_agent.module_host.loader import ModuleLoader

SO_PATH = Path("./libvecadd.so")
abi = ABIContract(
    function_name="vec_add_f32",
    arg_types=[
        ctypes.POINTER(ctypes.c_float),
        ctypes.POINTER(ctypes.c_float),
        ctypes.POINTER(ctypes.c_float),
        ctypes.c_size_t,
    ],
    return_type=None,
)


def worker_chunk(args):
    """Run in a child process. Each process loads its own copy of the .so."""
    a_bytes, b_bytes, offset, length = args
    a = np.frombuffer(a_bytes, dtype=np.float32, count=length)
    b = np.frombuffer(b_bytes, dtype=np.float32, count=length)
    out = np.empty(length, dtype=np.float32)

    loader = ModuleLoader()
    loader.load(str(SO_PATH), abi)
    loader.call(
        "vec_add_f32",
        a.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        b.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        out.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        length,
    )
    loader.unload()
    return offset, out.tobytes()


def parallel_vec_add(a: np.ndarray, b: np.ndarray, workers: int = 4) -> np.ndarray:
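    # Validate explicitly: assert statements are stripped under python -O
    if a.ndim != 1 or a.shape != b.shape:
        raise ValueError("a and b must be 1-D arrays of equal length")
    if a.dtype != np.float32 or b.dtype != np.float32:
        raise TypeError("a and b must be float32 to match vec_add_f32")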
    n = len(a)
    chunk = n // workers
    futures = []

    with ProcessPoolExecutor(max_workers=workers) as executor:
        for i in range(workers):
            start = i * chunk
            end = start + chunk if i < workers - 1 else n
            args = (
                a[start:end].tobytes(),
                b[start:end].tobytes(),
                start,
                end - start,
            )
            futures.append(executor.submit(worker_chunk, args))

    out = np.empty(n, dtype=np.float32)
    for fut in futures:
        offset, data = fut.result()
        out[offset : offset + len(data) // 4] = np.frombuffer(data, dtype=np.float32)
    return out


# Example
if __name__ == "__main__":
    n = 1_000_000
    a = np.random.rand(n).astype(np.float32)
    b = np.random.rand(n).astype(np.float32)
    result = parallel_vec_add(a, b, workers=4)
    np.testing.assert_allclose(result, a + b, rtol=1e-5)
    print("Process-parallel kernel passed.")
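
The split in parallel_vec_add gives the whole remainder to the last worker and produces empty chunks when n is smaller than workers. A slightly more even split can be sketched as below. chunk_bounds is a hypothetical helper, not part of Pyvorin Edge.

```python
from typing import List, Tuple


def chunk_bounds(n: int, workers: int) -> List[Tuple[int, int]]:
    """Split range(n) into at most `workers` contiguous, non-empty (start, end) spans."""
    if n < 0 or workers < 1:
        raise ValueError("n must be >= 0 and workers must be >= 1")
    parts = min(n, workers)  # never create more spans than elements
    if parts == 0:
        return []
    base, extra = divmod(n, parts)
    bounds: List[Tuple[int, int]] = []
    start = 0
    for i in range(parts):
        # The first `extra` spans each take one additional element.
        end = start + base + (1 if i < extra else 0)
        bounds.append((start, end))
        start = end
    return bounds


print(chunk_bounds(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
print(chunk_bounds(2, 4))   # [(0, 1), (1, 2)]
```

Each (start, end) pair can replace the start and end values computed inside the submission loop, with one task submitted per pair.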

Summary

Keep Pipeline.run() on a single thread. Release the GIL inside C kernels with Py_BEGIN_ALLOW_THREADS so other Python threads can run. Use ThreadPoolExecutor for I/O-bound adapters and a queue to serialize data into the pipeline. Use ProcessPoolExecutor only when you need multi-core parallelism for CPU-bound kernels, and load the .so inside each worker.