Testing utilities
Drive an app with synthetic events, no AWS required.
SQSTestClient
Wraps a FastSQS app and dispatches synthetic SQS events to it.
send
send(body: Body, *, message_id: str = 'test-1', group_id: Optional[str] = None, deduplication_id: Optional[str] = None, message_attributes: Optional[dict] = None, attributes: Optional[dict] = None, event_source_arn: Optional[str] = None, context: Any = None) -> Dict[str, Any]
Send a single message. Returns app.handler's result, i.e.
{"batchItemFailures": [...]}. body may be a dict (JSON-encoded)
or a raw str/bytes (passed through, e.g. to test malformed bodies).
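The body handling and the return shape described above can be illustrated without the library. The sketch below is a stdlib-only stand-in, not FastSQS code: `encode_body` and `failed_ids` are hypothetical helper names, and decoding bytes as UTF-8 is an assumption (the text only says raw values are passed through). The `itemIdentifier` key is the one AWS Lambda uses in partial batch responses.

```python
import json
from typing import Any, Dict, List, Union

Body = Union[dict, str, bytes]


def encode_body(body: Body) -> str:
    """Hypothetical stand-in for the body handling described above."""
    if isinstance(body, dict):
        # dict bodies are JSON-encoded before they reach the handler
        return json.dumps(body)
    if isinstance(body, bytes):
        # Assumption: raw bytes are decoded as UTF-8 and otherwise untouched
        return body.decode("utf-8")
    # Raw strings pass through unchanged, which allows malformed payloads
    return body


def failed_ids(result: Dict[str, Any]) -> List[str]:
    """Extract the failed message ids from a handler result."""
    return [item["itemIdentifier"] for item in result.get("batchItemFailures", [])]
```

In a test, an empty list from `failed_ids` would mean the handler accepted the message, while `["test-1"]` would mean the single default-id message was reported as failed.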
send_batch
Send a batch. Each item is either a RecordSpec (full control, including a
per-record FIFO group_id) or a bare body for the simple case.
RecordSpec (dataclass)
One record in an SQSTestClient.send_batch call.
Use distinct group_id values to model multiple FIFO message groups in a
single batch.
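To make the multi-group case concrete, here is a minimal sketch of a batch that mixes two FIFO message groups. The fields of RecordSpec are not listed on this page, so `SketchRecordSpec` is a hypothetical stand-in that assumes they mirror the keyword arguments of make_record; `ids_by_group` is likewise an illustrative helper, not part of the library.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

Body = Union[dict, str, bytes]


@dataclass
class SketchRecordSpec:
    """Hypothetical stand-in; the real RecordSpec fields may differ."""
    body: Body
    message_id: str = "test-1"
    group_id: Optional[str] = None


def ids_by_group(batch: List[SketchRecordSpec]) -> Dict[Optional[str], List[str]]:
    """Collect message ids per FIFO group, keeping batch order within each group."""
    groups: Dict[Optional[str], List[str]] = defaultdict(list)
    for spec in batch:
        groups[spec.group_id].append(spec.message_id)
    return dict(groups)


# Two groups interleaved in one batch
batch = [
    SketchRecordSpec({"order": 1}, message_id="m1", group_id="customer-a"),
    SketchRecordSpec({"order": 2}, message_id="m2", group_id="customer-b"),
    SketchRecordSpec({"order": 3}, message_id="m3", group_id="customer-a"),
]
groups = ids_by_group(batch)
```

Giving each record its own message_id, rather than relying on the default, should keep any batchItemFailures entries unambiguous when several records share a batch.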
make_record
make_record(body: Body, *, message_id: str = 'test-1', group_id: Optional[str] = None, deduplication_id: Optional[str] = None, message_attributes: Optional[dict] = None, event_source_arn: Optional[str] = None, attributes: Optional[dict] = None) -> Dict[str, Any]
Build one synthetic SQS record.
Snake_case kwargs map to the camelCase SQS wire keys. When group_id is
given (and no explicit event_source_arn), a .fifo ARN is set so a
QueueType.AUTO app infers FIFO.
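The mapping described above can be sketched as follows. This is an illustration of the documented behaviour, not the library's implementation: `sketch_record` is a hypothetical name, the queue name, account id, region, and receipt handle are placeholders, and only a subset of the keys found in a real SQS record is produced. The camelCase keys and the MessageGroupId / MessageDeduplicationId attribute names follow the SQS event format that Lambda delivers.

```python
from typing import Any, Dict, Optional


def sketch_record(
    body: str,
    *,
    message_id: str = "test-1",
    group_id: Optional[str] = None,
    deduplication_id: Optional[str] = None,
    message_attributes: Optional[dict] = None,
    event_source_arn: Optional[str] = None,
    attributes: Optional[dict] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in showing the snake_case to camelCase mapping."""
    attrs = dict(attributes or {})
    if group_id is not None:
        attrs["MessageGroupId"] = group_id
    if deduplication_id is not None:
        attrs["MessageDeduplicationId"] = deduplication_id

    if event_source_arn is None:
        # A group_id implies FIFO, so fall back to a ".fifo" queue ARN.
        # Queue name, region, and account id are placeholders.
        name = "test-queue.fifo" if group_id is not None else "test-queue"
        event_source_arn = f"arn:aws:sqs:us-east-1:123456789012:{name}"

    return {
        "messageId": message_id,
        "receiptHandle": "test-receipt-handle",  # placeholder value
        "body": body,
        "attributes": attrs,
        "messageAttributes": dict(message_attributes or {}),
        "eventSource": "aws:sqs",
        "eventSourceARN": event_source_arn,
        "awsRegion": "us-east-1",
    }
```

An explicit event_source_arn always wins in this sketch, which matches the "and no explicit event_source_arn" condition in the description.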
make_event
Wrap records in an SQS event envelope.
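For reference, the envelope that Lambda passes to a handler for SQS is a dict with a single "Records" list. The sketch below builds that shape with a hypothetical `sketch_event` helper; the signature of the real make_event is not shown on this page, so accepting a list of record dicts is an assumption.

```python
from typing import Any, Dict, List


def sketch_event(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical stand-in: wrap record dicts in the SQS event envelope."""
    # Copy the list so later changes to the caller's list do not alter the event
    return {"Records": list(records)}


# Minimal record dicts, trimmed to two keys for illustration only
event = sketch_event([
    {"messageId": "m1", "body": "{}"},
    {"messageId": "m2", "body": "{}"},
])
```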