Skip to content

Core Components

Detailed documentation of Orchestry's core components, their implementation, and interactions.

Application Manager

The Application Manager is the heart of Orchestry, responsible for managing the complete lifecycle of containerized applications.

Class Structure

class AppManager:
    """Holds the Docker client, collaborators and in-memory instance registry."""

    def __init__(self, state_store=None, nginx_manager=None):
        """Wire up collaborators, then call ``_ensure_network()``.

        Args:
            state_store: Persistence backend. When falsy, the result of
                ``get_database_manager()`` is used instead.
            nginx_manager: Load-balancer manager. When falsy, a new
                ``DockerNginxManager`` is created.
        """
        self.client = docker.from_env()
        self.state_store = state_store if state_store else get_database_manager()
        self.nginx = nginx_manager if nginx_manager else DockerNginxManager()
        self.health_checker = HealthChecker()
        # Registry keyed by app_name; each value is a list of ContainerInstance.
        self.instances = dict()
        self._lock = threading.RLock()
        self._ensure_network()

Core Methods

Application Registration

async def register_app(self, app_spec: dict) -> str:
    """Register a new application from its specification.

    Args:
        app_spec: Raw application specification. Must contain
            ``app_spec['metadata']['name']``.

    Returns:
        The name of the newly registered application.

    Raises:
        AppAlreadyExistsError: If the state store already knows an
            application with the same name.
    """
    # 1. Validate specification
    validated_spec = self._validate_spec(app_spec)
    app_name = app_spec['metadata']['name']

    # 2. Check for conflicts
    if await self.state_store.app_exists(app_name):
        raise AppAlreadyExistsError()

    # 3. Store in database
    # A single timestamp keeps created_at and updated_at identical for a
    # record that has never been modified.
    registered_at = time.time()
    app_record = AppRecord(
        name=app_name,
        spec=validated_spec,
        status='registered',
        created_at=registered_at,
        updated_at=registered_at,
    )
    await self.state_store.store_app(app_record)

    # 4. Log event
    await self.state_store.log_event(
        app_name=app_record.name,
        event_type='registration',
        message=f"Application {app_record.name} registered successfully"
    )

    return app_record.name

Container Management

def _create_container(self, app_name: str, spec: dict, replica_index: int) -> str:
    """Create a single container instance."""
    container_config = {
        'image': spec['spec']['image'],
        'name': f"{app_name}-{replica_index}",
        'labels': {
            'orchestry.app': app_name,
            'orchestry.replica': str(replica_index),
            'orchestry.managed': 'true'
        },
        'network': 'orchestry',
        'detach': True,
        'restart_policy': {'Name': 'unless-stopped'}
    }

    # Add environment variables
    if 'environment' in spec['spec']:
        container_config['environment'] = self._build_environment(
            spec['spec']['environment'], app_name, replica_index
        )

    # Add resource limits
    if 'resources' in spec['spec']:
        container_config['mem_limit'] = spec['spec']['resources'].get('memory', '512m')
        container_config['cpu_quota'] = self._parse_cpu_limit(
            spec['spec']['resources'].get('cpu', '500m')
        )

    # Add port configuration
    if 'ports' in spec['spec']:
        container_config['ports'] = self._configure_ports(spec['spec']['ports'])

    # Create container
    container = self.client.containers.run(**container_config)

    # Wait for network assignment
    self._wait_for_network(container)

    return container.id

Scaling Operations

async def scale_app(self, app_name: str, target_replicas: int) -> dict:
    """Scale an application to ``target_replicas`` containers.

    Args:
        app_name: Name of a registered application.
        target_replicas: Desired number of replicas.

    Returns:
        A dict with ``app_name``, ``previous_replicas``, ``current_replicas``
        and ``scaling_time`` (seconds spent in this call).

    Raises:
        AppNotFoundError: If the state store has no record for ``app_name``.
    """
    # Imported locally so this method does not depend on a module-level import.
    import asyncio

    # Monotonic clock: the elapsed time must not be affected by wall-clock jumps.
    start_time = time.monotonic()

    # ``self._lock`` is a threading.RLock, which cannot be used with
    # ``async with``. Scaling operations are serialised with a lazily created
    # asyncio.Lock instead. There is no await between the check and the
    # assignment, so two coroutines cannot both create a lock.
    scale_lock = getattr(self, '_scale_lock', None)
    if scale_lock is None:
        scale_lock = self._scale_lock = asyncio.Lock()

    async with scale_lock:
        app_record = await self.state_store.get_app(app_name)
        if not app_record:
            raise AppNotFoundError(app_name)

        current_replicas = len(self.instances.get(app_name, []))

        if target_replicas > current_replicas:
            # Scale out
            await self._scale_out(app_name, target_replicas - current_replicas)
        elif target_replicas < current_replicas:
            # Scale in
            await self._scale_in(app_name, current_replicas - target_replicas)

        # Update database
        app_record.replicas = target_replicas
        app_record.last_scaled_at = time.time()
        await self.state_store.update_app(app_record)

        # Update nginx configuration. ``.get`` avoids a KeyError for an app
        # that has no entry in the registry (e.g. scaled to zero before any
        # instance was ever created).
        await self.nginx.update_upstream(app_name, self.instances.get(app_name, []))

        return {
            'app_name': app_name,
            'previous_replicas': current_replicas,
            'current_replicas': target_replicas,
            'scaling_time': time.monotonic() - start_time
        }

Container Instance Management

@dataclass
class ContainerInstance:
    """Bookkeeping record for one running container of an application."""

    container_id: str
    ip: str
    port: int
    state: str  # one of: ready, draining, down
    cpu_percent: float = 0.0
    memory_percent: float = 0.0
    last_seen: float = 0.0
    failures: int = 0

    def is_healthy(self) -> bool:
        """Return True only for a ``ready`` instance with fewer than 3 failures."""
        if self.state != 'ready':
            return False
        return self.failures < 3

    def update_metrics(self, stats: dict):
        """Refresh CPU/memory percentages from ``stats`` and stamp ``last_seen``."""
        self.cpu_percent = self._calculate_cpu_percent(stats)
        self.memory_percent = self._calculate_memory_percent(stats)
        refreshed_at = time.time()
        self.last_seen = refreshed_at

Health Integration

def _on_health_status_change(self, container_id: str, is_healthy: bool):
    """React to a health transition reported for ``container_id``.

    For every application, the first instance with a matching id is updated:
    a healthy report resets it to ``ready`` with zero failures; an unhealthy
    report increments ``failures`` and, from the third failure on, marks the
    instance ``down`` and schedules a replacement. The nginx configuration of
    the owning application is refreshed afterwards in both cases.
    """
    for app_name, members in self.instances.items():
        # Only the first match per application is handled.
        matched = next(
            (member for member in members if member.container_id == container_id),
            None,
        )
        if matched is None:
            continue

        if not is_healthy:
            matched.failures += 1
            if matched.failures >= 3:
                matched.state = 'down'
                # Schedule container replacement
                self._schedule_replacement(app_name, container_id)
        else:
            matched.state = 'ready'
            matched.failures = 0

        # Update nginx configuration
        self._update_nginx_config(app_name)

Auto Scaler

The Auto Scaler makes intelligent scaling decisions based on multiple metrics and configurable policies.

Scaling Policy Engine

@dataclass
class ScalingPolicy:
    """Per-application auto-scaling limits and thresholds, with defaults."""
    # Lower/upper bounds used to clamp the replica target in evaluate_scaling.
    min_replicas: int = 1
    max_replicas: int = 5
    # Divisor for the per-replica request rate in _calculate_scale_factors.
    target_rps_per_replica: int = 50
    # Divisor for the observed p95 latency in _calculate_scale_factors.
    max_p95_latency_ms: int = 250
    # Divisor for the per-replica active connections in _calculate_scale_factors.
    max_conn_per_replica: int = 80
    # Percentages; evaluate_scaling divides them by 100 and compares the
    # result against the largest scale factor.
    scale_out_threshold_pct: int = 80
    scale_in_threshold_pct: int = 30
    # NOTE(review): not read by any code shown here - presumably the metrics
    # aggregation window; confirm against the metrics collector.
    window_seconds: int = 20
    # Passed to _in_cooldown by evaluate_scaling.
    cooldown_seconds: int = 30
    # Divisors for CPU and memory utilisation in _calculate_scale_factors.
    max_cpu_percent: float = 70.0
    max_memory_percent: float = 75.0

Decision Algorithm

def evaluate_scaling(self, app_name: str, metrics: ScalingMetrics) -> ScalingDecision:
    """Decide whether ``app_name`` should be scaled, based on ``metrics``.

    Args:
        app_name: Application whose policy is looked up in ``self.policies``.
        metrics: Current measurements; ``healthy_replicas`` is used as the
            current replica count.

    Returns:
        A ScalingDecision. ``should_scale`` is False when no policy exists,
        the app is in cooldown, no scale factor could be computed, the
        metrics are within range, or the clamped target equals the current
        replica count.
    """
    policy = self.policies.get(app_name)
    if not policy:
        return ScalingDecision(should_scale=False, reason="No policy configured")

    # Check cooldown period
    if self._in_cooldown(app_name, policy.cooldown_seconds):
        return ScalingDecision(should_scale=False, reason="In cooldown period")

    # Calculate scale factors for each metric
    scale_factors = self._calculate_scale_factors(metrics, policy)

    # Every factor is optional (disabled limits, zero healthy replicas), so
    # the dict may be empty and max() would raise ValueError.
    if not scale_factors:
        return ScalingDecision(should_scale=False, reason="No metrics available")

    max_factor = max(scale_factors.values())
    current_replicas = metrics.healthy_replicas
    scale_out_threshold = policy.scale_out_threshold_pct / 100
    scale_in_threshold = policy.scale_in_threshold_pct / 100

    # Scale out decision
    if max_factor > scale_out_threshold:
        target_replicas = min(
            self._calculate_scale_out_target(current_replicas, scale_factors, policy),
            policy.max_replicas,
        )
        if target_replicas == current_replicas:
            # Already at the ceiling: reporting should_scale=True would
            # trigger a no-op scaling action.
            return ScalingDecision(
                should_scale=False,
                target_replicas=current_replicas,
                current_replicas=current_replicas,
                reason="Scale out needed but already at max replicas"
            )
        return ScalingDecision(
            should_scale=True,
            target_replicas=target_replicas,
            current_replicas=current_replicas,
            reason=f"Scale out: {self._get_dominant_metric(scale_factors)} exceeds threshold",
            triggered_by=self._get_triggered_metrics(scale_factors, scale_out_threshold),
            metrics=metrics
        )

    # Scale in decision: only when even the highest factor is below the
    # scale-in threshold and there is room above the minimum.
    if max_factor < scale_in_threshold and current_replicas > policy.min_replicas:
        target_replicas = self._calculate_scale_in_target(
            current_replicas, scale_factors, policy
        )
        return ScalingDecision(
            should_scale=True,
            target_replicas=max(target_replicas, policy.min_replicas),
            current_replicas=current_replicas,
            reason="Scale in: All metrics below threshold",
            triggered_by=['all_metrics_low'],
            metrics=metrics
        )

    return ScalingDecision(
        should_scale=False,
        target_replicas=current_replicas,
        current_replicas=current_replicas,
        reason="Metrics within acceptable range"
    )

Metrics Calculation

def _calculate_scale_factors(self, metrics: ScalingMetrics, policy: ScalingPolicy,
                             app_name: str = None) -> dict:
    """Express each metric as a fraction of its policy limit.

    A factor of 1.0 means the metric sits exactly at its configured limit.
    A metric is skipped when its limit is not positive; the per-replica
    metrics (rps, connections) are also skipped when there are no healthy
    replicas.

    Args:
        metrics: Current measurements.
        policy: Limits to compare against.
        app_name: Optional application name. When given, the result is also
            stored in ``self.last_scale_factors[app_name]`` for debugging.
            The original body referenced ``app_name`` without receiving it,
            which raised NameError on every call.

    Returns:
        Dict mapping metric name (``cpu``, ``memory``, ``rps``, ``latency``,
        ``connections``) to its factor.
    """
    factors = {}

    # CPU utilization factor
    if policy.max_cpu_percent > 0:
        factors['cpu'] = metrics.cpu_percent / policy.max_cpu_percent

    # Memory utilization factor
    if policy.max_memory_percent > 0:
        factors['memory'] = metrics.memory_percent / policy.max_memory_percent

    # RPS factor (requests per replica)
    if policy.target_rps_per_replica > 0 and metrics.healthy_replicas > 0:
        current_rps_per_replica = metrics.rps / metrics.healthy_replicas
        factors['rps'] = current_rps_per_replica / policy.target_rps_per_replica

    # Latency factor
    if policy.max_p95_latency_ms > 0:
        factors['latency'] = metrics.p95_latency_ms / policy.max_p95_latency_ms

    # Connection factor
    if policy.max_conn_per_replica > 0 and metrics.healthy_replicas > 0:
        current_conn_per_replica = metrics.active_connections / metrics.healthy_replicas
        factors['connections'] = current_conn_per_replica / policy.max_conn_per_replica

    # Store for debugging, only when the caller identified the application.
    if app_name is not None:
        self.last_scale_factors[app_name] = factors

    return factors

Scaling Target Calculation

def _calculate_scale_out_target(self, current: int, factors: dict, policy: ScalingPolicy) -> int:
    """Calculate the target replica count for a scale-out.

    The step is chosen from the highest factor: 3 replicas above 1.5,
    2 above 1.2, otherwise 1. The result never exceeds
    ``policy.max_replicas``. The original clamped only the two larger steps,
    so the one-replica step could overshoot the ceiling.

    Args:
        current: Current replica count.
        factors: Metric name -> scale factor; an empty dict is treated as
            moderate pressure.
        policy: Supplies ``max_replicas``.

    Returns:
        ``min(current + step, policy.max_replicas)``.
    """
    # Use the highest factor to determine scale out amount
    max_factor = max(factors.values(), default=0.0)

    # Conservative scaling: increase by 1-3 replicas based on pressure
    if max_factor > 1.5:  # Very high pressure
        step = 3
    elif max_factor > 1.2:  # High pressure
        step = 2
    else:  # Moderate pressure
        step = 1

    return min(current + step, policy.max_replicas)

def _calculate_scale_in_target(self, current: int, factors: dict, policy: ScalingPolicy) -> int:
    """Return the replica count after a scale-in step.

    Exactly one replica is removed per step, and the result never drops
    below ``policy.min_replicas``. ``factors`` is accepted but not used.
    """
    floor = policy.min_replicas
    reduced = current - 1
    if reduced < floor:
        return floor
    return reduced

Health Checker

The Health Checker monitors application health and triggers recovery actions.

Health Check Implementation

class HealthChecker:
    """Tracks per-container health state and owns the background check tasks."""

    def __init__(self):
        self.health_status = {}  # container_id -> HealthStatus
        self.check_tasks = {}    # container_id -> asyncio.Task
        # NOTE(review): the session is created in __init__, possibly outside a
        # running event loop - confirm this matches how HealthChecker is
        # constructed by its callers.
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
        self._health_change_callback = None

    async def start_monitoring(self, container_id: str, config: HealthCheckConfig):
        """Start (or restart) health monitoring for a container.

        Any check task already registered for ``container_id`` is cancelled,
        the stored status is reset to ``unknown`` and a new check loop task
        is created.

        Args:
            container_id: Container to monitor.
            config: Health check settings handed to ``_health_check_loop``.
        """
        if container_id in self.check_tasks:
            self.check_tasks[container_id].cancel()

        # Every HealthStatus field without a default is passed explicitly;
        # omitting consecutive_successes / last_success / last_failure makes
        # the dataclass constructor raise TypeError.
        self.health_status[container_id] = HealthStatus(
            container_id=container_id,
            status='unknown',
            consecutive_failures=0,
            consecutive_successes=0,
            last_check=None,
            last_success=None,
            last_failure=None
        )

        # Start health check task
        self.check_tasks[container_id] = asyncio.create_task(
            self._health_check_loop(container_id, config)
        )

Health Check Types

async def _perform_health_check(self, container_id: str, config: HealthCheckConfig) -> bool:
    """Run one health probe and report whether it passed.

    ``config.protocol`` selects the HTTP or TCP probe. Any exception,
    including an unsupported protocol, is logged as a warning and reported
    as a failed check.
    """
    try:
        if config.protocol == 'HTTP':
            probe = self._http_health_check
        elif config.protocol == 'TCP':
            probe = self._tcp_health_check
        else:
            raise ValueError(f"Unsupported protocol: {config.protocol}")
        outcome = await probe(container_id, config)
        return outcome
    except Exception as e:
        logger.warning(f"Health check failed for {container_id}: {e}")
        return False

async def _http_health_check(self, container_id: str, config: HealthCheckConfig) -> bool:
    """Perform an HTTP GET health check against a container.

    Args:
        container_id: Container to probe.
        config: Supplies ``port`` and ``path``; optionally ``headers``
            (objects with ``name``/``value``), ``expected_status_codes`` and
            ``timeout_seconds``.

    Returns:
        False when the container or its IP cannot be resolved. Otherwise
        True when the response status is in ``expected_status_codes``, or,
        when that is missing or empty, when the status is 2xx.
    """
    container = self._get_container(container_id)
    if not container:
        return False

    # Get container IP; without one the URL would be meaningless.
    ip = self._get_container_ip(container)
    if not ip:
        return False
    url = f"http://{ip}:{config.port}{config.path}"

    # Prepare headers
    configured_headers = getattr(config, 'headers', None) or ()
    headers = {header.name: header.value for header in configured_headers}

    request_kwargs = {'headers': headers}
    # Honour the per-check timeout, as the TCP check does; otherwise the
    # session-wide timeout applies.
    timeout_seconds = getattr(config, 'timeout_seconds', None)
    if timeout_seconds:
        request_kwargs['timeout'] = aiohttp.ClientTimeout(total=timeout_seconds)

    # An attribute that exists but is None/empty must fall back to the 2xx
    # rule instead of raising on ``in None``.
    expected_codes = getattr(config, 'expected_status_codes', None)

    # Make request
    async with self.session.get(url, **request_kwargs) as response:
        if expected_codes:
            return response.status in expected_codes
        return 200 <= response.status < 300

async def _tcp_health_check(self, container_id: str, config: HealthCheckConfig) -> bool:
    """Probe a container by opening, then closing, a TCP connection.

    Returns False when the container cannot be resolved, or when connecting
    or closing raises a timeout, connection-refused or other OS-level error.
    The connect attempt is bounded by ``config.timeout_seconds``.
    """
    target = self._get_container(container_id)
    if not target:
        return False

    host = self._get_container_ip(target)
    unreachable = (asyncio.TimeoutError, ConnectionRefusedError, OSError)

    try:
        pending_connection = asyncio.open_connection(host, config.port)
        _reader, writer = await asyncio.wait_for(
            pending_connection,
            timeout=config.timeout_seconds,
        )
        # Only reachability matters; the stream is closed straight away.
        writer.close()
        await writer.wait_closed()
    except unreachable:
        return False
    return True

Health Status Management

@dataclass
class HealthStatus:
    """Health bookkeeping for one container.

    Only ``container_id`` and ``status`` are required. The counters default
    to 0 and the timestamps to None, so a status for a container that has
    not been checked yet can be created without listing every field. Field
    order is unchanged, so existing positional construction keeps working.
    """
    container_id: str
    status: str  # 'healthy', 'unhealthy', 'unknown'
    consecutive_failures: int = 0
    consecutive_successes: int = 0
    # Timestamps are assigned from time.time() by _update_health_status;
    # None means "never happened".
    last_check: Optional[float] = None
    last_success: Optional[float] = None
    last_failure: Optional[float] = None
    total_checks: int = 0
    total_failures: int = 0

async def _update_health_status(self, container_id: str, is_healthy: bool, config: HealthCheckConfig):
    """Fold one check result into the stored status of ``container_id``.

    Counters and timestamps are always updated. The status is switched via
    ``_set_health_status`` once the consecutive successes (or failures)
    reach ``config.success_threshold`` (or ``config.failure_threshold``) and
    the container is not already in that status.
    """
    record = self.health_status[container_id]
    record.total_checks += 1
    record.last_check = time.time()

    if not is_healthy:
        record.consecutive_successes = 0
        record.consecutive_failures += 1
        record.total_failures += 1
        record.last_failure = time.time()

        # Flip to unhealthy once the failure streak is long enough.
        if record.status != 'unhealthy':
            if record.consecutive_failures >= config.failure_threshold:
                await self._set_health_status(container_id, 'unhealthy')
        return

    record.consecutive_failures = 0
    record.consecutive_successes += 1
    record.last_success = time.time()

    # Flip to healthy once the success streak is long enough.
    if record.status != 'healthy':
        if record.consecutive_successes >= config.success_threshold:
            await self._set_health_status(container_id, 'healthy')

Nginx Manager

The Nginx Manager handles dynamic load balancer configuration.

Configuration Generation

class DockerNginxManager:
    """Writes per-application nginx upstream files and reloads nginx."""

    def __init__(self, config_path='/etc/nginx/conf.d'):
        self.config_path = Path(config_path)
        self.template_path = Path('/etc/nginx/templates')
        # Names of applications whose configuration was validated and loaded.
        self.active_configs = set()

    async def update_upstream(self, app_name: str, instances: List[ContainerInstance]):
        """Update the upstream configuration for an application.

        Only instances for which ``is_healthy()`` is true are written. With
        no healthy instance the upstream is removed instead. The new file is
        validated before nginx is reloaded; if validation fails, the file is
        restored to its previous content (or removed if there was none), so
        a rejected update never destroys a working configuration.

        Args:
            app_name: Application name; also the stem of the ``.conf`` file.
            instances: All known instances of the application.

        Raises:
            NginxConfigurationError: If nginx rejects the generated file.
        """
        # Filter healthy instances
        healthy_instances = [i for i in instances if i.is_healthy()]

        if not healthy_instances:
            # Remove configuration if no healthy instances
            await self._remove_upstream(app_name)
            return

        # Generate upstream configuration
        config_content = self._generate_upstream_config(app_name, healthy_instances)

        config_file = self.config_path / f"{app_name}.conf"
        # Remember the last known-good content so it can be rolled back.
        previous_content = config_file.read_text() if config_file.exists() else None

        # Write configuration file
        await self._write_config_file(config_file, config_content)

        # Test configuration
        if await self._test_nginx_config():
            # Reload nginx
            await self._reload_nginx()
            self.active_configs.add(app_name)
            return

        # Roll back the bad configuration
        if previous_content is None:
            config_file.unlink(missing_ok=True)
        else:
            await self._write_config_file(config_file, previous_content)
        raise NginxConfigurationError(f"Invalid configuration for {app_name}")

Template System

def _generate_upstream_config(self, app_name: str, instances: List[ContainerInstance]) -> str:
    """Render the nginx upstream and location configuration for an application.

    If ``upstream.conf.j2`` exists under ``self.template_path`` it is
    rendered with the app name, the instances, ``least_conn`` and a
    keepalive of 32. Otherwise a built-in layout is produced: an upstream
    block with one ``server`` line per instance (``draining`` instances are
    marked ``down``) followed by a proxying location block.
    """
    custom_template = self.template_path / 'upstream.conf.j2'
    if custom_template.exists():
        rendered = Template(custom_template.read_text()).render(
            app_name=app_name,
            instances=instances,
            upstream_method='least_conn',
            keepalive=32,
        )
        return rendered

    # Built-in fallback, assembled from three pieces.
    upstream_header = f"""
# Generated configuration for {app_name}
upstream {app_name} {{
    least_conn;
    keepalive 32;

"""

    server_lines = "".join(
        f"    server {member.ip}:{member.port}"
        + (" down" if member.state == 'draining' else "")
        + ";\n"
        for member in instances
    )

    location_block = f"""
location /{app_name} {{
    proxy_pass http://{app_name};
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;

    # Health check
    proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
    proxy_connect_timeout 5s;
    proxy_send_timeout 60s;
    proxy_read_timeout 60s;
}}
"""

    return upstream_header + server_lines + "}\n\n" + location_block

Configuration Management

async def _test_nginx_config(self) -> bool:
    """Validate the nginx configuration by running ``nginx -t``.

    Returns:
        True when the command exits with status 0. False when it exits with
        any other status (its stderr output is logged) or cannot be run.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            'nginx', '-t',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        _stdout, stderr = await process.communicate()
    except Exception as e:
        logger.error(f"Failed to test nginx configuration: {e}")
        return False

    if process.returncode != 0:
        # Surface the captured stderr; without it a rejected configuration
        # leaves no trace of what was wrong.
        detail = stderr.decode(errors='replace').strip() if stderr else ''
        logger.error(f"nginx configuration test failed: {detail}")
        return False

    return True

async def _reload_nginx(self):
    """Reload nginx by running ``nginx -s reload``.

    Raises:
        NginxReloadError: If the command cannot be run (chained to the
            underlying exception) or exits with a non-zero status.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            'nginx', '-s', 'reload',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        _stdout, stderr = await process.communicate()
    except Exception as e:
        logger.error(f"Failed to reload nginx: {e}")
        raise NginxReloadError(str(e)) from e

    # Checked outside the try block so this error is not caught, logged and
    # re-wrapped by the handler above.
    if process.returncode != 0:
        detail = stderr.decode(errors='replace').strip() if stderr else ''
        message = "Failed to reload nginx"
        if detail:
            message = f"{message}: {detail}"
        logger.error(message)
        raise NginxReloadError(message)

State Manager

The State Manager provides database abstraction and state persistence.

Database Connection Management

class DatabaseManager:
    """Owns the asyncpg connection pool used for state persistence."""

    def __init__(self, config: dict):
        """Store the connection settings; no connection is opened here.

        Args:
            config: Must provide ``host``, ``port``, ``user``, ``password``
                and ``database``; ``pool_size`` is optional (default 10).
        """
        self.config = config
        self.pool = None
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Create the connection pool, then call ``_create_tables()``."""
        max_size = self.config.get('pool_size', 10)
        self.pool = await asyncpg.create_pool(
            host=self.config['host'],
            port=self.config['port'],
            user=self.config['user'],
            password=self.config['password'],
            database=self.config['database'],
            # The pool rejects min_size > max_size, so a configured
            # pool_size below 5 must also lower the minimum.
            min_size=min(5, max_size),
            max_size=max_size,
            command_timeout=30
        )

        # Create tables if they don't exist
        await self._create_tables()

    def get_connection(self):
        """Return an async context manager that yields a pooled connection.

        Used as ``async with self.get_connection() as conn:``. The acquire
        context of the pool is returned directly: a synchronous
        ``@contextmanager`` wrapped around an async generator does not
        support ``async with``.

        Raises:
            RuntimeError: If ``initialize()`` has not been awaited yet.
        """
        if self.pool is None:
            raise RuntimeError(
                "DatabaseManager.initialize() must be awaited before get_connection()"
            )
        return self.pool.acquire()

Application Data Management

async def store_app(self, app_record: AppRecord):
    """Insert ``app_record`` as a new row of the ``applications`` table.

    The spec is serialised to JSON, and the two epoch timestamps are
    converted with ``datetime.fromtimestamp`` before being bound.
    """
    async with self.get_connection() as db:
        # Positional values, in the same order as the $1..$7 placeholders.
        row_values = (
            app_record.name,
            json.dumps(app_record.spec),
            app_record.status,
            datetime.fromtimestamp(app_record.created_at),
            datetime.fromtimestamp(app_record.updated_at),
            app_record.replicas,
            app_record.mode,
        )
        await db.execute(
            """
            INSERT INTO applications (name, spec, status, created_at, updated_at, replicas, mode)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            """,
            *row_values
        )

async def get_app(self, app_name: str) -> Optional[AppRecord]:
    """Load the application named ``app_name``.

    Returns:
        An AppRecord built from the matching row, with the stored
        timestamps converted back to epoch seconds and the spec parsed from
        JSON, or None when no row matches.
    """
    async with self.get_connection() as db:
        record = await db.fetchrow(
            "SELECT * FROM applications WHERE name = $1",
            app_name
        )

        if record:
            return AppRecord(
                name=record['name'],
                spec=json.loads(record['spec']),
                status=record['status'],
                created_at=record['created_at'].timestamp(),
                updated_at=record['updated_at'].timestamp(),
                replicas=record['replicas'],
                # A falsy last_scaled_at column is reported as None.
                last_scaled_at=record['last_scaled_at'].timestamp() if record['last_scaled_at'] else None,
                mode=record['mode']
            )

        return None

Event Logging

async def log_event(self, app_name: str, event_type: str, message: str, details: dict = None):
    """Insert one row into the ``events`` table.

    Args:
        app_name: Application the event belongs to.
        event_type: Event category.
        message: Human-readable description.
        details: Optional extra data, stored as JSON. A missing or empty
            value is stored as NULL.
    """
    async with self.get_connection() as db:
        serialized_details = None
        if details:
            serialized_details = json.dumps(details)

        await db.execute(
            """
            INSERT INTO events (app_name, event_type, message, details)
            VALUES ($1, $2, $3, $4)
            """,
            app_name,
            event_type,
            message,
            serialized_details
        )

async def get_events(self, app_name: str = None, event_type: str = None,
                    since: float = None, limit: int = 100) -> List[EventRecord]:
    """Fetch events, newest first, optionally filtered.

    Args:
        app_name: Keep only events of this application (ignored when falsy).
        event_type: Keep only events of this type (ignored when falsy).
        since: Epoch seconds; keep only events at or after this moment
            (ignored when falsy).
        limit: Maximum number of rows returned.

    Returns:
        A list of EventRecord, with timestamps as epoch seconds and details
        parsed from JSON (None when the column is empty).
    """
    # Each active filter is a (SQL comparison, bound value) pair; the
    # placeholder number is the filter's 1-based position.
    active_filters = []
    if app_name:
        active_filters.append(("app_name =", app_name))
    if event_type:
        active_filters.append(("event_type =", event_type))
    if since:
        active_filters.append(("timestamp >=", datetime.fromtimestamp(since)))

    clauses = [
        f"{comparison} ${position}"
        for position, (comparison, _value) in enumerate(active_filters, start=1)
    ]
    bound_values = [value for _comparison, value in active_filters]

    where_clause = "TRUE" if not clauses else " AND ".join(clauses)

    # The limit is always the last bound parameter.
    bound_values.append(limit)
    limit_slot = len(bound_values)

    async with self.get_connection() as db:
        fetched = await db.fetch(
            f"""
            SELECT * FROM events 
            WHERE {where_clause}
            ORDER BY timestamp DESC 
            LIMIT ${limit_slot}
            """,
            *bound_values
        )

        return [
            EventRecord(
                id=entry['id'],
                app_name=entry['app_name'],
                event_type=entry['event_type'],
                message=entry['message'],
                timestamp=entry['timestamp'].timestamp(),
                details=json.loads(entry['details']) if entry['details'] else None
            )
            for entry in fetched
        ]

Component Interactions

Startup Sequence

class OrchestryController:
    """Top-level coordinator that brings the components up."""

    async def start(self):
        """Start every component, one after another, in a fixed order.

        Order: state manager, application manager, health checker, auto
        scaler, nginx manager, API server, and finally the controller's own
        background tasks. Each step is awaited before the next one begins.
        """
        # (attribute on self, coroutine method to await) in start-up order.
        startup_plan = (
            ('state_manager', 'initialize'),   # 1. database
            ('app_manager', 'initialize'),     # 2. application manager
            ('health_checker', 'start'),       # 3. health checker
            ('auto_scaler', 'start'),          # 4. auto scaler
            ('nginx_manager', 'initialize'),   # 5. nginx manager
            ('api_server', 'start'),           # 6. API server
        )

        for component_attr, method_name in startup_plan:
            # Components are looked up only when their turn comes.
            component = getattr(self, component_attr)
            await getattr(component, method_name)()

        # 7. Start background tasks
        await self._start_background_tasks()

Event Flow

async def _handle_container_health_change(self, container_id: str, is_healthy: bool):
    """Propagate a container health transition through the system.

    The application manager is updated first. If the container maps to an
    application, that application's nginx upstream is refreshed. A
    ``health`` event is logged in every case, even when no application was
    found for the container.
    """
    manager = self.app_manager

    # 1. Update application manager
    await manager.update_container_health(container_id, is_healthy)

    # 2. Update nginx configuration if needed
    app_name = await manager.get_app_for_container(container_id)
    if app_name:
        current_instances = manager.get_app_instances(app_name)
        await self.nginx_manager.update_upstream(app_name, current_instances)

    # 3. Log event (the message carries only the first 12 id characters)
    verdict = 'healthy' if is_healthy else 'unhealthy'
    await self.state_manager.log_event(
        app_name=app_name,
        event_type='health',
        message=f"Container {container_id[:12]} marked as {verdict}",
        details={'container_id': container_id, 'healthy': is_healthy}
    )

Next Steps: Learn about the Database Schema and data persistence layer.