An Airflow plugin and standalone REST API for monitoring DAG failures, SLA misses, task health, and scheduling delays — with built-in alerting via Slack, Email, and PagerDuty.
Airflow Watcher ships two independent components that share the same monitors, notifiers, metrics layer, and pluggable backend system:
| Component | Runs in | Auth | Use case |
|---|---|---|---|
| Plugin | Airflow webserver process | Airflow session cookies + FAB RBAC | UI dashboards, internal API at /api/watcher |
| Standalone API | Separate FastAPI process | Bearer token + per-key RBAC | External integrations, CI/CD, dashboards, automation |
Both components support three data backends controlled by a single env var:
| `AIRFLOW_WATCHER_BACKEND` | Data source |
|---|---|
| `airflow` (default) | Airflow metadata DB (Postgres / MySQL / SQLite via Airflow ORM) |
| `bigquery` | External BigQuery table (e.g. dbt callback results) |
| `sqlalchemy` | Any SQLAlchemy-compatible DB with a flat results table |
- Features
- Installation
- Alerting & Monitoring
- Plugin
- Standalone API
- Common
- 📊 Dashboard View — 7 custom Airflow UI pages for monitoring
- 🔒 Airflow RBAC — DAG-level filtering via Airflow's FAB security manager
- 🔌 Zero Config — Auto-registers via Airflow's plugin entry point
- 📡 Internal REST API — `/api/watcher/*` endpoints on the webserver
- 🚀 Standalone FastAPI — Runs outside the Airflow webserver
- 🔑 Bearer Token Auth — Constant-time token validation with key rotation
- 🗂️ Per-Key RBAC — Map API keys to specific DAGs
- 📄 Swagger UI — Interactive docs at `/docs`
- ⚡ Response Caching — Thread-safe TTL cache (default 60s)
- 🚦 Rate Limiting — Per-IP sliding window (default 120 req/min, returns 429 with `Retry-After`)
- 📋 Structured Logging — JSON log format with request IDs (`AIRFLOW_WATCHER_LOG_FORMAT=json`)
- `/healthz` Liveness Probe — No auth required, checks DB connectivity
- 🚨 DAG Failure Monitoring — Real-time tracking of DAG and task failures
- ⏰ SLA Miss Detection — Alerts when DAGs miss their SLA deadlines
- 📈 Trend Analysis — Historical failure and SLA miss trends
- 🔔 Multi-channel Notifications — Slack, Email, and PagerDuty alerts
- 📡 Metrics Export — StatsD/Datadog and Prometheus support
- ⚙️ Flexible Alert Rules — Pre-defined templates or custom rules
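The rate-limiting bullet describes a per-IP sliding window. Below is a minimal sketch of that technique, assuming a sliding log of request timestamps per client. `SlidingWindowLimiter` and `check()` are hypothetical names, not the project's middleware, and the clock value can be passed in so the window is easy to test.

```python
from __future__ import annotations

import math
import threading
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Hypothetical per-client sliding-window limiter (not the project's code)."""

    def __init__(self, limit: int = 120, window_seconds: float = 60.0) -> None:
        self.limit = limit
        self.window = window_seconds
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)
        self._lock = threading.Lock()

    def check(self, client_ip: str, now: float | None = None) -> tuple[bool, int]:
        """Return (allowed, retry_after_seconds); only allowed requests are recorded."""
        now = time.monotonic() if now is None else now
        with self._lock:
            hits = self._hits[client_ip]
            # Forget requests that have slid out of the window.
            while hits and now - hits[0] >= self.window:
                hits.popleft()
            if len(hits) >= self.limit:
                # The oldest request leaves the window at hits[0] + window.
                retry_after = max(1, math.ceil(hits[0] + self.window - now))
                return False, retry_after
            hits.append(now)
            return True, 0
```

A middleware built on this would answer a rejected request with HTTP 429 and `Retry-After: <retry_after>`. Keeping one timestamp per request costs memory proportional to the limit; a fixed-window counter is cheaper but lets bursts through at window edges.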
| Airflow Version | Status |
|---|---|
| 2.7.x – 2.10.x | Fully supported — tested in CI |
| 2.5.x – 2.6.x | Should work, not actively tested |
| 3.x | Not yet supported. The `<3.0` pin in `pyproject.toml` is intentional: Airflow 3.0 removes `airflow.models.SlaMiss` (used by `SLAMonitor`) and changes the metadata DB schema. A compatibility release is planned; track progress in the issue tracker. |
📖 See INSTALL.md for detailed installation and configuration instructions.
📖 See ALERTING.md for complete alerting configuration:
- Slack — Rich notifications with blocks
- Email — SMTP-based alerts
- PagerDuty — Incident management with deduplication
- StatsD/Datadog — Real-time metrics
- Prometheus — `/metrics` endpoint for scraping
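PagerDuty deduplication relies on sending a stable `dedup_key` with each Events API v2 event, so repeated triggers for the same problem update one incident instead of opening new ones. The sketch below shows one way to build such an event for a failed DAG run. Deriving the key from `dag_id` and `run_id`, and the helper names `build_pagerduty_event` and `send_event`, are illustrative assumptions, not how Airflow Watcher's notifier is implemented.

```python
from __future__ import annotations

import hashlib
import json
import urllib.request

PAGERDUTY_EVENTS_URL = "https://events.pagerduty.com/v2/enqueue"


def build_pagerduty_event(routing_key: str, dag_id: str, run_id: str,
                          failed_tasks: list[str]) -> dict:
    """Build an Events API v2 'trigger' event for a failed DAG run (hypothetical helper)."""
    # Same DAG run -> same dedup_key, so repeat alerts fold into one incident.
    dedup_key = hashlib.sha256(f"airflow-watcher:{dag_id}:{run_id}".encode()).hexdigest()
    return {
        "routing_key": routing_key,
        "event_action": "trigger",
        "dedup_key": dedup_key,
        "payload": {
            "summary": f"DAG {dag_id} failed ({len(failed_tasks)} failed task(s))",
            "source": "airflow-watcher",
            "severity": "error",
            "custom_details": {"dag_id": dag_id, "run_id": run_id, "failed_tasks": failed_tasks},
        },
    }


def send_event(event: dict, timeout: float = 10.0) -> int:
    """POST the event to PagerDuty and return the HTTP status code."""
    req = urllib.request.Request(
        PAGERDUTY_EVENTS_URL,
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Sending an event with `event_action` set to `resolve` and the same `dedup_key` closes the incident once the DAG recovers.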
```bash
# Slack alerts
export AIRFLOW_WATCHER_SLACK_WEBHOOK_URL="https://hooks.slack.com/..."

# PagerDuty (optional)
export AIRFLOW_WATCHER_PAGERDUTY_ROUTING_KEY="your-key"

# Choose alert template
export AIRFLOW_WATCHER_ALERT_TEMPLATE="production_balanced"
```

The plugin runs inside the Airflow webserver process. Once installed, it auto-registers with Airflow, adds a "Watcher" menu to the UI, and exposes REST endpoints at `/api/watcher/*`.
By default the plugin reads from Airflow's own metadata DB — no extra configuration needed. Set AIRFLOW_WATCHER_BACKEND=bigquery (or sqlalchemy) in your Airflow environment to switch every plugin endpoint to an external data source instead.
```
Airflow Webserver
└── Airflow Watcher Plugin
    ├── Flask Views: Dashboard pages + REST API at /api/watcher
    │     (fed by monitor_provider)
    └── monitor_provider: routes based on AIRFLOW_WATCHER_BACKEND
        ├── airflow (default)  → Airflow ORM      → Airflow metadata DB
        ├── bigquery           → BigQuery client  → BigQuery table
        └── sqlalchemy         → SQL client       → Postgres / MySQL / SQLite
```
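The routing step in this diagram can be sketched as a small resolver that reads `AIRFLOW_WATCHER_BACKEND` and checks the variables each backend needs (as listed in the standalone API's environment variable table). `resolve_backend` and `BackendConfig` are hypothetical names, not the plugin's `monitor_provider` API, and treating the value case-insensitively is an assumption.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass

VALID_BACKENDS = ("airflow", "bigquery", "sqlalchemy")


@dataclass(frozen=True)
class BackendConfig:
    name: str
    table: str | None = None            # AIRFLOW_WATCHER_BQ_TABLE
    external_db_uri: str | None = None  # AIRFLOW_WATCHER_EXTERNAL_DB_URI


def resolve_backend(env: Mapping[str, str] | None = None) -> BackendConfig:
    """Pick a data backend from environment variables, failing fast on gaps."""
    env = os.environ if env is None else env
    # Case-insensitive matching is an assumption of this sketch.
    name = (env.get("AIRFLOW_WATCHER_BACKEND") or "airflow").strip().lower()
    if name not in VALID_BACKENDS:
        raise ValueError(f"unknown AIRFLOW_WATCHER_BACKEND: {name!r}")
    table = env.get("AIRFLOW_WATCHER_BQ_TABLE") or None
    uri = env.get("AIRFLOW_WATCHER_EXTERNAL_DB_URI") or None
    if name in ("bigquery", "sqlalchemy") and table is None:
        raise ValueError(f"{name} backend requires AIRFLOW_WATCHER_BQ_TABLE")
    if name == "sqlalchemy" and uri is None:
        raise ValueError("sqlalchemy backend requires AIRFLOW_WATCHER_EXTERNAL_DB_URI")
    return BackendConfig(name, table, uri)
```

Validating at startup means a misconfigured webserver fails loudly once, instead of every Watcher page returning errors.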
Once installed, open the Watcher menu in the Airflow UI to access:
| Menu Item | Description |
|---|---|
| Airflow Dashboard | Overview metrics |
| Airflow Health | DAG health status (success/failed/delayed/stale) |
| DAG Scheduling | Queue and pool utilization |
| DAG Failures | Recent failures with details |
| SLA Tracker | SLA misses and delays |
| Task Health | Long-running and zombie tasks |
| Dependencies | Cross-DAG dependency tracking |
The plugin integrates with Airflow's built-in FAB security manager to enforce DAG-level access control. No separate configuration is needed — it reads directly from Airflow's role and permission system.
- Admin / Op roles see all DAGs across every Watcher page and API endpoint
- Custom roles only see DAGs they have `can_read` permission on
- Filtering is mandatory and applied server-side; restricted users cannot bypass it
- Aggregate stats (failure counts, SLA misses, health scores) are recomputed per-user so no global data leaks
- A 🔒 badge appears in the filter bar for non-admin users
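Recomputing aggregates per user means filtering the raw rows first and only then counting. The sketch below illustrates that ordering with hypothetical names (`scope_failures`, plain dict rows). The set of readable DAG IDs would come from Airflow's security manager; how the plugin obtains it is not shown here. `None` stands for Admin/Op (all DAGs), while an empty set means the user sees nothing.

```python
from __future__ import annotations

from collections import Counter
from collections.abc import Iterable


def scope_failures(failures: Iterable[dict], readable_dag_ids: set[str] | None) -> dict:
    """Filter failure rows to the caller's DAGs, then recompute aggregates.

    readable_dag_ids=None means Admin/Op (every DAG); an empty set means no DAGs.
    """
    visible = [f for f in failures
               if readable_dag_ids is None or f["dag_id"] in readable_dag_ids]
    per_dag = Counter(f["dag_id"] for f in visible)  # counted after filtering
    return {
        "failures": visible,
        "count": len(visible),
        "unique_failed_dags": len(per_dag),
        "most_failing_dags": [
            {"dag_id": dag_id, "failure_count": n} for dag_id, n in per_dag.most_common(5)
        ],
    }
```

Counting before filtering would leak global totals (for example, a failure count that includes DAGs the user cannot open), which is exactly what per-user recomputation avoids.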
Add access_control to your DAG definitions to grant team-specific access:
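```python
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="weather_data_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",  # `schedule_interval` is deprecated since Airflow 2.4
    catchup=False,       # don't backfill every hour since start_date
    access_control={
        # Role name -> permissions granted on this DAG
        "team_weather": {"can_read", "can_edit"},
    },
)
```

Then create matching roles in Airflow (Admin → Security → List Roles) and assign users to them. The Watcher plugin will automatically pick up the permissions.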
| Area | Filtering |
|---|---|
| Dashboard stats | Failure count, SLA misses, health score — all scoped to user's DAGs |
| Failures page | Only failures from accessible DAGs |
| SLA page | Only SLA misses from accessible DAGs |
| Health page | Health status, stale DAGs, scheduling lag — filtered |
| Task health | Long-running tasks, zombies, retries — filtered |
| Scheduling | Concurrent runs, delayed DAGs — filtered |
| Dependencies | Cross-DAG deps, correlations — filtered |
| All `/api/watcher` endpoints | Same RBAC enforcement as UI pages |
Admin — sees all DAGs:
Weather user — sees only weather & stock DAGs:
Ecommerce user — sees only ecommerce & data quality DAGs:
The plugin exposes a REST API at /api/watcher/* on the Airflow webserver. Authentication uses Airflow's session cookies (same login as the UI). All endpoints return JSON with a standard {status, data, timestamp} envelope.
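Because every endpoint shares the `{status, data, timestamp}` envelope, a client can unwrap responses in one place. A minimal sketch follows, assuming any status other than `success` should be treated as an error (the error envelope's exact shape is not documented here). `unwrap` and `WatcherAPIError` are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any


class WatcherAPIError(RuntimeError):
    """Raised when an envelope does not report success (hypothetical)."""


def unwrap(envelope: dict[str, Any]) -> tuple[Any, datetime]:
    """Return (data, timestamp) from a {status, data, timestamp} envelope."""
    if envelope.get("status") != "success":
        raise WatcherAPIError(f"unexpected status: {envelope.get('status')!r}")
    # fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalise it.
    ts = datetime.fromisoformat(envelope["timestamp"].replace("Z", "+00:00"))
    return envelope["data"], ts
```

Parsing the timestamp into an aware `datetime` lets a client tell how fresh a (possibly cached) response is.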
Failures:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/failures` | `dag_id`, `hours`, `limit` | Recent DAG failures |
| GET | `/api/watcher/failures/stats` | `hours` | Failure rate statistics |

SLA:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/sla/misses` | `dag_id`, `hours`, `limit` | SLA miss events |
| GET | `/api/watcher/sla/stats` | `hours` | SLA miss statistics |

Health:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/health` | — | System health summary (200 healthy, 503 degraded) |
| GET | `/api/watcher/health/<dag_id>` | — | Health for a specific DAG |

Tasks:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/tasks/long-running` | `threshold_minutes` | Tasks exceeding duration threshold |
| GET | `/api/watcher/tasks/retries` | `hours`, `min_retries` | Tasks with excessive retries |
| GET | `/api/watcher/tasks/zombies` | `threshold_minutes` | Potential zombie tasks |
| GET | `/api/watcher/tasks/failure-patterns` | `hours` | Task failure pattern analysis |

Scheduling:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/scheduling/lag` | `hours`, `threshold_minutes` | Scheduling delay percentiles |
| GET | `/api/watcher/scheduling/queue` | — | Current queue status |
| GET | `/api/watcher/scheduling/pools` | — | Pool utilization |
| GET | `/api/watcher/scheduling/stale-dags` | `expected_interval_hours` | DAGs not running on schedule |
| GET | `/api/watcher/scheduling/concurrent` | — | DAGs with multiple concurrent runs |

DAGs:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/dags/import-errors` | — | DAG import errors |
| GET | `/api/watcher/dags/status-summary` | — | DAG status summary with health score |
| GET | `/api/watcher/dags/complexity` | — | DAG complexity analysis |
| GET | `/api/watcher/dags/inactive` | `days` | Inactive DAGs |

Dependencies:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/dependencies/upstream-failures` | `hours` | Upstream failure cascade analysis |
| GET | `/api/watcher/dependencies/cross-dag` | — | Cross-DAG dependencies |
| GET | `/api/watcher/dependencies/correlations` | `hours` | Failure correlations |
| GET | `/api/watcher/dependencies/impact/<dag_id>/<task_id>` | — | Downstream impact analysis |

Overview:

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/api/watcher/overview` | — | Combined overview of all monitoring data |
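The `scheduling/lag` endpoint reports delay percentiles, but this README does not define how lag is measured. The sketch below assumes lag = actual start time minus the time the run became due (for example, a run's `data_interval_end`), clamps negative values to zero, and uses the nearest-rank percentile. `nearest_rank` and `scheduling_lag_stats` are hypothetical names.

```python
from __future__ import annotations

import math
from datetime import datetime, timedelta


def nearest_rank(sorted_values: list[float], pct: float) -> float:
    """Nearest-rank percentile of a sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def scheduling_lag_stats(runs: list[tuple[datetime, datetime]],
                         threshold_minutes: int = 10) -> dict:
    """runs holds (due_at, started_at) pairs, e.g. (data_interval_end, start_date)."""
    # Lag in seconds; runs that started "early" (e.g. manual triggers) count as 0.
    lags = sorted(max(0.0, (started - due).total_seconds()) for due, started in runs)
    if not lags:
        return {"count": 0, "p50": None, "p95": None, "delayed": 0}
    limit = timedelta(minutes=threshold_minutes).total_seconds()
    return {
        "count": len(lags),
        "p50": nearest_rank(lags, 50),
        "p95": nearest_rank(lags, 95),
        # Runs that started later than the threshold
        "delayed": sum(1 for lag in lags if lag > limit),
    }
```

Nearest-rank always returns an observed value, which keeps percentiles easy to explain; interpolating percentiles are smoother for small samples.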
To test the plugin locally with sample DAGs and pre-configured RBAC users:
```bash
cd demo
docker-compose up -d
```

Then visit http://localhost:8080 and navigate to the Watcher menu.
| User | Password | Role | Visible DAGs |
|---|---|---|---|
| `admin` | `admin` | Admin | All 8 DAGs |
| `weather_user` | `weather123` | team_weather | `weather_data_pipeline`, `stock_market_collector` |
| `ecommerce_user` | `ecommerce123` | team_ecommerce | `ecommerce_sales_etl`, `data_quality_checks` |
See demo/README.md for more details.
By default the plugin reads from Airflow's metadata DB. Set AIRFLOW_WATCHER_BACKEND in the Airflow webserver environment to switch to an external source — no code changes, no redeploy of the plugin itself.
BigQuery:

```bash
# In docker-compose.yml, airflow.cfg, or MWAA/Composer env vars:
AIRFLOW_WATCHER_BACKEND=bigquery
AIRFLOW_WATCHER_BQ_TABLE=project.dataset.table
# Optional: override column names if your table differs from the dbt-callback default
# AIRFLOW_WATCHER_SCHEMA_JSON='{"col_dag_id":"pipeline_id","col_elapsed_time":"duration_seconds"}'
```

Authentication uses Application Default Credentials (Workload Identity on GKE/Composer, service account key otherwise).

SQLAlchemy:

```bash
AIRFLOW_WATCHER_BACKEND=sqlalchemy
AIRFLOW_WATCHER_BQ_TABLE=my_results_table  # flat table name (no project/dataset prefix)
AIRFLOW_WATCHER_EXTERNAL_DB_URI=postgresql://user:pass@host/db
# Optional column overrides:
# AIRFLOW_WATCHER_SCHEMA_JSON='{"col_dag_id":"pipeline_id"}'
```

See the feature matrix in the BigQuery Backend section below. The same matrix applies when the backends are used via the plugin.
A lightweight, standalone REST API that runs outside the Airflow webserver. Use this when you want to call monitoring endpoints from external services, dashboards, or CI/CD pipelines without adding load to the Airflow webserver.
📖 Interactive API Docs (Swagger UI) — browse all 28 endpoints without running the server.
```
External client (curl / app)
      │
      │  Authorization: Bearer <key>
      ▼
Standalone FastAPI service (http://localhost:8081)
  ├── Auth (Bearer) → RBAC (DAG map) → Cache (TTL 60s) → Monitors (6 core)
  │     └── Monitors read the Airflow metadata DB (PostgreSQL / MySQL)
  ├── API routers (11): /failures /sla /tasks /scheduling /dags /dependencies
  │                     /overview /health /alerts /cache /metrics
  ├── Notifiers: Slack / Email / PagerDuty
  ├── Envelope: {status, data}
  └── Emitters: StatsD / Prometheus (/metrics)
```
📖 See INSTALL.md — API Quick Start for setup instructions.
```bash
# 1. Install standalone extras
pip install -e ".[standalone]"

# 2. Create .env file (already gitignored)
cat > .env << 'EOF'
AIRFLOW_WATCHER_DB_URI=postgresql://airflow:airflow@localhost:5432/airflow
AIRFLOW_WATCHER_API_KEYS=your-secret-api-key-here
EOF

# 3. Start the server
python src/airflow_watcher/api/main.py
# → Uvicorn running on http://0.0.0.0:8081

# 4. Test it
curl -H "Authorization: Bearer your-secret-api-key-here" \
  http://localhost:8081/api/v1/health/
```

📖 Browse the full API spec online: Airflow Watcher API Docs (Swagger UI, no server required)
When running locally, the same docs are available at http://localhost:8081/docs.
📖 See INSTALL.md for full authentication, configuration, and RBAC setup.
Key environment variables:
| Variable | Default | Description |
|---|---|---|
| `AIRFLOW_WATCHER_DB_URI` | (required for `airflow` backend) | Airflow metadata DB connection string |
| `AIRFLOW_WATCHER_API_KEYS` | (none) | Comma-separated Bearer tokens; omit to disable auth |
| `AIRFLOW_WATCHER_BACKEND` | `airflow` | Data source: `airflow` (metadata DB), `bigquery`, or `sqlalchemy` |
| `AIRFLOW_WATCHER_BQ_TABLE` | (none) | Required for `bigquery`/`sqlalchemy`: table name (`project.dataset.table` for BQ, plain table name for SQL) |
| `AIRFLOW_WATCHER_EXTERNAL_DB_URI` | (none) | Required for `sqlalchemy` backend: SQLAlchemy connection string |
| `AIRFLOW_WATCHER_SCHEMA_JSON` | (none) | JSON object overriding column names for external backends |
| `AIRFLOW_WATCHER_RBAC_ENABLED` | `false` | Enable per-key DAG filtering |
| `AIRFLOW_WATCHER_CACHE_TTL` | `60` | Response cache TTL in seconds |
See the BigQuery Backend section for BQ-specific setup.
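Below is a minimal sketch of how these variables might be loaded, plus a Bearer check that uses the timing-safe `hmac.compare_digest` against every configured key, so old and new keys can both be accepted while a key is being rotated. `load_settings`, `StandaloneSettings`, `is_authorized`, and the accepted truthy strings are assumptions, not the contents of `standalone_config.py` or `auth.py`.

```python
from __future__ import annotations

import hmac
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class StandaloneSettings:
    api_keys: tuple[str, ...]
    rbac_enabled: bool
    cache_ttl: int


def load_settings(env: Mapping[str, str] = os.environ) -> StandaloneSettings:
    raw_keys = env.get("AIRFLOW_WATCHER_API_KEYS", "")
    keys = tuple(k.strip() for k in raw_keys.split(",") if k.strip())
    # Accepted truthy strings are an assumption of this sketch.
    rbac = env.get("AIRFLOW_WATCHER_RBAC_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
    ttl = int(env.get("AIRFLOW_WATCHER_CACHE_TTL", "60"))
    return StandaloneSettings(keys, rbac, ttl)


def is_authorized(settings: StandaloneSettings, authorization: str | None) -> bool:
    if not settings.api_keys:
        return True  # no keys configured: auth disabled, as the table describes
    if not authorization or not authorization.startswith("Bearer "):
        return False
    token = authorization[len("Bearer "):].encode()
    matched = False
    for key in settings.api_keys:
        # Check every key (no early exit) with a timing-safe comparison.
        matched |= hmac.compare_digest(token, key.encode())
    return matched
```

Rotation then becomes a two-step deploy: add the new key alongside the old one, move clients over, and remove the old key.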
All endpoints return a standard JSON envelope:
```json
{
  "status": "success",
  "data": { ... },
  "timestamp": "2026-03-27T12:34:56.789000Z"
}
```

Failures (`/api/v1/failures`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/` | `dag_id`, `hours` (1–8760, default 24), `limit` (1–500, default 50), `offset` (default 0) | Recent DAG failures |
| GET | `/stats` | `hours` (1–8760, default 24) | Failure rate statistics |

SLA (`/api/v1/sla`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/misses` | `dag_id`, `hours` (1–8760, default 24), `limit` (1–500, default 50), `offset` (default 0) | SLA miss events |
| GET | `/stats` | `hours` (1–8760, default 24) | SLA miss statistics |

Tasks (`/api/v1/tasks`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/long-running` | `threshold_minutes` (1–10080, default 60) | Tasks exceeding duration threshold |
| GET | `/retries` | `hours` (1–8760, default 24), `min_retries` (1–100, default 2) | Tasks with excessive retries |
| GET | `/zombies` | `threshold_minutes` (1–10080, default 120) | Potential zombie tasks |
| GET | `/failure-patterns` | `hours` (1–8760, default 168) | Task failure pattern analysis |

Scheduling (`/api/v1/scheduling`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/lag` | `hours` (1–8760, default 24), `threshold_minutes` (1–10080, default 10) | Scheduling delay percentiles |
| GET | `/queue` | — | Current queue status |
| GET | `/pools` | — | Pool utilization |
| GET | `/stale-dags` | `expected_interval_hours` (1–720, default 24) | DAGs not running on schedule |
| GET | `/concurrent` | — | DAGs with multiple concurrent runs |

DAGs (`/api/v1/dags`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/import-errors` | — | DAG import/parse errors |
| GET | `/status-summary` | — | Overall DAG counts and health score |
| GET | `/complexity` | — | DAGs ranked by task count |
| GET | `/inactive` | `days` (1–365, default 30) | Active DAGs with no recent runs |

Dependencies (`/api/v1/dependencies`):

| Method | Path | Params | Description |
|---|---|---|---|
| GET | `/upstream-failures` | `hours` (1–8760, default 24) | Tasks in `upstream_failed` state |
| GET | `/cross-dag` | — | Cross-DAG dependency map |
| GET | `/correlations` | `hours` (1–8760, default 24) | Failure correlations between DAGs |
| GET | `/impact/{dag_id}/{task_id}` | — | Downstream cascading failure impact |

Overview (`/api/v1/overview`):

| Method | Path | Description |
|---|---|---|
| GET | `/` | Combined snapshot of all monitors (cached 60s) |

Health (`/api/v1/health`):

| Method | Path | HTTP Code | Description |
|---|---|---|---|
| GET | `/` | 200 if healthy, 503 if degraded | System health (healthy = score ≥ 70 and no import errors) |
| GET | `/{dag_id}` | 200 | Per-DAG health with recent failures and SLA misses |

Alerts (`/api/v1/alerts`):

| Method | Path | Description |
|---|---|---|
| GET | `/rules` | List all configured alert rules |
| POST | `/evaluate` | Evaluate rules and dispatch notifications |

Cache (`/api/v1/cache`):

| Method | Path | Description |
|---|---|---|
| POST | `/invalidate` | Clear all cached metrics |

Metrics:

| Method | Path | Description |
|---|---|---|
| GET | `/metrics` | Prometheus exposition format (requires `AIRFLOW_WATCHER_PROMETHEUS_ENABLED=true`) |

Liveness probe (no auth):

| Method | Path | Description |
|---|---|---|
| GET | `/healthz` | Liveness probe: `{status: "ok"/"degraded", uptime_seconds, db_connected}` |
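The health table's rule (score ≥ 70 and no import errors means healthy) translates to a status code as in this small sketch. `health_status` is a hypothetical name, and the boundary of 70 is read as inclusive, per the ≥ in that cell.

```python
def health_status(health_score: float, import_error_count: int,
                  min_score: float = 70) -> tuple[str, int]:
    """Return (status, HTTP code) using 'score >= 70 and no import errors'."""
    if health_score >= min_score and import_error_count == 0:
        return "healthy", 200
    return "degraded", 503


print(health_status(92, 0))  # → ('healthy', 200)
```

A single import error is enough to report 503, even when the score is high, so a broken DAG file cannot hide behind an otherwise healthy fleet.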
Example API responses:

`GET /api/v1/overview/`

```json
{
  "status": "success",
  "data": {
    "failure_stats": {"total_runs": 142, "failed_runs": 3, "failure_rate": 2.11, "unique_failed_dags": 2,
                      "most_failing_dags": [{"dag_id": "etl_customers", "failure_count": 2}]},
    "sla_stats": {"total_misses": 1, "top_dags_with_misses": [{"dag_id": "etl_customers", "miss_count": 1}]},
    "long_running_tasks": 0, "zombie_count": 0,
    "queue_status": {"queued_count": 2, "scheduled_count": 5},
    "dag_summary": {"total_dags": 18, "active_dags": 15, "paused_dags": 3, "health_score": 92},
    "import_errors": 0
  },
  "timestamp": "2026-04-10T08:15:32.456000Z"
}
```

`GET /api/v1/health/` returns HTTP 200 when healthy, 503 when degraded:

```json
{
  "status": "success",
  "data": {
    "status": "healthy", "health_score": 92,
    "summary": {"total_dags": 18, "active_dags": 15, "paused_dags": 3, "health_score": 92},
    "dag_health": {"healthy": 15, "degraded": 2, "critical": 1},
    "import_error_count": 0
  },
  "timestamp": "2026-04-10T08:15:33.012000Z"
}
```

`GET /api/v1/failures/?hours=24`

```json
{
  "status": "success",
  "data": {
    "failures": [{"dag_id": "etl_customers", "run_id": "scheduled__2026-04-10T06:00:00+00:00",
                  "state": "failed", "duration": 153.0,
                  "failed_tasks": [{"task_id": "load_to_bq", "operator": "BigQueryInsertJobOperator",
                                    "try_number": 3, "max_tries": 3, "state": "failed"}]}],
    "count": 1, "filters": {"dag_id": null, "hours": 24}
  },
  "timestamp": "2026-04-10T08:15:34.789000Z"
}
```

`GET /healthz`

```json
{"status": "ok", "uptime_seconds": 3612.45, "db_connected": true, "read_db_connected": true}
```

`read_db_connected` only appears when a read replica is configured.
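The `failure_rate` of 2.11 in the overview example is consistent with `failed_runs / total_runs × 100` rounded to two decimals (3 / 142 × 100 ≈ 2.1127). A sketch under that assumption, with a hypothetical `failure_rate` helper that maps zero runs to 0.0:

```python
def failure_rate(failed_runs: int, total_runs: int) -> float:
    """Failed runs as a percentage of all runs, rounded to two decimals."""
    if total_runs == 0:
        return 0.0  # no runs in the window: report 0 rather than divide by zero
    return round(failed_runs / total_runs * 100, 2)


print(failure_rate(3, 142))  # → 2.11, matching the overview example
```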
When AIRFLOW_WATCHER_RBAC_ENABLED=true, each API key can only see its mapped DAGs:
```bash
export AIRFLOW_WATCHER_RBAC_ENABLED=true
export AIRFLOW_WATCHER_RBAC_KEY_DAG_MAPPING='{"team-a-key": ["weather_pipeline", "stock_collector"], "team-b-key": ["ecommerce_etl"]}'
export AIRFLOW_WATCHER_API_KEYS="team-a-key,team-b-key,admin-key"
```

| Key | Sees |
|---|---|
| `team-a-key` | Only `weather_pipeline` and `stock_collector` |
| `team-b-key` | Only `ecommerce_etl` |
| `admin-key` | Empty results (not in mapping, so denied by default) |

To grant full access to a key, add it to the mapping with `["*"]`:

```bash
export AIRFLOW_WATCHER_RBAC_KEY_DAG_MAPPING='{"team-a-key": ["weather_pipeline"], "admin-key": ["*"]}'
```

Or set `AIRFLOW_WATCHER_RBAC_FAIL_OPEN=true` to allow unmapped keys to see all DAGs (not recommended for production).
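The key-to-DAG resolution described here (deny unmapped keys by default, `["*"]` for everything, optional fail-open) can be sketched as follows. `allowed_dags` and `filter_rows` are hypothetical names; `None` means "all DAGs" and an empty set means "none".

```python
from __future__ import annotations

import json


def allowed_dags(api_key: str, mapping_json: str, fail_open: bool = False) -> set[str] | None:
    """Return the DAG IDs a key may see; None means every DAG."""
    mapping: dict[str, list[str]] = json.loads(mapping_json or "{}")
    if api_key not in mapping:
        # Unmapped keys: deny by default, or allow all when fail-open is set.
        return None if fail_open else set()
    dags = set(mapping[api_key])
    return None if "*" in dags else dags


def filter_rows(rows: list[dict], allowed: set[str] | None) -> list[dict]:
    """Drop rows for DAGs the caller may not see."""
    return rows if allowed is None else [r for r in rows if r["dag_id"] in allowed]
```

Distinguishing `None` from an empty set keeps "see everything" and "see nothing" from collapsing into the same falsy value, which is an easy way to fail open by accident.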
```
Client Request
  │
  ├─▶ Middleware: adds X-API-Version: 1.0 header
  │
  ├─▶ Auth Dependency: validates Bearer token (constant-time)
  │     └── 401 if invalid/missing
  │
  ├─▶ RBAC Dependency: resolves allowed DAG IDs for this key
  │     └── 403 if DAG not in allowed set
  │
  ├─▶ Cache Layer: check MetricsCache (thread-safe, TTL-based)
  │     ├── HIT  → return cached response
  │     └── MISS → call monitor
  │
  ├─▶ Monitor: queries Airflow metadata DB via SQLAlchemy
  │
  ├─▶ RBAC Filter: strips DAGs the caller isn't allowed to see
  │
  └─▶ Envelope: wraps in {status, data, timestamp} → HTTP 200
```
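The cache step in this flow can be illustrated with a lock-protected TTL map. This is a sketch, not `MetricsCache` itself: the class name, the `get_or_compute` method, and running the monitor outside the lock on a miss (so a slow query does not block other keys, at the cost of duplicate work under concurrent misses) are assumptions. Because the RBAC filter runs after the cache in this flow, cached values would be the unfiltered monitor results.

```python
from __future__ import annotations

import threading
import time
from typing import Any, Callable


class TTLCache:
    """Hypothetical thread-safe TTL cache (not the project's MetricsCache)."""

    def __init__(self, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._store: dict[str, tuple[float, Any]] = {}  # key -> (expires_at, value)

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        with self._lock:
            entry = self._store.get(key)
            if entry is not None and entry[0] > now:
                return entry[1]  # HIT
        value = compute()  # MISS: run the monitor without holding the lock
        with self._lock:
            self._store[key] = (now + self._ttl, value)
        return value

    def invalidate(self) -> None:
        """Drop every entry, as the cache invalidation endpoint does."""
        with self._lock:
            self._store.clear()
```

A cache key such as `"failures:hours=24&limit=50"` would keep different query parameters from sharing an entry.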
curl:
```bash
# Health check (no auth)
curl http://localhost:8081/healthz

# Get failures (with auth)
curl -H "Authorization: Bearer $API_KEY" \
  "http://localhost:8081/api/v1/failures/?hours=24&limit=10"

# Get overview snapshot
curl -H "Authorization: Bearer $API_KEY" \
  http://localhost:8081/api/v1/overview/

# Evaluate and fire alerts
curl -X POST -H "Authorization: Bearer $API_KEY" \
  http://localhost:8081/api/v1/alerts/evaluate

# Clear cache
curl -X POST -H "Authorization: Bearer $API_KEY" \
  http://localhost:8081/api/v1/cache/invalidate
```

Python:
```python
import requests

API_URL = "http://localhost:8081"
HEADERS = {"Authorization": "Bearer your-secret-key"}

# Get system health (no raise_for_status(): a degraded system answers with HTTP 503)
resp = requests.get(f"{API_URL}/api/v1/health/", headers=HEADERS, timeout=10)
health = resp.json()["data"]
print(f"Score: {health['health_score']}, Status: {health['status']}")

# Get recent failures
resp = requests.get(f"{API_URL}/api/v1/failures/", headers=HEADERS,
                    params={"hours": 12, "limit": 20}, timeout=10)
resp.raise_for_status()
for failure in resp.json()["data"]["failures"]:
    print(f"  {failure['dag_id']} failed (run_id: {failure['run_id']})")

# Get scheduling lag
resp = requests.get(f"{API_URL}/api/v1/scheduling/lag", headers=HEADERS, timeout=10)
resp.raise_for_status()
lag = resp.json()["data"]
print(f"p95 lag: {lag['scheduling_lag']['p95']}s")
```

JavaScript (fetch):
```javascript
const API_URL = "http://localhost:8081";
const headers = { Authorization: "Bearer your-secret-key" };

// Get overview (top-level await requires an ES module)
const resp = await fetch(`${API_URL}/api/v1/overview/`, { headers });
const { data } = await resp.json();
console.log(`Failures: ${data.failure_stats.failed_runs}`);
console.log(`Health: ${data.dag_summary.health_score}`);
```

Airflow Watcher can read dbt run results from BigQuery instead of the Airflow metadata DB. This is useful when:
- The Airflow DB is not reachable from the monitoring host
- You already ingest dbt callback data into BigQuery (e.g. via the dbt callback plugin)
- You want a read-only monitoring API with no DB access
Works in both the plugin and the standalone API — same env vars, same behaviour.
```bash
pip install "airflow-watcher[bigquery]"
# or include alongside standalone:
pip install "airflow-watcher[standalone,bigquery]"
```

```bash
export AIRFLOW_WATCHER_BACKEND=bigquery
export AIRFLOW_WATCHER_BQ_TABLE=project.dataset.table
# DB_URI is NOT required in BigQuery mode
export AIRFLOW_WATCHER_API_KEYS=your-secret-key  # standalone only
```

Override individual column names when your table schema differs from the dbt-callback default:

```bash
export AIRFLOW_WATCHER_SCHEMA_JSON='{"col_dag_id":"pipeline_id","col_elapsed_time":"duration_seconds","col_state":"run_state"}'
```

Authentication uses Application Default Credentials (`gcloud auth application-default login` or a service account).

```bash
# Standalone API
python src/airflow_watcher/api/main.py

# Plugin: set env vars in your Airflow webserver environment
# (docker-compose.yml, airflow.cfg, MWAA/Composer env vars, etc.)
```

| Feature | Airflow backend | BigQuery backend |
|---|---|---|
| Recent failures | Full (DB) | dbt model errors from BQ |
| Failure statistics | Full | Full |
| Scheduling lag | Full | Derived from dbt timing |
| Long-running tasks | Full | dbt invocation elapsed time |
| Stale / inactive DAGs | Full | Derived from BQ dag overview |
| DAG health summary | Full | Derived from BQ dag overview |
| Failure correlations | Full | Full (co-occurrence in BQ) |
| Cost analysis (GB billed) | Not available | Full (adapter_response metadata) |
| SLA misses | Full | Not available (backend_note returned) |
| Retry counts | Full | Not available |
| Zombie tasks | Full | Not available (no heartbeat in BQ) |
| Worker pool utilization | Full | Not available |
| Live queue depth | Full | Not available |
| DAG import errors | Full | Not available |
| Cross-DAG dependency graph | Full (DagBag) | Not available |
| Cascading failure impact | Full (DagBag) | Not available |
Endpoints backed by unavailable data return empty results with a backend_note field — they do not error.
Read monitoring data from any flat table on a SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite). Useful when dbt results are written to a relational DB rather than BigQuery.
Works in both the plugin and the standalone API.
```bash
pip install "airflow-watcher[standalone]"  # SQLAlchemy is included
```

```bash
export AIRFLOW_WATCHER_BACKEND=sqlalchemy
export AIRFLOW_WATCHER_EXTERNAL_DB_URI=postgresql://user:pass@host/db
export AIRFLOW_WATCHER_BQ_TABLE=my_results_table  # flat table name
# Optional column overrides:
# export AIRFLOW_WATCHER_SCHEMA_JSON='{"col_dag_id":"pipeline_id"}'
export AIRFLOW_WATCHER_API_KEYS=your-secret-key  # standalone only
```

The table must be a flat structure (one row per task/model run). Default column names match the dbt-callback schema; override any with `AIRFLOW_WATCHER_SCHEMA_JSON`.
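Column overrides amount to merging a JSON object over a table of defaults. In this sketch the default names in `DEFAULT_COLUMNS` are placeholders (the actual dbt-callback column names are not listed in this README), and rejecting unknown keys is a design choice to catch typos early. `resolve_columns` is a hypothetical name.

```python
from __future__ import annotations

import json

# Placeholder defaults: the real dbt-callback column names are not documented here.
DEFAULT_COLUMNS = {
    "col_dag_id": "dag_id",
    "col_state": "state",
    "col_elapsed_time": "elapsed_time",
}


def resolve_columns(schema_json: str | None) -> dict[str, str]:
    """Merge AIRFLOW_WATCHER_SCHEMA_JSON-style overrides over the defaults."""
    overrides = json.loads(schema_json) if schema_json else {}
    if not isinstance(overrides, dict):
        raise ValueError("AIRFLOW_WATCHER_SCHEMA_JSON must be a JSON object")
    unknown = set(overrides) - set(DEFAULT_COLUMNS)
    if unknown:
        # A misspelled key would otherwise be silently ignored.
        raise ValueError(f"unknown column keys: {sorted(unknown)}")
    return {**DEFAULT_COLUMNS, **overrides}
```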
Same as the BigQuery feature matrix above, minus BigQuery-specific cost fields (bytes_billed, slot_ms). SLA misses, retries, zombies, queue/pool data, and import errors are not available — these live only in Airflow's metadata DB.
| Aspect | Plugin (`/api/watcher`) | Standalone API (`/api/v1`) |
|---|---|---|
| Process | Inside Airflow webserver | Separate FastAPI process |
| Auth | Airflow session cookies | Bearer token |
| RBAC | Airflow FAB roles + `access_control` | Per-key DAG mapping via env var |
| Install | `pip install airflow-watcher` | `pip install "airflow-watcher[standalone]"` |
| Port | Same as Airflow (default 8080) | Separate (default 8081) |
| UI | 7 dashboard pages | Swagger UI at `/docs` |
| Caching | No built-in cache | TTL-based response cache |
| Backend support | `airflow` / `bigquery` / `sqlalchemy` | `airflow` / `bigquery` / `sqlalchemy` |
| Liveness probe | `/api/watcher/health` | `/healthz` (no auth) |
| Best for | Airflow operators using the UI | External tools, CI/CD, dashboards |
```
airflow-watcher/
├── src/
│   └── airflow_watcher/
│       ├── __init__.py
│       ├── plugins/            # Airflow plugin definitions        ← Plugin
│       ├── views/              # Flask Blueprint views (plugin UI) ← Plugin
│       ├── templates/          # Jinja2 templates (plugin UI)      ← Plugin
│       ├── api/                # Standalone FastAPI service        ← API
│       │   ├── main.py               # App entry point & create_app()
│       │   ├── auth.py               # Bearer token authentication
│       │   ├── rbac_dep.py           # RBAC dependency (per-key DAG filtering)
│       │   ├── db.py                 # SQLAlchemy session management
│       │   ├── envelope.py           # Standard JSON response wrapper
│       │   ├── standalone_config.py  # Env var config loading
│       │   └── routers/              # 11 API routers
│       │       ├── failures.py
│       │       ├── sla.py
│       │       ├── tasks.py
│       │       ├── scheduling.py
│       │       ├── dags.py
│       │       ├── dependencies.py
│       │       ├── overview.py
│       │       ├── health.py
│       │       ├── alerts.py
│       │       ├── cache.py
│       │       └── metrics.py
│       ├── monitors/           # DAG & SLA monitoring logic        ← Shared
│       ├── notifiers/          # Slack, Email, PagerDuty           ← Shared
│       ├── metrics/            # Prometheus, StatsD emitters       ← Shared
│       └── utils/              # Cache, helpers, RBAC utilities    ← Shared
├── tests/
│   ├── test_routers.py         # Router endpoint tests
│   ├── test_auth.py            # Authentication tests
│   ├── test_security.py        # Penetration & security tests
│   ├── test_load.py            # Load & stress tests
│   ├── ...                     # 17 unit test files total
│   └── live/                   # Live integration tests (need Docker)
│       ├── conftest.py         # Auto-skips when containers are down
│       ├── test_qa_deep.py     # Standalone API deep QA (334 tests)
│       ├── test_qa_plugin.py   # Flask plugin API deep QA (138 tests)
│       ├── test_live_comprehensive.py
│       ├── test_live_api.py
│       └── test_live_data.py
├── demo/                       # Local demo Airflow environment
├── .env                        # Local credentials (gitignored)
└── pyproject.toml
```
```bash
# Unit tests (302 pass)
pytest tests/ --ignore=tests/live -v --no-cov

# Security & penetration tests only
pytest tests/test_security.py -v

# Load & stress tests only
pytest tests/test_load.py -v

# Live integration tests (requires demo Docker environment)
python tests/live/test_qa_deep.py              # Standalone API deep QA (334 tests)
python tests/live/test_qa_plugin.py            # Flask plugin API deep QA (138 tests)
python tests/live/test_live_comprehensive.py   # End-to-end integration

# Skip the pre-existing DB connectivity test
pytest tests/ -k "not test_logs_error_on_unreachable_db"
```

📖 See INSTALL.md for full deployment guides:
- AWS MWAA — `requirements.txt` + S3 upload, env var config, MWAA Local Runner testing
- Google Cloud Composer — PyPI install via `gcloud`, Composer 1 vs 2, networking
- Kubernetes / Helm — Helm values, K8s secrets, ServiceMonitor, standalone API as sidecar
- Production Deployment — Nginx, Gunicorn, systemd, Docker Compose, production checklist
```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linting
ruff check src tests
black --check src tests

# Type checking
mypy src
```

Apache License 2.0. See LICENSE for details.
Ramanujam Solaimalai (@ram07eng)
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request



