มีอะไรใหม่บ้างนะใน Airflow 2.3 🤩

เวอร์ชั่น 2.3 นี่เพิ่งออกมาได้เมื่อ 1 พ.ค.​ 65 นี่เลยครับ แล้วก็ทาง Astronomer เค้าจัด Webinar หัวข้อ What’s New in Airflow 2.3

TLDR: มี feature ใหม่หลักๆ 2 features คือ Dynamic Task Mapping ที่ทำ dynamic task หรือ DAG ตอน run time ได้ และ Grid View ที่มาแทน Tree View

สรุปได้มาประมาณนี้ครับ :sunglasses:

เค้าเกริ่นให้ฟังก่อนว่าตั้งแต่ 2.0 มาเนี่ย มี feature แจ่มๆ มาอย่างต่อเนื่องเลย ไม่ว่าจะเป็น HA Scheduler, TaskFlow API, DAG Dependencies, Calendar View, Deferrable Operators, Tabletables จนมาถึงเวอร์ชั่น 2.3

สิ่งที่เกิดขึ้นใน 2.3 นี้ก็มี 2 major new features มี > 30 minor new features และมี PRs มากกว่า 200 PRs!

ปล่อยของกันรัวเลย อย่างโหด :scream:

Major New Features

ที่เค้าว่ามี 2 features นั่นก็คือ

  1. Dynamic Task Mapping
  2. Grid View

ตัว Grid View นี่ไม่ต้องพูดอะไรกันมากมายครับ ดูตามรูปก็เห็นถึงความดีงามของมันล่ะ ซึ่งตรงนี้เค้าเอามาแทน Tree View กันไปเลย

ตัวที่เป็น game changer จริงๆ ใน 2.3 นี้เลยคือ Dynamic Task Mapping ครับ

มันคือการทำ dynamic task หรือ DAG ตอน “run time”! :star_struck: ซึ่งตรงนี้ต่างจากก่อนหน้านี้ครับ ที่เราวนลูปสร้าง dynamic task หรือ DAG เพราะว่าตอนนั้นมันจะเกิดขึ้นตอน DAG parsing time และตัว graph ที่เราได้ มันก็จะเป็นหน้าตาแบบนั้นไปตลอด ซึ่ง… dynamic task mapping เนี่ย มันจะขึ้นอยู่กับ input เราเลย แล้วก็ use case ก็ตามในรูปสไลด์ด้านบนเลยครับ อิอิ

ที่หลายๆ คน รวมถึงผมด้วยคิดว่ามันเป็น game changer เลยคือ เรามักจะมีหลายๆ use case ที่เราจำเป็นต้องทำอะไรๆ ที่มัน dynamic อย่างเช่น… วันนี้เรามีไฟล์แค่ 3 ไฟล์ เราก็ควรมีแค่ 3 tasks ที่มาจัดการ หรือพรุ่งนี้มีแค่ 2 ไฟล์ เราก็ควรจะมีแค่ 2 tasks

ถ้าทำพวก machine learning ก็น่าจะยิ้มเลย ตอน tune พวก hyperparameters รวมไปถึงเอา model มาทดสอบก็น่าจะเล่นได้หลากหลายมากขึ้น

ปกติถ้าเราปรับเปลี่ยน graph ของเรา พวก history ของ DAG run จะหายครับ แต่ใน dynamic task mapping ข้อมูล DAG run เก่าๆ จะไม่หายแล้ว อยู่ครบ

ส่วนรูปด้านล่างเป็นตัวอย่างการใช้งานครับ จะมี 2 ฟังก์ชั่นคือ partial กับ expand

คิดง่ายๆ คือตรง partial จะเป็นค่าที่เรายึดให้คงที่ไว้ ในที่นี้คือ y = 10 ส่วน expand เนี่ยจะเป็นค่าต่างๆ ที่เราจะ map ไปในแต่ละตัว ในที่นี้คือ x = [1, 2, 3] ซึ่งในโค้ดด้านบนจะได้ผลลัพธ์คือ เราจะมี 3 task ที่คำนวณค่า 1 + 10, 2 + 10 และ 3 + 10 นั่นเอง!

ถ้าเราเขียน DAG แบบง่ายๆ ใช้ partial กับ expand ก็จะประมาณนี้

จากรูป 3 รูปด้านบนจะเห็นว่า graph ก็จะมีหน้าตาเป็น DAG ปกติครับ แต่จะมี ติดมา ถ้าเลข 2 หมายถึงมี 2 task ที่ถูกสร้างขึ้นมาตอน run time ใน DAG run นั้นๆ ซึ่งเราก็จะสามารถดูรายละเอียดต่างๆ ได้ใน Grid View เลย

หรือถ้าเราไปใช้กับ operator ปกติ เช่น ในตัวอย่างด้านล่าง เค้าเอาไปใช้กับ S3ToSnowflakeOperator ที่รับ s3_keys เป็น list

สังเกตวิธีใช้ partial ดูครับ เราใช้ต่อจากชื่อ operator แล้วหลังจากนั้นเราก็ expand ตัว argument ที่เราจะรับค่าเข้าไป

งดงามแท้

Minor New Features

ต่อไปพวก minor features ครับ ดูรูปเลย

มี SmoothOperator ที่เอาไว้พ่น log มาเป็น link ไปยัง YouTube วีดีโอด้านล่างนี้ :joy:

อันนี้ PR ที่เค้าเปิดมาสร้าง operator ตัวนี้ครับ เข้าไปอ่านขำๆ ได้ ฮ่าๆ

นอกเหนือจากด้านบนแล้วก็มี

อีกคำสั่งที่เค้าไม่ได้พูดใน webinar ที่ผมคิดว่ามีประโยชน์มากๆ คือ

airflow db clean

ซึ่งเราเอาไว้ purging old data และไม่ต้องสร้าง maintenance DAGs มาอีกต่อไป~

มี LocalKubernetesExecutor แล้ว เอาไว้รันแบบ LocalExecutor บน K8s ครับ ซึ่งถ้าใช้แค่ KubernetesExecutor ตัว K8s ก็สร้าง pod มารันงานให้เราตามปกติ แต่ทีนี้เราสามารถให้ scheduler รันได้เลยถ้าเราเลือก LocalExecutor

แล้วก็… เวอร์ชั่นนี้เริ่มจะใช้กับ ARM CPU ได้แล้ว! :tada:

โอเคครับ พอหอมปากหอมคอ อิอิ ถ้าใครสนใจดูว่ามีอะไรใหม่นอกเหนือจากนี้ สามารถไปดูต่อได้ที่ Airflow 2.3.0 Release Note ครับผม :smiling_face:

2 Likes

ลองสร้าง DAG เล่นกับ @atb และ @lif มา โดยเอา BashOperator มาทำ dynamic task mapping

import random

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils import timezone


default_args = {
    "start_date": timezone.datetime(2022, 5, 5),
}
with DAG(
    "dataengineercafe",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    @dag.task
    def gen_random_numbers():
        return [f"echo '{i + 1}'" for i in range(random.randint(1, 5))]


    echo = BashOperator.partial(task_id="echo").expand(bash_command=gen_random_numbers())

ลองรันครั้งแรก มี 2 tasks ถูกสร้าง เราจะได้

ลองรันอีกครั้ง มี 3 tasks ถูกสร้าง หน้าตาจะได้แบบนี้

แจ่มมาก :100:

2 Likes

มาอัพเดทครับ พอดีเพิ่งได้ลองรันบน M1… อยากบอกว่ามันเร็วกว่าเดิมโคตรๆ ครับ :sunglasses:

ปล. ปกติกว่าจะรัน Airflow บน M1 ขึ้นเนี่ย… น่าจะประมาณ 10 กว่านาที :sweat_smile: