ใช้ SQL สำหรับแก้ปัญหาตอนรัน Airflow Migration หลังจากอัพเกรดเวอร์ชั่น

ตอนที่เปลี่ยนมาเป็น Airflow เวอร์ชั่น 2.2.0 แล้ว ตัว migration รันแล้วเจอ error แบบนี้

[2021-10-11 19:21:13,069] {db.py:817} ERROR - The task_instance table has 53 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).
[2021-10-11 19:21:13,071] {db.py:817} ERROR - The task_fail table has 24 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).

เหมือนมีข้อมูลที่ไม่ได้ลิ้งค์กับใครเลยค้างอยู่ เราต้องเคลียร์พวกนี้ออกก่อน วิธีแก้ปัญหาตอนนี้คือใช้ SQL ใน issue ด้านล่างนี้เลย

BEGIN;

-- Remove dag runs without a valid run_id
DELETE FROM dag_run WHERE run_id is NULL;

-- Remove task fails without a run_id
WITH task_fails_to_remove AS (
  SELECT 
    task_fail.dag_id,
    task_fail.task_id,
    task_fail.execution_date
  FROM
    task_fail
  LEFT JOIN 
    dag_run ON 
    dag_run.dag_id = task_fail.dag_id 
    AND dag_run.execution_date = task_fail.execution_date
  WHERE
    dag_run.run_id IS NULL
)
DELETE FROM
    task_fail
USING 
    task_fails_to_remove
WHERE (
    task_fail.dag_id = task_fails_to_remove.dag_id
    AND task_fail.task_id = task_fails_to_remove.task_id
    AND task_fail.execution_date = task_fails_to_remove.execution_date
);

-- Remove task instances without a run_id
WITH task_instances_to_remove AS (
  SELECT
    task_instance.dag_id,
    task_instance.task_id,
    task_instance.execution_date
  FROM
    task_instance
  LEFT JOIN 
    dag_run 
    ON dag_run.dag_id = task_instance.dag_id
    AND dag_run.execution_date = task_instance.execution_date
  WHERE 
    dag_run.run_id is NULL
)
DELETE FROM 
    task_instance
USING
    task_instances_to_remove
WHERE (
    task_instance.dag_id = task_instances_to_remove.dag_id
    AND task_instance.task_id = task_instances_to_remove.task_id
    AND task_instance.execution_date = task_instances_to_remove.execution_date
);

COMMIT;

ล่าสุดใช้ Query นี้ก็น่าจะเพียงพอ

WITH task_fails_to_remove AS (
  SELECT 
    task_fail.dag_id,
    task_fail.task_id,
    task_fail.execution_date
  FROM
    task_fail
  LEFT JOIN 
    dag_run ON 
    dag_run.dag_id = task_fail.dag_id 
    AND dag_run.execution_date = task_fail.execution_date
  WHERE
    dag_run.run_id IS NULL
)
DELETE FROM
    task_fail
USING 
    task_fails_to_remove
WHERE (
    task_fail.dag_id = task_fails_to_remove.dag_id
    AND task_fail.task_id = task_fails_to_remove.task_id
    AND task_fail.execution_date = task_fails_to_remove.execution_date
);

ปัญหานี้เดี๋ยวทาง Airflow Community จะแก้ให้เวลาที่เราลบ DAG Run มันจะลบแบบ cascade (ไปลบตัวอื่นๆ เช่น task instance ที่เกี่ยวข้องด้วย)