Hello, Astronomer Cosmos! A framework for dynamically converting your dbt projects into DAGs and tasks

วันก่อน @atb เอาโพสต์ของพี่มาร์คบน LinkedIn มาแชร์ให้ทีมดู

atb's post about Cosmos from Marc

มันน่าสนใจมาก ก่อนหน้านี้ทางทีมใช้ airflow-dbt-python กัน แล้วเราต้องมานั่งเพิ่ม operator กันเองว่าอยากจะทำอะไรกับ dbt เช่น อยากจะรันโมเดล หรืออยากจะรันเทส อะไรแบบนี้

โพสต์ของพี่มาร์คพูดถึงโปรเจค Astronomer Cosmos ครับ

A framework for dynamically generating Apache Airflow DAGs from other tools and frameworks. Develop your workflow in your tool of choice and render it in Airflow as a DAG or Task Group!

หมายความว่าเราไม่ต้องมานั่งเพิ่ม ๆ ลด ๆ operator แบบเดิมอีกต่อไป~ โยนโปรเจค dbt เข้าไป แล้วเดี๋ยวสิ่งนี้เอาไปจัดการสร้าง DAG สร้าง tasks ให้เอง ก่อนหน้านี้เราจะเขียน DAG ไว้ประมาณนี้

@dag(
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False,
    tags=["some_project", "dbt"],
)
def my_dag():

    start = EmptyOperator(task_id="start")

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        project_dir=PROJECT_DIR,
        profiles_dir=PROJECT_DIR,
        project_conn_id=CONN,
        profiles_conn_id=CONN,
        full_refresh=True,
    )

    dbt_snapshot = DbtSnapshotOperator(
        task_id="dbt_snapshot",
        project_dir=PROJECT_DIR,
        profiles_dir=PROJECT_DIR,
        project_conn_id=CONN,
        profiles_conn_id=CONN,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        project_dir=PROJECT_DIR,
        profiles_dir=PROJECT_DIR,
        project_conn_id=CONN,
        profiles_conn_id=CONN,
        fail_fast=True,
    )

    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        project_dir=PROJECT_DIR,
        profiles_dir=PROJECT_DIR,
        project_conn_id=CONN,
        profiles_conn_id=CONN,
        fail_fast=True,
    )

    dbt_check_source_freshness = DbtSourceFreshnessOperator(
        task_id="dbt_check_source_freshness",
        project_dir=PROJECT_DIR,
        profiles_dir=PROJECT_DIR,
        project_conn_id=CONN,
        profiles_conn_id=CONN,
        fail_fast=True,
    )

    end = EmptyOperator(task_id="end")

    start >> dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> end
    start >> dbt_check_source_freshness >> end


dag = my_dag()

ตอนนี้เหลือเท่านี้! :star_struck:

my_dag = DbtDag(
    dag_id="my_dag",
    schedule_interval="@hourly",
    start_date=timezone.datetime(2022, 11, 27),
    conn_id="example_db",
    catchup=False,
    dbt_project_name="example_dbt_project",
    dbt_args={
        "schema": "public"
    },
    dbt_root_path="/opt/airflow/dbt",
)

งดงามมาก ชีวิตสบายขึ้นเยอะ :sunglasses:

ซึ่งจริง ๆ แล้วทาง Dagster เค้าทำสิ่งนี้ไว้สักพักแล้วล่ะ ลองอ่านเพิ่มเติมได้ที่ :point_right:t2: Software-defined assets and the Modern Data Stack

อย่างไรก็ดีครับ สุดท้ายผลประโยชน์ก็ตกอยู่กับพวกเราาาา เย้ :joy: ทีนี้ผมเลยลองทำตัวอย่างการใช้งานทั้ง Astronomer Cosmos กับ Dagster Software Defined Assets เพื่อตั้ง schedule โปรเจค dbt ไว้ที่ GitHub repo ด้านล่างนี้ :point_down:

ลองไปส่องกันได้นะครับ

2 Likes