ZhangZhihui's Blog  

 

"""
Example DAG for demonstrating the usage of event driven scheduling using assets and triggers.
"""

from __future__ import annotations

from airflow.providers.standard.triggers.file import FileDeleteTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, chain, task

file_path = '/tmp/test'

trigger = FileDeleteTrigger(filepath=file_path)
asset = Asset(
    uri='example_asset',
    watchers=[AssetWatcher(name='test_asset_watcher', trigger=trigger)]
)


with DAG(
    dag_id='example_asset_with_watchers',
    schedule=[asset],
    catchup=False
):
    @task
    def test_task():
        print('Hello world')
    
    chain(test_task())

For the above code, when will the DAG run?

 

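This DAG runs when the trigger finds the watched file, /tmp/test. Despite its name, FileDeleteTrigger does not wait for someone else to delete the file: it waits for the file to exist, deletes the file itself, and then fires.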

Let’s unpack exactly how and why, step by step.


Key pieces in your code

1️⃣ Asset with a watcher

asset = Asset(
    uri="example_asset",
    watchers=[
        AssetWatcher(
            name="test_asset_watcher",
            trigger=FileDeleteTrigger(filepath="/tmp/test")
        )
    ]
)

This defines:

  • An asset called example_asset

  • With an asset watcher

  • The watcher uses FileDeleteTrigger

  • The trigger fires once it finds /tmp/test, and it deletes the file as it does so

📌 Important:
The watcher is what produces asset events, not a task.
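
To make the trigger's behavior concrete, here is a minimal stand-alone sketch of the kind of loop such a trigger runs. As the provider's docstring describes it, FileDeleteTrigger fires once after it finds the requested file and then deletes it. The code below is a simplified stand-in that uses only the standard library, not the provider's actual source; the names wait_for_file_then_delete and demo, the event dictionary, and the short poke_interval are assumptions made for illustration.

```python
import asyncio
import os
import tempfile


async def wait_for_file_then_delete(filepath: str, poke_interval: float = 0.1) -> dict:
    """Poll until `filepath` exists, delete it, then return a single event."""
    while True:
        if os.path.isfile(filepath):
            # Consume the file so the same file cannot fire a second time.
            os.remove(filepath)
            return {"filepath": filepath, "fired": True}
        # Sleep cooperatively; other coroutines keep running on the same loop.
        await asyncio.sleep(poke_interval)


async def demo() -> dict:
    """Start the waiter, confirm it stays idle, then create the file."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "test")
        waiter = asyncio.create_task(wait_for_file_then_delete(path))

        await asyncio.sleep(0.3)
        idle_while_absent = not waiter.done()  # no file yet, so no event yet

        open(path, "w").close()  # "drop" the file the waiter is looking for
        event = await asyncio.wait_for(waiter, timeout=5)

        event["idle_while_absent"] = idle_while_absent
        event["file_still_exists"] = os.path.exists(path)
        return event
```

Calling asyncio.run(demo()) returns the event once the file has been created and consumed. The point of the sketch is the ordering: nothing happens while the file is absent, and the deletion is performed by the waiter itself.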


2️⃣ DAG scheduled on the asset

with DAG(
    dag_id="example_asset_with_watchers",
    schedule=[asset],
    catchup=False,
):

This DAG:

  • Has no time-based schedule

  • Is asset-triggered

  • Runs when example_asset is updated


Putting it together: execution timeline

Initial state

  • /tmp/test does not exist

  • DAG does not run

  • No task is polling and no worker slot is used; the trigger checks for the file from the triggerer


When /tmp/test appears

  1. The triggerer process (not the scheduler) detects:

    /tmp/test exists
  2. FileDeleteTrigger deletes the file and fires

  3. The AssetWatcher:

    • Emits an asset event

    • Marks example_asset as updated

  4. Airflow scheduler sees:

    Asset example_asset updated
  5. The scheduler creates a DAG run for:

    example_asset_with_watchers
  6. test_task runs and prints:

    Hello world
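
The hand-off in the steps above can be sketched as a toy model. This is not Airflow's scheduler: ToyScheduler, on_trigger_event, and schedule are hypothetical names used only to show how a trigger event becomes an asset event, and how an asset event becomes a run of every DAG scheduled on that asset.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToyScheduler:
    """Toy stand-in for asset-to-DAG bookkeeping; not Airflow's real scheduler."""

    # Asset URI -> ids of the DAGs scheduled on that asset.
    consumers: dict = field(default_factory=dict)
    asset_events: deque = field(default_factory=deque)
    dag_runs: list = field(default_factory=list)

    def on_trigger_event(self, asset_uri: str) -> None:
        """Watcher side: a fired trigger is recorded as an asset event."""
        self.asset_events.append(asset_uri)

    def schedule(self) -> None:
        """Scheduler side: each asset event creates runs for its consumers."""
        while self.asset_events:
            uri = self.asset_events.popleft()
            for dag_id in self.consumers.get(uri, []):
                self.dag_runs.append(dag_id)


def test_task() -> str:
    """Stand-in for the DAG's only task."""
    return "Hello world"


scheduler = ToyScheduler(consumers={"example_asset": ["example_asset_with_watchers"]})
scheduler.on_trigger_event("example_asset")  # the watched asset was updated
scheduler.on_trigger_event("other_asset")    # unrelated asset, no consumer here
scheduler.schedule()

# One task execution per created DAG run.
outputs = [test_task() for _ in scheduler.dag_runs]
```

Only the event for example_asset produces a run; the event for the unrelated asset is dropped because no DAG in this toy model is scheduled on it.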

When the DAG will NOT run

  • ❌ On a schedule (there is none)

  • ❌ When the DAG is turned on

  • ❌ When someone else deletes /tmp/test

  • ❌ On file modification

  • ❌ On Airflow restart

  • ❌ On backfill

Only the appearance of /tmp/test triggers it: the trigger finds the file, deletes it, and fires.


Important subtleties

1️⃣ Event-driven for the DAG, polling inside the trigger

  • No sensor task

  • No worker slot occupied

  • Trigger runs in the Triggerer, asynchronously, checking for the file at a configurable poke_interval


2️⃣ Multiple files → multiple runs

If /tmp/test is:

  • Created (and consumed by the trigger)

  • Created again

  • Consumed again

👉 The DAG will run each time the file appears
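
A small simulation can illustrate the one-run-per-file behavior, assuming the trigger consumes the file each time it finds it and is then re-armed. How Airflow itself re-arms a watcher's trigger is not modeled here; watch_forever and drop_file_n_times are hypothetical helpers built on the standard library only.

```python
import asyncio
import os
import tempfile


async def watch_forever(filepath: str, events: list, poke_interval: float = 0.05) -> None:
    """Re-armed watcher: each time the file shows up, delete it and record one event."""
    while True:
        if os.path.isfile(filepath):
            os.remove(filepath)
            events.append(filepath)  # one event per file that was consumed
        await asyncio.sleep(poke_interval)


async def drop_file_n_times(n: int) -> int:
    """Create the watched file n times and return how many events were recorded."""
    events: list = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "test")
        watcher = asyncio.create_task(watch_forever(path, events))
        for _ in range(n):
            open(path, "w").close()
            # Wait until the watcher has consumed this file before dropping the next.
            while os.path.exists(path):
                await asyncio.sleep(0.01)
        # Stop the watcher cleanly before the temporary directory is removed.
        watcher.cancel()
        try:
            await watcher
        except asyncio.CancelledError:
            pass
    return len(events)
```

For example, asyncio.run(drop_file_n_times(3)) records one event per file. In the real setup each such event would correspond to one asset event, and so to one DAG run.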


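3️⃣ catchup=False effect

catchup controls whether missed time-based schedule intervals are filled in. This DAG has no time-based schedule, so there are no missed intervals to catch up on:

  • No historical run is created

  • Only new trigger events start runs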


4️⃣ Asset identity matters

The DAG listens specifically to:

Asset(uri="example_asset")

If another watcher updates a different asset, this DAG won’t run.


Mental model

File appears at /tmp/test
      ↓
Trigger fires (Triggerer)
      ↓
Asset updated
      ↓
DAG scheduled
      ↓
Tasks run

One-sentence answer

This DAG runs every time the file /tmp/test appears: FileDeleteTrigger finds the file, deletes it, and fires, and the asset watcher turns that into an asset update event that triggers the asset-scheduled DAG.

 

Posted on 2025-12-15 22:30 by ZhangZhihuiAAA