百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

大数据调度平台 Airflow(六):Airflow Operators 及案例

csdh11 2025-03-26 11:12 20 浏览

#头条创作挑战赛#

Airflow Operators及案例

Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记

owner(str):任务的所有者,建议使用linux用户名

email(str or liststr):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。

email_on_retry(bool):当任务重试时是否发送电子邮件

email_on_failure(bool):当任务执行失败时是否发送电子邮件

retries(int):在任务失败之前应该重试的次数

retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象

start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。

end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。

depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。

dag(airflow.models.DAG):指定的dag。

execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。

trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

一、BashOperator及调度Shell命令及脚本

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)


  • BashOperator 调度Shell命令案例
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3


注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5


此外,关于邮箱的信息如下:

邮箱1:kettle_test1@163.com password:kettle123456

邮箱2:kettle_test2@163.com password:kettle123456

163邮箱SMTP服务器地址: smtp.163.com 端口:25

配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

kettle_test1@163.com FECJJVEPGPTZJYMQ

kettle_test2@163.com VIOFSYMFDIKKIUEA

  1. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,

BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"


second_shell.sh

#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"


编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second


执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:

二、SSHOperator及调度远程Shell脚本

在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。SSHOperator使用ssh协议与远程主机通信,需要注意的是SSHOperator调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:

#Ubunto系统
. ~/.profile

#CentoOS或者RedHat系统
. ~/.bashrc


关于SSHOperator参数详解可以参照:

airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation

SSHOperator的常用参数如下:

ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。
remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。


  • SSHOperator调度远程节点脚本案例

按照如下步骤来使用SSHOperator调度远程节点脚本:

1、安装“
apache-airflow-providers-ssh ”provider package

首先停止airflow webserver与scheduler,在node4节点切换到python37环境,安装ssh Connection包。另外,关于Providers package安装方式可以参照如下官网地址:

https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh

#切换Python37环境
[root@node4 ~]# conda activate python37

#安装ssh provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler


2、配置SSH Connection连接

登录airflow webui ,选择“Admin”->“Connections”:

点击“+”添加连接,这里host连接的是node5节点:

3、准备远程执行脚本

在node5节点/root路径下创建first_shell.sh,内容如下:

#!/bin/bash
echo "==== execute first shell ===="


在node3节点/root路径下创建second_shell.sh,内容如下:

#!/bin/bash
echo "==== execute second shell ===="


4、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到SSHOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-ssh==2.1.1


python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner':'lisi',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_remote_shell',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=SSHOperator(
    task_id='first',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/first_shell.sh ',
    dag = dag
)

second=SSHOperator(
    task_id='second',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/second_shell.sh ',
    remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
    dag=dag
)

first >> second


5、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:

三、HiveOperator及调度HQL

可以通过HiveOperator直接操作Hive SQL ,HiveOperator的参数如下:

hql(str):需要执行的Hive SQL。
hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。


想要在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore:

#切换Python37环境
[root@node4 ~]# conda activate python37

#安装hive provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler


登录Airflow webui并设置Hive Metastore,登录后找到”Admin”->”Connections”,点击“+”新增配置:

  • HiveOperator调度HQL案例

1、启动Hive,准备表

启动HDFS、Hive Metastore,在Hive中创建以下三张表:

create table person_info(id int,name string,age int) row format delimited fields terminated by '\t';

create table score_info(id int,name string,score int) row format delimited fields terminated by '\t';


向表 person_info加载如下数据:

1 zs 18

2 ls 19

3 ww 20

向表score_info加载如下数据:

1 zs 100

2 ls 200

3 ww 300

2、在node4节点配置Hive 客户端

由于Airflow 使用HiveOperator时需要在Airflow安装节点上有Hive客户端,所以需要在node4节点上配置Hive客户端。

将Hive安装包上传至node4 “/software”下解压,并配置Hive环境变量

#在/etc/profile文件最后配置Hive环境变量
export HIVE_HOME=/software/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin

#使环境变量生效
source /etc/profile


修改
HIVE_HOME/conf/hive-site.xml ,写入如下内容:


 
  hive.metastore.warehouse.dir
  /user/hive/warehouse
 
 
  hive.metastore.local
  false
 
 
  hive.metastore.uris
  thrift://node1:9083
 


3、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到HiveOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-apache-hive==2.0.2
注意:这里本地安装也有可能缺少对应的C++环境,我们也可以不安装,直接跳过也可以。


Python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator

default_args = {
    'owner':'wangwu',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_hive_sql',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=HiveOperator(
    task_id='person_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,age from person_info',
    dag = dag
)

second=HiveOperator(
    task_id='score_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,score from score_info',
    dag=dag
)

third=HiveOperator(
    task_id='join_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',
    dag=dag
)

first >> second >>third


4、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:

四、PythonOperator

PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。

关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation

python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。


  • PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# python中 *  关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
    print(a)
    print(b)
    print("hello airflow1")
# 返回的值只会打印到日志中
    return{"sss1":"xxx1"}

def print__hello2(random_base):
    print(random_base)
    print("hello airflow2")
# 返回的值只会打印到日志中
    return{"sss2":"xxx2"}

default_args = {
    'owner':'maliu',
    'start_date':datetime(2021, 10, 1),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_pythoncode',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=PythonOperator(
    task_id='first',
    #填写  print__hello1 方法时,不要加上“()”
    python_callable=print__hello1,
    # op_args 对应 print_hello1 方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs 对应 print__hello1 方法中的b参数
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='second',
    #填写  print__hello2 方法时,不要加上“()”
    python_callable=print__hello2,
    # random_base 参数对应 print_hello2 方法中参数“random_base”
    op_kwargs={"random_base":random.randint(0,9)},
    dag=dag
)

first >> second

相关推荐

NUS邵林团队发布DexSinGrasp基于强化学习实现物体分离与抓取统一

本文的作者均来自新加坡国立大学LinSLab。本文的共同第一作者为新加坡国立大学实习生许立昕和博士生刘子轩,主要研究方向为机器人学习和灵巧操纵,其余作者分别为硕士生桂哲玮、实习生郭京翔、江泽宇以及...

「PLC进阶」如何通过编写SCL语言程序实现物料分拣?

01、前言SCL作为IEC61131-3编程语言的一种,由于其高级语言的特性,特别适合复杂运算、复杂数学函数应用的场合。本文以FactoryIO软件中的物料分拣案例作为硬件基础,介绍如何通过SCL来实...

zk源码—5.请求的处理过程一(http1.1请求方法)

大纲1.服务器的请求处理链...

自己动手从0开始实现一个分布式 RPC 框架

前言为什么要自己写一个RPC框架,我觉得从个人成长上说,如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异...

MLSys’25 | 极低内存消耗:用SGD的内存成本实现AdamW的优化性能

AIxiv专栏是机器之心发布学术、技术内容的栏目。过去数年,机器之心AIxiv专栏接收报道了2000多篇内容,覆盖全球各大高校与企业的顶级实验室,有效促进了学术交流与传播。如果您有优秀的工作想要分享,...

线程池误用导致系统假死(线程池会自动销毁吗)

背景介绍在项目中,为了提高系统性能使用了RxJava实现异步方案,其中异步线程池是自建的。但是当QPS稍微增大之后却发现系统假死、无响应和返回,调用方出现大量超时现象。但是通过监控发现,系统线程数正常...

大型乘用车工厂布局规划(六大乘用车基地)

乘用车工厂的布局规划直接影响生产效率、物流成本、安全性和未来扩展能力。合理的布局应确保生产流程顺畅、物流高效、资源优化,并符合现代化智能制造和绿色工厂的要求。以下是详细的工厂布局规划要点:1.工厂布...

西门子 S7-200 SMART PLC 连接Factory IO的方法

有很多同学不清楚如何西门子200smart如何连接FactoryIO,本教程为您提供了如何使用西门子S7-200SMARTPLC连接FactoryIO的说明。设置PC和PLC之间的...

西门子博图高级仿真软件的应用(西门子博途软件仿真)

1.博图高级仿真软件(S7-PLCSIMAdvancedV2.0)S7-PLCSIMAdvancedV2.0包含大量仿真功能,通过创建虚拟控制器对S7-1500和ET200SP控制器进行仿真...

PLC编程必踩的6大坑——请对号入座,评论区见

一、缺乏整体规划:面条式代码问题实例:某快递分拣线项目初期未做流程图设计,工程师直接开始编写传送带控制程序。后期增加质检模块时发现I/O地址冲突,电机启停逻辑与传感器信号出现3处死循环,导致项目延期2...

统信UOS无需开发者模式安装软件包
统信UOS无需开发者模式安装软件包

原文链接:统信UOS无需开发者模式安装软件包...

2025-05-05 14:55 csdh11

100个Java工具类之76:数据指纹DigestUtils

为了提高数据安全性,保证数据的完整性和真实性,DigestUtils应运而生。正确恰当地使用DigestUtils的加密算法,可以实现数据的脱敏,防止数据泄露或篡改。...

麒麟KYLINIOS软件仓库搭建02-软件仓库添加新的软件包

#秋日生活打卡季#原文链接:...

Java常用工具类技术文档(java中工具类的作用)

一、概述Java工具类(UtilityClasses)是封装了通用功能的静态方法集合,能够简化代码、提高开发效率。本文整理Java原生及常用第三方库(如ApacheCommons、GoogleG...

软路由的用法(自动追剧配置)(软路由教学)

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:值友98958248861环境和需求...