API
Consumer(...)
Creates a new SQS consumer. Default parameters:
consumer = Consumer(
    queue_url,  # REQUIRED
    region="eu-west-1",
    sqs_client=None,
    attribute_names=[],
    message_attribute_names=[],
    batch_size=1,
    wait_time_seconds=1,
    visibility_timeout_seconds=None,
    polling_wait_time_ms=0
)
Attributes
| Attribute | Description | Default | Example(s) |
|---|---|---|---|
| queue_url | SQS Queue URL | REQUIRED | |
| region | AWS region used when the consumer creates its SQS client internally. | "eu-west-1" | |
| sqs_client | Override this to pass your own SQS client. This takes precedence over region. | None | |
| attribute_names | List of attributes that need to be returned along with each message. | [] | |
| message_attribute_names | List of names of message attributes, i.e. metadata you have passed to each message while sending to the queue. | [] | |
| batch_size | Number of messages to return at once. SQS allows at most 10 per receive call. | 1 | |
| wait_time_seconds | The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than wait_time_seconds. | 1 | |
| visibility_timeout_seconds | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved. | None | |
| polling_wait_time_ms | The duration (in ms) between two subsequent polls. | 0 | |
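As a rough illustration of what these options mean on the SQS side, the sketch below maps them onto the arguments of the SQS ReceiveMessage call (the keyword names are those of boto3's `receive_message`). The helper `build_receive_kwargs` is hypothetical and is not part of this library; how the consumer builds its request internally is an assumption here, as is the reading of `visibility_timeout_seconds=None` as "use the queue's own setting". `polling_wait_time_ms` is left out because it describes a pause between polls, not a request argument.

```python
from typing import Any, Dict, List, Optional


def build_receive_kwargs(
    queue_url: str,
    attribute_names: Optional[List[str]] = None,
    message_attribute_names: Optional[List[str]] = None,
    batch_size: int = 1,
    wait_time_seconds: int = 1,
    visibility_timeout_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: translate consumer options to ReceiveMessage arguments."""
    # SQS accepts 1 to 10 for MaxNumberOfMessages.
    if not 1 <= batch_size <= 10:
        raise ValueError("batch_size must be between 1 and 10")

    kwargs: Dict[str, Any] = {
        "QueueUrl": queue_url,
        "AttributeNames": list(attribute_names or []),
        "MessageAttributeNames": list(message_attribute_names or []),
        "MaxNumberOfMessages": batch_size,
        "WaitTimeSeconds": wait_time_seconds,
    }
    # Assumption: None means "fall back to the queue's visibility timeout",
    # so the argument is simply omitted from the request.
    if visibility_timeout_seconds is not None:
        kwargs["VisibilityTimeout"] = visibility_timeout_seconds
    return kwargs


# Placeholder queue URL, for illustration only.
kwargs = build_receive_kwargs("https://example.invalid/test-queue", batch_size=5)
# With a real client this would be passed as: sqs_client.receive_message(**kwargs)
```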
consumer.start()
Starts polling the queue. This blocks the main thread.
consumer.stop()
Stops polling the queue. You can use this method only if you run the consumer in a separate thread.
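Because start() blocks, stop() has to be called from a different thread than the one running the polling loop. The sketch below shows one way to arrange that with the standard `threading` module. `run_in_background` is a hypothetical helper, and `StandInConsumer` is a stand-in that only imitates the documented start()/stop() behavior so the example runs without AWS access; in real use you would pass your own Consumer subclass instead.

```python
import threading
import time


def run_in_background(consumer) -> threading.Thread:
    """Run consumer.start() on a daemon thread and return that thread."""
    thread = threading.Thread(target=consumer.start, daemon=True)
    thread.start()
    return thread


class StandInConsumer:
    """Stand-in with the same start()/stop() shape; NOT the library class."""

    def __init__(self):
        self._stopped = threading.Event()
        self.polls = 0

    def start(self):
        # Blocks until stop() is called, like the documented start().
        while not self._stopped.is_set():
            self.polls += 1
            self._stopped.wait(0.01)

    def stop(self):
        self._stopped.set()


consumer = StandInConsumer()
worker = run_in_background(consumer)
time.sleep(0.05)        # the main thread is free to do other work here
consumer.stop()         # ask the polling loop to exit
worker.join(timeout=1)  # wait for the background thread to finish
```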
handle_message(message)
Override this method to define logic for handling a single message. By default, this does nothing (i.e. pass). This is called only if batch_size=1.
from aws_sqs_consumer import Consumer, Message

class SimpleConsumer(Consumer):
    def handle_message(self, message: Message):
        print(f"Processing message: {message.Body}")
handle_processing_exception(message, exception)
Override this method to handle any exception raised while processing the message, including message deletion. By default, the stack trace is printed to the console. This is called only if batch_size=1.
See Handling exceptions.
handle_message_batch(messages)
Override this method to define logic for handling a message batch. By default, this does nothing (i.e. pass). This is called only if batch_size > 1.
handle_batch_processing_exception(messages, exception)
Override this method to handle any exception raised while processing a message batch, including message batch deletion. By default, the stack trace is printed to the console. This is called only if batch_size > 1.
from typing import List
from aws_sqs_consumer import Consumer, Message

class BatchConsumer(Consumer):
    def handle_message_batch(self, messages: List[Message]):
        print(f"Received {len(messages)} Messages")
        # Simulate a failure so the exception hook below is invoked.
        raise Exception("Failed to process message batch")

    def handle_batch_processing_exception(self, messages: List[Message], exception):
        print(f"Exception occurred while processing message batch: {exception}")
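The rules for when each hook is called can be summarized in a few lines. The sketch below is an illustration of the behavior described in this section, not the library's actual code: `dispatch` and `RecordingHooks` are hypothetical names, and the real consumer may do additional work (for example deleting messages) around these calls.

```python
from typing import Any, List


def dispatch(consumer: Any, messages: List[Any], batch_size: int) -> None:
    """Route received messages to the documented hooks (illustrative only)."""
    if batch_size == 1:
        for message in messages:
            try:
                consumer.handle_message(message)
            except Exception as exc:
                consumer.handle_processing_exception(message, exc)
    else:
        try:
            consumer.handle_message_batch(messages)
        except Exception as exc:
            consumer.handle_batch_processing_exception(messages, exc)


class RecordingHooks:
    """Stand-in exposing the four hook names; not the library's Consumer."""

    def __init__(self):
        self.calls = []

    def handle_message(self, message):
        self.calls.append(("handle_message", message))

    def handle_processing_exception(self, message, exception):
        self.calls.append(("handle_processing_exception", message))

    def handle_message_batch(self, messages):
        self.calls.append(("handle_message_batch", list(messages)))
        raise RuntimeError("batch failed")  # force the exception hook

    def handle_batch_processing_exception(self, messages, exception):
        self.calls.append(("handle_batch_processing_exception", str(exception)))


hooks = RecordingHooks()
dispatch(hooks, ["m1"], batch_size=1)
dispatch(hooks, ["m1", "m2"], batch_size=2)
print([name for name, _ in hooks.calls])
# ['handle_message', 'handle_message_batch', 'handle_batch_processing_exception']
```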
Message
Message represents a single SQS message. It is defined as a Python dataclass with the following attributes:
- MessageId (str) - A unique identifier for the message.
- ReceiptHandle (str) - An identifier associated with the act of receiving the message.
- MD5OfBody (str) - An MD5 digest of the non-URL-encoded message body string.
- Body (str) - The message’s contents.
- Attributes (Dict[str, str]) - A map of the attributes requested in the attribute_names parameter of Consumer.
- MD5OfMessageAttributes (str) - An MD5 digest of the non-URL-encoded message attribute string.
- MessageAttributes (Dict[str, MessageAttributeValue]) - Dictionary of user-defined message attributes.
Example:
def handle_message(self, message: Message):
    print("MessageID: ", message.MessageId)
    print("ReceiptHandle: ", message.ReceiptHandle)
    print("MD5OfBody: ", message.MD5OfBody)
    print("Body: ", message.Body)
    print("Attributes: ", message.Attributes)
MessageID: 29bab209-989d-41f3-85b4-c0e9f8d8b7a9
ReceiptHandle: AQEBU2VaFVLF6eXzFVLwPIFCqrZC0twP+qzfy2mi...==
MD5OfBody: c72b9698fa1927e1dd12d3cf26ed84b3
Body: test message
Attributes: {'ApproximateFirstReceiveTimestamp': '1640985674001'}
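One use for MD5OfBody is a quick integrity check on the received body. The sketch below assumes the digest is the hex-encoded MD5 of the UTF-8 bytes of Body, which is how SQS documents it; `body_matches_md5` is a hypothetical helper and not part of this library.

```python
import hashlib


def body_matches_md5(body: str, md5_of_body: str) -> bool:
    """Return True if md5_of_body is the hex MD5 digest of the UTF-8 body."""
    digest = hashlib.md5(body.encode("utf-8")).hexdigest()
    return digest == md5_of_body.lower()


# Stand-in values; inside handle_message you would pass
# message.Body and message.MD5OfBody instead.
expected = hashlib.md5("test message".encode("utf-8")).hexdigest()
print(body_matches_md5("test message", expected))      # True
print(body_matches_md5("tampered message", expected))  # False
```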
MessageAttributeValue
MessageAttributeValue represents a user-defined SQS message attribute value. It is defined as a Python dataclass with the following attributes:
- StringValue (str) - Attribute value, if it is a str.
- BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.
- DataType (str) - SQS supports String, Number, and Binary. For the Number data type, you must use StringValue.
Example:
def handle_message(self, message: Message):
    msg_attributes = message.MessageAttributes
    host = msg_attributes["host"].StringValue
    age = msg_attributes["age"].StringValue  # Number attributes arrive as strings
    print("Message body=", message.Body)
    print("Message attribute host=", host)
    print("Message attribute age=", age)
Message body=test message
Message attribute host=host001.example.com
Message attribute age=20
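Since Number attributes arrive in StringValue, a small conversion step is often useful before working with them. The sketch below is one possible approach: `AttributeValueSketch` is a stand-in that mirrors the three fields described above so the example runs on its own, and `to_python` is a hypothetical helper, not part of this library. It also tolerates custom type labels such as "Number.int", which SQS permits.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional, Union


@dataclass
class AttributeValueSketch:
    """Stand-in mirroring the documented fields; not the library class."""
    DataType: str
    StringValue: Optional[str] = None
    BinaryValue: Optional[bytes] = None


def to_python(value) -> Union[str, Decimal, bytes, None]:
    """Convert an attribute value to a Python value based on its DataType."""
    # Keep only the base type, dropping any custom label after the dot.
    base_type = value.DataType.split(".", 1)[0]
    if base_type == "Number":
        # Decimal avoids float rounding for numeric strings.
        return Decimal(value.StringValue)
    if base_type == "Binary":
        return value.BinaryValue
    return value.StringValue


age = to_python(AttributeValueSketch(DataType="Number", StringValue="20"))
host = to_python(AttributeValueSketch(DataType="String", StringValue="host001.example.com"))
print(age + 1)  # 21
print(host)     # host001.example.com
```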