เล่า spark on k8s แบบสั้นๆ ตอน 3

จากตอน 2(เล่า spark on k8s แบบสั้นๆ ตอน 2) ผมปิด event log ไปครับ
ตอน 3 นี้ก็จะสรุปเรื่อง event log + history server ของ spark ครับ และก็จะเปิด event log เพื่อใช้กับ spark history server

ตอนท้ายของตอน 2 ที่เกริ่นไว้ว่าจะลอง spark operator จาก GitHub - GoogleCloudPlatform/spark-on-k8s-operator: Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
คิดว่าไม่น่าได้ใช้ประโยชน์จากตรงนี้มาก เลยข้ามดีกว่า ไปลองของที่จำเป็นอย่าง event log ดีกว่า

เล่าสั้นๆ เรื่องของ event log ของ spark ก่อน
หลังจากเราสร้าง SparkContext ขึ้นมาในแต่ละ spark job มันจะ launch web ui ที่ port 4040 เพื่อให้ดูว่าเกิดอะไรขึ้นบ้างใน spark job นี้
โดยเราจะเรียกสิ่งที่เกิดขึ้นพวกนี้ว่า event log
แต่หลังจาก job เสร็จแล้ว เราก็จะเข้าไปดูไม่ได้
เราเลยต้องที่ที่เก็บข้อมูลพวกนี้ หรือ event log ไว้สักที่ แล้วก็เปิด web ui นี้สักที่ เพื่อแสดงว่าเกิดไรขึ้นบ้าง แล้วเราก็เรียกมันว่า Spark history server

เพราะฉะนั้น สิ่งที่ผมต้องทำก็จะมี 2 สิ่งคือ

  1. สั่งให้ spark job บันทึก event log ไว้สักที่
    ปกติแล้ว จะบันทึกไว้ใน hdfs แต่ด้วยบริบทที่ผมทำงาน มี storage ที่ใช้ protocol ของ s3 ผมจะเรียกมันว่า s3-compat ละกัน
    ซึ่งผมจะเก็บไว้ใน s3-compat นี้แหละ
    และ
  2. สั่งรัน spark history server
    ซึ่ง spark history server ก็ bundle มาแล้วใน docker image ที่ปั้นมาในตอน 2 แล้วให้ไปอ่าน event log ที่เก็บไว้ใน s3-compat

Implementation #1 สั่งให้ spark job บันทึก event log ไว้ที่ s3-compat
ต้องใช้ hadoop-aws-x.y.z.jar และ dependency (เพื่อ build image ในตอน 2)
สิ่งที่ยากที่สุดในขั้นตอนนี้คือการเลือก version ของ jar file นี่แหละ
โดยผมเลือก spark-3.1.3-bin-hadoop2.7 เพราะ hadoop ที่ integrate ด้วยยังเป็น version เก่า
ถ้าเช็คดูจะเจอว่า version จะเป็น hadoop 2.7.4
ดังนั้น jar สำหรับ hadoop-aws จะต้องเป็น hadoop-aws-2.7.4.jar(และ dependency) ที่ต้องโหลดมาเก็บใน $SPARK_HOME/jars หรือจะเพิ่ม class path เองก็ไม่ติด
และ config ที่จำเป็นตอน submit คือ การเปิด eventlog และ config ของ s3

spark.eventLog.enabled=true
spark.eventLog.dir=s3a://<bucket_name>/<your_path>
spark.hadoop.fs.s3a.access.key=<ACCESS_KEY>
spark.hadoop.fs.s3a.secret.key=<SECRET_KEY>
spark.hadoop.fs.s3a.endpoint=<S3_COMPAT_URL>
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

หรือจะเก็บใน $SPARK_HOME/conf/spark-defaults.conf ก็ได้

Implementation #2 สั่งรัน spark history server
ผมเลือก spark-3.1.3-bin-hadoop3.2 ครับ (ต้องเลือก hadoop-aws-3.2.0.jar(และ dependency)) ซึ่งต่างจาก #1
ซึ่งไม่ได้อยากจะใช้ท่าพิศดารอะไร เพราะจริงๆ เราสามารถ re-use image ที่ build ไว้แล้วได้
แต่พอลองจริงๆ เจอ error ตามนี้

  1. ถ้า build ด้วย spark-3.1.3-bin-hadoop2.7 จะเจอ apache spark - ApacheSpark read from S3 Exception: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360) - Stack Overflow
    คร่าวๆ คือ น่าจะเป็นปัญหาจาก version ของ java แต่ว่าไม่อยากปรับ version เพราะกลัวที่ test ไว้แล้วมันไม่ work เลยไปค้นดูเจอว่ามีคนลองด้วย hadoop3 แล้วรอด เลยลอง spark-3.1.3-bin-hadoop3.2
  2. จะเจอปัญหา kerberos authentication ซึ่งตรงนี้ยังงงๆ นิดหน่อยว่า by default มันน่าจะถูกปิด แต่ก็ explicit declare ด้วย config

spark.hadoop.security.authentication=simple
spark.hadoop.security.authorization=false

เพื่อปิด kerberos authentication เพราะการรัน spark history server ไม่ต้องใช้อะไร เพราะเราเก็บ event log ไว้ s3-compat ที่ไม่ได้ใช้ kerberos

โดย command รวมๆ ในการ start spark history server บน k8s cluster จะประมาณนี้

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-history-server
  namespace: <if-you-have>

spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark-history-server

  template:
    metadata:
      name: spark-history-server
      labels:
        app: spark-history-server

    spec:
      containers:
        - name: spark-history-server
          image: <spark-history-server-image>

          command:
            - /opt/spark/bin/spark-class
            - -Dspark.history.fs.logDirectory=s3a://<bucket_name>/<your_path>
            - -Dspark.hadoop.security.authentication=simple
            - -Dspark.hadoop.security.authorization=false
            - -Dspark.hadoop.fs.s3a.access.key=<ACCESS_KEY>
            - -Dspark.hadoop.fs.s3a.secret.key=<SECRET_KEY>
            - -Dspark.hadoop.fs.s3a.endpoint=<S3_COMPAT_URL>
            - -Dspark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
            - org.apache.spark.deploy.history.HistoryServer

          ports:
            - name: http
              protocol: TCP
              containerPort: 18080

          readinessProbe:
            timeoutSeconds: 4
            httpGet:
              path: /
              port: http

          livenessProbe:
            timeoutSeconds: 4
            httpGet:
              path: /
              port: http

จบละตอน 3
เรื่อง setting/config นี่ไม่ยาก
แต่สิ่งที่ผมเสียเวลาคือการเสียเวลากับ version ต่างๆ ที่มันไม่ compat จนทำให้เกิด error นี่แหละคับ

1 Like

กราบขอบคุณครับพี่เหน่ง :pray:

ตอนผมทำคอร์สสอน Airflow นี่ท้อแท้มากครับเรื่องนี้ :joy:

1 Like

เจอปัญหาเรื่อง spark pipelines แต่ละอันใช้ resource ไม่เท่ากันอยู่
ตอนแรกคิดว่า อาจจะให้มันเรียก spot instance จาก dagster
แต่ไปๆ มาๆ อาจจะตายเพราะ config ก่อน เผลอๆ อาจจะเจอกับดักว่าไม่ portable พอด้วย (และจะยิ่งแย่ ถ้า เทสบน Local ไม่ได้ มุแง้)
ก็เลยคิดว่า อาจจะไปมัดให้มันโยนขึ้น gcp dataproc serverless
แต่มันเพิ่งมาไม่นาน เลยอาจจะต้องกัดฟันเขียนขึ้นมาเอง
ปัญหา scaling นี่โลกแตกมาก :rofl: