Airflow variables getting updated even if the DAG is not running(即使DAG未运行,气流变量也会更新)
本文介绍了即使DAG未运行,气流变量也会更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。
但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因
counter = Variable.get('counter')
s = BashOperator(
task_id='echo_start_variable',
bash_command='echo ' + counter,
dag=dag,
)
Variable.set("counter", int(counter) + 1)
sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
task_id='submit_athena_query',
python_callable=run_athena_query,
op_kwargs={'query': sql_query, 'db': 'db',
's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
dag=dag)
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo ' + counter,
dag=dag,
)
s >> submit_query >> e
Airflow每30秒处理一次推荐答案文件(默认设置为)这意味着您所有顶级代码都是每30秒运行一次,因此
将导致变量计数器每30秒递增1。
在顶级代码中与变量交互是一种糟糕的做法(不管值的增加问题如何)。它每隔30秒打开一个到Metore数据库的连接,这可能会导致严重问题并使数据库不堪重负。
要获取变量的值,可以使用JJJA:
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo {{ var.value.counter }}',
dag=dag,
)
这是使用变量的一种安全方式,因为只有在执行运算符时才会检索值。
如果要将变量的值增加1,则使用PythonOpeartor:
def increase():
counter = Variable.get('counter')
Variable.set("counter", int(counter) + 1)
increase_op = PythonOperator(
task_id='increase_task',
python_callable=increase,
dag=dag)
只有在运算符运行时,才会执行可调用的python。
这篇关于即使DAG未运行,气流变量也会更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
编程基础网
本文标题为:即使DAG未运行,气流变量也会更新
基础教程推荐
猜你喜欢
- pyserial - 可以从线程 a 写入串行端口,是否阻塞从线程 b 读取? 2022-01-01
- 在 Celery 工作人员中捕获 Heroku SIGTERM 以优雅地关 2022-01-01
- 尝试制作WhatsApp机器人 2022-01-01
- 使用生成器和迭代器时 Python 多循环失败 2022-01-01
- numpy float:比算术运算中内置的慢 10 倍? 2022-01-01
- 用 Python 编写 Fortran 无格式文件 2022-01-01
- Discord.py 缺少必需的参数 2022-01-01
- 由Python将MP3转换为MIDI(类型错误:无法加载插件:mtg-Melodia:Melodia) 2022-01-01
- 将 x 轴刻度更改为自定义字符串 2022-01-01
- 与常规 dict 相比,Python manager.dict() 非常慢 2022-01-01
