Skip to content

Commit

Permalink
feat: add lambda web adapter streaming api backend only example (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlonge4 authored Nov 24, 2024
1 parent f731aba commit be5a020
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The Lambda Web Adapter also supports all non-HTTP event triggers, such as SQS, S
- [FastAPI with Background Tasks](examples/fastapi-background-tasks)
- [FastAPI with Response Streaming](examples/fastapi-response-streaming)
- [FastAPI with Response Streaming in Zip](examples/fastapi-response-streaming-zip)
- [FastAPI Response Streaming Backend with IAM Auth](examples/fastapi-backend-only-response-streaming/)
- [Flask](examples/flask)
- [Flask in Zip](examples/flask-zip)
- [Serverless Django](https://github.com/aws-hebrew-book/serverless-django) by [@efi-mk](https://github.com/efi-mk)
Expand Down
115 changes: 115 additions & 0 deletions examples/fastapi-backend-only-response-streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Serverless Streaming with Lambda Web Adapter and Bedrock

This example demonstrates how to set up a serverless streaming service using AWS Lambda, Lambda Web Adapter, and Amazon Bedrock. The service can be easily consumed by any frontend application through simple GET requests, without the need for websockets.

## Overview

This project showcases:
- Streaming responses from Amazon Bedrock (using Anthropic Claude v2 model)
- Using FastAPI with AWS Lambda
- Implementing Lambda Web Adapter for response streaming
- Creating a Function URL that supports response streaming

The setup allows any frontend to consume the streaming service via GET requests to the Function URL.

## How It Works

1. A FastAPI application is set up to handle requests and interact with Bedrock.
2. The application is packaged as a Docker image, including the Lambda Web Adapter.
3. AWS SAM is used to deploy the Lambda function with the necessary configurations.
4. A Function URL is created with response streaming enabled.
5. Frontends can send GET requests to this URL to receive streamed responses.

## Key Components

### Dockerfile

```dockerfile
FROM public.ecr.aws/docker/library/python:3.12.0-slim-bullseye
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/lambda-adapter

WORKDIR /app
ADD . .
RUN pip install -r requirements.txt

CMD ["python", "main.py"]
```

Notice that we only need to add the second line to install Lambda Web Adapter.

```dockerfile
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/
```

In the SAM template, we use an environment variable `AWS_LWA_INVOKE_MODE: RESPONSE_STREAM` to configure Lambda Web Adapter in response streaming mode. And adding a function url with `InvokeMode: RESPONSE_STREAM`.

```yaml
FastAPIFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
MemorySize: 512
Environment:
Variables:
AWS_LWA_INVOKE_MODE: RESPONSE_STREAM
FunctionUrlConfig:
AuthType: NONE
InvokeMode: RESPONSE_STREAM
Policies:
- Statement:
- Sid: BedrockInvokePolicy
Effect: Allow
Action:
- bedrock:InvokeModelWithResponseStream
Resource: '*'
```
## Build and deploy
Run the following commands to build and deploy this example.
```bash
sam build --use-container
sam deploy --guided
```


## Test the example

After the deployment completes, use the `FastAPIFunctionUrl` shown in the output messages to send get requests with your query to the /api/stream route.


```python
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import boto3
import json
import time

session = boto3.Session()
credentials = session.get_credentials()
region = 'us-east-1'

payload = {"query": query}

request = AWSRequest(
method='GET',
url=f'{func_url}/api/stream',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)

SigV4Auth(credentials, "lambda", region).add_auth(request)
buffer = ""
response= requests.get(
request.url,
data=request.data,
headers=dict(request.headers),
stream=True
)

for chunk in response.iter_content(chunk_size=64):
print(chunk.decode('utf-8'), end='', flush=True)
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM public.ecr.aws/docker/library/python:3.12.0-slim-bullseye
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/lambda-adapter

WORKDIR /app
ADD . .
RUN pip install -r requirements.txt

CMD ["python", "main.py"]
80 changes: 80 additions & 0 deletions examples/fastapi-backend-only-response-streaming/app/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import boto3
import json
import os
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio


BEDROCK_MODEL = os.environ.get(
"BEDROCK_MODEL", "anthropic.claude-3-haiku-20240307-v1:0"
)
SYSTEM = os.environ.get("SYSTEM", "You are a helpful assistant.")

app = FastAPI()
bedrock = boto3.Session().client("bedrock-runtime")


# Define the request model
class QueryRequest(BaseModel):
query: str


@app.get("/api/stream")
async def api_stream(request: QueryRequest):
if not request.query:
return None

return StreamingResponse(
bedrock_stream(request.query),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)


async def bedrock_stream(query: str):
instruction = f"""
You are a helpful assistant. Please provide an answer to the user's query
<query>{query}</query>.
"""
body = json.dumps(
{
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"system": SYSTEM,
"temperature": 0.1,
"top_k": 10,
"messages": [
{
"role": "user",
"content": instruction,
}
],
}
)

response = bedrock.invoke_model_with_response_stream(
modelId=BEDROCK_MODEL, body=body
)

stream = response.get("body")
if stream:
for event in stream:
chunk = event.get("chunk")
if chunk:
message = json.loads(chunk.get("bytes").decode())
if message["type"] == "content_block_delta":
yield message["delta"]["text"] or ""
await asyncio.sleep(0.01)
elif message["type"] == "message_stop":
yield "\n"
await asyncio.sleep(0.01)


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
annotated-types==0.5.0
anyio==3.7.1
boto3==1.28.61
botocore==1.31.61
click==8.1.7
exceptiongroup==1.1.3
fastapi==0.109.2
h11==0.14.0
idna==3.7
jmespath==1.0.1
pydantic==2.4.2
pydantic_core==2.10.1
python-dateutil==2.8.2
s3transfer==0.7.0
six==1.16.0
sniffio==1.3.0
starlette==0.36.3
typing_extensions==4.8.0
urllib3==1.26.19
uvicorn==0.23.2

0 comments on commit be5a020

Please sign in to comment.