Real-time Monitoring and SSE Events

Real-time monitoring in OEC.SH combines Server-Sent Events (SSE) with Redis pub/sub and Netdata integration to provide live updates on infrastructure metrics, alerts, and operational events across the platform.


Architecture Overview

OEC.SH uses a dual-layer monitoring architecture:

  1. SSE Layer: Real-time event streaming from backend to connected clients
  2. Monitoring Layer: Edge-first metrics collection via Netdata on managed servers
┌─────────────────────────────────────────────────────────────────┐
│                      FRONTEND CLIENTS                           │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐                │
│  │  Browser 1 │  │  Browser 2 │  │  Browser N │                │
│  └──────┬─────┘  └──────┬─────┘  └──────┬─────┘                │
│         │                │                │                      │
│         │ SSE Connection │                │                      │
│         └────────────────┴────────────────┘                      │
│                          │                                       │
└──────────────────────────┼───────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                   BACKEND API SERVER                            │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │          SSE Endpoint (/api/v1/events/stream)            │  │
│  │  - Handles EventSource connections                       │  │
│  │  - Authenticates via JWT token in query parameter        │  │
│  │  - Subscribes to Redis pub/sub channel                   │  │
│  │  - Streams events to clients                             │  │
│  └─────────────────────┬────────────────────────────────────┘  │
│                        │                                         │
│                        ▼                                         │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Redis Pub/Sub (SSE Channel)                 │  │
│  │  - Channel: "sse:events"                                 │  │
│  │  - Cross-worker event distribution                       │  │
│  │  - Organization-scoped filtering                         │  │
│  └──────────────────────┬────────────────────────────────────┘ │
│                         │                                        │
│         ┌───────────────┴───────────────┐                       │
│         ▼                               ▼                       │
│  ┌─────────────┐               ┌──────────────┐                │
│  │  Services   │               │  ARQ Workers │                │
│  │  - Alerts   │               │  - Monitoring│                │
│  │  - Replica  │               │  - Tasks     │                │
│  │  - Tasks    │               │  - Backups   │                │
│  └─────────────┘               └──────────────┘                │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                   NETDATA MONITORING                            │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │           Parent Netdata (on PaaSPortal server)          │  │
│  │  - Collects metrics from all child agents                │  │
│  │  - REST API for metrics queries                          │  │
│  │  - Webhook alerts to backend                             │  │
│  └─────────────────────┬────────────────────────────────────┘  │
│                        │                                         │
│                        ▼                                         │
│         ┌──────────────┴────────────────┐                       │
│         ▼                                ▼                       │
│  ┌─────────────┐                 ┌─────────────┐               │
│  │ Child Agent │                 │ Child Agent │               │
│  │ (Server 1)  │                 │ (Server N)  │               │
│  │ - CPU/RAM   │                 │ - CPU/RAM   │               │
│  │ - Disk      │                 │ - Disk      │               │
│  │ - Containers│                 │ - Containers│               │
│  └─────────────┘                 └─────────────┘               │
└─────────────────────────────────────────────────────────────────┘

Server-Sent Events (SSE)

What is SSE?

Server-Sent Events is a web standard that enables servers to push real-time updates to web clients over HTTP. Unlike WebSockets, SSE is:

  • Unidirectional: Server to client only (perfect for notifications)
  • Auto-reconnecting: Browser automatically reconnects on connection loss
  • Simple HTTP: Works through standard HTTP/HTTPS and proxies
  • Event-driven: Named events with structured data payloads
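
To make the "named events with structured data payloads" point concrete, here is a minimal sketch of the SSE wire format: an `event:` line, a `data:` line, and a blank line that terminates the message. The helper names `format_sse` and `parse_sse` are illustrative and are not part of the OEC.SH codebase.

```python
import json


def format_sse(event: str, payload: dict) -> str:
    """Serialize one named SSE message: event line, data line, blank line."""
    data = json.dumps(payload, separators=(",", ":"))
    return f"event: {event}\ndata: {data}\n\n"


def parse_sse(frame: str) -> tuple[str, dict]:
    """Parse a single frame produced by format_sse (single-line data only)."""
    fields = dict(line.split(": ", 1) for line in frame.strip().split("\n"))
    return fields["event"], json.loads(fields["data"])


frame = format_sse("alert_triggered", {"severity": "warning"})
print(repr(frame))
```

In the browser, `EventSource` does this parsing itself and routes each message to the listener registered for its event name.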

SSE vs WebSockets

| Feature | SSE | WebSockets |
|---|---|---|
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP/HTTPS | ws:// or wss:// |
| Reconnection | Automatic | Manual |
| Browser Support | All modern browsers | All modern browsers |
| Proxy/Firewall | Easy (HTTP) | Sometimes blocked |
| Use Case | Real-time updates, notifications | Chat, gaming, real-time collaboration |
| Complexity | Simple | More complex |

Why OEC.SH uses SSE: For monitoring dashboards and operational alerts, we only need server-to-client communication. SSE provides automatic reconnection and simpler implementation compared to WebSockets.


SSE Implementation

Backend: Event Broadcasting

The backend uses Redis pub/sub to distribute events across multiple worker processes and API servers:

# backend/api/v1/routes/events.py

import json
from uuid import UUID

import redis.asyncio as redis
from fastapi import APIRouter, Query, Request
from fastapi.responses import StreamingResponse

SSE_CHANNEL = "sse:events"

async def broadcast_to_organization(
    org_id: str | UUID,
    event_type: str,
    data: dict
) -> None:
    """Broadcast an event via Redis pub/sub to all connected SSE clients."""
    r = redis.from_url(get_redis_url())
    try:
        message = json.dumps({
            "org_id": str(org_id),
            "type": event_type,
            "data": data
        })
        await r.publish(SSE_CHANNEL, message)
    finally:
        # Always release the connection, even if publish fails
        await r.aclose()

SSE Connection Endpoint

Clients connect to the SSE stream endpoint with JWT authentication:

Endpoint: GET /api/v1/events/stream?token={jwt_token}

Authentication: JWT token passed as query parameter (since EventSource doesn't support custom headers)

Response: text/event-stream with real-time events

@router.get("/stream")
async def event_stream(
    request: Request,
    token: str = Query(..., description="JWT access token"),
):
    """SSE endpoint for real-time updates using Redis pub/sub."""
    user_id = await get_user_from_token(token)
 
    return StreamingResponse(
        event_generator(request, user_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Event Generator

The event generator subscribes to Redis pub/sub and yields SSE-formatted messages:

import json
import time
from collections.abc import AsyncGenerator

KEEPALIVE_INTERVAL = 30.0  # seconds of silence before a keepalive is sent

async def event_generator(
    request: Request,
    user_id: str
) -> AsyncGenerator[str, None]:
    """Generate SSE events for a client using Redis pub/sub."""
    r = redis.from_url(get_redis_url())
    pubsub = r.pubsub()
    await pubsub.subscribe(SSE_CHANNEL)
    last_sent = time.monotonic()

    try:
        # Send initial connection confirmation
        yield "event: connected\ndata: {\"status\": \"connected\"}\n\n"

        while True:
            # Check if client disconnected
            if await request.is_disconnected():
                break

            # get_message returns None when nothing arrives within the timeout,
            # so the loop re-checks the connection roughly once per second
            message = await pubsub.get_message(
                ignore_subscribe_messages=True, timeout=1.0
            )

            if message and message["type"] == "message":
                data = json.loads(message["data"])
                event_type = data.get("type", "message")
                event_data = json.dumps({
                    "type": event_type,
                    "data": data.get("data", {})
                })

                yield f"event: {event_type}\ndata: {event_data}\n\n"
                last_sent = time.monotonic()
            elif time.monotonic() - last_sent >= KEEPALIVE_INTERVAL:
                # Send keepalive comment after 30 seconds without traffic
                yield ": keepalive\n\n"
                last_sent = time.monotonic()
    finally:
        # Runs on normal exit and when the stream is closed mid-iteration
        await pubsub.unsubscribe(SSE_CHANNEL)
        await pubsub.aclose()
        await r.aclose()

Key Features:

  • Redis pub/sub: Enables cross-worker event distribution
  • Keepalive pings: Prevents connection timeout (every 30 seconds)
  • Graceful disconnect: Detects when client disconnects and cleans up
  • Automatic reconnection: Client reconnects if connection drops

Event Types

OEC.SH emits various SSE event types for different operational activities:

Deployment Events

| Event Type | Description | Data Fields |
|---|---|---|
| deployment.started | Environment deployment initiated | environment_id, deployment_id, triggered_by, branch |
| deployment.progress | Deployment step progress update | environment_id, deployment_id, progress_percent, current_step |
| deployment.completed | Deployment successfully finished | environment_id, deployment_id, duration_seconds |
| deployment.failed | Deployment encountered error | environment_id, deployment_id, error_message |

Example Event:

{
  "type": "deployment.progress",
  "data": {
    "environment_id": "a1b2c3d4-...",
    "deployment_id": "e5f6g7h8-...",
    "progress_percent": 45,
    "current_step": "Building Docker image"
  }
}

Backup Events

| Event Type | Description | Data Fields |
|---|---|---|
| backup.started | Backup operation initiated | environment_id, backup_id, backup_type |
| backup.completed | Backup successfully finished | environment_id, backup_id, size_bytes, duration_seconds |
| backup.failed | Backup encountered error | environment_id, backup_id, error_message |

Example Event:

{
  "type": "backup.completed",
  "data": {
    "environment_id": "a1b2c3d4-...",
    "backup_id": "b9c0d1e2-...",
    "backup_type": "manual",
    "size_bytes": 524288000,
    "duration_seconds": 45
  }
}

Replica Health Events (Sprint 2E40)

| Event Type | Description | Data Fields |
|---|---|---|
| replica.health_updated | PostgreSQL replica health check result | environment_id, status, lag_bytes, lag_seconds, last_check |
| replica.lag_alert | Replica lag exceeded threshold | environment_id, severity, lag_bytes, lag_seconds, threshold_bytes, threshold_seconds |

Health Statuses:

  • online: Replica is healthy and streaming
  • lagging: Replica lag exceeds warning threshold (50MB or 15 seconds)
  • offline: Replica is not streaming
  • error: Health check failed

Lag Thresholds:

  • Warning: 50MB or 15 seconds
  • Critical: 100MB or 30 seconds

Example Event (a lag_bytes value of 12582912 is 12 MiB):

{
  "type": "replica.health_updated",
  "data": {
    "environment_id": "a1b2c3d4-...",
    "status": "online",
    "lag_bytes": 12582912,
    "lag_seconds": 3.5,
    "last_check": "2024-12-11T10:30:00Z"
  }
}

Alert Events

| Event Type | Description | Data Fields |
|---|---|---|
| alert_triggered | New monitoring alert triggered | alert_id, alert_type, severity, title, message, server_id, server_name, metric_value, threshold |
| alert_acknowledged | Alert acknowledged by user | alert_id, acknowledged_by, acknowledged_at |
| alert_resolved | Alert marked as resolved | alert_id, resolved_at |

Alert Severities:

  • info: Informational, no action required
  • warning: Action recommended within hours
  • critical: Immediate action required

Alert Types:

  • cpu_high: CPU usage above threshold (80% warning, 95% critical)
  • memory_high: Memory usage above threshold (80% warning, 95% critical)
  • disk_high: Disk usage above threshold (80% warning, 90% critical)
  • container_down: Container stopped or crashed
  • server_down: Server unreachable
  • custom: Custom alert from Netdata webhook

Example Event:

{
  "type": "alert_triggered",
  "data": {
    "alert_id": "c3d4e5f6-...",
    "alert_type": "cpu_high",
    "severity": "warning",
    "title": "High CPU Usage",
    "message": "Server CPU usage at 85% for 10 minutes",
    "server_id": "d7e8f9g0-...",
    "server_name": "prod-server-1",
    "metric_value": 85.3,
    "threshold": 80.0,
    "triggered_at": "2024-12-11T10:35:00Z"
  }
}

Server Health Events

| Event Type | Description | Data Fields |
|---|---|---|
| server.connected | Server health check passed | server_id, server_name, status |
| server.health_updated | Server health metrics updated | server_id, cpu_percent, memory_percent, disk_percent, status |

Example Event:

{
  "type": "server.health_updated",
  "data": {
    "server_id": "f1g2h3i4-...",
    "server_name": "prod-server-2",
    "cpu_percent": 45.2,
    "memory_percent": 67.8,
    "disk_percent": 55.3,
    "status": "healthy"
  }
}

Task Status Events

| Event Type | Description | Data Fields |
|---|---|---|
| task_status | Background task status update | task_id, task_type, status, environment_id, progress_percent, current_step |

Task Statuses:

  • pending: Task queued, not yet started
  • in_progress: Task currently executing
  • completed: Task finished successfully
  • failed: Task encountered error

Example Event:

{
  "type": "task_status",
  "data": {
    "task_id": "g5h6i7j8-...",
    "task_type": "deploy",
    "status": "in_progress",
    "environment_id": "a1b2c3d4-...",
    "progress_percent": 60,
    "current_step": "Starting containers"
  }
}

Resource Quota Events

| Event Type | Description | Data Fields |
|---|---|---|
| quota.exceeded | Organization resource quota exceeded | organization_id, resource_type, used, limit |
| quota.warning | Organization approaching quota limit | organization_id, resource_type, used, limit, percent_used |

Resource Types:

  • cpu_cores: Total CPU cores allocated
  • ram_mb: Total RAM in megabytes
  • disk_gb: Total disk space in gigabytes

Example Event:

{
  "type": "quota.warning",
  "data": {
    "organization_id": "h9i0j1k2-...",
    "resource_type": "ram_mb",
    "used": 15360,
    "limit": 16384,
    "percent_used": 93.75
  }
}
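
The `percent_used` field in the example follows directly from `used` and `limit`. A minimal sketch of that arithmetic, where the function name and the rounding to two decimals are assumptions rather than documented behavior:

```python
def percent_used(used: float, limit: float) -> float:
    """Return quota consumption as a percentage of the limit."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return round(used / limit * 100, 2)


# Values from the quota.warning example: 15360 MB used of a 16384 MB limit
print(percent_used(15360, 16384))
```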

Permission Events

| Event Type | Description | Data Fields |
|---|---|---|
| permissions_changed | User permissions updated | user_id, organization_id, project_id, changed_permissions |
| role_permissions_changed | Role permissions modified | role_id, organization_id, changed_permissions |

Example Event:

{
  "type": "permissions_changed",
  "data": {
    "user_id": "i3j4k5l6-...",
    "organization_id": "h9i0j1k2-...",
    "project_id": "m7n8o9p0-...",
    "changed_permissions": ["project.environments.create", "project.deployments.trigger"]
  }
}

Migration Events

| Event Type | Description | Data Fields |
|---|---|---|
| migration_progress | Odoo.sh migration progress update | migration_id, status, progress_percent, current_step, error_message |

Migration Statuses:

  • analyzing: Analyzing backup structure
  • downloading: Downloading backup chunks
  • assembling: Assembling multi-part backup
  • restoring: Restoring database and filestore
  • completed: Migration finished
  • failed: Migration error

Example Event:

{
  "type": "migration_progress",
  "data": {
    "migration_id": "j5k6l7m8-...",
    "status": "downloading",
    "progress_percent": 35,
    "current_step": "Downloading chunk 3 of 8",
    "error_message": null
  }
}

Redis Pub/Sub Architecture

Why Redis Pub/Sub?

OEC.SH uses Redis pub/sub for cross-worker event distribution:

Problem: Multiple backend workers and API servers run concurrently. When a background task (e.g., deployment) emits an event, it needs to reach all connected SSE clients, not just those connected to the same worker.

Solution: Redis pub/sub acts as a message broker. All workers subscribe to the same channel, ensuring events reach all connected clients regardless of which worker generated the event.

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│  Worker 1    │         │  Worker 2    │         │  Worker 3    │
│  (ARQ Task)  │         │  (API Server)│         │  (API Server)│
└──────┬───────┘         └──────┬───────┘         └──────┬───────┘
       │                        │                        │
       │ publish()              │ subscribe()            │ subscribe()
       └────────────────────────┼────────────────────────┘

                    ┌───────────▼──────────┐
                    │   Redis Pub/Sub      │
                    │  Channel: sse:events │
                    └───────────┬──────────┘

                  ┌─────────────┴─────────────┐
                  │                           │
          ┌───────▼────────┐         ┌───────▼────────┐
          │  Client A      │         │  Client B      │
          │  (Browser)     │         │  (Browser)     │
          └────────────────┘         └────────────────┘
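
The fan-out behavior in the diagram can be illustrated without a Redis server. The sketch below uses a hypothetical `InMemoryChannel` class as a stand-in for the `sse:events` channel; it is not how OEC.SH talks to Redis, only a model of the property that matters here: one publish reaches every subscribed worker.

```python
import asyncio
import json


class InMemoryChannel:
    """Stand-in for a pub/sub channel: every subscriber gets every message."""

    def __init__(self) -> None:
        self._subscribers = []

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def publish(self, message: str) -> int:
        for queue in self._subscribers:
            queue.put_nowait(message)
        # Like Redis PUBLISH, report how many subscribers received the message
        return len(self._subscribers)


async def demo() -> list:
    channel = InMemoryChannel()
    api_worker_2 = channel.subscribe()
    api_worker_3 = channel.subscribe()

    # The publishing task does not know which worker holds which browser
    channel.publish(
        json.dumps({"org_id": "org-1", "type": "deployment.started", "data": {}})
    )

    return [json.loads(await q.get())["type"] for q in (api_worker_2, api_worker_3)]


received = asyncio.run(demo())
print(received)
```

Each API worker then forwards the message to the SSE clients connected to it, which is why the publisher never needs to track client connections.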

Channel Naming

OEC.SH uses a single Redis pub/sub channel for all SSE events:

Channel: sse:events

Message Format:

{
  "org_id": "uuid-of-organization",
  "type": "event_type",
  "data": {
    "field1": "value1",
    "field2": "value2"
  }
}

Message Serialization

All SSE messages are serialized as JSON:

import json
 
message = json.dumps({
    "org_id": str(org_id),
    "type": event_type,
    "data": data
})
 
await redis_client.publish(SSE_CHANNEL, message)

Organization-Scoped Filtering

Events include org_id for organization-level filtering. Clients automatically receive only events for their organization based on JWT token authentication.
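
The `event_generator` excerpt shown earlier does not include the filtering step itself, so the following is only a sketch of how such a check might look. It assumes the set of organization IDs for the authenticated user has already been loaded from the JWT or the database; `should_deliver` and `user_org_ids` are hypothetical names.

```python
import json


def should_deliver(raw_message: str, user_org_ids: set) -> bool:
    """Return True when a published message belongs to one of the user's orgs."""
    try:
        envelope = json.loads(raw_message)
    except json.JSONDecodeError:
        return False  # drop malformed messages instead of forwarding them
    if not isinstance(envelope, dict):
        return False
    org_id = envelope.get("org_id")
    return isinstance(org_id, str) and org_id in user_org_ids


message = json.dumps({"org_id": "org-1", "type": "backup.started", "data": {}})
print(should_deliver(message, {"org-1"}))
print(should_deliver(message, {"org-2"}))
```

A generator would call a check like this on each Redis message before yielding it, so events for other organizations never leave the server.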


Frontend Integration

useEventStream Hook

The frontend establishes SSE connections using the useEventStream hook:

File: frontend/src/hooks/useEventStream.ts

import { useCallback, useEffect, useRef, useState } from "react";

export function useEventStream() {
  const { isAuthenticated } = useAuthStore();
  const eventSourceRef = useRef<EventSource | null>(null);
  const reconnectTimer = useRef<ReturnType<typeof setTimeout> | null>(null);
  const reconnectAttempts = useRef(0);
  // Kept in state (not read from the ref) so consumers re-render on change
  const [isConnected, setIsConnected] = useState(false);
  const maxReconnectAttempts = 10;
  const baseReconnectDelay = 1000; // 1 second

  const connect = useCallback(() => {
    const token = getAccessToken();
    if (!token || !isAuthenticated) return;

    const url = `${API_URL}/events/stream?token=${encodeURIComponent(token)}`;
    const eventSource = new EventSource(url);
    eventSourceRef.current = eventSource;

    eventSource.onopen = () => {
      console.log("[SSE] Connected");
      reconnectAttempts.current = 0;
      setIsConnected(true);
    };

    eventSource.onerror = (error) => {
      console.error("[SSE] Error:", error);
      // Close explicitly so the browser's built-in retry does not race with ours
      eventSource.close();
      setIsConnected(false);

      // Exponential backoff reconnection
      if (reconnectAttempts.current < maxReconnectAttempts) {
        const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
        reconnectTimer.current = setTimeout(() => {
          reconnectAttempts.current++;
          connect();
        }, delay);
      }
    };

    // Listen for specific event types.
    // The backend payload is already { type, data }, so forward it unchanged.
    eventSource.addEventListener("environment_status", (e) => {
      dispatchEvent(JSON.parse(e.data));
    });

    eventSource.addEventListener("alert_triggered", (e) => {
      dispatchEvent(JSON.parse(e.data));
    });

    // ... more event listeners
  }, [isAuthenticated]);

  useEffect(() => {
    connect();
    return () => {
      // Cancel any pending reconnect before closing the connection
      if (reconnectTimer.current) {
        clearTimeout(reconnectTimer.current);
      }
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
      }
    };
  }, [connect]);

  return { isConnected };
}

Features:

  • Automatic reconnection: Exponential backoff (1s, 2s, 4s, 8s, ...)
  • Max retry attempts: Up to 10 reconnection attempts
  • Event dispatching: Broadcasts events to subscribed components
  • Cleanup: Closes connection when component unmounts
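
The backoff schedule implied by `baseReconnectDelay` and `maxReconnectAttempts` can be worked out directly. This short calculation mirrors the formula in the hook (`base * 2^attempt`); the Python function name is illustrative only.

```python
BASE_DELAY_MS = 1000
MAX_ATTEMPTS = 10


def backoff_delays(base_ms: int = BASE_DELAY_MS, max_attempts: int = MAX_ATTEMPTS) -> list:
    """Delay before each reconnect attempt, doubling every time."""
    return [base_ms * 2 ** attempt for attempt in range(max_attempts)]


delays = backoff_delays()
print(delays)
print(sum(delays) / 1000, "seconds in total")
```

The last of the ten delays is 512 seconds, and the client stops retrying after roughly 17 minutes of cumulative waiting (1023 seconds).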

SSEProvider Context

The SSEProvider wraps the app root to provide SSE connection state:

File: frontend/src/providers/SSEProvider.tsx

export function SSEProvider({ children }: { children: ReactNode }) {
  const { isConnected } = useEventStream();
 
  return (
    <SSEContext.Provider value={{ isConnected }}>
      {children}
    </SSEContext.Provider>
  );
}

Component Usage: Subscribe to Events

Components subscribe to specific event types using useSSEEvent:

import { useSSEEvent } from "@/hooks/useEventStream";
 
export function DeploymentStatus({ environmentId }: Props) {
  const [status, setStatus] = useState<string>("idle");
  const [progress, setProgress] = useState<number>(0);
 
  // Subscribe to deployment progress events
  useSSEEvent("deployment.progress", (event) => {
    if (event.data.environment_id === environmentId) {
      setProgress(event.data.progress_percent);
      setStatus(event.data.current_step);
    }
  });
 
  // Subscribe to deployment completion
  useSSEEvent("deployment.completed", (event) => {
    if (event.data.environment_id === environmentId) {
      setStatus("completed");
      setProgress(100);
      toast.success("Deployment completed successfully!");
    }
  });
 
  return (
    <div>
      <ProgressBar percent={progress} />
      <span>{status}</span>
    </div>
  );
}

Global Event Subscription

Use wildcard subscription ("*") to listen to all events:

useSSEEvent("*", (event) => {
  console.log("Received event:", event.type, event.data);
 
  // Log all events for debugging
  if (isDevelopment) {
    console.table({
      Type: event.type,
      Data: JSON.stringify(event.data, null, 2),
    });
  }
});

Event Subscription Patterns

Pattern 1: Single Component Subscription

// Subscribe to specific event in one component
useSSEEvent("replica.health_updated", handleReplicaUpdate);

Pattern 2: Multiple Event Types

// Subscribe to multiple related events
useSSEEvent("deployment.started", handleDeploymentStart);
useSSEEvent("deployment.progress", handleDeploymentProgress);
useSSEEvent("deployment.completed", handleDeploymentComplete);
useSSEEvent("deployment.failed", handleDeploymentFailed);

Pattern 3: Filtered Subscription

// Subscribe and filter by resource ID
useSSEEvent("alert_triggered", (event) => {
  if (event.data.server_id === currentServerId) {
    showAlert(event.data);
  }
});

Pattern 4: Toast Notifications

// Show toast notifications for critical events
useSSEEvent("alert_triggered", (event) => {
  if (event.data.severity === "critical") {
    toast.error(event.data.title, {
      description: event.data.message,
    });
  }
});

Netdata Integration

Parent-Child Streaming Architecture

OEC.SH uses Netdata parent-child streaming to collect metrics from all managed servers:

┌─────────────────────────────────────────────────────────────────┐
│              PAASPORTAL SERVER (Parent Netdata)                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │          Netdata Parent Container                        │  │
│  │  - Port: 19999                                           │  │
│  │  - Collects metrics from all child agents                │  │
│  │  - REST API: /api/v1/...                                 │  │
│  │  - Web UI: https://monitoring.paasportal.io              │  │
│  └──────────────────────────────────────────────────────────┘  │
└────────────────────────────────┬────────────────────────────────┘
                                 │ Streaming (port 19999)
                 ┌───────────────┴───────────────┐
                 │                               │
┌────────────────▼─────────┐    ┌───────────────▼────────────┐
│  MANAGED SERVER 1        │    │  MANAGED SERVER N          │
│  ┌────────────────────┐  │    │  ┌────────────────────┐    │
│  │ Netdata Child Agent│  │    │  │ Netdata Child Agent│    │
│  │ - Machine GUID     │  │    │  │ - Machine GUID     │    │
│  │ - Streams to parent│  │    │  │ - Streams to parent│    │
│  │ - Collects metrics │  │    │  │ - Collects metrics │    │
│  └────────────────────┘  │    │  └────────────────────┘    │
│                          │    │                            │
│  Docker Containers:      │    │  Docker Containers:        │
│  - Odoo                  │    │  - Odoo                    │
│  - PostgreSQL            │    │  - PostgreSQL              │
│  - Redis                 │    │  - Redis                   │
└──────────────────────────┘    └────────────────────────────┘

Parent Netdata Configuration

The parent Netdata runs on the PaaSPortal server and receives streaming data from all child agents.

Environment Variables:

  • NETDATA_PARENT_PUBLIC_URL: Public URL for parent Netdata (e.g., https://monitoring.paasportal.io)
  • NETDATA_PARENT_API_KEY: API key for authenticating with parent

Stream Configuration (/etc/netdata/stream.conf on parent):

[STREAM_API_KEY]
    # Enable accepting streamed metrics
    enabled = yes
 
    # Allow connections from child agents
    allow from = *
 
    # API key for authentication
    api key = YOUR_STREAM_API_KEY
 
    # Retention for streamed metrics
    history = 3600
    memory mode = dbengine

Child Netdata Agent Installation

Child agents are installed on managed servers via the bootstrap script:

# In bootstrap_server.sh
 
# Install Netdata
bash <(curl -Ss https://my-netdata.io/kickstart.sh) \
  --stable-channel \
  --disable-telemetry \
  --claim-token "${NETDATA_CLAIM_TOKEN}" \
  --claim-url https://app.netdata.cloud
 
# Configure streaming to parent
cat > /etc/netdata/stream.conf <<EOF
[stream]
    enabled = yes
    destination = ${NETDATA_PARENT_HOST}:19999
    api key = ${NETDATA_STREAM_API_KEY}
    timeout seconds = 60
    buffer size bytes = 1048576
    reconnect delay seconds = 5
EOF
 
# Restart Netdata
systemctl restart netdata

Child Agent Configuration:

  • Collects CPU, memory, disk, network, and Docker container metrics
  • Streams all metrics to parent Netdata in real-time
  • Stores local metrics for 1 hour (fallback if parent unreachable)
  • Auto-reconnects to parent if connection drops

Netdata Machine GUID

Each child agent has a unique machine GUID used to identify it in the parent:

Database Field: vm.netdata_machine_guid

Location on Server: /var/lib/netdata/registry/machine.guid

The backend stores this GUID to query metrics for specific servers from the parent Netdata API.

Streaming Status

The vm table tracks streaming configuration and status:

class VM(Base):
    # ... other fields ...
 
    # Netdata streaming configuration
    netdata_streaming_enabled = Column(Boolean, default=False)
    netdata_streaming_status = Column(String(50), nullable=True)
    netdata_machine_guid = Column(String(36), nullable=True)
    netdata_version = Column(String(20), nullable=True)
    netdata_installed_at = Column(DateTime(timezone=True), nullable=True)

Streaming Statuses:

  • connected: Child agent streaming to parent
  • disconnected: Child agent not streaming
  • pending: Streaming configuration in progress
  • error: Streaming setup failed

Server Health Monitoring

Metrics Collection

OEC.SH collects the following metrics via Netdata:

System Metrics:

  • CPU: Percentage utilization (user, system, idle)
  • Memory: Used, free, cached, buffered (bytes and percentage)
  • Disk: Used space, total space (bytes and percentage)
  • Network: Receive/transmit rates (bytes/sec)

Container Metrics (per Docker container):

  • CPU: Container CPU percentage
  • Memory: Used memory, limit, percentage
  • Network: Container RX/TX bytes
  • Status: Running, stopped, restarting
  • Restarts: Number of restarts
  • Uptime: Container uptime in seconds

Health Thresholds

Health status is calculated based on metric thresholds:

# backend/services/monitoring_service.py
 
CPU_WARNING_THRESHOLD = 80
CPU_CRITICAL_THRESHOLD = 95
MEMORY_WARNING_THRESHOLD = 80
MEMORY_CRITICAL_THRESHOLD = 95
DISK_WARNING_THRESHOLD = 80
DISK_CRITICAL_THRESHOLD = 90
 
def _calculate_status(cpu: float, memory: float, disk: float) -> str:
    """Calculate overall health status from metrics."""
    if (cpu >= CPU_CRITICAL_THRESHOLD or
        memory >= MEMORY_CRITICAL_THRESHOLD or
        disk >= DISK_CRITICAL_THRESHOLD):
        return "critical"
 
    if (cpu >= CPU_WARNING_THRESHOLD or
        memory >= MEMORY_WARNING_THRESHOLD or
        disk >= DISK_WARNING_THRESHOLD):
        return "warning"
 
    return "healthy"

Health Statuses:

  • healthy: All metrics within normal range
  • warning: One or more metrics at or above the warning threshold
  • critical: One or more metrics at or above the critical threshold
  • unreachable: Cannot connect to server or Netdata
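
`_calculate_status` covers the first three statuses; how `unreachable` is assigned is not shown. The sketch below is one hypothetical way to combine them, treating a missing metrics dictionary as "could not reach the server or Netdata". The thresholds are copied from the constants above; `overall_status` is an assumed name.

```python
from typing import Optional

WARNING = {"cpu": 80, "memory": 80, "disk": 80}
CRITICAL = {"cpu": 95, "memory": 95, "disk": 90}


def overall_status(metrics: Optional[dict]) -> str:
    """Hypothetical wrapper: None means the metrics query failed."""
    if metrics is None:
        return "unreachable"
    if any(metrics[name] >= limit for name, limit in CRITICAL.items()):
        return "critical"
    if any(metrics[name] >= limit for name, limit in WARNING.items()):
        return "warning"
    return "healthy"


# Values from the server.health_updated example event
print(overall_status({"cpu": 45.2, "memory": 67.8, "disk": 55.3}))
# CPU value from the alert_triggered example event
print(overall_status({"cpu": 85.3, "memory": 40.0, "disk": 50.0}))
print(overall_status(None))
```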

Monitoring Service

The MonitoringService queries Netdata for metrics:

File: backend/services/monitoring_service.py

class MonitoringService:
    """Service for querying metrics on-demand from Netdata."""
 
    async def get_server_summary(self, server_id: UUID) -> MetricsSummary:
        """Level 1: Quick summary for cards."""
        server = await self._get_server(server_id)
 
        async with get_netdata_client(server) as client:
            system_stats = await client.get_system_stats()
            container_stats = await client.get_container_stats()
 
        return self._to_summary(system_stats, container_stats)
 
    async def get_server_metrics(
        self,
        server_id: UUID,
        include_timeseries: bool = False,
        time_range: str = "24h"
    ) -> ServerMetricsResponse:
        """Level 2: Full metrics with optional charts."""
        server = await self._get_server(server_id)
 
        async with get_netdata_client(server) as client:
            system_stats = await client.get_system_stats()
            container_stats = await client.get_container_stats()
 
            timeseries = None
            if include_timeseries:
                history = await client.get_historical_stats(minutes=parse_time_range(time_range))
                timeseries = self._format_timeseries(history)
 
        return ServerMetricsResponse(
            server_id=server_id,
            server_name=server.name,
            summary=self._to_summary(system_stats, container_stats),
            containers=[self._to_container_metrics(c) for c in container_stats],
            timeseries=timeseries,
            queried_at=datetime.now(UTC),
        )
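
`get_server_metrics` passes `time_range` through a `parse_time_range` helper whose body is not shown. A minimal sketch, assuming it converts strings such as "24h" into a number of minutes and that only the suffixes m, h, and d are supported:

```python
import re

_UNIT_MINUTES = {"m": 1, "h": 60, "d": 24 * 60}


def parse_time_range(time_range: str) -> int:
    """Convert a range such as '30m', '24h' or '7d' into minutes."""
    match = re.fullmatch(r"(\d+)([mhd])", time_range.strip().lower())
    if match is None:
        raise ValueError(f"Unsupported time range: {time_range!r}")
    amount, unit = int(match.group(1)), match.group(2)
    if amount == 0:
        raise ValueError("Time range must be greater than zero")
    return amount * _UNIT_MINUTES[unit]


print(parse_time_range("24h"))
```

Rejecting unknown input with `ValueError` lets the API layer return a validation error instead of querying Netdata with a meaningless window.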

Netdata Client

The NetdataClient supports dual-mode operation:

Modes:

  1. Parent Mode: Query parent Netdata via HTTP using machine GUID
  2. SSH Mode: Direct SSH tunnel to child agent (fallback)

File: backend/services/netdata_client.py

class NetdataClient:
    """Client for querying Netdata API with dual-mode support."""
 
    def __init__(self, vm: VM, use_parent: bool | None = None):
        """
        Initialize client.
 
        Args:
            vm: VM to query metrics for
            use_parent: Override mode. If None, auto-detect from vm.netdata_streaming_enabled
        """
        self.vm = vm
        self.use_parent = use_parent if use_parent is not None else vm.netdata_streaming_enabled
 
    async def _query_parent_api(self, endpoint: str) -> dict:
        """Query parent Netdata for this VM's metrics."""
        if not self.vm.netdata_machine_guid:
            raise NetdataClientError("VM has no machine_guid")
 
        # Parent exposes child nodes at /host/{machine_guid}/...
        url = f"{PARENT_URL}/host/{self.vm.netdata_machine_guid}{endpoint}"
 
        response = await self._http_client.get(url)
        return response.json()
 
    async def get_system_stats(self) -> dict:
        """Get current system statistics."""
        # Query CPU, memory, disk, network from Netdata charts
        cpu_data = await self.get_chart_data("system.cpu", after=-1, points=1)
        mem_data = await self.get_chart_data("system.ram", after=-1, points=1)
        disk_data = await self.get_chart_data("disk_space._", after=-1, points=1)
        net_data = await self.get_chart_data("system.net", after=-1, points=1)
 
        return {
            "cpu": {"percent": parse_cpu_percent(cpu_data)},
            "memory": {"used": parse_mem_used(mem_data), "total": parse_mem_total(mem_data), "percent": parse_mem_percent(mem_data)},
            "disk": {"used": parse_disk_used(disk_data), "total": parse_disk_total(disk_data), "percent": parse_disk_percent(disk_data)},
            "network": {"rx_bytes": parse_net_rx(net_data), "tx_bytes": parse_net_tx(net_data)},
            "timestamp": datetime.now(UTC).isoformat(),
        }
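
The `parse_*` helpers used by `get_system_stats` are not shown. As an illustration, here is a sketch of `parse_cpu_percent`, assuming the chart response has the `labels`/`data` shape of Netdata's JSON data format and that CPU usage is the sum of every dimension except `idle`. Both points are assumptions, and the sample values are made up for the example.

```python
def parse_cpu_percent(chart_data: dict) -> float:
    """Sum all non-idle CPU dimensions from the most recent data point."""
    labels = chart_data["labels"]
    rows = chart_data["data"]
    if not rows:
        raise ValueError("chart response contains no data points")
    latest = rows[0]
    busy = sum(
        value
        for label, value in zip(labels, latest)
        # Skip the timestamp column, idle time, and gaps reported as null
        if label not in ("time", "idle") and value is not None
    )
    return round(min(busy, 100.0), 2)


sample = {
    "labels": ["time", "softirq", "user", "system", "iowait"],
    "data": [[1700000000, 0.25, 3.5, 1.25, 0.5]],
}
print(parse_cpu_percent(sample))
```

Whether `iowait` should count as busy time is a policy choice; this sketch includes it.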

Container Health Checks

Container health is monitored via Docker status and metrics:

async def get_container_stats(self, container_name: str | None = None) -> list[dict]:
    """Get container statistics from Netdata or docker ps."""
 
    if self.use_parent:
        # Query cgroup charts from Netdata
        charts = await self._query_api("/api/v1/charts")
        containers = []
 
        for chart_name in charts["charts"]:
            # Match only the per-container CPU chart, not sibling charts whose names merely contain ".cpu"
            if chart_name.startswith("cgroup_") and chart_name.endswith(".cpu"):
                cname = chart_name.removeprefix("cgroup_").removesuffix(".cpu")
 
                # Get CPU, memory, network for this container
                cpu_percent = await self._get_container_cpu(cname)
                memory_used, memory_limit = await self._get_container_memory(cname)
                network_rx, network_tx = await self._get_container_network(cname)
 
                containers.append({
                    "container_name": cname,
                    "cpu_percent": cpu_percent,
                    "memory_used": memory_used,
                    "memory_limit": memory_limit,
                    "network_rx": network_rx,
                    "network_tx": network_tx,
                    "status": "running",
                })
 
        return containers
 
    else:
        # SSH mode: Use docker ps
        cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.RunningFor}}'"
        exit_code, stdout, stderr = self.ssh.execute_command(cmd)
 
        containers = []
        for line in stdout.strip().splitlines():
            parts = line.split('|')
            if len(parts) < 3:
                continue  # Skip blank or malformed lines (e.g. no containers on the host)
            containers.append({
                "container_name": parts[0],
                "status": parse_docker_status(parts[1]),
                "uptime": parse_uptime(parts[2]),
            })
 
        return containers

Database Replica Monitoring

Replica Health Checks

OEC.SH monitors PostgreSQL read replica health via an ARQ cron job:

Frequency: Every 2 minutes

Monitored Metrics:

  • Replication State: streaming, catchup, stopped
  • Lag (Bytes): pg_wal_lsn_diff(sent_lsn, flush_lsn)
  • Lag (Seconds): EXTRACT(EPOCH FROM write_lag)

Health Calculation:

# backend/services/replication_monitor.py
 
LAG_WARNING_BYTES = 50 * 1024 * 1024    # 50 MB
LAG_CRITICAL_BYTES = 100 * 1024 * 1024  # 100 MB
LAG_WARNING_SECONDS = 15
LAG_CRITICAL_SECONDS = 30
 
if lag_bytes > LAG_CRITICAL_BYTES or lag_seconds > LAG_CRITICAL_SECONDS:
    status = "lagging"  # Critical lag
elif lag_bytes > LAG_WARNING_BYTES or lag_seconds > LAG_WARNING_SECONDS:
    status = "lagging"  # Warning lag
else:
    status = "online"   # Healthy
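The calculation above maps both warning and critical lag to the same lagging status, so the severity is lost once the status is stored. The following is a minimal sketch of one way to keep both, not the service's actual code. The helper name classify_replica_health and the ReplicaHealth tuple are hypothetical, and treating every non-streaming state as offline with a warning severity is an assumption based on the status values listed for the replica health endpoint.

```python
from typing import NamedTuple, Optional

# Thresholds copied from the health calculation above
LAG_WARNING_BYTES = 50 * 1024 * 1024    # 50 MB
LAG_CRITICAL_BYTES = 100 * 1024 * 1024  # 100 MB
LAG_WARNING_SECONDS = 15
LAG_CRITICAL_SECONDS = 30


class ReplicaHealth(NamedTuple):
    status: str              # "online", "lagging", or "offline"
    severity: Optional[str]  # "warning", "critical", or None when healthy


def classify_replica_health(state: str, lag_bytes: int, lag_seconds: float) -> ReplicaHealth:
    """Hypothetical helper: map replication state and lag to status plus severity."""
    if state != "streaming":
        # Assumption: any non-streaming state is reported as offline with a warning
        return ReplicaHealth("offline", "warning")
    if lag_bytes > LAG_CRITICAL_BYTES or lag_seconds > LAG_CRITICAL_SECONDS:
        return ReplicaHealth("lagging", "critical")
    if lag_bytes > LAG_WARNING_BYTES or lag_seconds > LAG_WARNING_SECONDS:
        return ReplicaHealth("lagging", "warning")
    return ReplicaHealth("online", None)
```

Keeping the severity separate lets the alert layer choose between a warning and a critical replica_lag alert without re-reading the raw lag values.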

Replica Health Query

The replication monitor queries pg_stat_replication on the primary PostgreSQL container:

async def _query_replication_stats(self, environment: ProjectEnvironment) -> dict:
    """Query pg_stat_replication view on primary PostgreSQL container."""
 
    primary_container = f"postgres-primary-{environment.id}"
 
    sql_query = """
        SELECT
            state,
            pg_wal_lsn_diff(sent_lsn, flush_lsn) AS lag_bytes,
            EXTRACT(EPOCH FROM write_lag) AS lag_seconds
        FROM pg_stat_replication
        WHERE application_name = 'replica'
        LIMIT 1;
    """
 
    query_cmd = (
        f"docker exec {primary_container} "
        f"psql -U odoo -d postgres -t -A -F '|' "
        f"-c \"{sql_query}\""
    )
 
    ssh_manager = get_ssh_manager_for_vm(environment.vm)
    ssh_manager.connect()
 
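    exit_code, stdout, stderr = ssh_manager.execute_command(query_cmd, timeout=30)
    if exit_code != 0:
        raise RuntimeError(f"Replication query failed: {stderr.strip()}")

    # Parse output: state|lag_bytes|lag_seconds
    parts = stdout.strip().split("|")
    if len(parts) < 3:
        # No row returned: no replica is currently connected to the primary
        return {"state": "stopped", "lag_bytes": 0, "lag_seconds": 0.0}
    state = parts[0].strip()
    lag_bytes = int(float(parts[1])) if parts[1] else 0
    lag_seconds = float(parts[2]) if parts[2] else 0.0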
 
    return {
        "state": state,
        "lag_bytes": lag_bytes,
        "lag_seconds": lag_seconds,
    }

SSE Events for Replica Health

Replica health updates are broadcast via SSE:

async def _emit_health_updated_event(
    self,
    environment: ProjectEnvironment,
    status: str,
    lag_bytes: int | None,
    lag_seconds: float | None
) -> None:
    """Emit SSE event for replica health update."""
 
    org_id = environment.project.organization_id
 
    event_data = {
        "environment_id": str(environment.id),
        "status": status,
        "lag_bytes": lag_bytes,
        "lag_seconds": lag_seconds,
        "last_check": environment.replica_last_check.isoformat(),
    }
 
    await broadcast_to_organization(
        org_id=org_id,
        event_type="replica.health_updated",
        data=event_data
    )

Frontend Integration:

// Subscribe to replica health updates
useSSEEvent("replica.health_updated", (event) => {
  if (event.data.environment_id === environmentId) {
    setReplicaStatus(event.data.status);
    setLagBytes(event.data.lag_bytes);
    setLagSeconds(event.data.lag_seconds);
 
    // Show alert if lagging
    if (event.data.status === "lagging") {
      toast.warning("Database replica is lagging", {
        description: `Lag: ${formatBytes(event.data.lag_bytes)} / ${event.data.lag_seconds}s`,
      });
    }
  }
});

Performance Metrics

Deployment Duration Tracking

Deployment duration is tracked from start to completion:

# backend/models/deployment.py
 
class Deployment(Base):
    id = Column(UUID, primary_key=True)
    environment_id = Column(UUID, ForeignKey("project_environments.id"))
 
    status = Column(SQLEnum(DeploymentStatus))
    # Pass a callable so the timestamp is evaluated per row, not once at import time
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(UTC))
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)
 
    @property
    def duration_seconds(self) -> int | None:
        """Calculate deployment duration in seconds."""
        if not self.started_at or not self.completed_at:
            return None
        return int((self.completed_at - self.started_at).total_seconds())

SSE Event:

{
  "type": "deployment.completed",
  "data": {
    "deployment_id": "...",
    "duration_seconds": 127
  }
}

Backup Completion Times

Backup operations track size and duration:

# backend/models/backup.py
 
class Backup(Base):
    id = Column(UUID, primary_key=True)
    environment_id = Column(UUID, ForeignKey("project_environments.id"))
 
    started_at = Column(DateTime(timezone=True))
    completed_at = Column(DateTime(timezone=True), nullable=True)
    size_bytes = Column(BigInteger, nullable=True)
 
    @property
    def duration_seconds(self) -> int | None:
        """Calculate backup duration."""
        if not self.started_at or not self.completed_at:
            return None
        return int((self.completed_at - self.started_at).total_seconds())

API Response Times

API response times can be monitored via middleware:

# backend/core/middleware.py
 
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    start_time = time.perf_counter()  # Monotonic clock, unaffected by system time changes
    response = await call_next(request)
    duration_ms = (time.perf_counter() - start_time) * 1000
    response.headers["X-Response-Time"] = f"{duration_ms:.2f}ms"
    return response

Resource Utilization Trends

Historical metrics are stored in Netdata on each managed server (edge) for 30 days.

Query Historical Data:

async def get_historical_stats(self, minutes: int = 60) -> dict:
    """Get historical statistics for charts."""
    after = -minutes * 60  # Convert to seconds
 
    cpu_data = await self.get_chart_data("system.cpu", after=after, points=minutes)
    mem_data = await self.get_chart_data("system.ram", after=after, points=minutes)
 
    return {
        "cpu": [{"timestamp": ts, "value": val} for ts, val in cpu_data],
        "memory": [{"timestamp": ts, "value": val} for ts, val in mem_data],
    }
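The list comprehensions above unpack each entry as a (timestamp, value) pair, which presumes get_chart_data has already flattened the Netdata response. As a hedged sketch of that flattening step: the function below assumes the raw payload has the shape of Netdata's JSON data format, with a labels list whose first entry is the time column and a data list of rows. The name rows_to_points and the choice to sum the selected dimensions are illustrative assumptions, not part of the documented client.

```python
from datetime import datetime, timezone
from typing import Optional


def rows_to_points(payload: dict, dimensions: Optional[list] = None) -> list:
    """Flatten {"labels": [...], "data": [[ts, v1, v2, ...], ...]} into chart points.

    Sums the selected dimensions per row (all dimensions when none are given)
    and returns points ordered oldest first.
    """
    labels = payload["labels"]
    # Column 0 is the timestamp; keep the indexes of the dimensions we want
    wanted = [
        i for i, name in enumerate(labels)
        if i > 0 and (dimensions is None or name in dimensions)
    ]
    points = []
    for row in payload["data"]:
        values = [row[i] for i in wanted if row[i] is not None]  # gaps arrive as null
        points.append({
            "timestamp": datetime.fromtimestamp(row[0], tz=timezone.utc).isoformat(),
            "value": round(sum(values), 2),
        })
    # Rows may arrive newest first; charts usually want ascending time
    points.sort(key=lambda p: p["timestamp"])
    return points
```

Sorting on the ISO string is safe here because every timestamp is rendered in UTC with the same format.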

Frontend Chart:

const { data } = useQuery({
  queryKey: ["server-metrics-history", serverId, timeRange],
  queryFn: () => api.get(`/servers/${serverId}/metrics`, {
    params: { include_timeseries: true, time_range: timeRange }
  }),
});
 
// Render with uPlot or Recharts
<LineChart data={data.timeseries.cpu} />

Alert System

Alert Types and Severities

Alert Types:

  • cpu_high: CPU usage threshold exceeded
  • memory_high: Memory usage threshold exceeded
  • disk_high: Disk space threshold exceeded
  • container_down: Container stopped or unhealthy
  • server_down: Server unreachable
  • replica_lag: Database replica lagging
  • custom: Custom Netdata webhook alert

Severities:

  • info: Informational, no action required
  • warning: Attention recommended within hours
  • critical: Immediate action required

Alert Rules

Alert rules define thresholds and notification preferences:

# backend/models/alert.py
 
class AlertRule(Base):
    id = Column(UUID, primary_key=True)
    organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=True)
 
    name = Column(String(255), nullable=False)
    description = Column(Text)
 
    alert_type = Column(String(50), nullable=False)
    severity = Column(SQLEnum(AlertSeverity), nullable=False)
 
    # Threshold configuration
    metric_name = Column(String(100), nullable=False)
    operator = Column(String(10), default=">")  # >, <, >=, <=, ==
    threshold = Column(Float, nullable=False)
    duration_seconds = Column(Integer, default=300)  # Must breach for this long
 
    # Notification
    notify_channels = Column(JSON, default=list)  # ["email", "slack"]
    cooldown_seconds = Column(Integer, default=3600)  # 1 hour between alerts
 
    is_active = Column(Boolean, default=True)

Example Rules:

DEFAULT_ALERT_RULES = [
    {
        "name": "High CPU",
        "alert_type": "cpu_high",
        "severity": "warning",
        "metric_name": "cpu_percent",
        "operator": ">",
        "threshold": 80,
        "duration_seconds": 600,  # 10 minutes
        "notify_channels": ["email"],
        "cooldown_seconds": 3600,  # 1 hour
    },
    {
        "name": "Disk Space Critical",
        "alert_type": "disk_high",
        "severity": "critical",
        "metric_name": "disk_percent",
        "operator": ">",
        "threshold": 95,
        "duration_seconds": 60,
        "notify_channels": ["email", "slack"],
        "cooldown_seconds": 1800,  # 30 minutes
    },
]
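To make the interaction between operator, threshold, and duration_seconds concrete, here is a minimal sketch of how a rule could be evaluated against recent samples. It is an illustration under stated assumptions rather than the platform's evaluator: the Sample type and rule_breached function are hypothetical, and "must breach for this long" is interpreted as every sample in the window violating the threshold, with data reaching back to the start of the window.

```python
import operator
from dataclasses import dataclass

# Comparison functions for the operators the AlertRule model allows
OPERATORS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
}


@dataclass
class Sample:
    timestamp: float  # Unix seconds
    value: float


def rule_breached(rule: dict, samples: list, now: float) -> bool:
    """Return True when the metric violated the threshold for the whole window."""
    compare = OPERATORS[rule["operator"]]
    window_start = now - rule["duration_seconds"]
    in_window = [s for s in samples if s.timestamp >= window_start]
    # Require data reaching back to the window start, so a short burst
    # right after startup does not count as a sustained breach
    has_full_window = any(s.timestamp <= window_start for s in samples)
    return (
        bool(in_window)
        and has_full_window
        and all(compare(s.value, rule["threshold"]) for s in in_window)
    )
```

With the "High CPU" rule above, samples of 85, 90, and 95 percent spread over ten minutes would trigger, while a single dip to 70 percent inside the window would not.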

Alert Creation and Deduplication

Alerts are created with fingerprint-based deduplication:

# backend/services/alert_service.py
 
class AlertService:
    async def create_alert(
        self,
        server_id: UUID,
        alert_type: AlertType,
        severity: AlertSeverity,
        title: str,
        message: str | None = None,
        metric_value: float | None = None,
        threshold: float | None = None,
        metric_name: str | None = None,  # Used in the fingerprint below
    ) -> AlertEvent:
        """Create alert with deduplication."""
 
        # Generate fingerprint for deduplication
        fingerprint = self._generate_fingerprint(server_id, alert_type, metric_name)
 
        # Check for existing unresolved alert within cooldown
        existing = await self._find_existing_alert(fingerprint, cooldown_seconds=3600)
 
        if existing:
            # Update occurrence count instead of creating new
            existing.occurrence_count += 1
            existing.last_occurrence = datetime.now(UTC)
            existing.metric_value = metric_value
            await self.db.commit()
            return existing
 
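        # Resolve the owning organization (assumes a Server model keyed by server_id)
        server = await self.db.get(Server, server_id)

        # Create new alert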
        alert = AlertEvent(
            organization_id=server.organization_id,
            server_id=server_id,
            alert_type=alert_type,
            severity=severity,
            title=title,
            message=message,
            metric_value=metric_value,
            threshold=threshold,
            status=AlertStatus.TRIGGERED,
            fingerprint=fingerprint,
            occurrence_count=1,
        )
 
        self.db.add(alert)
        await self.db.commit()
 
        # Broadcast via SSE
        await broadcast_to_organization(
            org_id=alert.organization_id,
            event_type="alert_triggered",
            data={
                "alert_id": str(alert.id),
                "alert_type": alert_type.value,
                "severity": severity.value,
                "title": title,
                "metric_value": metric_value,
                "threshold": threshold,
            },
        )
 
        return alert
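The service above relies on _generate_fingerprint and _find_existing_alert, neither of which is shown. The sketch below illustrates the idea with stand-ins: generate_fingerprint hashes the alert_type, server_id, and metric_name triple, and InMemoryDeduplicator replaces the database lookup with a dictionary. Both names, the SHA-256 choice, and the in-memory store are assumptions made for illustration.

```python
import hashlib
from typing import Optional
from uuid import UUID


def generate_fingerprint(server_id: UUID, alert_type: str, metric_name: Optional[str]) -> str:
    """Stable identifier for 'the same alert on the same server'."""
    raw = f"{alert_type}:{server_id}:{metric_name or ''}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class InMemoryDeduplicator:
    """Counts repeats of a fingerprint that arrive within the cooldown window."""

    def __init__(self, cooldown_seconds: float = 3600):
        self.cooldown_seconds = cooldown_seconds
        self._seen = {}  # fingerprint -> (last_occurrence, occurrence_count)

    def record(self, fingerprint: str, now: float) -> int:
        """Return the occurrence count after recording this firing."""
        last = self._seen.get(fingerprint)
        if last is not None and now - last[0] < self.cooldown_seconds:
            count = last[1] + 1  # Repeat inside the cooldown: bump the counter
        else:
            count = 1            # First firing, or the cooldown has elapsed
        self._seen[fingerprint] = (now, count)
        return count
```

A return value of 1 corresponds to the "create new alert" branch above; anything higher corresponds to updating occurrence_count on the existing alert.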

Alert Lifecycle

Alerts follow a lifecycle: Triggered → Acknowledged → Resolved

class AlertStatus(str, Enum):
    TRIGGERED = "triggered"      # New alert, not yet seen
    ACKNOWLEDGED = "acknowledged"  # User acknowledged, investigating
    RESOLVED = "resolved"          # Issue fixed
 
async def acknowledge_alert(self, alert_id: UUID, user_id: UUID) -> AlertEvent:
    """Mark alert as acknowledged."""
    alert = await self.db.get(AlertEvent, alert_id)
 
    alert.status = AlertStatus.ACKNOWLEDGED
    alert.acknowledged_at = datetime.now(UTC)
    alert.acknowledged_by = user_id
 
    await self.db.commit()
 
    # Broadcast status change
    await broadcast_to_organization(
        org_id=alert.organization_id,
        event_type="alert_acknowledged",
        data={
            "alert_id": str(alert.id),
            "acknowledged_by": str(user_id),
        },
    )
 
    return alert
 
async def resolve_alert(self, alert_id: UUID) -> AlertEvent:
    """Mark alert as resolved."""
    alert = await self.db.get(AlertEvent, alert_id)
 
    alert.status = AlertStatus.RESOLVED
    alert.resolved_at = datetime.now(UTC)
 
    await self.db.commit()
 
    # Broadcast resolution
    await broadcast_to_organization(
        org_id=alert.organization_id,
        event_type="alert_resolved",
        data={"alert_id": str(alert.id)},
    )
 
    return alert
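Neither acknowledge_alert nor resolve_alert above checks the alert's current status, so a resolved alert could be moved back to acknowledged. One way to guard the lifecycle is a small transition table, sketched below. The ALLOWED_TRANSITIONS mapping, the InvalidTransition exception, and the decision to allow resolving directly from triggered are assumptions for illustration.

```python
from enum import Enum


class AlertStatus(str, Enum):
    TRIGGERED = "triggered"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"


# Assumption: an alert may be resolved without being acknowledged first
ALLOWED_TRANSITIONS = {
    AlertStatus.TRIGGERED: {AlertStatus.ACKNOWLEDGED, AlertStatus.RESOLVED},
    AlertStatus.ACKNOWLEDGED: {AlertStatus.RESOLVED},
    AlertStatus.RESOLVED: set(),  # Terminal state
}


class InvalidTransition(ValueError):
    """Raised when a status change would move the lifecycle backwards."""


def transition(current: AlertStatus, target: AlertStatus) -> AlertStatus:
    """Return the new status, or raise if the lifecycle does not allow the move."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(
            f"cannot move alert from {current.value} to {target.value}"
        )
    return target
```

Calling transition(alert.status, AlertStatus.ACKNOWLEDGED) before assigning the new status would reject out-of-order updates before anything is committed or broadcast.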

Alert Delivery Channels

Email (via Postmark):

await email_service.send_alert_notification(
    to=user.email,
    alert=alert_event,
    template="alert_triggered"
)

SSE (real-time in-app):

await broadcast_to_organization(
    org_id=alert.organization_id,
    event_type="alert_triggered",
    data=alert_data
)

Slack Webhook (optional):

async def send_slack_alert(webhook_url: str, alert: AlertEvent):
    payload = {
        "attachments": [{
            "color": "#FF0000" if alert.severity == "critical" else "#FFCC00",
            "title": f"{alert.severity.upper()}: {alert.title}",
            "text": alert.message,
            "fields": [
                {"title": "Server", "value": alert.server.name, "short": True},
                {"title": "Value", "value": str(alert.metric_value), "short": True},
            ],
        }]
    }
 
    async with httpx.AsyncClient() as client:
        await client.post(webhook_url, json=payload)

Alert Configuration

Organizations can configure alert rules:

API Endpoints:

  • GET /api/v1/monitoring/alerts/rules - List alert rules
  • POST /api/v1/monitoring/alerts/rules - Create custom rule
  • PUT /api/v1/monitoring/alerts/rules/{id} - Update rule
  • DELETE /api/v1/monitoring/alerts/rules/{id} - Delete rule

Frontend:

const { data: rules } = useQuery({
  queryKey: ["alert-rules"],
  queryFn: () => api.get("/monitoring/alerts/rules"),
});
 
// Create new rule
await api.post("/monitoring/alerts/rules", {
  name: "High Memory (Custom)",
  alert_type: "memory_high",
  severity: "warning",
  metric_name: "memory_percent",
  operator: ">",
  threshold: 85,
  duration_seconds: 300,
  notify_channels: ["email", "slack"],
  cooldown_seconds: 1800,
});

Netdata Configuration

Environment Variables

Backend Configuration:

# .env.production
 
# Netdata parent URL (public-facing)
NETDATA_PARENT_PUBLIC_URL=https://monitoring.paasportal.io
 
# Netdata parent API URL (internal)
NETDATA_PARENT_URL=http://netdata-parent:19999
 
# API key for authentication
NETDATA_PARENT_API_KEY=your-api-key-here

Parent Container Deployment

The Netdata parent runs as a Docker container on the PaaSPortal server:

docker-compose.yml:

services:
  netdata-parent:
    image: netdata/netdata:stable
    container_name: netdata-parent
    hostname: netdata-parent
    restart: unless-stopped
 
    ports:
      - "19999:19999"
 
    volumes:
      - netdata-config:/etc/netdata
      - netdata-lib:/var/lib/netdata
      - netdata-cache:/var/cache/netdata
 
    environment:
      - NETDATA_CLAIM_TOKEN=${NETDATA_CLAIM_TOKEN}
      - NETDATA_CLAIM_URL=https://app.netdata.cloud
 
    cap_add:
      - SYS_PTRACE
 
    security_opt:
      - apparmor:unconfined
 
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.netdata.rule=Host(`monitoring.paasportal.io`)"
      - "traefik.http.routers.netdata.tls.certresolver=letsencrypt"
      - "traefik.http.services.netdata.loadbalancer.server.port=19999"
 
volumes:
  netdata-config:
  netdata-lib:
  netdata-cache:

Stream Configuration File

Parent (/etc/netdata/stream.conf):

[YOUR_STREAM_API_KEY]
    # The section name is the stream API key itself and must match
    # the "api key" value in each child's configuration

    # Enable receiving streamed metrics
    enabled = yes

    # Allow from any IP (child agents)
    allow from = *
 
    # Retention for streamed metrics (1 hour = 3600 seconds)
    history = 3600
 
    # Storage mode
    memory mode = dbengine
 
    # Health monitoring
    health enabled = yes

Child Agent (/etc/netdata/stream.conf on managed servers):

[stream]
    # Enable streaming to parent
    enabled = yes
 
    # Parent Netdata address
    destination = monitoring.paasportal.io:19999
 
    # Stream API key (must match parent)
    api key = YOUR_STREAM_API_KEY
 
    # Connection settings
    timeout seconds = 60
    buffer size bytes = 1048576
    reconnect delay seconds = 5
 
    # Send all charts
    send charts matching = *

Parent-Child Linking

Child agents automatically link to the parent when streaming is configured:

  1. Install Netdata on managed server via bootstrap script
  2. Configure streaming in /etc/netdata/stream.conf
  3. Restart Netdata: systemctl restart netdata
  4. Verify connection: Check parent dashboard for new host
  5. Store machine GUID in database: vm.netdata_machine_guid
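Step 2 above is a natural candidate for automation in a bootstrap script. As a hedged sketch, the function below renders the child configuration shown earlier from two inputs. The name render_child_stream_conf is hypothetical, and the fixed connection settings are simply copied from the child example in this document.

```python
def render_child_stream_conf(destination: str, api_key: str) -> str:
    """Render /etc/netdata/stream.conf for a child agent.

    destination is "host:port" of the parent; api_key must match the parent.
    """
    if not destination or not api_key:
        raise ValueError("destination and api_key are both required")
    lines = [
        "[stream]",
        "    enabled = yes",
        f"    destination = {destination}",
        f"    api key = {api_key}",
        # Connection settings copied from the child example above
        "    timeout seconds = 60",
        "    buffer size bytes = 1048576",
        "    reconnect delay seconds = 5",
        "    send charts matching = *",
    ]
    return "\n".join(lines) + "\n"
```

The rendered text can be written to the managed server before step 3 restarts Netdata.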

Verification:

# On parent server
curl http://localhost:19999/api/v1/info | jq .
 
# Response includes streaming hosts
{
  "streaming": {
    "status": "enabled",
    "nodes": [
      {
        "guid": "abc123...",
        "hostname": "prod-server-1",
        "status": "connected"
      }
    ]
  }
}

Monitoring API

Server Metrics Endpoint

GET /api/v1/monitoring/servers/{server_id}/summary

Returns quick metrics summary for server list views.

Response:

{
  "cpu_percent": 45.2,
  "memory_percent": 67.8,
  "memory_used_bytes": 5637144576,
  "memory_total_bytes": 8318521344,
  "disk_percent": 55.3,
  "disk_used_bytes": 55301234688,
  "disk_total_bytes": 100000000000,
  "network_rx_rate": 1248576,
  "network_tx_rate": 895632,
  "container_count": 8,
  "containers_healthy": 7,
  "containers_unhealthy": 1,
  "status": "healthy",
  "queried_at": "2024-12-11T10:30:00Z"
}

GET /api/v1/monitoring/servers/{server_id}/metrics

Returns full server metrics with optional time-series data.

Query Parameters:

  • include_timeseries: Boolean (default: false)
  • time_range: String - 1h, 6h, 24h, 7d, 30d (default: 24h)
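The time_range values have to be translated into the after and points arguments that get_chart_data takes. A minimal sketch of that mapping follows. The function name netdata_query_window and the cap of 288 points are assumptions, chosen so that a 24h range yields the 5-minute spacing visible in the sample response.

```python
# Supported time_range values and their length in seconds
TIME_RANGE_SECONDS = {
    "1h": 3600,
    "6h": 6 * 3600,
    "24h": 24 * 3600,
    "7d": 7 * 86400,
    "30d": 30 * 86400,
}


def netdata_query_window(time_range: str = "24h", max_points: int = 288) -> tuple:
    """Return (after, points) for a Netdata chart query.

    after is a negative offset in seconds relative to now; points is capped
    so long ranges are aggregated instead of returning one point per minute.
    """
    try:
        seconds = TIME_RANGE_SECONDS[time_range]
    except KeyError:
        raise ValueError(f"unsupported time_range: {time_range!r}") from None
    points = min(max_points, seconds // 60)  # At most one point per minute
    return -seconds, points
```

For example, a 1h range maps to 60 one-minute points, which matches the minutes=60 default of get_historical_stats.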

Response:

{
  "server_id": "a1b2c3d4-...",
  "server_name": "prod-server-1",
  "summary": {
    "cpu_percent": 45.2,
    "memory_percent": 67.8,
    "disk_percent": 55.3,
    "status": "healthy",
    "queried_at": "2024-12-11T10:30:00Z"
  },
  "containers": [
    {
      "container_id": "abc123",
      "container_name": "prod-env-123_odoo",
      "cpu_percent": 15.3,
      "memory_percent": 45.0,
      "memory_used_bytes": 536870912,
      "memory_limit_bytes": 1073741824,
      "network_rx_bytes": 1248576,
      "network_tx_bytes": 895632,
      "status": "running",
      "restart_count": 0,
      "uptime_seconds": 86400
    }
  ],
  "timeseries": {
    "cpu": {
      "metric_name": "cpu",
      "points": [
        {"timestamp": "2024-12-11T10:00:00Z", "value": 42.5},
        {"timestamp": "2024-12-11T10:05:00Z", "value": 45.2}
      ],
      "unit": "%"
    },
    "memory": {
      "metric_name": "memory",
      "points": [
        {"timestamp": "2024-12-11T10:00:00Z", "value": 65.3},
        {"timestamp": "2024-12-11T10:05:00Z", "value": 67.8}
      ],
      "unit": "%"
    }
  },
  "netdata_dashboard_url": "https://monitoring.prod-server-1.paasportal.io",
  "queried_at": "2024-12-11T10:30:00Z"
}

Environment Metrics Endpoint

GET /api/v1/monitoring/environments/{environment_id}/metrics

Returns container metrics filtered to a specific environment.

Response:

[
  {
    "container_id": "abc123",
    "container_name": "env-prod_odoo",
    "cpu_percent": 15.3,
    "memory_percent": 45.0,
    "memory_used_bytes": 536870912,
    "memory_limit_bytes": 1073741824,
    "network_rx_bytes": 1248576,
    "network_tx_bytes": 895632,
    "status": "running",
    "restart_count": 0,
    "uptime_seconds": 86400
  },
  {
    "container_id": "def456",
    "container_name": "env-prod_db",
    "cpu_percent": 8.7,
    "memory_percent": 52.3,
    "memory_used_bytes": 1073741824,
    "memory_limit_bytes": 2147483648,
    "network_rx_bytes": 524288,
    "network_tx_bytes": 315392,
    "status": "running",
    "restart_count": 0,
    "uptime_seconds": 86400
  }
]

Replica Health Endpoint

GET /api/v1/monitoring/environments/{environment_id}/replicas/health

Returns PostgreSQL read replica health status.

Response:

{
  "status": "online",
  "lag_bytes": 12582912,
  "lag_seconds": 3.5,
  "error_message": null,
  "last_check": "2024-12-11T10:30:00Z"
}

Status Values:

  • online: Healthy, streaming with acceptable lag
  • lagging: Lag exceeds warning/critical thresholds
  • offline: Replication not streaming
  • error: Health check failed

Nginx Configuration

SSE Proxy Settings

Nginx must be configured to support SSE connections:

File: nginx/conf.d/devsh.openeducat.ai.conf

# SSE endpoint - disable buffering
location /api/v1/events/stream {
    proxy_pass http://backend:8000;
 
    # SSE requires these headers
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;
 
    # Disable buffering for SSE
    proxy_buffering off;
    proxy_cache off;
 
    # Long timeout for SSE (24 hours)
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;
 
    # Forward client info
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}

Critical Settings:

  • proxy_buffering off: Disables response buffering (required for SSE)
  • proxy_http_version 1.1: Uses HTTP/1.1 for persistent connections
  • chunked_transfer_encoding off: Disables chunked encoding
  • proxy_read_timeout 86400s: 24-hour timeout for long-lived connections

Backend Headers

The backend also sets the X-Accel-Buffering: no header:

return StreamingResponse(
    event_generator(request, user_id),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Instructs Nginx to disable buffering
    },
)

Permissions

Monitoring Permissions

Access to monitoring data is controlled via the permission system:

Organization-Level:

  • org.monitoring.view - View monitoring dashboards and metrics
  • org.monitoring.alerts.view - View alerts
  • org.monitoring.alerts.configure - Configure alert rules

Project-Level:

  • project.monitoring.view - View project environment metrics

Permission Checks

Backend:

# Verify organization monitoring permission
has_permission = await check_permission(
    db=db,
    user=current_user,
    permission_code="org.monitoring.view",
    organization_id=server.organization_id,
)
if not has_permission:
    raise HTTPException(403, "No permission to view monitoring")

Frontend:

import { useAbilities } from "@/hooks/useAbilities";
 
export function MonitoringDashboard() {
  const { can } = useAbilities({ organizationId });
 
  if (!can("monitoring", "view")) {
    return <AccessDenied />;
  }
 
  return <MonitoringCharts />;
}

Best Practices

Event Subscription Management

1. Unsubscribe on Unmount

Always clean up subscriptions to prevent memory leaks:

useEffect(() => {
  const unsubscribe = subscribeToEvents("deployment.progress", handleProgress);
 
  return () => {
    unsubscribe(); // Clean up on unmount
  };
}, []);

2. Filter Events Efficiently

Filter events in the callback rather than subscribing multiple times:

// Good: Single subscription with filtering
useSSEEvent("deployment.progress", (event) => {
  if (event.data.environment_id === currentEnvId) {
    updateProgress(event.data.progress_percent);
  }
});
 
// Bad: One subscription per environment; calling a hook inside a loop also violates the Rules of Hooks
environments.forEach(env => {
  useSSEEvent("deployment.progress", (event) => {
    if (event.data.environment_id === env.id) {
      // ...
    }
  });
});

3. Batch State Updates

Apply all fields from an event in a single functional state update so the component re-renders once:

useSSEEvent("server.health_updated", (event) => {
  setServerMetrics(prev => ({
    ...prev,
    [event.data.server_id]: {
      cpu: event.data.cpu_percent,
      memory: event.data.memory_percent,
      disk: event.data.disk_percent,
      status: event.data.status,
    }
  }));
});

Connection Pooling and Limits

Server-Side:

  • Redis pub/sub has no fixed subscriber cap; the practical limit is Redis's maxclients setting and available memory
  • Each SSE connection consumes ~1KB memory
  • No explicit connection limit, but monitor server resources

Client-Side:

  • Browser limit: 6 concurrent connections per domain over HTTP/1.1 (HTTP/2 multiplexes streams over one connection and allows far more)
  • SSE uses 1 connection per tab
  • Reconnection with exponential backoff reduces the risk of a thundering herd when many clients reconnect at once
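Backoff spreads reconnects out best when each client also adds randomness, because clients that disconnected together would otherwise retry together. The sketch below shows the delay calculation with a cap and full jitter. It is written in Python for illustration even though the reconnect logic runs in the browser; the function name and the default values are assumptions.

```python
import random
from typing import Optional


def reconnect_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect number `attempt` (0-based).

    The ceiling doubles with each attempt up to max_delay, and the actual
    delay is drawn uniformly below it ("full jitter").
    """
    source = rng if rng is not None else random
    # Clamp the exponent so very large attempt counts cannot overflow
    ceiling = min(max_delay, base_delay * (2 ** min(attempt, 32)))
    return source.uniform(0.0, ceiling)
```

Resetting the attempt counter after a successful connection keeps later reconnects fast.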

Alert Fatigue Prevention

1. Deduplication

Use fingerprints to group repeated alerts:

fingerprint = f"{alert_type}:{server_id}:{metric_name}"

2. Cooldown Periods

Prevent alert spam with cooldowns:

cooldown_seconds = {
    "cpu_high": 3600,      # 1 hour
    "disk_high": 1800,     # 30 minutes
    "server_down": 300,    # 5 minutes
}

3. Severity-Based Routing

Route alerts based on severity:

  • Info: In-app only
  • Warning: In-app + email digest (hourly)
  • Critical: In-app + email + Slack (immediate)
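The routing table above can be expressed as a small lookup. This is a sketch only: the channel identifiers in_app, email_digest, email, and slack are hypothetical labels, and the function channels_for is not part of the documented alert service.

```python
# Delivery channels per severity, mirroring the routing list above
SEVERITY_ROUTES = {
    "info": ["in_app"],
    "warning": ["in_app", "email_digest"],     # Digest is sent hourly
    "critical": ["in_app", "email", "slack"],  # Delivered immediately
}


def channels_for(severity: str) -> list:
    """Return the delivery channels for an alert severity."""
    try:
        # Copy so callers cannot mutate the shared routing table
        return list(SEVERITY_ROUTES[severity])
    except KeyError:
        raise ValueError(f"unknown severity: {severity!r}") from None
```

Raising on an unknown severity surfaces configuration mistakes instead of silently dropping a notification.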

4. Occurrence Counting

Track how many times an alert fired:

existing.occurrence_count += 1
existing.last_occurrence = datetime.now(UTC)

Frontend Display:

{alert.occurrence_count > 1 && (
  <Badge variant="warning">
    Occurred {alert.occurrence_count} times
  </Badge>
)}

Troubleshooting

SSE Connection Drops

Symptom: SSE connection closes unexpectedly

Causes:

  1. Nginx buffering enabled: Check proxy_buffering off
  2. Firewall timeout: Some firewalls close idle connections after 30 seconds
  3. Load balancer timeout: Check load balancer settings
  4. JWT token expired: Token expiration closes connection

Solutions:

# Nginx configuration
proxy_buffering off;
proxy_read_timeout 86400s;

# Backend: Send keepalive pings
except TimeoutError:
    yield ": keepalive\n\n"  # Every 30 seconds

// Frontend: Auto-reconnect with exponential backoff
eventSource.onerror = () => {
  const delay = baseDelay * Math.pow(2, reconnectAttempts.current);
  setTimeout(() => connect(), delay);
};
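The backend fragment above shows only the except branch of the keepalive logic. A fuller, hedged sketch of such a generator follows. It assumes events arrive on an asyncio.Queue as (event_type, data) pairs, uses None as a shutdown sentinel, and serializes each event in the type/data JSON shape used elsewhere in this document. The names event_stream and format_sse are hypothetical.

```python
import asyncio
import json


def format_sse(event_type: str, data: dict) -> str:
    """Serialize one event as an SSE data frame."""
    return "data: " + json.dumps({"type": event_type, "data": data}) + "\n\n"


async def event_stream(queue: asyncio.Queue, keepalive_seconds: float = 30.0):
    """Yield SSE frames from the queue, with a comment line when idle.

    Lines starting with ":" are SSE comments; clients ignore them, but they
    keep proxies and firewalls from closing an idle connection.
    """
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=keepalive_seconds)
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"
            continue
        if item is None:  # Sentinel: stop streaming
            return
        event_type, data = item
        yield format_sse(event_type, data)
```

A generator like this could be passed to a streaming response; the 30-second default matches the firewall idle timeout mentioned in the causes above.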

Missing Events

Symptom: SSE events not received by client

Causes:

  1. Not subscribed to event type: Check event listener registration
  2. Organization filter: Events only sent to matching organization
  3. Redis connection lost: Backend lost connection to Redis
  4. Client disconnected: Client reconnecting, missed events

Debug:

// Subscribe to all events for debugging
useSSEEvent("*", (event) => {
  console.log("[SSE Debug]", event.type, event.data);
});

# Backend: Log all broadcasts
logger.info(f"Broadcasting {event_type} to org {org_id}")

Netdata Parent-Child Connection Issues

Symptom: Child agent not appearing in parent dashboard

Causes:

  1. Incorrect stream API key: Child/parent keys don't match
  2. Firewall blocking port 19999: Parent unreachable
  3. Incorrect parent hostname: DNS resolution fails
  4. Netdata not running: Child agent stopped

Debug:

# On child server
systemctl status netdata
 
# Check stream configuration
cat /etc/netdata/stream.conf
 
# Test connectivity to parent
telnet monitoring.paasportal.io 19999
 
# Check Netdata logs
journalctl -u netdata -f
 
# Verify machine GUID
cat /var/lib/netdata/registry/machine.guid

Fix:

# Restart Netdata on child
systemctl restart netdata
 
# If the agent is also claimed to Netdata Cloud, reload its claiming state
netdatacli reload-claiming-state
 
# Check streaming status on parent
curl http://localhost:19999/api/v1/info | jq '.streaming'

High CPU from Monitoring Overhead

Symptom: Monitoring system consuming excessive CPU

Causes:

  1. Too frequent queries: Polling Netdata too often
  2. Too many metrics: Collecting unnecessary data
  3. Inefficient queries: Not using Netdata's aggregation

Solutions:

# Limit query frequency
@cache(ttl=60)  # Cache for 60 seconds
async def get_server_metrics(server_id: UUID):
    ...

# Use Netdata's built-in aggregation
await client.get_chart_data(
    "system.cpu",
    after=-60,    # Last 60 seconds
    points=1,     # Return 1 aggregated point
)

# Reduce Netdata collection frequency
# /etc/netdata/netdata.conf
[global]
    # Collect every 5 seconds (default: 1)
    update every = 5
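The cache decorator used in the first solution is not defined in this document. The following is a minimal sketch of what an async TTL cache could look like, assuming positional, hashable arguments and a single process. It is an illustration, not the project's implementation; concurrent misses for the same key will each call the wrapped function.

```python
import asyncio
import functools
import time


def cache(ttl: float):
    """Cache an async function's result per argument tuple for `ttl` seconds."""
    def decorator(func):
        store = {}  # args -> (stored_at, value)

        @functools.wraps(func)
        async def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and now - hit[0] < ttl:
                return hit[1]  # Still fresh: skip the expensive query
            value = await func(*args)
            store[args] = (time.monotonic(), value)
            return value

        return wrapper
    return decorator
```

With a 60-second TTL, a dashboard that refreshes every few seconds triggers at most one Netdata query per server per minute.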

SSE Event Order

Symptom: Events arrive out of order

Cause: Multiple workers publish to Redis simultaneously

Solution: Include timestamp in event data:

await broadcast_to_organization(
    org_id=org_id,
    event_type="deployment.progress",
    data={
        "deployment_id": str(deployment_id),
        "progress_percent": 50,
        "timestamp": datetime.now(UTC).isoformat(),  # Add timestamp
    }
)

// Frontend: Sort by timestamp (copy first; Array.prototype.sort mutates in place)
const sortedEvents = [...events].sort((a, b) =>
  new Date(a.data.timestamp).getTime() - new Date(b.data.timestamp).getTime()
);

Summary

OEC.SH's real-time monitoring system combines:

  • SSE: Unidirectional server-to-client event streaming
  • Redis Pub/Sub: Cross-worker event distribution
  • Netdata: Edge-first metrics collection with parent-child streaming
  • Alert System: Threshold-based monitoring with deduplication
  • Replica Monitoring: PostgreSQL replication health checks every 2 minutes
  • Permission-Based Access: Organization and project-level access control

This architecture enables:

  • Real-time dashboard updates without polling
  • Scalable event distribution across multiple workers
  • Low-overhead monitoring (30MB per server)
  • Data sovereignty (metrics stay on customer VMs)
  • Comprehensive alerting with fatigue prevention