š Building a Production-Ready Video-to-Audio Converter with Python Microservices and Kubernetes

As Domain Lead for Cloud & DevOps at Techno India University, I bridge academic theory with industry standards, driving initiatives that turn classroom concepts into scalable solutions. Previously, as a Full Stack Developer at The Entrepreneurship Network, I delivered robust web solutions, showcasing my Kubernetes and DevOps expertise to meet and exceed business objectives.
Introduction
In today's cloud-native world, building scalable and maintainable applications requires a deep understanding of microservices architecture, containerization, and orchestration. This comprehensive guide will walk you through building a production-ready video-to-audio converter using Python microservices, deployed and managed with Kubernetes.
What You'll Learn
How to design and implement a microservices architecture
Asynchronous processing with message queues
Kubernetes deployment and orchestration
Database management in a distributed system
Security best practices for microservices
Production deployment strategies
Prerequisites
Basic understanding of Python and Flask
Familiarity with Docker and containerization
Basic knowledge of Kubernetes concepts
Understanding of databases (MySQL, MongoDB)
Command line proficiency
System Architecture Overview
Our video-to-audio converter consists of four main microservices that work together to provide a complete solution:
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Client Application ā
āāāāāāāāāāāāāāāāāāāāāāāāāāā¬āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā
ā HTTP Requests
ā
āāāāāāāāāāāāāāāāāāāāāāāāāāā¼āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Gateway Service ā
ā (Port 8000) ā
ā ā
ā ⢠File Upload/Download ā
ā ⢠Request Routing ā
ā ⢠Authentication Validation ā
āāāāāāāāāāāāāāā¬āāāāāāāāāāāāāāāāāāāāāāāāāāāāāā¬āāāāāāāāāāāāāāāāāāāāāā
ā ā
ā JWT Validation ā Message Publishing
ā ā
āāāāāāāāāāāāāāā¼āāāāāāāāāāāāāāāāāā āā¼āāāāāāāāāāāāāāāāāāāāāā
ā Auth Service ā ā RabbitMQ ā
ā (Port 5000) ā ā Message Broker ā
ā ā ā ā
ā ⢠User Authentication ā ā ⢠video queue ā
ā ⢠JWT Token Generation ā ā ⢠mp3 queue ā
ā ⢠User Management ā ā ⢠Dead letter queue ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā āā¬āāāāāāāāāāāāāāāāāāāāāā
ā
ā Message Consumption
ā
āāāāāāāāāāāāāāāāāāāā¼āāāāāāāāāāāāāāāāāāā
ā Converter Service ā
ā ā
ā ⢠Video Processing ā
ā ⢠FFmpeg Integration ā
ā ⢠File Format Conversion ā
āāāāāāāāāāāāāāāā¬āāāāāāāāāāāāāāāāāāāāāāā
ā
ā Completion Notification
ā
āāāāāāāāāāāāāāāā¼āāāāāāāāāāāāāāāāāāāāāāā
ā Notification Service ā
ā ā
ā ⢠Email Notifications ā
ā ⢠Status Updates ā
ā ⢠User Communication ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Data Flow Architecture
āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā
ā MySQL ā ā MongoDB ā ā MongoDB ā ā Email ā
ā DB ā ā Videos ā ā MP3 ā ā SMTP ā
ā ā ā (GridFS)ā ā (GridFS)ā ā Server ā
āāāāāā²āāāāā āāāāāā²āāāāā āāāāāā²āāāāā āāāāāā²āāāāā
ā ā ā ā
ā ā ā ā
āāāāāā¼āāāāā āāāāāā¼āāāāā āāāāāā¼āāāāā āāāāāā¼āāāāā
ā Auth ā āGateway ā āConverterā āNotificationā
āService ā āService ā āService ā āService ā
āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā
Key Design Principles
Single Responsibility: Each service has one specific purpose
Loose Coupling: Services communicate through well-defined APIs
High Cohesion: Related functionality is grouped together
Fault Tolerance: Services can handle failures gracefully
Scalability: Individual services can be scaled independently
Understanding Microservices Architecture
What Are Microservices?
Microservices architecture is a method of developing software applications as a suite of independently deployable, small, modular services. Each service runs in its own process and communicates via well-defined, lightweight mechanisms.
Benefits of Our Microservices Approach
Independent Deployment: Update services without affecting others
Technology Diversity: Each service can use different technologies
Fault Isolation: Failure in one service doesn't bring down the entire system
Team Autonomy: Different teams can own different services
Scalability: Scale services based on demand
Communication Patterns
Our system uses two main communication patterns:
1. Synchronous Communication (HTTP/REST)
Client to Gateway Service
Gateway to Auth Service
Used for immediate responses
2. Asynchronous Communication (Message Queues)
Gateway to Converter Service (via RabbitMQ)
Converter to Notification Service (via RabbitMQ)
Used for long-running operations
Deep Dive into Each Service
1. Gateway Service
The Gateway Service acts as the entry point for all client requests, implementing the API Gateway pattern.
Core Responsibilities
# Key functionality areas
āāā Authentication Validation
āāā File Upload Management
āāā Request Routing
āāā Response Aggregation
āāā Error Handling
Key Components
File Upload Handler
@app.route('/upload', methods=['POST'])
def upload():
# Validate JWT token
token, err = validate.token(request)
# Check admin privileges
if access_data["is_admin"]:
# Process file upload
# Store in MongoDB GridFS
# Publish message to RabbitMQ
return "File uploaded successfully", 200
MongoDB GridFS Integration
Stores large video files efficiently
Handles file chunking automatically
Provides metadata storage
RabbitMQ Publishing
Publishes video processing jobs
Ensures message durability
Handles connection failures gracefully
Technology Stack
Framework: Flask (Python web framework)
Database: MongoDB with GridFS for file storage
Message Queue: RabbitMQ for asynchronous processing
Authentication: JWT token validation
2. Authentication Service
The Auth Service manages user authentication and authorization using JWT tokens.
Core Responsibilities
# Authentication workflow
āāā User Login Validation
āāā JWT Token Generation
āāā Token Validation
āāā User Session Management
āāā Admin Permission Control
JWT Implementation
Token Generation
def create_token(username: str, secret: str, is_admin: bool) -> str:
return jwt.encode({
'user_email': username,
'is_admin': is_admin,
'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1),
'iat': datetime.datetime.now(datetime.timezone.utc)
}, secret, algorithm='HS256')
Token Validation
def validate_jwt(token: str, secret: str) -> dict | None:
try:
decoded = jwt.decode(token, secret, algorithms=['HS256'])
return decoded
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
Database Schema
-- Users table structure
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Technology Stack
Framework: Flask with Flask-MySQL
Database: MySQL for user data
Authentication: JWT for stateless authentication
Security: bcrypt for password hashing (recommended)
3. Converter Service
The Converter Service handles the core business logic of converting video files to MP3 audio.
Core Responsibilities
# Conversion workflow
āāā Message Queue Consumption
āāā Video File Retrieval
āāā Audio Extraction
āāā Format Conversion
āāā Result Storage
Processing Pipeline
āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā
ā Receive ā ā Download ā ā Convert ā
ā Message āāāāā¶ā Video āāāāā¶ā to Audio ā
ā ā ā ā ā ā
āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā
ā
āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā ā
ā Publish ā ā Store āāāāāāāāāāāāā
āNotification āāāāāā MP3 ā
ā ā ā ā
āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā
Message Processing
Consumer Implementation
def callback(ch, method, _properties, body):
err = to_mp3.start(body, fs_videos, fs_mp3, ch)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
Video Conversion Process
Message Reception: Receives job from RabbitMQ
File Retrieval: Downloads video from MongoDB GridFS
Audio Extraction: Uses FFmpeg/MoviePy for conversion
Quality Control: Validates output file
Storage: Saves MP3 to MongoDB GridFS
Notification: Publishes completion message
Technology Stack
Framework: Pure Python with Pika for RabbitMQ
Processing: FFmpeg and MoviePy for video conversion
Database: MongoDB GridFS for file storage
Message Queue: RabbitMQ for job processing
4. Notification Service
The Notification Service handles user communication and status updates.
Core Responsibilities
# Notification workflow
āāā Message Queue Monitoring
āāā Email Template Processing
āāā SMTP Communication
āāā Delivery Status Tracking
āāā Error Handling and Retry Logic
Email Notification System
Message Processing with Retry Logic
def callback(ch, method, properties, body):
try:
message_data = json.loads(body)
# Validate message structure
if 'mp3_fid' not in message_data:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
return
# Send notification
err = email.notify(body)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
Notification Types
Conversion Started: Job received and processing began
Conversion Complete: File ready for download
Conversion Failed: Error occurred during processing
System Alerts: Service health notifications
Technology Stack
Framework: Pure Python with Pika
Email: SMTP with Gmail integration
Message Queue: RabbitMQ for notification jobs
Templating: HTML email templates
Message Queue Communication with RabbitMQ
Why RabbitMQ?
RabbitMQ provides reliable, scalable message queuing that enables asynchronous communication between our services.
Queue Architecture
āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā āāāāāāāāāāāāāāā
ā Gateway ā ā RabbitMQ ā ā Converter ā
ā Service āāāāā¶ā Broker āāāāā¶ā Service ā
ā ā ā ā ā ā
āāāāāāāāāāāāāāā ā āāāāāāāāā ā āāāāāāāāāāāāāāā
ā ā video ā ā
ā ā queue ā ā
ā āāāāāāāāā ā
ā ā āāāāāāāāāāāāāāā
ā āāāāāāāāā ā āNotification ā
ā ā mp3 ā āāāāā¶ā Service ā
ā ā queue ā ā ā ā
ā āāāāāāāāā ā āāāāāāāāāāāāāāā
āāāāāāāāāāāāāāā
Message Flow Patterns
1. Video Processing Queue
Publisher (Gateway Service)
# Publishing video processing job
message = {
"video_fid": str(video_fid),
"mp3_fid": str(mp3_fid),
"username": access_data["user_email"]
}
channel.basic_publish(
exchange="",
routing_key="video",
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # Make message persistent
)
Consumer (Converter Service)
# Consuming video processing jobs
channel.basic_consume(
queue="video",
on_message_callback=callback,
)
2. Notification Queue
Publisher (Converter Service)
# Publishing notification job
notification = {
"mp3_fid": mp3_fid,
"user_email": username,
"status": "completed"
}
channel.basic_publish(
exchange="",
routing_key="mp3",
body=json.dumps(notification)
)
Message Durability and Reliability
Queue Declaration
# Ensure queue persistence
channel.queue_declare(queue='video', durable=True)
channel.queue_declare(queue='mp3', durable=True)
Message Acknowledgment
# Manual acknowledgment for reliability
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='video',
on_message_callback=callback,
auto_ack=False # Manual acknowledgment
)
Error Handling and Dead Letter Queues
# Dead letter queue configuration
channel.queue_declare(
queue='video_dlq',
durable=True,
arguments={
'x-message-ttl': 300000, # 5 minutes
'x-dead-letter-exchange': 'dlx'
}
)
Kubernetes Orchestration
Why Kubernetes?
Kubernetes provides container orchestration, enabling:
Automated deployment and scaling
Service discovery and load balancing
Self-healing capabilities
Rolling updates with zero downtime
Deployment Architecture
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Kubernetes Cluster ā
ā ā
ā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā ā
ā ā Gateway ā ā Auth ā ā Converter ā ā
ā ā Deployment ā ā Deployment ā ā Deployment ā ā
ā ā (2 replicas)ā ā (2 replicas)ā ā (1 replica) ā ā
ā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā ā
ā ā Notification ā ā RabbitMQ ā ā
ā ā Deployment ā ā StatefulSet ā ā
ā ā (1 replica) ā ā (1 replica) ā ā
ā āāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Services ā ā
ā ā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā ā ā
ā ā āGateway ā ā Auth ā āRabbitMQ ā ā Internalā ā ā
ā ā āService ā āService ā āService ā āServices ā ā ā
ā ā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Key Kubernetes Resources
1. Deployments
Gateway Service Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: gateway
labels:
app: gateway
spec:
replicas: 2
selector:
matchLabels:
app: gateway
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 3
template:
metadata:
labels:
app: gateway
spec:
containers:
- name: gateway
image: devpiush/python-microservice-gateway:latest
envFrom:
- configMapRef:
name: gateway-config
- secretRef:
name: gateway-secrets
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
2. Services
Service Discovery and Load Balancing
apiVersion: v1
kind: Service
metadata:
name: gateway
spec:
selector:
app: gateway
ports:
- port: 8000
targetPort: 8000
type: ClusterIP
3. ConfigMaps and Secrets
Configuration Management
apiVersion: v1
kind: ConfigMap
metadata:
name: gateway-config
data:
RABBITMQ_HOST: rabbitmq
AUTH_SVC_ADDR: auth:5000
---
apiVersion: v1
kind: Secret
metadata:
name: gateway-secrets
type: Opaque
data:
MONGO_URI: <base64-encoded-uri>
MONGO_MP3_URI: <base64-encoded-uri>
4. StatefulSets for RabbitMQ
Persistent Message Broker
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rabbitmq
spec:
serviceName: rabbitmq
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
spec:
containers:
- name: rabbitmq
image: rabbitmq:3.9-management
volumeMounts:
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumeClaimTemplates:
- metadata:
name: rabbitmq-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
Scaling Strategies
Horizontal Pod Autoscaling
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: gateway-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: gateway
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Database Design and Management
Database Architecture Overview
āāāāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāā
ā MySQL ā ā MongoDB ā
ā (Auth Data) ā ā (File Storage) ā
ā ā ā ā
ā āāāāāāāāāāāāā ā ā āāāāāāāāāāāāā ā
ā ā users ā ā ā ā gateway_dbā ā
ā ā table ā ā ā ā (videos) ā ā
ā āāāāāāāāāāāāā ā ā āāāāāāāāāāāāā ā
ā ā ā ā
ā ā ā āāāāāāāāāāāāā ā
ā ā ā ā mp3_db ā ā
ā ā ā ā (audio) ā ā
ā ā ā āāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāā
1. MySQL for Authentication
Why MySQL for Auth?
ACID compliance for user data
Strong consistency for authentication
Mature ecosystem and tooling
Excellent performance for read-heavy workloads
Schema Design
-- Users table
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
is_admin BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_email (email),
INDEX idx_created_at (created_at)
);
-- Sessions table (optional for enhanced security)
CREATE TABLE user_sessions (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
token_hash VARCHAR(255) NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
INDEX idx_token_hash (token_hash),
INDEX idx_expires_at (expires_at)
);
2. MongoDB for File Storage
Why MongoDB GridFS?
Handles large files (>16MB)
Automatic file chunking
Metadata storage capabilities
Horizontal scaling support
GridFS Structure
// GridFS automatically creates two collections:
// fs.files - File metadata
{
"_id": ObjectId("..."),
"filename": "video.mp4",
"length": 52428800,
"chunkSize": 261120,
"uploadDate": ISODate("..."),
"metadata": {
"user_email": "user@example.com",
"original_name": "my_video.mp4",
"content_type": "video/mp4"
}
}
// fs.chunks - File data chunks
{
"_id": ObjectId("..."),
"files_id": ObjectId("..."),
"n": 0,
"data": BinData(...)
}
Database Connection Management
Connection Pooling and Retry Logic
# MongoDB connection with retry logic
def connect_to_mongodb(uri, max_retries=3):
for attempt in range(max_retries):
try:
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
# Test connection
client.admin.command('ping')
return client
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
3. Data Consistency and Backup
Backup Strategies
# MongoDB backup
mongodump --host host.minikube.internal:27017 \
--username piush \
--password password \
--out /backup/mongodb/
# MySQL backup
mysqldump --host host.minikube.internal \
--user piush \
--password=password \
auth_db > /backup/mysql/auth_db.sql
Data Migration Scripts
# Example migration script
def migrate_user_data():
"""Migrate user data with validation"""
# Connect to databases
mysql_conn = mysql.connection
mongo_client = MongoClient(MONGO_URI)
# Migration logic
cursor = mysql_conn.cursor()
cursor.execute("SELECT * FROM users")
for user in cursor.fetchall():
# Validate and transform data
migrated_user = transform_user_data(user)
# Insert into new system
mongo_client.users.insert_one(migrated_user)
Security Implementation
Multi-Layer Security Architecture
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Security Layers ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Network Security ā ā
ā ā ⢠TLS/HTTPS Encryption ā ā
ā ā ⢠Network Policies ā ā
ā ā ⢠Firewall Rules ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Application Security ā ā
ā ā ⢠JWT Authentication ā ā
ā ā ⢠Input Validation ā ā
ā ā ⢠Rate Limiting ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Data Security ā ā
ā ā ⢠Password Hashing ā ā
ā ā ⢠Database Encryption ā ā
ā ā ⢠Secrets Management ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
1. Authentication and Authorization
JWT Token Implementation
Token Structure
{
"header": {
"typ": "JWT",
"alg": "HS256"
},
"payload": {
"user_email": "user@example.com",
"is_admin": true,
"exp": 1640995200,
"iat": 1640908800
},
"signature": "encrypted_signature"
}
Enhanced Token Validation
def validate_token_with_refresh(token, secret):
"""Enhanced token validation with refresh logic"""
try:
decoded = jwt.decode(token, secret, algorithms=['HS256'])
# Check if token expires soon (within 1 hour)
exp_time = datetime.fromtimestamp(decoded['exp'])
if exp_time - datetime.now() < timedelta(hours=1):
# Issue refresh token
return decoded, create_refresh_token(decoded)
return decoded, None
except jwt.ExpiredSignatureError:
# Check if refresh is possible
return None, "token_expired"
except jwt.InvalidTokenError:
return None, "invalid_token"
Role-Based Access Control (RBAC)
def check_permissions(user_data, required_permission):
"""Check user permissions for specific actions"""
user_roles = user_data.get('roles', [])
user_permissions = get_permissions_for_roles(user_roles)
return required_permission in user_permissions
# Decorator for permission checking
def require_permission(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token, err = validate.token(request)
if err:
return str(err[0]), err[1]
user_data = json.loads(token)
if not check_permissions(user_data, permission):
return "Insufficient permissions", 403
return f(*args, **kwargs)
return decorated_function
return decorator
# Usage
@app.route('/admin/users', methods=['GET'])
@require_permission('admin.users.read')
def list_users():
# Admin-only endpoint
pass
2. Input Validation and Sanitization
from marshmallow import Schema, fields, validate
class FileUploadSchema(Schema):
"""Validate file upload requests"""
file = fields.Raw(required=True)
filename = fields.Str(
required=True,
validate=validate.Length(min=1, max=255)
)
content_type = fields.Str(
required=True,
validate=validate.OneOf([
'video/mp4', 'video/avi', 'video/quicktime',
'video/x-msvideo', 'video/x-ms-wmv'
])
)
def validate_upload(request):
"""Validate file upload with comprehensive checks"""
schema = FileUploadSchema()
try:
# Basic validation
data = schema.load(request.form)
# File size validation
file = request.files.get('file')
if file.content_length > MAX_FILE_SIZE:
return None, "File too large"
# File type validation
if not is_valid_video_file(file):
return None, "Invalid file type"
# Virus scanning (in production)
if not scan_for_malware(file):
return None, "File failed security scan"
return data, None
except ValidationError as e:
return None, str(e)
3. Secrets Management
Kubernetes Secrets
apiVersion: v1
kind: Secret
metadata:
name: app-secrets
type: Opaque
data:
mysql-password: <base64-encoded-password>
mongodb-uri: <base64-encoded-uri>
jwt-secret: <base64-encoded-secret>
smtp-password: <base64-encoded-password>
Environment-based Configuration
import os
from cryptography.fernet import Fernet
class Config:
"""Centralized configuration management"""
# Database configuration
MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = decrypt_if_encrypted(os.getenv('MYSQL_PASSWORD'))
# JWT configuration
JWT_SECRET = os.getenv('JWT_SECRET')
JWT_EXPIRY = int(os.getenv('JWT_EXPIRY', '86400')) # 24 hours
# File upload limits
MAX_FILE_SIZE = int(os.getenv('MAX_FILE_SIZE', '104857600')) # 100MB
@staticmethod
def decrypt_if_encrypted(value):
"""Decrypt value if it's encrypted"""
if value and value.startswith('encrypted:'):
cipher = Fernet(os.getenv('ENCRYPTION_KEY'))
return cipher.decrypt(value[10:]).decode()
return value
4. Network Security
Kubernetes Network Policies
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: gateway-network-policy
spec:
podSelector:
matchLabels:
app: gateway
policyTypes:
- Ingress
- Egress
ingress:
- from:
- podSelector:
matchLabels:
app: nginx-ingress
ports:
- protocol: TCP
port: 8000
egress:
- to:
- podSelector:
matchLabels:
app: auth
ports:
- protocol: TCP
port: 5000
- to:
- podSelector:
matchLabels:
app: rabbitmq
ports:
- protocol: TCP
port: 5672
Setting Up Your Development Environment
Prerequisites Installation
1. Install Required Tools
macOS (using Homebrew)
# Install Homebrew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# Install required tools
brew install docker
brew install kubectl
brew install minikube
brew install python@3.11
brew install git
Ubuntu/Debian
# Update package list
sudo apt update
# Install Docker
sudo apt install docker.io
sudo systemctl start docker
sudo systemctl enable docker
sudo usermod -aG docker $USER
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
# Install Minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
# Install Python 3.11
sudo apt install python3.11 python3.11-venv python3.11-pip
2. Verify Installation
# Verify Docker
docker --version
docker run hello-world
# Verify Kubernetes
kubectl version --client
# Verify Minikube
minikube version
# Verify Python
python3.11 --version
pip3 --version
Local Development Setup
1. Clone and Setup Project
# Clone the repository
git clone <your-repo-url>
cd python-video-to-audio-microservices
# Create virtual environment for each service
cd auth_service
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cd ..
cd gateway
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cd ..
# Repeat for other services
2. Setup External Databases
Start Database Services
# Start databases using Docker Compose
docker-compose up -d
# Verify databases are running
docker ps
# Test MySQL connection
mysql -h localhost -u piush -p auth_db
# Test MongoDB connection
mongosh "mongodb://piush:password@localhost:27017/gateway_db?authSource=admin"
3. Environment Configuration
Create .env files for each service
.env for auth_service
MYSQL_HOST=localhost
MYSQL_USER=piush
MYSQL_PASSWORD=password
MYSQL_DB=auth_db
JWT_SECRET=your_development_secret_key_here
.env for gateway
MONGO_URI=mongodb://piush:password@localhost:27017/gateway_db?authSource=admin
MONGO_MP3_URI=mongodb://piush:password@localhost:27017/mp3?authSource=admin
AUTH_SVC_ADDR=localhost:5000
RABBITMQ_HOST=localhost
4. Development Workflow
Terminal Setup for Development
# Terminal 1: Start RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.9-management
# Terminal 2: Start Auth Service
cd auth_service
source .venv/bin/activate
python server.py
# Terminal 3: Start Gateway Service
cd gateway
source .venv/bin/activate
python server.py
# Terminal 4: Start Converter Service
cd converter
source .venv/bin/activate
python consumer.py
# Terminal 5: Start Notification Service
cd notification
source .venv/bin/activate
python consumer.py
Development Tools and Best Practices
1. Code Quality Tools
Setup Pre-commit Hooks
# Install pre-commit
pip install pre-commit
# Create .pre-commit-config.yaml
cat > .pre-commit-config.yaml << EOF
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3.11
- repo: https://github.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
EOF
# Install hooks
pre-commit install
2. Testing Setup
Unit Testing Framework
# tests/test_auth_service.py
import unittest
import json
from auth_service.server import app
class TestAuthService(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
app.config['TESTING'] = True
def test_health_endpoint(self):
"""Test health check endpoint"""
response = self.app.get('/health')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['status'], 'OK')
def test_login_without_auth(self):
"""Test login without authorization headers"""
response = self.app.post('/login')
self.assertEqual(response.status_code, 401)
def test_login_with_valid_credentials(self):
"""Test login with valid credentials"""
# Setup test user in database
# ... test implementation
pass
if __name__ == '__main__':
unittest.main()
Integration Testing
# tests/test_integration.py
import requests
import time
import unittest
class TestIntegration(unittest.TestCase):
def setUp(self):
self.base_url = "http://localhost:8000"
self.auth_url = "http://localhost:5000"
def test_complete_workflow(self):
"""Test complete video conversion workflow"""
# 1. Login and get token
auth_response = requests.post(
f"{self.auth_url}/login",
auth=('admin@example.com', 'password')
)
self.assertEqual(auth_response.status_code, 200)
token = auth_response.text
# 2. Upload video file
with open('test_video.mp4', 'rb') as f:
upload_response = requests.post(
f"{self.base_url}/upload",
files={'file': f},
headers={'Authorization': f'Bearer {token}'}
)
self.assertEqual(upload_response.status_code, 200)
# 3. Wait for processing (in real test, use polling)
time.sleep(30)
# 4. Download converted file
# ... implementation
3. Monitoring and Debugging
Logging Configuration
# utils/logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger
def setup_logging(service_name, log_level="INFO"):
"""Setup structured logging for microservices"""
# Create logger
logger = logging.getLogger(service_name)
logger.setLevel(getattr(logging, log_level))
# Create handler
handler = logging.StreamHandler(sys.stdout)
# Create formatter
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
return logger
# Usage in services
logger = setup_logging("gateway_service")
logger.info("Service started", extra={"port": 8000})
Deployment Guide
Production Deployment Strategy
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Deployment Pipeline ā
ā ā
ā āāāāāāāāā āāāāāāāāā āāāāāāāāā āāāāāāāāā ā
ā ā Dev āāāāā¶ā Test āāāāā¶āStagingāāāāā¶ā Prod ā ā
ā ā ā ā ā ā ā ā ā ā
ā āāāāāāāāā āāāāāāāāā āāāāāāāāā āāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā CI/CD Pipeline ā ā
ā ā ā ā
ā ā Code ā Build ā Test ā Security Scan ā Deploy ā ā
ā ā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Step-by-Step Deployment Process
1. Prepare Kubernetes Cluster
Start Minikube with Adequate Resources
# Delete existing cluster if any
minikube delete
# Start with sufficient resources
minikube start \
--cpus=4 \
--memory=8192 \
--disk-size=20g \
--driver=docker
# Enable required addons
minikube addons enable ingress
minikube addons enable metrics-server
minikube addons enable dashboard
# Verify cluster status
kubectl cluster-info
kubectl get nodes
2. Setup External Dependencies
Database Setup with Docker Compose
# Start external databases
docker-compose up -d
# Verify database connectivity
docker exec -it mysql_db mysql -u piush -p auth_db
docker exec -it mongodb mongosh "mongodb://piush:password@localhost:27017/gateway_db?authSource=admin"
# Initialize MySQL schema
mysql -h localhost -u piush -p auth_db < auth_service/init.sql
3. Build and Push Docker Images
Build Images Locally
# Build auth service
cd auth_service
docker build -t devpiush/python-microservice-auth:latest .
# Build gateway service
cd ../gateway
docker build -t devpiush/python-microservice-gateway:latest .
# Build converter service
cd ../converter
docker build -t devpiush/python-microservice-converter:latest .
# Build notification service
cd ../notification
docker build -t devpiush/python-microservice-notification:latest .
Push to Registry (Optional)
# Login to Docker Hub
docker login
# Push images
docker push devpiush/python-microservice-auth:latest
docker push devpiush/python-microservice-gateway:latest
docker push devpiush/python-microservice-converter:latest
docker push devpiush/python-microservice-notification:latest
Load Images into Minikube
# Load images directly into Minikube
minikube image load devpiush/python-microservice-auth:latest
minikube image load devpiush/python-microservice-gateway:latest
minikube image load devpiush/python-microservice-converter:latest
minikube image load devpiush/python-microservice-notification:latest
# Verify images are loaded
minikube image ls | grep devpiush
4. Deploy Services in Order
Step 4.1: Deploy RabbitMQ
# Deploy RabbitMQ StatefulSet
kubectl apply -f gateway/rabbitmq/manifests/
# Wait for RabbitMQ to be ready
kubectl wait --for=condition=ready pod -l app=rabbitmq --timeout=300s
# Verify RabbitMQ is running
kubectl get pods -l app=rabbitmq
kubectl logs -l app=rabbitmq
Step 4.2: Deploy Auth Service
# Deploy auth service
kubectl apply -f auth_service/manifests/
# Wait for deployment
kubectl wait --for=condition=available deployment/auth --timeout=300s
# Verify deployment
kubectl get pods -l app=auth
kubectl logs -l app=auth
Step 4.3: Deploy Gateway Service
# Deploy gateway service
kubectl apply -f gateway/manifests/
# Wait for deployment
kubectl wait --for=condition=available deployment/gateway --timeout=300s
# Verify deployment
kubectl get pods -l app=gateway
kubectl logs -l app=gateway
Step 4.4: Deploy Converter Service
# Deploy converter service
kubectl apply -f converter/manifests/
# Wait for deployment
kubectl wait --for=condition=available deployment/converter --timeout=300s
# Verify deployment
kubectl get pods -l app=converter
Step 4.5: Deploy Notification Service
# Deploy notification service
kubectl apply -f notification/manifests/
# Wait for deployment
kubectl wait --for=condition=available deployment/notification --timeout=300s
# Verify deployment
kubectl get pods -l app=notification
5. Verify Complete Deployment
Check All Services
# Get all pods
kubectl get pods
# Expected output:
# NAME READY STATUS RESTARTS AGE
# auth-xxx-xxx 1/1 Running 0 5m
# gateway-xxx-xxx 1/1 Running 0 4m
# gateway-xxx-yyy 1/1 Running 0 4m
# converter-xxx-xxx 1/1 Running 0 3m
# notification-xxx-xxx 1/1 Running 0 2m
# rabbitmq-0 1/1 Running 0 6m
# Check services
kubectl get services
# Check deployments
kubectl get deployments
Health Checks
# Port forward for testing
kubectl port-forward service/gateway 8000:8000 &
kubectl port-forward service/auth 5000:5000 &
# Test auth service health
curl http://localhost:5000/health
# Test gateway service health
curl http://localhost:8000/health
# Test RabbitMQ management UI
kubectl port-forward service/rabbitmq 15672:15672 &
# Visit http://localhost:15672 (guest/guest)
Advanced Deployment Configurations
1. Production ConfigMaps and Secrets
Production ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: gateway-config-prod
namespace: production
data:
RABBITMQ_HOST: rabbitmq.production.svc.cluster.local
AUTH_SVC_ADDR: auth.production.svc.cluster.local:5000
LOG_LEVEL: INFO
MAX_FILE_SIZE: "104857600" # 100MB
WORKER_PROCESSES: "4"
Production Secrets (encrypted)
apiVersion: v1
kind: Secret
metadata:
name: gateway-secrets-prod
namespace: production
type: Opaque
data:
MONGO_URI: <encrypted-base64-uri>
MONGO_MP3_URI: <encrypted-base64-uri>
JWT_SECRET: <encrypted-base64-secret>
2. Resource Management
Resource Requests and Limits
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
Quality of Service Classes
Guaranteed: requests = limits (critical services)
Burstable: requests < limits (most services)
BestEffort: no requests or limits (non-critical)
3. Rolling Updates and Rollbacks
Rolling Update Strategy
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 25%
maxSurge: 25%
Rollback Commands
# Check rollout history
kubectl rollout history deployment/gateway
# Rollback to previous version
kubectl rollout undo deployment/gateway
# Rollback to specific revision
kubectl rollout undo deployment/gateway --to-revision=2
# Monitor rollout status
kubectl rollout status deployment/gateway
CI/CD Pipeline Integration
GitHub Actions Example
.github/workflows/deploy.yml
name: Deploy to Kubernetes
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.11
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests
run: pytest tests/ --cov=./ --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v2
build-and-deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Build and push images
run: |
docker build -t ${{ secrets.DOCKER_USERNAME }}/auth:${{ github.sha }} auth_service/
docker push ${{ secrets.DOCKER_USERNAME }}/auth:${{ github.sha }}
docker build -t ${{ secrets.DOCKER_USERNAME }}/gateway:${{ github.sha }} gateway/
docker push ${{ secrets.DOCKER_USERNAME }}/gateway:${{ github.sha }}
- name: Deploy to Kubernetes
uses: azure/k8s-deploy@v1
with:
manifests: |
auth_service/manifests/
gateway/manifests/
images: |
${{ secrets.DOCKER_USERNAME }}/auth:${{ github.sha }}
${{ secrets.DOCKER_USERNAME }}/gateway:${{ github.sha }}
Testing and Monitoring
Testing Strategy
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Testing Pyramid ā
ā ā
ā āāāāāāāāāāā ā
ā ā E2E ā ā
ā ā Tests ā ā
ā āāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Integration Tests ā ā
ā ā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Unit Tests ā ā
ā ā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
1. Unit Testing
Testing Framework Setup
# conftest.py - Shared test configuration
import pytest
import tempfile
import os
from unittest.mock import MagicMock
@pytest.fixture
def temp_file():
"""Create temporary file for testing"""
fd, path = tempfile.mkstemp(suffix='.mp4')
try:
yield path
finally:
os.close(fd)
os.unlink(path)
@pytest.fixture
def mock_mongodb():
"""Mock MongoDB connection"""
mock_client = MagicMock()
mock_db = MagicMock()
mock_fs = MagicMock()
mock_client.gateway_db = mock_db
mock_db.fs = mock_fs
return mock_client, mock_db, mock_fs
@pytest.fixture
def mock_rabbitmq():
"""Mock RabbitMQ connection"""
mock_connection = MagicMock()
mock_channel = MagicMock()
mock_connection.channel.return_value = mock_channel
return mock_connection, mock_channel
Service-Specific Unit Tests
Auth Service Tests
# tests/test_auth_service.py
import pytest
import jwt
from datetime import datetime, timedelta
from auth_service.server import create_token, validate_jwt
class TestAuthService:
def test_create_token(self):
"""Test JWT token creation"""
secret = "test_secret"
username = "test@example.com"
is_admin = True
token = create_token(username, secret, is_admin)
# Verify token structure
assert isinstance(token, str)
assert len(token.split('.')) == 3 # header.payload.signature
# Decode and verify payload
decoded = jwt.decode(token, secret, algorithms=['HS256'])
assert decoded['user_email'] == username
assert decoded['is_admin'] == is_admin
assert 'exp' in decoded
assert 'iat' in decoded
def test_validate_jwt_valid_token(self):
"""Test validation of valid JWT token"""
secret = "test_secret"
token = create_token("test@example.com", secret, True)
result = validate_jwt(token, secret)
assert result is not None
assert result['user_email'] == "test@example.com"
assert result['is_admin'] is True
def test_validate_jwt_expired_token(self):
"""Test validation of expired JWT token"""
secret = "test_secret"
# Create expired token
expired_payload = {
'user_email': "test@example.com",
'is_admin': True,
'exp': datetime.utcnow() - timedelta(hours=1), # Expired 1 hour ago
'iat': datetime.utcnow() - timedelta(hours=2)
}
expired_token = jwt.encode(expired_payload, secret, algorithm='HS256')
result = validate_jwt(expired_token, secret)
assert result is None
def test_validate_jwt_invalid_signature(self):
"""Test validation of token with invalid signature"""
secret = "test_secret"
wrong_secret = "wrong_secret"
token = create_token("test@example.com", secret, True)
result = validate_jwt(token, wrong_secret)
assert result is None
Gateway Service Tests
# tests/test_gateway_service.py
import pytest
import json
from unittest.mock import patch, MagicMock
from gateway.server import app
class TestGatewayService:
@pytest.fixture
def client(self):
app.config['TESTING'] = True
return app.test_client()
def test_health_endpoint(self, client):
"""Test health check endpoint"""
response = client.get('/health')
assert response.status_code == 200
data = json.loads(response.data)
assert 'status' in data
assert 'mongodb' in data
assert 'rabbitmq' in data
@patch('gateway.server.validate.token')
def test_upload_without_token(self, mock_validate, client):
"""Test upload without authentication token"""
mock_validate.return_value = (None, ("Unauthorized", 401))
response = client.post('/upload')
assert response.status_code == 401
assert b"Unauthorized" in response.data
@patch('gateway.server.validate.token')
@patch('gateway.server.util.upload')
def test_upload_success(self, mock_upload, mock_validate, client):
"""Test successful file upload"""
# Mock successful authentication
mock_validate.return_value = ('{"user_email": "admin@test.com", "is_admin": true}', None)
mock_upload.return_value = None # No error
# Create test file
data = {
'file': (BytesIO(b'fake video content'), 'test.mp4')
}
response = client.post('/upload', data=data, content_type='multipart/form-data')
assert response.status_code == 200
assert b"File uploaded successfully" in response.data
2. Integration Testing
Database Integration Tests
# tests/test_integration_database.py
import pytest
from pymongo import MongoClient
import mysql.connector
from gridfs import GridFS
class TestDatabaseIntegration:
@pytest.fixture(scope="class")
def mongodb_client(self):
"""Setup MongoDB test database"""
client = MongoClient("mongodb://piush:password@localhost:27017/")
test_db = client.test_gateway_db
yield client, test_db
# Cleanup
client.drop_database("test_gateway_db")
client.close()
@pytest.fixture(scope="class")
def mysql_connection(self):
"""Setup MySQL test database"""
connection = mysql.connector.connect(
host='localhost',
user='piush',
password='password',
database='test_auth_db'
)
# Setup test schema
cursor = connection.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL
)
""")
connection.commit()
yield connection
# Cleanup
cursor.execute("DROP TABLE users")
connection.commit()
connection.close()
def test_mongodb_file_storage(self, mongodb_client):
"""Test file storage in MongoDB GridFS"""
client, db = mongodb_client
fs = GridFS(db)
# Store test file
test_content = b"Test video content"
file_id = fs.put(test_content, filename="test.mp4")
# Retrieve file
retrieved_file = fs.get(file_id)
retrieved_content = retrieved_file.read()
assert retrieved_content == test_content
assert retrieved_file.filename == "test.mp4"
def test_mysql_user_operations(self, mysql_connection):
"""Test user operations in MySQL"""
cursor = mysql_connection.cursor()
# Insert test user
cursor.execute(
"INSERT INTO users (email, password) VALUES (%s, %s)",
("test@example.com", "hashed_password")
)
mysql_connection.commit()
# Retrieve user
cursor.execute("SELECT email, password FROM users WHERE email = %s", ("test@example.com",))
result = cursor.fetchone()
assert result is not None
assert result[0] == "test@example.com"
assert result[1] == "hashed_password"
Message Queue Integration Tests
# tests/test_integration_rabbitmq.py
import pytest
import pika
import json
import time
from threading import Thread
class TestRabbitMQIntegration:
@pytest.fixture(scope="class")
def rabbitmq_connection(self):
"""Setup RabbitMQ test connection"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Declare test queues
channel.queue_declare(queue='test_video', durable=True)
channel.queue_declare(queue='test_mp3', durable=True)
yield connection, channel
# Cleanup
channel.queue_delete(queue='test_video')
channel.queue_delete(queue='test_mp3')
connection.close()
def test_message_publishing_and_consuming(self, rabbitmq_connection):
"""Test message publishing and consuming"""
connection, channel = rabbitmq_connection
test_message = {
"video_fid": "test_video_id",
"mp3_fid": "test_mp3_id",
"username": "test@example.com"
}
received_messages = []
def callback(ch, method, properties, body):
received_messages.append(json.loads(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
# Setup consumer
channel.basic_consume(
queue='test_video',
on_message_callback=callback
)
# Publish message
channel.basic_publish(
exchange='',
routing_key='test_video',
body=json.dumps(test_message),
properties=pika.BasicProperties(delivery_mode=2)
)
# Start consuming in a separate thread
def start_consuming():
channel.start_consuming()
consumer_thread = Thread(target=start_consuming)
consumer_thread.daemon = True
consumer_thread.start()
# Wait for message to be processed
time.sleep(2)
channel.stop_consuming()
assert len(received_messages) == 1
assert received_messages[0] == test_message
3. End-to-End Testing
Complete Workflow Testing
# tests/test_e2e.py
import pytest
import requests
import time
import os
from base64 import b64encode
class TestEndToEndWorkflow:
@pytest.fixture(scope="class")
def test_setup(self):
"""Setup for E2E tests"""
self.gateway_url = "http://localhost:8000"
self.auth_url = "http://localhost:5000"
self.test_user = "admin@example.com"
self.test_password = "password"
def test_complete_video_conversion_workflow(self, test_setup):
"""Test complete video conversion workflow"""
# Step 1: Health checks
auth_health = requests.get(f"{self.auth_url}/health")
assert auth_health.status_code == 200
gateway_health = requests.get(f"{self.gateway_url}/health")
assert gateway_health.status_code == 200
# Step 2: User authentication
auth_header = b64encode(f"{self.test_user}:{self.test_password}".encode()).decode()
login_response = requests.post(
f"{self.auth_url}/login",
headers={'Authorization': f'Basic {auth_header}'}
)
assert login_response.status_code == 200
token = login_response.text
# Step 3: File upload
test_video_path = "tests/fixtures/sample_video.mp4"
with open(test_video_path, 'rb') as video_file:
upload_response = requests.post(
f"{self.gateway_url}/upload",
files={'file': ('test_video.mp4', video_file, 'video/mp4')},
headers={'Authorization': f'Bearer {token}'}
)
assert upload_response.status_code == 200
# Step 4: Wait for processing (with polling)
max_wait_time = 120 # 2 minutes
start_time = time.time()
while time.time() - start_time < max_wait_time:
# Check if conversion is complete
# In a real implementation, you might check a status endpoint
time.sleep(10)
# Step 5: Verify email notification (mock check)
# In production, you'd verify email was sent
print("ā
End-to-end workflow completed successfully")
Monitoring and Observability
1. Application Metrics
Prometheus Metrics Integration
# utils/metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import functools
# Define metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Active database connections')
QUEUE_SIZE = Gauge('rabbitmq_queue_size', 'RabbitMQ queue size', ['queue_name'])
def track_requests(f):
"""Decorator to track HTTP requests"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = f(*args, **kwargs)
status = getattr(result, 'status_code', 200)
REQUEST_COUNT.labels(method=request.method, endpoint=request.endpoint, status=status).inc()
return result
except Exception as e:
REQUEST_COUNT.labels(method=request.method, endpoint=request.endpoint, status=500).inc()
raise
finally:
REQUEST_LATENCY.observe(time.time() - start_time)
return wrapper
# Usage in Flask app
@app.route('/upload', methods=['POST'])
@track_requests
def upload():
# ... existing upload logic
pass
Custom Health Checks
# utils/health_checks.py
import requests
import pymongo
import mysql.connector
from datetime import datetime
class HealthChecker:
def __init__(self):
self.checks = {
'database': self.check_database,
'rabbitmq': self.check_rabbitmq,
'external_services': self.check_external_services,
'disk_space': self.check_disk_space,
'memory_usage': self.check_memory_usage
}
def check_database(self):
"""Check database connectivity"""
try:
# Check MongoDB
mongo_client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
mongo_client.admin.command('ping')
# Check MySQL
mysql_conn = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
database=MYSQL_DB
)
mysql_conn.close()
return {'status': 'healthy', 'message': 'All databases accessible'}
except Exception as e:
return {'status': 'unhealthy', 'message': f'Database error: {str(e)}'}
def check_rabbitmq(self):
"""Check RabbitMQ connectivity"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST)
)
connection.close()
return {'status': 'healthy', 'message': 'RabbitMQ accessible'}
except Exception as e:
return {'status': 'unhealthy', 'message': f'RabbitMQ error: {str(e)}'}
def run_all_checks(self):
"""Run all health checks"""
results = {}
overall_status = 'healthy'
for check_name, check_func in self.checks.items():
try:
result = check_func()
results[check_name] = result
if result['status'] != 'healthy':
overall_status = 'unhealthy'
except Exception as e:
results[check_name] = {
'status': 'unhealthy',
'message': f'Check failed: {str(e)}'
}
overall_status = 'unhealthy'
return {
'timestamp': datetime.utcnow().isoformat(),
'overall_status': overall_status,
'checks': results
}
# Enhanced health endpoint
@app.route('/health/detailed', methods=['GET'])
def detailed_health():
"""Detailed health check endpoint"""
health_checker = HealthChecker()
results = health_checker.run_all_checks()
status_code = 200 if results['overall_status'] == 'healthy' else 503
return jsonify(results), status_code
2. Logging and Tracing
Structured Logging with Correlation IDs
# utils/logging_setup.py
import logging
import uuid
from flask import request, g
import json
class CorrelationIdFilter(logging.Filter):
"""Add correlation ID to log records"""
def filter(self, record):
correlation_id = getattr(g, 'correlation_id', str(uuid.uuid4()))
record.correlation_id = correlation_id
return True
def setup_logging():
"""Setup structured logging with correlation IDs"""
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(correlation_id)s - %(message)s'
)
# Create handler
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# Add correlation ID filter
correlation_filter = CorrelationIdFilter()
handler.addFilter(correlation_filter)
# Setup root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
@app.before_request
def before_request():
"""Set correlation ID for request tracking"""
g.correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
@app.after_request
def after_request(response):
"""Add correlation ID to response headers"""
response.headers['X-Correlation-ID'] = g.correlation_id
return response
3. Performance Monitoring
Database Query Performance
# utils/db_monitoring.py
import time
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def monitor_db_query(operation_name):
"""Decorator to monitor database query performance"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
logger.info(
f"DB Query: {operation_name}",
extra={
'execution_time': execution_time,
'status': 'success',
'operation': operation_name
}
)
# Alert on slow queries
if execution_time > 1.0: # 1 second threshold
logger.warning(
f"Slow query detected: {operation_name}",
extra={'execution_time': execution_time}
)
return result
except Exception as e:
execution_time = time.time() - start_time
logger.error(
f"DB Query Error: {operation_name}",
extra={
'execution_time': execution_time,
'status': 'error',
'error': str(e),
'operation': operation_name
}
)
raise
return wrapper
return decorator
# Usage
@monitor_db_query("user_login")
def authenticate_user(email, password):
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM users WHERE email=%s", (email,))
return cursor.fetchone()
Troubleshooting Common Issues
Diagnostic Framework
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā Troubleshooting Workflow ā
ā ā
ā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā ā
ā ā Problem āāāāā¶āDiagnose āāāāā¶ā Isolate āāāāā¶ā Fix ā ā
ā āReported ā ā Issue ā āComponentā ā Issue ā ā
ā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā āāāāāāāāāāā ā
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā Diagnostic Tools ā ā
ā ā ā ā
ā ā ⢠kubectl logs ā ā
ā ā ⢠kubectl describe ā ā
ā ā ⢠kubectl get events ā ā
ā ā ⢠Service health endpoints ā ā
ā ā ⢠Database connectivity tests ā ā
ā ā ⢠Message queue inspection ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
1. Pod Issues
CrashLoopBackOff
Diagnosis Commands
# Check pod status
kubectl get pods -l app=gateway
# Examine pod details
kubectl describe pod <pod-name>
# Check logs
kubectl logs <pod-name> --previous
# Get events
kubectl get events --sort-by=.metadata.creationTimestamp
Common Causes and Solutions
1. Database Connection Issues
# Test database connectivity
kubectl run debug-pod --image=alpine --rm -it -- sh
# Inside the debug pod:
apk add --no-cache mysql-client
mysql -h host.minikube.internal -u piush -p
# Test MongoDB
apk add --no-cache mongodb-tools
mongosh "mongodb://piush:password@host.minikube.internal:27017/gateway_db"
2. Missing Environment Variables
# Check ConfigMap
kubectl get configmap gateway-config -o yaml
# Check Secret
kubectl get secret gateway-secrets -o yaml
# Verify pod environment
kubectl exec <pod-name> -- env | grep -E "(MONGO|MYSQL|RABBITMQ)"
3. Resource Constraints
# Check resource usage
kubectl top pods
kubectl top nodes
# Check resource limits
kubectl describe pod <pod-name> | grep -A 10 "Limits:"
Image Pull Issues
Diagnosis
# Check image pull status
kubectl describe pod <pod-name> | grep -A 5 "Events:"
# Verify image exists
minikube image ls | grep devpiush
# Pull image manually
docker pull devpiush/python-microservice-gateway:latest
minikube image load devpiush/python-microservice-gateway:latest
2. Service Communication Issues
Service Discovery Problems
Debug Service Connectivity
# Create debug pod
kubectl run debug --image=nicolaka/netshoot --rm -it
# Inside debug pod:
# Test DNS resolution
nslookup auth.default.svc.cluster.local
nslookup rabbitmq.default.svc.cluster.local
# Test service connectivity
curl http://auth:5000/health
curl http://gateway:8000/health
# Check port connectivity
telnet auth 5000
telnet rabbitmq 5672
Service Configuration Check
# Check service endpoints
kubectl get endpoints
# Verify service selectors
kubectl get service auth -o yaml | grep -A 5 selector
# Check if pods match selector
kubectl get pods -l app=auth --show-labels
RabbitMQ Connection Issues
RabbitMQ Diagnostics
# Check RabbitMQ pod logs
kubectl logs -l app=rabbitmq
# Access RabbitMQ management UI
kubectl port-forward service/rabbitmq 15672:15672
# Check queue status via management API
curl -u guest:guest http://localhost:15672/api/queues
# Test connection from consumer pods
kubectl exec -it <converter-pod> -- python -c "
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
print('Connected successfully')
connection.close()
"
3. Database Issues
MongoDB Connection Problems
MongoDB Diagnostics
# Test MongoDB connectivity
kubectl run mongo-debug --image=mongo:7.0 --rm -it -- \
mongosh "mongodb://piush:password@host.minikube.internal:27017/gateway_db?authSource=admin"
# Check GridFS collections
db.fs.files.find().limit(5)
db.fs.chunks.find().limit(5)
# Check database sizes
db.stats()
Common MongoDB Issues
Authentication failure: Check username/password in secrets
Network connectivity: Verify host.minikube.internal resolves
GridFS corruption: Run database repair commands
Disk space: Check available storage
MySQL Connection Problems
MySQL Diagnostics
# Test MySQL connectivity
kubectl run mysql-debug --image=mysql:8.0 --rm -it -- \
mysql -h host.minikube.internal -u piush -p auth_db
# Check user permissions
SHOW GRANTS FOR 'piush'@'%';
# Verify table structure
DESCRIBE users;
# Check connection limits
SHOW VARIABLES LIKE 'max_connections';
SHOW STATUS LIKE 'Threads_connected';
4. Performance Issues
High Latency Diagnosis
Application Performance Profiling
# utils/profiler.py
import cProfile
import pstats
from functools import wraps
from flask import request
def profile_endpoint(func):
"""Profile endpoint performance"""
@wraps(func)
def wrapper(*args, **kwargs):
if request.args.get('profile') == 'true':
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20) # Top 20 functions
return result
else:
return func(*args, **kwargs)
return wrapper
# Usage
@app.route('/upload')
@profile_endpoint
def upload():
# ... existing code
pass
Database Query Optimization
-- Check slow queries in MySQL
SELECT * FROM information_schema.processlist WHERE time > 10;
-- Enable slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;
-- Analyze query performance
EXPLAIN SELECT * FROM users WHERE email = 'user@example.com';
-- Add indexes for better performance
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_created_at ON users(created_at);
Memory Issues
Memory Usage Monitoring
# Check pod memory usage
kubectl top pods
# Check detailed resource usage
kubectl describe pod <pod-name> | grep -A 10 "Requests:\|Limits:"
# Monitor memory inside pod
kubectl exec <pod-name> -- cat /proc/meminfo
kubectl exec <pod-name> -- ps aux --sort=-%mem | head -10
Memory Leak Detection
# utils/memory_monitor.py
import psutil
import threading
import time
import logging
logger = logging.getLogger(__name__)
class MemoryMonitor:
def __init__(self, threshold_mb=500, check_interval=60):
self.threshold_mb = threshold_mb
self.check_interval = check_interval
self.monitoring = False
def start_monitoring(self):
"""Start memory monitoring in background thread"""
self.monitoring = True
monitor_thread = threading.Thread(target=self._monitor_memory)
monitor_thread.daemon = True
monitor_thread.start()
def _monitor_memory(self):
"""Monitor memory usage"""
while self.monitoring:
process = psutil.Process()
memory_info = process.memory_info()
memory_mb = memory_info.rss / 1024 / 1024
logger.info(f"Memory usage: {memory_mb:.2f} MB")
if memory_mb > self.threshold_mb:
logger.warning(
f"High memory usage detected: {memory_mb:.2f} MB "
f"(threshold: {self.threshold_mb} MB)"
)
time.sleep(self.check_interval)
# Initialize in application
memory_monitor = MemoryMonitor()
memory_monitor.start_monitoring()
5. File Processing Issues
Video Conversion Failures
FFmpeg Debugging
# converter/debug_converter.py
import subprocess
import logging
logger = logging.getLogger(__name__)
def debug_video_conversion(input_path, output_path):
"""Debug video conversion with detailed logging"""
# Get video information
ffprobe_cmd = [
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', '-show_streams', input_path
]
try:
result = subprocess.run(ffprobe_cmd, capture_output=True, text=True)
logger.info(f"Video info: {result.stdout}")
except Exception as e:
logger.error(f"FFprobe failed: {e}")
# Convert with verbose logging
ffmpeg_cmd = [
'ffmpeg', '-i', input_path, '-vn', '-acodec', 'libmp3lame',
'-ab', '192k', '-ar', '44100', '-f', 'mp3', output_path, '-y'
]
try:
result = subprocess.run(
ffmpeg_cmd, capture_output=True, text=True, timeout=300
)
if result.returncode == 0:
logger.info("Video conversion successful")
return True
else:
logger.error(f"FFmpeg error: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error("Video conversion timed out")
return False
except Exception as e:
logger.error(f"Video conversion failed: {e}")
return False
6. Automated Recovery
Self-Healing Mechanisms
# utils/recovery.py
import time
import logging
from threading import Thread
logger = logging.getLogger(__name__)
class AutoRecovery:
def __init__(self):
self.recovery_strategies = {
'database_connection': self.recover_database_connection,
'rabbitmq_connection': self.recover_rabbitmq_connection,
'memory_cleanup': self.cleanup_memory
}
def recover_database_connection(self):
"""Attempt to recover database connection"""
try:
# Recreate database connections
logger.info("Attempting database recovery...")
# Close existing connections
if hasattr(self, 'mysql_connection'):
self.mysql_connection.close()
# Recreate connections with retry logic
self.mysql_connection = self.create_mysql_connection_with_retry()
logger.info("Database recovery successful")
return True
except Exception as e:
logger.error(f"Database recovery failed: {e}")
return False
def recover_rabbitmq_connection(self):
"""Attempt to recover RabbitMQ connection"""
try:
logger.info("Attempting RabbitMQ recovery...")
# Close existing connection
if hasattr(self, 'rabbitmq_connection'):
self.rabbitmq_connection.close()
# Recreate connection
self.rabbitmq_connection = self.create_rabbitmq_connection_with_retry()
logger.info("RabbitMQ recovery successful")
return True
except Exception as e:
logger.error(f"RabbitMQ recovery failed: {e}")
return False
def cleanup_memory(self):
"""Cleanup memory to prevent OOM issues"""
try:
import gc
gc.collect()
logger.info("Memory cleanup completed")
return True
except Exception as e:
logger.error(f"Memory cleanup failed: {e}")
return False
Production Considerations
Security Hardening
1. Container Security
Secure Dockerfile Practices
# Use specific version tags
FROM python:3.11.5-slim
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Install security updates
RUN apt-get update && apt-get install -y --no-install-recommends \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# Copy and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY --chown=appuser:appuser . /app
WORKDIR /app
# Switch to non-root user
USER appuser
# Set security headers
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["gunicorn", "--config", "gunicorn.conf.py", "server:app"]
Security Scanning
# Scan Docker images for vulnerabilities
trivy image devpiush/python-microservice-gateway:latest
# Scan Kubernetes manifests
kube-score score gateway/manifests/*.yaml
# Run security benchmarks
kubectl run --rm -it kube-bench --image=aquasec/kube-bench:latest --restart=Never
2. Network Security
Network Policies for Production
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: production-network-policy
namespace: production
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: production
egress:
- to:
- namespaceSelector:
matchLabels:
name: production
- to: []
ports:
- protocol: TCP
port: 53 # DNS
- protocol: UDP
port: 53 # DNS
TLS/SSL Configuration
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: gateway-ingress
annotations:
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
tls:
- hosts:
- api.yourcompany.com
secretName: gateway-tls
rules:
- host: api.yourcompany.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: gateway
port:
number: 8000
Scalability and Performance
1. Horizontal Pod Autoscaling
Advanced HPA Configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: gateway-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: gateway
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: pending_requests
target:
type: AverageValue
averageValue: "10"
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Pods
value: 4
periodSeconds: 15
selectPolicy: Max
2. Database Optimization
MongoDB Performance Tuning
// Create indexes for better query performance
db.fs.files.createIndex({ "metadata.user_email": 1 });
db.fs.files.createIndex({ uploadDate: 1 });
db.fs.files.createIndex({ filename: 1 });
// Enable sharding for large datasets
sh.enableSharding("gateway_db");
sh.shardCollection("gateway_db.fs.files", { _id: "hashed" });
// Configure read preferences
db.fs.files.find().readPref("secondaryPreferred");
MySQL Performance Optimization
-- Optimize MySQL configuration
SET GLOBAL innodb_buffer_pool_size = 1073741824; -- 1GB
SET GLOBAL query_cache_size = 268435456; -- 256MB
SET GLOBAL max_connections = 500;
-- Create optimal indexes
CREATE INDEX idx_users_email_active ON users(email, active);
CREATE INDEX idx_sessions_user_expires ON user_sessions(user_id, expires_at);
-- Analyze query performance
ANALYZE TABLE users;
OPTIMIZE TABLE users;
3. Caching Strategy
Redis Cache Implementation
# utils/cache.py
import redis
import json
import hashlib
from functools import wraps
redis_client = redis.Redis(host='redis', port=6379, decode_responses=True)
def cache_result(expiration=300):
"""Cache function results in Redis"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Create cache key
key_data = f"{func.__name__}:{str(args)}:{str(kwargs)}"
cache_key = hashlib.md5(key_data.encode()).hexdigest()
# Try to get from cache
cached_result = redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# Execute function and cache result
result = func(*args, **kwargs)
redis_client.setex(cache_key, expiration, json.dumps(result))
return result
return wrapper
return decorator
# Usage
@cache_result(expiration=600) # Cache for 10 minutes
def get_user_profile(user_email):
# Expensive database query
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM users WHERE email=%s", (user_email,))
return cursor.fetchone()
Backup and Disaster Recovery
1. Database Backup Strategy
Automated Backup System
#!/bin/bash
# backup.sh - Automated backup script
set -e
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups"
# MongoDB backup
echo "Starting MongoDB backup..."
mongodump --host host.minikube.internal:27017 \
--username piush \
--password password \
--out "$BACKUP_DIR/mongodb_$TIMESTAMP"
# MySQL backup
echo "Starting MySQL backup..."
mysqldump --host host.minikube.internal \
--user piush \
--password=password \
--all-databases > "$BACKUP_DIR/mysql_$TIMESTAMP.sql"
# Compress backups
tar -czf "$BACKUP_DIR/backup_$TIMESTAMP.tar.gz" \
"$BACKUP_DIR/mongodb_$TIMESTAMP" \
"$BACKUP_DIR/mysql_$TIMESTAMP.sql"
# Upload to cloud storage (example with AWS S3)
aws s3 cp "$BACKUP_DIR/backup_$TIMESTAMP.tar.gz" \
s3://your-backup-bucket/backups/
# Cleanup old local backups (keep last 7 days)
find "$BACKUP_DIR" -name "backup_*.tar.gz" -mtime +7 -delete
echo "Backup completed successfully"
Kubernetes CronJob for Backups
apiVersion: batch/v1
kind: CronJob
metadata:
name: database-backup
spec:
schedule: "0 2 * * *" # Daily at 2 AM
jobTemplate:
spec:
template:
spec:
containers:
- name: backup
image: backup-tool:latest
command:
- /bin/bash
- /scripts/backup.sh
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: access-key-id
volumeMounts:
- name: backup-scripts
mountPath: /scripts
- name: backup-storage
mountPath: /backups
volumes:
- name: backup-scripts
configMap:
name: backup-scripts
- name: backup-storage
persistentVolumeClaim:
claimName: backup-pvc
restartPolicy: OnFailure
2. Disaster Recovery Plan
Recovery Procedures
#!/bin/bash
# disaster_recovery.sh - Disaster recovery script
set -e
BACKUP_FILE=$1
RECOVERY_DIR="/recovery"
if [ -z "$BACKUP_FILE" ]; then
echo "Usage: $0 <backup_file>"
exit 1
fi
echo "Starting disaster recovery from $BACKUP_FILE"
# Extract backup
tar -xzf "$BACKUP_FILE" -C "$RECOVERY_DIR"
# Restore MongoDB
echo "Restoring MongoDB..."
mongorestore --host host.minikube.internal:27017 \
--username piush \
--password password \
--drop \
"$RECOVERY_DIR/mongodb_*/gateway_db"
# Restore MySQL
echo "Restoring MySQL..."
mysql --host host.minikube.internal \
--user piush \
--password=password < "$RECOVERY_DIR/mysql_*.sql"
# Verify data integrity
echo "Verifying data integrity..."
python verify_data_integrity.py
echo "Disaster recovery completed successfully"
Monitoring and Alerting
1. Prometheus and Grafana Setup
Prometheus Configuration
# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
data:
prometheus.yml: |
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- job_name: 'gateway-service'
static_configs:
- targets: ['gateway:8000']
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'auth-service'
static_configs:
- targets: ['auth:5000']
metrics_path: /metrics
scrape_interval: 10s
Alert Rules
# alert-rules.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: alert-rules
data:
alert_rules.yml: |
groups:
- name: microservices-alerts
rules:
- alert: HighErrorRate
expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is above 10% for 5 minutes"
- alert: PodCrashLooping
expr: rate(kube_pod_container_status_restarts_total[15m]) > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Pod is crash looping"
description: "Pod {{ $labels.pod }} is restarting frequently"
- alert: DatabaseConnectionLoss
expr: up{job="mysql"} == 0 or up{job="mongodb"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Database connection lost"
description: "Cannot connect to database"
- alert: QueueBacklog
expr: rabbitmq_queue_messages > 1000
for: 10m
labels:
severity: warning
annotations:
summary: "Message queue backlog"
description: "Queue {{ $labels.queue }} has {{ $value }} messages"
2. Log Aggregation
ELK Stack Integration
# filebeat-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-config
data:
filebeat.yml: |
filebeat.inputs:
- type: container
paths:
- /var/log/containers/*gateway*.log
- /var/log/containers/*auth*.log
- /var/log/containers/*converter*.log
- /var/log/containers/*notification*.log
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "microservices-%{+yyyy.MM.dd}"
setup.template.name: "microservices"
setup.template.pattern: "microservices-*"
Conclusion
Building a production-ready microservices application requires careful consideration of multiple aspects: architecture design, service communication, data management, security, deployment, and monitoring. Through this comprehensive guide, we've explored how to create a scalable video-to-audio converter using modern cloud-native technologies.
Key Takeaways
1. Architecture Principles
Single Responsibility: Each service has a clear, focused purpose
Loose Coupling: Services communicate through well-defined APIs
Event-Driven Design: Asynchronous processing improves scalability
Fault Tolerance: System gracefully handles component failures
2. Technology Choices
Python/Flask: Rapid development and extensive ecosystem
MongoDB GridFS: Efficient large file storage
MySQL: ACID compliance for critical user data
RabbitMQ: Reliable message queuing
Kubernetes: Container orchestration and management
3. Production Readiness
Security: Multi-layer security implementation
Monitoring: Comprehensive observability and alerting
Scalability: Horizontal scaling capabilities
Reliability: Backup and disaster recovery procedures
Next Steps for Enhancement
1. Advanced Features
# Potential enhancements
āāā Authentication
ā āāā OAuth2/OIDC Integration
ā āāā Multi-factor Authentication
ā āāā Social Login (Google, GitHub)
āāā Processing
ā āāā Multiple Audio Formats (WAV, FLAC, AAC)
ā āāā Video Preprocessing (Compression, Format Conversion)
ā āāā Batch Processing Capabilities
āāā Storage
ā āāā Cloud Storage Integration (S3, GCS, Azure Blob)
ā āāā CDN for File Distribution
ā āāā Automatic Cleanup Policies
āāā User Experience
āāā Web UI for File Management
āāā Progress Tracking
āāā Download History
2. Advanced Deployment
Multi-region deployment for global availability
Blue-green deployments for zero-downtime updates
Canary releases for safer production rollouts
GitOps workflows with ArgoCD or Flux
3. Enhanced Monitoring
Distributed tracing with Jaeger or Zipkin
Application Performance Monitoring (APM)
Business metrics and KPI dashboards
Synthetic monitoring for proactive issue detection
Learning Resources
Documentation
Tools and Platforms
Minikube for local development
Helm for Kubernetes package management
Prometheus for monitoring
Grafana for visualization
Docker Hub for container registry
Final Thoughts
Microservices architecture, while powerful, comes with complexity that must be carefully managed. The key to success lies in starting simple, implementing strong foundations (monitoring, security, testing), and evolving the system incrementally based on real-world feedback and requirements.
This video-to-audio converter serves as a practical example of how to apply microservices principles, but the patterns and practices demonstrated here can be adapted to virtually any distributed system requirements.
Remember: Perfect is the enemy of good. Start with a working system, implement proper monitoring and testing, then iterate and improve based on actual usage patterns and business needs.



