Skip to main content

Output Channels

Output channels are the notification delivery infrastructure for Digitorn. When a scheduled job fires, a watcher detects a change, or a background task completes, the result is routed to an output channel for delivery.

Channels are not modules. Modules expose tools to the LLM agent. Channels deliver results to external systems — Slack, email, Kafka, Telegram, SMS, phone calls, webhooks, MQTT, or any custom destination.

Architecture

Quick Start

1. Basic — LLM Notification (default, no config needed)

Every app has the llm_notification channel built-in. It pushes notifications directly to the LLM agent's conversation:

execution:
scheduler: true
watchers: true
# default_channel: llm_notification ← implicit

2. Add a Webhook Channel

channels:
slack_alerts:
type: webhook
config:
url: "{{env.SLACK_WEBHOOK_URL}}"
headers:
Content-Type: "application/json"

execution:
scheduler: true
default_channel: slack_alerts # All jobs/watchers use this by default

3. Multiple Channels

channels:
slack_alerts:
type: webhook
config:
url: "{{env.SLACK_WEBHOOK}}"

audit_log:
type: log
config:
logger_name: "digitorn.audit"
level: "INFO"
format: json
include_data: true

ops_webhook:
type: webhook
config:
url: "https://ops.internal/api/notify"
headers:
Authorization: "Bearer {{env.OPS_TOKEN}}"
timeout: 5

execution:
scheduler: true
watchers: true
default_channel: slack_alerts

Then in the agent conversation, the LLM can target specific channels:

"Schedule a health check on https://api.example.com every 5 minutes,
send alerts to ops_webhook"

Built-in Channel Types

llm_notification — LLM Agent (default)

Delivers notifications directly to the LLM agent's conversation loop. If no consumer is connected, notifications are buffered in the KV store and delivered when the session reconnects.

  • Always available — no config needed, always registered
  • Zero external dependencies — local memory + KV storage
  • Automatic fallback — other channels fall back to this if they fail
channels:
# Not needed — llm_notification is always registered

webhook — HTTP POST

Send notifications via HTTP to any URL. Compatible with Slack Incoming Webhooks, Discord, Microsoft Teams, Zapier, Make, n8n, and any REST API.

channels:
my_hook:
type: webhook
config:
url: "https://hooks.slack.com/services/T.../B.../..."
method: POST # POST (default), PUT, PATCH
headers: # Custom HTTP headers
Content-Type: "application/json"
timeout: 10 # Request timeout (seconds)
verify_ssl: true # SSL verification (default: true)
payload_template: | # Optional: custom payload format
{"text": "{{message}}", "channel": "#alerts"}

Per-delivery overrides (from output_config on jobs/watchers):

FieldDescription
urlOverride target URL
headersAdditional headers (merged with global)
payloadCustom payload dict (replaces template)

Retry policy: 3 retries, exponential backoff 2s -- 30s. Retries on HTTP 429 (rate limit) and 5xx (server errors).

Fallback: If aiohttp is not installed, falls back to urllib (synchronous in thread pool).

log — Structured Logging

Write notifications to Python's logging system. Useful for debugging, audit trails, and integration with log aggregation systems (ELK, Loki, Datadog, Grafana).

channels:
audit:
type: log
config:
logger_name: "digitorn.notifications" # Python logger name
level: "INFO" # DEBUG, INFO, WARNING, ERROR
format: "json" # "text" (default) or "json"
include_data: true # Include structured data

No retry — logging is local and effectively never fails.

Plugin Channels

Anyone can create a channel plugin. Install via pip, it auto-registers:

pip install digitorn-channel-slack
pip install digitorn-channel-telegram
pip install digitorn-channel-gmail
pip install digitorn-channel-kafka
pip install digitorn-channel-sms

Then use it in YAML:

channels:
team_slack:
type: slack
config:
webhook_url: "{{secret.SLACK_WEBHOOK}}"
default_channel: "#engineering"

Creating a Plugin Channel

  1. Create a Python package with a class extending BaseOutputChannel:
from digitorn.core.app.channels import BaseOutputChannel, ChannelPayload, DeliveryResult

class TelegramChannel(BaseOutputChannel):
CHANNEL_ID = "telegram"
CHANNEL_NAME = "Telegram"
CHANNEL_VERSION = "1.0.0"
CHANNEL_DESCRIPTION = "Send notifications via Telegram Bot API"

def capabilities(self):
return ChannelCapabilities(
supports_rich_text=True,
max_message_length=4096,
supported_formats=["text", "markdown", "html"],
)

def config_schema(self):
return {
"required": {
"bot_token": "Telegram Bot API token (from @BotFather)",
},
"optional": {
"default_chat_id": "Default chat/group ID",
"parse_mode": "Message format: Markdown or HTML",
},
}

def per_delivery_config_schema(self):
return {
"optional": {
"chat_id": "Override target chat",
},
}

async def deliver(self, app_id, payload, config):
import aiohttp

token = self.channel_config["bot_token"]
chat_id = config.get("chat_id", self.channel_config.get("default_chat_id"))
text = self.format_text(payload)

url = f"https://api.telegram.org/bot{token}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": self.channel_config.get("parse_mode", "Markdown"),
}) as resp:
if resp.status == 200:
data = await resp.json()
return DeliveryResult(
success=True,
channel_id=self.CHANNEL_ID,
delivery_id=str(data["result"]["message_id"]),
)
return DeliveryResult(
success=False,
channel_id=self.CHANNEL_ID,
error=f"Telegram API: HTTP {resp.status}",
retryable=resp.status >= 500,
)
  1. Register via Python entry points in pyproject.toml:
[project.entry-points."digitorn.channels"]
telegram = "my_package:TelegramChannel"
  1. Install and use:
pip install my-telegram-channel
channels:
ops_telegram:
type: telegram
config:
bot_token: "{{secret.TELEGRAM_TOKEN}}"
default_chat_id: "-100123456789"

Channel Interface

Every channel implements BaseOutputChannel. Here's the full contract:

Required

MemberDescription
CHANNEL_IDUnique type identifier (e.g. "slack", "telegram")
CHANNEL_NAMEHuman-readable name
CHANNEL_VERSIONSemver version string
deliver(app_id, payload, config)Core delivery method

Optional (override for richer behavior)

MethodDefaultDescription
capabilities()Text-onlyDeclare rich text, attachments, threading, batching
config_schema()EmptyRequired + optional config fields
per_delivery_config_schema()EmptyPer-job/watcher config overrides
validate_config()Check required fieldsDeep validation (test connectivity, credentials)
on_start()No-opInitialize connections, pools, tokens
on_stop()No-opClose connections, flush queues
health_check()Internal trackerRuntime health monitoring
retry_policy()3 retries, exponential 1s -- 60sCustom retry behavior
format_text(payload)[title] message tagsPlain text formatting
format_rich(payload)rich_message or HTML fallbackRich text formatting

ChannelPayload (universal input)

Every channel receives the same structured payload:

FieldTypeDescription
messagestrAlways present. Plain text fallback.
titlestrSubject/header (email subject, Slack header)
rich_messagestrHTML/Markdown version
structured_datadictRaw JSON for machine-readable channels
attachmentslist[PayloadAttachment]File attachments
metadatadictSource info: job_id, trigger_type, timestamp, run_count
thread_id`strNone`
prioritystr"low", "normal", "high", "critical"
tagslist[str]Routing/filtering tags

DeliveryResult (universal output)

FieldTypeDescription
successboolDelivered successfully
channel_idstrWhich channel handled it
delivery_id`strNone`
error`strNone`
retryableboolIs the error transient?
bufferedboolWas it buffered for later?
metadatadictChannel-specific response data

ChannelCapabilities

Channels declare what they support — the system adapts:

CapabilityExample channels
supports_rich_textSlack, Email, Telegram
supports_attachmentsEmail, Slack, Telegram
supports_threadingSlack, Email
supports_batchingKafka, MQTT
max_message_lengthSMS: 1600, Slack: 40000
supported_formats["text", "html", "markdown", "json", "blocks"]

RetryPolicy

Channels declare retry behavior; the registry handles the loop:

FieldDefaultDescription
max_retries3Maximum retry attempts
backoff_base1.0sInitial retry delay
backoff_max60.0sMaximum retry delay
backoff_multiplier2.0Exponential backoff factor

Registry Architecture

The ChannelRegistry manages two levels:

Types (global, loaded at daemon startup)

  • Built-in: llm_notification, webhook, log — always available
  • Plugins: loaded from Python entry points (digitorn.channels group)

Instances (per-app, created at deploy time)

  • Created from the channels: block in the app YAML
  • Config resolved (variables, secrets substituted)
  • on_start() called at deploy, on_stop() at undeploy
  • Cleaned up automatically when the app is undeployed

Delivery flow

registry.deliver("slack_alerts", app_id, payload, config)
1. Lookup instance "slack_alerts"
2. (miss?) Lookup by type ID (backward compat)
3. (miss?) Fall back to "llm_notification" (default)
4. Convert dict -- ChannelPayload if needed
5. Call instance.deliver()
6. On failure: check retryable, apply retry_policy
7. Record health metrics (_record_success / _record_failure)
8. Return DeliveryResult

Multi-channel fanout

Send one notification to multiple channels simultaneously:

results = await registry.deliver_multi(
["slack_alerts", "audit_log", "ops_webhook"],
app_id, payload, config
)

All deliveries run concurrently via asyncio.gather().

Health Monitoring

Every channel tracks:

  • Total deliveries (success + failure)
  • Failure rate -- auto-degrades status (ok -- degraded at >10% failures)
  • Last error message
  • Last successful delivery timestamp
  • Delivery latency (milliseconds)

Query health:

health = await registry.health("slack_alerts")
# ChannelHealth(status="ok", latency_ms=142.3, deliveries_total=1523, ...)

all_health = await registry.health_all()
# {"slack_alerts": ChannelHealth(...), "audit_log": ChannelHealth(...)}

Secrets Handling

Channel configs support the same variable resolution as the rest of the YAML:

variables:
slack_channel: "#production-alerts"

channels:
slack:
type: webhook
config:
url: "{{env.SLACK_WEBHOOK_URL}}" # From environment variable
headers:
Authorization: "Bearer {{env.API_TOKEN}}"

Secrets are resolved at compile time via resolve_variables() — the same mechanism used for module configs and brain configs. Never store secrets in plain text.

Integration with Scheduler and Watchers

Jobs and watchers reference channels by instance name:

# Agent calls:
schedule_once(
when="in 5m",
action_type="tool_call",
tool_name="http.get",
tool_params={"url": "https://api.example.com/health"},
output_channel="ops_webhook" # ← routes to the webhook channel
)

watch_start(
name="http.get",
params={"url": "https://api.example.com"},
interval=60,
notify_when="on_error",
# output_channel defaults to execution.default_channel
)

The output_channel field on ScheduledJob and watcher params maps to a channel instance name. If not specified, it uses execution.default_channel (defaults to "llm_notification").

Per-User Channel Resolution

When the same app serves many users (10, 100, 10,000), each user's notifications must go to their email, phone, or Telegram — not a shared destination. The user resolver solves this automatically.

The Problem

Without auto-resolution, the LLM agent would need to specify output_config manually for every delivery:

schedule_once(when="in 5m", output_channel="sms", output_config={"to_number": "+33612345678"})

This doesn't scale. The agent doesn't know the user's phone number, and you can't hardcode it for 10,000 users.

The Solution: user_resolver

Add a user_resolver to any channel. The system automatically looks up the current user's delivery address from a data source (database, API, etc.) using the session_id.

channels:
sms_alerts:
type: sms
config:
account_sid: "{{env.TWILIO_SID}}"
auth_token: "{{env.TWILIO_TOKEN}}"
from_number: "+33600000000"
user_resolver:
module: database
action: fetch_results
params:
query: "SELECT phone, email FROM users WHERE session_id = :session_id"
mapping:
to_number: phone # DB column 'phone' -- channel field 'to_number'
cache_ttl: 300 # cache for 5 minutes

Now the agent just says:

schedule_once(when="in 5m", output_channel="sms_alerts", ...)

The system:

  1. Knows the current user via session_id (captured when the job was created)
  2. Runs database.fetch_results(query="SELECT phone, email FROM users WHERE session_id = 'abc123'")
  3. Maps the phone column to the channel's to_number field
  4. Delivers the SMS to the right number

How It Works

Job fires -- SchedulerService._fire_job()
|
v
ChannelRegistry.deliver(channel, app_id, payload, config, session_id=job.session_id)
|
+--> Has user_resolver? -- UserResolver.resolve(session_id)
| |
| +--> Execute module.action(params) with :session_id replaced
| +--> Map result fields via mapping config
| +--> Merge with any explicit output_config (explicit wins)
| |
| v
| resolved_config = {"to_number": "+33612345678"}
|
+--> No resolver? -- Use output_config as-is (backward compat)
|
v
instance.deliver(app_id, payload, resolved_config)

user_resolver Fields

FieldRequiredDefaultDescription
moduleYesModule ID to query (e.g. database, http)
actionYesAction to call (e.g. fetch_results, get)
paramsNo{}Action parameters. :session_id and {{session_id}} are replaced with the actual session ID
mappingNo{}Maps result fields to per-delivery config fields. e.g. {to_number: phone}
cache_ttlNo300Cache duration in seconds (0 = no cache)

Multi-Channel Example

One resolver per channel — each maps to different fields:

modules:
database:
setup:
- action: connect
params:
driver: sqlite
database: "{{workspace}}/users.db"

channels:
sms_alerts:
type: sms
config:
account_sid: "{{env.TWILIO_SID}}"
from_number: "+33600000000"
user_resolver:
module: database
action: fetch_results
params:
query: "SELECT phone FROM users WHERE session_id = :session_id"
mapping:
to_number: phone

email_reports:
type: email
config:
smtp_host: "smtp.example.com"
smtp_port: 587
user_resolver:
module: database
action: fetch_results
params:
query: "SELECT email, full_name FROM users WHERE session_id = :session_id"
mapping:
to_address: email
recipient_name: full_name

telegram_ops:
type: telegram
config:
bot_token: "{{env.TELEGRAM_TOKEN}}"
user_resolver:
module: database
action: fetch_results
params:
query: "SELECT telegram_chat_id FROM users WHERE session_id = :session_id"
mapping:
chat_id: telegram_chat_id

All three channels query the same users table but map different columns. The LLM agent doesn't need to know any of this — it just sets output_channel and the system handles the rest.

HTTP API Resolver

The resolver works with any module, not just database:

channels:
push_notification:
type: push
config:
api_key: "{{env.PUSH_API_KEY}}"
user_resolver:
module: http
action: json_api
params:
url: "https://auth.internal/users/:session_id/contacts"
method: GET
mapping:
device_token: push_token
platform: device_platform

Programmatic Resolver (Plugin Channels)

Plugin channels can also override resolve_recipient() directly in Python for custom logic:

class SmartSMSChannel(BaseOutputChannel):
CHANNEL_ID = "smart_sms"

async def resolve_recipient(self, context):
"""Look up phone from our custom user service."""
if not context.session_id:
return context.output_config

phone = await self._lookup_phone(context.session_id)
resolved = {"to_number": phone}
# Explicit output_config always overrides auto-resolved values
resolved.update(context.output_config)
return resolved

This is called as a fallback when no YAML user_resolver is configured. Both approaches (YAML resolver and Python resolve_recipient) work together — the YAML resolver takes precedence.

Caching

The resolver caches results per session_id to avoid querying the database on every notification. Default TTL: 5 minutes.

  • cache_ttl: 0 — disable cache (query on every delivery)
  • cache_ttl: 3600 — cache for 1 hour (stable user data)
  • Cache is in-memory per channel instance, evicted at 10,000 entries

Error Handling

If the resolver fails (DB down, user not found, etc.):

  1. A warning is logged
  2. The system falls back to explicit output_config (if any)
  3. The delivery proceeds — the channel decides what to do with missing fields

This is resilient: a resolver failure doesn't block the entire notification pipeline.

Complete Example

app:
app_id: monitoring-bot
name: "Monitoring Bot"

variables:
workspace: "{{env.PWD}}"

channels:
slack_alerts:
type: webhook
config:
url: "{{env.SLACK_WEBHOOK}}"
payload_template: |
{"text": "{{message}}", "channel": "#alerts"}

audit:
type: log
config:
logger_name: "digitorn.audit"
level: INFO
format: json
include_data: true

modules:
http:
constraints:
allowed_actions: [get, head, json_api, fetch_page]

agents:
- id: monitor
brain:
provider: openai
model: gpt-4o-mini
backend: openai_compat
config:
api_key: "{{env.OPENAI_API_KEY}}"
system_prompt: |
Tu es un bot de monitoring. Utilise les watchers et le scheduler
pour surveiller les endpoints. Envoie les alertes critiques
sur slack_alerts.

execution:
mode: conversation
watchers: true
scheduler: true
default_channel: slack_alerts
greeting: |
Monitoring bot ready. I can:
- Watch HTTP endpoints and alert on errors
- Schedule periodic health checks
- Route alerts to Slack or internal audit log
What would you like to monitor?