วันก่อน @atb เอาโพสต์ของพี่มาร์คบน LinkedIn มาแชร์ให้ทีมดู
มันน่าสนใจมาก ก่อนหน้านี้ทางทีมใช้ airflow-dbt-python กัน แล้วเราต้องมานั่งเพิ่ม operator กันเองว่าอยากจะทำอะไรกับ dbt เช่น อยากจะรันโมเดล หรืออยากจะรันเทส อะไรแบบนี้
โพสต์ของพี่มาร์คพูดถึงโปรเจค Astronomer Cosmos ครับ
A framework for dynamically generating Apache Airflow DAGs from other tools and frameworks. Develop your workflow in your tool of choice and render it in Airflow as a DAG or Task Group!
หมายความว่าเราไม่ต้องมานั่งเพิ่ม ๆ ลด ๆ operator แบบเดิมอีกต่อไป~ โยนโปรเจค dbt เข้าไป แล้วเดี๋ยวสิ่งนี้เอาไปจัดการสร้าง DAG สร้าง tasks ให้เอง ก่อนหน้านี้เราจะเขียน DAG ไว้ประมาณนี้
@dag(
default_args=default_args,
schedule_interval="@hourly",
catchup=False,
tags=["some_project", "dbt"],
)
def my_dag():
start = EmptyOperator(task_id="start")
dbt_seed = DbtSeedOperator(
task_id="dbt_seed",
project_dir=PROJECT_DIR,
profiles_dir=PROJECT_DIR,
project_conn_id=CONN,
profiles_conn_id=CONN,
full_refresh=True,
)
dbt_snapshot = DbtSnapshotOperator(
task_id="dbt_snapshot",
project_dir=PROJECT_DIR,
profiles_dir=PROJECT_DIR,
project_conn_id=CONN,
profiles_conn_id=CONN,
)
dbt_run = DbtRunOperator(
task_id="dbt_run",
project_dir=PROJECT_DIR,
profiles_dir=PROJECT_DIR,
project_conn_id=CONN,
profiles_conn_id=CONN,
fail_fast=True,
)
dbt_test = DbtTestOperator(
task_id="dbt_test",
project_dir=PROJECT_DIR,
profiles_dir=PROJECT_DIR,
project_conn_id=CONN,
profiles_conn_id=CONN,
fail_fast=True,
)
dbt_check_source_freshness = DbtSourceFreshnessOperator(
task_id="dbt_check_source_freshness",
project_dir=PROJECT_DIR,
profiles_dir=PROJECT_DIR,
project_conn_id=CONN,
profiles_conn_id=CONN,
fail_fast=True,
)
end = EmptyOperator(task_id="end")
start >> dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> end
start >> dbt_check_source_freshness >> end
dag = my_dag()
ตอนนี้เหลือเท่านี้!
my_dag = DbtDag(
dag_id="my_dag",
schedule_interval="@hourly",
start_date=timezone.datetime(2022, 11, 27),
conn_id="example_db",
catchup=False,
dbt_project_name="example_dbt_project",
dbt_args={
"schema": "public"
},
dbt_root_path="/opt/airflow/dbt",
)
งดงามมาก ชีวิตสบายขึ้นเยอะ
ซึ่งจริง ๆ แล้วทาง Dagster เค้าทำสิ่งนี้ไว้สักพักแล้วล่ะ ลองอ่านเพิ่มเติมได้ที่ Software-defined assets and the Modern Data Stack
อย่างไรก็ดีครับ สุดท้ายผลประโยชน์ก็ตกอยู่กับพวกเราาาา เย้ ทีนี้ผมเลยลองทำตัวอย่างการใช้งานทั้ง Astronomer Cosmos กับ Dagster Software Defined Assets เพื่อตั้ง schedule โปรเจค dbt ไว้ที่ GitHub repo ด้านล่างนี้
ลองไปส่องกันได้นะครับ