Skip to content

Consume EventBridge Pipes events

Run one FastSQS app behind both a Lambda SQS event source mapping and an EventBridge Pipes SQS-source target, without changing your routes.

from fastsqs import FastSQS, SQSEvent

app = FastSQS()


class OrderCreated(SQSEvent):
    order_id: str
    amount: int


@app.route(OrderCreated)
async def handle_order(msg: OrderCreated):
    print("processing", msg.order_id, msg.amount)


# Lambda entry point (set as the function handler):
def handler(event, context):
    return app.handler(event, context)

app.handler accepts two event shapes for an SQS source. The event source mapping delivers the envelope {"Records": [...]}. A Pipes SQS-source target delivers a bare list of records. The handler routes both unchanged, so the function above works behind either trigger with no branching.

What a Pipes target receives

EventBridge Pipes passes the records to its target as a bare list, not wrapped in a Records key. The per-record fields match the SQS event source mapping format: each element carries messageId, body, eventSourceARN, and the rest. Routing, the discriminator, and queue-type detection from eventSourceARN behave the same as under an event source mapping.

Note

Return the result of app.handler from your function. When partial_batch_failure is enabled (the default), the handler returns a batchItemFailures response so SQS and Pipes redeliver only the failed records. See Report partial batch failures.

Report batch item failures from a Pipe

A Pipes target reports partial failure with the same ReportBatchItemFailures response shape as an event source mapping. Keep partial_batch_failure on so the handler emits it.

from fastsqs import FastSQS, SQSEvent

app = FastSQS(partial_batch_failure=True)  # default


class OrderCreated(SQSEvent):
    order_id: str
    amount: int


@app.route(OrderCreated)
async def handle_order(msg: OrderCreated):
    if msg.amount <= 0:
        raise ValueError("amount must be positive")  # this record is reported failed


def handler(event, context):
    return app.handler(event, context)  # returns {"batchItemFailures": [...]}

Warning

The Pipe (or the event source mapping) must enable ReportBatchItemFailures for the returned batchItemFailures list to take effect. Without it, the source treats any failure as a whole-batch failure and redelivers every record.

Malformed enrichment elements

A Pipe enrichment step can inject a non-dict element into the bare list, such as a JSON string, number, or null. The handler reports that element as its own batch-item failure and processes its siblings normally. One malformed item does not fail the batch.

When a record carries a present-but-empty or None messageId, the handler coalesces the identifier to the "UNKNOWN" sentinel in the batchItemFailures entry. The handler never emits an empty-string or null itemIdentifier, because SQS and EventBridge read an empty or null identifier as a whole-batch failure.

Multiplex SQS and non-SQS events

To serve both SQS traffic and non-SQS traffic (for example an API Gateway proxy event) from one Lambda, dispatch by shape with is_sqs_event. It returns True for both a bare list and a {"Records": [...]} envelope.

from fastsqs import is_sqs_event


def handler(event, context):
    if is_sqs_event(event):               # a bare list OR {"Records": [...]}
        return app.handler(event, context)
    return http_handler(event, context)   # e.g. an API Gateway proxy event

For more on shape-based dispatch, see Multiplex SQS and non-SQS events.