การ Save Data ด้วย pyspark หลังจาก drop duplicate

สวัสดีครับ อยากจะขอถาม use case นิดนึงครับ ผมเพิ่งจะเริ่มมาจัดการ data จริงจังหลังจากย้ายมาทำ Data engineer และเพิ่งเริ่มใช้ Spark and Databricks ครับ

ในกรณีที่ อ่านข้อมูลที่อยู่ใน parquet format ซึ่งมี data duplicates อยู่ 2 rows(อ่านข้อมูลจาก blob และ filter ตาม condition) หลังจากจัดการ drop duplicates แล้ว ก็จะเหลือแค่ row เดียว ทีนี้เราจะเซฟข้อมูลกลับไปที่เดิมยังไงครับ ที่ไปอ่านใน document ของ databricks/pyspark ก็จะมีการ append / overwrite ซึ่งผมเข้าใจว่าถ้า overwrite ด้วยข้อมูล row เดียวมันจะไปเอา content นี้ไปแทนที่ข้อมูลทั้งหมดใน blob ใช่หรือป่าวครับ ส่วน append จะเพิ่ม row นั้นเข้าไปใน table ที่หรือ dir ที่มีอยู่แล้ว ไม่แน่ใจว่ามันเป็นการเพิ่ม duplicate เข้าไปอีกทีไหมครับ

ช่วยแชร์ทีครับว่าครรทำแบบไหน หรือมีวิธีอื่นเพิ่มเติมไหมครับ ผมกังวลเรื่อง data loss เลยพยายามทำความเข้าใจตรงนี้อยู่ครับ

1 Like

เข้าใจถูกแล้วครับ overwrite คือทับลงไปที่ไฟล์เดิม ซึ่งเราจะเสียความเป็น origin ไป แล้วก็ append เป็นการเพิ่ม row เข้าไปครับ ซึ่งก็จะเพิ่ม duplicate เข้าไปอีกที

ตอนนี้ผมใช้วิธีแยก 2 folders (หรือ 2 buckets) ครับ โดยที่ folder แรกเก็บ data ที่มาจาก source จริง ๆ เลย ออกแนวมาแบบไหนก็เก็บประมาณนั้น ก็จะมี duplicates อยู่ในนั้น ถ้าเป็น parquet อยู่แล้วก็ดีเลย แล้วก็ transform + dedup เอามาเก็บอีก folder หนึ่งครับผม

ประมาณนี้

raw-to-staging

เดี๋ยวรอเพื่อน ๆ ท่านอื่นมาเสริมเพิ่มเติม :blush:

ปล. pts-ark นี่คือ pyspark ใช่เปล่านะ o_O

1 Like

ขอบคุณมากครับ ช่วยคลายความสงสัยไปเลยครับ แล้วก็ขอบคุณที่ช่วยแชร์ use case ให้นะครับ กำลังจะต้องเริ่มทำ pipeline ด้วยเลยครับ

ปล. แก้หัวข้อแล้วครับ แหะๆ เมื่อคืนน่าจะเบลอครับ อ่านแต่ docs ทั้งวันเลยครับ

1 Like

ควรจะเขียนลงที่ใหม่ ไม่ควรเขียนทับที่เดิม

อ้อ แล้วก็ระวังเรื่อง partition size ด้วย
uneven spark partitions มีผลกับ performance
แล้วก็ partition size ควรจะมีขนาดใหญ่พอประมาณนึง ไม่งั้นมันจะไม่คุ้มกะ spark overhead (อย่างต่ำขอประมาณ 10-20mb per partition)

2 Likes

ขอถามเพิ่มเติมอีกนิดครับ กรณีที่ถ้าอยากจะเซฟไว้ที่เดิม

  1. ถ้าผมอ่านเป็น delta table ขึ้น มาจากนั้น
  2. อ่านข้อมูลอีกชุดที่อ่านมาจาก source เดียวกันแต่อ่านเป็น DataFrame แล้ว filter ตาม condition เพื่อหา duplicates แล้วเซฟเก็บไว้ในตัวแปรแค่ row เดียว
  3. เลือก drop ทั้ง 2 rows จาก delta table (1)
  4. แล้วทำการ update เข้าไปใหม่ด้วยการใช้ merge แล้ว set condition ตาม column ที่กำหนดให้ตรงกัน จากนั้นใช้ whenMatchedUpdatedAll() แบบนี้สามารถทำได้ไหมครับ

ขอบคุณมากครับ ตอนนี้ยังไม่มีความรู้เรื่อง performance เท่าไหร่ ตรงนี้ พอจะมี resource ให้ตามไปศึกษาเพิ่มเติมไหมครับ

เล่มนี้โอเคอยู่

delta table ไม่เคยใช้ แต่ตามหลักการแล้วไม่ควรมีการเขียนทับ source table
ลองนึกสภาพดูว่า ถ้า source table มี data เข้ามาเรื่อยๆ
สมมติ ตอนนี้ 2:00PM, มีการ ingest data มาทำกับ de-duplication
แล้ว process นี้ ใช้เวลา 5 นาที
ระหว่างนี้ มี 50 rows เพิ่มเข้ามาใน source table
ถ้าเขียนทับไป จบเลยนะ ข้อมูลหาย

2 Likes

ขอบคุณมากเลยครับ พอจะเข้าใจละครับ

ชอบตัวอย่างนี้มากครัช แจ่มเลย :+1: