Module core.default.workspace
Expand source code
import json
import os
from typing import List, Dict, Optional, Any, Tuple
from pydantic.types import DirectoryPath
from core.constructs.backend import Backend, Backend_Configuration
from core.constructs.mapper import CloudMapper
from core.constructs.components import Component, ComponentModel
from core.constructs.workspace import (
Workspace_State,
Workspace_Info,
Workspace,
WorkspaceManager,
wrap_phase,
)
from core.constructs.cloud_output import (
Cloud_Output,
cloud_output_dynamic_model,
evaluate_dynamic_output,
cloud_output_model,
)
from core.constructs.settings import Settings_Info
from ..utils import file_manager
DEFAULT_COMMAND_LOCATIONS = ["core.default.commands"]
ROOT_FOLDER_NAME = ".cdev"
WORKSPACE_FILE_NAME = "workspace_info.json"
class local_workspace(Workspace):
"""
A singleton that implements a workspace that is on the local filesystem. The singleton can be accessed during different
parts of the lifecycle of a cdev core command execution. When this singleton is created, it is registered as the global
workspace for that execution.
"""
_instance = None
_COMMANDS = DEFAULT_COMMAND_LOCATIONS
_MAPPERS = []
_COMPONENTS = []
_OUTPUT: List[Tuple[str, str, cloud_output_model]] = []
_resource_state_uuid: str = None
_backend: Backend = None
_state: Workspace_State = Workspace_State.UNINITIALIZED
_current_component: str = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(Workspace, cls).__new__(cls)
# Put any initialization here.
cls._instance.set_state(Workspace_State.UNINITIALIZED)
# Load the backend
cls._instance._backend = None
cls._instance._resource_state_uuid = None
cls._instance._COMMANDS = DEFAULT_COMMAND_LOCATIONS
cls._instance._MAPPERS = []
cls._instance._COMPONENTS = []
cls._instance._OUTPUT = []
Workspace.set_global_instance(cls._instance)
return cls._instance
@classmethod
def terminate_singleton(cls) -> None:
cls._instance = None
def initialize_workspace(
self,
settings_info: Settings_Info,
backend_info: Backend_Configuration,
resource_state_uuid: str,
initialization_modules: Optional[List[str]] = [],
configuration: Dict = {},
) -> None:
super().initialize_workspace(
settings_info=settings_info,
backend_info=backend_info,
resource_state_uuid=resource_state_uuid,
initialization_modules=initialization_modules,
configuration=configuration,
)
def destroy_workspace(self) -> None:
Workspace.destroy_workspace(self)
local_workspace.terminate_singleton()
################
##### State
################
def set_state(self, value: Workspace_State) -> None:
self._state = value
def get_state(self) -> Workspace_State:
return self._state
@wrap_phase([Workspace_State.EXECUTING_FRONTEND])
def generate_current_state(self) -> List[ComponentModel]:
rv = []
for component in self.get_components():
self._current_component = component.name
rv.append(component.render())
return rv
@wrap_phase([Workspace_State.INITIALIZING])
def set_resource_state_uuid(self, resource_state_uuid: str) -> None:
"""Set the Resource State UUID to denote the Resource State that this Workspace will execute over.
Note that this function should only be called during the `Workspace Initialization` part of the Cdev lifecycle.
Arguments:
resource_state_uuid (str): Resource State UUID from the Backend
"""
self._resource_state_uuid = resource_state_uuid
@wrap_phase(
[
Workspace_State.INITIALIZED,
Workspace_State.EXECUTING_FRONTEND,
Workspace_State.EXECUTING_BACKEND,
]
)
def get_resource_state_uuid(self) -> str:
"""Get the Resource State UUID that this Workspace is executing over.
Note that this function should only be called during the `Workspace Initialized` part of the Cdev lifecycle.
Returns:
resource_state_uuid (str): Resource State UUID from the Backend
"""
return self._resource_state_uuid
#######################
##### Display Output
#######################
@wrap_phase(
[
Workspace_State.INITIALIZED,
Workspace_State.EXECUTING_FRONTEND,
Workspace_State.EXECUTING_BACKEND,
]
)
def display_output(self, tag: str, output: Cloud_Output) -> None:
self._OUTPUT.append((tag, self._current_component, output.render()))
@wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND])
def render_outputs(self) -> List[Tuple[str, Any]]:
if not self._current_component:
raise Exception
rv = []
backend = self.get_backend()
for tag, component, cloud_output in self._OUTPUT:
resolved_value = backend.get_cloud_output_value_by_name(
self.get_resource_state_uuid(),
component,
cloud_output.ruuid,
cloud_output.name,
cloud_output.key,
)
if isinstance(cloud_output, cloud_output_dynamic_model):
# If the cloud output object contains computations than evaluate the computations over the cloud output
resolved_value = evaluate_dynamic_output(resolved_value, cloud_output)
rv.append((tag, resolved_value))
return rv
def clear_output(self) -> None:
self._OUTPUT = []
#################
##### Mappers
#################
@wrap_phase([Workspace_State.INITIALIZING])
def add_mapper(self, mapper: CloudMapper) -> None:
if not isinstance(mapper, CloudMapper):
# TODO Throw error
print(f"BAD CLOUD MAPPER {mapper}")
return
self._MAPPERS.append(mapper)
@wrap_phase([Workspace_State.INITIALIZING])
def add_mappers(self, mappers: List[CloudMapper]) -> None:
for mapper in mappers:
self.add_mapper(mapper)
#ANIBAL This mapper may not exist here!
self._MAPPERS.append(mapper)
@wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND])
def get_mappers(self) -> List[CloudMapper]:
return self._MAPPERS
@wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND])
def get_mapper_namespace(self) -> Dict:
mappers: List[CloudMapper] = self.get_mappers()
rv = {}
for mapper in mappers:
for namespace in mapper.get_namespaces():
rv[namespace] = mapper
return rv
#################
##### Commands
#################
@wrap_phase([Workspace_State.INITIALIZING])
def add_command(self, command_location: str) -> None:
self._COMMANDS.append(command_location)
@wrap_phase([Workspace_State.INITIALIZING])
def add_commands(self, command_locations: List[str]) -> None:
for command_location in command_locations:
self.add_command(command_location)
@wrap_phase([Workspace_State.INITIALIZED])
def get_commands(self) -> List[str]:
return self._COMMANDS
#################
##### Components
#################
@wrap_phase([Workspace_State.INITIALIZING])
def add_component(self, component: Component) -> None:
self._COMPONENTS.append(component)
@wrap_phase([Workspace_State.INITIALIZING])
def add_components(self, components: List[Component]) -> None:
for component in components:
self.add_component(component)
@wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_FRONTEND])
def get_components(self) -> List[Component]:
return self._COMPONENTS
#################
##### Backend
#################
@wrap_phase([Workspace_State.INITIALIZING])
def set_backend(self, backend: Backend) -> None:
if not isinstance(backend, Backend):
raise Exception("Not a backend object")
self._backend = backend
@wrap_phase(
[
Workspace_State.INITIALIZED,
Workspace_State.EXECUTING_FRONTEND,
Workspace_State.EXECUTING_BACKEND,
]
)
def get_backend(self) -> Backend:
return self._backend
class local_workspace_manager(WorkspaceManager):
def __init__(
self,
base_dir: DirectoryPath,
workspace_dir: str = None,
workspace_filename: str = None,
) -> None:
self.base_dir = base_dir
self.workspace_dir = workspace_dir if workspace_dir else ROOT_FOLDER_NAME
self.workspace_filename = (
workspace_filename if workspace_filename else WORKSPACE_FILE_NAME
)
def create_new_workspace(self, workspace_info: Workspace_Info) -> None:
base_cdev_dir = os.path.join(self.base_dir, self.workspace_dir)
if not os.path.isdir(base_cdev_dir):
os.mkdir(base_cdev_dir)
file_manager.safe_json_write(
workspace_info.dict(), os.path.join(base_cdev_dir, self.workspace_filename)
)
def check_if_workspace_exists(self) -> bool:
return os.path.isfile(
os.path.join(self.base_dir, self.workspace_dir, self.workspace_filename)
)
def load_workspace_configuration(self) -> Workspace_Info:
file_location = os.path.join(
self.base_dir, self.workspace_dir, self.workspace_filename
)
with open(file_location, "r") as fh:
raw_data = json.load(fh)
try:
rv = Workspace_Info(**raw_data)
return rv
except Exception as e:
print(e)
raise e
Classes
class local_workspace
-
A singleton that implements a workspace that is on the local filesystem. The singleton can be accessed during different parts of the lifecycle of a cdev core command execution. When this singleton is created, it is registered as the global workspace for that execution.
Expand source code
class local_workspace(Workspace): """ A singleton that implements a workspace that is on the local filesystem. The singleton can be accessed during different parts of the lifecycle of a cdev core command execution. When this singleton is created, it is registered as the global workspace for that execution. """ _instance = None _COMMANDS = DEFAULT_COMMAND_LOCATIONS _MAPPERS = [] _COMPONENTS = [] _OUTPUT: List[Tuple[str, str, cloud_output_model]] = [] _resource_state_uuid: str = None _backend: Backend = None _state: Workspace_State = Workspace_State.UNINITIALIZED _current_component: str = None def __new__(cls): if cls._instance is None: cls._instance = super(Workspace, cls).__new__(cls) # Put any initialization here. cls._instance.set_state(Workspace_State.UNINITIALIZED) # Load the backend cls._instance._backend = None cls._instance._resource_state_uuid = None cls._instance._COMMANDS = DEFAULT_COMMAND_LOCATIONS cls._instance._MAPPERS = [] cls._instance._COMPONENTS = [] cls._instance._OUTPUT = [] Workspace.set_global_instance(cls._instance) return cls._instance @classmethod def terminate_singleton(cls) -> None: cls._instance = None def initialize_workspace( self, settings_info: Settings_Info, backend_info: Backend_Configuration, resource_state_uuid: str, initialization_modules: Optional[List[str]] = [], configuration: Dict = {}, ) -> None: super().initialize_workspace( settings_info=settings_info, backend_info=backend_info, resource_state_uuid=resource_state_uuid, initialization_modules=initialization_modules, configuration=configuration, ) def destroy_workspace(self) -> None: Workspace.destroy_workspace(self) local_workspace.terminate_singleton() ################ ##### State ################ def set_state(self, value: Workspace_State) -> None: self._state = value def get_state(self) -> Workspace_State: return self._state @wrap_phase([Workspace_State.EXECUTING_FRONTEND]) def generate_current_state(self) -> List[ComponentModel]: rv = [] for component in self.get_components(): self._current_component = component.name rv.append(component.render()) return rv @wrap_phase([Workspace_State.INITIALIZING]) def set_resource_state_uuid(self, resource_state_uuid: str) -> None: """Set the Resource State UUID to denote the Resource State that this Workspace will execute over. Note that this function should only be called during the `Workspace Initialization` part of the Cdev lifecycle. Arguments: resource_state_uuid (str): Resource State UUID from the Backend """ self._resource_state_uuid = resource_state_uuid @wrap_phase( [ Workspace_State.INITIALIZED, Workspace_State.EXECUTING_FRONTEND, Workspace_State.EXECUTING_BACKEND, ] ) def get_resource_state_uuid(self) -> str: """Get the Resource State UUID that this Workspace is executing over. Note that this function should only be called during the `Workspace Initialized` part of the Cdev lifecycle. Returns: resource_state_uuid (str): Resource State UUID from the Backend """ return self._resource_state_uuid ####################### ##### Display Output ####################### @wrap_phase( [ Workspace_State.INITIALIZED, Workspace_State.EXECUTING_FRONTEND, Workspace_State.EXECUTING_BACKEND, ] ) def display_output(self, tag: str, output: Cloud_Output) -> None: self._OUTPUT.append((tag, self._current_component, output.render())) @wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND]) def render_outputs(self) -> List[Tuple[str, Any]]: if not self._current_component: raise Exception rv = [] backend = self.get_backend() for tag, component, cloud_output in self._OUTPUT: resolved_value = backend.get_cloud_output_value_by_name( self.get_resource_state_uuid(), component, cloud_output.ruuid, cloud_output.name, cloud_output.key, ) if isinstance(cloud_output, cloud_output_dynamic_model): # If the cloud output object contains computations than evaluate the computations over the cloud output resolved_value = evaluate_dynamic_output(resolved_value, cloud_output) rv.append((tag, resolved_value)) return rv def clear_output(self) -> None: self._OUTPUT = [] ################# ##### Mappers ################# @wrap_phase([Workspace_State.INITIALIZING]) def add_mapper(self, mapper: CloudMapper) -> None: if not isinstance(mapper, CloudMapper): # TODO Throw error print(f"BAD CLOUD MAPPER {mapper}") return self._MAPPERS.append(mapper) @wrap_phase([Workspace_State.INITIALIZING]) def add_mappers(self, mappers: List[CloudMapper]) -> None: for mapper in mappers: self.add_mapper(mapper) #ANIBAL This mapper may not exist here! self._MAPPERS.append(mapper) @wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND]) def get_mappers(self) -> List[CloudMapper]: return self._MAPPERS @wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_BACKEND]) def get_mapper_namespace(self) -> Dict: mappers: List[CloudMapper] = self.get_mappers() rv = {} for mapper in mappers: for namespace in mapper.get_namespaces(): rv[namespace] = mapper return rv ################# ##### Commands ################# @wrap_phase([Workspace_State.INITIALIZING]) def add_command(self, command_location: str) -> None: self._COMMANDS.append(command_location) @wrap_phase([Workspace_State.INITIALIZING]) def add_commands(self, command_locations: List[str]) -> None: for command_location in command_locations: self.add_command(command_location) @wrap_phase([Workspace_State.INITIALIZED]) def get_commands(self) -> List[str]: return self._COMMANDS ################# ##### Components ################# @wrap_phase([Workspace_State.INITIALIZING]) def add_component(self, component: Component) -> None: self._COMPONENTS.append(component) @wrap_phase([Workspace_State.INITIALIZING]) def add_components(self, components: List[Component]) -> None: for component in components: self.add_component(component) @wrap_phase([Workspace_State.INITIALIZED, Workspace_State.EXECUTING_FRONTEND]) def get_components(self) -> List[Component]: return self._COMPONENTS ################# ##### Backend ################# @wrap_phase([Workspace_State.INITIALIZING]) def set_backend(self, backend: Backend) -> None: if not isinstance(backend, Backend): raise Exception("Not a backend object") self._backend = backend @wrap_phase( [ Workspace_State.INITIALIZED, Workspace_State.EXECUTING_FRONTEND, Workspace_State.EXECUTING_BACKEND, ] ) def get_backend(self) -> Backend: return self._backend
Ancestors
Static methods
def terminate_singleton() ‑> None
-
Expand source code
@classmethod def terminate_singleton(cls) -> None: cls._instance = None
Methods
def clear_output(self) ‑> None
-
Expand source code
def clear_output(self) -> None: self._OUTPUT = []
def generate_current_state(workspace: Workspace, *func_posargs, **func_kwargs)
-
Expand source code
def wrapper_func(workspace: "Workspace", *func_posargs, **func_kwargs): current_state = workspace.get_state() if current_state not in phases: raise Exception( f"Trying to call {func} while in workspace state {current_state} but need to be in {phases}" ) rv = func(workspace, *func_posargs, **func_kwargs) return rv
Inherited members
Workspace
:add_command
add_commands
add_component
add_components
add_mapper
add_mappers
destroy_workspace
display_output
get_backend
get_commands
get_components
get_mapper_namespace
get_mappers
get_resource_state_uuid
get_state
initialize_workspace
instance
remove_global_instance
render_outputs
set_backend
set_global_instance
set_resource_state_uuid
set_state
settings
class local_workspace_manager (base_dir: pydantic.types.DirectoryPath, workspace_dir: str = None, workspace_filename: str = None)
-
Expand source code
class local_workspace_manager(WorkspaceManager): def __init__( self, base_dir: DirectoryPath, workspace_dir: str = None, workspace_filename: str = None, ) -> None: self.base_dir = base_dir self.workspace_dir = workspace_dir if workspace_dir else ROOT_FOLDER_NAME self.workspace_filename = ( workspace_filename if workspace_filename else WORKSPACE_FILE_NAME ) def create_new_workspace(self, workspace_info: Workspace_Info) -> None: base_cdev_dir = os.path.join(self.base_dir, self.workspace_dir) if not os.path.isdir(base_cdev_dir): os.mkdir(base_cdev_dir) file_manager.safe_json_write( workspace_info.dict(), os.path.join(base_cdev_dir, self.workspace_filename) ) def check_if_workspace_exists(self) -> bool: return os.path.isfile( os.path.join(self.base_dir, self.workspace_dir, self.workspace_filename) ) def load_workspace_configuration(self) -> Workspace_Info: file_location = os.path.join( self.base_dir, self.workspace_dir, self.workspace_filename ) with open(file_location, "r") as fh: raw_data = json.load(fh) try: rv = Workspace_Info(**raw_data) return rv except Exception as e: print(e) raise e
Ancestors
Inherited members