WorkGrinder¶
- class WorkGrinder(*, executor_manager, max_wait_seconds=10.0, batch_size_threshold=20, lease_seconds=60.0, owner_prefix='work-grinder', logger=None)[source]¶
Bases:
objectAsync work batcher backed by leased executors.
Multiple async callers submit sync work. The grinder starts processing a batch when either:
the oldest pending work has waited at least max_wait_seconds, or
pending work count reaches batch_size_threshold.
Once a batch is ready, it leases one executor and submits the whole batch.
Initialize a WorkGrinder instance.
- Parameters:
executor_manager (LeasedExecutorManager) – The executor manager to lease executors from.
max_wait_seconds (float, optional) – The maximum time to wait before processing a batch. Defaults to 10.0.
batch_size_threshold (int, optional) – The number of pending work items to trigger batch processing. Defaults to 20.
lease_seconds (float, optional) – The duration to lease an executor for each batch. Defaults to 60.0.
owner_prefix (str, optional) – The prefix for the owner identifier of each batch. Defaults to “work-grinder”.
logger (logging.Logger | None)
- Raises:
ValueError – If max_wait_seconds is not greater than 0.
ValueError – If batch_size_threshold is not greater than 0.
ValueError – If lease_seconds is not greater than 0.
- __init__(*, executor_manager, max_wait_seconds=10.0, batch_size_threshold=20, lease_seconds=60.0, owner_prefix='work-grinder', logger=None)[source]¶
Initialize a WorkGrinder instance.
- Parameters:
executor_manager (LeasedExecutorManager) – The executor manager to lease executors from.
max_wait_seconds (float, optional) – The maximum time to wait before processing a batch. Defaults to 10.0.
batch_size_threshold (int, optional) – The number of pending work items to trigger batch processing. Defaults to 20.
lease_seconds (float, optional) – The duration to lease an executor for each batch. Defaults to 60.0.
owner_prefix (str, optional) – The prefix for the owner identifier of each batch. Defaults to “work-grinder”.
logger (Logger | None)
- Raises:
ValueError – If max_wait_seconds is not greater than 0.
ValueError – If batch_size_threshold is not greater than 0.
ValueError – If lease_seconds is not greater than 0.
- async start()[source]¶
Start the WorkGrinder.
This method initializes the event loop and starts the grinder loop task.
- Return type:
- async enqueue(fn, /, *args, owner=None, **kwargs)[source]¶
Enqueue a work item to the WorkGrinder.
- Parameters:
- Raises:
RuntimeError – If the WorkGrinder is not started.
RuntimeError – If the WorkGrinder is stopping.
- Returns:
A future representing the result of the work item.
- Return type:
asyncio.Future[Any]
- submit_from_thread(fn, /, *args, owner=None, **kwargs)[source]¶
Submit a work item to the WorkGrinder from a different thread.
- Parameters:
- Raises:
RuntimeError – If the WorkGrinder is not started.
- Returns:
A future representing the result of the work item.
- Return type:
ConcurrentFuture[Any]
- stats()[source]¶
Get the current statistics of the WorkGrinder.
This method must be called from the WorkGrinder event-loop thread while the grinder is running. Use stats_from_thread() from other threads. It is also safe before start or after stop.
- stats_from_thread(timeout=None)[source]¶
Get the current statistics of the WorkGrinder from a different thread.
- Parameters:
timeout (float | None, optional) – The maximum time to wait for the statistics. Defaults to None.
- Raises:
RuntimeError – If the WorkGrinder is not started.
- Returns:
A dictionary containing the current statistics.
- Return type:
Manual summary¶
Constructor¶
WorkGrinder(
*,
executor_manager,
max_wait_seconds=10.0,
batch_size_threshold=20,
lease_seconds=60.0,
owner_prefix="work-grinder",
logger=None,
)
Lifecycle¶
await grinder.start()Start the background grinder task and bind the grinder to the current event loop.
await grinder.stop(cancel_pending=False)Stop the grinder. Pending work is drained by default. With
cancel_pending=True, queued pending work is cancelled and the grinder task is cancelled if it is blocked waiting for a lease or waiting for in-flight executor work.
Submitting work¶
await grinder.submit(fn, *args, owner=None, **kwargs)Queue work and wait for its result.
await grinder.enqueue(fn, *args, owner=None, **kwargs)Queue work and return an
asyncio.Futureimmediately.grinder.submit_from_thread(fn, *args, owner=None, **kwargs)Thread-safe submission API for non-owner OS threads. Raises
RuntimeErrorif called from the owning event-loop thread.
Diagnostics¶
grinder.stats()Return a diagnostic snapshot. Safe before start and after stop. While the grinder is running, call it from the owning event loop.
await grinder.astats()Return a diagnostic snapshot from the owning event loop.
grinder.stats_from_thread(timeout=None)Thread-safe stats API for non-owner OS threads. Raises
RuntimeErrorif called from the owning event-loop thread.
Loop ownership¶
Async WorkGrinder methods must be called from the event loop that started the
grinder. Use submit_from_thread() and stats_from_thread() from other OS
threads.