Airflow XComs คืออะไร

Airflow XComs คืออะไร

Airflow เป็น Open Source Platform ที่ช่วยจัดการ Workflow ด้วยการสร้าง Data Pipeline สามารถเขียนโปรแกรมภาษา python เพื่อควบคุม และจัดการกับข้อมูลมหาศาลได้ โดย Airflow มีการเขียน Workflow เป็น DAG (Directed Acyclic Graph) ซึ่ง DAG ประกอบไปด้วยหลายๆ Task ที่เชื่อมต่อกันและในแต่ละ Task ก็จะมีกระบวนการทำงานที่ต่างกันไปแล้วสงสัยกันไหมครับว่า Task แต่ละส่วนสื่อสารกันได้ยังไง :thinking: วันนี้ผมจะมาอธิบายข้อสงสัยนี้ครับ

Airflow XComs

Airflow XComs (cross-communications) เป็นเครื่องมือที่ช่วยแบ่งข้อมูลกันระหว่าง DAG หรือระหว่าง Task ก็ได้โดยจัดเก็บข้อมูลไว้รวมกันที่ศูนย์กลางที่เรียกว่า XComs ทุก Task สามารถเข้าถึง XComs ได้เหมือนทำให้ตัวแปร Local ย้ายไปอยู่ที่ Global🌍 สามารถส่งข้อมูลถึงกันได้โดยไม่จำเป็นต้องเป็น Task ที่อยู่ติดกัน


Figure 1: XComs workflow example

XComs จะเป็นตัวกลางในการแลกเปลี่ยนข้อมูลระหว่าง Task โดยมีตัวแปรอ้างอิง 4 ตัวคือ

  1. dag_id ชื่อของ DAG ใช้อ้างอิงในกรณีที่ต้องการดึงข้อมูลจาก DAG อื่น
  2. task_idsชื่อของ Task ที่ต้องการดึงข้อมูล
  3. keyชื่อตัวแปรที่ต้องการดึงข้อมูล
  4. valueค่าที่เก็บอยู่ในตัวแปร

วันนี้เราจะมาอธิบายการสื่อสารกันระหว่าง Task ใน DAG เดียวกันตัวอย่างต่อไปนี้จึงไม่มี dag_id ใน code นะครับ

ฟังก์ชั่นที่ใช้

  • xcom_push เป็นฟังก์ชั่นที่ใช้ส่งค่าไปยัง XComs

  • xcom_pull เป็นฟังก์ชั่นที่ใช้ดึงค่าจาก XComs มาใช้

ต่อไปจะเป็นตัวอย่าง code ครับมาเริ่มกันเลย :rocket:

วิธีรับและส่งข้อมูลผ่าน XComs

การส่งข้อมูลไปเก็บไว้ที่ XComs มี 2 วิธีคือ

  1. Return Value ออกมาจากฟังก์ชั่นวิธีนี้ก็เหมือนกันส่งค่าออกมาจากฟังก์ชันที่เราคุ้นเคยกันอยู่แล้วโดย Airflow จะนำค่าที่ Return ออกมาไปเก็บไว้ที่ XComs โดยส่งค่า key ชื่อว่า return_value ให้อัตโนมัติ

    @task
    def push_by_returning(ti=None):
    value_1 = {"a": "b"}
    return value_1
    
  2. ใช้ฟังก์ชั่น xcom_push โดยต้องมีพารามิเตอร์สองตัวคือ key และ value ในการส่งข้อมูล

    @task
    def push(ti=None):
    value_2 = [1, 2, 3]
    ti.xcom_push(
    key="push_key",
    value=value_2
    )
    

เมื่อ Run 2 Task นี้ก็จะมีข้อมูลส่งไปที่ XComs ซึ่งสามารถเข้าไปดูได้ที่แถบเมนูใน UI ของ Airflow เลือก Admin ➜ XComs


Figure 4: Show the XComs UI

หรือเข้าไปที่ UI ของแต่ละ Task จะมี XCom อยู่ซึ่งจะแสดงแค่เฉพาะตัวแปรที่ Task นั่นๆส่งไปที่ XComs


Figure 5: Show the XCom UI

ตัวอย่างข้อมูลที่ถูกส่งขึ้นมาใน XComs


Figure 6: Show data in XComs

แล้วเราจะดึงข้อมูลจาก XComs มาใช้ได้ยังไง ?

หลังจาก push ข้อมูลไปยัง XComs เราสามารถดึงข้อมูลได้ด้วยฟังก์ชั่น xcom_pull โดยต้องมีพารามิเตอร์หนึ่งตัวที่จำเป็นต้องใส่คือ task_ids

ส่วนตัวแปร keyนั่นถ้าไม่กำหนดค่าจะดึงค่าจากตัวแปรที่ชื่อว่า return_value ออกมาโดย default

เดียวเราจะลองมาดูตัวอย่างการ pull ข้อมูลกันนะครับ

ตัวอย่างแรก pulled_value_1 เราจะ pull จาก Task push_by_returning โดยไม่ใส่ key จะได้ผลลัพธ์เป็น {‘a’: ‘b’} เพราะ Task push_by_returning ใช้ฟังก์ชั่น return เมื่อไม่ใส่ key, xcom_pull จะเรียกตัวแปร return_value ออกมา

ตัวอย่างที่สอง pulled_value_2 เราจะ pull จาก Task push โดยใส่ key เป็น push_key เพื่อดึงข้อมูลจากตัวแปร push_key ใน XComs ออกมาจะได้ผลลัพธ์เป็น [1, 2, 3]

ตัวอย่างที่สาม pulled_value_3 คราวนี้เราลองไม่ใส่ key ในการดึงข้อมูลจาก Task push บ้างข้อมูลจะออกมาเป็น [1, 2, 3] เหมือน pulled_value_2 ไหมก็ดึงข้อมูลจาก Task เดียวกันนี่หน่า ผลลัพธ์ที่ได้คือ None เพราะว่าเมื่อเราไม่ใส่ key ฟังก์ชั่น xcom_pull จะเรียกตัวแปร return_value ออกมาแต่เนื่องจาก Task push ของเราไม่ได้ใช้ฟังก์ชั่น return จึงไม่มีตัวแปรที่ชื่อว่า return_value นั่นเองครับ

ตัวอย่าง code

@task
def pull_data_from_xcom(ti=None):
    pulled_value_1 = ti.xcom_pull(
    task_ids="push_by_returning"
)

pulled_value_2 = ti.xcom_pull(
    task_ids="push", key="push_key"
)

pulled_value_3 = ti.xcom_pull(
    task_ids="push",
)
print(f"pulled_value_1 : {pulled_value_1}")
print(f"pulled_value_2 : {pulled_value_2}")
print(f"pulled_value_3 : {pulled_value_3}")

ซึ่งจะได้ผลลัพธ์จากการรันตามรูปนี้

[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_1 : {'a': 'b'}
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_2 : [1, 2, 3]
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_3 : None

ดูตัวอย่าง code ทั้งหมดได้ที่ :point_down:t3:

github: GitHub - Infanna/airflow-task-xcom-example

ข้อจำกัดของ XComs

XComs สามารถเก็บข้อมูลได้หลายประเภทเช่น ข้อความ, ตัวเลข ขนาดเล็กได้แต่ไม่ควรส่งข้อมูลขนาดใหญ่เช่น dataframes เพราะอาจทำให้หน่วยความจำเต็มโดยข้อจำกัดของหน่วยความจำขึ้นอยู่กับฐานข้อมูลที่เลือกใช้


Figure 6: data base example

Reference

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

2 Likes

ผมเคยโยน Data ผ่าน Xcom มาละ สรุปแตก (> 1GB) ตอนนั้นมือใหม่ 5555 เนื่องจากเคยเขียนด้วย Apache Beam มาก่อน ก็นึกว่าจะ | Data ใหลไปตามทางได้เลย แหม

ถ้าอยากทำเอาความสนุก ก็แก้ XComs เอง ให้ไปเซฟที่อื่นครับ :joy: ซึ่งไม่แนะนำอย่างยิ่ง

ส่วนถ้าอยากดูที่เป็นประมาณข้อมูลไหลผ่าน tasks (หรือ assets) เครื่องมืออีกตัวที่ทำให้เราเห็นแบบนั้นได้คือ Dagster ครับ

ซึ่งข้างในก็เป็น storage แยกออกมา แต่เค้าออกแบบโค้ดไว้เนียนมาก เราสามารถสลับ storage ได้ โดยแทบไม่ต้องแก้ส่วน logic เลย