queue

Event-driven async message queue with priority ordering, consumer groups, a dead-letter queue, delayed delivery, and background subscriptions that notify the agent when messages arrive.

| Property | Value |
| --- | --- |
| Module id | queue |
| Version | 1.0.0 |
| Type | user |
| Pip deps | redis (only for the Redis backend) |

Backends

| Backend | URL scheme | Storage | Consumer groups |
| --- | --- | --- | --- |
| InMemoryQueueBackend | null (default) | per-queue heapq | simulated |
| RedisQueueBackend | redis://host:6379/0 | Redis Streams | native (XADD / XREADGROUP / XACK) |
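The in-memory backend is described as a per-queue heapq. As a rough illustration of how priority ordering (0 = highest) with first-in, first-out tie-breaking can be built on `heapq`, here is a minimal sketch. The class `ToyPriorityQueue`, its method names, and the default priority of 5 are assumptions made for this example, not the module's actual implementation.

```python
from __future__ import annotations

import heapq
import itertools
from typing import Any


class ToyPriorityQueue:
    """Hypothetical stand-in for one in-memory queue (not the module's class)."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, Any]] = []
        # Monotonic counter used as a tie-breaker so that messages with the
        # same priority come out in the order they were published.
        self._seq = itertools.count()

    def publish(self, body: Any, priority: int = 5) -> None:
        if not 0 <= priority <= 9:
            raise ValueError("priority must be in 0..9 (0 = highest)")
        heapq.heappush(self._heap, (priority, next(self._seq), body))

    def receive(self) -> Any | None:
        """Pop the highest-priority (lowest number) message, or None if empty."""
        if not self._heap:
            return None
        _, _, body = heapq.heappop(self._heap)
        return body


q = ToyPriorityQueue()
q.publish("low", priority=9)
q.publish("urgent", priority=0)
q.publish("also urgent", priority=0)
order = [q.receive(), q.receive(), q.receive()]
```

The sequence counter also keeps the heap from ever comparing two message bodies, which would fail for non-comparable payloads such as dicts.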

Dead-letter queue naming: dlq:{app_id}:{queue_name}.
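As a small worked example of the naming pattern above, the helper below builds a dead-letter queue name. The function name `dlq_name` and the sample values `shop` and `orders` are hypothetical.

```python
def dlq_name(app_id: str, queue_name: str) -> str:
    """Build a dead-letter queue name following the documented pattern."""
    return f"dlq:{app_id}:{queue_name}"


example = dlq_name("shop", "orders")
```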

The 13 actions

| Tool | Purpose |
| --- | --- |
| queue.create_queue | Create / ensure a queue. Sets max_size, dead_letter_enabled, max_retries. |
| queue.publish | Publish a message. Optional priority (0=highest, 9=lowest), delay_seconds, consumer_group, headers. |
| queue.subscribe | Start a background consumer that pushes arriving messages via stream notifications. |
| queue.unsubscribe | Stop a background subscription. |
| queue.receive | Pull mode - blocking with timeout. ack_mode='manual' to ack after processing. |
| queue.ack | Acknowledge messages after success. |
| queue.nack | Reject - requeue to retry, otherwise → dead-letter. |
| queue.peek | Preview without consuming. |
| queue.queue_stats | Depth, consumer count, throughput. |
| queue.list_queues | All known queues. |
| queue.delete_queue | Delete a queue + all messages. |
| queue.purge | Remove all messages, keep the queue. |
| queue.dead_letter | Inspect the DLQ for messages that exceeded max_retries. |
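To make the receive / ack / nack / dead-letter relationship concrete, here is a toy model of manual-ack delivery. It is a sketch under assumptions: `ToyQueue`, `ToyMessage`, the integer ack ids, and the `requeue` flag are invented for illustration, and the rule "dead-letter once attempts exceed max_retries" is one plausible reading of the table, not a confirmed description of the module's internals.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyMessage:
    body: Any
    max_retries: int = 3
    attempts: int = 0


class ToyQueue:
    """Hypothetical model of manual-ack delivery; not the module's implementation."""

    def __init__(self) -> None:
        self.ready: deque[ToyMessage] = deque()
        self.in_flight: dict[int, ToyMessage] = {}
        self.dead_letter: list[ToyMessage] = []
        self._next_ack_id = 0

    def publish(self, body: Any, max_retries: int = 3) -> None:
        self.ready.append(ToyMessage(body, max_retries))

    def receive(self) -> tuple[int, ToyMessage] | None:
        """Hand out one message and track it until it is acked or nacked."""
        if not self.ready:
            return None
        msg = self.ready.popleft()
        msg.attempts += 1
        self._next_ack_id += 1
        self.in_flight[self._next_ack_id] = msg
        return self._next_ack_id, msg

    def ack(self, ack_id: int) -> None:
        del self.in_flight[ack_id]  # processing succeeded: forget the message

    def nack(self, ack_id: int, requeue: bool = True) -> None:
        msg = self.in_flight.pop(ack_id)
        if requeue and msg.attempts <= msg.max_retries:
            self.ready.append(msg)        # retry later
        else:
            self.dead_letter.append(msg)  # retries exhausted or requeue declined


q = ToyQueue()
q.publish({"order": 1}, max_retries=1)
for _ in range(2):
    ack_id, _msg = q.receive()
    q.nack(ack_id)  # simulate a handler that keeps failing
```

With `max_retries=1` the message is delivered once, retried once, and then lands in `q.dead_letter` after the second failure.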

Message format

QueueMessage(
    id: str,                     # uuid4
    queue: str,
    body: Any,                   # JSON-serialisable
    headers: dict[str, str],
    priority: int,               # 0..9, 0=highest
    timestamp: float,
    attempts: int,
    max_retries: int,
    delay_until: float | None,   # epoch timestamp for delayed delivery
    consumer_group: str | None,
    ack_id: str | None,
)
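The field list above can be mirrored as a dataclass to show how `delay_until` relates to a relative delay such as `delay_seconds`. This is a sketch only: the class name `QueueMessageSketch`, the helper `with_delay`, the method `is_deliverable`, and every default value are assumptions, since the source does not state how the real `QueueMessage` is constructed.

```python
from __future__ import annotations

import time
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class QueueMessageSketch:
    """Illustrative mirror of the documented fields; defaults are assumptions."""

    queue: str
    body: Any
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    headers: dict[str, str] = field(default_factory=dict)
    priority: int = 5
    timestamp: float = field(default_factory=time.time)
    attempts: int = 0
    max_retries: int = 3
    delay_until: float | None = None
    consumer_group: str | None = None
    ack_id: str | None = None

    def is_deliverable(self, now: float | None = None) -> bool:
        """A delayed message becomes visible once `delay_until` has passed."""
        now = time.time() if now is None else now
        return self.delay_until is None or now >= self.delay_until


def with_delay(queue: str, body: Any, delay_seconds: float) -> QueueMessageSketch:
    """Translate a relative delay into an absolute `delay_until` epoch timestamp."""
    msg = QueueMessageSketch(queue=queue, body=body)
    msg.delay_until = msg.timestamp + delay_seconds
    return msg


msg = with_delay("orders", {"sku": "A1"}, delay_seconds=60)
```

Passing `now` explicitly to `is_deliverable` keeps the check deterministic, which is convenient when testing delayed delivery without sleeping.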

Configuration

tools:
  modules:
    queue:
      config:
        backend_url: null  # null = InMemory; redis://host:6379/0
        default_max_retries: 3
        default_visibility_timeout: 30  # seconds before a non-acked message reappears
        max_queues: 50
      constraints:
        allowed_queues: [orders, events, notifications]
        max_queues: 100
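The `default_visibility_timeout` setting describes a common queue behaviour: a received message is hidden while it is being processed and reappears if it is not acked in time. The sketch below models that idea with explicit timestamps. `VisibilityTracker` and its methods are hypothetical names, and the real backends may implement redelivery differently.

```python
from __future__ import annotations

from collections import deque


class VisibilityTracker:
    """Hypothetical helper: re-queues messages that were not acked in time."""

    def __init__(self, visibility_timeout: float = 30.0) -> None:
        self.visibility_timeout = visibility_timeout
        self.ready: deque[str] = deque()
        self.in_flight: dict[str, float] = {}  # message id -> redelivery deadline

    def receive(self, now: float) -> str | None:
        self._requeue_expired(now)
        if not self.ready:
            return None
        msg_id = self.ready.popleft()
        self.in_flight[msg_id] = now + self.visibility_timeout
        return msg_id

    def ack(self, msg_id: str) -> None:
        self.in_flight.pop(msg_id, None)

    def _requeue_expired(self, now: float) -> None:
        expired = [m for m, deadline in self.in_flight.items() if now >= deadline]
        for msg_id in expired:
            del self.in_flight[msg_id]
            self.ready.append(msg_id)


tracker = VisibilityTracker(visibility_timeout=30)
tracker.ready.append("m1")
first = tracker.receive(now=0)    # delivered, deadline at t=30
hidden = tracker.receive(now=10)  # still in flight, nothing to deliver
again = tracker.receive(now=30)   # never acked, so it reappears
```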

Constraints

| Constraint | Type | Default | Description |
| --- | --- | --- | --- |
| allowed_queues | string_list | unrestricted | Restrict which queue names the agent can use. |
| max_queues | integer | 50 | Maximum queues this app may create. |
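One way to picture how these two constraints could be enforced when a queue is created is the check below. It is a sketch: the function name `check_create_allowed`, the use of `PermissionError`, and the treatment of re-creating an existing queue as allowed are all assumptions rather than documented behaviour.

```python
from __future__ import annotations


def check_create_allowed(
    name: str,
    existing: set[str],
    allowed_queues: list[str] | None = None,  # None means unrestricted
    max_queues: int = 50,
) -> None:
    """Raise PermissionError if creating `name` would violate a constraint."""
    if allowed_queues is not None and name not in allowed_queues:
        raise PermissionError(f"queue {name!r} is not in allowed_queues")
    # Ensuring a queue that already exists does not count against the limit.
    if name not in existing and len(existing) >= max_queues:
        raise PermissionError(f"max_queues={max_queues} reached")


check_create_allowed("orders", existing=set(), allowed_queues=["orders", "events"])
```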

Aliases (FR / EN)

A few highlights; the full list is declared in each action's @action(aliases=[...]):

| Action | Aliases |
| --- | --- |
| queue.publish | publier, envoyer, send, emit |
| queue.subscribe | abonner, ecouter, listen |
| queue.receive | recevoir, pull, poll |
| queue.ack | confirmer, acknowledge |
| queue.nack | rejeter, reject |
| queue.dead_letter | lettres_mortes, dlq |
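The alias table can be read as a many-to-one mapping from alias to canonical action. The sketch below builds a reverse lookup from the highlighted aliases only. How the framework actually resolves aliases (for example, whether an alias is written bare or with a `queue.` prefix) is not stated here, so `resolve_action` and its case-insensitive matching are assumptions.

```python
# Aliases copied from the table above; the full list lives with each action.
ALIASES = {
    "queue.publish": ["publier", "envoyer", "send", "emit"],
    "queue.subscribe": ["abonner", "ecouter", "listen"],
    "queue.receive": ["recevoir", "pull", "poll"],
    "queue.ack": ["confirmer", "acknowledge"],
    "queue.nack": ["rejeter", "reject"],
    "queue.dead_letter": ["lettres_mortes", "dlq"],
}

# Reverse index: alias -> canonical action name.
ALIAS_TO_ACTION = {
    alias: action for action, aliases in ALIASES.items() for alias in aliases
}


def resolve_action(name: str) -> str:
    """Return the canonical action for an alias, or the name unchanged."""
    return ALIAS_TO_ACTION.get(name.lower(), name)


resolved = resolve_action("publier")
```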

Cross-references