Airflow XComs คืออะไร
Airflow เป็น Open Source Platform ที่ช่วยจัดการ Workflow ด้วยการสร้าง Data Pipeline สามารถเขียนโปรแกรมภาษา python เพื่อควบคุม และจัดการกับข้อมูลมหาศาลได้ โดย Airflow มีการเขียน Workflow เป็น DAG (Directed Acyclic Graph) ซึ่ง DAG ประกอบไปด้วยหลายๆ Task ที่เชื่อมต่อกันและในแต่ละ Task ก็จะมีกระบวนการทำงานที่ต่างกันไปแล้วสงสัยกันไหมครับว่า Task แต่ละส่วนสื่อสารกันได้ยังไง วันนี้ผมจะมาอธิบายข้อสงสัยนี้ครับ
Airflow XComs
Airflow XComs (cross-communications) เป็นเครื่องมือที่ช่วยแบ่งข้อมูลกันระหว่าง DAG หรือระหว่าง Task ก็ได้โดยจัดเก็บข้อมูลไว้รวมกันที่ศูนย์กลางที่เรียกว่า XComs ทุก Task สามารถเข้าถึง XComs ได้เหมือนทำให้ตัวแปร Local ย้ายไปอยู่ที่ Global🌍 สามารถส่งข้อมูลถึงกันได้โดยไม่จำเป็นต้องเป็น Task ที่อยู่ติดกัน
Figure 1: XComs workflow example
XComs จะเป็นตัวกลางในการแลกเปลี่ยนข้อมูลระหว่าง Task โดยมีตัวแปรอ้างอิง 4 ตัวคือ
dag_id
ชื่อของ DAG ใช้อ้างอิงในกรณีที่ต้องการดึงข้อมูลจาก DAG อื่นtask_ids
ชื่อของ Task ที่ต้องการดึงข้อมูลkey
ชื่อตัวแปรที่ต้องการดึงข้อมูลvalue
ค่าที่เก็บอยู่ในตัวแปร
วันนี้เราจะมาอธิบายการสื่อสารกันระหว่าง Task ใน DAG เดียวกันตัวอย่างต่อไปนี้จึงไม่มี dag_id
ใน code นะครับ
ฟังก์ชั่นที่ใช้
-
xcom_push เป็นฟังก์ชั่นที่ใช้ส่งค่าไปยัง XComs
-
xcom_pull เป็นฟังก์ชั่นที่ใช้ดึงค่าจาก XComs มาใช้
ต่อไปจะเป็นตัวอย่าง code ครับมาเริ่มกันเลย
วิธีรับและส่งข้อมูลผ่าน XComs
การส่งข้อมูลไปเก็บไว้ที่ XComs มี 2 วิธีคือ
-
Return Value ออกมาจากฟังก์ชั่นวิธีนี้ก็เหมือนกันส่งค่าออกมาจากฟังก์ชันที่เราคุ้นเคยกันอยู่แล้วโดย Airflow จะนำค่าที่ Return ออกมาไปเก็บไว้ที่ XComs โดยส่งค่า
key
ชื่อว่า return_value ให้อัตโนมัติ@task def push_by_returning(ti=None): value_1 = {"a": "b"} return value_1
-
ใช้ฟังก์ชั่น xcom_push โดยต้องมีพารามิเตอร์สองตัวคือ
key
และvalue
ในการส่งข้อมูล@task def push(ti=None): value_2 = [1, 2, 3] ti.xcom_push( key="push_key", value=value_2 )
เมื่อ Run 2 Task นี้ก็จะมีข้อมูลส่งไปที่ XComs ซึ่งสามารถเข้าไปดูได้ที่แถบเมนูใน UI ของ Airflow เลือก Admin ➜ XComs
Figure 4: Show the XComs UI
หรือเข้าไปที่ UI ของแต่ละ Task จะมี XCom อยู่ซึ่งจะแสดงแค่เฉพาะตัวแปรที่ Task นั่นๆส่งไปที่ XComs
Figure 5: Show the XCom UI
ตัวอย่างข้อมูลที่ถูกส่งขึ้นมาใน XComs
Figure 6: Show data in XComs
แล้วเราจะดึงข้อมูลจาก XComs มาใช้ได้ยังไง ?
หลังจาก push ข้อมูลไปยัง XComs เราสามารถดึงข้อมูลได้ด้วยฟังก์ชั่น xcom_pull โดยต้องมีพารามิเตอร์หนึ่งตัวที่จำเป็นต้องใส่คือ task_ids
ส่วนตัวแปร key
นั่นถ้าไม่กำหนดค่าจะดึงค่าจากตัวแปรที่ชื่อว่า return_value ออกมาโดย default
เดียวเราจะลองมาดูตัวอย่างการ pull ข้อมูลกันนะครับ
ตัวอย่างแรก pulled_value_1 เราจะ pull จาก Task push_by_returning โดยไม่ใส่ key จะได้ผลลัพธ์เป็น {‘a’: ‘b’} เพราะ Task push_by_returning ใช้ฟังก์ชั่น return เมื่อไม่ใส่ key
, xcom_pull จะเรียกตัวแปร return_value ออกมา
ตัวอย่างที่สอง pulled_value_2 เราจะ pull จาก Task push โดยใส่ key เป็น push_key เพื่อดึงข้อมูลจากตัวแปร push_key ใน XComs ออกมาจะได้ผลลัพธ์เป็น [1, 2, 3]
ตัวอย่างที่สาม pulled_value_3 คราวนี้เราลองไม่ใส่ key ในการดึงข้อมูลจาก Task push บ้างข้อมูลจะออกมาเป็น [1, 2, 3] เหมือน pulled_value_2 ไหมก็ดึงข้อมูลจาก Task เดียวกันนี่หน่า ผลลัพธ์ที่ได้คือ None เพราะว่าเมื่อเราไม่ใส่ key ฟังก์ชั่น xcom_pull จะเรียกตัวแปร return_value ออกมาแต่เนื่องจาก Task push ของเราไม่ได้ใช้ฟังก์ชั่น return จึงไม่มีตัวแปรที่ชื่อว่า return_value นั่นเองครับ
ตัวอย่าง code
@task
def pull_data_from_xcom(ti=None):
pulled_value_1 = ti.xcom_pull(
task_ids="push_by_returning"
)
pulled_value_2 = ti.xcom_pull(
task_ids="push", key="push_key"
)
pulled_value_3 = ti.xcom_pull(
task_ids="push",
)
print(f"pulled_value_1 : {pulled_value_1}")
print(f"pulled_value_2 : {pulled_value_2}")
print(f"pulled_value_3 : {pulled_value_3}")
ซึ่งจะได้ผลลัพธ์จากการรันตามรูปนี้
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_1 : {'a': 'b'}
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_2 : [1, 2, 3]
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_3 : None
ดูตัวอย่าง code ทั้งหมดได้ที่
github: GitHub - Infanna/airflow-task-xcom-example
ข้อจำกัดของ XComs
XComs สามารถเก็บข้อมูลได้หลายประเภทเช่น ข้อความ, ตัวเลข ขนาดเล็กได้แต่ไม่ควรส่งข้อมูลขนาดใหญ่เช่น dataframes เพราะอาจทำให้หน่วยความจำเต็มโดยข้อจำกัดของหน่วยความจำขึ้นอยู่กับฐานข้อมูลที่เลือกใช้
Figure 6: data base example
Reference
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html