
Distributed Event-Driven Video Processing Pipeline is a scalable system using FastAPI, RabbitMQ, and React to handle real-time video uploads, enhancement, and metadata extraction. It features microservices architecture with WebSocket-based client updates and asynchronous task processing.

Video Processing Pipeline

Project Overview

Event-driven architecture for parallel video processing with:

  • Video enhancement (brightness, contrast, resolution)
  • Metadata extraction (duration, dimensions, framerate)
  • Real-time progress updates

Project Workflow

User (React) uploads video
         │
         ↓
FastAPI Server: save video, publish task to RabbitMQ
         │
     ┌───┴──────────────────────────────────┐
     │                                      │
Video Enhancement Worker          Metadata Extraction Worker
     │                                      │
Enhance & save file               Extract metadata from file
     │                                      │
POST enhancement status           POST metadata status
   to FastAPI                        to FastAPI
     └──────────────────┬──────────────────┘
                        ↓
FastAPI checks whether both tasks are done, then
sends a WebSocket push to the client.
React updates the UI with:
✅ Enhanced Video | ✅ Metadata

Architecture

sequenceDiagram
    participant User
    participant Server
    participant RabbitMQ
    participant Enhancer
    participant Extractor
    
    User->>Server: Upload Video
    Server->>RabbitMQ: Publish Task
    RabbitMQ->>Enhancer: Process Video
    RabbitMQ->>Extractor: Extract Metadata
    Enhancer->>Server: Enhancement Done
    Extractor->>Server: Metadata Extracted
    Server->>User: Send Completion Notification

Tech Stack

  • FastAPI (Python web framework)
  • RabbitMQ (Message broker)
  • FFmpeg (Video processing)
  • WebSockets (Real-time updates)

Task Exchange

[FastAPI Server]    <--- Tasks & Updates --->     [RabbitMQ]
     |                                             /       \
[Client Uploads]                    [Metadata Worker]    [Enhancement Worker]
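
Task publishing (whether from the server's `publish_task` helper or the standalone `queues/pika_publisher.py`) can be done with pika. The sketch below is a minimal, illustrative version assuming a fanout exchange named `video_tasks` so both workers receive the same task; the exchange name and payload shape are assumptions, not the repository's exact code:

# Illustrative task publisher; exchange name and payload are assumptions.
import json
import os

import pika

RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")

def publish_task(video_path: str) -> None:
    """Publish a processing task for the given video to RabbitMQ."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
    channel = connection.channel()
    channel.exchange_declare(exchange="video_tasks", exchange_type="fanout")
    channel.basic_publish(
        exchange="video_tasks",
        routing_key="",
        body=json.dumps({"video_path": video_path}),
    )
    connection.close()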

Installation

  1. Install requirements:
pip install -r requirements.txt
  2. Install RabbitMQ, or start it in a Docker container:
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq

Running the System

  1. Start the server:
uvicorn server:app --reload
  2. Start the queue publisher in a terminal:
python queues/pika_publisher.py
  3. Start the workers in separate terminals:
python workers/video_enhancement.py
python workers/metadata_extractor.py
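
Each worker is a long-running RabbitMQ consumer. The loop inside such a worker might look roughly like this (exchange name, queue binding, and payload shape are illustrative assumptions matching the publisher sketch above, not the repository's exact code):

# Illustrative worker consumer loop; names and payload shape are assumptions.
import json
import os

import pika

RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")

def handle_task(ch, method, properties, body):
    task = json.loads(body)
    video_path = task["video_path"]
    # ... enhance the video or extract metadata, then report back to FastAPI ...
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge after processing

connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange="video_tasks", exchange_type="fanout")
queue = channel.queue_declare(queue="", exclusive=True).method.queue
channel.queue_bind(exchange="video_tasks", queue=queue)
channel.basic_consume(queue=queue, on_message_callback=handle_task)
channel.start_consuming()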

API Documentation

Endpoints

1. Video Upload

POST /upload

  • Content-Type: multipart/form-data
  • Parameters:
    • file: Video file to upload (required)

Request Example:

curl -X POST -F "file=@sample.mp4" http://localhost:8000/upload
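
The same upload from Python, for reference (a small illustrative example using the requests library):

import requests

# Upload a local video file to the pipeline
with open("sample.mp4", "rb") as f:
    response = requests.post("http://localhost:8000/upload", files={"file": f})
print(response.json())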

Success Response:

{
  "message": "Upload Successful",
  "video": "sample.mp4",
  "video_path": "/videos/sample.mp4"
}

Error Responses:

  • 400: Invalid file type
  • 500: Server error during processing

2. Status Check

GET /status/{filename}

  • Parameters:
    • filename: Name of uploaded video (required)

Request Example:

curl http://localhost:8000/status/sample.mp4

Response Examples:

Processing:

{
  "video": "sample.mp4",
  "status": {
    "enhanced": false,
    "metadata": null
  }
}

Completed:

{
  "video": "sample.mp4",
  "status": {
    "enhanced": true,
    "metadata": {
      "duration": 90.24,
      "width": 1920,
      "height": 1080
    }
  }
}

3. WebSocket Updates

WS /ws/{client_id}

  • Parameters:
    • client_id: Unique client identifier

Connection Example:

const socket = new WebSocket('ws://localhost:8000/ws/client123');
socket.onmessage = (event) => {
  console.log('Update:', JSON.parse(event.data));
};

Message Format:

{
  "video": "sample.mp4",
  "metadata": {
    "duration": 90.24,
    "width": 1920,
    "height": 1080
  },
  "enhanced_video_url": "/videos/sample_enhanced.mp4"
}
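
On the server side, this endpoint maps naturally onto FastAPI's built-in WebSocket support. A minimal sketch of how connection tracking and the completion push might be implemented (the `active_connections` registry and `notify_client` helper are assumptions for illustration, not the repository's exact code):

# Illustrative WebSocket endpoint and push helper.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
active_connections: dict[str, WebSocket] = {}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    active_connections[client_id] = websocket
    try:
        while True:
            await websocket.receive_text()  # keep the connection open
    except WebSocketDisconnect:
        active_connections.pop(client_id, None)

async def notify_client(client_id: str, payload: dict) -> None:
    """Push a completion update to a connected client, if any."""
    websocket = active_connections.get(client_id)
    if websocket is not None:
        await websocket.send_json(payload)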

Workflow Code

Main processing flow:

# In server.py
@app.post("/upload/")
async def upload_video(file: UploadFile):
    video_path = save_upload(file)
    publish_task(video_path)  # Send to RabbitMQ
    return {"status": "processing"}

Video enhancement:

# In video_enhancement.py
(
    ffmpeg.input(video_path)
    .filter("eq", brightness=0.2, contrast=1.5)  # adjust brightness/contrast
    .filter("scale", 1920, 1080)                 # upscale to 1080p
    .output(enhanced_path)                       # enhanced_path: destination file
    .run()
)

Configuration Options

Environment variables:

  • RABBITMQ_HOST: RabbitMQ server host (default: localhost)
  • RABBITMQ_PORT: RabbitMQ server port (default: 5672)
  • UPLOAD_DIR: Video upload directory (default: ./videos)
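
These can be read at startup with standard-library defaults, for example (an illustrative snippet, not necessarily how the repository loads them):

import os

RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./videos")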

Video processing parameters (in video_enhancement.py):

# Adjust these values as needed
FILTERS = {
    "brightness": 0.2,
    "contrast": 1.5,
    "fps": 60,
    "width": 1920,
    "height": 1080
}

Troubleshooting

Common issues:

  1. FFmpeg errors:

    • Ensure FFmpeg is installed and in PATH
    • Check file permissions on input/output directories
  2. RabbitMQ connection failures:

    • Verify RabbitMQ service is running
    • Check host/port configuration
  3. WebSocket disconnections:

    • Configure proper timeout values
    • Handle connection retries in client

Examples

Sample Input/Output:

  1. Original Video:

    • Resolution: 1280x720
    • Duration: 1:30
    • Framerate: 30fps
  2. Enhanced Video:

    • Resolution: 1920x1080
    • Duration: 1:30
    • Framerate: 60fps
    • Improved brightness/contrast

Metadata Output (JSON):

{
  "duration": 90.24,
  "width": 1920,
  "height": 1080,
  "frame_rate": 60
}

Development Guide

Debugging

Enable debug logging:

# In any worker file
logging.basicConfig(level=logging.DEBUG)

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Submit a pull request

Further Enhancements

  1. Add File Size Limit:

    • Use Content-Length headers or monitor the file size during saving to enforce a size restriction.
  2. Integrate with FFmpeg or WriteGear:

    • Once the video is uploaded, you can use tools like FFmpeg (or VidGear’s WriteGear) to process the video (e.g., compression, format conversion).
  3. Implement Streaming:

    • Add a GET endpoint to serve the uploaded videos using streaming for efficient delivery (see the sketch after this list).
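
For the streaming idea above, a minimal sketch of a GET endpoint that serves an uploaded video with FastAPI's FileResponse, assuming the `app` and `UPLOAD_DIR` from server.py (the route path and error handling are illustrative):

# Illustrative video-serving endpoint; route and error handling are assumptions.
import os

from fastapi import HTTPException
from fastapi.responses import FileResponse

@app.get("/videos/{filename}")
async def get_video(filename: str):
    video_path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.isfile(video_path):
        raise HTTPException(status_code=404, detail="Video not found")
    return FileResponse(video_path, media_type="video/mp4")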

License

MIT
