Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
testing_endpoints.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: testing_endpoints.py
14# CREATION DATE: 30-11-2025
15# LAST Modified: 21:56:40 23-01-2026
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: Files to test the submodules of the server.
21# // AR
22# +==== END CatFeeder =================+
23"""
24
25from typing import Optional, Dict, Union, TYPE_CHECKING
26from datetime import datetime
27from display_tty import Disp, initialise_logger
28from fastapi import Request, Response
29from ...http_codes import HCI, HttpDataTypes
30from ...core import RuntimeControl, RuntimeManager, RI
31from ...favicon import favicon_constants as FAV_CONST
32
33if TYPE_CHECKING:
34 from ...sql import SQL
35 from ...bucket import Bucket
36 from ...crons import BackgroundTasks
37 from ...image_reducer import ImageReducer
38 from ...server_header import ServerHeaders
39 from ...boilerplates import BoilerplateIncoming, BoilerplateResponses, BoilerplateNonHTTP
40
41
    """Administrator-only endpoints used to exercise the SQL and bucket submodules.

    Every endpoint method takes the incoming request, checks the caller's
    token through `_get_admin_token` and returns an HTTP response.
    """
    # Class-level logger shared by every instance.
    disp: Disp = initialise_logger(__qualname__, False)
46
47 def __init__(self, success: int = 0, error: int = 84, debug: bool = False) -> None:
48 """_summary_
49
50 Args:
51 success (int, optional): _description_. Defaults to 0.
52 error (int, optional): _description_. Defaults to 84.
53 debug (bool, optional): _description_. Defaults to False.
54 """
55 # ------------------------ The logging function ------------------------
56 self.disp.update_disp_debug(debug)
57 self.disp.log_debug("Initialising...")
58 # -------------------------- Inherited values --------------------------
59 self.debug: bool = debug
60 self.success: int = success
61 self.error: int = error
62 self.runtime_manager: RuntimeManager = RI
63 # -------------------------- Shared instances --------------------------
64 self.boilerplate_incoming_initialised: "BoilerplateIncoming" = self.runtime_manager.get(
65 "BoilerplateIncoming")
66 self.boilerplate_responses_initialised: "BoilerplateResponses" = self.runtime_manager.get(
67 "BoilerplateResponses")
68 self.boilerplate_non_http_initialised: "BoilerplateNonHTTP" = self.runtime_manager.get(
69 "BoilerplateNonHTTP")
70 self.runtime_controls_initialised: "RuntimeControl" = self.runtime_manager.get(
71 "RuntimeControl")
72 self.server_headers_initialised: "ServerHeaders" = self.runtime_manager.get(
73 "ServerHeaders")
74 self.background_tasks_initialised: "BackgroundTasks" = self.runtime_manager.get(
75 "BackgroundTasks")
76 self.sql_connectionsql_connection: Optional["SQL"] = self.runtime_manager.get_if_exists(
77 "SQL",
78 None
79 )
80 self.bucket_connectionbucket_connection: Optional["Bucket"] = self.runtime_manager.get_if_exists(
81 "Bucket",
82 None
83 )
84 self.image_reducerimage_reducer: Optional["ImageReducer"] = self.runtime_manager.get_if_exists(
85 "ImageReducer",
86 None
87 )
88 self.disp.log_debug("Initialised")
89
90 def _get_admin_token(self, title: str, request: Request) -> Union[Response, str]:
91 """Get the token of the user if they are administrator, otherwise, return the correct http response.
92
93 Args:
94 title (str): The title of the endpoint calling this function.
95 request (Request): The incoming request parameters
96
97 Returns:
98 Union[Response, str]: The response if an error occurred, the token otherwise.
99 """
100 token = self.boilerplate_incoming_initialised.get_token_if_present(
101 request
102 )
103 if not token:
104 return self.boilerplate_responses_initialised.invalid_token(title)
105 if not self.boilerplate_non_http_initialised.is_token_correct(token):
106 return self.boilerplate_responses_initialised.invalid_token(title)
107 if not self.boilerplate_non_http_initialised.is_token_admin(token):
108 return self.boilerplate_responses_initialised.insuffisant_rights(title, token)
109 return token
110
111 # SQL testing
112
113 async def get_tables(self, request: Request) -> Response:
114 """Get a list of all database table names.
115
116 Args:
117 request (Request): The incoming HTTP request.
118
119 Returns:
120 Response: JSON list of table names or error response.
121 """
122 token = self._get_admin_token("Get tables", request)
123 if isinstance(token, Response):
124 return token
125 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
126 "SQL",
128 )
130 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
131 self.disp.log_debug("Gathering tables")
132 data = self.sql_connectionsql_connection.get_table_names()
133 self.disp.log_debug(f"Gathered tables: {data}")
134 if isinstance(data, int):
135 return HCI.not_found(str(data), content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
136 return HCI.success(data, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
137
138 async def get_table_columns(self, request: Request) -> Response:
139 """Get column names for a specific table.
140
141 Query Parameters:
142 table_name (str): Name of the table to inspect.
143
144 Args:
145 request (Request): The incoming HTTP request.
146
147 Returns:
148 Response: JSON list of column names or error response.
149 """
150 token = self._get_admin_token("Get table columns", request)
151 if isinstance(token, Response):
152 return token
153 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
156 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
157 table_name = request.query_params.get("table_name")
158 if not table_name:
159 return HCI.bad_request("Missing required parameter: table_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
160 self.disp.log_debug(f"Gathering columns from table: {table_name}")
161 data = self.sql_connectionsql_connection.get_table_column_names(table_name)
162 self.disp.log_debug(f"Gathered columns: {data}")
163 if isinstance(data, int):
164 return HCI.not_found(f"Table '{table_name}' not found or has no columns", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
165 return HCI.success({"table": table_name, "columns": data}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
166
167 async def describe_table(self, request: Request) -> Response:
168 """Get the full schema description of a table.
169
170 Query Parameters:
171 table_name (str): Name of the table to describe.
172
173 Args:
174 request (Request): The incoming HTTP request.
175
176 Returns:
177 Response: JSON with table schema information or error response.
178 """
179 token = self._get_admin_token("Describe table", request)
180 if isinstance(token, Response):
181 return token
182 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
185 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
186 table_name = request.query_params.get("table_name")
187 if not table_name:
188 return HCI.bad_request("Missing required parameter: table_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
189 self.disp.log_debug(f"Describing table: {table_name}")
190 data = self.sql_connectionsql_connection.describe_table(table_name)
191 self.disp.log_debug(f"Table description: {data}")
192 if isinstance(data, int):
193 return HCI.not_found(f"Failed to describe table '{table_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
194 return HCI.success({"table": table_name, "schema": data}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
195
196 async def get_database_version(self, request: Request) -> Response:
197 """Get the database version information.
198
199 Args:
200 request (Request): The incoming HTTP request.
201
202 Returns:
203 Response: JSON with database version or error response.
204 """
205 token = self._get_admin_token("Get database version", request)
206 if isinstance(token, Response):
207 return token
208 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
211 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
212 self.disp.log_debug("Gathering database version")
214 self.disp.log_debug(f"Database version: {data}")
215 if data is None:
216 return HCI.not_found("Failed to retrieve database version", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
217 return HCI.success({"version": data}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
218
219 async def test_sql_connection(self, request: Request) -> Response:
220 """Test if the SQL database connection is active.
221
222 Args:
223 request (Request): The incoming HTTP request.
224
225 Returns:
226 Response: JSON with connection status.
227 """
228 token = self._get_admin_token("Test sql connection", request)
229 if isinstance(token, Response):
230 return token
231 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
234 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
235 self.disp.log_debug("Testing database connection")
236 is_connected: bool = self.sql_connectionsql_connection.is_connected()
237 result: Dict[str, Union[str, bool]] = {"connected": is_connected}
238 node_id: str = "message"
239 if is_connected:
240 result[node_id] = "Connection is active"
241 else:
242 result[node_id] = "Connection failed"
243 self.disp.log_debug(f"Connection status: {is_connected}")
244 return HCI.success(result, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
245
246 async def get_table_size(self, request: Request) -> Response:
247 """Get the number of rows in a table.
248
249 Query Parameters:
250 table_name (str): Name of the table to count rows.
251
252 Args:
253 request (Request): The incoming HTTP request.
254
255 Returns:
256 Response: JSON with row count or error response.
257 """
258 token = self._get_admin_token("Get table size", request)
259 if isinstance(token, Response):
260 return token
261 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
264 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
265 table_name = request.query_params.get("table_name")
266 if not table_name:
267 return HCI.bad_request("Missing required parameter: table_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
268 self.disp.log_debug(f"Getting size of table: {table_name}")
269 row_count = self.sql_connectionsql_connection.get_table_size(table_name, "*")
270 self.disp.log_debug(f"Table size: {row_count}")
271 if row_count < 0:
272 return HCI.not_found(f"Failed to get size of table '{table_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
273 return HCI.success({"table": table_name, "row_count": row_count}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
274
275 async def get_triggers(self, request: Request) -> Response:
276 """Get all database triggers.
277
278 Args:
279 request (Request): The incoming HTTP request.
280
281 Returns:
282 Response: JSON with triggers dictionary or error response.
283 """
284 token = self._get_admin_token("Get triggers", request)
285 if isinstance(token, Response):
286 return token
287 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
290 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
291 self.disp.log_debug("Gathering triggers")
293 self.disp.log_debug(f"Gathered triggers: {data}")
294 if isinstance(data, int):
295 return HCI.not_found("Failed to retrieve triggers", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
296 return HCI.success(data, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
297
298 async def get_trigger_names(self, request: Request) -> Response:
299 """Get list of all trigger names.
300
301 Args:
302 request (Request): The incoming HTTP request.
303
304 Returns:
305 Response: JSON list of trigger names or error response.
306 """
307 token = self._get_admin_token("Get trigger names", request)
308 if isinstance(token, Response):
309 return token
310 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
313 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
314 self.disp.log_debug("Gathering trigger names")
316 self.disp.log_debug(f"Gathered trigger names: {data}")
317 if isinstance(data, int):
318 return HCI.not_found("Failed to retrieve trigger names", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
319 return HCI.success(data, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
320
321 async def get_current_datetime(self, request: Request) -> Response:
322 """Get current datetime in the project's format.
323
324 Args:
325 request (Request): The incoming HTTP request.
326
327 Returns:
328 Response: JSON with current datetime string.
329 """
330 token = self._get_admin_token("Get current datetime", request)
331 if isinstance(token, Response):
332 return token
333 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
336 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
337 self.disp.log_debug("Getting current datetime")
338 datetime_str = self.sql_connectionsql_connection.get_correct_now_value()
339 self.disp.log_debug(f"Current datetime: {datetime_str}")
340 return HCI.success({"datetime": datetime_str}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
341
342 async def get_current_date(self, request: Request) -> Response:
343 """Get current date in the project's format.
344
345 Args:
346 request (Request): The incoming HTTP request.
347
348 Returns:
349 Response: JSON with current date string.
350 """
351 token = self._get_admin_token("Get current date", request)
352 if isinstance(token, Response):
353 return token
354 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
357 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
358 self.disp.log_debug("Getting current date")
359 date_str = self.sql_connectionsql_connection.get_correct_current_date_value()
360 self.disp.log_debug(f"Current date: {date_str}")
361 return HCI.success({"date": date_str}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
362
363 async def convert_datetime_to_string(self, request: Request) -> Response:
364 """Convert a datetime object to project's string format.
365
366 Query Parameters:
367 datetime (str): ISO format datetime string to convert.
368 date_only (bool, optional): If true, return only date portion. Defaults to false.
369 sql_mode (bool, optional): If true, include millisecond precision. Defaults to false.
370
371 Args:
372 request (Request): The incoming HTTP request.
373
374 Returns:
375 Response: JSON with formatted datetime string or error response.
376 """
377 token = self._get_admin_token("Convert datetime to string", request)
378 if isinstance(token, Response):
379 return token
380 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
383 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
384 datetime_str = request.query_params.get("datetime")
385 if not datetime_str:
386 return HCI.bad_request("Missing required parameter: datetime (ISO format)", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
387 date_only = request.query_params.get(
388 "date_only", "false").lower() == "true"
389 sql_mode = request.query_params.get(
390 "sql_mode", "false").lower() == "true"
391 self.disp.log_debug(f"Converting datetime: {datetime_str}")
392 try:
393 dt_obj = datetime.fromisoformat(datetime_str)
394 formatted = self.sql_connectionsql_connection.datetime_to_string(
395 dt_obj, date_only, sql_mode)
396 self.disp.log_debug(f"Formatted datetime: {formatted}")
397 return HCI.success({"original": datetime_str, "formatted": formatted, "date_only": date_only, "sql_mode": sql_mode}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
398 except ValueError as e:
399 return HCI.bad_request(f"Invalid datetime format: {str(e)}", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
400
401 async def convert_string_to_datetime(self, request: Request) -> Response:
402 """Convert a project-formatted string to datetime object.
403
404 Query Parameters:
405 datetime_str (str): Project-formatted datetime string to parse.
406 date_only (bool, optional): If true, parse as date only. Defaults to false.
407
408 Args:
409 request (Request): The incoming HTTP request.
410
411 Returns:
412 Response: JSON with ISO format datetime or error response.
413 """
414 token = self._get_admin_token("Convert string to datetime", request)
415 if isinstance(token, Response):
416 return token
417 self.sql_connectionsql_connection = self.runtime_manager.get_if_exists(
420 return HCI.service_unavailable("Database connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
421 datetime_str = request.query_params.get("datetime_str")
422 if not datetime_str:
423 return HCI.bad_request("Missing required parameter: datetime_str", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
424 date_only = request.query_params.get(
425 "date_only", "false").lower() == "true"
426 self.disp.log_debug(f"Parsing datetime string: {datetime_str}")
427 try:
428 dt_obj = self.sql_connectionsql_connection.string_to_datetime(
429 datetime_str, date_only)
430 iso_format = dt_obj.isoformat()
431 self.disp.log_debug(f"Parsed to ISO: {iso_format}")
432 return HCI.success({"original": datetime_str, "iso_format": iso_format, "date_only": date_only}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
433 except ValueError as e:
434 return HCI.bad_request(f"Invalid datetime string: {str(e)}", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
435
436 # Bucket (S3) testing
437
438 async def get_buckets(self, request: Request) -> Response:
439 """Get a list of all S3 bucket names.
440
441 Args:
442 request (Request): The incoming HTTP request.
443
444 Returns:
445 Response: JSON list of bucket names or error response.
446 """
447 token = self._get_admin_token("Get buckets", request)
448 if isinstance(token, Response):
449 return token
450 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
451 "Bucket",
453 )
455 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
456 self.disp.log_debug("Gathering buckets")
457 data = self.bucket_connectionbucket_connection.get_bucket_names()
458 self.disp.log_debug(f"Gathered buckets: {data}")
459 if isinstance(data, int):
460 return HCI.not_found("Failed to retrieve bucket names", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
461 return HCI.success(data, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
462
463 async def test_bucket_connection(self, request: Request) -> Response:
464 """Test if the S3 bucket connection is active.
465
466 Args:
467 request (Request): The incoming HTTP request.
468
469 Returns:
470 Response: JSON with connection status.
471 """
472 token = self._get_admin_token("test bucket connection", request)
473 if isinstance(token, Response):
474 return token
475 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
476 "Bucket",
478 )
480 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
481 self.disp.log_debug("Testing bucket connection")
482 is_connected: bool = self.bucket_connectionbucket_connection.is_connected()
483 result: Dict[str, Union[str, bool]] = {"connected": is_connected}
484 node_id: str = "message"
485 if is_connected:
486 result[node_id] = "Connection is active"
487 else:
488 result[node_id] = "Connection failed"
489 self.disp.log_debug(f"Connection status: {is_connected}")
490 return HCI.success(result, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
491
492 async def get_bucket_files(self, request: Request) -> Response:
493 """Get all files in a specific bucket.
494
495 Query Parameters:
496 bucket_name (str): Name of the bucket to list files from.
497
498 Args:
499 request (Request): The incoming HTTP request.
500
501 Returns:
502 Response: JSON list of file names or error response.
503 """
504 token = self._get_admin_token("Get bucket files", request)
505 if isinstance(token, Response):
506 return token
507 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
508 "Bucket",
510 )
512 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
513 bucket_name = request.query_params.get("bucket_name")
514 if not bucket_name:
515 return HCI.bad_request("Missing required parameter: bucket_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
516 self.disp.log_debug(f"Gathering files from bucket: {bucket_name}")
518 self.disp.log_debug(f"Gathered files: {data}")
519 if isinstance(data, int):
520 return HCI.not_found(f"Failed to retrieve files from bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
521 return HCI.success({"bucket": bucket_name, "files": data}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
522
523 async def get_bucket_file_info(self, request: Request) -> Response:
524 """Get information about a specific file in a bucket.
525
526 Query Parameters:
527 bucket_name (str): Name of the bucket.
528 file_name (str): Name of the file to get info about.
529
530 Args:
531 request (Request): The incoming HTTP request.
532
533 Returns:
534 Response: JSON with file metadata or error response.
535 """
536 token = self._get_admin_token("Get bucket file info", request)
537 if isinstance(token, Response):
538 return token
539 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
540 "Bucket",
542 )
544 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
545 bucket_name = request.query_params.get("bucket_name")
546 file_name = request.query_params.get("file_name")
547 if not bucket_name or not file_name:
548 return HCI.bad_request("Missing required parameters: bucket_name and file_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
549 self.disp.log_debug(
550 f"Getting info for file '{file_name}' in bucket '{bucket_name}'"
551 )
552 data = self.bucket_connectionbucket_connection.get_bucket_file(bucket_name, file_name)
553 self.disp.log_debug(f"File info: {data}")
554 if isinstance(data, int):
555 return HCI.not_found(f"File '{file_name}' not found in bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
556 return HCI.success(data, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
557
558 async def create_test_bucket(self, request: Request) -> Response:
559 """Create a new test bucket.
560
561 Query Parameters:
562 bucket_name (str): Name of the bucket to create.
563
564 Args:
565 request (Request): The incoming HTTP request.
566
567 Returns:
568 Response: Success or error response.
569 """
570 token = self._get_admin_token("Create test bucket", request)
571 if isinstance(token, Response):
572 return token
573 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
574 "Bucket",
576 )
578 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
579 bucket_name = request.query_params.get("bucket_name")
580 if not bucket_name:
581 return HCI.bad_request("Missing required parameter: bucket_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
582 self.disp.log_debug(f"Creating bucket: {bucket_name}")
583 result = self.bucket_connectionbucket_connection.create_bucket(bucket_name)
584 if result != self.success:
585 return HCI.internal_server_error(f"Failed to create bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
586 self.disp.log_debug(f"Bucket '{bucket_name}' created successfully")
587 return HCI.created({"message": f"Bucket '{bucket_name}' created successfully"}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
588
589 async def delete_test_bucket(self, request: Request) -> Response:
590 """Delete a test bucket.
591
592 Query Parameters:
593 bucket_name (str): Name of the bucket to delete.
594
595 Args:
596 request (Request): The incoming HTTP request.
597
598 Returns:
599 Response: Success or error response.
600 """
601 token = self._get_admin_token("Delete test bucket", request)
602 if isinstance(token, Response):
603 return token
604 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
605 "Bucket",
607 )
609 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
610 bucket_name = request.query_params.get("bucket_name")
611 if not bucket_name:
612 return HCI.bad_request("Missing required parameter: bucket_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
613 self.disp.log_debug(f"Deleting bucket: {bucket_name}")
614 result = self.bucket_connectionbucket_connection.delete_bucket(bucket_name)
615 if result != self.success:
616 return HCI.internal_server_error(f"Failed to delete bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
617 self.disp.log_debug(f"Bucket '{bucket_name}' deleted successfully")
618 return HCI.success({"message": f"Bucket '{bucket_name}' deleted successfully"}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
619
620 async def upload_test_file_stream(self, request: Request) -> Response:
621 """Upload a file to a bucket as a byte stream (from request body).
622
623 Query Parameters:
624 bucket_name (str): Name of the bucket to upload to.
625 file_name (str): Name to save the file as in the bucket.
626
627 Body: Raw file content (bytes).
628
629 Args:
630 request (Request): The incoming HTTP request.
631
632 Returns:
633 Response: Success or error response.
634 """
635 token = self._get_admin_token("Upload test file stream", request)
636 if isinstance(token, Response):
637 return token
638 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
639 "Bucket",
641 )
643 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
644 bucket_name = request.query_params.get("bucket_name")
645 file_name = request.query_params.get("file_name")
646 if not bucket_name or not file_name:
647 return HCI.bad_request("Missing required parameters: bucket_name and file_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
648 try:
649 file_data = await request.body()
650 if not file_data:
651 return HCI.bad_request("Request body is empty", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
652 self.disp.log_debug(
653 f"Uploading file stream '{file_name}' to bucket '{bucket_name}' ({len(file_data)} bytes)")
654 result = self.bucket_connectionbucket_connection.upload_stream(
655 bucket_name, file_data, file_name)
656 if result != self.success:
657 return HCI.internal_server_error(f"Failed to upload file '{file_name}' to bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
658 self.disp.log_debug(f"File '{file_name}' uploaded successfully")
659 return HCI.created({"message": f"File '{file_name}' uploaded successfully to bucket '{bucket_name}'"}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
660 except Exception as e:
661 self.disp.log_error(f"Error uploading file stream: {str(e)}")
662 return HCI.internal_server_error(f"Error uploading file: {str(e)}", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
663
664 async def download_test_file_stream(self, request: Request) -> Response:
665 """Download a file from a bucket as a byte stream.
666
667 Query Parameters:
668 bucket_name (str): Name of the bucket.
669 file_name (str): Name of the file to download.
670
671 Args:
672 request (Request): The incoming HTTP request.
673
674 Returns:
675 Response: File content as bytes or error response.
676 """
677 token = self._get_admin_token("Download test file stream", request)
678 if isinstance(token, Response):
679 return token
680 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
681 "Bucket",
683 )
685 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
686 bucket_name = request.query_params.get("bucket_name")
687 file_name = request.query_params.get("file_name")
688 if not bucket_name or not file_name:
689 return HCI.bad_request("Missing required parameters: bucket_name and file_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
690 self.disp.log_debug(
691 f"Downloading file stream '{file_name}' from bucket '{bucket_name}'")
692 file_data = self.bucket_connectionbucket_connection.download_stream(
693 bucket_name, file_name)
694 if isinstance(file_data, int):
695 return HCI.not_found(f"File '{file_name}' not found in bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
696 self.disp.log_debug(
697 f"File '{file_name}' downloaded successfully ({len(file_data)} bytes)")
698 self.image_reducerimage_reducer = self.runtime_manager.get_if_exists(
699 "ImageReducer", self.image_reducerimage_reducer
700 )
702 self.disp.log_debug(
703 "ImageReducer not available, returning as octet-stream"
704 )
705 return HCI.success(file_data, content_type=HttpDataTypes.OCTET_STREAM, headers=self.server_headers_initialised.for_stream())
706 detected_format = self.image_reducerimage_reducer.detect_file_format(file_data)
707 self.disp.log_debug(
708 f"Detected file format: {detected_format}"
709 )
710 file_format = FAV_CONST.reducer_type_to_data_type(
711 detected_format
712 )
713 self.disp.log_debug(
714 f"Detected file format: {file_format}, returning accordingly")
715 return HCI.success(file_data, content_type=file_format, headers=self.server_headers_initialised.for_stream())
716
717 async def delete_test_file(self, request: Request) -> Response:
718 """Delete a file from a bucket.
719
720 Query Parameters:
721 bucket_name (str): Name of the bucket.
722 file_name (str): Name of the file to delete.
723
724 Args:
725 request (Request): The incoming HTTP request.
726
727 Returns:
728 Response: Success or error response.
729 """
730 token = self._get_admin_token("Delete test file", request)
731 if isinstance(token, Response):
732 return token
733 self.bucket_connectionbucket_connection = self.runtime_manager.get_if_exists(
734 "Bucket",
736 )
738 return HCI.service_unavailable("S3 bucket connection not available", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
739 bucket_name = request.query_params.get("bucket_name")
740 file_name = request.query_params.get("file_name")
741 if not bucket_name or not file_name:
742 return HCI.bad_request("Missing required parameters: bucket_name and file_name", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
743 self.disp.log_debug(
744 f"Deleting file '{file_name}' from bucket '{bucket_name}'")
745 result = self.bucket_connectionbucket_connection.delete_file(bucket_name, file_name)
746 if result != self.success:
747 return HCI.internal_server_error(f"Failed to delete file '{file_name}' from bucket '{bucket_name}'", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
748 self.disp.log_debug(f"File '{file_name}' deleted successfully")
749 return HCI.success({"message": f"File '{file_name}' deleted successfully from bucket '{bucket_name}'"}, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
None __init__(self, int success=0, int error=84, bool debug=False)
Union[Response, str] _get_admin_token(self, str title, Request request)