四、30秒實作第一個 Airflow DAG

如何實作一個 Airflow DAG、設定任務之間的關係 、創建 dags 資料夾和 Python 檔案、Create 一個 DAG 物件、Create Tasks任務

四、30秒實作第一個 Airflow  DAG
Photo by Agê Barros / Unsplash

這系列的文章源自我 2023/9/16-2023/10/15  參加的 iT 鐵人賽:
2023 鐵人賽系列文章:Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇

Before 實作

step1: 開啟專案資料夾 airflow-local

  • 可以用 vscode 或是直接開 terminal/命令提示字元 cd 進入也可以。

step2: 進入虛擬環境

source airflow-env/bin/activate
https://ithelp.ithome.com.tw/upload/images/20230924/20135427NwpzDZNGG4.png
  • 看到 terminal 前面有 (airflow-env) 代表進入虛擬環境了。

step3: 開啟 Webserver

airflow webserver -p 8080
https://ithelp.ithome.com.tw/upload/images/20230922/201354279JqwAWPnEc.png
  • 看到上面有三行通知:
    1.The scheduler does not appear to be running:提醒你還沒開啟 Scheduler
    2.Do not use SQLite as metadata DB in production:在正式開發不要用 SQLite
    3.Do not use the SequentialExecutor in production:在正式開發不要用 SequentialExecutor

SequentialExecutor 是第一次安裝 Airflow 時的預設執行器,也是唯一一個可以和 sqlite 協作的執行器,但缺點就是一次只能執行一項任務,所以執行效率很差,但目前測試也沒關係。
Ref: 官方說明

step4: 開啟 Scheduler

開啟一個新的 terminal,運行 airflow scheduler ,接著打開 http://localhost:8080/F5 刷新頁面,第一行通知就會消失了。

開始實作

1. 創建 dags 資料夾和 Python 檔案

首先我們要創建一個 dags/ 資料夾在 ~/airflow/ 專案當中,並在裡面建立一個 python 檔案,dags/first_dag.py

  • 目前整個資料夾結構如下:
--- dags/
    --- first_dag.py
--- logs/
--- airflow.cfg
--- airflow.db
--- webserver_config.py

2. 在 first_dag.py 中匯入套件

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
  • datetime 模組是用來設定開始時間、間隔時間和時區。
  • airflow 的 DAG 就是我們要創建的模組
  • BashOperator 是我們等等要使用的 Operator,是最基礎的,就像是執行 terminal 指令一樣。

3. Create 一個 DAG 物件

with DAG(
    dag_id = 'first_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval='@daily'
) as dag:
  • 在這裡我們會幫 DAG 做基礎的設定,dag_id 就是在 dag 的唯一識別名稱
!強烈建議!要和 python 檔案用相同名稱,不然之後 DAG 一多就會很混亂
  • start_date 就是 DAG 要開始執行的日期
  • schedule_interval 是時間間隔,'@daily' 代表以天為單位

4. Create 任務們

) as dag:下方我們就可以直接來寫任務了

...
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo start!!"
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command="echo finish!!"
    )
  • task_id: task 的名稱
  • bash_command:我們要執行的 bash 指令
  • echo start!!:echo 就是 Python 當中的 print 意思,印出 start!!
  • task2 以此類推

5. 設定任務之間的關係 / 相依性 (Dependecies)

task1 >> task2
  • 先執行 task1 再執行 task2

6. 重新運行 webserver

airflow webserver -p 8080
airflow scheduler
https://ithelp.ithome.com.tw/upload/images/20230923/201354273VKvenQQRg.png

完整的程式碼

  • path: /Users/{user_name}/airflow-local/dags/first_dag.py
  • virtual-env: airflow-env
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = 'first_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval=None
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo start!!"
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command="echo finish!!"
    )

    task1 >> task2