Message Queues

A message queue allows events to be transmitted between different software components.


Basic Queue

    
    from cdev.aws.sqs import Queue

    myQueue = Queue('demo')


Integrating with a Serverless Function

You can use a Serverless Function to respond to the events from a queue.

    
    from cdev.aws.sqs import Queue
    from cdev.aws.lambda_function import ServerlessFunction

    myQueue = Queue('demo')

    queue_event_trigger = myQueue.create_event_trigger()

    @ServerlessFunction("demofunction", events=[queue_event_trigger], permissions=[myQueue.available_permissions.READ_EVENTS])
    def hello_world(event, context):
        print(event)
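To make the shape of `event` concrete, here is a minimal sketch that runs locally without deploying anything. It assumes the function receives the standard payload AWS Lambda delivers for SQS triggers: a top-level `Records` list whose items carry a `messageId` and a string `body`. The sample event and the `extract_bodies` helper are hypothetical and not part of cdev.

```python
import json

# Hypothetical sample shaped like the payload Lambda receives from an SQS trigger.
SAMPLE_EVENT = {
    "Records": [
        {"messageId": "id-1", "body": json.dumps({"order": 1}), "eventSource": "aws:sqs"},
        {"messageId": "id-2", "body": json.dumps({"order": 2}), "eventSource": "aws:sqs"},
    ]
}


def extract_bodies(event):
    """Return the JSON-decoded body of every record in the event."""
    return [json.loads(record["body"]) for record in event.get("Records", [])]


def hello_world(event, context):
    # Same handler signature as above; context is unused in this sketch.
    for body in extract_bodies(event):
        print(body)


if __name__ == "__main__":
    hello_world(SAMPLE_EVENT, None)
```

Decoding the body in a small helper keeps the handler easy to test with hand-written events like the one above.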

Message Ordering

By default, message order is not guaranteed, but you can designate a queue as First In, First Out (FIFO). This feature affects the performance and functionality of the queue, so you should be aware of the trade-offs.

    
    from cdev.aws.sqs import Queue

    myQueue = Queue('demo', is_fifo=True)
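Producers feel the FIFO trade-off most directly: every message sent to a FIFO queue must carry a message group ID, and ordering is preserved only within a group. The sketch below assumes you send messages with a boto3 SQS client (for example `boto3.client("sqs")`) and that content-based deduplication is not enabled on the queue, so a deduplication ID is supplied explicitly. The `send_in_order` helper is hypothetical and not part of cdev.

```python
import hashlib


def send_in_order(sqs_client, queue_url, group_id, bodies):
    """Send bodies to a FIFO queue; messages sharing group_id keep their order."""
    responses = []
    for body in bodies:
        responses.append(
            sqs_client.send_message(
                QueueUrl=queue_url,
                MessageBody=body,
                MessageGroupId=group_id,
                # Explicit ID derived from the body; not needed if the queue
                # has content-based deduplication enabled.
                MessageDeduplicationId=hashlib.sha256(body.encode()).hexdigest(),
            )
        )
    return responses
```

Because the deduplication ID here is a hash of the body, two messages with identical bodies sent within the deduplication window are treated as duplicates. Use a different ID scheme if repeated bodies are legitimate in your application.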

Handling Errors

Sometimes a queue receives erroneous messages that cannot be handled properly. You can configure the queue's retry policy with the max_receive_count parameter, which determines how many times a message is retried. You can also provide a Dead Letter Queue (DLQ), a separate queue into which messages are moved once they have exceeded max_receive_count.

    
    from cdev.aws.sqs import Queue

    myQueue = Queue('demo', dlq_arn="<aws_queue_arn>", max_receive_count=10)
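The interaction between retries and the dead letter queue can be pictured with a small local simulation. This is a toy model, not cdev or SQS code: `deliver_with_retries` and its in-memory lists are hypothetical, and real SQS behavior such as visibility timeouts is ignored. It assumes a message is moved to the dead letter queue once it has been received `max_receive_count` times without success.

```python
from collections import deque


def deliver_with_retries(messages, handler, max_receive_count):
    """Toy model: retry failing messages, then park them in a dead letter list."""
    queue = deque((message, 0) for message in messages)
    processed, dead_letter = [], []

    while queue:
        message, receive_count = queue.popleft()
        if receive_count >= max_receive_count:
            # Retry budget exhausted: move the message aside instead of retrying.
            dead_letter.append(message)
            continue
        try:
            handler(message)
            processed.append(message)
        except Exception:
            # Failed attempt: put the message back with its count incremented.
            queue.append((message, receive_count + 1))

    return processed, dead_letter
```

With a handler that always raises on one particular message, that message ends up in `dead_letter` after `max_receive_count` attempts while the rest land in `processed`. A lower count surfaces bad messages sooner; a higher one tolerates more transient failures.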

By default, SQS sends batches of events to the handling function, and if the function throws an exception while handling a batch, the queue marks the entire batch as failed. You can configure your queue to handle individual message failures by setting the batch_failure parameter on the event trigger and returning a structured response from your function when a message fails.

    
    from cdev.aws.sqs import Queue
    from cdev.aws.lambda_function import ServerlessFunction

    myQueue = Queue('demo')

    queue_event_trigger = myQueue.create_event_trigger(
        batch_size=5,
        batch_failure=False
    )

    def process_message(record):
        # Placeholder: replace with your own message-handling logic
        print(record)

    @ServerlessFunction("demofunction", events=[queue_event_trigger], permissions=[myQueue.available_permissions.READ_EVENTS])
    def hello_world(event, context):
        failed_items = []

        # SQS events delivered to Lambda list messages under 'Records',
        # and each record identifies itself with 'messageId'
        for record in event.get('Records', []):
            try:
                process_message(record)
            except Exception:
                failed_items.append(record.get('messageId'))

        # Report only the messages that failed so the rest are not retried
        if failed_items:
            return {"batchItemFailures": [{"itemIdentifier": x} for x in failed_items]}
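The structured response can be checked locally before deploying. The following sketch assumes the standard SQS payload that Lambda delivers (`Records`, `messageId`, `body`). The `process_message` step, which rejects any body that is not valid JSON, and the sample event are hypothetical stand-ins for your own logic and data.

```python
import json


def process_message(record):
    """Hypothetical processing step: reject any body that is not valid JSON."""
    return json.loads(record["body"])


def handler(event, context):
    failed_ids = []
    for record in event.get("Records", []):
        try:
            process_message(record)
        except Exception:
            failed_ids.append(record["messageId"])
    # An empty list reports that every message in the batch succeeded.
    return {"batchItemFailures": [{"itemIdentifier": i} for i in failed_ids]}


if __name__ == "__main__":
    sample_event = {
        "Records": [
            {"messageId": "id-1", "body": '{"ok": true}'},
            {"messageId": "id-2", "body": "not json"},
        ]
    }
    print(handler(sample_event, None))
    # prints {'batchItemFailures': [{'itemIdentifier': 'id-2'}]}
```

Only `id-2` is reported, so in a deployed setup only that message would be retried, and eventually moved to the dead letter queue if one is configured.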