""" Example DAG for demonstrating the usage of event driven scheduling using assets and triggers. """ from __future__ import annotations from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.sdk import DAG, Asset, AssetWatcher, chain, task file_path = '/tmp/test' trigger = FileDeleteTrigger(filepath=file_path) asset = Asset( uri='example_asset', watchers=[AssetWatcher(name='test_asset_watcher', trigger=trigger)] ) with DAG( dag_id='example_asset_with_watchers', schedule=[asset], catchup=False ): @task def test_task(): print('Hello world') chain(test_task())
For the above code, when will the dag run?
This DAG will run when the watched file is deleted.
Let’s unpack exactly how and why, step by step.
Key pieces in your code
1️⃣ Asset with a watcher
This defines:
-
An asset called
example_asset -
With an asset watcher
-
The watcher uses
FileDeleteTrigger -
The trigger fires when
/tmp/testis deleted
📌 Important:
The watcher is what produces asset events, not a task.
2️⃣ DAG scheduled on the asset
This DAG:
-
Has no time-based schedule
-
Is asset-triggered
-
Runs when
example_assetis updated
Putting it together: execution timeline
Initial state
-
/tmp/testexists (or not — doesn’t matter yet) -
DAG does not run
-
No polling by a task, no scheduler loop in the DAG itself
When /tmp/test is deleted
-
The triggerer process (not the scheduler) detects:
-
FileDeleteTriggerfires -
The
AssetWatcher:-
Emits an asset event
-
Marks
example_assetas updated
-
-
Airflow scheduler sees:
-
The scheduler creates a DAG run for:
-
test_taskruns and prints:
When the DAG will NOT run
-
❌ On a schedule (there is none)
-
❌ When the DAG is turned on
-
❌ On file creation
-
❌ On file modification
-
❌ On Airflow restart
-
❌ On backfill
Only file deletion triggers it.
Important subtleties
1️⃣ This is event-driven, not polling
-
No sensor task
-
No worker slot occupied
-
Trigger runs in the Triggerer, asynchronously
2️⃣ Multiple deletions → multiple runs
If /tmp/test is:
-
Deleted
-
Re-created
-
Deleted again
👉 The DAG will run each time it is deleted
3️⃣ catchup=False effect
Even if the file was deleted while the DAG was paused:
-
No historical run is created
-
Only future delete events trigger runs
4️⃣ Asset identity matters
The DAG listens specifically to:
If another watcher updates a different asset, this DAG won’t run.
Mental model
One-sentence answer
This DAG runs every time the file
/tmp/testis deleted, because the asset watcher emits an asset update event that triggers the asset-scheduled DAG.

浙公网安备 33010602011771号