ขอวิธีเชื่อม Airflow กับ MSSQL หน่อยค่า

สวัสดีค่ะ ตอนนี้อยากจะเชื่อม Airflow กับ MSSQL ค่ะเเต่ติด DAG Import Error ตามภาพที่แนบนี้เลยค่ะ


เบื้องต้นได้install pip install apache-airflow-providers-microsoft-mssql เรียบร้อยเเล้วค่ะเเต่ยังมี ERROR นี้อยู่ค่ะ รบกวนช่วยแนะนำด้วยนะคะ

ตอนนี้รัน Airflow แบบไหนเอ่ย ใช้ Docker หรือว่าติดตั้งผ่าน Python virtual environment พอจะสะดวกแชร์โค้ด หรือว่า Repository มาให้ช่วยดูเพิ่มไหมครับ :blush:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
from sqlalchemy import create_engine
from urllib.parse import quote_plus

def extract_data():
url1 = “https://my.api.mockaroo.com/mockupdata1_forp.json?key=299b8490
url2 = “https://my.api.mockaroo.com/mockupdata2_forp.json?key=299b8490

r1 = requests.get(url1)
r2 = requests.get(url2)

data1 = r1.json()
data2 = r2.json()

df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)

df1.columns = df1.columns.str.strip()
df2.columns = df2.columns.str.strip()

df2 = df2.rename(columns={'STAFFCODEs': 'STAFFCODE'})

merged_df = pd.merge(df1, df2, how='right', on='STAFFCODE')

return merged_df

def transform_data(ti):
df = ti.xcom_pull(task_ids=‘extract_data’)
selected_columns = [‘POSITIONIDs’, ‘POSITIONSTATUSs’, ‘STAFFCODE’ ,‘POSITIONNAMEs’, ‘ACADEMICPOSITION’, ‘WORKPOSITIONs’,‘STAFFTYPE’,‘DEPARTMENTNAMEs’,‘DEPARTMENTID’,‘ADMITDATE’, ‘EXPIREDATEs’,‘STAFFSEXs’,‘STAFFAGEs’,‘DEGREELEVELs’,‘GENERATIONs’, ‘GROUPEXPERIENCEs’,‘BIRTHPROVINCENAMEs’,‘YEAREXPERIENCEs’,‘STAFFSTATUSs’, ‘SUBDEPARTMENTs’,‘REMARK2s’,‘Timestamp’,‘OLDSTAFFs’,‘PREFIXNAMEENG’, ‘STAFFNAMEENG’,‘STAFFSURNAMEENG’]
transformed_df = df[selected_columns]
transformed_df.insert(loc=transformed_df.columns.get_loc(‘REMARK2s’)+1, column=‘fiscal_year_rate’, value=None)
return transformed_df

def load_data(ti):
df = ti.xcom_pull(task_ids=‘transform_data’)
engine = create_engine(r’mssql+pyodbc://DESKTOP-12NQBC8\User@DESKTOP-12NQBC8?driver=ODBC+Driver+17+for+SQL+Server&database=masterdata’)
with engine.begin() as connection:
df.to_sql(‘masterdata’, con=connection, if_exists=‘append’, index=False)

with DAG(
“ETL_load_to_MSSQL”,
start_date=datetime(2024, 3, 30),
schedule_interval= None,
tags=[“MSSQLtest”]
) as dag:

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=dag,
)

load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=dag,
)

extract_task >> transform_task >> load_task

ตอนนี้พอทำได้แล้วค่ะ

เเต่ติด ERROR ตรงนี้
[2024-04-01, 12:49:28 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
[2024-04-01, 12:49:28 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 39 for task load_data ((pyodbc.Error) (‘01000’, “[01000] [unixODBC][Driver Manager]Can’t open lib ‘ODBC Driver 17 for SQL Server’ : file not found (0) (SQLDriverConnect)”)

1 Like

หนูทำได้แล้วค่ะ ขอบคุณค่ะ

2 Likes

งดงามมากครับ :tada: