Skip to content

Commit 2a533c2

Browse files
committed
feat: Introduce functions_framework.aio submodule that support async function execution.
1 parent 0b521ab commit 2a533c2

File tree

16 files changed

+865
-169
lines changed

16 files changed

+865
-169
lines changed

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
"cloudevents>=1.2.0,<2.0.0",
5858
"Werkzeug>=0.14,<4.0.0",
5959
],
60+
extras_require={
61+
"async": ["starlette>=0.37.0,<1.0.0"],
62+
},
6063
entry_points={
6164
"console_scripts": [
6265
"ff=functions_framework._cli:_cli",
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import functools
17+
import inspect
18+
import os
19+
20+
from typing import Any, Awaitable, Callable, Union
21+
22+
from cloudevents.http import from_http
23+
from cloudevents.http.event import CloudEvent
24+
25+
from functions_framework import _function_registry
26+
from functions_framework.exceptions import (
27+
FunctionsFrameworkException,
28+
MissingSourceException,
29+
)
30+
31+
try:
32+
from starlette.applications import Starlette
33+
from starlette.exceptions import HTTPException
34+
from starlette.requests import Request
35+
from starlette.responses import JSONResponse, Response
36+
from starlette.routing import Route
37+
except ImportError:
38+
raise FunctionsFrameworkException(
39+
"Starlette is not installed. Install the framework with the 'async' extra: "
40+
"pip install functions-framework[async]"
41+
)
42+
43+
HTTPResponse = Union[
44+
Response, # Functions can return a full Starlette Response object
45+
str, # Str returns are wrapped in Response(result)
46+
dict[Any, Any], # Dict returns are wrapped in JSONResponse(result)
47+
tuple[Any, int], # Flask-style (content, status_code) supported
48+
None, # None raises HTTPException
49+
]
50+
51+
_FUNCTION_STATUS_HEADER_FIELD = "X-Google-Status"
52+
_CRASH = "crash"
53+
54+
CloudEventFunction = Callable[[CloudEvent], Union[None, Awaitable[None]]]
55+
HTTPFunction = Callable[[Request], Union[HTTPResponse, Awaitable[HTTPResponse]]]
56+
57+
58+
def cloud_event(func: CloudEventFunction) -> CloudEventFunction:
59+
"""Decorator that registers cloudevent as user function signature type."""
60+
_function_registry.REGISTRY_MAP[func.__name__] = (
61+
_function_registry.CLOUDEVENT_SIGNATURE_TYPE
62+
)
63+
if inspect.iscoroutinefunction(func):
64+
65+
@functools.wraps(func)
66+
async def async_wrapper(*args, **kwargs):
67+
return await func(*args, **kwargs)
68+
69+
return async_wrapper
70+
71+
@functools.wraps(func)
72+
def wrapper(*args, **kwargs):
73+
return func(*args, **kwargs)
74+
75+
return wrapper
76+
77+
78+
def http(func: HTTPFunction) -> HTTPFunction:
79+
"""Decorator that registers http as user function signature type."""
80+
_function_registry.REGISTRY_MAP[func.__name__] = (
81+
_function_registry.HTTP_SIGNATURE_TYPE
82+
)
83+
84+
if inspect.iscoroutinefunction(func):
85+
86+
@functools.wraps(func)
87+
async def async_wrapper(*args, **kwargs):
88+
return await func(*args, **kwargs)
89+
90+
return async_wrapper
91+
92+
@functools.wraps(func)
93+
def wrapper(*args, **kwargs):
94+
return func(*args, **kwargs)
95+
96+
return wrapper
97+
98+
99+
async def _crash_handler(request, exc):
100+
headers = {_FUNCTION_STATUS_HEADER_FIELD: _CRASH}
101+
return Response(f"Internal Server Error: {exc}", status_code=500, headers=headers)
102+
103+
104+
def _http_func_wrapper(function, is_async):
105+
@functools.wraps(function)
106+
async def handler(request):
107+
if is_async:
108+
result = await function(request)
109+
else:
110+
result = await asyncio.to_thread(function, request)
111+
if isinstance(result, str):
112+
return Response(result)
113+
elif isinstance(result, dict):
114+
return JSONResponse(result)
115+
elif isinstance(result, tuple) and len(result) == 2:
116+
# Support Flask-style tuple response
117+
content, status_code = result
118+
return Response(content, status_code=status_code)
119+
elif result is None:
120+
raise HTTPException(status_code=500, detail="No response returned")
121+
else:
122+
return result
123+
124+
return handler
125+
126+
127+
def _cloudevent_func_wrapper(function, is_async):
128+
@functools.wraps(function)
129+
async def handler(request):
130+
data = await request.body()
131+
132+
try:
133+
event = from_http(request.headers, data)
134+
except Exception as e:
135+
raise HTTPException(
136+
400, detail=f"Bad Request: Got CloudEvent exception: {repr(e)}"
137+
)
138+
if is_async:
139+
await function(event)
140+
else:
141+
await asyncio.to_thread(function, event)
142+
return Response("OK")
143+
144+
return handler
145+
146+
147+
async def _handle_not_found(request: Request):
148+
raise HTTPException(status_code=404, detail="Not Found")
149+
150+
151+
def create_asgi_app(target=None, source=None, signature_type=None):
152+
"""Create an ASGI application for the function.
153+
154+
Args:
155+
target: The name of the target function to invoke
156+
source: The source file containing the function
157+
signature_type: The signature type of the function
158+
('http', 'event', 'cloudevent', or 'typed')
159+
160+
Returns:
161+
A Starlette ASGI application instance
162+
"""
163+
target = _function_registry.get_function_target(target)
164+
source = _function_registry.get_function_source(source)
165+
166+
if not os.path.exists(source):
167+
raise MissingSourceException(
168+
f"File {source} that is expected to define function doesn't exist"
169+
)
170+
171+
source_module, spec = _function_registry.load_function_module(source)
172+
spec.loader.exec_module(source_module)
173+
function = _function_registry.get_user_function(source, source_module, target)
174+
signature_type = _function_registry.get_func_signature_type(target, signature_type)
175+
176+
is_async = inspect.iscoroutinefunction(function)
177+
routes = []
178+
if signature_type == _function_registry.HTTP_SIGNATURE_TYPE:
179+
http_handler = _http_func_wrapper(function, is_async)
180+
routes.append(
181+
Route(
182+
"/",
183+
endpoint=http_handler,
184+
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH"],
185+
),
186+
)
187+
routes.append(Route("/robots.txt", endpoint=_handle_not_found, methods=["GET"]))
188+
routes.append(
189+
Route("/favicon.ico", endpoint=_handle_not_found, methods=["GET"])
190+
)
191+
routes.append(
192+
Route(
193+
"/{path:path}",
194+
http_handler,
195+
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH"],
196+
)
197+
)
198+
elif signature_type == _function_registry.CLOUDEVENT_SIGNATURE_TYPE:
199+
cloudevent_handler = _cloudevent_func_wrapper(function, is_async)
200+
routes.append(Route("/{path:path}", cloudevent_handler, methods=["POST"]))
201+
routes.append(Route("/", cloudevent_handler, methods=["POST"]))
202+
elif signature_type == _function_registry.TYPED_SIGNATURE_TYPE:
203+
raise FunctionsFrameworkException(
204+
f"ASGI server does not support typed events (signature type: '{signature_type}'). "
205+
)
206+
elif signature_type == _function_registry.BACKGROUNDEVENT_SIGNATURE_TYPE:
207+
raise FunctionsFrameworkException(
208+
f"ASGI server does not support legacy background events (signature type: '{signature_type}'). "
209+
"Use 'cloudevent' signature type instead."
210+
)
211+
else:
212+
raise FunctionsFrameworkException(
213+
f"Unsupported signature type for ASGI server: {signature_type}"
214+
)
215+
216+
exception_handlers = {
217+
500: _crash_handler,
218+
}
219+
app = Starlette(routes=routes, exception_handlers=exception_handlers)
220+
return app
221+
222+
223+
class LazyASGIApp:
224+
"""
225+
Wrap the ASGI app in a lazily initialized wrapper to prevent initialization
226+
at import-time
227+
"""
228+
229+
def __init__(self, target=None, source=None, signature_type=None):
230+
self.target = target
231+
self.source = source
232+
self.signature_type = signature_type
233+
234+
self.app = None
235+
self._app_initialized = False
236+
237+
async def __call__(self, scope, receive, send):
238+
if not self._app_initialized:
239+
self.app = create_asgi_app(self.target, self.source, self.signature_type)
240+
self._app_initialized = True
241+
await self.app(scope, receive, send)
242+
243+
244+
app = LazyASGIApp()

0 commit comments

Comments
 (0)