channels

The channels module is Digitorn's unified bidirectional I/O layer. One YAML providers: block declares every input adapter (webhook, cron, email, file watcher, RSS, queue) and every output channel (Slack, Telegram, Discord, email, webhook, log). Inbound events run through an activation pipeline that starts an agent turn; agents reply or broadcast through the same or different providers.

| Property | Value |
| --- | --- |
| Module id | channels |
| Version | 1.0.0 |
| LLM-exposed actions | 11 |
| Adapter count | 11 built-ins |
| Activation pipeline | |

Full reference (every adapter, full activation pipeline, security, custom adapter API, complete IT-support example): Channels. This page is a quick module-level summary.

The 11 built-in adapters

Adapters are imported lazily: installing an adapter's optional pip dependency enables that adapter at the next restart.

| Adapter | Inbound | Outbound | Optional dep | Purpose |
| --- | --- | --- | --- | --- |
| webhook | yes | yes | aiohttp (outbound) | HTTP POST in / out. |
| cron | yes | - | croniter (precise) | Scheduled activations. |
| file_watcher | yes | - | none | Trigger on filesystem changes. |
| email | yes | yes | none (stdlib imaplib / smtplib) | IMAP in / SMTP out. |
| rss | yes | - | feedparser | Poll RSS / Atom feeds. |
| log | - | yes | none | Structured Python logging. |
| queue | yes | yes | none | Bridge to the queue module. |
| telegram | yes | yes | aiohttp | Bot API (long polling + REST). |
| discord | yes | yes | aiohttp | WebSocket Gateway + REST. |
| slack | yes | yes | aiohttp | Socket Mode + Web API. |
| voice | yes | yes | aiohttp (+ edge-tts) | Phone / browser calls (Twilio CR + WebSocket backends). |
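To see which optional dependencies are present before a restart, a check along these lines may help. This is a minimal sketch, not Digitorn's own loader: the `OPTIONAL_DEPS` mapping and the `adapter_dep_installed` helper are hypothetical names that simply restate the "Optional dep" column.

```python
import importlib.util

# Adapter name -> optional third-party module (None = stdlib only).
# Hypothetical mapping copied from the table; webhook needs aiohttp only
# for outbound calls and cron needs croniter only for precise schedules.
OPTIONAL_DEPS = {
    "webhook": "aiohttp",
    "cron": "croniter",
    "file_watcher": None,
    "email": None,
    "rss": "feedparser",
    "log": None,
    "queue": None,
    "telegram": "aiohttp",
    "discord": "aiohttp",
    "slack": "aiohttp",
    "voice": "aiohttp",
}


def adapter_dep_installed(adapter: str) -> bool:
    """Return True when the adapter's optional dependency is importable."""
    dep = OPTIONAL_DEPS[adapter]
    if dep is None:
        return True
    # find_spec locates a top-level module without importing it.
    return importlib.util.find_spec(dep) is not None


if __name__ == "__main__":
    for name in OPTIONAL_DEPS:
        state = "ok" if adapter_dep_installed(name) else "missing dep"
        print(f"{name:13} {state}")
```

Using `find_spec` rather than a plain `import` keeps the check cheap and free of import side effects.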

Register custom adapters at runtime:

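```python
# register_adapter maps an adapter name to its implementation class.
from digitorn.modules.channels.adapters import register_adapter
from my_pkg.kafka_adapter import KafkaAdapter  # your own adapter class

# Make the adapter available under the name "kafka".
register_adapter("kafka", KafkaAdapter)
```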

The 11 actions

| Tool | Risk | Purpose |
| --- | --- | --- |
| channels.send_message | medium | Send on a specific provider. |
| channels.reply | medium | Reply on the channel that triggered this activation (uses reply_context). |
| channels.broadcast | high | Fan out the same message to many providers. |
| channels.list_providers | low | Configured providers + available adapter catalog. |
| channels.provider_status | low | Status / capabilities of one provider. |
| channels.pause_provider | medium | Pause inbound listener. |
| channels.resume_provider | medium | Resume a paused listener. |
| channels.provider_history | low | Recent inbound + outbound history. |
| channels.stats | low | Aggregate counters across providers. |
| channels.simulate_event | medium | Drop a synthetic inbound event into a provider. |
| channels.test_send | medium | Outbound smoke test. |

Aliases (FR + EN): envoyer_message, repondre, diffuser, lister_canaux, historique_canaux, stats_canaux, pause_canal, reprendre_canal.

Module-level config

ChannelsModuleConfig:

```yaml
tools:
  modules:
    channels:
      config:
        default_agent: ""             # empty → entry agent
        max_turns: 30                 # [1, 200]
        timeout: 120.0                # [5, 3600] seconds per activation
        history_limit: 200            # event records kept in memory
        secret_filter_enabled: true   # mask secrets in outbound text

        providers:
          notify_slack:
            adapter: slack
            enabled: true
            max_concurrent: 5         # [1, 100] concurrent activations
            config:
              bot_token: "{{secret.SLACK_BOT_TOKEN}}"
            activation:
              session: per_event      # per_event | shared | "{{template}}"
              message: "{{event.message}}"
              reply: auto             # auto | none | explicit
              filter: []
              prepare: []
              route: null
```
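The bracketed ranges in the comments above read as validation bounds. The sketch below shows one way such bounds could be enforced with stdlib dataclasses. `ModuleConfigSketch` and `ProviderConfigSketch` are hypothetical stand-ins, not the real ChannelsModuleConfig, and only the documented ranges are checked.

```python
from dataclasses import dataclass


def _check_range(name: str, value: float, low: float, high: float) -> None:
    """Raise ValueError when value lies outside the inclusive range."""
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside [{low}, {high}]")


@dataclass
class ModuleConfigSketch:
    # Defaults mirror the YAML above.
    default_agent: str = ""
    max_turns: int = 30
    timeout: float = 120.0
    history_limit: int = 200
    secret_filter_enabled: bool = True

    def __post_init__(self) -> None:
        _check_range("max_turns", self.max_turns, 1, 200)
        _check_range("timeout", self.timeout, 5, 3600)


@dataclass
class ProviderConfigSketch:
    adapter: str
    enabled: bool = True
    max_concurrent: int = 5

    def __post_init__(self) -> None:
        _check_range("max_concurrent", self.max_concurrent, 1, 100)
```

Failing in `__post_init__` means an out-of-range value is rejected when the config object is built, which matches the idea of validating at deploy time.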

The activation pipeline (per-provider)

ActivationPipeline.process_event(event, provider) runs each inbound event through six steps:

  1. Filter - drop events that don't match every filter[] condition (equals / not_equals / contains / gt / lt on a dot-path field).
  2. Prepare - call tools via the ServiceBus, stash results under as: <name>, available later as {{<name>.X}}.
  3. Route - pick an agent by matching field against rules[].match (falls back to the default: true rule then to default_agent).
  4. Session - pick or create a session keyed by session: (per_event / shared / {{template}}). Shared sessions hold an asyncio.Lock per session key so concurrent events serialise.
  5. Activate - render message + context templates, call agent_turn with max_turns + timeout from the module config.
  6. Reply - with reply: auto, the agent's first reply is sent back through the originating adapter via adapter.send_reply(reply_ctx, text). With explicit, the agent must call channels.reply itself. With none, no reply is sent.
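The filter step (step 1) can be sketched as follows. The condition shape (`field`, `op`, `value`) and the choice to treat a missing field as a non-match are assumptions made for illustration; only the five operator names and the dot-path lookup come from the description above.

```python
from typing import Any

_MISSING = object()


def get_path(event: dict, path: str) -> Any:
    """Resolve a dot-path such as 'payload.priority'; _MISSING if absent."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


OPERATORS = {
    "equals": lambda actual, expected: actual == expected,
    "not_equals": lambda actual, expected: actual != expected,
    "contains": lambda actual, expected: expected in actual,
    "gt": lambda actual, expected: actual > expected,
    "lt": lambda actual, expected: actual < expected,
}


def passes_filters(event: dict, filters: list) -> bool:
    """True only if the event matches every condition; [] always passes."""
    for cond in filters:
        actual = get_path(event, cond["field"])
        if actual is _MISSING:
            return False  # assumption: a missing field never matches
        try:
            if not OPERATORS[cond["op"]](actual, cond["value"]):
                return False
        except TypeError:
            # Incomparable types (e.g. str vs int) count as a non-match.
            return False
    return True
```

For example, with `event = {"payload": {"priority": 7}}`, the condition `{"field": "payload.priority", "op": "gt", "value": 5}` lets the event through, while adding a second condition that fails drops it.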

Lifecycle (3 phases)

| Phase | Method | What happens |
| --- | --- | --- |
| 1. Deploy | on_config_update(cfg) | Parse config, create adapters, call adapter.on_start. Listeners NOT started yet. Providers status="ready". |
| 2. Run | start_listeners | Restore shared sessions from DB; launch one asyncio.Task per inbound listener. Providers flip to status="active". |
| 3. Stop | on_stop | Cancel all listener tasks, await in-flight activations, call adapter.stop_listener + adapter.on_stop. |

Splitting deploy from run lets the daemon validate config at deploy time without binding to webhooks / IMAP / Telegram until the app actually starts (via run_background or an entry-point HTTP activation).
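The deploy / run / stop split can be illustrated with a small asyncio skeleton. `LifecycleSketch` is a hypothetical stand-in, not the module's real class: it omits session restore and in-flight activations, and the "stopped" status is an assumption, since the table only names "ready" and "active".

```python
import asyncio


class LifecycleSketch:
    """Hypothetical stand-in showing the three phases only."""

    def __init__(self) -> None:
        self.status = {}   # provider name -> status string
        self._tasks = {}   # provider name -> listener task

    def on_config_update(self, providers: list) -> None:
        # Phase 1: prepare providers; nothing is listening yet.
        for name in providers:
            self.status[name] = "ready"

    async def _listen(self, name: str) -> None:
        # Placeholder for a real inbound loop (polling, socket, ...).
        while True:
            await asyncio.sleep(3600)

    def start_listeners(self) -> None:
        # Phase 2: one task per inbound listener (needs a running loop).
        for name in self.status:
            self._tasks[name] = asyncio.create_task(self._listen(name))
            self.status[name] = "active"

    async def on_stop(self) -> None:
        # Phase 3: cancel listeners, then wait until they have finished.
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)
        self._tasks.clear()
        for name in self.status:
            self.status[name] = "stopped"  # assumed status name


async def demo() -> list:
    module = LifecycleSketch()
    snapshots = []
    module.on_config_update(["notify_slack", "tickets_webhook"])
    snapshots.append(dict(module.status))
    module.start_listeners()
    snapshots.append(dict(module.status))
    await module.on_stop()
    snapshots.append(dict(module.status))
    return snapshots
```

Running `asyncio.run(demo())` returns one status snapshot per phase, which makes it easy to see that no listener task exists until `start_listeners` is called.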

Security highlights

| # | Guard |
| --- | --- |
| 1 | Payload size enforced before JSON parse (webhook). |
| 2 | HMAC SHA-256 with constant-time compare (hmac.compare_digest). |
| 3 | API key constant-time compare. |
| 4 | Content-Type whitelist + sanitisation (__proto__, __class__, ...). |
| 5 | Sensitive header stripping (Authorization, Cookie, X-API-Key, X-Signature-*). |
| 6 | Outbound SSRF blocklist (RFC 1918, loopback, link-local, multicast, AWS / GCP metadata). |
| 7 | Outbound secret filtering (OpenAI sk-*, Anthropic sk-ant-*, GitHub ghp_*, AWS AKIA*, JWT eyJ*, Bearer, Basic, Digitorn dk_*). |
| 8 | Header masking in logs (Authorization, Cookie, X-API-Key → ***masked***). |
| 9 | No eval / exec in templates - single-pass {{var}}. |
| 10 | {{secret.*}} / {{env.*}} blocked at runtime (compile-time only). |
| 11 | Per-provider max_concurrent semaphore + per-shared-session lock. |
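Guard 6 can be approximated with the stdlib `ipaddress` module. This is a rough sketch under stated assumptions rather than the actual blocklist: `is_blocked_address` is a hypothetical helper, it accepts IP literals only (a hostname would have to be resolved first and every resolved address checked), and `is_private` covers somewhat more than the RFC 1918 ranges.

```python
import ipaddress

# The AWS / GCP metadata address is already link-local; it is listed
# explicitly here only to make the intent visible.
METADATA_ADDRESSES = {ipaddress.ip_address("169.254.169.254")}


def is_blocked_address(host: str) -> bool:
    """Return True if an IP literal falls in a blocked range.

    Raises ValueError when host is not an IP literal.
    """
    addr = ipaddress.ip_address(host)
    return (
        addr.is_private
        or addr.is_loopback
        or addr.is_link_local
        or addr.is_multicast
        or addr in METADATA_ADDRESSES
    )
```

A check like this only helps if it runs on the address the connection actually uses, so it belongs after DNS resolution and on every redirect hop.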

There is no daemon-wide loopback auth bypass, and channel inbound endpoints don't add one either - every webhook still goes through its adapter's HMAC / token / signature auth layer regardless of source IP.
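The order of guards 1 and 2 (size check, then signature check, then JSON parse) might look like the sketch below. `accept_webhook`, `sign`, and the `MAX_BODY_BYTES` value are hypothetical; how the signature travels (header name, encoding) is adapter-specific and is assumed here to be a hex string.

```python
import hashlib
import hmac
import json

MAX_BODY_BYTES = 1_000_000  # hypothetical limit, not a documented default


def sign(secret: bytes, body: bytes) -> str:
    """Hex-encoded HMAC SHA-256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def accept_webhook(secret: bytes, body: bytes, signature_hex: str) -> dict:
    """Size check, then signature check, then JSON parse, in that order."""
    if len(body) > MAX_BODY_BYTES:
        raise ValueError("payload too large")
    expected = sign(secret, body)
    # compare_digest avoids leaking where the two values first differ.
    if not hmac.compare_digest(expected.encode(), signature_hex.encode()):
        raise PermissionError("bad signature")
    return json.loads(body)
```

Verifying the signature over the raw bytes before parsing means an unauthenticated sender never reaches the JSON parser, and nothing in the check depends on the source IP.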

Constraints

| Name | Type | Default | Purpose |
| --- | --- | --- | --- |
| allowed_adapters | string_list | unrestricted | Restrict which adapter types this app can use. |
| max_providers | integer | 20 | Upper bound on provider instance count. |

Cross-references