An Excellent Scheduling Tool

Background

The current setup uses Kettle.
Since it is driven entirely through a graphical interface and is hard to put under version control,
we turned our attention to Airflow.

Installation

docker

wget https://download.oracle.com/otn_software/linux/instantclient/2380000/instantclient-basic-linux.x64-23.8.0.25.04.zip
unzip instantclient-basic-linux.x64-23.8.0.25.04.zip
FROM apache/airflow:3.0.2

USER root
RUN sed -i 's#http://deb.debian.org#https://mirror.nju.edu.cn#g' /etc/apt/sources.list.d/debian.sources \
    && apt-get update \
    && apt-get install -y --no-install-recommends \
        libaio1 \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY ./instantclient_23_8 /opt/oracle_instantclient_23_8
RUN echo /opt/oracle_instantclient_23_8 > /etc/ld.so.conf.d/oracle_instantclient.conf && ldconfig

USER airflow
RUN pip install --proxy http://172.17.0.1:8889 -i https://mirror.nju.edu.cn/pypi/web/simple --no-cache-dir \
    "apache-airflow==3.0.2" \
    "apache-airflow[mysql]" \
    "apache-airflow[microsoft-mssql]" \
    "apache-airflow[odbc]" \
    "apache-airflow[oracle]" \
    "apache-airflow[postgres]" \
    "apache-airflow[redis]" \
    "apache-airflow[samba]" \
    "apache-airflow[http]"
docker build -t puzzle9/airflow:3.0.2 .

#docker build -t puzzle9/airflow:3.0.2 --build-arg http_proxy="http://192.168.1.1:8888" --build-arg https_proxy="http://192.168.1.1:8888" .

And with that, the base image is ready.
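
To double-check that the Instant Client and the extra DB drivers actually made it into the image, a minimal sanity script can be run inside a throwaway container. This is just a sketch; the script name and mount path are examples, not part of the original setup:

# check_drivers.py -- run inside the image, e.g.
#   docker run --rm -v $PWD/check_drivers.py:/tmp/check_drivers.py puzzle9/airflow:3.0.2 python /tmp/check_drivers.py
import oracledb   # pulled in by apache-airflow[oracle]
import pymssql    # pulled in by apache-airflow[microsoft-mssql]
import psycopg2   # pulled in by apache-airflow[postgres]

# Load the Instant Client copied to /opt/oracle_instantclient_23_8;
# this is the same library thick mode will use later on.
oracledb.init_oracle_client(lib_dir="/opt/oracle_instantclient_23_8")
print("oracle client:", oracledb.clientversion())
print("pymssql:", pymssql.__version__)
print("psycopg2:", psycopg2.__version__)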

Next, a docker-compose.yaml:

# https://airflow.apache.org/docs/apache-airflow/3.0.2/docker-compose.yaml
---
x-airflow-common: &airflow-common
  image: puzzle9/airflow:3.0.2
  # build: .
  environment: &airflow-common-env
    AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai
    AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: ${AIRFLOW_SCHEDULER_HEARTBEAT_SEC}
    AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: ${AIRFLOW_MIN_FILE_PROCESS_INTERVAL}
    AIRFLOW__CORE__PARALLELISM: ${AIRFLOW_PARALLELISM}
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: ${AIRFLOW_MAX_ACTIVE_RUNS_PER_DAG}
    AIRFLOW__CORE__EXECUTOR: ${AIRFLOW_EXECUTOR}
    AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    AIRFLOW__DAG_PROCESSOR__DISABLE_BUNDLE_VERSIONING: 'true'
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USERNAME}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT:-5432}/${POSTGRES_DATABASE}
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://${POSTGRES_USERNAME}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT:-5432}/${POSTGRES_DATABASE}
    AIRFLOW__CELERY__BROKER_URL: redis://:${VALKEY_PASSWORD}@${VALKEY_HOST}:${VALKEY_PORT:-6379}/${VALKEY_DATABASE:-0}
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'false'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ./data/dags:/opt/airflow/dags
    - ./data/logs:/opt/airflow/logs
    - ./data/config:/opt/airflow/config
    - ./data/plugins:/opt/airflow/plugins
    - /etc/localtime:/etc/localtime:ro
    - /etc/timezone:/etc/timezone:ro
  user: '${AIRFLOW_UID:-50000}:0'

services:
  airflow-apiserver:
    <<: *airflow-common
    command: api-server
    ports:
      - '8080:8080'
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-dag-processor:
    <<: *airflow-common
    command: dag-processor
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    restart: always

  airflow-worker:
    <<: *airflow-common
    profiles: ['airflow-worker']
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - 'CMD-SHELL'
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: '0'
    restart: always

  airflow-init:
    <<: *airflow-common
    profiles: ['airflow-init']
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
          export AIRFLOW_UID=$(id -u)
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        echo
        echo "Creating missing opt dirs if missing:"
        echo
        mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Airflow version:"
        /entrypoint airflow version
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Running airflow config list to create default config file if missing."
        echo
        /entrypoint airflow config list >/dev/null
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
        echo
        chown -R "${AIRFLOW_UID}:0" /opt/airflow/
        echo
        echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
        echo
        chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}

    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: '0:0'

The stack is deployed in LocalExecutor mode,
so the healthchecks have been removed as well.

The PostgreSQL and Redis (Valkey) backends are deployed separately; their settings go into an .env file:

# airflow
# echo $(id -u)
AIRFLOW_UID=1000
AIRFLOW_SCHEDULER_HEARTBEAT_SEC=30
# how often (seconds) DAG files are re-scanned; raise to 600 in production
AIRFLOW_MIN_FILE_PROCESS_INTERVAL=60
AIRFLOW_PARALLELISM=18
AIRFLOW_MAX_ACTIVE_RUNS_PER_DAG=16
AIRFLOW_EXECUTOR=LocalExecutor

# postgres
POSTGRES_HOST=172.17.0.1
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DATABASE=airflow

# valkey
VALKEY_HOST=172.17.0.1
VALKEY_PASSWORD=valkey
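
Before initializing, it can be worth confirming that the external backends are actually reachable. A minimal sketch, assuming the hosts and passwords from the .env above and that psycopg2 and redis-py are available where you run it (both ship inside the image):

# check_backends.py -- quick reachability test for the external services
import psycopg2
import redis

# Values mirror the .env above; adjust if yours differ.
pg = psycopg2.connect(
    host="172.17.0.1", port=5432,
    user="postgres", password="postgres", dbname="airflow",
)
with pg.cursor() as cur:
    cur.execute("SELECT version()")
    print("postgres:", cur.fetchone()[0])
pg.close()

r = redis.Redis(host="172.17.0.1", port=6379, password="valkey", db=0)
print("valkey ping:", r.ping())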

Run docker compose up airflow-init once to initialize,
then docker compose up -d to start everything.

Q&A

DAGs do not show up

The DAG files need execute permission: chmod +x data/dags/*.py
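
If permissions are not the culprit, a tiny probe DAG helps confirm that the dag-processor is scanning the folder at all. The file name and dag_id below are made up for illustration:

# data/dags/probe_dag.py -- should appear in the UI within
# AIRFLOW_MIN_FILE_PROCESS_INTERVAL seconds once picked up
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="probe_dag", schedule="@once", start_date=datetime(2025, 1, 1)) as dag:
    @task
    def hello():
        print("dag folder is being scanned")

    hello()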

MSSQL

Connecting to MSSQL raises DB-Lib error message 20002

Fixed as follows:

In the connection's Extra Fields JSON, add {"tds_version": "7.0"}
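
The same extra can also be attached when registering the connection from code instead of the UI. A sketch only; the connection id, host, and credentials are placeholders:

# add_mssql_conn.py -- register an MSSQL connection with the tds_version extra
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="my_mssql",            # placeholder id
    conn_type="mssql",
    host="mssql.example.com",      # placeholder host
    login="sa",
    password="change-me",
    schema="mydb",
    port=1433,
    extra=json.dumps({"tds_version": "7.0"}),
)

# Insert into the metadata DB only if it does not exist yet
session = settings.Session()
if not session.query(Connection).filter_by(conn_id=conn.conn_id).first():
    session.add(conn)
    session.commit()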

Oracle

DPY-4027: no configuration directory specified

In the connection's Extra Fields JSON, add {"service_name": "orcl"}

DPY-3015: password verifier type 0x939 is not supported by python-oracledb in thin mode

In the connection's Extra Fields JSON, add {"thick_mode": true, "thick_mode_lib_dir": "/opt/oracle_instantclient_23_8"}

ORA-28726: set current schema operation failed because the given schema name is either invalid or does not exist

Set the connection's Schema field to the target database/schema name.
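
Once the extras are in place, a short task can confirm that the thick-mode client loads inside the worker and the connection works end to end. The conn_id here is a placeholder:

# data/dags/oracle_smoke_test.py -- sanity check for the Oracle connection
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="oracle_smoke_test", schedule=None, start_date=datetime(2025, 1, 1)) as dag:
    @task
    def ping_oracle():
        from airflow.providers.oracle.hooks.oracle import OracleHook

        hook = OracleHook(oracle_conn_id="my_oracle")  # placeholder conn_id
        # a SELECT from dual works on any reachable Oracle instance
        print(hook.get_first("SELECT sysdate FROM dual"))

    ping_oracle()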

How to run it locally

docker run --name "airflow" -d -p 8080:8080 --env "_AIRFLOW_DB_MIGRATE=true" --env "_AIRFLOW_WWW_USER_CREATE=true" --env "_AIRFLOW_WWW_USER_PASSWORD=admin" -v ./data/dags:/opt/airflow/dags apache/airflow:3.0.2 airflow standalone

How to delete DAG history

# delete a specific DAG's records
airflow dags delete -v [dag_id]
airflow tasks clear -v [dag_id]

# clean everything up before a given timestamp
airflow db clean --skip-archive --clean-before-timestamp '2020-01-01 00:00:00+01:00' -y

Timeouts

There are two execution modes here: LocalExecutor and CeleryExecutor.

Both rely on orphaned_tasks_check_interval and task_instance_heartbeat_timeout for their default timing,
and both use parallelism as the number of worker slots.

With CeleryExecutor, execution_timeout can define a per-task timeout; the airflow-worker service must be enabled at deploy time.
With LocalExecutor, the clock effectively starts at the time the task is queued.

The heartbeat seems a bit buggy in 3.0.2 and does not refresh on its own.

import time

from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta

with DAG(
    dag_id='test_timeouta2',
    schedule='@once',
    start_date=datetime(2025, 7, 10),
    default_args={
        # DAG-level default timeout; individual tasks below can override it
        "execution_timeout": timedelta(seconds=86),
    },
) as dag:
    @task
    def start():
        print("start")

    @task
    def sleep_dag_600():
        print("start sleep 600")
        time.sleep(600)
        print("sleep 600 ok")

    @task(execution_timeout=timedelta(seconds=96))
    def sleep_dag_95():
        print("start sleep 95")
        time.sleep(95)
        print("sleep 95 ok")

    @task
    def sleep_dag_90():
        print("start sleep 90")
        time.sleep(90)
        print("sleep 90 ok")

    @task
    def sleep_dag_85():
        print("start sleep 85")
        time.sleep(85)
        print("sleep 85 ok")

    @task
    def sleep_dag_60():
        print("start sleep 60")
        time.sleep(60)
        print("sleep 60 ok")

    @task(execution_timeout=timedelta(seconds=30))
    def sleep_dag_30():
        print("start sleep 30")
        time.sleep(30)
        print("sleep 30 ok")

    @task
    def sleep_dag_6():
        print("start sleep 6")
        time.sleep(6)
        print("sleep 6 ok")

    start() >> [
        sleep_dag_600(),
        sleep_dag_95(),
        sleep_dag_90(),
        sleep_dag_85(),
        sleep_dag_60(),
        sleep_dag_30(),
        sleep_dag_6(),
    ]

References