WorkGrinder

WorkGrinder collects work items from many submitters and runs them in batches on leased executors.

Use it when many async callers submit small synchronous jobs and you want one component to control batching, leasing, completion, and shutdown.

How batching works

The grinder processes a batch when either:

  • pending work count reaches batch_size_threshold;

  • the oldest pending item has waited max_wait_seconds.

Each batch acquires one lease from the configured manager, submits each work item through that lease, resolves each caller’s future, and releases the lease.
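The two triggers above can be sketched with a small stand-in queue. This is only an illustration of the decision rule, not leasepool's implementation: the PendingQueue class, its fields, and the should_flush() method are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to follow.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PendingQueue:
    """Hypothetical stand-in for a grinder's pending queue (not leasepool code)."""

    batch_size_threshold: int
    max_wait_seconds: float
    # Timestamps (in seconds) at which each pending item was queued, oldest first.
    enqueued_at: list[float] = field(default_factory=list)

    def add(self, now: float) -> None:
        self.enqueued_at.append(now)

    def should_flush(self, now: float) -> bool:
        if not self.enqueued_at:
            return False  # nothing is pending, so there is no batch to run
        if len(self.enqueued_at) >= self.batch_size_threshold:
            return True  # size trigger
        # Age trigger: the first entry is the oldest pending item.
        return now - self.enqueued_at[0] >= self.max_wait_seconds
```

With batch_size_threshold=3 and max_wait_seconds=1.0, a single item queued at t=0.0 does not trigger a batch at t=0.5 but does at t=1.0, while three queued items trigger one immediately.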

Basic submit

submit() queues work and waits for the result:

examples/08_work_grinder_submit.py
"""
WorkGrinder with `submit()`.

`submit()` queues work and waits for the result.

The grinder processes a batch when either:

- pending count reaches batch_size_threshold
- the oldest pending item waits max_wait_seconds
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_square(value: int) -> int:
    time.sleep(0.05)
    return value * value


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=10,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
        owner_prefix="square-grinder",
    )

    await manager.start()
    await grinder.start()

    try:
        results = await asyncio.gather(
            *(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
        )

        print("Results:", results)
        print("Grinder stats:", grinder.stats())

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

enqueue

enqueue() queues work and returns an asyncio.Future immediately. This is useful when you want to queue multiple items first and await later.

examples/09_work_grinder_enqueue.py
"""
WorkGrinder with `enqueue()`.

`enqueue()` returns an asyncio.Future immediately. This is useful when you want
to queue work first and await it later.
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_format(value: int) -> str:
    time.sleep(0.05)
    return f"value={value}"


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=5,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
    )

    await manager.start()
    await grinder.start()

    try:
        futures = []

        for i in range(5):
            future = await grinder.enqueue(blocking_format, i, owner=f"format-{i}")
            futures.append(future)

        print("Queued futures:", len(futures))
        print("Stats after enqueue:", grinder.stats())

        results = await asyncio.gather(*futures)

        print("Results:", results)
        print("Stats after completion:", grinder.stats())

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

submit_from_thread

submit_from_thread() is for synchronous code running on an OS thread other than the one that runs the grinder's event loop. It returns a concurrent.futures.Future.

examples/10_submit_from_thread.py
"""
Submitting work to WorkGrinder from another OS thread.

Use `submit_from_thread()` only from non-async code or another thread. It returns
a `concurrent.futures.Future`.
"""

from __future__ import annotations

import asyncio
import threading
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_add(left: int, right: int) -> int:
    time.sleep(0.05)
    return left + right


def thread_entrypoint(grinder: WorkGrinder) -> None:
    future = grinder.submit_from_thread(
        blocking_add,
        20,
        22,
        owner="external-thread",
    )

    print("Worker thread got result:", future.result(timeout=5))


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=2,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=1,
        max_wait_seconds=1.0,
    )

    await manager.start()
    await grinder.start()

    try:
        thread = threading.Thread(
            target=thread_entrypoint,
            args=(grinder,),
            name="external-submitter",
        )
        thread.start()

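        # Poll instead of calling thread.join() right away: a blocking join
        # would stall the event loop the grinder needs to process the work.
        while thread.is_alive():
            await asyncio.sleep(0.05)

        # The thread has already finished, so this join returns immediately.
        thread.join()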

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

Event-loop ownership

WorkGrinder belongs to the event loop that started it.

Use these from the owning event loop:

  • await grinder.submit(...);

  • await grinder.enqueue(...);

  • await grinder.stop(...);

  • await grinder.astats().

Use these from other OS threads:

  • grinder.submit_from_thread(...);

  • grinder.stats_from_thread(...).

Calling async WorkGrinder methods from another event loop raises RuntimeError.
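One common way to enforce this kind of ownership is to remember the running loop at start time and compare it on every async call. The sketch below uses only the standard library. LoopBound and its ping() method are hypothetical names chosen for illustration; how WorkGrinder performs its own check is not shown here.

```python
import asyncio


class LoopBound:
    """Hypothetical component that remembers the event loop that started it."""

    def __init__(self) -> None:
        self._loop = None

    async def start(self) -> None:
        self._loop = asyncio.get_running_loop()

    async def ping(self) -> str:
        # Refuse calls that arrive on any loop other than the owning one.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("called from a different event loop")
        return "ok"


def demo() -> tuple:
    component = LoopBound()

    async def owner() -> str:
        await component.start()
        return await component.ping()

    first = asyncio.run(owner())  # same loop for start() and ping()
    try:
        asyncio.run(component.ping())  # asyncio.run() creates a new loop
    except RuntimeError as exc:
        second = type(exc).__name__
    else:
        second = "no error"
    return first, second
```

Here demo() returns ("ok", "RuntimeError"): the first call runs on the owning loop, the second on a fresh one.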

Shutdown behavior

await grinder.stop(cancel_pending=False)

Drains queued work before stopping. This is the graceful shutdown path.

await grinder.stop(cancel_pending=True)

Cancels work that is still queued, and cancels the grinder task if it is blocked waiting for a lease or for in-flight executor work.

Already-running synchronous functions follow the underlying executor semantics. For thread workers, cancelling the asyncio wrapper does not forcibly stop Python code that is already running in a worker thread. The executor lease remains managed safely by the manager’s lease-draining behavior.
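The thread-worker caveat can be reproduced with the standard library alone. The sketch below does not use leasepool. It cancels the asyncio future returned by loop.run_in_executor() while the function is already running in a worker thread, then checks whether the function still ran to the end. The names demo and slow_job are illustrative.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def demo() -> tuple:
    finished = threading.Event()

    def slow_job() -> None:
        time.sleep(0.2)
        finished.set()  # still reached after the awaiting side has given up

    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        wrapper = loop.run_in_executor(pool, slow_job)
        await asyncio.sleep(0.05)  # give the worker thread time to start the job
        wrapper.cancel()  # cancels only the asyncio-side future
        try:
            await wrapper
            cancelled = False
        except asyncio.CancelledError:
            cancelled = True
        # Wait for the job on the default executor so the loop is not blocked.
        ran_to_completion = await loop.run_in_executor(None, finished.wait, 1.0)
    return cancelled, ran_to_completion
```

asyncio.run(demo()) returns (True, True): the awaiting side sees a cancellation, yet the function in the worker thread completes anyway.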

Stop the grinder before stopping the manager it depends on:

await grinder.stop(cancel_pending=True)
await manager.stop()

Validation

WorkGrinder validates its batching and lease configuration at construction time.

batch_size_threshold must be a strictly positive integer. Fractional values such as 1.9 are rejected instead of being truncated.

max_wait_seconds and lease_seconds must be finite positive numbers. NaN, positive infinity, negative infinity, booleans, strings, zero, and negative values are rejected.
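The rules above could be expressed as two small checks. This is a sketch, not the library's code: the helper names are hypothetical, and the choice of TypeError and ValueError is an assumption, since the exception types WorkGrinder actually raises are not stated here.

```python
import math


def check_batch_size(value: object) -> int:
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("batch_size_threshold must be an integer")
    if value < 1:
        raise ValueError("batch_size_threshold must be >= 1")
    return value


def check_positive_seconds(value: object, name: str):
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError(f"{name} must be a number")
    # NaN compares False against everything, so test finiteness first.
    if isinstance(value, float) and not math.isfinite(value):
        raise ValueError(f"{name} must be finite")
    if value <= 0:
        raise ValueError(f"{name} must be > 0")
    return value


def rejects(check, *args) -> bool:
    """Return True when the check refuses the given arguments."""
    try:
        check(*args)
    except (TypeError, ValueError):
        return True
    return False
```

For example, rejects(check_batch_size, 1.9) and rejects(check_positive_seconds, float("nan"), "lease_seconds") are both True, while check_batch_size(10) returns 10.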

Cross-thread APIs

submit_from_thread() and stats_from_thread() are only for non-owner OS threads. They must not be called from the event-loop thread that started the grinder.

From the owning event loop, use:

await grinder.submit(...)
await grinder.enqueue(...)
grinder.stats()
await grinder.astats()

From another OS thread, use:

future = grinder.submit_from_thread(sync_fn, arg)
result = future.result(timeout=2)

stats = grinder.stats_from_thread(timeout=2)
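The same split exists in the standard library, which may help explain the restriction. asyncio.run_coroutine_threadsafe() hands a coroutine to a loop running in another thread and returns a concurrent.futures.Future. Blocking on that future from the loop's own thread would stall the loop that has to produce the result. The sketch below is an analogy built only on asyncio and threading; it is not how WorkGrinder is implemented, and double() and thread_entrypoint() are illustrative names.

```python
import asyncio
import threading


async def double(value: int) -> int:
    await asyncio.sleep(0.01)
    return value * 2


def thread_entrypoint(loop: asyncio.AbstractEventLoop, results: list) -> None:
    # Safe here: this thread is not the one running the event loop,
    # so blocking on result() does not stall the loop.
    future = asyncio.run_coroutine_threadsafe(double(21), loop)
    results.append(future.result(timeout=2))


async def main() -> list:
    results: list = []
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=thread_entrypoint, args=(loop, results))
    thread.start()
    # Join on the default executor so the loop stays free to run double().
    await loop.run_in_executor(None, thread.join)
    return results
```

asyncio.run(main()) returns [42].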

Cancellation

If a future returned by enqueue() is cancelled while still pending, the item is removed from the pending queue promptly. This keeps grinder.stats() accurate and avoids retaining cancelled work until the next batch threshold, timeout, or shutdown.
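Prompt removal of cancelled items is typically done with a done callback on the future. The following sketch shows the idea with a hypothetical PendingItems container. It is not leasepool's internal queue, and the names are chosen only for illustration.

```python
import asyncio


class PendingItems:
    """Hypothetical pending queue keyed by future (not leasepool code)."""

    def __init__(self) -> None:
        self._items: dict = {}

    def enqueue(self, payload: object) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._items[future] = payload
        # The callback fires when the future completes or is cancelled.
        future.add_done_callback(self._forget_if_cancelled)
        return future

    def _forget_if_cancelled(self, future: asyncio.Future) -> None:
        if future.cancelled():
            self._items.pop(future, None)

    def __len__(self) -> int:
        return len(self._items)


async def demo() -> tuple:
    pending = PendingItems()
    pending.enqueue("a")
    dropped = pending.enqueue("b")
    before = len(pending)
    dropped.cancel()
    await asyncio.sleep(0)  # done callbacks run on the next loop iteration
    return before, len(pending)
```

asyncio.run(demo()) returns (2, 1): the cancelled item leaves the queue without waiting for a batch to run.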

Partial batch submission failures

WorkGrinder treats batch items independently.

If a submitted callable raises, only that caller receives the callable exception. Other submitted work in the same batch can still complete normally.

If executor submission fails part-way through a batch, already-submitted work is awaited and receives its real result or exception. Only work that was not submitted receives the submission failure.
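The two failure modes can be told apart by where the exception surfaces: at submit time or when the result is collected. The sketch below uses concurrent.futures directly and a hypothetical FlakyExecutor wrapper that refuses submissions after a limit. It assumes the batch stops submitting at the first submission failure, which is an assumption of this example rather than documented WorkGrinder behaviour.

```python
from concurrent.futures import ThreadPoolExecutor


class FlakyExecutor:
    """Test double whose submit() fails once `limit` jobs were accepted."""

    def __init__(self, inner: ThreadPoolExecutor, limit: int) -> None:
        self._inner = inner
        self._limit = limit
        self._accepted = 0

    def submit(self, fn, arg):
        if self._accepted >= self._limit:
            raise RuntimeError("executor refused submission")
        self._accepted += 1
        return self._inner.submit(fn, arg)


def run_batch(executor, jobs: list) -> list:
    """Return one (status, value) outcome per job, in submission order."""
    submitted = []  # (index, future) pairs for jobs the executor accepted
    outcomes = [None] * len(jobs)
    for index, (fn, arg) in enumerate(jobs):
        try:
            submitted.append((index, executor.submit(fn, arg)))
        except Exception as exc:
            # Only this job and the ones never submitted get the submit error.
            for rest in range(index, len(jobs)):
                outcomes[rest] = ("submit-failed", exc)
            break
    # Jobs that were accepted are still awaited and keep their real outcome.
    for index, future in submitted:
        try:
            outcomes[index] = ("ok", future.result())
        except Exception as exc:
            outcomes[index] = ("raised", exc)
    return outcomes


def reciprocal(value: float) -> float:
    return 1 / value


def demo() -> list:
    jobs = [(reciprocal, 2), (reciprocal, 0), (reciprocal, 4)]
    with ThreadPoolExecutor(max_workers=2) as pool:
        outcomes = run_batch(FlakyExecutor(pool, limit=2), jobs)
    # Replace exception objects with their type names for easy comparison.
    return [
        (status, value if status == "ok" else type(value).__name__)
        for status, value in outcomes
    ]
```

demo() returns [("ok", 0.5), ("raised", "ZeroDivisionError"), ("submit-failed", "RuntimeError")]: the first job succeeds, the second receives its own exception, and only the third, which was never submitted, receives the submission failure.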

Diagnostics

Call grinder.stats() from the event-loop thread to get:

  • started;

  • stopping;

  • pending;

  • batch_size_threshold;

  • max_wait_seconds;

  • lease_seconds;

  • oldest_wait_seconds.