WorkGrinder¶
WorkGrinder batches many submitters into executor work batches.
Use it when many async callers submit small synchronous jobs and you want one component to control batching, leasing, completion, and shutdown.
How batching works¶
The grinder processes a batch when either:
pending work count reaches
batch_size_threshold;the oldest pending item has waited
max_wait_seconds.
Each batch acquires one lease from the configured manager, submits each work item through that lease, resolves each caller’s future, and releases the lease.
Basic submit¶
submit() queues work and waits for the result:
"""
WorkGrinder with `submit()`.
`submit()` queues work and waits for the result.
The grinder processes a batch when either:
- pending count reaches batch_size_threshold
- the oldest pending item waits max_wait_seconds
"""
from __future__ import annotations
import asyncio
import time
from leasepool import LeasedExecutorManager, WorkGrinder
def blocking_square(value: int) -> int:
time.sleep(0.05)
return value * value
async def main() -> None:
manager = LeasedExecutorManager(
backend="thread",
max_pools=1,
min_pools=1,
workers_per_pool=4,
)
grinder = WorkGrinder(
executor_manager=manager,
batch_size_threshold=10,
max_wait_seconds=1.0,
lease_seconds=30.0,
owner_prefix="square-grinder",
)
await manager.start()
await grinder.start()
try:
results = await asyncio.gather(
*(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
)
print("Results:", results)
print("Grinder stats:", grinder.stats())
finally:
await grinder.stop(cancel_pending=True)
await manager.stop()
if __name__ == "__main__":
asyncio.run(main())
enqueue¶
enqueue() queues work and returns an asyncio.Future immediately. This is
useful when you want to queue multiple items first and await later.
"""
WorkGrinder with `enqueue()`.
`enqueue()` returns an asyncio.Future immediately. This is useful when you want
to queue work first and await it later.
"""
from __future__ import annotations
import asyncio
import time
from leasepool import LeasedExecutorManager, WorkGrinder
def blocking_format(value: int) -> str:
time.sleep(0.05)
return f"value={value}"
async def main() -> None:
manager = LeasedExecutorManager(
backend="thread",
max_pools=1,
min_pools=1,
workers_per_pool=4,
)
grinder = WorkGrinder(
executor_manager=manager,
batch_size_threshold=5,
max_wait_seconds=1.0,
lease_seconds=30.0,
)
await manager.start()
await grinder.start()
try:
futures = []
for i in range(5):
future = await grinder.enqueue(blocking_format, i, owner=f"format-{i}")
futures.append(future)
print("Queued futures:", len(futures))
print("Stats after enqueue:", grinder.stats())
results = await asyncio.gather(*futures)
print("Results:", results)
print("Stats after completion:", grinder.stats())
finally:
await grinder.stop(cancel_pending=True)
await manager.stop()
if __name__ == "__main__":
asyncio.run(main())
submit_from_thread¶
submit_from_thread() is for non-async code or another OS thread. It returns a
concurrent.futures.Future.
"""
Submitting work to WorkGrinder from another OS thread.
Use `submit_from_thread()` only from non-async code or another thread. It returns
a `concurrent.futures.Future`.
"""
from __future__ import annotations
import asyncio
import threading
import time
from leasepool import LeasedExecutorManager, WorkGrinder
def blocking_add(left: int, right: int) -> int:
time.sleep(0.05)
return left + right
def thread_entrypoint(grinder: WorkGrinder) -> None:
future = grinder.submit_from_thread(
blocking_add,
20,
22,
owner="external-thread",
)
print("Worker thread got result:", future.result(timeout=5))
async def main() -> None:
manager = LeasedExecutorManager(
backend="thread",
max_pools=1,
min_pools=1,
workers_per_pool=2,
)
grinder = WorkGrinder(
executor_manager=manager,
batch_size_threshold=1,
max_wait_seconds=1.0,
)
await manager.start()
await grinder.start()
try:
thread = threading.Thread(
target=thread_entrypoint,
args=(grinder,),
name="external-submitter",
)
thread.start()
while thread.is_alive():
await asyncio.sleep(0.05)
thread.join()
finally:
await grinder.stop(cancel_pending=True)
await manager.stop()
if __name__ == "__main__":
asyncio.run(main())
Event-loop ownership¶
WorkGrinder belongs to the event loop that started it.
Use these from the owning event loop:
await grinder.submit(...);await grinder.enqueue(...);await grinder.stop(...);await grinder.astats().
Use these from other OS threads:
grinder.submit_from_thread(...);grinder.stats_from_thread(...).
Calling async WorkGrinder methods from another event loop raises
RuntimeError.
Shutdown behavior¶
await grinder.stop(cancel_pending=False)
Drains queued work before stopping. This is the graceful shutdown path.
await grinder.stop(cancel_pending=True)
Cancels queued pending work and cancels the grinder task if it is blocked waiting for a lease or waiting for in-flight executor work.
Already-running synchronous functions follow the underlying executor semantics. For thread workers, cancelling the asyncio wrapper does not forcibly stop Python code that is already running in a worker thread. The executor lease remains managed safely by the manager’s lease-draining behavior.
Stop the grinder before stopping the manager it depends on:
await grinder.stop(cancel_pending=True)
await manager.stop()
Validation¶
WorkGrinder validates its batching and lease configuration at construction
time.
batch_size_threshold must be a strict positive integer. Fractional values
such as 1.9 are rejected instead of being truncated.
max_wait_seconds and lease_seconds must be finite positive numbers.
NaN, positive infinity, negative infinity, booleans, strings, zero, and
negative values are rejected.
Cross-thread APIs¶
submit_from_thread() and stats_from_thread() are only for non-owner OS
threads. They must not be called from the event-loop thread that started the
grinder.
From the owning event loop, use:
await grinder.submit(...)
await grinder.enqueue(...)
grinder.stats()
await grinder.astats()
From another OS thread, use:
future = grinder.submit_from_thread(sync_fn, arg)
result = future.result(timeout=2)
stats = grinder.stats_from_thread(timeout=2)
Cancellation¶
If a future returned by enqueue() is cancelled while still pending, the item
is removed from the pending queue promptly. This keeps grinder.stats() accurate
and avoids retaining cancelled work until the next batch threshold, timeout, or
shutdown.
Partial batch submission failures¶
WorkGrinder treats batch items independently.
If a submitted callable raises, only that caller receives the callable exception. Other submitted work in the same batch can still complete normally.
If executor submission fails part-way through a batch, already-submitted work is awaited and receives its real result or exception. Only work that was not submitted receives the submission failure.
Diagnostics¶
Call grinder.stats() from the event-loop thread to get:
started;stopping;pending;batch_size_threshold;max_wait_seconds;lease_seconds;oldest_wait_seconds.