Event-driven architecture for parallel video processing with:
- Video enhancement (brightness, contrast, resolution)
- Metadata extraction (duration, dimensions, framerate)
- Real-time progress updates
```
User → React → /upload
                  ↓
            FastAPI Server
                  ↓ Save Video
                  ↓ Publish to RabbitMQ
                  ↓
      ┌───────────┴────────────────────┐
      │                                │
Video Enhancement Worker     Metadata Extraction Worker
      │                                │
Enhance & Save File          Extract Metadata from File
      │                                │
POST enhancement status      POST metadata status
to FastAPI                   to FastAPI
      │                                │
      └───────────┬────────────────────┘
                  ↓
FastAPI checks if both tasks are done, then
sends a WebSocket push to the client
                  ↓
React updates UI with:
✅ Enhanced Video | ✅ Metadata
```
```mermaid
sequenceDiagram
    participant User
    participant Server
    participant RabbitMQ
    participant Enhancer
    participant Extractor
    User->>Server: Upload Video
    Server->>RabbitMQ: Publish Task
    RabbitMQ->>Enhancer: Process Video
    RabbitMQ->>Extractor: Extract Metadata
    Enhancer->>Server: Enhancement Done
    Extractor->>Server: Metadata Extracted
    Server->>User: Send Completion Notification
```
- FastAPI (Python web framework)
- RabbitMQ (Message broker)
- FFmpeg (Video processing)
- WebSockets (Real-time updates)
```
[FastAPI Server] <--- Tasks & Updates ---> [RabbitMQ]
        |                                  /        \
[Client Uploads]           [Metadata Worker]    [Enhancement Worker]
```
- Install requirements:
  pip install -r requirements.txt
- Install RabbitMQ, or start it in a Docker container:
  docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq
- Start the server:
  uvicorn server:app --reload
- Start the queue publisher in a terminal:
  python queues/pika_publisher.py
- Start the workers in separate terminals (a minimal consumer sketch follows this list):
  python workers/video_enhancement.py
  python workers/metadata_extractor.py
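Each worker is essentially a RabbitMQ consumer. A minimal pika-based consumer looks roughly like the sketch below; the exchange and queue names are placeholders, not necessarily the ones used in this repo:

```python
# Minimal pika consumer sketch; exchange/queue names are illustrative
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()

# A fanout exchange lets both workers receive a copy of every upload event (assumption)
channel.exchange_declare(exchange="video_tasks", exchange_type="fanout")
queue = channel.queue_declare(queue="", exclusive=True).method.queue
channel.queue_bind(exchange="video_tasks", queue=queue)

def on_message(ch, method, properties, body):
    task = json.loads(body)
    print("Processing", task["video_path"])  # enhancement / metadata extraction happens here
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue, on_message_callback=on_message)
channel.start_consuming()
```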
POST /upload
- Content-Type: multipart/form-data
- Parameters:
  - `file`: Video file to upload (required)
Request Example:
curl -X POST -F "file=@sample.mp4" http://localhost:8000/upload
Success Response:
{
"message": "Upload Successful",
"video": "sample.mp4",
"video_path": "/videos/sample.mp4"
}
Error Responses:
- 400: Invalid file type
- 500: Server error during processing
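For reference, the same upload from Python with `requests` (a quick sketch equivalent to the curl call above):

```python
import requests

with open("sample.mp4", "rb") as f:
    resp = requests.post("http://localhost:8000/upload", files={"file": f})
print(resp.status_code, resp.json())
```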
GET /status/{filename}
- Parameters:
  - `filename`: Name of uploaded video (required)
Request Example:
curl http://localhost:8000/status/sample.mp4
Response Examples:

Processing:
{
"video": "sample.mp4",
"status": {
"enhanced": false,
"metadata": null
}
}
Completed:
{
"video": "sample.mp4",
"status": {
"enhanced": true,
"metadata": {
"duration": 90.24,
"width": 1920,
"height": 1080
}
}
}
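If you do not want to open a WebSocket, this endpoint can simply be polled (a small sketch):

```python
import time
import requests

# Poll until both the enhancement and metadata tasks report done
while True:
    status = requests.get("http://localhost:8000/status/sample.mp4").json()["status"]
    if status["enhanced"] and status["metadata"] is not None:
        print("Done:", status)
        break
    time.sleep(2)
```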
WS /ws/{client_id}
- Parameters:
  - `client_id`: Unique client identifier
Connection Example:
const socket = new WebSocket('ws://localhost:8000/ws/client123');
socket.onmessage = (event) => {
console.log('Update:', JSON.parse(event.data));
};
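The same connection from Python, using the third-party `websockets` package (a sketch; install `websockets` separately):

```python
import asyncio
import json
import websockets

async def listen(client_id: str = "client123"):
    async with websockets.connect(f"ws://localhost:8000/ws/{client_id}") as ws:
        async for message in ws:
            print("Update:", json.loads(message))

asyncio.run(listen())
```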
Message Format:
{
"video": "sample.mp4",
"metadata": {
"duration": 90.24,
"width": 1920,
"height": 1080
},
"enhanced_video_url": "/videos/sample_enhanced.mp4"
}
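On the server side, this push is only sent once both workers have reported back. Below is a minimal sketch of that aggregation step; the callback path `/internal/status`, the `processing_status` dict, and the `clients` registry are illustrative assumptions, as is the idea that workers echo back the `client_id`:

```python
# Sketch only: aggregate worker callbacks and push over the WebSocket when both are done
from typing import Optional
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
processing_status: dict = {}   # filename -> {"enhanced": bool, "metadata": dict | None}
clients: dict = {}             # client_id -> WebSocket, filled by the /ws/{client_id} endpoint

class WorkerUpdate(BaseModel):
    video: str
    client_id: str
    enhanced: Optional[bool] = None
    metadata: Optional[dict] = None

@app.post("/internal/status")  # hypothetical path for the workers' status callbacks
async def worker_update(update: WorkerUpdate):
    status = processing_status.setdefault(update.video, {"enhanced": False, "metadata": None})
    if update.enhanced is not None:
        status["enhanced"] = update.enhanced
    if update.metadata is not None:
        status["metadata"] = update.metadata
    # Notify the client only when both tasks have completed
    if status["enhanced"] and status["metadata"] is not None:
        ws = clients.get(update.client_id)
        if ws is not None:
            name = update.video.rsplit(".", 1)[0]
            await ws.send_json({
                "video": update.video,
                "metadata": status["metadata"],
                "enhanced_video_url": f"/videos/{name}_enhanced.mp4",
            })
    return {"ok": True}
```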
Main processing flow:
```python
# In server.py
@app.post("/upload/")
async def upload_video(file: UploadFile):
    video_path = save_upload(file)
    publish_task(video_path)  # Send to RabbitMQ
    return {"status": "processing"}
```
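`publish_task` is essentially a thin pika publisher. A sketch of what it might do (the exchange name and message shape are assumptions):

```python
# Sketch of publish_task; exchange name and payload shape are assumptions
import json
import os
import pika

def publish_task(video_path: str) -> None:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=os.getenv("RABBITMQ_HOST", "localhost"),
            port=int(os.getenv("RABBITMQ_PORT", "5672")),
        )
    )
    channel = connection.channel()
    # Fanout exchange so the enhancement and metadata workers each get a copy of the task
    channel.exchange_declare(exchange="video_tasks", exchange_type="fanout")
    channel.basic_publish(exchange="video_tasks", routing_key="",
                          body=json.dumps({"video_path": video_path}))
    connection.close()
```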
Video enhancement:
```python
# In video_enhancement.py (ffmpeg-python filter chain; output/run call omitted here)
stream = (
    ffmpeg.input(video_path)
    .filter("eq", brightness=0.2, contrast=1.5)
    .filter("scale", 1920, 1080)
)
```
Environment variables:
- `RABBITMQ_HOST`: RabbitMQ server host (default: localhost)
- `RABBITMQ_PORT`: RabbitMQ server port (default: 5672)
- `UPLOAD_DIR`: Video upload directory (default: ./videos)
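These are typically read with `os.getenv`, falling back to the defaults listed above (a sketch of how the configuration might be loaded):

```python
import os

RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./videos")
```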
Video processing parameters (in video_enhancement.py):
```python
# Adjust these values as needed
FILTERS = {
    "brightness": 0.2,
    "contrast": 1.5,
    "fps": 60,
    "width": 1920,
    "height": 1080
}
```
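Wired into the ffmpeg-python chain, those values might be applied like this (a sketch; the actual wiring in the worker may differ):

```python
# Sketch: applying FILTERS to the enhancement chain
import ffmpeg

def build_enhancement(video_path: str, output_path: str):
    return (
        ffmpeg.input(video_path)
        .filter("eq", brightness=FILTERS["brightness"], contrast=FILTERS["contrast"])
        .filter("fps", fps=FILTERS["fps"])
        .filter("scale", FILTERS["width"], FILTERS["height"])
        .output(output_path)
    )

# build_enhancement("videos/sample.mp4", "videos/sample_enhanced.mp4").run()
```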
Common issues:
- FFmpeg errors:
  - Ensure FFmpeg is installed and in PATH
  - Check file permissions on input/output directories
- RabbitMQ connection failures:
  - Verify RabbitMQ service is running
  - Check host/port configuration (see the retry sketch below)
- WebSocket disconnections:
  - Configure proper timeout values
  - Handle connection retries in client
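For transient RabbitMQ failures, a simple retry loop on the worker side is often enough (a sketch, not code from the repo):

```python
# Sketch: retry the RabbitMQ connection with a delay instead of crashing the worker
import time
import pika
from pika.exceptions import AMQPConnectionError

def connect_with_retry(host="localhost", port=5672, attempts=5, delay=3):
    for attempt in range(1, attempts + 1):
        try:
            return pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port))
        except AMQPConnectionError:
            print(f"RabbitMQ not reachable (attempt {attempt}/{attempts}); retrying in {delay}s")
            time.sleep(delay)
    raise RuntimeError("Could not connect to RabbitMQ")
```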
Sample Input/Output:
- Original Video:
  - Resolution: 1280x720
  - Duration: 1:30
  - Framerate: 30fps
- Enhanced Video:
  - Resolution: 1920x1080
  - Duration: 1:30
  - Framerate: 60fps
  - Improved brightness/contrast
Metadata Output (JSON):
{
"duration": 90.24,
"width": 1920,
"height": 1080,
"frame_rate": 60
}
Enable debug logging:

```python
# In any worker file
import logging

logging.basicConfig(level=logging.DEBUG)
```
- Fork the repository
- Create a feature branch
- Submit a pull request
- Add File Size Limit:
  - Use `Content-Length` headers or monitor the file size during saving to enforce a size restriction.
- Integrate with FFmpeg or WriteGear:
  - Once the video is uploaded, you can use tools like FFmpeg (or VidGear's WriteGear) to process the video (e.g., compression, format conversion).
- Implement Streaming:
  - Add a GET endpoint to serve the uploaded videos using streaming for efficient delivery (see the sketch below).
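For the streaming item, one possible shape of such an endpoint (a sketch only; this endpoint does not exist in the project yet):

```python
# Sketch of a possible GET endpoint for serving uploaded/enhanced videos
import os
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse

app = FastAPI()
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./videos")

@app.get("/videos/{filename}")
async def stream_video(filename: str):
    path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="Video not found")
    return FileResponse(path, media_type="video/mp4")  # streamed from disk in chunks
```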
- Extracting Metadata from Audio/Video with Python
- RabbitMQ Exchange Types Explained
- Why Declare Exchanges in RabbitMQ
- FFmpeg Video Filters Documentation
- Pika Python Client Documentation
- RabbitMQ with Python Guide
- Pub/Sub with FastAPI and RabbitMQ
- RabbitMQ with FastAPI and WebSockets
MIT