Module core.default.resources.simple.topic
Set of constructs for making fan-out message topics
Expand source code
"""Set of constructs for making fan-out message topics
"""
from typing import Any, Dict, Optional

from core.constructs.resource import (
    Resource,
    TaggableResourceModel,
    TaggableMixin,
    update_hash,
    ResourceOutputs,
    PermissionsAvailableMixin,
)
from core.constructs.cloud_output import Cloud_Output_Str, OutputType
from core.constructs.types import cdev_str_model
from core.constructs.models import frozendict
from core.default.resources.simple.events import Event, event_model
from core.default.resources.simple.iam import Permission
from core.utils import hasher
RUUID = "cdev::simple::topic"
######################
##### Events
######################
class topic_event_model(event_model):
    """Model form of an event that originates from a Topic"""

    # Rendered cloud output that refers to the `arn` of the originating Topic.
    topic_arn: cdev_str_model
class TopicEvent(Event):
    """Event construct describing the events emitted by a Topic.

    The event keeps the name of the Topic it originates from together with a
    cloud output that refers to that Topic's ``arn`` value.
    """

    def __init__(self, topic_name: str) -> None:
        """
        Args:
            topic_name (str): Name of the Topic resource the event originates from.
        """
        self.cdev_topic_name = topic_name

        arn_output = Cloud_Output_Str(
            name=topic_name,
            ruuid=RUUID,
            key="arn",
            type=OutputType.RESOURCE,
        )
        self.topic_arn = arn_output

    def render(self) -> topic_event_model:
        """Build the model representation of this event.

        Returns:
            topic_event_model
        """
        origin_name = self.cdev_topic_name
        event_hash = self.hash()
        rendered_arn = self.topic_arn.render()

        return topic_event_model(
            originating_resource_name=origin_name,
            originating_resource_type=RUUID,
            hash=event_hash,
            topic_arn=rendered_arn,
        )

    def hash(self) -> str:
        """Hash identifying this event; derived only from the originating Topic's name.

        Returns:
            str
        """
        hashed_fields = [self.cdev_topic_name]
        return hasher.hash_list(hashed_fields)
#####################
###### Permission
######################
class TopicPermissions:
    """Permissions that can be granted on a Topic.

    Every permission uses the ``Allow`` effect and targets the ``cloud_id``
    output of the Topic resource named ``resource_name``.
    """

    def __init__(self, resource_name: str) -> None:
        """
        Args:
            resource_name (str): Name of the Topic resource the permissions apply to.
        """

        def _topic_cloud_id() -> Cloud_Output_Str:
            # Build a fresh output object per permission so the two
            # Permission objects never share a mutable reference.
            return Cloud_Output_Str(
                resource_name, RUUID, "cloud_id", OutputType.RESOURCE
            )

        self.SUBSCRIBE = Permission(
            actions=[
                "sns:GetTopicAttributes",
                "sns:Subscribe",
            ],
            cloud_id=_topic_cloud_id(),
            effect="Allow",
        )
        """Permission to subscribe to the Topic"""

        self.PUBLISH = Permission(
            actions=["sns:GetTopicAttributes", "sns:Publish"],
            cloud_id=_topic_cloud_id(),
            effect="Allow",
        )
        """Permission to publish to the Topic"""
##############
##### Output
##############
class TopicOutput(ResourceOutputs):
    """Container object for the returned values from the cloud after a Topic has been deployed."""

    def __init__(self, name: str) -> None:
        """
        Args:
            name (str): Name of the Topic resource whose outputs are exposed.
        """
        super().__init__(name, RUUID)

    @property
    def topic_name(self) -> Cloud_Output_Str:
        """The name of the generated topic"""
        return Cloud_Output_Str(
            name=self._name, ruuid=RUUID, key="topic_name", type=self.OUTPUT_TYPE
        )

    @topic_name.setter
    def topic_name(self, value: Any) -> None:
        # The value is returned from the cloud, so assignment is rejected.
        # The exception type is unchanged; only an explanatory message is added.
        raise Exception(
            "`topic_name` is a read-only output of the Topic and can not be set."
        )
###############
##### Topic
###############
class simple_topic_model(TaggableResourceModel):
    """Model form of a Pub/Sub Topic resource"""

    is_fifo: bool
    """Should the Topic guarantee ordering of messages"""
class Topic(PermissionsAvailableMixin, TaggableMixin, Resource):
    """Construct for creating a managed SNS Pub/Sub Topic"""

    @update_hash
    def __init__(
        self,
        cdev_name: str,
        is_fifo: bool = False,
        nonce: str = "",
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Args:
            cdev_name (str): Name of the resource.
            is_fifo (bool, default=False): If True, the Topic will guarantee ordering of messages.
            nonce (str): Nonce to make the resource hash unique if there are conflicting resources with same configuration.
            tags (Dict[str, str]): A set of tags to add to the resource
        """
        super().__init__(cdev_name, RUUID, nonce, tags=tags)

        self._is_fifo = is_fifo
        self.output = TopicOutput(cdev_name)
        self.available_permissions: TopicPermissions = TopicPermissions(cdev_name)
        # Stays `None` until `create_event_trigger` is called.
        self._event: Optional[TopicEvent] = None

    @property
    def is_fifo(self) -> bool:
        """Should the Pub/Sub Topic guarantee ordering of messages"""
        return self._is_fifo

    @is_fifo.setter
    @update_hash
    def is_fifo(self, value: bool) -> None:
        self._is_fifo = value

    def create_event_trigger(self) -> TopicEvent:
        """Create an Event for the Topic that other resources can listen to

        Raises:
            Exception: If an event has already been created on this Topic.

        Returns:
            TopicEvent
        """
        # Compare against None explicitly so the check does not depend on
        # the truthiness of the event object.
        if self._event is not None:
            raise Exception(
                "Already created an event on this topic. Use `get_event()` to get the current event."
            )

        self._event = TopicEvent(
            topic_name=self.name,
        )
        return self._event

    def get_event(self) -> TopicEvent:
        """Get the Event for this Topic

        Raises:
            Exception: If no event has been created on this Topic yet.

        Returns:
            TopicEvent
        """
        if self._event is None:
            raise Exception(
                "Topic Event has not been created. "
                "Create a Topic Event for this topic using the `create_event_trigger` "
                "function before calling this function."
            )

        return self._event

    def compute_hash(self) -> None:
        """Recompute `self._hash` from `is_fifo`, the nonce, and the hash of the tags."""
        self._hash = hasher.hash_list(
            [
                self.is_fifo,
                self.nonce,
                self._get_tags_hash(),
            ]
        )

    def render(self) -> simple_topic_model:
        """Build the model representation of this Topic.

        Returns:
            simple_topic_model
        """
        return simple_topic_model(
            name=self.name,
            ruuid=self.ruuid,
            hash=self.hash,
            is_fifo=self.is_fifo,
            tags=frozendict(self.tags),
        )
Classes
class Topic (cdev_name: str, is_fifo: bool = False, nonce: str = '', tags: Dict[str, str] = None)
-
Construct for creating a managed SNS Pub/Sub Topic
Args
cdev_name
:str
- Name of the resource.
is_fifo
:bool
, default=False
- If True, the Topic will guarantee ordering of messages.
nonce
:str
- Nonce to make the resource hash unique if there are conflicting resources with same configuration.
tags
:Dict[str, str]
- A set of tags to add to the resource
Expand source code
class Topic(PermissionsAvailableMixin, TaggableMixin, Resource):
    """Construct for creating a managed SNS Pub/Sub Topic"""

    @update_hash
    def __init__(
        self,
        cdev_name: str,
        is_fifo: bool = False,
        nonce: str = "",
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Args:
            cdev_name (str): Name of the resource.
            is_fifo (bool, default=False): If True, the Topic will guarantee ordering of messages.
            nonce (str): Nonce to make the resource hash unique if there are conflicting resources with same configuration.
            tags (Dict[str, str]): A set of tags to add to the resource
        """
        super().__init__(cdev_name, RUUID, nonce, tags=tags)

        self._is_fifo = is_fifo
        self.output = TopicOutput(cdev_name)
        self.available_permissions: TopicPermissions = TopicPermissions(cdev_name)
        # Stays `None` until `create_event_trigger` is called.
        self._event: Optional[TopicEvent] = None

    @property
    def is_fifo(self) -> bool:
        """Should the Pub/Sub Topic guarantee ordering of messages"""
        return self._is_fifo

    @is_fifo.setter
    @update_hash
    def is_fifo(self, value: bool) -> None:
        self._is_fifo = value

    def create_event_trigger(self) -> TopicEvent:
        """Create an Event for the Topic that other resources can listen to

        Raises:
            Exception: If an event has already been created on this Topic.

        Returns:
            TopicEvent
        """
        # Compare against None explicitly so the check does not depend on
        # the truthiness of the event object.
        if self._event is not None:
            raise Exception(
                "Already created an event on this topic. Use `get_event()` to get the current event."
            )

        self._event = TopicEvent(
            topic_name=self.name,
        )
        return self._event

    def get_event(self) -> TopicEvent:
        """Get the Event for this Topic

        Raises:
            Exception: If no event has been created on this Topic yet.

        Returns:
            TopicEvent
        """
        if self._event is None:
            raise Exception(
                "Topic Event has not been created. "
                "Create a Topic Event for this topic using the `create_event_trigger` "
                "function before calling this function."
            )

        return self._event

    def compute_hash(self) -> None:
        """Recompute `self._hash` from `is_fifo`, the nonce, and the hash of the tags."""
        self._hash = hasher.hash_list(
            [
                self.is_fifo,
                self.nonce,
                self._get_tags_hash(),
            ]
        )

    def render(self) -> simple_topic_model:
        """Build the model representation of this Topic.

        Returns:
            simple_topic_model
        """
        return simple_topic_model(
            name=self.name,
            ruuid=self.ruuid,
            hash=self.hash,
            is_fifo=self.is_fifo,
            tags=frozendict(self.tags),
        )
Ancestors
Instance variables
var is_fifo : bool
-
Should the Pub/Sub Topic guarantee ordering of messages
Expand source code
@property
def is_fifo(self) -> bool:
    """Whether the Pub/Sub Topic should guarantee ordering of messages"""
    ordering_guaranteed = self._is_fifo
    return ordering_guaranteed
Methods
def compute_hash(self) ‑> None
-
Expand source code
def compute_hash(self) -> None:
    """Recompute `self._hash` from `is_fifo`, the nonce, and the hash of the tags."""
    hashed_fields = [
        self.is_fifo,
        self.nonce,
        self._get_tags_hash(),
    ]
    self._hash = hasher.hash_list(hashed_fields)
def create_event_trigger(self) ‑> TopicEvent
-
Create an Event for the Topic that other resources can listen to
Raises
Exception
- If an event has already been created on this Topic.
Returns
TopicEvent
Expand source code
def create_event_trigger(self) -> TopicEvent:
    """Create an Event for the Topic that other resources can listen to

    Raises:
        Exception: If an event has already been created on this Topic.

    Returns:
        TopicEvent
    """
    # Compare against None explicitly so the check does not depend on
    # the truthiness of the event object.
    if self._event is not None:
        raise Exception(
            "Already created an event on this topic. Use `get_event()` to get the current event."
        )

    self._event = TopicEvent(
        topic_name=self.name,
    )
    return self._event
def get_event(self) ‑> TopicEvent
-
Get the Event for this Topic
Raises
Exception
- If no event has been created on this Topic yet.
Returns
TopicEvent
Expand source code
def get_event(self) -> TopicEvent:
    """Get the Event for this Topic

    Raises:
        Exception: If no event has been created on this Topic yet.

    Returns:
        TopicEvent
    """
    # Compare against None explicitly so the check does not depend on
    # the truthiness of the event object.
    if self._event is None:
        raise Exception(
            "Topic Event has not been created. "
            "Create a Topic Event for this topic using the `create_event_trigger` "
            "function before calling this function."
        )

    return self._event
def render(self) ‑> simple_topic_model
-
Expand source code
def render(self) -> simple_topic_model:
    """Build the model representation of this Topic.

    Returns:
        simple_topic_model
    """
    model_fields = dict(
        name=self.name,
        ruuid=self.ruuid,
        hash=self.hash,
        is_fifo=self.is_fifo,
        tags=frozendict(self.tags),
    )
    return simple_topic_model(**model_fields)
Inherited members
class TopicEvent (topic_name: str)
-
Construct representing the Events from the Topic
Expand source code
class TopicEvent(Event):
    """Event construct describing the events emitted by a Topic.

    The event keeps the name of the Topic it originates from together with a
    cloud output that refers to that Topic's ``arn`` value.
    """

    def __init__(self, topic_name: str) -> None:
        """
        Args:
            topic_name (str): Name of the Topic resource the event originates from.
        """
        self.cdev_topic_name = topic_name

        arn_output = Cloud_Output_Str(
            name=topic_name,
            ruuid=RUUID,
            key="arn",
            type=OutputType.RESOURCE,
        )
        self.topic_arn = arn_output

    def render(self) -> topic_event_model:
        """Build the model representation of this event.

        Returns:
            topic_event_model
        """
        origin_name = self.cdev_topic_name
        event_hash = self.hash()
        rendered_arn = self.topic_arn.render()

        return topic_event_model(
            originating_resource_name=origin_name,
            originating_resource_type=RUUID,
            hash=event_hash,
            topic_arn=rendered_arn,
        )

    def hash(self) -> str:
        """Hash identifying this event; derived only from the originating Topic's name.

        Returns:
            str
        """
        hashed_fields = [self.cdev_topic_name]
        return hasher.hash_list(hashed_fields)
Ancestors
Methods
def hash(self) ‑> str
-
Expand source code
def hash(self) -> str:
    """Hash identifying this event; derived only from the originating Topic's name.

    Returns:
        str
    """
    hashed_fields = [self.cdev_topic_name]
    return hasher.hash_list(hashed_fields)
def render(self) ‑> topic_event_model
-
Expand source code
def render(self) -> topic_event_model:
    """Build the model representation of this event.

    Returns:
        topic_event_model
    """
    origin_name = self.cdev_topic_name
    event_hash = self.hash()
    rendered_arn = self.topic_arn.render()

    return topic_event_model(
        originating_resource_name=origin_name,
        originating_resource_type=RUUID,
        hash=event_hash,
        topic_arn=rendered_arn,
    )
class TopicOutput (name: str)
-
Container object for the returned values from the cloud after a Topic has been deployed.
Expand source code
class TopicOutput(ResourceOutputs):
    """Container object for the returned values from the cloud after a Topic has been deployed."""

    def __init__(self, name: str) -> None:
        """
        Args:
            name (str): Name of the Topic resource whose outputs are exposed.
        """
        super().__init__(name, RUUID)

    @property
    def topic_name(self) -> Cloud_Output_Str:
        """The name of the generated topic"""
        return Cloud_Output_Str(
            name=self._name, ruuid=RUUID, key="topic_name", type=self.OUTPUT_TYPE
        )

    @topic_name.setter
    def topic_name(self, value: Any) -> None:
        # The value is returned from the cloud, so assignment is rejected.
        # The exception type is unchanged; only an explanatory message is added.
        raise Exception(
            "`topic_name` is a read-only output of the Topic and can not be set."
        )
Ancestors
Instance variables
var topic_name : Cloud_Output_Str
-
The name of the generated topic
Expand source code
@property
def topic_name(self) -> Cloud_Output_Str:
    """Cloud output referring to the name of the generated topic"""
    name_output = Cloud_Output_Str(
        name=self._name,
        ruuid=RUUID,
        key="topic_name",
        type=self.OUTPUT_TYPE,
    )
    return name_output
class TopicPermissions (resource_name)
-
Expand source code
class TopicPermissions:
    """Permissions that can be granted on a Topic.

    Every permission uses the ``Allow`` effect and targets the ``cloud_id``
    output of the Topic resource named ``resource_name``.
    """

    def __init__(self, resource_name: str) -> None:
        """
        Args:
            resource_name (str): Name of the Topic resource the permissions apply to.
        """

        def _topic_cloud_id() -> Cloud_Output_Str:
            # Build a fresh output object per permission so the two
            # Permission objects never share a mutable reference.
            return Cloud_Output_Str(
                resource_name, RUUID, "cloud_id", OutputType.RESOURCE
            )

        self.SUBSCRIBE = Permission(
            actions=[
                "sns:GetTopicAttributes",
                "sns:Subscribe",
            ],
            cloud_id=_topic_cloud_id(),
            effect="Allow",
        )
        """Permission to subscribe to the Topic"""

        self.PUBLISH = Permission(
            actions=["sns:GetTopicAttributes", "sns:Publish"],
            cloud_id=_topic_cloud_id(),
            effect="Allow",
        )
        """Permission to publish to the Topic"""
Instance variables
var PUBLISH
-
Permission to publish to the Topic
var SUBSCRIBE
-
Permission to subscribe to the Topic
class simple_topic_model (**data: Any)
-
Model representing a Pub/Sub Topic
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class simple_topic_model(TaggableResourceModel):
    """Model form of a Pub/Sub Topic resource"""

    is_fifo: bool
    """Should the Topic guarantee ordering of messages"""
Ancestors
- TaggableResourceModel
- ResourceModel
- ImmutableModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var is_fifo : bool
-
Should the Topic guarantee ordering of messages
Inherited members
class topic_event_model (**data: Any)
-
Model representing a Topic event
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class topic_event_model(event_model):
    """Model form of an event that originates from a Topic"""

    # Rendered cloud output that refers to the `arn` of the originating Topic.
    topic_arn: cdev_str_model
Ancestors
- event_model
- ImmutableModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var topic_arn : ~cdev_str_model