Skip to content

Commit d8a3ad2

Browse files
authored
Refactor serve_logs with FastAPI (#52581)
* Refactor serve_logs with FastAPI Fix test_invalid_characters_handled Refactor with StaticFiles * Remove details for 403 forbidden response * Replace Gunicorn with Uvicorn * Fix uvicorn multiple workers error * Remove flask and gunicorn from airflow-core/pyproject.toml * Remove old serve_logs module and add gunicorn to spelling_wordlist
1 parent 07b5b7f commit d8a3ad2

File tree

6 files changed

+178
-135
lines changed

6 files changed

+178
-135
lines changed

airflow-core/pyproject.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,6 @@ dependencies = [
8080
# 0.115.10 fastapi was a bad release that broke our API's and static checks.
8181
# Related fastapi issue here: https://github.com/fastapi/fastapi/discussions/13431
8282
"fastapi[standard]>=0.115.0,!=0.115.10",
83-
# We could get rid of flask and gunicorn if we replace serve_logs with a starlette + unicorn
84-
"flask>=2.1.1",
85-
# We could get rid of flask and gunicorn if we replace serve_logs with a starlette + unicorn
86-
"gunicorn>=20.1.0",
8783
"httpx>=0.25.0",
8884
'importlib_metadata>=6.5;python_version<"3.12"',
8985
'importlib_metadata>=7.0;python_version>="3.12"',
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from airflow.utils.serve_logs.core import serve_logs
20+
from airflow.utils.serve_logs.log_server import create_app
21+
22+
__all__ = ["serve_logs", "create_app"]
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Serve logs process."""
18+
19+
from __future__ import annotations
20+
21+
import logging
22+
import socket
23+
import sys
24+
25+
import uvicorn
26+
27+
from airflow.configuration import conf
28+
29+
logger = logging.getLogger(__name__)
30+
31+
32+
def serve_logs(port=None):
33+
"""Serve logs generated by Worker."""
34+
# setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
35+
os_type = sys.platform
36+
if os_type == "darwin":
37+
logger.debug("Mac OS detected, skipping setproctitle")
38+
else:
39+
from setproctitle import setproctitle
40+
41+
setproctitle("airflow serve-logs")
42+
43+
port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
44+
45+
# If dual stack is available and IPV6_V6ONLY is not enabled on the socket
46+
# then when IPV6 is bound to it will also bind to IPV4 automatically
47+
if getattr(socket, "has_dualstack_ipv6", lambda: False)():
48+
host = "::" # ASGI uses `::` syntax for IPv6 binding instead of the `[::]` notation used in WSGI, while preserving the `[::]` format in logs
49+
serve_log_uri = f"http://[::]:{port}"
50+
else:
51+
host = "0.0.0.0"
52+
serve_log_uri = f"http://{host}:{port}"
53+
54+
logger.info("Starting log server on %s", serve_log_uri)
55+
56+
# Use uvicorn directly for ASGI applications
57+
uvicorn.run("airflow.utils.serve_logs.log_server:app", host=host, port=port, workers=2, log_level="info")
58+
# Note: if we want to use more than 1 workers, we **can't** use the instance of FastAPI directly
59+
# This is way we split the instantiation of log server to a separate module
60+
#
61+
# https://github.com/encode/uvicorn/blob/374bb6764e8d7f34abab0746857db5e3d68ecfdd/docs/deployment/index.md?plain=1#L50-L63
62+
63+
64+
if __name__ == "__main__":
65+
serve_logs()

airflow-core/src/airflow/utils/serve_logs.py renamed to airflow-core/src/airflow/utils/serve_logs/log_server.py

Lines changed: 75 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,23 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
"""Serve logs process."""
17+
"""Log server written in FastAPI."""
1818

1919
from __future__ import annotations
2020

2121
import logging
2222
import os
23-
import socket
24-
import sys
25-
from collections import namedtuple
23+
from typing import cast
2624

27-
import gunicorn.app.base
28-
from flask import Flask, abort, request, send_from_directory
25+
from fastapi import FastAPI, HTTPException, Request, status
26+
from fastapi.staticfiles import StaticFiles
2927
from jwt.exceptions import (
3028
ExpiredSignatureError,
3129
ImmatureSignatureError,
3230
InvalidAudienceError,
3331
InvalidIssuedAtError,
3432
InvalidSignatureError,
3533
)
36-
from werkzeug.exceptions import HTTPException
3734

3835
from airflow.api_fastapi.auth.tokens import JWTValidator, get_signing_key
3936
from airflow.configuration import conf
@@ -43,74 +40,55 @@
4340
logger = logging.getLogger(__name__)
4441

4542

46-
def create_app():
47-
flask_app = Flask(__name__, static_folder=None)
48-
leeway = conf.getint("webserver", "log_request_clock_grace", fallback=30)
49-
log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
50-
log_config_class = conf.get("logging", "logging_config_class")
51-
if log_config_class:
52-
logger.info("Detected user-defined logging config. Attempting to load %s", log_config_class)
53-
try:
54-
logging_config = import_string(log_config_class)
55-
try:
56-
base_log_folder = logging_config["handlers"]["task"]["base_log_folder"]
57-
except KeyError:
58-
base_log_folder = None
59-
if base_log_folder is not None:
60-
log_directory = base_log_folder
61-
logger.info(
62-
"Successfully imported user-defined logging config. Flask App will serve log from %s",
63-
log_directory,
64-
)
65-
else:
66-
logger.warning(
67-
"User-defined logging config does not specify 'base_log_folder'. "
68-
"Flask App will use default log directory %s",
69-
base_log_folder,
70-
)
71-
except Exception as e:
72-
raise ImportError(f"Unable to load {log_config_class} due to error: {e}")
73-
signer = JWTValidator(
74-
issuer=None,
75-
secret_key=get_signing_key("api", "secret_key"),
76-
algorithm="HS512",
77-
leeway=leeway,
78-
audience="task-instance-logs",
79-
)
43+
class JWTAuthStaticFiles(StaticFiles):
44+
"""StaticFiles with JWT authentication."""
8045

81-
# Prevent direct access to the logs port
82-
@flask_app.before_request
83-
def validate_pre_signed_url():
46+
# reference from https://github.com/fastapi/fastapi/issues/858#issuecomment-876564020
47+
48+
def __init__(self, *args, **kwargs) -> None:
49+
super().__init__(*args, **kwargs)
50+
51+
async def __call__(self, scope, receive, send) -> None:
52+
request = Request(scope, receive)
53+
await self.validate_jwt_token(request)
54+
await super().__call__(scope, receive, send)
55+
56+
async def validate_jwt_token(self, request: Request):
57+
# we get the signer from the app state instead of creating a new instance for each request
58+
signer = cast("JWTValidator", request.app.state.signer)
8459
try:
8560
auth = request.headers.get("Authorization")
8661
if auth is None:
8762
logger.warning("The Authorization header is missing: %s.", request.headers)
88-
abort(403)
89-
payload = signer.validated_claims(auth)
63+
raise HTTPException(
64+
status_code=status.HTTP_403_FORBIDDEN, detail="Authorization header missing"
65+
)
66+
payload = await signer.avalidated_claims(auth)
9067
token_filename = payload.get("filename")
91-
request_filename = request.view_args["filename"]
68+
# Extract filename from url path
69+
request_filename = request.url.path.lstrip("/log/")
9270
if token_filename is None:
9371
logger.warning("The payload does not contain 'filename' key: %s.", payload)
94-
abort(403)
72+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
9573
if token_filename != request_filename:
9674
logger.warning(
9775
"The payload log_relative_path key is different than the one in token:"
9876
"Request path: %s. Token path: %s.",
9977
request_filename,
10078
token_filename,
10179
)
102-
abort(403)
80+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
10381
except HTTPException:
10482
raise
10583
except InvalidAudienceError:
10684
logger.warning("Invalid audience for the request", exc_info=True)
107-
abort(403)
85+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
10886
except InvalidSignatureError:
10987
logger.warning("The signature of the request was wrong", exc_info=True)
110-
abort(403)
88+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
11189
except ImmatureSignatureError:
11290
logger.warning("The signature of the request was sent from the future", exc_info=True)
113-
abort(403)
91+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
11492
except ExpiredSignatureError:
11593
logger.warning(
11694
"The signature of the request has expired. Make sure that all components "
@@ -119,78 +97,64 @@ def validate_pre_signed_url():
11997
get_docs_url("configurations-ref.html#secret-key"),
12098
exc_info=True,
12199
)
122-
abort(403)
100+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
123101
except InvalidIssuedAtError:
124102
logger.warning(
125-
"The request was issues in the future. Make sure that all components "
103+
"The request was issued in the future. Make sure that all components "
126104
"in your system have synchronized clocks. "
127105
"See more at %s",
128106
get_docs_url("configurations-ref.html#secret-key"),
129107
exc_info=True,
130108
)
131-
abort(403)
109+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
132110
except Exception:
133111
logger.warning("Unknown error", exc_info=True)
134-
abort(403)
135-
136-
@flask_app.route("/log/<path:filename>")
137-
def serve_logs_view(filename):
138-
return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
139-
140-
return flask_app
141-
142-
143-
GunicornOption = namedtuple("GunicornOption", ["key", "value"])
144-
112+
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
145113

146-
class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
147-
"""
148-
Standalone Gunicorn application/serve for usage with any WSGI-application.
149114

150-
Code inspired by an example from the Gunicorn documentation.
151-
https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
152-
153-
For details, about standalone gunicorn application, see:
154-
https://docs.gunicorn.org/en/stable/custom.html
155-
"""
156-
157-
def __init__(self, app, options=None):
158-
self.options = options or []
159-
self.application = app
160-
super().__init__()
161-
162-
def load_config(self):
163-
for option in self.options:
164-
self.cfg.set(option.key.lower(), option.value)
165-
166-
def load(self):
167-
return self.application
168-
169-
170-
def serve_logs(port=None):
171-
"""Serve logs generated by Worker."""
172-
# setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
173-
os_type = sys.platform
174-
if os_type == "darwin":
175-
logger.debug("Mac OS detected, skipping setproctitle")
176-
else:
177-
from setproctitle import setproctitle
178-
179-
setproctitle("airflow serve-logs")
180-
wsgi_app = create_app()
115+
def create_app():
116+
leeway = conf.getint("webserver", "log_request_clock_grace", fallback=30)
117+
log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
118+
log_config_class = conf.get("logging", "logging_config_class")
119+
if log_config_class:
120+
logger.info("Detected user-defined logging config. Attempting to load %s", log_config_class)
121+
try:
122+
logging_config = import_string(log_config_class)
123+
try:
124+
base_log_folder = logging_config["handlers"]["task"]["base_log_folder"]
125+
except KeyError:
126+
base_log_folder = None
127+
if base_log_folder is not None:
128+
log_directory = base_log_folder
129+
logger.info(
130+
"Successfully imported user-defined logging config. FastAPI App will serve log from %s",
131+
log_directory,
132+
)
133+
else:
134+
logger.warning(
135+
"User-defined logging config does not specify 'base_log_folder'. "
136+
"FastAPI App will use default log directory %s",
137+
base_log_folder,
138+
)
139+
except Exception as e:
140+
raise ImportError(f"Unable to load {log_config_class} due to error: {e}")
181141

182-
port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
142+
fastapi_app = FastAPI()
143+
fastapi_app.state.signer = JWTValidator(
144+
issuer=None,
145+
secret_key=get_signing_key("api", "secret_key"),
146+
algorithm="HS512",
147+
leeway=leeway,
148+
audience="task-instance-logs",
149+
)
183150

184-
# If dual stack is available and IPV6_V6ONLY is not enabled on the socket
185-
# then when IPV6 is bound to it will also bind to IPV4 automatically
186-
if getattr(socket, "has_dualstack_ipv6", lambda: False)():
187-
bind_option = GunicornOption("bind", f"[::]:{port}")
188-
else:
189-
bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
151+
fastapi_app.mount(
152+
"/log",
153+
JWTAuthStaticFiles(directory=log_directory, html=False),
154+
name="serve_logs",
155+
)
190156

191-
options = [bind_option, GunicornOption("workers", 2)]
192-
StandaloneGunicornApplication(wsgi_app, options).run()
157+
return fastapi_app
193158

194159

195-
if __name__ == "__main__":
196-
serve_logs()
160+
app = create_app()

0 commit comments

Comments
 (0)