สวัสดีค่ะ ตอนนี้อยากจะเชื่อม Airflow กับ MSSQL ค่ะเเต่ติด DAG Import Error ตามภาพที่แนบนี้เลยค่ะ
เบื้องต้นได้install pip install apache-airflow-providers-microsoft-mssql เรียบร้อยเเล้วค่ะเเต่ยังมี ERROR นี้อยู่ค่ะ รบกวนช่วยแนะนำด้วยนะคะ
สวัสดีค่ะ ตอนนี้อยากจะเชื่อม Airflow กับ MSSQL ค่ะเเต่ติด DAG Import Error ตามภาพที่แนบนี้เลยค่ะ
ตอนนี้รัน Airflow แบบไหนเอ่ย ใช้ Docker หรือว่าติดตั้งผ่าน Python virtual environment พอจะสะดวกแชร์โค้ด หรือว่า Repository มาให้ช่วยดูเพิ่มไหมครับ
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
from sqlalchemy import create_engine
from urllib.parse import quote_plus
def extract_data():
url1 = “https://my.api.mockaroo.com/mockupdata1_forp.json?key=299b8490”
url2 = “https://my.api.mockaroo.com/mockupdata2_forp.json?key=299b8490”
r1 = requests.get(url1)
r2 = requests.get(url2)
data1 = r1.json()
data2 = r2.json()
df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)
df1.columns = df1.columns.str.strip()
df2.columns = df2.columns.str.strip()
df2 = df2.rename(columns={'STAFFCODEs': 'STAFFCODE'})
merged_df = pd.merge(df1, df2, how='right', on='STAFFCODE')
return merged_df
def transform_data(ti):
df = ti.xcom_pull(task_ids=‘extract_data’)
selected_columns = [‘POSITIONIDs’, ‘POSITIONSTATUSs’, ‘STAFFCODE’ ,‘POSITIONNAMEs’, ‘ACADEMICPOSITION’, ‘WORKPOSITIONs’,‘STAFFTYPE’,‘DEPARTMENTNAMEs’,‘DEPARTMENTID’,‘ADMITDATE’, ‘EXPIREDATEs’,‘STAFFSEXs’,‘STAFFAGEs’,‘DEGREELEVELs’,‘GENERATIONs’, ‘GROUPEXPERIENCEs’,‘BIRTHPROVINCENAMEs’,‘YEAREXPERIENCEs’,‘STAFFSTATUSs’, ‘SUBDEPARTMENTs’,‘REMARK2s’,‘Timestamp’,‘OLDSTAFFs’,‘PREFIXNAMEENG’, ‘STAFFNAMEENG’,‘STAFFSURNAMEENG’]
transformed_df = df[selected_columns]
transformed_df.insert(loc=transformed_df.columns.get_loc(‘REMARK2s’)+1, column=‘fiscal_year_rate’, value=None)
return transformed_df
def load_data(ti):
df = ti.xcom_pull(task_ids=‘transform_data’)
engine = create_engine(r’mssql+pyodbc://DESKTOP-12NQBC8\User@DESKTOP-12NQBC8?driver=ODBC+Driver+17+for+SQL+Server&database=masterdata’)
with engine.begin() as connection:
df.to_sql(‘masterdata’, con=connection, if_exists=‘append’, index=False)
with DAG(
“ETL_load_to_MSSQL”,
start_date=datetime(2024, 3, 30),
schedule_interval= None,
tags=[“MSSQLtest”]
) as dag:
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=dag,
)
extract_task >> transform_task >> load_task
ตอนนี้พอทำได้แล้วค่ะ
เเต่ติด ERROR ตรงนี้
[2024-04-01, 12:49:28 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
[2024-04-01, 12:49:28 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 39 for task load_data ((pyodbc.Error) (‘01000’, “[01000] [unixODBC][Driver Manager]Can’t open lib ‘ODBC Driver 17 for SQL Server’ : file not found (0) (SQLDriverConnect)”)
งดงามมากครับ