Airflow Authentication with Keycloak

ก่อนที่จะไปเล่น Keycloak เราควรมาทำความรู้จักเรื่อง OAuth กับ OpenID Connect (OIDC) กันก่อน

ด้านล่างเป็นรูปที่ผมตัดมาจากวีดีโอด้านบน เอามาสรุปเป็นภาพรวมให้เห็นกันครับ

ดูการใช้งาน Keycloak เบื้องต้น

หน่วยใหญ่สุดคือ Realm เรามองเป็นเหมือน namespace ใน K8s ก็ได้นะ มันเอาไว้จัดการ objects ต่างๆ ตรงนี้ถ้าให้ง่ายที่สุด เราอาจจะสร้าง realm มาเป็น dev, UAT, prod อะไรแบบนี้

Client ใน Keycloak เราสามารถมองเทียบเป็น application ได้ เช่น ถ้าเราอยากจะทำ auth กับ Airflow เราก็สร้าง client ของ Airflow มา หรือถ้าเราอยากทำ auth กับ Django app เราก็ทำ client สำหรับ Django app มา

เรื่อง Role ก็จะมีในระดับ Realm แล้วก็ในระดับ Client ด้วย ยกตัวอย่างเช่น Role ใน Airflow เรามี Viewer กับ Admin เราก็สามารถไปสร้าง Role ใน Keycloak ให้มีทั้ง Viewer และ Admin ได้

ส่วน Group ก็เหมือนกับ Group ของ User ทั่วไป เราสามารถทำ Role Mappings ได้ คือแทนที่เราจะเอา Role ผูกไว้กับ User เราก็ผูกไว้กับ Group แทน แล้วจับ User เข้าไปใส่ใน Group นั้นๆ

ในการทำ authentication กับ Airflow ได้ไอเดียมาจาก Airflow authentication with RBAC and Keycloak เขียนไว้ได้อย่างงดงามเลยทีเดียว

ทำขั้นตอนพื้นฐานตามในลิ้งค์ด้านบนได้เลย

  1. สร้าง Realm
  2. สร้าง Client
  3. สร้าง Role ใน Client
  4. สร้าง Group
  5. สร้าง User แล้วจับใส่ Group

และเราจำเป็นต้องแก้ใน Airflow ด้วย เพื่อให้เราสามารถที่จะ authenticate เข้าไปใน Airflow ได้ โดยใช้ User ใน Keycloak โค้ดหลักๆ มีตามนี้ แก้ที่ไฟล์ webserver_config.py

import jwt
import logging
import os

from flask_appbuilder import expose
from flask_appbuilder.security.views import AuthOAuthView

from airflow.www.fab_security.manager import AUTH_OAUTH
from airflow.www.security import AirflowSecurityManager

log = logging.getLogger(__name__)

MY_PROVIDER = "keycloak"
KEYCLOAK_CLIENT_ID = os.environ.get("KEYCLOAK_CLIENT_ID")
KEYCLOAK_CLIENT_SECRET = os.environ.get("KEYCLOAK_CLIENT_SECRET")
KEYCLOAK_BASE_URL = os.environ.get("KEYCLOAK_BASE_URL")
KEYCLOAK_TOKEN_URL = os.environ.get("KEYCLOAK_TOKEN_URL")
KEYCLOAK_AUTH_URL = os.environ.get("KEYCLOAK_AUTH_URL")

# สำหรับ override ส่วน logout
class CustomAuthRemoteUserView(AuthOAuthView):
    @expose("/logout/")
    def logout(self):
        """Delete access token before logging out."""
        return super().logout()

# สำหรับ override ในจังหวะที่เรา authenticate
class CustomSecurityManager(AirflowSecurityManager):
    authoauthview = CustomAuthRemoteUserView

    def oauth_user_info(self, provider, response):
        if provider == MY_PROVIDER:
            token = response["access_token"]
            me = jwt.decode(token, algorithms="RS256", verify=False)
            # หน้าตา resource_access
            # {
            #   "resource_access": { "airflow": { "roles": ["airflow_admin"] }}
            # }
            groups = me["resource_access"]["airflow"]["roles"]  # unsafe
            log.info("groups: {0}".format(groups))
            if len(groups) < 1:
                groups = ["airflow_public"]
            else:
                groups = [str for str in groups if "airflow" in str]

            # ตรงนี้เราจำเป็นที่จะต้องแมพค่าต่างๆ ให้ Airflow ด้วย
            userinfo = {
                "username": me.get("preferred_username"),
                "email": me.get("email"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "role_keys": groups,
            }
            log.info("user info: {0}".format(userinfo))

            return userinfo
        else:
            return {}

# บอก Airflow ว่าเราจะใช้ OAuth
AUTH_TYPE = AUTH_OAUTH

# ถ้ายังไม่มี User ใน Airflow เลย เราจะให้สร้างใหม่
AUTH_USER_REGISTRATION = True
# มี Role ใน Airflow พื้นฐานคือ Public
AUTH_USER_REGISTRATION_ROLE = "Public"
# เราจะบอกว่าเดี๋ยวเราจะ replace Role ใน Airflow ด้วย Role จาก OAuth
AUTH_ROLES_SYNC_AT_LOGIN = True
PERMANENT_SESSION_LIFETIME = 1800

# ตรงนี้เราต้องทำ Role Mappings ด้วย ค่า key คือค่า role ที่มาจาก Keycloak ส่วน value คือ role ใน Airflow
AUTH_ROLES_MAPPING = {
    "airflow_admin": ["Admin"],
    "airflow_op": ["Op"],
    "airflow_user": ["User"],
    "airflow_viewer": ["Viewer"],
    "airflow_public": ["Public"],
}

# เซตค่า OAUTH_PROVIDERS ด้วยค่าต่างๆ จาก Keycloak
# ดูเพิ่มเติมได้ที่ https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
OAUTH_PROVIDERS = [{
    "name": "keycloak",
    "token_key": "access_token",
    "icon": "fa-circle-o",
    "remote_app": {
        "api_base_url": KEYCLOAK_BASE_URL,
        "client_kwargs": {
            "scope": "email profile"
        },
        "access_token_url": KEYCLOAK_TOKEN_URL,
        "authorize_url": KEYCLOAK_AUTH_URL,
        "request_token_url": None,
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
}]

# Override ตัว security manager ให้ไปใช้คลาสที่เราสร้างขึ้น
SECURITY_MANAGER_CLASS = CustomSecurityManager

หลังจากนั้นก็เปิด Airflow ขึ้นมา เราก็จะได้หน้า Login งามๆ

พอกดปุ่ม ตัวเว็บจะ redirect เรามาที่หน้า Keycloak sign in ครับ

หลังจาก sign in เสร็จ ผมก็จะได้ user บน Airflow เป็นที่เรียบร้อย โดยมี role ตามที่ผมกำหนดไว้ใน Keycloak (ในทีนี้ผมกำหนดให้ user ชื่อ myairflowuser มี role เป็น airflow_admin ใน Keycloak ครับ)

ลองเอาไปเล่นกันดูนะ :wink:

3 Likes

สำหรับ Airflow 2.3.3+ ให้ปรับ code ในการ decode JWT ตามนี้

class CustomSecurityManager(AirflowSecurityManager):
    authoauthview = CustomAuthRemoteUserView

    def oauth_user_info(self, provider, response):
        if provider == MY_PROVIDER:
            token = response["access_token"]
-           me = jwt.decode(token, algorithms="RS256", verify=False)
+           me = jwt.decode(token, algorithms="RS256", options={"verify_signature": False})

เนื่องจากมีการเปลี่ยน version ของ pyJWT จาก 1.X to 2.X

1 Like