四、30秒實作第一個 Airflow DAG
如何實作一個 Airflow DAG、設定任務之間的關係 、創建 dags 資料夾和 Python 檔案、Create 一個 DAG 物件、Create Tasks任務
這系列的文章源自我 2023/9/16-2023/10/15 參加的 iT 鐵人賽:
2023 鐵人賽系列文章:Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇
Before 實作
step1: 開啟專案資料夾 airflow-local
- 可以用 vscode 或是直接開 terminal/命令提示字元
cd
進入也可以。
step2: 進入虛擬環境
source airflow-env/bin/activate
- 看到 terminal 前面有
(airflow-env)
代表進入虛擬環境了。
step3: 開啟 Webserver
airflow webserver -p 8080
- 打開 http://localhost:8080/ 就可以看到這個畫面了
- 看到上面有三行通知:
1.The scheduler does not appear to be running:提醒你還沒開啟 Scheduler
2.Do not use SQLite as metadata DB in production:在正式開發不要用 SQLite
3.Do not use the SequentialExecutor in production:在正式開發不要用 SequentialExecutor
SequentialExecutor 是第一次安裝 Airflow 時的預設執行器,也是唯一一個可以和 sqlite 協作的執行器,但缺點就是一次只能執行一項任務,所以執行效率很差,但目前測試也沒關係。
Ref: 官方說明
step4: 開啟 Scheduler
開啟一個新的 terminal,運行 airflow scheduler
,接著打開 http://localhost:8080/ 按 F5
刷新頁面,第一行通知就會消失了。
開始實作
1. 創建 dags 資料夾和 Python 檔案
首先我們要創建一個 dags/
資料夾在 ~/airflow/
專案當中,並在裡面建立一個 python 檔案,dags/first_dag.py
- 目前整個資料夾結構如下:
--- dags/
--- first_dag.py
--- logs/
--- airflow.cfg
--- airflow.db
--- webserver_config.py
2. 在 first_dag.py 中匯入套件
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
- datetime 模組是用來設定開始時間、間隔時間和時區。
- airflow 的 DAG 就是我們要創建的模組
- BashOperator 是我們等等要使用的 Operator,是最基礎的,就像是執行 terminal 指令一樣。
3. Create 一個 DAG 物件
with DAG(
dag_id = 'first_dag',
start_date = datetime(2023, 9, 22),
schedule_interval='@daily'
) as dag:
- 在這裡我們會幫 DAG 做基礎的設定,
dag_id
就是在 dag 的唯一識別名稱
!強烈建議!要和 python 檔案用相同名稱,不然之後 DAG 一多就會很混亂
start_date
就是 DAG 要開始執行的日期schedule_interval
是時間間隔,'@daily'
代表以天為單位
4. Create 任務們
在 ) as dag:
下方我們就可以直接來寫任務了
...
) as dag:
task1 = BashOperator(
task_id='task1',
bash_command="echo start!!"
)
task2 = BashOperator(
task_id='task2',
bash_command="echo finish!!"
)
task_id
: task 的名稱bash_command
:我們要執行的 bash 指令echo start!!
:echo 就是 Python 當中的 print 意思,印出start!!
task2
以此類推
5. 設定任務之間的關係 / 相依性 (Dependecies)
task1 >> task2
- 先執行 task1 再執行 task2
6. 重新運行 webserver
airflow webserver -p 8080
airflow scheduler
- 開啟 http://localhost:8080/ ,就可以看到我們的 DAG 了
完整的程式碼
- path: /Users/{user_name}/airflow-local/dags/first_dag.py
- virtual-env: airflow-env
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id = 'first_dag',
start_date = datetime(2023, 9, 22),
schedule_interval=None
) as dag:
task1 = BashOperator(
task_id='task1',
bash_command="echo start!!"
)
task2 = BashOperator(
task_id='task2',
bash_command="echo finish!!"
)
task1 >> task2