Airflow Bigquery upsert

มีใครเคยใช้ BigQueryUpsertTableOperator (Link) ไหมครับ ผมพยายามจะใช้กับ Pipeline เพื่อทำการ Upsert จาก Table A ไป Table B แต่หาตัวอย่างการใช้งานไม่เจอ ลองทำตามใน Doc แล้วแต่ก็ยังไม่สำเร็จครับ

ลองดูแถว ๆ นี้ดูนะครับ Astronomer - BigQueryUpsertTableOperator

image

หรือสามารถแปะโค้ดบางส่วนกับ error มาเพิ่มเติมได้เลยครับผม

ตัว operator นี้ผมยังไม่เคยใช้ครับ แต่เท่าที่อ่านจากโค้ดแล้ว ได้ความว่า

  • dataset_id คือ destination ที่เราจะ upsert เข้าไป
  • ส่วน table_resource คือ ข้อมูลที่เราจะ upsert เข้าไปใน dataset_id

คือถ้ามี table ที่ชื่อเดียวกับที่ table_resource อยู่แล้ว ใน dataset_id มันจะ upsert เข้าไปให้ แต่ถ้ายังไม่มี มันก็จะสร้าง table ชื่อเดียวกับใน table_resource ใน dataset_id ให้ครับผม

เช่น ถ้าเรามีโค้ดประมาณนี้

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id="my_dataset",
    table_resource={
        "tableReference": {
            "datasetId": "my_temp_dataset",
            "tableId": "hello",
        },
    },
)

ข้อมูลที่อยู่ใน table ที่ชื่อ my_temp_dataset.hello ก็จะ upsert เข้าไปใน my_dataset.hello ครับ