Using SimpleHttpOperator in Airflow to run HTTP call tasks
2024-08-29 21:05:48
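When SimpleHttpOperator is used as a task's operator, you may find that requests go to www.google.com on port 443 by default.
For example, consider a task defined like this: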
task = SimpleHttpOperator(
    task_id='get_op',
    http_conn_id='http_test',
    method='GET',
    endpoint='test1',
    data={},
    headers={},
    dag=dag)
Running it raises the following exception:
Subtask: During handling of the above exception, another exception occurred:
......
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 82, in execute
self.extra_options)
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 86, in run
return self.run_and_check(session, prepped_request, extra_options)
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 102, in run_and_check
allow_redirects=extra_options.get("allow_redirects", True))
......
Subtask: requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: /test1 (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x2ac347314940>: Failed to establish a new connection: [Errno 101] Network is unreachable',))
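This shows that the host for the HTTP request has to be configured; otherwise the request goes to the Google domain by default.
Looking at the source code: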
http_hook.py
def get_conn(self, headers):
    ......
    conn = self.get_connection(self.http_conn_id)
    session = requests.Session()
    self.base_url = conn.host
    if not self.base_url.startswith('http'):
        self.base_url = 'http://' + self.base_url
    ......
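To make the scheme handling in get_conn concrete, here is a minimal stand-alone sketch of the same check. The function name normalize_base_url is hypothetical and not part of Airflow; it only mirrors the startswith('http') test from the excerpt above.

```python
def normalize_base_url(host):
    """Prefix 'http://' when the host has no scheme (mirrors the excerpt's check)."""
    # Hypothetical helper for illustration only; not an Airflow API.
    if not host.startswith('http'):
        return 'http://' + host
    return host


print(normalize_base_url('localhost:8080'))          # http://localhost:8080
print(normalize_base_url('https://www.google.com'))  # https://www.google.com
```

A host configured without a scheme is therefore treated as plain HTTP, while a host that already carries http:// or https:// is left unchanged.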
base_hook.py
def get_connection(cls, conn_id):
    environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
    conn = None
    if environment_uri:
        conn = Connection(conn_id=conn_id, uri=environment_uri)
    else:
        conn = random.choice(cls.get_connections(conn_id))
    if conn.host:
        logging.info("Using connection to: " + conn.host)
    return conn
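The source shows that Airflow first checks an environment variable for a custom URI. If one is set, that URI is used; otherwise Airflow picks one of the connections returned by get_connections(conn_id), which here resolved to the built-in default.
The environment variable name is the prefix AIRFLOW_CONN_ followed by the upper-cased http_conn_id.
For the task above, the host can be configured by setting an environment variable: export AIRFLOW_CONN_HTTP_TEST=http://localhost:8080
It can also be set dynamically in Python code:
os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:8080'
Setting it dynamically in code is generally recommended.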
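The naming rule can be sketched in a few lines. This is an illustration under assumptions: conn_env_var and set_conn_uri are hypothetical helpers, and the prefix constant simply repeats the AIRFLOW_CONN_ value described above rather than importing it from Airflow.

```python
import os

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'  # assumed to match the prefix described above


def conn_env_var(conn_id):
    """Return the environment variable name for a connection id."""
    return CONN_ENV_PREFIX + conn_id.upper()


def set_conn_uri(conn_id, uri):
    """Register a connection URI through the environment (hypothetical helper)."""
    os.environ[conn_env_var(conn_id)] = uri


set_conn_uri('http_test', 'http://localhost:8080')
print(conn_env_var('http_test'))             # AIRFLOW_CONN_HTTP_TEST
print(os.environ['AIRFLOW_CONN_HTTP_TEST'])  # http://localhost:8080
```

Whichever way the variable is set, it has to exist in the process that runs the task before the hook looks up the connection.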
Several common usages of SimpleHttpOperator are shown below (official examples):
t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: True if len(response.json()) == 0 else False,
    dag=dag)

t5 = SimpleHttpOperator(
    task_id='post_op_formenc',
    endpoint='nodes/url',
    data="name=Joe",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_op',
    method='GET',
    endpoint='api/v1.0/nodes',
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag)

t3 = SimpleHttpOperator(
    task_id='put_op',
    method='PUT',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    dag=dag)

t4 = SimpleHttpOperator(
    task_id='del_op',
    method='DELETE',
    endpoint='api/v1.0/nodes',
    data="some=data",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)
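The examples pair each data value with a matching Content-Type header and, in one case, pass a response_check callable. The sketch below shows how such bodies can be built with the standard library and how a check function behaves. FakeResponse and nodes_list_is_empty are hypothetical names used only to exercise the logic locally, without Airflow or a real HTTP server.

```python
import json
from urllib.parse import urlencode

# JSON body, to be paired with Content-Type: application/json
json_body = json.dumps({"priority": 5})

# Form body, to be paired with Content-Type: application/x-www-form-urlencoded
form_body = urlencode({"name": "Joe"})


def nodes_list_is_empty(response):
    """response_check-style callable: True when the JSON payload is empty."""
    return len(response.json()) == 0


class FakeResponse:
    """Stand-in for a response object, used only to try the check locally."""

    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


print(json_body)                               # {"priority": 5}
print(form_body)                               # name=Joe
print(nodes_list_is_empty(FakeResponse([])))   # True
print(nodes_list_is_empty(FakeResponse([1])))  # False
```

A named function like this can be easier to test and reuse than an inline lambda, though either form returns the same boolean.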
A complete example:
import os
from datetime import timedelta, datetime

import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG

default_args = {
    'owner': 'cord',
    'depends_on_past': False,
    'wait_for_downstream': True,
    'execution_timeout': timedelta(minutes=3),
    'email': ['123456789@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Build the start date in Shanghai time, then convert it to a naive UTC datetime.
# pytz zones must be attached with localize(); passing tzinfo=tz to datetime()
# would use the zone's historical local mean time offset instead of UTC+8.
tz = pytz.timezone('Asia/Shanghai')
dt = tz.localize(datetime(2018, 7, 26, 12, 20))
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)

# Point the 'http_test' connection at the local service.
os.environ['AIRFLOW_CONN_HTTP_TEST'] = 'http://localhost:9090'

dag = DAG(
    'bm01',
    default_args=default_args,
    description='my DAG',
    schedule_interval='*/2 * * * *',
    start_date=utc_dt
)

task1 = SimpleHttpOperator(
    task_id='get_op1',
    http_conn_id='http_test',
    method='GET',
    endpoint='test1',
    data={},
    headers={},
    dag=dag)

task2 = SimpleHttpOperator(
    task_id='get_op2',
    http_conn_id='http_test',
    method='GET',
    endpoint='test2',
    data={},
    headers={},
    dag=dag)

# task2 runs only after task1 has finished.
task1 >> task2
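The start_date conversion in the example can be checked in isolation. The sketch below uses only the standard library and assumes a fixed UTC+8 offset for Asia/Shanghai instead of pytz; the variable names are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Shanghai is modelled as a fixed UTC+8 offset.
shanghai = timezone(timedelta(hours=8))

local_start = datetime(2018, 7, 26, 12, 20, tzinfo=shanghai)
# Convert to UTC, then drop tzinfo to get a naive UTC datetime.
utc_start = local_start.astimezone(timezone.utc).replace(tzinfo=None)

print(utc_start)  # 2018-07-26 04:20:00
```

So a 12:20 start in Shanghai corresponds to a naive UTC start_date of 04:20 on the same day.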
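Note also that the HTTP request issued by SimpleHttpOperator is blocking: in a chain of dependent tasks, a downstream task runs only after its upstream task has finished and returned.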