# +==== BEGIN CatFeeder =================+
# ...............)..(.')
# ...............\(__)|
# Inspired by Joan Stark
# source https://www.asciiart.eu/
# FILE: openapi_builder.py
# CREATION DATE: 23-01-2026
# LAST Modified: 3:12:29 24-01-2026
# OpenAPI schema builder for FastAPI route documentation
# COPYRIGHT: (c) Cat Feeder
# PURPOSE: Handles all OpenAPI/documentation related functionality for PathManager
# +==== END CatFeeder =================+
29from typing
import Dict, Any, List, Callable, Optional
30from display_tty
import Disp, initialise_logger
34 """Handles OpenAPI schema generation and metadata extraction for endpoints."""
36 disp: Disp = initialise_logger(__qualname__,
False)
39 self.
disp.update_disp_debug(debug)
40 self.
disp.log_debug(
"OpenAPIBuilder initialized")
43 """Extract metadata from endpoint function for FastAPI documentation."""
44 endpoint_name = getattr(endpoint,
'__name__',
'unknown_endpoint')
46 f
"Extracting metadata for endpoint: {endpoint_name}")
52 if decorator_metadata:
54 f
"Found decorator metadata for {endpoint_name}: {list(decorator_metadata.keys())}")
55 metadata.update(decorator_metadata)
56 except Exception
as e:
57 self.
disp.log_warning(
58 f
"Failed to extract decorator metadata for {endpoint_name}: {e}")
67 f
"Completed metadata extraction for {endpoint_name}")
71 """Build OpenAPI parameter information from metadata."""
78 param_descriptions = []
83 if param_descriptions:
84 openapi_info[
'parameter_description'] =
"\n\n".join(
90 """Build security description from metadata."""
93 if metadata.get(
'requires_auth'):
94 security_parts.append(
"Authentication required")
95 if metadata.get(
'requires_admin'):
96 security_parts.append(
"Admin privileges required")
97 if metadata.get(
'public'):
98 security_parts.append(
"Public access")
99 if metadata.get(
'testing_only'):
100 security_parts.append(
"Testing only")
102 security_level = metadata.get(
'security_level')
104 security_parts.append(f
"Security level: {security_level}")
106 environment = metadata.get(
'environment')
108 security_parts.append(f
"Environment: {environment}")
110 return "\n".join(security_parts)
114 """Extract metadata from decorator attributes."""
115 endpoint_name = getattr(endpoint,
'__name__',
'unknown_endpoint')
128 """Extract and combine description metadata."""
129 function_description =
""
130 if hasattr(endpoint,
'__doc__')
and endpoint.__doc__:
131 function_description = endpoint.__doc__.strip()
133 f
"Found function docstring for {endpoint_name}")
135 decorator_description = metadata.get(
'description',
"")
136 if decorator_description:
138 f
"Found decorator description for {endpoint_name}")
140 if function_description
and decorator_description:
141 if function_description != decorator_description:
142 metadata[
'description'] = f
"{function_description}\n\n{decorator_description}"
144 f
"Combined function and decorator descriptions for {endpoint_name}")
146 metadata[
'description'] = function_description
148 f
"Using identical description for {endpoint_name}")
149 elif function_description:
150 metadata[
'description'] = function_description
152 f
"Using function description for {endpoint_name}")
153 elif decorator_description:
154 metadata[
'description'] = decorator_description
156 f
"Using decorator description for {endpoint_name}")
159 """Extract type annotation metadata safely."""
160 if hasattr(endpoint,
'__annotations__'):
162 annotations = endpoint.__annotations__
163 safe_annotations = {}
164 for key, value
in annotations.items():
165 if not callable(value)
and not inspect.isclass(value):
166 safe_annotations[key] = str(value)
169 metadata[
'annotations'] = safe_annotations
171 f
"Found {len(safe_annotations)} safe annotations for {endpoint_name}")
172 except Exception
as e:
173 self.
disp.log_warning(
174 f
"Failed to process annotations for {endpoint_name}: {e}")
177 """Extract security-related metadata from endpoint."""
178 if hasattr(endpoint,
'_requires_auth')
and getattr(endpoint,
"_requires_auth",
None):
179 metadata[
'requires_auth'] =
True
181 f
"Endpoint {endpoint_name} requires authentication")
183 if hasattr(endpoint,
'_requires_admin')
and getattr(endpoint,
"_requires_admin",
None):
184 metadata[
'requires_admin'] =
True
186 f
"Endpoint {endpoint_name} requires admin privileges")
188 if hasattr(endpoint,
'_public')
and getattr(endpoint,
"_public",
None):
189 metadata[
'public'] =
True
190 self.
disp.log_debug(f
"Endpoint {endpoint_name} is public")
192 if hasattr(endpoint,
'_testing_only')
and getattr(endpoint,
"_testing_only",
None):
193 metadata[
'testing_only'] =
True
194 self.
disp.log_debug(f
"Endpoint {endpoint_name} is testing-only")
196 if hasattr(endpoint,
'_security_level'):
197 metadata[
'security_level'] = getattr(
198 endpoint,
"_security_level",
None)
200 f
"Endpoint {endpoint_name} has security level: {getattr(endpoint, '_security_level', 'unknown')}")
202 if hasattr(endpoint,
'_environment'):
203 metadata[
'environment'] = getattr(endpoint,
"_environment",
None)
205 f
"Endpoint {endpoint_name} has environment: {getattr(endpoint, '_environment', 'unknown')}")
208 """Extract documentation-related metadata from endpoint."""
209 if hasattr(endpoint,
'_tags'):
210 metadata[
'tags'] = getattr(endpoint,
"_tags",
None)
212 f
"Endpoint {endpoint_name} has tags: {getattr(endpoint, '_tags', False)}")
214 if hasattr(endpoint,
'_description'):
215 metadata[
'description'] = getattr(endpoint,
"_description",
None)
217 f
"Endpoint {endpoint_name} has decorator description")
219 if hasattr(endpoint,
'_summary'):
220 metadata[
'summary'] = getattr(endpoint,
"_summary",
None)
222 f
"Endpoint {endpoint_name} has summary: {getattr(endpoint, '_summary', 'None')}")
224 if hasattr(endpoint,
'_response_model'):
225 metadata[
'response_model'] = getattr(
226 endpoint,
"_response_model",
None)
228 f
"Endpoint {endpoint_name} has decorator response_model")
231 """Extract parameter-related metadata from endpoint."""
233 if hasattr(endpoint,
'_requires_body')
and getattr(endpoint,
"_requires_body",
None):
234 metadata[
'requires_body'] =
True
235 metadata[
'body_model'] = getattr(endpoint,
"_body_model",
None)
236 metadata[
'body_description'] = getattr(
237 endpoint,
"_body_description",
"Request body")
239 f
"Endpoint {endpoint_name} requires request body")
242 if hasattr(endpoint,
'_requires_headers')
and getattr(endpoint,
"_requires_headers",
None):
243 metadata[
'requires_headers'] =
True
244 metadata[
'header_names'] = getattr(endpoint,
"_header_names", [])
245 metadata[
'headers_description'] = getattr(
246 endpoint,
"_headers_description",
"Required headers")
248 f
"Endpoint {endpoint_name} requires headers: {metadata['header_names']}")
250 if hasattr(endpoint,
'_requires_auth_header')
and getattr(endpoint,
"_requires_auth_header",
None):
251 metadata[
'requires_auth_header'] =
True
252 metadata[
'auth_header_name'] = getattr(
253 endpoint,
"_auth_header_name",
"Authorization")
254 metadata[
'auth_scheme'] = getattr(endpoint,
"_auth_scheme",
None)
256 f
"Endpoint {endpoint_name} requires auth header")
258 if hasattr(endpoint,
'_requires_bearer_auth')
and getattr(endpoint,
"_requires_bearer_auth",
None):
259 metadata[
'requires_bearer_auth'] =
True
260 metadata[
'auth_scheme'] =
"Bearer"
261 metadata[
'auth_header_name'] =
"Authorization"
263 f
"Endpoint {endpoint_name} requires Bearer token")
265 if hasattr(endpoint,
'_requires_basic_auth')
and getattr(endpoint,
"_requires_basic_auth",
None):
266 metadata[
'requires_basic_auth'] =
True
267 metadata[
'auth_scheme'] =
"Basic"
268 metadata[
'auth_header_name'] =
"Authorization"
270 f
"Endpoint {endpoint_name} requires Basic auth")
272 if hasattr(endpoint,
'_requires_api_key')
and getattr(endpoint,
"_requires_api_key",
None):
273 metadata[
'requires_api_key'] =
True
274 metadata[
'auth_scheme'] =
"API-Key"
275 metadata[
'auth_header_name'] = getattr(
276 endpoint,
"_auth_header_name",
"X-API-Key")
277 self.
disp.log_debug(f
"Endpoint {endpoint_name} requires API key")
280 if hasattr(endpoint,
'_requires_query_params')
and getattr(endpoint,
"_requires_query_params",
None):
281 metadata[
'requires_query_params'] =
True
282 metadata[
'query_params'] = getattr(endpoint,
"_query_params", {})
284 f
"Endpoint {endpoint_name} requires query params: {list(metadata['query_params'].keys())}")
287 if hasattr(endpoint,
'_requires_path_params')
and getattr(endpoint,
"_requires_path_params",
None):
288 metadata[
'requires_path_params'] =
True
289 metadata[
'path_params'] = getattr(endpoint,
"_path_params", {})
291 f
"Endpoint {endpoint_name} requires path params: {list(metadata['path_params'].keys())}")
297 """Extract content type related metadata from endpoint."""
298 if hasattr(endpoint,
'_accepts_json_body')
and getattr(endpoint,
"_accepts_json_body",
None):
299 metadata[
'accepts_json_body'] =
True
300 metadata[
'json_body_description'] = getattr(
301 endpoint,
"_json_body_description",
"JSON request body")
303 if hasattr(endpoint,
'_json_body_example'):
304 json_example = getattr(endpoint,
"_json_body_example")
306 if isinstance(json_example, (dict, list, str, int, float, bool, type(
None))):
307 metadata[
'json_body_example'] = json_example
310 metadata[
'json_body_example'] = str(json_example)
311 self.
disp.log_debug(f
"Endpoint {endpoint_name} accepts JSON body")
313 if hasattr(endpoint,
'_accepts_form_data')
and getattr(endpoint,
"_accepts_form_data",
None):
314 metadata[
'accepts_form_data'] =
True
315 metadata[
'form_data_description'] = getattr(
316 endpoint,
"_form_data_description",
"Form data")
317 self.
disp.log_debug(f
"Endpoint {endpoint_name} accepts form data")
319 if hasattr(endpoint,
'_accepts_file_upload')
and getattr(endpoint,
"_accepts_file_upload",
None):
320 metadata[
'accepts_file_upload'] =
True
321 metadata[
'file_upload_description'] = getattr(
322 endpoint,
"_file_upload_description",
"File upload")
324 f
"Endpoint {endpoint_name} accepts file upload")
327 """Build request body information for OpenAPI."""
328 if metadata.get(
'requires_body')
and metadata.get(
'body_model'):
329 openapi_info[
'request_body'] = {
331 "application/json": {
332 "schema": {
"type":
"object"}
335 "description": metadata.get(
'body_description',
'Request body'),
338 elif metadata.get(
'accepts_json_body'):
339 json_schema = {
"type":
"object"}
341 json_example = metadata.get(
'json_body_example')
342 if json_example
is not None:
344 if isinstance(json_example, str):
345 parsed_example = json.loads(json_example)
346 json_schema[
"example"] = parsed_example
348 json_schema[
"example"] = json_example
349 except (json.JSONDecodeError, TypeError)
as e:
350 self.
disp.log_warning(
351 f
"Invalid JSON example provided: {json_example}, error: {e}")
352 json_schema[
"description"] = f
"Example: {json_example}"
354 openapi_info[
'request_body'] = {
356 "application/json": {
357 "schema": json_schema
360 "description": metadata.get(
'json_body_description',
'JSON request body'),
363 elif metadata.get(
'accepts_form_data'):
364 openapi_info[
'request_body'] = {
366 "application/x-www-form-urlencoded": {
367 "schema": {
"type":
"object"}
370 "description": metadata.get(
'form_data_description',
'Form data'),
373 elif metadata.get(
'accepts_file_upload'):
374 openapi_info[
'request_body'] = {
376 "multipart/form-data": {
388 "description": metadata.get(
'file_upload_description',
'File upload'),
393 """Build header parameter descriptions."""
394 if metadata.get(
'requires_bearer_auth'):
395 param_descriptions.append(
396 "**Header:** `Authorization: Bearer <token>` - Bearer token required")
397 elif metadata.get(
'requires_basic_auth'):
398 param_descriptions.append(
399 "**Header:** `Authorization: Basic <credentials>` - Basic authentication required")
400 elif metadata.get(
'requires_api_key'):
401 header_name = metadata.get(
'auth_header_name',
'X-API-Key')
402 param_descriptions.append(
403 f
"**Header:** `{header_name}: <api-key>` - API key required")
404 elif metadata.get(
'requires_auth_header'):
405 header_name = metadata.get(
'auth_header_name',
'Authorization')
406 scheme = metadata.get(
'auth_scheme')
408 param_descriptions.append(
409 f
"**Header:** `{header_name}: {scheme} <credentials>` - Authentication required")
411 param_descriptions.append(
412 f
"**Header:** `{header_name}` - Authentication token required")
414 if metadata.get(
'requires_headers'):
415 headers = metadata.get(
'header_names', [])
417 header_list =
", ".join(f
"`{h}`" for h
in headers)
418 param_descriptions.append(
419 f
"**Headers:** {header_list} - {metadata.get('headers_description', 'Required headers')}")
422 """Build query parameter descriptions."""
423 if metadata.get(
'requires_query_params'):
424 query_params = metadata.get(
'query_params', {})
427 for param, desc
in query_params.items():
428 param_list.append(f
"`{param}` - {desc}")
429 param_descriptions.append(
430 f
"**Query Parameters:** {'; '.join(param_list)}")
433 """Build path parameter descriptions."""
434 if metadata.get(
'requires_path_params'):
435 path_params = metadata.get(
'path_params', {})
438 for param, desc
in path_params.items():
439 param_list.append(f
"`{param}` - {desc}")
440 param_descriptions.append(
441 f
"**Path Parameters:** {'; '.join(param_list)}")
444 """Extract route metadata from decorated endpoint for FastAPI route configuration."""
450 'summary':
'_summary',
451 'description':
'_description',
452 'response_model':
'_response_model',
453 'responses':
'_responses',
454 'dependencies':
'_dependencies',
455 'status_code':
'_status_code',
456 'deprecated':
'_deprecated',
457 'include_in_schema':
'_include_in_schema'
462 if hasattr(endpoint,
'_operation_id_base'):
465 "Multi-method endpoint detected - skipping operation_id to avoid duplicates")
466 elif hasattr(endpoint,
'_operation_id'):
468 metadata[
'operation_id'] = getattr(endpoint,
'_operation_id')
470 f
"Found operation_id: {metadata['operation_id']}")
473 for fastapi_param, attr_name
in metadata_attrs.items():
474 if hasattr(endpoint, attr_name):
475 value = getattr(endpoint, attr_name)
476 if value
is not None:
477 metadata[fastapi_param] = value
478 self.
disp.log_debug(f
"Found {fastapi_param}: {value}")
481 if hasattr(endpoint,
'_accepts_json_body')
and getattr(endpoint,
'_accepts_json_body'):
482 metadata[
'include_in_schema'] =
True
485 if hasattr(endpoint,
'_json_body_description'):
486 body_desc = getattr(endpoint,
'_json_body_description')
487 if 'description' not in metadata:
488 metadata[
'description'] = body_desc
490 metadata[
'description'] += f
"\n\nRequest Body: {body_desc}"
493 if hasattr(endpoint,
'_requires_auth')
and getattr(endpoint,
'_requires_auth'):
494 metadata[
'dependencies'] = metadata.get(
'dependencies', [])
496 if hasattr(endpoint,
'_requires_admin')
and getattr(endpoint,
'_requires_admin'):
497 metadata[
'dependencies'] = metadata.get(
'dependencies', [])
500 if hasattr(endpoint,
'_requires_bearer_auth')
and getattr(endpoint,
'_requires_bearer_auth'):
501 metadata[
'dependencies'] = metadata.get(
'dependencies', [])
504 if 'include_in_schema' not in metadata:
505 metadata[
'include_in_schema'] =
True
510 """Create a wrapper with unique UUID-based name for multi-method endpoints.
513 original_func: The original endpoint function.
514 endpoint_name: Base name for the endpoint.
517 Wrapper function with unique UUID-based name.
520 unique_id = str(uuid.uuid4()).replace(
'-',
'')[:8]
521 unique_name = f
"{endpoint_name}_{unique_id}"
524 if hasattr(original_func,
'__await__'):
526 async def uuid_wrapper(*args, **kwargs):
527 return await original_func(*args, **kwargs)
530 def uuid_wrapper(*args, **kwargs):
531 return original_func(*args, **kwargs)
534 uuid_wrapper.__name__ = unique_name
535 uuid_wrapper.__qualname__ = unique_name
536 uuid_wrapper.__doc__ = getattr(original_func,
'__doc__',
None)
537 uuid_wrapper.__module__ = getattr(original_func,
'__module__',
None)
538 uuid_wrapper.__annotations__ = getattr(
539 original_func,
'__annotations__', {})
546 uuid_wrapper.__code__ = original_func.__code__
547 except AttributeError:
554 """Copy decorator metadata from original function to wrapper.
557 original_func: The original endpoint function.
558 wrapper_func: The wrapper function to copy metadata to.
561 '_tags',
'_summary',
'_description',
'_response_model',
562 '_requires_auth',
'_requires_admin',
'_public',
'_testing_only',
563 '_security_level',
'_environment',
'_operation_id',
'_accepts_json_body',
564 '_json_body_description',
'_json_body_example',
'_requires_bearer_auth'
567 for attr
in metadata_attrs:
568 if hasattr(original_func, attr):
569 setattr(wrapper_func, attr, getattr(original_func, attr))
572 """Prepare endpoint for multi-method registration to avoid Operation ID conflicts.
575 endpoint: The original endpoint function.
576 methods: List of HTTP methods for this endpoint.
579 Original endpoint if single method, UUID wrapper if multi-method.
581 if len(methods) <= 1:
584 endpoint_name = getattr(endpoint,
'__name__',
'unknown_endpoint')
None _extract_security_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)
Dict[str, Any] build_openapi_parameters(self, Dict[str, Any] metadata)
Dict[str, Any] _extract_decorator_metadata(self, Callable endpoint)
Callable prepare_endpoint_for_multi_method(self, Callable endpoint, List[str] methods)
Dict[str, Any] extract_endpoint_metadata(self, Callable endpoint)
Dict[str, Any] extract_route_metadata(self, Callable endpoint)
Callable create_uuid_wrapper(self, Callable original_func, str endpoint_name)
None _build_request_body_info(self, Dict[str, Any] metadata, Dict[str, Any] openapi_info)
None _build_header_descriptions(self, Dict[str, Any] metadata, List[str] param_descriptions)
None _extract_content_type_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)
str build_security_description(self, Dict[str, Any] metadata)
None _build_path_param_descriptions(self, Dict[str, Any] metadata, List[str] param_descriptions)
None _extract_documentation_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)
None _copy_decorator_metadata(self, Callable original_func, Callable wrapper_func)
None _extract_description_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)
__init__(self, bool debug=False)
None _build_query_param_descriptions(self, Dict[str, Any] metadata, List[str] param_descriptions)
None _extract_annotation_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)
None _extract_parameter_metadata(self, Callable endpoint, str endpoint_name, Dict[str, Any] metadata)