A lightweight, standalone Python framework for processing Azure ServiceBus messages with automatic status reporting via REST API.
- Simple Installation:
pip install servicebus-framework - ServiceBus Integration: Listen to Azure ServiceBus queues and topics
- REST API Reporting: Automatically report task status to any REST API
- Pluggable Task Handlers: Easy to extend with custom task processors
- Robust Error Handling: Built-in retry mechanisms and error reporting
- Container-Ready: Designed for Azure Container Jobs and Kubernetes
- Zero Dependencies: No framework coupling - works with any backend
-
Install the framework:
pip install servicebus-framework
-
Create a task handler:
from servicebus_framework import TaskHandler, TaskResult class MyTaskHandler(TaskHandler): def process(self, task_data: dict) -> TaskResult: # Your task processing logic here message = task_data.get('message') return TaskResult(success=True, result=f"Processed: {message}")
-
Configure and run:
from servicebus_framework import ServiceBusWorker # Configure the worker worker = ServiceBusWorker( connection_string="Endpoint=sb://...", queue_name="my-queue", task_handler=MyTaskHandler(), api_base_url="https://myapi.com/api", api_token="my-token" ) # Start processing await worker.start()
-
Run as a script:
servicebus-worker \ --connection-string "Endpoint=sb://..." \ --queue my-queue \ --handler my_module.MyTaskHandler \ --api-url https://myapi.com/api \ --api-token my-token
The framework expects ServiceBus messages in this format:
{
"task_id": "unique-task-identifier",
"task_type": "my_task_type",
"task_data": {
"key": "value",
"param": "data"
}
}The framework automatically reports task status to your REST API:
- Start:
POST /tasks/{task_id}/status/with{"status": "running"} - Success:
POST /tasks/{task_id}/status/with{"status": "success", "result": {...}} - Error:
POST /tasks/{task_id}/status/with{"status": "failed", "error": "..."}
All configuration can be done via environment variables or constructor parameters:
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://..."
export SERVICEBUS_QUEUE_NAME="my-queue"
export API_BASE_URL="https://myapi.com/api"
export API_TOKEN="my-token"The framework is designed to run in containers:
FROM python:3.11-slim
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "-m", "servicebus_framework.worker"]The framework provides utilities to integrate with your existing Django application:
- Task Status API: Endpoints to receive task status updates
- Task Submission: Methods to submit tasks to ServiceBus from Django
- Follow-up Scheduling: Queue additional tasks after completion
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
MIT License