Module core.default.cloudmapper

Expand source code
from typing import Callable, Dict, List, Set


from core.constructs.output_manager import OutputTask
from core.constructs.resource import Resource_Difference
from core.constructs.mapper import CloudMapper

from .mappers.simple import (
    api_deployer,
    bucket_deployer,
    lambda_deployer,
    dynamodb_deployer,
    queue_deployer,
    relational_db_deployer,
    static_site_deployer,
    topic_deployer,
)


class DefaultMapper(CloudMapper):
    """Cloud mapper for the built-in ``cdev::simple::*`` resource types.

    Deployment requests are dispatched, by resource ruuid, to the handler
    functions listed in the module-level ``RESOURCE_TO_HANDLER_FUNCTION``
    table that is handed to the base class on construction.
    """

    def __init__(self) -> None:
        """Initialise the base mapper with the module-level handler table."""
        # The table is defined further down in this module; the name is only
        # resolved when an instance is created, not at class definition time.
        super().__init__(RESOURCE_TO_HANDLER_FUNCTION)

    def get_namespaces(self) -> List[str]:
        """Return the ruuids of the resource types served by this mapper.

        A fresh list is built on every call.
        """
        supported_ruuids = [
            "cdev::simple::api",
            "cdev::simple::bucket",
            "cdev::simple::function",
            "cdev::simple::lambda_layer",
            "cdev::simple::table",
            "cdev::simple::queue",
            "cdev::simple::relationaldb",
            "cdev::simple::staticsite",
            "cdev::simple::topic",
        ]
        return supported_ruuids

    def deploy_resource(
        self,
        transaction_token: str,
        namespace_token: str,
        resource_diff: Resource_Difference,
        previous_output: Dict,
        output_task: OutputTask,
    ) -> Dict:
        """Dispatch a resource difference to the handler for its ruuid.

        The ruuid is read from ``resource_diff.new_resource`` when that is
        truthy, otherwise from ``resource_diff.previous_resource``. All five
        arguments are forwarded positionally, unchanged, to the handler.

        Returns:
            Whatever the selected handler returns.

        Raises:
            KeyError: if no handler is registered for the ruuid (assuming the
                handler table is a plain dict).
        """
        # Assumes at least one side of the diff is populated; if neither is,
        # the attribute access below fails on the missing resource.
        if resource_diff.new_resource:
            ruuid = resource_diff.new_resource.ruuid
        else:
            ruuid = resource_diff.previous_resource.ruuid

        handler = self.get_resource_to_handler()[ruuid]
        return handler(
            transaction_token,
            namespace_token,
            resource_diff,
            previous_output,
            output_task,
        )

    def get_available_resources(self) -> Set[str]:
        """Return the supported ruuids from ``get_namespaces`` as a set."""
        namespaces = self.get_namespaces()
        return set(namespaces)

    def get_resource_to_handler(self) -> Dict[str, Callable]:
        """Return the ruuid -> handler mapping held on this instance.

        NOTE(review): ``resource_to_handler`` is presumably set by
        ``CloudMapper.__init__`` from the table passed in ``__init__`` --
        confirm against the base class.
        """
        return self.resource_to_handler


# Dispatch table: resource ruuid -> deployer function that handles it.
# DefaultMapper.__init__ passes this mapping to the CloudMapper base class, and
# deploy_resource looks handlers up by the ruuid taken from the resource diff.
# The keys mirror the namespaces listed in DefaultMapper.get_namespaces; keep
# the two in sync when adding or removing a resource type.
RESOURCE_TO_HANDLER_FUNCTION = {
    # Both function and layer resources are served by the lambda deployer.
    "cdev::simple::function": lambda_deployer.handle_simple_lambda_function_deployment,
    "cdev::simple::lambda_layer": lambda_deployer.handle_simple_layer_deployment,
    "cdev::simple::api": api_deployer.handle_simple_api_deployment,
    "cdev::simple::bucket": bucket_deployer.handle_simple_bucket_deployment,
    "cdev::simple::table": dynamodb_deployer.handle_simple_table_deployment,
    "cdev::simple::queue": queue_deployer.handle_simple_queue_deployment,
    "cdev::simple::relationaldb": relational_db_deployer.handle_simple_relational_db_deployment,
    "cdev::simple::staticsite": static_site_deployer.handle_simple_static_site_deployment,
    "cdev::simple::topic": topic_deployer.handle_simple_topic_deployment,
}

Classes

class DefaultMapper

A Cloud Mapper is the construct responsible for directly interacting with the Cloud Provider and managing resource state.

Expand source code
class DefaultMapper(CloudMapper):
    """Cloud mapper for the built-in ``cdev::simple::*`` resource types.

    Deployment requests are dispatched, by resource ruuid, to the handler
    functions listed in the module-level ``RESOURCE_TO_HANDLER_FUNCTION``
    table that is handed to the base class on construction.
    """

    def __init__(self) -> None:
        """Initialise the base mapper with the module-level handler table."""
        # The table name is only resolved when an instance is created, not at
        # class definition time.
        super().__init__(RESOURCE_TO_HANDLER_FUNCTION)

    def get_namespaces(self) -> List[str]:
        """Return the ruuids of the resource types served by this mapper.

        A fresh list is built on every call.
        """
        supported_ruuids = [
            "cdev::simple::api",
            "cdev::simple::bucket",
            "cdev::simple::function",
            "cdev::simple::lambda_layer",
            "cdev::simple::table",
            "cdev::simple::queue",
            "cdev::simple::relationaldb",
            "cdev::simple::staticsite",
            "cdev::simple::topic",
        ]
        return supported_ruuids

    def deploy_resource(
        self,
        transaction_token: str,
        namespace_token: str,
        resource_diff: Resource_Difference,
        previous_output: Dict,
        output_task: OutputTask,
    ) -> Dict:
        """Dispatch a resource difference to the handler for its ruuid.

        The ruuid is read from ``resource_diff.new_resource`` when that is
        truthy, otherwise from ``resource_diff.previous_resource``. All five
        arguments are forwarded positionally, unchanged, to the handler.

        Returns:
            Whatever the selected handler returns.

        Raises:
            KeyError: if no handler is registered for the ruuid (assuming the
                handler table is a plain dict).
        """
        # Assumes at least one side of the diff is populated; if neither is,
        # the attribute access below fails on the missing resource.
        if resource_diff.new_resource:
            ruuid = resource_diff.new_resource.ruuid
        else:
            ruuid = resource_diff.previous_resource.ruuid

        handler = self.get_resource_to_handler()[ruuid]
        return handler(
            transaction_token,
            namespace_token,
            resource_diff,
            previous_output,
            output_task,
        )

    def get_available_resources(self) -> Set[str]:
        """Return the supported ruuids from ``get_namespaces`` as a set."""
        namespaces = self.get_namespaces()
        return set(namespaces)

    def get_resource_to_handler(self) -> Dict[str, Callable]:
        """Return the ruuid -> handler mapping held on this instance.

        NOTE(review): ``resource_to_handler`` is presumably set by
        ``CloudMapper.__init__`` from the table passed in ``__init__`` --
        confirm against the base class.
        """
        return self.resource_to_handler

Ancestors

Inherited members