Real-time Monitoring and SSE Events
Real-time monitoring in OEC.SH combines Server-Sent Events (SSE) with Redis pub/sub and Netdata integration to provide live updates on infrastructure metrics, alerts, and operational events across the platform.
Architecture Overview
OEC.SH uses a dual-layer monitoring architecture:
- SSE Layer: Real-time event streaming from backend to connected clients
- Monitoring Layer: Edge-first metrics collection via Netdata on managed servers
┌─────────────────────────────────────────────────────────────────┐
│ FRONTEND CLIENTS │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Browser 1 │ │ Browser 2 │ │ Browser N │ │
│ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │
│ │ │ │ │
│ │ SSE Connection │ │ │
│ └────────────────┴────────────────┘ │
│ │ │
└──────────────────────────┼───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ BACKEND API SERVER │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ SSE Endpoint (/api/v1/events/stream) │ │
│ │ - Handles EventSource connections │ │
│ │ - Authenticates via JWT token in query parameter │ │
│ │ - Subscribes to Redis pub/sub channel │ │
│ │ - Streams events to clients │ │
│ └─────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Redis Pub/Sub (SSE Channel) │ │
│ │ - Channel: "sse:events" │ │
│ │ - Cross-worker event distribution │ │
│ │ - Organization-scoped filtering │ │
│ └──────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ Services │ │ ARQ Workers │ │
│ │ - Alerts │ │ - Monitoring│ │
│ │ - Replica │ │ - Tasks │ │
│ │ - Tasks │ │ - Backups │ │
│ └─────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ NETDATA MONITORING │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Parent Netdata (on PaaSPortal server) │ │
│ │ - Collects metrics from all child agents │ │
│ │ - REST API for metrics queries │ │
│ │ - Webhook alerts to backend │ │
│ └─────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┴────────────────┐ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Child Agent │ │ Child Agent │ │
│ │ (Server 1) │ │ (Server N) │ │
│ │ - CPU/RAM │ │ - CPU/RAM │ │
│ │ - Disk │ │ - Disk │ │
│ │ - Containers│ │ - Containers│ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Server-Sent Events (SSE)
What is SSE?
Server-Sent Events is a web standard that enables servers to push real-time updates to web clients over HTTP. Unlike WebSockets, SSE is:
- Unidirectional: Server to client only (perfect for notifications)
- Auto-reconnecting: Browser automatically reconnects on connection loss
- Simple HTTP: Works through standard HTTP/HTTPS and proxies
- Event-driven: Named events with structured data payloads
SSE vs WebSockets
| Feature | SSE | WebSockets |
|---|---|---|
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP/HTTPS | ws:// or wss:// |
| Reconnection | Automatic | Manual |
| Browser Support | All modern browsers | All modern browsers |
| Proxy/Firewall | Easy (HTTP) | Sometimes blocked |
| Use Case | Real-time updates, notifications | Chat, gaming, real-time collaboration |
| Complexity | Simple | More complex |
Why OEC.SH uses SSE: For monitoring dashboards and operational alerts, we only need server-to-client communication. SSE provides automatic reconnection and simpler implementation compared to WebSockets.
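On the wire, the stream is plain text: each event is an event: line plus one or more data: lines, terminated by a blank line, and lines starting with a colon are comments (used here as keepalives). A short illustrative capture of what a client receives from the stream (payloads are examples):

event: connected
data: {"status": "connected"}

event: deployment.progress
data: {"type": "deployment.progress", "data": {"progress_percent": 45}}

: keepalive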
SSE Implementation
Backend: Event Broadcasting
The backend uses Redis pub/sub to distribute events across multiple worker processes and API servers:
# backend/api/v1/routes/events.py
import asyncio
import json
from collections.abc import AsyncGenerator
from uuid import UUID

import redis.asyncio as redis
from fastapi import APIRouter, Query, Request
from fastapi.responses import StreamingResponse

SSE_CHANNEL = "sse:events"
async def broadcast_to_organization(
org_id: str | UUID,
event_type: str,
data: dict
) -> None:
"""Broadcast an event via Redis pub/sub to all connected SSE clients."""
r = redis.from_url(get_redis_url())
message = json.dumps({
"org_id": str(org_id),
"type": event_type,
"data": data
})
await r.publish(SSE_CHANNEL, message)
await r.aclose()
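Any service or ARQ worker can fan an event out to an organization's connected clients with a single call; a short illustrative usage (field values are examples):

# e.g. from a deployment task
await broadcast_to_organization(
    org_id=environment.project.organization_id,
    event_type="deployment.started",
    data={"environment_id": str(environment.id), "branch": "main"},
)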
SSE Connection Endpoint
Clients connect to the SSE stream endpoint with JWT authentication:
Endpoint: GET /api/v1/events/stream?token={jwt_token}
Authentication: JWT token passed as query parameter (since EventSource doesn't support custom headers)
Response: text/event-stream with real-time events
@router.get("/stream")
async def event_stream(
request: Request,
token: str = Query(..., description="JWT access token"),
):
"""SSE endpoint for real-time updates using Redis pub/sub."""
user_id = await get_user_from_token(token)
return StreamingResponse(
event_generator(request, user_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
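The get_user_from_token helper is defined elsewhere in the backend; a minimal sketch of what it does, assuming PyJWT and an HS256-signed access token carrying the user id in the sub claim (names and secret handling are illustrative):

import jwt
from fastapi import HTTPException

JWT_SECRET = "change-me"  # assumption: loaded from application settings in practice

async def get_user_from_token(token: str) -> str:
    """Validate the JWT passed in the query string and return the user id."""
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return payload["sub"]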
Event Generator
The event generator subscribes to Redis pub/sub and yields SSE-formatted messages:
async def event_generator(
request: Request,
user_id: str
) -> AsyncGenerator[str, None]:
"""Generate SSE events for a client using Redis pub/sub."""
r = redis.from_url(get_redis_url())
pubsub = r.pubsub()
await pubsub.subscribe(SSE_CHANNEL)
# Send initial connection confirmation
yield "event: connected\ndata: {\"status\": \"connected\"}\n\n"
while True:
# Check if client disconnected
if await request.is_disconnected():
break
try:
# Block until a Redis message arrives; wait_for enforces the 30-second
# ceiling so keepalives fire on quiet channels (a finite inner timeout
# would make get_message return None instead of raising TimeoutError)
message = await asyncio.wait_for(
pubsub.get_message(ignore_subscribe_messages=True, timeout=None),
timeout=30.0
)
if message and message["type"] == "message":
data = json.loads(message["data"])
event_type = data.get("type", "message")
event_data = json.dumps({
"type": event_type,
"data": data.get("data", {})
})
yield f"event: {event_type}\ndata: {event_data}\n\n"
except TimeoutError:
# Send keepalive ping every 30 seconds
yield ": keepalive\n\n"
await pubsub.unsubscribe(SSE_CHANNEL)
await pubsub.aclose()
await r.aclose()
Key Features:
- Redis pub/sub: Enables cross-worker event distribution
- Keepalive pings: Prevents connection timeout (every 30 seconds)
- Graceful disconnect: Detects when client disconnects and cleans up
- Automatic reconnection: Client reconnects if connection drops
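Given the endpoint above, a quick way to smoke-test the stream from a shell is curl with response buffering disabled (-N); assumes a valid access token in $JWT (host illustrative):

curl -N "https://devsh.openeducat.ai/api/v1/events/stream?token=$JWT"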
Event Types
OEC.SH emits various SSE event types for different operational activities:
Deployment Events
| Event Type | Description | Data Fields |
|---|---|---|
deployment.started | Environment deployment initiated | environment_id, deployment_id, triggered_by, branch |
deployment.progress | Deployment step progress update | environment_id, deployment_id, progress_percent, current_step |
deployment.completed | Deployment successfully finished | environment_id, deployment_id, duration_seconds |
deployment.failed | Deployment encountered error | environment_id, deployment_id, error_message |
Example Event:
{
"type": "deployment.progress",
"data": {
"environment_id": "a1b2c3d4-...",
"deployment_id": "e5f6g7h8-...",
"progress_percent": 45,
"current_step": "Building Docker image"
}
}
Backup Events
| Event Type | Description | Data Fields |
|---|---|---|
backup.started | Backup operation initiated | environment_id, backup_id, backup_type |
backup.completed | Backup successfully finished | environment_id, backup_id, size_bytes, duration_seconds |
backup.failed | Backup encountered error | environment_id, backup_id, error_message |
Example Event:
{
"type": "backup.completed",
"data": {
"environment_id": "a1b2c3d4-...",
"backup_id": "b9c0d1e2-...",
"backup_type": "manual",
"size_bytes": 524288000,
"duration_seconds": 45
}
}
Replica Health Events (Sprint 2E40)
| Event Type | Description | Data Fields |
|---|---|---|
replica.health_updated | PostgreSQL replica health check result | environment_id, status, lag_bytes, lag_seconds, last_check |
replica.lag_alert | Replica lag exceeded threshold | environment_id, severity, lag_bytes, lag_seconds, threshold_bytes, threshold_seconds |
Health Statuses:
- online: Replica is healthy and streaming
- lagging: Replica lag exceeds warning threshold (50MB or 15 seconds)
- offline: Replica is not streaming
- error: Health check failed
Lag Thresholds:
- Warning: 50MB or 15 seconds
- Critical: 100MB or 30 seconds
Example Event:
{
"type": "replica.health_updated",
"data": {
"environment_id": "a1b2c3d4-...",
"status": "online",
"lag_bytes": 12582912, // ~12MB
"lag_seconds": 3.5,
"last_check": "2024-12-11T10:30:00Z"
}
}
Alert Events
| Event Type | Description | Data Fields |
|---|---|---|
alert_triggered | New monitoring alert triggered | alert_id, alert_type, severity, title, message, server_id, server_name, metric_value, threshold |
alert_acknowledged | Alert acknowledged by user | alert_id, acknowledged_by, acknowledged_at |
alert_resolved | Alert marked as resolved | alert_id, resolved_at |
Alert Severities:
- info: Informational, no action required
- warning: Action recommended within hours
- critical: Immediate action required
Alert Types:
- cpu_high: CPU usage above threshold (80% warning, 95% critical)
- memory_high: Memory usage above threshold (80% warning, 95% critical)
- disk_high: Disk usage above threshold (80% warning, 90% critical)
- container_down: Container stopped or crashed
- server_down: Server unreachable
- custom: Custom alert from Netdata webhook
Example Event:
{
"type": "alert_triggered",
"data": {
"alert_id": "c3d4e5f6-...",
"alert_type": "cpu_high",
"severity": "warning",
"title": "High CPU Usage",
"message": "Server CPU usage at 85% for 10 minutes",
"server_id": "d7e8f9g0-...",
"server_name": "prod-server-1",
"metric_value": 85.3,
"threshold": 80.0,
"triggered_at": "2024-12-11T10:35:00Z"
}
}
Server Health Events
| Event Type | Description | Data Fields |
|---|---|---|
server.connected | Server health check passed | server_id, server_name, status |
server.health_updated | Server health metrics updated | server_id, cpu_percent, memory_percent, disk_percent, status |
Example Event:
{
"type": "server.health_updated",
"data": {
"server_id": "f1g2h3i4-...",
"server_name": "prod-server-2",
"cpu_percent": 45.2,
"memory_percent": 67.8,
"disk_percent": 55.3,
"status": "healthy"
}
}
Task Status Events
| Event Type | Description | Data Fields |
|---|---|---|
task_status | Background task status update | task_id, task_type, status, environment_id, progress_percent, current_step |
Task Statuses:
- pending: Task queued, not yet started
- in_progress: Task currently executing
- completed: Task finished successfully
- failed: Task encountered error
Example Event:
{
"type": "task_status",
"data": {
"task_id": "g5h6i7j8-...",
"task_type": "deploy",
"status": "in_progress",
"environment_id": "a1b2c3d4-...",
"progress_percent": 60,
"current_step": "Starting containers"
}
}
Resource Quota Events
| Event Type | Description | Data Fields |
|---|---|---|
quota.exceeded | Organization resource quota exceeded | organization_id, resource_type, used, limit |
quota.warning | Organization approaching quota limit | organization_id, resource_type, used, limit, percent_used |
Resource Types:
- cpu_cores: Total CPU cores allocated
- ram_mb: Total RAM in megabytes
- disk_gb: Total disk space in gigabytes
Example Event:
{
"type": "quota.warning",
"data": {
"organization_id": "h9i0j1k2-...",
"resource_type": "ram_mb",
"used": 15360,
"limit": 16384,
"percent_used": 93.75
}
}
Permission Events
| Event Type | Description | Data Fields |
|---|---|---|
permissions_changed | User permissions updated | user_id, organization_id, project_id, changed_permissions |
role_permissions_changed | Role permissions modified | role_id, organization_id, changed_permissions |
Example Event:
{
"type": "permissions_changed",
"data": {
"user_id": "i3j4k5l6-...",
"organization_id": "h9i0j1k2-...",
"project_id": "m7n8o9p0-...",
"changed_permissions": ["project.environments.create", "project.deployments.trigger"]
}
}
Migration Events
| Event Type | Description | Data Fields |
|---|---|---|
migration_progress | Odoo.sh migration progress update | migration_id, status, progress_percent, current_step, error_message |
Migration Statuses:
- analyzing: Analyzing backup structure
- downloading: Downloading backup chunks
- assembling: Assembling multi-part backup
- restoring: Restoring database and filestore
- completed: Migration finished
- failed: Migration error
Example Event:
{
"type": "migration_progress",
"data": {
"migration_id": "j5k6l7m8-...",
"status": "downloading",
"progress_percent": 35,
"current_step": "Downloading chunk 3 of 8",
"error_message": null
}
}
Redis Pub/Sub Architecture
Why Redis Pub/Sub?
OEC.SH uses Redis pub/sub for cross-worker event distribution:
Problem: Multiple backend workers and API servers run concurrently. When a background task (e.g., deployment) emits an event, it needs to reach all connected SSE clients, not just those connected to the same worker.
Solution: Redis pub/sub acts as a message broker. All workers subscribe to the same channel, ensuring events reach all connected clients regardless of which worker generated the event.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ (ARQ Task) │ │ (API Server)│ │ (API Server)│
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ publish() │ subscribe() │ subscribe()
└────────────────────────┼────────────────────────┘
│
┌───────────▼──────────┐
│ Redis Pub/Sub │
│ Channel: sse:events │
└───────────┬──────────┘
│
┌─────────────┴─────────────┐
│ │
┌───────▼────────┐ ┌───────▼────────┐
│ Client A │ │ Client B │
│ (Browser) │ │ (Browser) │
└────────────────┘ └────────────────┘
Channel Naming
OEC.SH uses a single Redis pub/sub channel for all SSE events:
Channel: sse:events
Message Format:
{
"org_id": "uuid-of-organization",
"type": "event_type",
"data": {
"field1": "value1",
"field2": "value2"
}
}
Message Serialization
All SSE messages are serialized as JSON:
import json
message = json.dumps({
"org_id": str(org_id),
"type": event_type,
"data": data
})
await redis_client.publish(SSE_CHANNEL, message)
Organization-Scoped Filtering
Events include org_id for organization-level filtering. Clients automatically receive only events for their organization based on JWT token authentication.
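The generator shown earlier reads from the shared channel, so the organization check happens as messages are consumed; a minimal sketch of where that filter fits, assuming the user's organization id was resolved during token validation (user_org_id is a hypothetical variable):

# Inside the pub/sub loop: skip events addressed to other organizations
if message and message["type"] == "message":
    data = json.loads(message["data"])
    if data.get("org_id") != str(user_org_id):
        continue  # not for this client's organization
    yield f"event: {data.get('type', 'message')}\ndata: {json.dumps(data)}\n\n"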
Frontend Integration
useEventStream Hook
The frontend establishes SSE connections using the useEventStream hook:
File: frontend/src/hooks/useEventStream.ts
export function useEventStream() {
const { isAuthenticated } = useAuthStore();
const eventSourceRef = useRef<EventSource | null>(null);
const reconnectAttempts = useRef(0);
const maxReconnectAttempts = 10;
const baseReconnectDelay = 1000; // 1 second
const connect = useCallback(() => {
const token = getAccessToken();
if (!token || !isAuthenticated) return;
const url = `${API_URL}/events/stream?token=${encodeURIComponent(token)}`;
const eventSource = new EventSource(url);
eventSourceRef.current = eventSource;
eventSource.onopen = () => {
console.log("[SSE] Connected");
reconnectAttempts.current = 0;
};
eventSource.onerror = (error) => {
console.error("[SSE] Error:", error);
eventSource.close();
// Exponential backoff reconnection
if (reconnectAttempts.current < maxReconnectAttempts) {
const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
setTimeout(() => {
reconnectAttempts.current++;
connect();
}, delay);
}
};
// Listen for specific event types
eventSource.addEventListener("environment_status", (e) => {
const parsed = JSON.parse(e.data);
dispatchEvent(parsed);
});
eventSource.addEventListener("alert_triggered", (e) => {
const parsed = JSON.parse(e.data);
dispatchEvent({ type: "alert_triggered", data: parsed });
});
// ... more event listeners
}, [isAuthenticated]);
useEffect(() => {
connect();
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
}
};
}, [connect]);
return {
isConnected: eventSourceRef.current?.readyState === EventSource.OPEN,
};
}
Features:
- Automatic reconnection: Exponential backoff (1s, 2s, 4s, 8s, ...)
- Max retry attempts: Up to 10 reconnection attempts
- Event dispatching: Broadcasts events to subscribed components
- Cleanup: Closes connection when component unmounts
SSEProvider Context
The SSEProvider wraps the app root to provide SSE connection state:
File: frontend/src/providers/SSEProvider.tsx
export function SSEProvider({ children }: { children: ReactNode }) {
const { isConnected } = useEventStream();
return (
<SSEContext.Provider value={{ isConnected }}>
{children}
</SSEContext.Provider>
);
}
Component Usage: Subscribe to Events
Components subscribe to specific event types using useSSEEvent:
import { useSSEEvent } from "@/hooks/useEventStream";
export function DeploymentStatus({ environmentId }: Props) {
const [status, setStatus] = useState<string>("idle");
const [progress, setProgress] = useState<number>(0);
// Subscribe to deployment progress events
useSSEEvent("deployment.progress", (event) => {
if (event.data.environment_id === environmentId) {
setProgress(event.data.progress_percent);
setStatus(event.data.current_step);
}
});
// Subscribe to deployment completion
useSSEEvent("deployment.completed", (event) => {
if (event.data.environment_id === environmentId) {
setStatus("completed");
setProgress(100);
toast.success("Deployment completed successfully!");
}
});
return (
<div>
<ProgressBar percent={progress} />
<span>{status}</span>
</div>
);
}
Global Event Subscription
Use wildcard subscription ("*") to listen to all events:
useSSEEvent("*", (event) => {
console.log("Received event:", event.type, event.data);
// Log all events for debugging
if (isDevelopment) {
console.table({
Type: event.type,
Data: JSON.stringify(event.data, null, 2),
});
}
});
Event Subscription Patterns
Pattern 1: Single Component Subscription
// Subscribe to specific event in one component
useSSEEvent("replica.health_updated", handleReplicaUpdate);Pattern 2: Multiple Event Types
// Subscribe to multiple related events
useSSEEvent("deployment.started", handleDeploymentStart);
useSSEEvent("deployment.progress", handleDeploymentProgress);
useSSEEvent("deployment.completed", handleDeploymentComplete);
useSSEEvent("deployment.failed", handleDeploymentFailed);Pattern 3: Filtered Subscription
// Subscribe and filter by resource ID
useSSEEvent("alert_triggered", (event) => {
if (event.data.server_id === currentServerId) {
showAlert(event.data);
}
});
Pattern 4: Toast Notifications
// Show toast notifications for critical events
useSSEEvent("alert_triggered", (event) => {
if (event.data.severity === "critical") {
toast.error(event.data.title, {
description: event.data.message,
});
}
});
Netdata Integration
Parent-Child Streaming Architecture
OEC.SH uses Netdata parent-child streaming to collect metrics from all managed servers:
┌─────────────────────────────────────────────────────────────────┐
│ PAASPORTAL SERVER (Parent Netdata) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Netdata Parent Container │ │
│ │ - Port: 19999 │ │
│ │ - Collects metrics from all child agents │ │
│ │ - REST API: /api/v1/... │ │
│ │ - Web UI: https://monitoring.paasportal.io │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────┘
│ Streaming (port 19999)
┌───────────────┴───────────────┐
│ │
┌────────────────▼─────────┐ ┌───────────────▼────────────┐
│ MANAGED SERVER 1 │ │ MANAGED SERVER N │
│ ┌────────────────────┐ │ │ ┌────────────────────┐ │
│ │ Netdata Child Agent│ │ │ │ Netdata Child Agent│ │
│ │ - Machine GUID │ │ │ │ - Machine GUID │ │
│ │ - Streams to parent│ │ │ │ - Streams to parent│ │
│ │ - Collects metrics │ │ │ │ - Collects metrics │ │
│ └────────────────────┘ │ │ └────────────────────┘ │
│ │ │ │
│ Docker Containers: │ │ Docker Containers: │
│ - Odoo │ │ - Odoo │
│ - PostgreSQL │ │ - PostgreSQL │
│ - Redis │ │ - Redis │
└──────────────────────────┘ └────────────────────────────┘
Parent Netdata Configuration
The parent Netdata runs on the PaaSPortal server and receives streaming data from all child agents.
Environment Variables:
- NETDATA_PARENT_PUBLIC_URL: Public URL for parent Netdata (e.g., https://monitoring.paasportal.io)
- NETDATA_PARENT_API_KEY: API key for authenticating with parent
Stream Configuration (/etc/netdata/stream.conf on parent):
[YOUR_STREAM_API_KEY]
# The section name is the stream API key itself (must match the child's "api key")
# Enable accepting streamed metrics
enabled = yes
# Allow connections from child agents
allow from = *
# Retention for streamed metrics
default history = 3600
default memory mode = dbengine
Child Netdata Agent Installation
Child agents are installed on managed servers via the bootstrap script:
# In bootstrap_server.sh
# Install Netdata
bash <(curl -Ss https://my-netdata.io/kickstart.sh) \
--stable-channel \
--disable-telemetry \
--claim-token ${NETDATA_CLAIM_TOKEN} \
--claim-url https://app.netdata.cloud
# Configure streaming to parent
cat > /etc/netdata/stream.conf <<EOF
[stream]
enabled = yes
destination = ${NETDATA_PARENT_HOST}:19999
api key = ${NETDATA_STREAM_API_KEY}
timeout seconds = 60
buffer size bytes = 1048576
reconnect delay seconds = 5
EOF
# Restart Netdata
systemctl restart netdata
Child Agent Configuration:
- Collects CPU, memory, disk, network, and Docker container metrics
- Streams all metrics to parent Netdata in real-time
- Stores local metrics for 1 hour (fallback if parent unreachable)
- Auto-reconnects to parent if connection drops
Netdata Machine GUID
Each child agent has a unique machine GUID used to identify it in the parent:
Database Field: vm.netdata_machine_guid
Location on Server: /var/lib/netdata/registry/machine.guid
The backend stores this GUID to query metrics for specific servers from the parent Netdata API.
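For example, chart data for one child can be pulled straight from the parent using that GUID; run on the parent host (the GUID placeholder is illustrative):

# Last 60 seconds of CPU data for one child, aggregated to a single point
curl "http://localhost:19999/host/<machine_guid>/api/v1/data?chart=system.cpu&after=-60&points=1"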
Streaming Status
The vm table tracks streaming configuration and status:
class VM(Base):
# ... other fields ...
# Netdata streaming configuration
netdata_streaming_enabled = Column(Boolean, default=False)
netdata_streaming_status = Column(String(50), nullable=True)
netdata_machine_guid = Column(String(36), nullable=True)
netdata_version = Column(String(20), nullable=True)
netdata_installed_at = Column(DateTime(timezone=True), nullable=True)
Streaming Statuses:
- connected: Child agent streaming to parent
- disconnected: Child agent not streaming
- pending: Streaming configuration in progress
- error: Streaming setup failed
Server Health Monitoring
Metrics Collection
OEC.SH collects the following metrics via Netdata:
System Metrics:
- CPU: Percentage utilization (user, system, idle)
- Memory: Used, free, cached, buffered (bytes and percentage)
- Disk: Used space, total space (bytes and percentage)
- Network: Receive/transmit rates (bytes/sec)
Container Metrics (per Docker container):
- CPU: Container CPU percentage
- Memory: Used memory, limit, percentage
- Network: Container RX/TX bytes
- Status: Running, stopped, restarting
- Restarts: Number of restarts
- Uptime: Container uptime in seconds
Health Thresholds
Health status is calculated based on metric thresholds:
# backend/services/monitoring_service.py
CPU_WARNING_THRESHOLD = 80
CPU_CRITICAL_THRESHOLD = 95
MEMORY_WARNING_THRESHOLD = 80
MEMORY_CRITICAL_THRESHOLD = 95
DISK_WARNING_THRESHOLD = 80
DISK_CRITICAL_THRESHOLD = 90
def _calculate_status(cpu: float, memory: float, disk: float) -> str:
"""Calculate overall health status from metrics."""
if (cpu >= CPU_CRITICAL_THRESHOLD or
memory >= MEMORY_CRITICAL_THRESHOLD or
disk >= DISK_CRITICAL_THRESHOLD):
return "critical"
if (cpu >= CPU_WARNING_THRESHOLD or
memory >= MEMORY_WARNING_THRESHOLD or
disk >= DISK_WARNING_THRESHOLD):
return "warning"
return "healthy"Health Statuses:
- healthy: All metrics within normal range
- warning: One or more metrics above warning threshold
- critical: One or more metrics above critical threshold
- unreachable: Cannot connect to server or Netdata
Monitoring Service
The MonitoringService queries Netdata for metrics:
File: backend/services/monitoring_service.py
class MonitoringService:
"""Service for querying metrics on-demand from Netdata."""
async def get_server_summary(self, server_id: UUID) -> MetricsSummary:
"""Level 1: Quick summary for cards."""
server = await self._get_server(server_id)
async with get_netdata_client(server) as client:
system_stats = await client.get_system_stats()
container_stats = await client.get_container_stats()
return self._to_summary(system_stats, container_stats)
async def get_server_metrics(
self,
server_id: UUID,
include_timeseries: bool = False,
time_range: str = "24h"
) -> ServerMetricsResponse:
"""Level 2: Full metrics with optional charts."""
server = await self._get_server(server_id)
async with get_netdata_client(server) as client:
system_stats = await client.get_system_stats()
container_stats = await client.get_container_stats()
timeseries = None
if include_timeseries:
history = await client.get_historical_stats(minutes=parse_time_range(time_range))
timeseries = self._format_timeseries(history)
return ServerMetricsResponse(
server_id=server_id,
server_name=server.name,
summary=self._to_summary(system_stats, container_stats),
containers=[self._to_container_metrics(c) for c in container_stats],
timeseries=timeseries,
queried_at=datetime.now(UTC),
)
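The parse_time_range helper called above is not shown in this section; a minimal sketch, assuming it maps the API's time-range strings to minutes of history:

# Assumed mapping from API time-range strings to minutes of history
TIME_RANGE_MINUTES = {"1h": 60, "6h": 360, "24h": 1440, "7d": 10080, "30d": 43200}

def parse_time_range(time_range: str) -> int:
    """Return the number of minutes covered by a time-range string."""
    try:
        return TIME_RANGE_MINUTES[time_range]
    except KeyError:
        raise ValueError(f"Unsupported time range: {time_range}") from None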
Netdata Client
The NetdataClient supports dual-mode operation:
Modes:
- Parent Mode: Query parent Netdata via HTTP using machine GUID
- SSH Mode: Direct SSH tunnel to child agent (fallback)
File: backend/services/netdata_client.py
class NetdataClient:
"""Client for querying Netdata API with dual-mode support."""
def __init__(self, vm: VM, use_parent: bool | None = None):
"""
Initialize client.
Args:
vm: VM to query metrics for
use_parent: Override mode. If None, auto-detect from vm.netdata_streaming_enabled
"""
self.vm = vm
self.use_parent = use_parent if use_parent is not None else vm.netdata_streaming_enabled
async def _query_parent_api(self, endpoint: str) -> dict:
"""Query parent Netdata for this VM's metrics."""
if not self.vm.netdata_machine_guid:
raise NetdataClientError("VM has no machine_guid")
# Parent exposes child nodes at /host/{machine_guid}/...
url = f"{PARENT_URL}/host/{self.vm.netdata_machine_guid}{endpoint}"
response = await self._http_client.get(url)
return response.json()
async def get_system_stats(self) -> dict:
"""Get current system statistics."""
# Query CPU, memory, disk, network from Netdata charts
cpu_data = await self.get_chart_data("system.cpu", after=-1, points=1)
mem_data = await self.get_chart_data("system.ram", after=-1, points=1)
disk_data = await self.get_chart_data("disk_space._", after=-1, points=1)
net_data = await self.get_chart_data("system.net", after=-1, points=1)
return {
"cpu": {"percent": parse_cpu_percent(cpu_data)},
"memory": {"used": parse_mem_used(mem_data), "total": parse_mem_total(mem_data), "percent": parse_mem_percent(mem_data)},
"disk": {"used": parse_disk_used(disk_data), "total": parse_disk_total(disk_data), "percent": parse_disk_percent(disk_data)},
"network": {"rx_bytes": parse_net_rx(net_data), "tx_bytes": parse_net_tx(net_data)},
"timestamp": datetime.now(UTC).isoformat(),
}
Container Health Checks
Container health is monitored via Docker status and metrics:
async def get_container_stats(self, container_name: str | None = None) -> list[dict]:
"""Get container statistics from Netdata or docker ps."""
if self.use_parent:
# Query cgroup charts from Netdata
charts = await self._query_api("/api/v1/charts")
containers = []
for chart_name in charts["charts"].keys():
if chart_name.startswith("cgroup_") and ".cpu" in chart_name:
cname = chart_name.replace("cgroup_", "").rsplit(".cpu", 1)[0]
# Get CPU, memory, network for this container
cpu_percent = await self._get_container_cpu(cname)
memory_used, memory_limit = await self._get_container_memory(cname)
network_rx, network_tx = await self._get_container_network(cname)
containers.append({
"container_name": cname,
"cpu_percent": cpu_percent,
"memory_used": memory_used,
"memory_limit": memory_limit,
"network_rx": network_rx,
"network_tx": network_tx,
"status": "running",
})
return containers
else:
# SSH mode: Use docker ps
cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.RunningFor}}'"
exit_code, stdout, stderr = self.ssh.execute_command(cmd)
containers = []
for line in stdout.strip().split('\n'):
parts = line.split('|')
containers.append({
"container_name": parts[0],
"status": parse_docker_status(parts[1]),
"uptime": parse_uptime(parts[2]),
})
return containers
Database Replica Monitoring
Replica Health Checks
OEC.SH monitors PostgreSQL read replica health via ARQ cron job:
Frequency: Every 2 minutes
Monitored Metrics:
- Replication State: streaming, catchup, stopped
- Lag (Bytes): pg_wal_lsn_diff(sent_lsn, flush_lsn)
- Lag (Seconds): EXTRACT(EPOCH FROM write_lag)
Health Calculation:
# backend/services/replication_monitor.py
LAG_WARNING_BYTES = 50 * 1024 * 1024 # 50 MB
LAG_CRITICAL_BYTES = 100 * 1024 * 1024 # 100 MB
LAG_WARNING_SECONDS = 15
LAG_CRITICAL_SECONDS = 30
if lag_bytes > LAG_CRITICAL_BYTES or lag_seconds > LAG_CRITICAL_SECONDS:
status = "lagging" # Critical lag
elif lag_bytes > LAG_WARNING_BYTES or lag_seconds > LAG_WARNING_SECONDS:
status = "lagging" # Warning lag
else:
status = "online" # HealthyReplica Health Query
The replication monitor queries pg_stat_replication on the primary PostgreSQL container:
async def _query_replication_stats(self, environment: ProjectEnvironment) -> dict:
"""Query pg_stat_replication view on primary PostgreSQL container."""
primary_container = f"postgres-primary-{environment.id}"
sql_query = """
SELECT
state,
pg_wal_lsn_diff(sent_lsn, flush_lsn) AS lag_bytes,
EXTRACT(EPOCH FROM write_lag) AS lag_seconds
FROM pg_stat_replication
WHERE application_name = 'replica'
LIMIT 1;
"""
query_cmd = (
f"docker exec {primary_container} "
f"psql -U odoo -d postgres -t -A -F '|' "
f"-c \"{sql_query}\""
)
ssh_manager = get_ssh_manager_for_vm(environment.vm)
ssh_manager.connect()
exit_code, stdout, stderr = ssh_manager.execute_command(query_cmd, timeout=30)
# Parse output: state|lag_bytes|lag_seconds
parts = stdout.strip().split("|")
state = parts[0].strip()
lag_bytes = int(float(parts[1])) if parts[1] else 0
lag_seconds = float(parts[2]) if parts[2] else 0.0
return {
"state": state,
"lag_bytes": lag_bytes,
"lag_seconds": lag_seconds,
}
SSE Events for Replica Health
Replica health updates are broadcast via SSE:
async def _emit_health_updated_event(
self,
environment: ProjectEnvironment,
status: str,
lag_bytes: int | None,
lag_seconds: float | None
) -> None:
"""Emit SSE event for replica health update."""
org_id = environment.project.organization_id
event_data = {
"environment_id": str(environment.id),
"status": status,
"lag_bytes": lag_bytes,
"lag_seconds": lag_seconds,
"last_check": environment.replica_last_check.isoformat(),
}
await broadcast_to_organization(
org_id=org_id,
event_type="replica.health_updated",
data=event_data
)
Frontend Integration:
// Subscribe to replica health updates
useSSEEvent("replica.health_updated", (event) => {
if (event.data.environment_id === environmentId) {
setReplicaStatus(event.data.status);
setLagBytes(event.data.lag_bytes);
setLagSeconds(event.data.lag_seconds);
// Show alert if lagging
if (event.data.status === "lagging") {
toast.warning("Database replica is lagging", {
description: `Lag: ${formatBytes(event.data.lag_bytes)} / ${event.data.lag_seconds}s`,
});
}
}
});
Performance Metrics
Deployment Duration Tracking
Deployment duration is tracked from start to completion:
# backend/models/deployment.py
class Deployment(Base):
id = Column(UUID, primary_key=True)
environment_id = Column(UUID, ForeignKey("project_environments.id"))
status = Column(SQLEnum(DeploymentStatus))
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(UTC))  # callable default, evaluated per row
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
@property
def duration_seconds(self) -> int | None:
"""Calculate deployment duration in seconds."""
if not self.started_at or not self.completed_at:
return None
return int((self.completed_at - self.started_at).total_seconds())
SSE Event:
{
"type": "deployment.completed",
"data": {
"deployment_id": "...",
"duration_seconds": 127
}
}
Backup Completion Times
Backup operations track size and duration:
# backend/models/backup.py
class Backup(Base):
id = Column(UUID, primary_key=True)
environment_id = Column(UUID, ForeignKey("project_environments.id"))
started_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True), nullable=True)
size_bytes = Column(BigInteger, nullable=True)
@property
def duration_seconds(self) -> int | None:
"""Calculate backup duration."""
if not self.started_at or not self.completed_at:
return None
return int((self.completed_at - self.started_at).total_seconds())
API Response Times
API response times can be monitored via middleware:
# backend/core/middleware.py
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
response.headers["X-Response-Time"] = f"{duration_ms:.2f}ms"
return response
Resource Utilization Trends
Historical metrics are stored in Netdata (edge) for 30 days:
Query Historical Data:
async def get_historical_stats(self, minutes: int = 60) -> dict:
"""Get historical statistics for charts."""
after = -minutes * 60 # Convert to seconds
cpu_data = await self.get_chart_data("system.cpu", after=after, points=minutes)
mem_data = await self.get_chart_data("system.ram", after=after, points=minutes)
return {
"cpu": [{"timestamp": ts, "value": val} for ts, val in cpu_data],
"memory": [{"timestamp": ts, "value": val} for ts, val in mem_data],
}
Frontend Chart:
const { data } = useQuery({
queryKey: ["server-metrics-history", serverId, timeRange],
queryFn: () => api.get(`/servers/${serverId}/metrics`, {
params: { include_timeseries: true, time_range: timeRange }
}),
});
// Render with uPlot or Recharts
<LineChart data={data.timeseries.cpu} />Alert System
Alert Types and Severities
Alert Types:
- cpu_high: CPU usage threshold exceeded
- memory_high: Memory usage threshold exceeded
- disk_high: Disk space threshold exceeded
- container_down: Container stopped or unhealthy
- server_down: Server unreachable
- replica_lag: Database replica lagging
- custom: Custom Netdata webhook alert
Severities:
- info: Informational, no action required
- warning: Attention recommended within hours
- critical: Immediate action required
Alert Rules
Alert rules define thresholds and notification preferences:
# backend/models/alert.py
class AlertRule(Base):
id = Column(UUID, primary_key=True)
organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=True)
name = Column(String(255), nullable=False)
description = Column(Text)
alert_type = Column(String(50), nullable=False)
severity = Column(SQLEnum(AlertSeverity), nullable=False)
# Threshold configuration
metric_name = Column(String(100), nullable=False)
operator = Column(String(10), default=">") # >, <, >=, <=, ==
threshold = Column(Float, nullable=False)
duration_seconds = Column(Integer, default=300) # Must breach for this long
# Notification
notify_channels = Column(JSON, default=list) # ["email", "slack"]
cooldown_seconds = Column(Integer, default=3600) # 1 hour between alerts
is_active = Column(Boolean, default=True)
Example Rules:
DEFAULT_ALERT_RULES = [
{
"name": "High CPU",
"alert_type": "cpu_high",
"severity": "warning",
"metric_name": "cpu_percent",
"operator": ">",
"threshold": 80,
"duration_seconds": 600, # 10 minutes
"notify_channels": ["email"],
"cooldown_seconds": 3600, # 1 hour
},
{
"name": "Disk Space Critical",
"alert_type": "disk_high",
"severity": "critical",
"metric_name": "disk_percent",
"operator": ">",
"threshold": 95,
"duration_seconds": 60,
"notify_channels": ["email", "slack"],
"cooldown_seconds": 1800, # 30 minutes
},
]
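The evaluator that applies these rules is not shown here; a minimal sketch of how an operator/threshold pair could be checked against a sampled metric value (a hypothetical helper, not the production evaluator):

import operator

# Map a rule's operator string to a comparison function
OPERATORS = {">": operator.gt, "<": operator.lt, ">=": operator.ge, "<=": operator.le, "==": operator.eq}

def rule_matches(rule_operator: str, value: float, threshold: float) -> bool:
    """True when the sampled value breaches the rule's threshold."""
    return OPERATORS[rule_operator](value, threshold)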
Alert Creation and Deduplication
Alerts are created with fingerprint-based deduplication:
# backend/services/alert_service.py
class AlertService:
async def create_alert(
self,
server_id: UUID,
alert_type: AlertType,
severity: AlertSeverity,
title: str,
message: str | None = None,
metric_value: float | None = None,
threshold: float | None = None,
metric_name: str | None = None,
) -> AlertEvent:
"""Create alert with deduplication."""
# Resolve the server to scope the alert to its organization (used below)
server = await self._get_server(server_id)
# Generate fingerprint for deduplication
fingerprint = self._generate_fingerprint(server_id, alert_type, metric_name)
# Check for existing unresolved alert within cooldown
existing = await self._find_existing_alert(fingerprint, cooldown_seconds=3600)
if existing:
# Update occurrence count instead of creating new
existing.occurrence_count += 1
existing.last_occurrence = datetime.now(UTC)
existing.metric_value = metric_value
await self.db.commit()
return existing
# Create new alert
alert = AlertEvent(
organization_id=server.organization_id,
server_id=server_id,
alert_type=alert_type,
severity=severity,
title=title,
message=message,
metric_value=metric_value,
threshold=threshold,
status=AlertStatus.TRIGGERED,
fingerprint=fingerprint,
occurrence_count=1,
)
self.db.add(alert)
await self.db.commit()
# Broadcast via SSE
await broadcast_to_organization(
org_id=alert.organization_id,
event_type="alert_triggered",
data={
"alert_id": str(alert.id),
"alert_type": alert_type.value,
"severity": severity.value,
"title": title,
"metric_value": metric_value,
"threshold": threshold,
},
)
return alert
Alert Lifecycle
Alerts follow a lifecycle: Triggered → Acknowledged → Resolved
class AlertStatus(str, Enum):
TRIGGERED = "triggered" # New alert, not yet seen
ACKNOWLEDGED = "acknowledged" # User acknowledged, investigating
RESOLVED = "resolved" # Issue fixed
async def acknowledge_alert(self, alert_id: UUID, user_id: UUID) -> AlertEvent:
"""Mark alert as acknowledged."""
alert = await self.db.get(AlertEvent, alert_id)
alert.status = AlertStatus.ACKNOWLEDGED
alert.acknowledged_at = datetime.now(UTC)
alert.acknowledged_by = user_id
await self.db.commit()
# Broadcast status change
await broadcast_to_organization(
org_id=alert.organization_id,
event_type="alert_acknowledged",
data={
"alert_id": str(alert.id),
"acknowledged_by": str(user_id),
},
)
return alert
async def resolve_alert(self, alert_id: UUID) -> AlertEvent:
"""Mark alert as resolved."""
alert = await self.db.get(AlertEvent, alert_id)
alert.status = AlertStatus.RESOLVED
alert.resolved_at = datetime.now(UTC)
await self.db.commit()
# Broadcast resolution
await broadcast_to_organization(
org_id=alert.organization_id,
event_type="alert_resolved",
data={"alert_id": str(alert.id)},
)
return alert
Alert Delivery Channels
Email (via Postmark):
await email_service.send_alert_notification(
to=user.email,
alert=alert_event,
template="alert_triggered"
)
SSE (real-time in-app):
await broadcast_to_organization(
org_id=alert.organization_id,
event_type="alert_triggered",
data=alert_data
)
Slack Webhook (optional):
import httpx

async def send_slack_alert(webhook_url: str, alert: AlertEvent):
payload = {
"attachments": [{
"color": "#FF0000" if alert.severity == "critical" else "#FFCC00",
"title": f"{alert.severity.upper()}: {alert.title}",
"text": alert.message,
"fields": [
{"title": "Server", "value": alert.server.name, "short": True},
{"title": "Value", "value": str(alert.metric_value), "short": True},
],
}]
}
async with httpx.AsyncClient() as client:
await client.post(webhook_url, json=payload)
Alert Configuration
Organizations can configure alert rules:
API Endpoints:
- GET /api/v1/monitoring/alerts/rules - List alert rules
- POST /api/v1/monitoring/alerts/rules - Create custom rule
- PUT /api/v1/monitoring/alerts/rules/{id} - Update rule
- DELETE /api/v1/monitoring/alerts/rules/{id} - Delete rule
Frontend:
const { data: rules } = useQuery({
queryKey: ["alert-rules"],
queryFn: () => api.get("/monitoring/alerts/rules"),
});
// Create new rule
await api.post("/monitoring/alerts/rules", {
name: "High Memory (Custom)",
alert_type: "memory_high",
severity: "warning",
metric_name: "memory_percent",
operator: ">",
threshold: 85,
duration_seconds: 300,
notify_channels: ["email", "slack"],
cooldown_seconds: 1800,
});
Netdata Configuration
Environment Variables
Backend Configuration:
# .env.production
# Netdata parent URL (public-facing)
NETDATA_PARENT_PUBLIC_URL=https://monitoring.paasportal.io
# Netdata parent API URL (internal)
NETDATA_PARENT_URL=http://netdata-parent:19999
# API key for authentication
NETDATA_PARENT_API_KEY=your-api-key-here
Parent Container Deployment
The Netdata parent runs as a Docker container on the PaaSPortal server:
docker-compose.yml:
services:
netdata-parent:
image: netdata/netdata:stable
container_name: netdata-parent
hostname: netdata-parent
restart: unless-stopped
ports:
- "19999:19999"
volumes:
- netdata-config:/etc/netdata
- netdata-lib:/var/lib/netdata
- netdata-cache:/var/cache/netdata
environment:
- NETDATA_CLAIM_TOKEN=${NETDATA_CLAIM_TOKEN}
- NETDATA_CLAIM_URL=https://app.netdata.cloud
cap_add:
- SYS_PTRACE
security_opt:
- apparmor:unconfined
labels:
- "traefik.enable=true"
- "traefik.http.routers.netdata.rule=Host(`monitoring.paasportal.io`)"
- "traefik.http.routers.netdata.tls.certresolver=letsencrypt"
- "traefik.http.services.netdata.loadbalancer.server.port=19999"
volumes:
netdata-config:
netdata-lib:
netdata-cache:
Stream Configuration File
Parent (/etc/netdata/stream.conf):
[YOUR_STREAM_API_KEY]
# The section name is the stream API key itself (must match child configuration)
# Enable receiving streamed metrics
enabled = yes
# Allow from any IP (child agents)
allow from = *
# Retention for streamed metrics (1 hour = 3600 seconds)
default history = 3600
# Storage mode
default memory mode = dbengine
# Health monitoring for streamed children
health enabled by default = yes
Child Agent (/etc/netdata/stream.conf on managed servers):
[stream]
# Enable streaming to parent
enabled = yes
# Parent Netdata address
destination = monitoring.paasportal.io:19999
# Stream API key (must match parent)
api key = YOUR_STREAM_API_KEY
# Connection settings
timeout seconds = 60
buffer size bytes = 1048576
reconnect delay seconds = 5
# Send all charts
send charts matching = *
Parent-Child Linking
Child agents automatically link to the parent when streaming is configured:
- Install Netdata on managed server via bootstrap script
- Configure streaming in /etc/netdata/stream.conf
- Restart Netdata: systemctl restart netdata
- Verify connection: Check parent dashboard for new host
- Store machine GUID in database: vm.netdata_machine_guid
Verification:
# On parent server
curl http://localhost:19999/api/v1/info | jq .
# Response includes streaming hosts
{
"streaming": {
"status": "enabled",
"nodes": [
{
"guid": "abc123...",
"hostname": "prod-server-1",
"status": "connected"
}
]
}
}
Monitoring API
Server Metrics Endpoint
GET /api/v1/monitoring/servers/{server_id}/summary
Returns quick metrics summary for server list views.
Response:
{
"cpu_percent": 45.2,
"memory_percent": 67.8,
"memory_used_bytes": 5637144576,
"memory_total_bytes": 8318521344,
"disk_percent": 55.3,
"disk_used_bytes": 55301234688,
"disk_total_bytes": 100000000000,
"network_rx_rate": 1248576,
"network_tx_rate": 895632,
"container_count": 8,
"containers_healthy": 7,
"containers_unhealthy": 1,
"status": "healthy",
"queried_at": "2024-12-11T10:30:00Z"
}
GET /api/v1/monitoring/servers/{server_id}/metrics
Returns full server metrics with optional time-series data.
Query Parameters:
- include_timeseries: Boolean (default: false)
- time_range: String - 1h, 6h, 24h, 7d, 30d (default: 24h)
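An example request, assuming the API's standard Bearer-token authentication (host and ids illustrative):

curl -H "Authorization: Bearer $JWT" \
  "https://devsh.openeducat.ai/api/v1/monitoring/servers/$SERVER_ID/metrics?include_timeseries=true&time_range=24h"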
Response:
{
"server_id": "a1b2c3d4-...",
"server_name": "prod-server-1",
"summary": {
"cpu_percent": 45.2,
"memory_percent": 67.8,
"disk_percent": 55.3,
"status": "healthy",
"queried_at": "2024-12-11T10:30:00Z"
},
"containers": [
{
"container_id": "abc123",
"container_name": "prod-env-123_odoo",
"cpu_percent": 15.3,
"memory_percent": 45.0,
"memory_used_bytes": 536870912,
"memory_limit_bytes": 1073741824,
"network_rx_bytes": 1248576,
"network_tx_bytes": 895632,
"status": "running",
"restart_count": 0,
"uptime_seconds": 86400
}
],
"timeseries": {
"cpu": {
"metric_name": "cpu",
"points": [
{"timestamp": "2024-12-11T10:00:00Z", "value": 42.5},
{"timestamp": "2024-12-11T10:05:00Z", "value": 45.2}
],
"unit": "%"
},
"memory": {
"metric_name": "memory",
"points": [
{"timestamp": "2024-12-11T10:00:00Z", "value": 65.3},
{"timestamp": "2024-12-11T10:05:00Z", "value": 67.8}
],
"unit": "%"
}
},
"netdata_dashboard_url": "https://monitoring.prod-server-1.paasportal.io",
"queried_at": "2024-12-11T10:30:00Z"
}
Environment Metrics Endpoint
GET /api/v1/monitoring/environments/{environment_id}/metrics
Returns container metrics filtered to a specific environment.
Response:
[
{
"container_id": "abc123",
"container_name": "env-prod_odoo",
"cpu_percent": 15.3,
"memory_percent": 45.0,
"memory_used_bytes": 536870912,
"memory_limit_bytes": 1073741824,
"network_rx_bytes": 1248576,
"network_tx_bytes": 895632,
"status": "running",
"restart_count": 0,
"uptime_seconds": 86400
},
{
"container_id": "def456",
"container_name": "env-prod_db",
"cpu_percent": 8.7,
"memory_percent": 52.3,
"memory_used_bytes": 1073741824,
"memory_limit_bytes": 2147483648,
"network_rx_bytes": 524288,
"network_tx_bytes": 315392,
"status": "running",
"restart_count": 0,
"uptime_seconds": 86400
}
]
Replica Health Endpoint
GET /api/v1/monitoring/environments/{environment_id}/replicas/health
Returns PostgreSQL read replica health status.
Response:
{
"status": "online",
"lag_bytes": 12582912,
"lag_seconds": 3.5,
"error_message": null,
"last_check": "2024-12-11T10:30:00Z"
}
Status Values:
- online: Healthy, streaming with acceptable lag
- lagging: Lag exceeds warning/critical thresholds
- offline: Replication not streaming
- error: Health check failed
Nginx Configuration
SSE Proxy Settings
Nginx must be configured to support SSE connections:
File: nginx/conf.d/devsh.openeducat.ai.conf
# SSE endpoint - disable buffering
location /api/v1/events/stream {
proxy_pass http://backend:8000;
# SSE requires these headers
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
# Disable buffering for SSE
proxy_buffering off;
proxy_cache off;
# Long timeout for SSE (24 hours)
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
# Forward client info
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
Critical Settings:
- proxy_buffering off: Disables response buffering (required for SSE)
- proxy_http_version 1.1: Uses HTTP/1.1 for persistent connections
- chunked_transfer_encoding off: Disables chunked encoding
- proxy_read_timeout 86400s: 24-hour timeout for long-lived connections
Backend Headers
The backend also sets the X-Accel-Buffering: no header:
return StreamingResponse(
event_generator(request, user_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Instructs Nginx to disable buffering
},
)
Permissions
Monitoring Permissions
Access to monitoring data is controlled via the permission system:
Organization-Level:
- org.monitoring.view - View monitoring dashboards and metrics
- org.monitoring.alerts.view - View alerts
- org.monitoring.alerts.configure - Configure alert rules
Project-Level:
- project.monitoring.view - View project environment metrics
Permission Checks
Backend:
# Verify organization monitoring permission
has_permission = await check_permission(
db=db,
user=current_user,
permission_code="org.monitoring.view",
organization_id=server.organization_id,
)
if not has_permission:
raise HTTPException(403, "No permission to view monitoring")
Frontend:
import { useAbilities } from "@/hooks/useAbilities";
export function MonitoringDashboard() {
const { can } = useAbilities({ organizationId });
if (!can("monitoring", "view")) {
return <AccessDenied />;
}
return <MonitoringCharts />;
}
Best Practices
Event Subscription Management
1. Unsubscribe on Unmount
Always clean up subscriptions to prevent memory leaks:
useEffect(() => {
const unsubscribe = subscribeToEvents("deployment.progress", handleProgress);
return () => {
unsubscribe(); // Clean up on unmount
};
}, []);
2. Filter Events Efficiently
Filter events in the callback rather than subscribing multiple times:
// Good: Single subscription with filtering
useSSEEvent("deployment.progress", (event) => {
if (event.data.environment_id === currentEnvId) {
updateProgress(event.data.progress_percent);
}
});
// Bad: Multiple subscriptions (inefficient)
environments.forEach(env => {
useSSEEvent("deployment.progress", (event) => {
if (event.data.environment_id === env.id) {
// ...
}
});
});
3. Batch State Updates
Use batch state updates to prevent multiple re-renders:
useSSEEvent("server.health_updated", (event) => {
setServerMetrics(prev => ({
...prev,
[event.data.server_id]: {
cpu: event.data.cpu_percent,
memory: event.data.memory_percent,
disk: event.data.disk_percent,
status: event.data.status,
}
}));
});
Connection Pooling and Limits
Server-Side:
- Redis pub/sub imposes no hard limit on the number of subscribers
- Each SSE connection consumes ~1KB memory
- No explicit connection limit, but monitor server resources
Client-Side:
- Browser limit: 6 concurrent connections per domain on HTTP/1.1 (HTTP/2 multiplexes many streams over one connection)
- SSE uses 1 connection per tab
- Reconnection with exponential backoff prevents thundering herd
Alert Fatigue Prevention
1. Deduplication
Use fingerprints to group repeated alerts:
fingerprint = f"{alert_type}:{server_id}:{metric_name}"
2. Cooldown Periods
Prevent alert spam with cooldowns:
cooldown_seconds = {
"cpu_high": 3600, # 1 hour
"disk_high": 1800, # 30 minutes
"server_down": 300, # 5 minutes
}
3. Severity-Based Routing
Route alerts based on severity (a minimal routing sketch follows this list):
- Info: In-app only
- Warning: In-app + email digest (hourly)
- Critical: In-app + email + Slack (immediate)
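A minimal sketch of such a routing table, assuming the delivery channels described above (the hourly email digest channel is hypothetical):

SEVERITY_CHANNELS = {
    "info": ["sse"],                       # in-app only
    "warning": ["sse", "email_digest"],    # in-app + hourly email digest
    "critical": ["sse", "email", "slack"], # everything, immediately
}

def channels_for(severity: str) -> list[str]:
    return SEVERITY_CHANNELS.get(severity, ["sse"])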
4. Occurrence Counting
Track how many times an alert fired:
existing.occurrence_count += 1
existing.last_occurrence = datetime.now(UTC)
Frontend Display:
{alert.occurrence_count > 1 && (
<Badge variant="warning">
Occurred {alert.occurrence_count} times
</Badge>
)}
Troubleshooting
SSE Connection Drops
Symptom: SSE connection closes unexpectedly
Causes:
- Nginx buffering enabled: Check that proxy_buffering off is set
- Firewall timeout: Some firewalls close idle connections after 30 seconds
- Load balancer timeout: Check load balancer settings
- JWT token expired: Token expiration closes connection
Solutions:
# Nginx configuration
proxy_buffering off;
proxy_read_timeout 86400s;
# Backend: Send keepalive pings
except TimeoutError:
yield ": keepalive\n\n" # Every 30 seconds// Frontend: Auto-reconnect with exponential backoff
eventSource.onerror = () => {
const delay = baseDelay * Math.pow(2, reconnectAttempts.current);
setTimeout(() => connect(), delay);
};
Missing Events
Symptom: SSE events not received by client
Causes:
- Not subscribed to event type: Check event listener registration
- Organization filter: Events only sent to matching organization
- Redis connection lost: Backend lost connection to Redis
- Client disconnected: Client reconnecting, missed events
Debug:
// Subscribe to all events for debugging
useSSEEvent("*", (event) => {
console.log("[SSE Debug]", event.type, event.data);
});
# Backend: Log all broadcasts
logger.info(f"Broadcasting {event_type} to org {org_id}")Netdata Parent-Child Connection Issues
Symptom: Child agent not appearing in parent dashboard
Causes:
- Incorrect stream API key: Child/parent keys don't match
- Firewall blocking port 19999: Parent unreachable
- Incorrect parent hostname: DNS resolution fails
- Netdata not running: Child agent stopped
Debug:
# On child server
systemctl status netdata
# Check stream configuration
cat /etc/netdata/stream.conf
# Test connectivity to parent
telnet monitoring.paasportal.io 19999
# Check Netdata logs
journalctl -u netdata -f
# Verify machine GUID
cat /var/lib/netdata/registry/machine.guid
Fix:
# Restart Netdata on child
systemctl restart netdata
# Force reconnection
netdatacli reload-claiming-state
# Check streaming status on parent
curl http://localhost:19999/api/v1/info | jq '.streaming'
High CPU from Monitoring Overhead
Symptom: Monitoring system consuming excessive CPU
Causes:
- Too frequent queries: Polling Netdata too often
- Too many metrics: Collecting unnecessary data
- Inefficient queries: Not using Netdata's aggregation
Solutions:
# Limit query frequency
@cache(ttl=60) # Cache for 60 seconds
async def get_server_metrics(server_id: UUID):
# ...
# Use Netdata's built-in aggregation
await client.get_chart_data(
"system.cpu",
after=-60, # Last 60 seconds
points=1, # Return 1 aggregated point
)
# Reduce Netdata collection frequency
# /etc/netdata/netdata.conf
[global]
update every = 5  # Collect every 5 seconds (default: 1)
SSE Event Order
Symptom: Events arrive out of order
Cause: Multiple workers publish to Redis simultaneously
Solution: Include timestamp in event data:
await broadcast_to_organization(
org_id=org_id,
event_type="deployment.progress",
data={
"deployment_id": str(deployment_id),
"progress_percent": 50,
"timestamp": datetime.now(UTC).isoformat(), # Add timestamp
}
)
// Frontend: Sort by timestamp
const sortedEvents = events.sort((a, b) =>
new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
);
Summary
OEC.SH's real-time monitoring system combines:
- SSE: Unidirectional server-to-client event streaming
- Redis Pub/Sub: Cross-worker event distribution
- Netdata: Edge-first metrics collection with parent-child streaming
- Alert System: Threshold-based monitoring with deduplication
- Replica Monitoring: PostgreSQL replication health checks every 2 minutes
- Permission-Based Access: Organization and project-level access control
This architecture enables:
- Real-time dashboard updates without polling
- Scalable event distribution across multiple workers
- Low-overhead monitoring (the Netdata agent needs roughly 30MB of RAM per server)
- Data sovereignty (metrics stay on customer VMs)
- Comprehensive alerting with fatigue prevention