forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_basic.py
More file actions
72 lines (56 loc) · 2.28 KB
/
example_basic.py
File metadata and controls
72 lines (56 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import json
import boto3
from testcontainers.localstack import LocalStackContainer
def basic_example():
    """Run a small S3 + SQS round trip against a LocalStack container.

    Inside a ``LocalStackContainer`` context this function:

    1. builds boto3 S3 and SQS clients that target the container endpoint,
    2. creates a bucket and uploads one JSON document to it,
    3. creates a queue, sends one JSON message, then receives, prints and
       deletes it,
    4. lists the objects stored in the bucket.

    Progress is reported with ``print``; nothing is returned. Any error raised
    by the container or by boto3 propagates to the caller.
    """
    with LocalStackContainer() as localstack:
        # Both clients talk to the same container with the same placeholder
        # credentials, so the shared keyword arguments are built once.
        # ``get_url()`` is the accessor LocalStackContainer documents for the
        # endpoint that replaces the real AWS endpoints.
        client_kwargs = {
            "endpoint_url": localstack.get_url(),
            "aws_access_key_id": "test",
            "aws_secret_access_key": "test",
            "region_name": "us-east-1",
        }
        s3 = boto3.client("s3", **client_kwargs)
        sqs = boto3.client("sqs", **client_kwargs)

        # Create S3 bucket
        bucket_name = "test-bucket"
        s3.create_bucket(Bucket=bucket_name)
        print(f"Created S3 bucket: {bucket_name}")

        # Upload file to S3
        test_data = {"message": "Hello from LocalStack!", "timestamp": "2024-01-01"}
        s3.put_object(Bucket=bucket_name, Key="test.json", Body=json.dumps(test_data))
        print("Uploaded test.json to S3")

        # Create SQS queue
        queue_name = "test-queue"
        queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
        print(f"Created SQS queue: {queue_name}")

        # Send message to SQS
        message = {"message": "Test message", "number": 42}
        sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
        print("Sent message to SQS")

        # Receive message from SQS. Long polling (WaitTimeSeconds > 0) avoids
        # an empty response when the message is not yet visible on the first
        # poll; the call still returns as soon as a message is available.
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=5,
        )
        # "Messages" is absent when nothing was received, hence the default.
        for received in response.get("Messages", []):
            received_message = json.loads(received["Body"])
            print("\nReceived message from SQS:")
            print(json.dumps(received_message, indent=2))

            # Delete message so it is not delivered again
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=received["ReceiptHandle"])
            print("Deleted message from queue")

        # List S3 objects (list_objects_v2 is the current listing API)
        objects = s3.list_objects_v2(Bucket=bucket_name)
        print("\nS3 bucket contents:")
        # "Contents" is absent for an empty bucket, hence the default.
        for obj in objects.get("Contents", []):
            print(f"Key: {obj['Key']}, Size: {obj['Size']} bytes")
if __name__ == "__main__":
    # Run the demo only when this file is executed as a script, not when it
    # is imported as a module.
    basic_example()