Advanced Container Orchestration Guide - 2026 Kubernetes Enterprise Strategy
1. Modern Container Orchestration Overview
1.1 Evolution of the Kubernetes Ecosystem
As of 2026, Kubernetes has grown beyond a simple container orchestration tool into the core foundation of cloud-native platforms. Enterprise environments demand a comprehensive platform that supports complex multi-cluster topologies, hardened security, and even AI/ML workloads.
# enterprise-k8s-architecture.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: enterprise-architecture-config
  namespace: kube-system
data:
  cluster-topology: |
    # Enterprise multi-cluster topology
    management_cluster:
      name: "management-prod"
      role: "hub"
      regions:
        - us-east-1
        - eu-west-1
        - ap-southeast-1
      services:
        - cluster-api
        - argocd
        - istio-control-plane
        - monitoring-stack
    workload_clusters:
      production:
        - name: "prod-us-east"
          region: "us-east-1"
          node_pools:
            - name: "system"
              machine_type: "c5.2xlarge"
              min_nodes: 3
              max_nodes: 10
            - name: "application"
              machine_type: "c5.4xlarge"
              min_nodes: 5
              max_nodes: 50
            - name: "ml-workload"
              machine_type: "p3.2xlarge"
              min_nodes: 0
              max_nodes: 20
              taints:
                - key: "workload-type"
                  value: "ml"
                  effect: "NoSchedule"
        - name: "prod-eu-west"
          region: "eu-west-1"
          node_pools:
            - name: "system"
              machine_type: "c5.2xlarge"
              min_nodes: 3
              max_nodes: 10
            - name: "application"
              machine_type: "c5.4xlarge"
              min_nodes: 5
              max_nodes: 50
      staging:
        - name: "staging-us-east"
          region: "us-east-1"
          node_pools:
            - name: "system"
              machine_type: "c5.large"
              min_nodes: 1
              max_nodes: 5
            - name: "application"
              machine_type: "c5.xlarge"
              min_nodes: 2
              max_nodes: 15
      development:
        - name: "dev-shared"
          region: "us-east-1"
          node_pools:
            - name: "shared"
              machine_type: "c5.large"
              min_nodes: 2
              max_nodes: 10
  security-policies: |
    # Enterprise security policies
    pod_security_standards: "restricted"
    network_policies: "enabled"
    admission_controllers:
      - PodSecurity
      - ResourceQuota
      - LimitRanger
      - NetworkPolicy
      - ValidatingAdmissionWebhook
      - MutatingAdmissionWebhook
    rbac_policies:
      cluster_roles:
        - name: "platform-admin"
          permissions: ["*"]
          subjects: ["platform-team@company.com"]
        - name: "namespace-admin"
          permissions: ["get", "list", "create", "update", "delete"]
          resources: ["pods", "services", "deployments", "configmaps", "secrets"]
          subjects: ["team-leads@company.com"]
        - name: "developer"
          permissions: ["get", "list", "create", "update"]
          resources: ["pods", "services", "deployments", "configmaps"]
          subjects: ["developers@company.com"]
  monitoring-config: |
    # Monitoring and observability
    metrics_collection:
      prometheus:
        retention: "30d"
        storage_class: "fast-ssd"
        storage_size: "500Gi"
      grafana:
        dashboards:
          - "kubernetes-cluster-overview"
          - "application-performance"
          - "resource-utilization"
          - "security-events"
    logging:
      fluentd:
        output_destinations:
          - elasticsearch
          - s3
        retention_policy: "90d"
    tracing:
      jaeger:
        sampling_rate: 0.1
        storage_backend: "elasticsearch"
---
apiVersion: cluster.x-k8s.io/v1beta1
kind: Cluster
metadata:
  name: prod-us-east
  namespace: default
spec:
  clusterNetwork:
    pods:
      cidrBlocks: ["192.168.0.0/16"]
    services:
      cidrBlocks: ["10.128.0.0/12"]
  infrastructureRef:
    apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
    kind: AWSCluster
    name: prod-us-east
  controlPlaneRef:
    kind: KubeadmControlPlane
    apiVersion: controlplane.cluster.x-k8s.io/v1beta1
    name: prod-us-east-control-plane
---
apiVersion: controlplane.cluster.x-k8s.io/v1beta1
kind: KubeadmControlPlane
metadata:
  name: prod-us-east-control-plane
spec:
  replicas: 3
  machineTemplate:
    infrastructureRef:
      kind: AWSMachineTemplate
      apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
      name: prod-us-east-control-plane
  kubeadmConfigSpec:
    initConfiguration:
      nodeRegistration:
        kubeletExtraArgs:
          cloud-provider: aws
          read-only-port: "0"
          anonymous-auth: "false"
          authorization-mode: "Webhook"
          event-qps: "0"
    clusterConfiguration:
      apiServer:
        extraArgs:
          cloud-provider: aws
          audit-log-maxage: "30"
          audit-log-maxbackup: "10"
          audit-log-maxsize: "100"
          audit-log-path: "/var/log/audit.log"
          audit-policy-file: "/etc/kubernetes/audit-policy.yaml"
          enable-admission-plugins: "NodeRestriction,PodSecurity,ResourceQuota"
          encryption-provider-config: "/etc/kubernetes/encryption-config.yaml"
        extraVolumes:
          - name: audit-policy
            hostPath: "/etc/kubernetes/audit-policy.yaml"
            mountPath: "/etc/kubernetes/audit-policy.yaml"
            readOnly: true
          - name: encryption-config
            hostPath: "/etc/kubernetes/encryption-config.yaml"
            mountPath: "/etc/kubernetes/encryption-config.yaml"
            readOnly: true
      controllerManager:
        extraArgs:
          cloud-provider: aws
          terminated-pod-gc-threshold: "1000"
      etcd:
        local:
          extraArgs:
            auto-compaction-mode: periodic
            auto-compaction-retention: "1"
  version: "v1.28.5"
1.2 Enterprise Requirements and Challenges
# enterprise_k8s_requirements.py
from dataclasses import dataclass
from typing import Dict, List, Any
from enum import Enum
import json

class ClusterTier(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    DISASTER_RECOVERY = "disaster_recovery"

class WorkloadType(Enum):
    WEB_APPLICATION = "web_application"
    MICROSERVICE = "microservice"
    BATCH_JOB = "batch_job"
    ML_TRAINING = "ml_training"
    ML_INFERENCE = "ml_inference"
    DATABASE = "database"
    MESSAGE_QUEUE = "message_queue"

@dataclass
class EnterpriseRequirements:
    """Enterprise Kubernetes requirements."""
    # Availability requirements
    availability_sla: float      # e.g. 0.999 (99.9%), 0.9999 (99.99%)
    rpo_minutes: int             # Recovery Point Objective
    rto_minutes: int             # Recovery Time Objective
    # Scalability requirements
    max_nodes: int
    max_pods_per_cluster: int
    expected_growth_rate: float  # annual growth rate
    # Security requirements
    compliance_frameworks: List[str]  # SOC2, HIPAA, PCI-DSS, etc.
    network_isolation_required: bool
    encryption_at_rest: bool
    encryption_in_transit: bool
    # Performance requirements
    max_pod_startup_time_seconds: int
    max_service_discovery_latency_ms: int
    resource_efficiency_target: float  # target resource utilization
    # Operational requirements
    multi_region_deployment: bool
    disaster_recovery_automation: bool
    zero_downtime_updates: bool
    automated_scaling: bool

class KubernetesArchitectureAnalyzer:
    """Kubernetes architecture analyzer."""

    def __init__(self):
        self.cluster_configurations = {}
        self.workload_patterns = {}
        self.resource_usage_history = {}

    def analyze_requirements(self, requirements: EnterpriseRequirements) -> Dict[str, Any]:
        """Analyze requirements and produce architecture recommendations."""
        architecture_recommendations = {
            "cluster_topology": self._recommend_cluster_topology(requirements),
            "node_configuration": self._recommend_node_configuration(requirements),
            "networking": self._recommend_networking(requirements),
            "storage": self._recommend_storage(requirements),
            "security": self._recommend_security(requirements),
            "monitoring": self._recommend_monitoring(requirements),
            "backup_strategy": self._recommend_backup_strategy(requirements)
        }
        return {
            "requirements": requirements.__dict__,
            "recommendations": architecture_recommendations,
            "estimated_costs": self._estimate_costs(requirements, architecture_recommendations),
            "implementation_timeline": self._create_implementation_timeline(requirements),
            "risk_assessment": self._assess_risks(requirements)
        }

    def _recommend_cluster_topology(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Cluster topology recommendations."""
        if req.availability_sla >= 0.9999:  # 99.99% or higher
            topology_type = "multi_region_ha"
            clusters = {
                "management": {
                    "count": 1,
                    "regions": ["primary"],
                    "purpose": "cluster lifecycle management"
                },
                "production": {
                    "count": 2 if req.multi_region_deployment else 1,
                    "regions": ["primary", "secondary"] if req.multi_region_deployment else ["primary"],
                    "purpose": "production workloads"
                },
                "dr": {
                    "count": 1,
                    "regions": ["dr"],
                    "purpose": "disaster recovery"
                }
            }
        elif req.availability_sla >= 0.999:  # 99.9% or higher
            topology_type = "single_region_ha"
            clusters = {
                "production": {
                    "count": 1,
                    "zones": ["us-east-1a", "us-east-1b", "us-east-1c"],
                    "purpose": "production workloads"
                },
                "staging": {
                    "count": 1,
                    "zones": ["us-east-1a", "us-east-1b"],
                    "purpose": "staging and testing"
                }
            }
        else:
            topology_type = "basic"
            clusters = {
                "shared": {
                    "count": 1,
                    "zones": ["us-east-1a", "us-east-1b"],
                    "purpose": "shared environment"
                }
            }
        return {
            "topology_type": topology_type,
            "clusters": clusters,
            "service_mesh_required": req.max_nodes > 100,
            "api_gateway_required": True
        }

    def _recommend_node_configuration(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Node pool recommendations."""
        node_pools = []
        # System node pool
        system_pool = {
            "name": "system",
            "purpose": "system workloads",
            "instance_type": "c5.2xlarge",
            "min_nodes": 3,
            "max_nodes": 5,
            "taints": [
                {
                    "key": "node-role.kubernetes.io/system",
                    "value": "true",
                    "effect": "NoSchedule"
                }
            ]
        }
        node_pools.append(system_pool)
        # Application node pool
        app_pool = {
            "name": "application",
            "purpose": "general workloads",
            "instance_type": "c5.4xlarge",
            "min_nodes": 5,
            "max_nodes": min(req.max_nodes - 5, 100),
            "auto_scaling": req.automated_scaling
        }
        node_pools.append(app_pool)
        # ML node pool: min_nodes is 0, so it scales from zero and adds
        # no standing cost until ML workloads are actually scheduled
        ml_pool = {
            "name": "ml-workload",
            "purpose": "machine learning workloads",
            "instance_type": "p3.2xlarge",
            "min_nodes": 0,
            "max_nodes": 20,
            "taints": [
                {
                    "key": "workload-type",
                    "value": "ml",
                    "effect": "NoSchedule"
                }
            ],
            "auto_scaling": True,
            "spot_instances": True  # cost savings
        }
        node_pools.append(ml_pool)
        # Memory-intensive workloads
        if req.resource_efficiency_target > 0.8:
            memory_pool = {
                "name": "memory-optimized",
                "purpose": "memory intensive workloads",
                "instance_type": "r5.4xlarge",
                "min_nodes": 0,
                "max_nodes": 10,
                "taints": [
                    {
                        "key": "workload-type",
                        "value": "memory-intensive",
                        "effect": "NoSchedule"
                    }
                ]
            }
            node_pools.append(memory_pool)
        return {
            "node_pools": node_pools,
            "cluster_autoscaler": req.automated_scaling,
            "vertical_pod_autoscaler": True,
            "node_problem_detector": True
        }

    def _recommend_networking(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Networking recommendations."""
        networking = {
            "cni": "calico" if req.network_isolation_required else "aws-vpc-cni",
            "service_mesh": {
                "required": req.max_nodes > 50,
                "type": "istio",
                "features": {
                    "mTLS": req.encryption_in_transit,
                    "traffic_management": True,
                    "observability": True,
                    "security_policies": req.network_isolation_required
                }
            },
            "ingress": {
                "type": "istio-gateway" if req.max_nodes > 50 else "nginx-ingress",
                "ssl_termination": True,
                "rate_limiting": True,
                "waf": bool(req.compliance_frameworks) and "PCI-DSS" in req.compliance_frameworks
            },
            "network_policies": req.network_isolation_required,
            "pod_security_policies": bool(req.compliance_frameworks)
        }
        return networking

    def _recommend_storage(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Storage recommendations."""
        storage_classes = [
            {
                "name": "fast-ssd",
                "provisioner": "ebs.csi.aws.com",
                "parameters": {
                    "type": "gp3",
                    "iops": "3000",
                    "throughput": "250",
                    "encrypted": "true" if req.encryption_at_rest else "false"
                },
                "reclaimPolicy": "Retain",
                "volumeBindingMode": "WaitForFirstConsumer"
            },
            {
                "name": "bulk-storage",
                "provisioner": "ebs.csi.aws.com",
                "parameters": {
                    "type": "sc1",
                    "encrypted": "true" if req.encryption_at_rest else "false"
                },
                "reclaimPolicy": "Delete",
                "volumeBindingMode": "WaitForFirstConsumer"
            }
        ]
        if req.availability_sla >= 0.999:
            storage_classes.append({
                "name": "replicated-storage",
                "provisioner": "rook-ceph",
                "parameters": {
                    "replication": "3",
                    "crushRoot": "default",
                    "crushLeaf": "host",
                    "encrypted": "true" if req.encryption_at_rest else "false"
                }
            })
        return {
            "storage_classes": storage_classes,
            "csi_drivers": ["ebs-csi", "efs-csi"],
            "backup_solution": "velero" if req.disaster_recovery_automation else "manual",
            "snapshot_controller": True
        }

    def _recommend_security(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Security recommendations."""
        security_config = {
            "pod_security_standards": "restricted" if req.compliance_frameworks else "baseline",
            # Drop the NetworkPolicy entry when isolation is not required
            "admission_controllers": [
                ac for ac in [
                    "PodSecurity",
                    "ResourceQuota",
                    "LimitRanger",
                    "NetworkPolicy" if req.network_isolation_required else None,
                    "ValidatingAdmissionWebhook",
                    "MutatingAdmissionWebhook"
                ] if ac is not None
            ],
            "rbac": {
                "enabled": True,
                "cluster_roles": self._generate_rbac_roles(req),
                "service_accounts": True
            },
            "secrets_management": {
                "csi_driver": "secrets-store-csi-driver",
                "provider": "aws-secrets-manager",
                "encryption": req.encryption_at_rest
            },
            "image_security": {
                "image_scanning": True,
                "admission_controller": "ImagePolicyWebhook",
                "allowed_registries": ["company.com/registry", "public.ecr.aws"]
            },
            "runtime_security": {
                "falco": bool(req.compliance_frameworks),
                "apparmor": True,
                "seccomp": True
            }
        }
        if req.compliance_frameworks:
            security_config.update({
                "audit_logging": True,
                "compliance_scanning": True,
                "vulnerability_scanning": True,
                "penetration_testing": "quarterly"
            })
        return security_config

    def _generate_rbac_roles(self, req: EnterpriseRequirements) -> List[Dict[str, Any]]:
        """Generate RBAC roles."""
        roles = [
            {
                "name": "platform-admin",
                "scope": "cluster",
                "permissions": ["*"],
                "subjects": ["platform-team@company.com"]
            },
            {
                "name": "namespace-admin",
                "scope": "namespace",
                "permissions": [
                    "get", "list", "create", "update", "delete", "patch"
                ],
                "resources": [
                    "pods", "services", "deployments", "configmaps",
                    "secrets", "persistentvolumeclaims"
                ],
                "subjects": ["team-leads@company.com"]
            },
            {
                "name": "developer",
                "scope": "namespace",
                "permissions": ["get", "list", "create", "update", "patch"],
                "resources": [
                    "pods", "services", "deployments", "configmaps"
                ],
                "subjects": ["developers@company.com"]
            },
            {
                "name": "readonly",
                "scope": "namespace",
                "permissions": ["get", "list"],
                "resources": ["*"],
                "subjects": ["auditors@company.com"]
            }
        ]
        if req.compliance_frameworks:
            roles.append({
                "name": "compliance-auditor",
                "scope": "cluster",
                "permissions": ["get", "list"],
                "resources": ["*"],
                "subjects": ["compliance-team@company.com"]
            })
        return roles

    def _recommend_monitoring(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Monitoring recommendations."""
        return {
            "metrics": {
                "prometheus": {
                    "retention": "30d",
                    "storage_size": "500Gi" if req.max_nodes > 100 else "100Gi",
                    "high_availability": req.availability_sla >= 0.999,
                    "federation": req.multi_region_deployment
                },
                "grafana": {
                    "dashboards": [
                        "kubernetes-cluster-overview",
                        "application-performance",
                        "resource-utilization",
                        "security-events"
                    ],
                    "alerting": True
                }
            },
            "logging": {
                "fluentd": {
                    "enabled": True,
                    "outputs": ["elasticsearch", "s3"],
                    "retention": "90d"
                },
                "elasticsearch": {
                    "replicas": 3 if req.availability_sla >= 0.999 else 1,
                    "storage_size": "1Ti" if req.max_nodes > 100 else "200Gi"
                }
            },
            "tracing": {
                "jaeger": {
                    "enabled": req.max_nodes > 50,
                    "sampling_rate": 0.1,
                    "storage_backend": "elasticsearch"
                }
            },
            "alerting": {
                "alertmanager": {
                    "enabled": True,
                    "receivers": ["slack", "pagerduty", "email"],
                    "routes": self._generate_alert_routes(req)
                }
            }
        }

    def _generate_alert_routes(self, req: EnterpriseRequirements) -> List[Dict[str, Any]]:
        """Generate alert routing rules."""
        routes = [
            {
                "match": {"severity": "critical"},
                "receiver": "pagerduty",
                "group_wait": "10s",
                "group_interval": "5m",
                "repeat_interval": "1h"
            },
            {
                "match": {"severity": "warning"},
                "receiver": "slack",
                "group_wait": "30s",
                "group_interval": "15m",
                "repeat_interval": "24h"
            }
        ]
        if req.compliance_frameworks:
            routes.append({
                "match": {"category": "security"},
                "receiver": "security-team",
                "group_wait": "0s",
                "group_interval": "1m",
                "repeat_interval": "15m"
            })
        return routes

    def _recommend_backup_strategy(self, req: EnterpriseRequirements) -> Dict[str, Any]:
        """Backup strategy recommendations."""
        if req.disaster_recovery_automation:
            backup_frequency = "hourly" if req.rpo_minutes <= 60 else "daily"
            cross_region = req.multi_region_deployment
        else:
            backup_frequency = "daily"
            cross_region = False
        return {
            "backup_solution": "velero",
            "schedule": {
                "frequency": backup_frequency,
                "retention": "7d"  # keep one week of backups
            },
            "storage_location": {
                "provider": "aws",
                "bucket": "k8s-backups-company",
                "cross_region_replication": cross_region
            },
            "disaster_recovery": {
                "automated_failover": req.disaster_recovery_automation,
                "rto_target": f"{req.rto_minutes}m",
                "testing_schedule": "monthly"
            }
        }

    def _estimate_costs(self, req: EnterpriseRequirements,
                        recommendations: Dict[str, Any]) -> Dict[str, float]:
        """Cost estimation (deliberately simplified; a real estimate
        needs per-instance pricing and utilization data)."""
        base_cost_per_node = 200  # $200 per node per month
        node_config = recommendations["node_configuration"]
        total_nodes = sum(pool["max_nodes"] for pool in node_config["node_pools"])
        monthly_costs = {
            "compute": total_nodes * base_cost_per_node,
            "storage": req.max_nodes * 50,     # storage cost
            "networking": req.max_nodes * 30,  # networking cost
            "monitoring": 1000 if req.max_nodes > 100 else 500,
            "security_tools": 2000 if req.compliance_frameworks else 500
        }
        monthly_costs["total"] = sum(monthly_costs.values())
        monthly_costs["annual"] = monthly_costs["total"] * 12
        return monthly_costs

    def _create_implementation_timeline(self, req: EnterpriseRequirements) -> Dict[str, str]:
        """Create an implementation timeline."""
        if req.availability_sla >= 0.9999:
            timeline = {
                "phase_1_infrastructure": "8 weeks",
                "phase_2_security": "4 weeks",
                "phase_3_monitoring": "3 weeks",
                "phase_4_migration": "6 weeks",
                "phase_5_optimization": "4 weeks",
                "total_duration": "25 weeks"
            }
        elif req.max_nodes > 100:
            timeline = {
                "phase_1_infrastructure": "6 weeks",
                "phase_2_security": "3 weeks",
                "phase_3_monitoring": "2 weeks",
                "phase_4_migration": "4 weeks",
                "phase_5_optimization": "2 weeks",
                "total_duration": "17 weeks"
            }
        else:
            timeline = {
                "phase_1_infrastructure": "4 weeks",
                "phase_2_security": "2 weeks",
                "phase_3_monitoring": "1 week",
                "phase_4_migration": "3 weeks",
                "phase_5_optimization": "1 week",
                "total_duration": "11 weeks"
            }
        return timeline

    def _assess_risks(self, req: EnterpriseRequirements) -> List[Dict[str, str]]:
        """Risk assessment."""
        risks = []
        if req.availability_sla >= 0.9999:
            risks.append({
                "risk": "Complex multi-region setup",
                "impact": "High",
                "probability": "Medium",
                "mitigation": "Phased rollout with extensive testing"
            })
        if req.compliance_frameworks:
            risks.append({
                "risk": "Compliance audit failures",
                "impact": "Critical",
                "probability": "Low",
                "mitigation": "Regular compliance scanning and audits"
            })
        if req.max_nodes > 500:
            risks.append({
                "risk": "Scale-related performance issues",
                "impact": "High",
                "probability": "Medium",
                "mitigation": "Load testing and gradual scaling"
            })
        risks.append({
            "risk": "Skills gap in team",
            "impact": "Medium",
            "probability": "High",
            "mitigation": "Training programs and external consulting"
        })
        return risks

# Usage example
def analyze_enterprise_k8s_requirements():
    """Example: analyzing enterprise Kubernetes requirements."""
    # Requirements for a large enterprise
    enterprise_req = EnterpriseRequirements(
        availability_sla=0.9999,   # 99.99%
        rpo_minutes=30,            # 30-minute RPO
        rto_minutes=60,            # 1-hour RTO
        max_nodes=1000,
        max_pods_per_cluster=30000,
        expected_growth_rate=0.5,  # 50% annual growth
        compliance_frameworks=["SOC2", "HIPAA"],
        network_isolation_required=True,
        encryption_at_rest=True,
        encryption_in_transit=True,
        max_pod_startup_time_seconds=30,
        max_service_discovery_latency_ms=100,
        resource_efficiency_target=0.75,
        multi_region_deployment=True,
        disaster_recovery_automation=True,
        zero_downtime_updates=True,
        automated_scaling=True
    )
    analyzer = KubernetesArchitectureAnalyzer()
    analysis_result = analyzer.analyze_requirements(enterprise_req)
    print("=== Enterprise Kubernetes Architecture Analysis ===")
    print(json.dumps(analysis_result, indent=2))

# Run:
# analyze_enterprise_k8s_requirements()
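The backup strategy above derives the Velero schedule from the RPO target. Stripped of the surrounding class, the decision is a two-branch rule (a minimal sketch mirroring the `_recommend_backup_strategy` logic, not a complete backup planner):

```python
def backup_frequency(rpo_minutes: int, dr_automation: bool) -> str:
    """Map an RPO target to a backup schedule frequency.

    Without DR automation, daily manual backups are assumed; with it,
    an RPO of one hour or less requires hourly snapshots.
    """
    if not dr_automation:
        return "daily"
    return "hourly" if rpo_minutes <= 60 else "daily"

# The example requirements above (30-minute RPO, automated DR) land on hourly:
print(backup_frequency(30, True))   # hourly
print(backup_frequency(30, False))  # daily
```

Note that an hourly snapshot still only bounds data loss to one hour; a 30-minute RPO strictly requires continuous replication on top of snapshots, which is why the schedule here is a floor, not a guarantee.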
2. Advanced Workload Scheduling
2.1 Implementing a Custom Scheduler
// custom-scheduler.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// EnterpriseScheduler is an enterprise custom scheduler.
type EnterpriseScheduler struct {
	clientset       kubernetes.Interface
	frameworkHandle framework.Handle
}

// WorkloadType identifies a workload category.
type WorkloadType string

const (
	WebApplication WorkloadType = "web-application"
	BatchJob       WorkloadType = "batch-job"
	MLTraining     WorkloadType = "ml-training"
	MLInference    WorkloadType = "ml-inference"
	Database       WorkloadType = "database"
	Cache          WorkloadType = "cache"
)

// NodeAffinity holds node-affinity rules.
type NodeAffinity struct {
	WorkloadType      WorkloadType
	RequiredLabels    map[string]string
	PreferredLabels   map[string]int32 // label -> weight
	AntiAffinityRules []string
}

// ResourceProfile weights each resource dimension.
type ResourceProfile struct {
	CPUWeight     float64
	MemoryWeight  float64
	NetworkWeight float64
	StorageWeight float64
	GPUWeight     float64
}

// SLARequirement captures SLA targets.
type SLARequirement struct {
	MaxLatencyMs    int32
	AvailabilityPct float64
	ThroughputReqs  int32
	IsolationLevel  string
}

// WorkloadMetadata describes a workload.
type WorkloadMetadata struct {
	Type            WorkloadType    `json:"type"`
	Priority        int32           `json:"priority"`
	SLA             SLARequirement  `json:"sla"`
	ResourceProfile ResourceProfile `json:"resource_profile"`
	Affinity        NodeAffinity    `json:"affinity"`
}

// NodeScore is a scored node.
type NodeScore struct {
	NodeName string
	Score    float64
	Reasons  []string
}

// Score computes the scheduling score for a node.
func (es *EnterpriseScheduler) Score(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeName string) (int64, *framework.Status) {
	// Parse workload metadata
	workloadMeta, err := es.parseWorkloadMetadata(pod)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Failed to parse workload metadata: %v", err))
	}
	// Look up the node
	node, err := es.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Failed to get node %s: %v", nodeName, err))
	}
	// Compute the composite score
	totalScore := es.calculateCompositeScore(pod, node, workloadMeta)
	// Normalize to the 0-100 range
	normalizedScore := int64(math.Max(0, math.Min(100, totalScore)))
	return normalizedScore, framework.NewStatus(framework.Success, "")
}

// calculateCompositeScore blends the individual scores.
func (es *EnterpriseScheduler) calculateCompositeScore(
	pod *v1.Pod,
	node *v1.Node,
	meta WorkloadMetadata) float64 {
	var totalScore float64
	// 1. Resource fit score (40%)
	resourceScore := es.calculateResourceScore(pod, node, meta.ResourceProfile)
	totalScore += resourceScore * 0.4
	// 2. Workload affinity score (25%)
	affinityScore := es.calculateAffinityScore(node, meta.Affinity)
	totalScore += affinityScore * 0.25
	// 3. SLA requirement score (20%)
	slaScore := es.calculateSLAScore(node, meta.SLA)
	totalScore += slaScore * 0.2
	// 4. Load-balancing score (15%)
	loadBalanceScore := es.calculateLoadBalanceScore(node)
	totalScore += loadBalanceScore * 0.15
	return totalScore
}

// calculateResourceScore scores resource fit.
func (es *EnterpriseScheduler) calculateResourceScore(
	pod *v1.Pod,
	node *v1.Node,
	profile ResourceProfile) float64 {
	// Available resources on the node
	allocatable := node.Status.Allocatable
	capacity := node.Status.Capacity
	cpuAllocatable := allocatable.Cpu().MilliValue()
	memAllocatable := allocatable.Memory().Value()
	// Resource requests of the pod
	var podCPUReq, podMemReq int64
	for _, container := range pod.Spec.Containers {
		if container.Resources.Requests != nil {
			if cpu := container.Resources.Requests.Cpu(); cpu != nil {
				podCPUReq += cpu.MilliValue()
			}
			if mem := container.Resources.Requests.Memory(); mem != nil {
				podMemReq += mem.Value()
			}
		}
	}
	// Utilization ratios
	cpuUtilization := float64(podCPUReq) / float64(cpuAllocatable)
	memUtilization := float64(podMemReq) / float64(memAllocatable)
	// Check GPU resources
	var gpuScore float64 = 100.0
	if profile.GPUWeight > 0 {
		gpuCapacity := capacity["nvidia.com/gpu"]
		if gpuCapacity.Value() > 0 {
			gpuScore = 100.0
		} else {
			gpuScore = 0.0 // GPU required but not present
		}
	}
	// Per-resource scores (lower utilization -> higher score)
	cpuScore := math.Max(0, (1.0-cpuUtilization)*100)
	memScore := math.Max(0, (1.0-memUtilization)*100)
	// Weighted average
	weightedScore := (cpuScore*profile.CPUWeight +
		memScore*profile.MemoryWeight +
		gpuScore*profile.GPUWeight) /
		(profile.CPUWeight + profile.MemoryWeight + profile.GPUWeight)
	return weightedScore
}

// calculateAffinityScore scores the affinity match.
func (es *EnterpriseScheduler) calculateAffinityScore(
	node *v1.Node,
	affinity NodeAffinity) float64 {
	score := 100.0
	nodeLabels := node.GetLabels()
	// Required labels
	for requiredLabel, requiredValue := range affinity.RequiredLabels {
		if nodeValue, exists := nodeLabels[requiredLabel]; !exists || nodeValue != requiredValue {
			return 0.0 // hard requirement not met
		}
	}
	// Preferred labels
	var preferredScore float64
	var totalWeight int32
	for preferredLabel, weight := range affinity.PreferredLabels {
		totalWeight += weight
		if _, exists := nodeLabels[preferredLabel]; exists {
			preferredScore += float64(weight)
		}
	}
	if totalWeight > 0 {
		score = (preferredScore / float64(totalWeight)) * 100
	}
	// Anti-affinity rules
	for _, antiAffinityLabel := range affinity.AntiAffinityRules {
		if _, exists := nodeLabels[antiAffinityLabel]; exists {
			score *= 0.5 // anti-affinity penalty
		}
	}
	return score
}

// calculateSLAScore scores SLA fit.
func (es *EnterpriseScheduler) calculateSLAScore(
	node *v1.Node,
	sla SLARequirement) float64 {
	score := 100.0
	nodeLabels := node.GetLabels()
	// Network latency check
	if networkZone, exists := nodeLabels["failure-domain.beta.kubernetes.io/zone"]; exists {
		// A real implementation would query network metrics;
		// this is a simplified placeholder
		if sla.MaxLatencyMs < 10 && networkZone != "us-east-1a" {
			score -= 20 // tight latency target but a distant zone
		}
	}
	// Availability check
	if sla.AvailabilityPct >= 99.99 {
		if nodeType, exists := nodeLabels["node.kubernetes.io/instance-type"]; exists {
			// High availability required but the node is a spot instance
			if nodeType == "spot" {
				score -= 30
			}
		}
	}
	// Isolation level check
	if sla.IsolationLevel == "dedicated" {
		if dedicated, exists := nodeLabels["node-type"]; !exists || dedicated != "dedicated" {
			score -= 50
		}
	}
	return math.Max(0, score)
}

// calculateLoadBalanceScore scores load distribution.
func (es *EnterpriseScheduler) calculateLoadBalanceScore(node *v1.Node) float64 {
	// A real implementation would query current node load (pod count,
	// utilization) from the metrics system; this is a simplified placeholder
	nodeLabels := node.GetLabels()
	// Consider zone-based spreading
	zone := nodeLabels["failure-domain.beta.kubernetes.io/zone"]
	// Simple round-robin-style scoring; a production scheduler
	// needs a more sophisticated load-balancing algorithm
	baseScore := 100.0
	// Zone diversity bonus
	if zone == "us-east-1a" {
		baseScore += 5 // bonus for the default zone
	}
	return baseScore
}

// parseWorkloadMetadata parses workload metadata from the pod.
func (es *EnterpriseScheduler) parseWorkloadMetadata(pod *v1.Pod) (WorkloadMetadata, error) {
	annotations := pod.GetAnnotations()
	// Defaults
	meta := WorkloadMetadata{
		Type:     WebApplication,
		Priority: 50,
		SLA: SLARequirement{
			MaxLatencyMs:    1000,
			AvailabilityPct: 99.9,
			ThroughputReqs:  1000,
			IsolationLevel:  "shared",
		},
		ResourceProfile: ResourceProfile{
			CPUWeight:     1.0,
			MemoryWeight:  1.0,
			NetworkWeight: 0.5,
			StorageWeight: 0.5,
			GPUWeight:     0.0,
		},
		Affinity: NodeAffinity{
			RequiredLabels:  make(map[string]string),
			PreferredLabels: make(map[string]int32),
		},
	}
	// Parse metadata from annotations
	if metadataJSON, exists := annotations["scheduler.enterprise.io/workload-metadata"]; exists {
		if err := json.Unmarshal([]byte(metadataJSON), &meta); err != nil {
			return meta, fmt.Errorf("failed to parse workload metadata: %v", err)
		}
	}
	// Infer workload type from labels
	if workloadType, exists := pod.GetLabels()["workload.enterprise.io/type"]; exists {
		meta.Type = WorkloadType(workloadType)
	}
	// Infer priority from the priority class
	// (PriorityClassName is a string, empty when unset)
	switch pod.Spec.PriorityClassName {
	case "critical":
		meta.Priority = 100
	case "high":
		meta.Priority = 80
	case "medium":
		meta.Priority = 50
	case "low":
		meta.Priority = 20
	}
	return meta, nil
}

// Filter implements the filtering phase.
func (es *EnterpriseScheduler) Filter(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	// Parse workload metadata
	workloadMeta, err := es.parseWorkloadMetadata(pod)
	if err != nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("Failed to parse workload metadata: %v", err))
	}
	// Required affinity rules
	if !es.checkRequiredAffinity(node, workloadMeta.Affinity) {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable,
			"Node doesn't meet required affinity rules")
	}
	// Resource requirements
	if !es.checkResourceRequirements(pod, node) {
		return framework.NewStatus(framework.Unschedulable,
			"Insufficient resources on node")
	}
	// SLA requirements
	if !es.checkSLARequirements(node, workloadMeta.SLA) {
		return framework.NewStatus(framework.Unschedulable,
			"Node doesn't meet SLA requirements")
	}
	return framework.NewStatus(framework.Success, "")
}

// checkRequiredAffinity verifies required labels.
func (es *EnterpriseScheduler) checkRequiredAffinity(
	node *v1.Node,
	affinity NodeAffinity) bool {
	nodeLabels := node.GetLabels()
	// Required labels
	for requiredLabel, requiredValue := range affinity.RequiredLabels {
		if nodeValue, exists := nodeLabels[requiredLabel]; !exists || nodeValue != requiredValue {
			return false
		}
	}
	return true
}

// checkResourceRequirements verifies resource fit.
func (es *EnterpriseScheduler) checkResourceRequirements(
	pod *v1.Pod,
	node *v1.Node) bool {
	allocatable := node.Status.Allocatable
	var totalCPUReq, totalMemReq int64
	for _, container := range pod.Spec.Containers {
		if container.Resources.Requests != nil {
			if cpu := container.Resources.Requests.Cpu(); cpu != nil {
				totalCPUReq += cpu.MilliValue()
			}
			if mem := container.Resources.Requests.Memory(); mem != nil {
				totalMemReq += mem.Value()
			}
		}
	}
	// CPU check
	if totalCPUReq > allocatable.Cpu().MilliValue() {
		return false
	}
	// Memory check
	if totalMemReq > allocatable.Memory().Value() {
		return false
	}
	return true
}

// checkSLARequirements verifies SLA constraints.
func (es *EnterpriseScheduler) checkSLARequirements(
	node *v1.Node,
	sla SLARequirement) bool {
	nodeLabels := node.GetLabels()
	// Isolation level
	if sla.IsolationLevel == "dedicated" {
		if nodeType, exists := nodeLabels["node-type"]; !exists || nodeType != "dedicated" {
			return false
		}
	}
	// High availability
	if sla.AvailabilityPct >= 99.99 {
		if nodeType, exists := nodeLabels["node.kubernetes.io/instance-type"]; exists && nodeType == "spot" {
			return false
		}
	}
	return true
}

// Name returns the scheduler name.
func (es *EnterpriseScheduler) Name() string {
	return "enterprise-scheduler"
}

func main() {
	// Kubernetes client setup
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("Failed to get in-cluster config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Failed to create clientset: %v", err)
	}
	// Initialize the enterprise scheduler
	scheduler := &EnterpriseScheduler{
		clientset: clientset,
	}
	log.Printf("Starting Enterprise Scheduler: %s", scheduler.Name())
	// Full integration with the scheduler framework (plugin registration
	// via the scheduler app) is more involved; only the structure and
	// scoring logic are shown here.
}
2.2 Configuring Advanced Scheduling Policies
# advanced-scheduling-policies.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: advanced-scheduling-policies
namespace: kube-system
data:
workload-profiles: |
# 워크로드 프로필 정의
profiles:
web-application:
resource_profile:
cpu_weight: 1.0
memory_weight: 1.0
network_weight: 0.8
storage_weight: 0.3
gpu_weight: 0.0
sla_requirements:
max_latency_ms: 200
availability_pct: 99.9
throughput_reqs: 5000
isolation_level: "shared"
node_affinity:
required_labels:
workload-type: "web"
preferred_labels:
instance-generation: "current" # weight: 100
network-performance: "high" # weight: 80
anti_affinity_rules:
- "spot-instance=true"
batch-job:
resource_profile:
cpu_weight: 1.5
memory_weight: 1.2
network_weight: 0.3
storage_weight: 0.7
gpu_weight: 0.0
sla_requirements:
max_latency_ms: 5000
availability_pct: 99.0
throughput_reqs: 1000
isolation_level: "shared"
node_affinity:
preferred_labels:
instance-generation: "previous" # weight: 90 (비용 효율적)
spot-instance: "true" # weight: 100 (비용 절감)
ml-training:
resource_profile:
cpu_weight: 1.2
memory_weight: 1.5
network_weight: 0.9
storage_weight: 1.0
gpu_weight: 2.0
sla_requirements:
max_latency_ms: 10000
availability_pct: 99.5
throughput_reqs: 2000
isolation_level: "dedicated"
node_affinity:
required_labels:
accelerator: "gpu"
gpu-type: "v100"
preferred_labels:
nvme-storage: "available" # weight: 90
infiniband: "available" # weight: 100
ml-inference:
resource_profile:
cpu_weight: 0.8
memory_weight: 1.0
network_weight: 1.2
storage_weight: 0.5
gpu_weight: 1.5
sla_requirements:
max_latency_ms: 50
availability_pct: 99.99
throughput_reqs: 10000
isolation_level: "shared"
node_affinity:
required_labels:
accelerator: "gpu"
preferred_labels:
gpu-type: "t4" # weight: 100 (추론용으로 적합)
network-performance: "high" # weight: 90
database:
resource_profile:
cpu_weight: 1.0
memory_weight: 2.0
network_weight: 0.8
storage_weight: 2.0
gpu_weight: 0.0
sla_requirements:
max_latency_ms: 10
availability_pct: 99.99
throughput_reqs: 15000
isolation_level: "dedicated"
node_affinity:
required_labels:
storage-type: "nvme"
workload-type: "stateful"
preferred_labels:
memory-optimized: "true" # weight: 100
local-ssd: "available" # weight: 90
anti_affinity_rules:
- "spot-instance=true"
cache:
resource_profile:
cpu_weight: 0.7
memory_weight: 2.5
network_weight: 1.2
storage_weight: 0.2
gpu_weight: 0.0
sla_requirements:
max_latency_ms: 1
availability_pct: 99.9
throughput_reqs: 50000
isolation_level: "shared"
node_affinity:
preferred_labels:
memory-optimized: "true" # weight: 100
network-performance: "high" # weight: 90
priority-classes: |
# Priority class definitions
classes:
critical:
value: 1000000
global_default: false
description: "Critical system workloads"
preemption_policy: "PreemptLowerPriority"
high:
value: 100000
global_default: false
description: "High priority application workloads"
preemption_policy: "PreemptLowerPriority"
medium:
value: 10000
global_default: true
description: "Default priority for application workloads"
preemption_policy: "PreemptLowerPriority"
low:
value: 1000
global_default: false
description: "Low priority batch workloads"
preemption_policy: "Never"
best-effort:
value: 0
global_default: false
description: "Best effort workloads, can be preempted anytime"
preemption_policy: "Never"
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: critical
value: 1000000
globalDefault: false
description: "Critical system workloads that cannot be preempted"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: high
value: 100000
globalDefault: false
description: "High priority application workloads"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: medium
value: 10000
globalDefault: true
description: "Default priority for application workloads"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: low
value: 1000
globalDefault: false
description: "Low priority batch workloads"
preemptionPolicy: Never
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: best-effort
value: 0
globalDefault: false
description: "Best effort workloads"
preemptionPolicy: Never
---
# Resource quota policies
apiVersion: v1
kind: ResourceQuota
metadata:
name: compute-quota-critical
namespace: critical-workloads
spec:
hard:
requests.cpu: "100"
requests.memory: "200Gi"
limits.cpu: "200"
limits.memory: "400Gi"
persistentvolumeclaims: "50"
services: "20"
count/deployments.apps: "50"
scopeSelector:
matchExpressions:
- scopeName: PriorityClass
operator: In
values: ["critical"]
---
apiVersion: v1
kind: ResourceQuota
metadata:
name: compute-quota-high
namespace: production-workloads
spec:
hard:
requests.cpu: "200"
requests.memory: "400Gi"
limits.cpu: "400"
limits.memory: "800Gi"
persistentvolumeclaims: "100"
services: "50"
count/deployments.apps: "100"
scopeSelector:
matchExpressions:
- scopeName: PriorityClass
operator: In
values: ["high"]
---
# Network policies - per-workload network isolation
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: ml-workload-network-policy
namespace: ml-workloads
spec:
podSelector:
matchLabels:
workload-type: ml-training
policyTypes:
- Ingress
- Egress
ingress:
- from:
- podSelector:
matchLabels:
workload-type: ml-training
- podSelector:
matchLabels:
component: ml-orchestrator
ports:
- protocol: TCP
port: 8080
egress:
- to:
- podSelector:
matchLabels:
workload-type: ml-training
- to:
- podSelector:
matchLabels:
component: data-storage
ports:
- protocol: TCP
port: 3306
- protocol: TCP
port: 6379
---
# Pod disruption budgets
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: critical-workload-pdb
namespace: critical-workloads
spec:
minAvailable: "90%"
selector:
matchLabels:
priority-class: critical
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: high-priority-pdb
namespace: production-workloads
spec:
maxUnavailable: "25%"
selector:
matchLabels:
priority-class: high
---
# VerticalPodAutoscaler configuration
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: ml-training-vpa
namespace: ml-workloads
spec:
targetRef:
apiVersion: "apps/v1"
kind: Deployment
name: ml-training-job
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: '*'
minAllowed:
cpu: 100m
memory: 128Mi
maxAllowed:
cpu: 8
memory: 32Gi
controlledResources: ["cpu", "memory"]
controlledValues: RequestsAndLimits
---
# HorizontalPodAutoscaler configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: web-application-hpa
namespace: production-workloads
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: web-application
minReplicas: 3
maxReplicas: 100
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: active_connections
target:
type: AverageValue
averageValue: "1000"
- type: External
external:
metric:
name: queue_length
selector:
matchLabels:
queue_name: "web_requests"
target:
type: Value
value: "100"
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
- type: Pods
value: 2
periodSeconds: 60
selectPolicy: Min
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Pods
value: 4
periodSeconds: 15
selectPolicy: Max
---
# Cluster Autoscaler configuration
# Note: these keys mirror Cluster Autoscaler command-line flags; the
# cluster-autoscaler-status ConfigMap is written by the autoscaler itself,
# so operator-supplied settings are kept in a separate ConfigMap here.
apiVersion: v1
kind: ConfigMap
metadata:
name: cluster-autoscaler-config
namespace: kube-system
data:
nodes.max: "1000"
nodes.min: "10"
scale-down-enabled: "true"
scale-down-delay-after-add: "10m"
scale-down-unneeded-time: "10m"
scale-down-utilization-threshold: "0.5"
skip-nodes-with-local-storage: "false"
skip-nodes-with-system-pods: "false"
max-node-provision-time: "15m"
node-group-auto-discovery: "asg:tag=k8s.io/cluster-autoscaler/enabled,k8s.io/cluster-autoscaler/production-cluster"
---
# Per-node-pool autoscaling policies
apiVersion: v1
kind: ConfigMap
metadata:
name: nodepool-scaling-policies
namespace: kube-system
data:
policies: |
node_pools:
system:
min_nodes: 3
max_nodes: 5
scale_down_disabled: true
priority: 1000
application:
min_nodes: 5
max_nodes: 100
scale_up_cooldown: "3m"
scale_down_cooldown: "10m"
utilization_threshold: 0.7
priority: 800
ml-workload:
min_nodes: 0
max_nodes: 50
scale_up_cooldown: "1m" # fast scale-up for GPU instances
scale_down_cooldown: "30m" # but slow scale-down
utilization_threshold: 0.8
priority: 600
spot_instances_enabled: true
spot_max_price: "1.50"
memory-optimized:
min_nodes: 0
max_nodes: 20
scale_up_cooldown: "5m"
scale_down_cooldown: "15m"
utilization_threshold: 0.6
priority: 700
batch-workload:
min_nodes: 0
max_nodes: 200
scale_up_cooldown: "5m"
scale_down_cooldown: "5m" # batch jobs scale down quickly
utilization_threshold: 0.5
priority: 400
spot_instances_enabled: true
spot_max_price: "0.50"
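To show how the weighted profiles above could be consumed by a scoring component, here is a minimal sketch. It is illustrative only: the normalized free-capacity inputs and the weighted-sum formula are assumptions for the example, not part of kube-scheduler or any existing plugin.

```python
# Illustrative sketch: score candidate nodes for a workload using the
# resource_profile weights from the "web-application" profile above.
from typing import Dict

# Weights copied from the web-application profile in the ConfigMap.
WEB_PROFILE = {"cpu": 1.0, "memory": 1.0, "network": 0.8, "storage": 0.3, "gpu": 0.0}

def score_node(free_capacity: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted average of normalized free capacity (each resource in 0.0-1.0)."""
    total_weight = sum(weights.values()) or 1.0
    return sum(weights[r] * free_capacity.get(r, 0.0) for r in weights) / total_weight

# Hypothetical nodes with normalized free capacity per resource.
nodes = {
    "node-a": {"cpu": 0.6, "memory": 0.5, "network": 0.9, "storage": 0.8},
    "node-b": {"cpu": 0.2, "memory": 0.8, "network": 0.4, "storage": 0.9},
}
best = max(nodes, key=lambda name: score_node(nodes[name], WEB_PROFILE))
print(best)  # node-a scores higher for the web profile
```

The same function applied with the batch-job weights would favor node-b's spare CPU less and its storage more, which is the point of keeping the weights in configuration rather than in code.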
3. Multi-Cluster Management
3.1 Cluster Lifecycle Management with Cluster API
With Cluster API, clusters themselves are declared as Kubernetes resources, so provisioning, upgrades, and deletion become reconciled, auditable operations driven from a management cluster.
# cluster_lifecycle_manager.py
import asyncio
import yaml
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import boto3
from enum import Enum
class ClusterState(Enum):
CREATING = "creating"
READY = "ready"
UPDATING = "updating"
DELETING = "deleting"
ERROR = "error"
class ClusterTier(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
DISASTER_RECOVERY = "disaster_recovery"
@dataclass
class ClusterSpec:
"""Cluster specification."""
name: str
tier: ClusterTier
region: str
kubernetes_version: str
control_plane_config: Dict[str, Any]
node_pools: List[Dict[str, Any]]
networking_config: Dict[str, Any]
addons: List[str]
backup_config: Dict[str, Any]
monitoring_config: Dict[str, Any]
security_config: Dict[str, Any]
@dataclass
class ClusterStatus:
"""Cluster status."""
name: str
state: ClusterState
created_at: float
last_updated: float
kubernetes_version: str
node_count: int
ready_nodes: int
control_plane_ready: bool
addons_ready: Dict[str, bool]
health_score: float
cost_estimate: float
class ClusterLifecycleManager:
"""Cluster lifecycle manager."""
def __init__(self, management_cluster_kubeconfig: str):
# Connect to the management cluster
config.load_kube_config(config_file=management_cluster_kubeconfig)
self.k8s_client = client.ApiClient()
self.custom_api = client.CustomObjectsApi()
self.core_api = client.CoreV1Api()
self.apps_api = client.AppsV1Api()
# Initialize AWS clients
self.ec2_client = boto3.client('ec2')
self.eks_client = boto3.client('eks')
self.cluster_specs = {}
self.cluster_statuses = {}
# Default add-ons
self.default_addons = [
"cluster-autoscaler",
"aws-load-balancer-controller",
"external-dns",
"cert-manager",
"prometheus-operator",
"istio-base",
"istio-istiod"
]
async def create_cluster(self, cluster_spec: ClusterSpec) -> str:
"""Create a new cluster."""
print(f"Creating cluster: {cluster_spec.name}")
try:
# 1. Create the Cluster API resources
cluster_manifest = self._generate_cluster_manifest(cluster_spec)
await self._apply_cluster_manifest(cluster_manifest)
# 2. Start tracking cluster state
await self._track_cluster_creation(cluster_spec.name)
# 3. Store cluster information
self.cluster_specs[cluster_spec.name] = cluster_spec
self.cluster_statuses[cluster_spec.name] = ClusterStatus(
name=cluster_spec.name,
state=ClusterState.CREATING,
created_at=time.time(),
last_updated=time.time(),
kubernetes_version=cluster_spec.kubernetes_version,
node_count=0,
ready_nodes=0,
control_plane_ready=False,
addons_ready={addon: False for addon in cluster_spec.addons},
health_score=0.0,
cost_estimate=0.0
)
print(f"Cluster creation initiated: {cluster_spec.name}")
return cluster_spec.name
except Exception as e:
print(f"Failed to create cluster {cluster_spec.name}: {str(e)}")
raise
def _generate_cluster_manifest(self, spec: ClusterSpec) -> Dict[str, Any]:
"""Generate Cluster API manifests."""
# AWSCluster manifest
aws_cluster = {
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"kind": "AWSCluster",
"metadata": {
"name": spec.name,
"namespace": "default"
},
"spec": {
"region": spec.region,
"sshKeyName": f"k8s-{spec.name}",
"networkSpec": {
"vpc": {
"cidrBlock": spec.networking_config.get("vpc_cidr", "10.0.0.0/16"),
"tags": {
"Name": f"k8s-{spec.name}-vpc",
"kubernetes.io/cluster/" + spec.name: "owned"
}
},
"subnets": self._generate_subnet_spec(spec)
},
"bastion": {
"enabled": spec.tier in [ClusterTier.PRODUCTION, ClusterTier.STAGING]
}
}
}
# Control plane manifest
control_plane = {
"apiVersion": "controlplane.cluster.x-k8s.io/v1beta1",
"kind": "KubeadmControlPlane",
"metadata": {
"name": f"{spec.name}-control-plane",
"namespace": "default"
},
"spec": {
"replicas": spec.control_plane_config.get("replicas", 3),
"machineTemplate": {
"infrastructureRef": {
"kind": "AWSMachineTemplate",
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"name": f"{spec.name}-control-plane"
}
},
"kubeadmConfigSpec": self._generate_kubeadm_config(spec),
"version": spec.kubernetes_version
}
}
# Control plane machine template
cp_machine_template = {
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"kind": "AWSMachineTemplate",
"metadata": {
"name": f"{spec.name}-control-plane",
"namespace": "default"
},
"spec": {
"template": {
"spec": {
"instanceType": spec.control_plane_config.get("instance_type", "c5.xlarge"),
"iamInstanceProfile": "control-plane.cluster-api-provider-aws.sigs.k8s.io",
"rootVolume": {
"size": spec.control_plane_config.get("root_volume_size", 120),
"type": "gp3",
"iops": 3000,
"throughput": 250,
"encrypted": True
},
"sshKeyName": f"k8s-{spec.name}",
"subnet": {
"filters": [
{
"name": "tag:Name",
"values": [f"k8s-{spec.name}-subnet-private-*"]
}
]
}
}
}
}
}
# Cluster manifest
cluster = {
"apiVersion": "cluster.x-k8s.io/v1beta1",
"kind": "Cluster",
"metadata": {
"name": spec.name,
"namespace": "default",
"labels": {
"cluster.x-k8s.io/cluster-name": spec.name,
"tier": spec.tier.value
}
},
"spec": {
"clusterNetwork": {
"pods": {
"cidrBlocks": [spec.networking_config.get("pod_cidr", "192.168.0.0/16")]
},
"services": {
"cidrBlocks": [spec.networking_config.get("service_cidr", "10.128.0.0/12")]
}
},
"infrastructureRef": {
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"kind": "AWSCluster",
"name": spec.name
},
"controlPlaneRef": {
"kind": "KubeadmControlPlane",
"apiVersion": "controlplane.cluster.x-k8s.io/v1beta1",
"name": f"{spec.name}-control-plane"
}
}
}
# Worker node pool manifests
worker_manifests = []
for pool in spec.node_pools:
machine_deployment, machine_template = self._generate_worker_pool_manifest(spec, pool)
worker_manifests.extend([machine_deployment, machine_template])
return {
"manifests": [
aws_cluster,
cluster,
control_plane,
cp_machine_template
] + worker_manifests
}
def _generate_subnet_spec(self, spec: ClusterSpec) -> List[Dict[str, Any]]:
"""Generate subnet specifications."""
subnets = []
availability_zones = ["a", "b", "c"]
# Private subnets
for i, az in enumerate(availability_zones):
subnets.append({
"availabilityZone": f"{spec.region}{az}",
"cidrBlock": f"10.0.{i * 64}.0/18",
"isPublic": False,
"tags": {
"Name": f"k8s-{spec.name}-subnet-private-{spec.region}{az}",
"kubernetes.io/role/internal-elb": "1",
"kubernetes.io/cluster/" + spec.name: "owned"
}
})
# Public subnets
for i, az in enumerate(availability_zones):
subnets.append({
"availabilityZone": f"{spec.region}{az}",
"cidrBlock": f"10.0.{192 + i * 16}.0/20",
"isPublic": True,
"tags": {
"Name": f"k8s-{spec.name}-subnet-public-{spec.region}{az}",
"kubernetes.io/role/elb": "1",
"kubernetes.io/cluster/" + spec.name: "owned"
}
})
return subnets
def _generate_kubeadm_config(self, spec: ClusterSpec) -> Dict[str, Any]:
"""Generate the kubeadm configuration."""
config = {
"initConfiguration": {
"nodeRegistration": {
"kubeletExtraArgs": {
"cloud-provider": "aws",
"read-only-port": "0",
"anonymous-auth": "false",
"authorization-mode": "Webhook",
"event-qps": "0"
}
}
},
"clusterConfiguration": {
"apiServer": {
"extraArgs": {
"cloud-provider": "aws",
"audit-log-maxage": "30",
"audit-log-maxbackup": "10",
"audit-log-maxsize": "100",
"audit-log-path": "/var/log/audit.log",
"audit-policy-file": "/etc/kubernetes/audit-policy.yaml",
"enable-admission-plugins": "NodeRestriction,PodSecurity,ResourceQuota",
"encryption-provider-config": "/etc/kubernetes/encryption-config.yaml"
},
"extraVolumes": [
{
"name": "audit-policy",
"hostPath": "/etc/kubernetes/audit-policy.yaml",
"mountPath": "/etc/kubernetes/audit-policy.yaml",
"readOnly": True
},
{
"name": "encryption-config",
"hostPath": "/etc/kubernetes/encryption-config.yaml",
"mountPath": "/etc/kubernetes/encryption-config.yaml",
"readOnly": True
}
]
},
"controllerManager": {
"extraArgs": {
"cloud-provider": "aws",
"terminated-pod-gc-threshold": "1000"
}
},
"etcd": {
"local": {
"extraArgs": {
"auto-compaction-mode": "periodic",
"auto-compaction-retention": "1"
}
}
}
},
"joinConfiguration": {
"nodeRegistration": {
"kubeletExtraArgs": {
"cloud-provider": "aws",
"read-only-port": "0",
"anonymous-auth": "false"
}
}
}
}
# Additional hardening for production and staging tiers
if spec.tier in [ClusterTier.PRODUCTION, ClusterTier.STAGING]:
config["clusterConfiguration"]["apiServer"]["extraArgs"].update({
"tls-min-version": "VersionTLS12",
"tls-cipher-suites": "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
})
return config
def _generate_worker_pool_manifest(self, spec: ClusterSpec, pool: Dict[str, Any]) -> tuple:
"""Generate worker node pool manifests."""
pool_name = pool["name"]
# MachineDeployment
machine_deployment = {
"apiVersion": "cluster.x-k8s.io/v1beta1",
"kind": "MachineDeployment",
"metadata": {
"name": f"{spec.name}-{pool_name}",
"namespace": "default",
"labels": {
"cluster.x-k8s.io/cluster-name": spec.name,
"pool-name": pool_name
}
},
"spec": {
"clusterName": spec.name,
"replicas": pool.get("min_nodes", 1),
"template": {
"spec": {
"clusterName": spec.name,
"infrastructureRef": {
"name": f"{spec.name}-{pool_name}",
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"kind": "AWSMachineTemplate"
},
"bootstrap": {
"configRef": {
"name": f"{spec.name}-{pool_name}",
"apiVersion": "bootstrap.cluster.x-k8s.io/v1beta1",
"kind": "KubeadmConfigTemplate"
}
},
"version": spec.kubernetes_version
}
}
}
}
# AWSMachineTemplate
machine_template = {
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"kind": "AWSMachineTemplate",
"metadata": {
"name": f"{spec.name}-{pool_name}",
"namespace": "default"
},
"spec": {
"template": {
"spec": {
"instanceType": pool.get("machine_type", "c5.large"),
"iamInstanceProfile": "nodes.cluster-api-provider-aws.sigs.k8s.io",
"rootVolume": {
"size": pool.get("root_volume_size", 80),
"type": "gp3",
"iops": 3000,
"throughput": 250,
"encrypted": True
},
"sshKeyName": f"k8s-{spec.name}",
"subnet": {
"filters": [
{
"name": "tag:Name",
"values": [f"k8s-{spec.name}-subnet-private-*"]
}
]
}
}
}
}
}
# Spot instance configuration
if pool.get("spot_instances", False):
machine_template["spec"]["template"]["spec"]["spotMarketOptions"] = {
"maxPrice": str(pool.get("spot_max_price", "1.0"))
}
# Dedicated tenancy configuration
if pool.get("dedicated", False):
machine_template["spec"]["template"]["spec"]["tenancy"] = "dedicated"
return machine_deployment, machine_template
async def _apply_cluster_manifest(self, cluster_manifest: Dict[str, Any]):
"""Apply the cluster manifests."""
manifests = cluster_manifest["manifests"]
for manifest in manifests:
try:
api_version = manifest["apiVersion"]
group, _, version = api_version.partition("/")
if not version: # core-group resources ("v1") carry no group prefix
group, version = "", api_version
plural = manifest["kind"].lower() + "s" # naive pluralization; correct for the CAPI kinds used here
# Apply the manifest
self.custom_api.create_namespaced_custom_object(
group=group,
version=version,
namespace="default",
plural=plural,
body=manifest
)
print(f"Applied {manifest['kind']}: {manifest['metadata']['name']}")
except ApiException as e:
if e.status == 409: # already exists
print(f"{manifest['kind']} {manifest['metadata']['name']} already exists")
else:
raise
async def _track_cluster_creation(self, cluster_name: str):
"""Track cluster creation."""
print(f"Tracking creation of cluster: {cluster_name}")
max_wait_time = 3600 # wait up to 1 hour
start_time = time.time()
while time.time() - start_time < max_wait_time:
try:
# Check cluster state
cluster_status = await self._get_cluster_status(cluster_name)
if cluster_status.state == ClusterState.READY:
print(f"Cluster {cluster_name} is ready!")
# Start installing add-ons
await self._install_addons(cluster_name)
break
elif cluster_status.state == ClusterState.ERROR:
raise Exception(f"Cluster {cluster_name} creation failed")
print(f"Cluster {cluster_name} state: {cluster_status.state.value}")
await asyncio.sleep(60) # poll every minute
except Exception as e:
print(f"Error tracking cluster creation: {str(e)}")
await asyncio.sleep(30)
async def _get_cluster_status(self, cluster_name: str) -> ClusterStatus:
"""Fetch cluster status."""
try:
# Look up the cluster object via Cluster API
cluster = self.custom_api.get_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="clusters",
name=cluster_name
)
# Check control plane readiness
control_plane_ready = False
if cluster.get("status", {}).get("controlPlaneReady"):
control_plane_ready = True
# Check node status
node_count = 0
ready_nodes = 0
if control_plane_ready:
# Connect to the workload cluster to check node status
# (in practice, using the workload cluster's kubeconfig)
pass
state = ClusterState.CREATING
if cluster.get("status", {}).get("phase") == "Provisioned":
state = ClusterState.READY
elif cluster.get("status", {}).get("phase") == "Failed":
state = ClusterState.ERROR
return ClusterStatus(
name=cluster_name,
state=state,
created_at=time.time(), # in practice, the cluster's actual creation timestamp
last_updated=time.time(),
kubernetes_version=cluster["spec"].get("topology", {}).get("version", "unknown"), # spec.topology is set only for ClusterClass-based clusters
node_count=node_count,
ready_nodes=ready_nodes,
control_plane_ready=control_plane_ready,
addons_ready={},
health_score=0.8 if state == ClusterState.READY else 0.0,
cost_estimate=0.0
)
except ApiException as e:
if e.status == 404:
raise Exception(f"Cluster {cluster_name} not found")
else:
raise
async def _install_addons(self, cluster_name: str):
"""Install add-ons."""
print(f"Installing addons for cluster: {cluster_name}")
# Look up the cluster spec
spec = self.cluster_specs.get(cluster_name)
if not spec:
raise Exception(f"Cluster spec not found for {cluster_name}")
for addon in spec.addons:
try:
await self._install_single_addon(cluster_name, addon)
print(f"Installed addon: {addon}")
except Exception as e:
print(f"Failed to install addon {addon}: {str(e)}")
async def _install_single_addon(self, cluster_name: str, addon: str):
"""Install a single add-on."""
addon_configs = {
"cluster-autoscaler": self._get_cluster_autoscaler_config(cluster_name),
"aws-load-balancer-controller": self._get_alb_controller_config(cluster_name),
"external-dns": self._get_external_dns_config(cluster_name),
"cert-manager": self._get_cert_manager_config(cluster_name),
"prometheus-operator": self._get_prometheus_config(cluster_name),
"istio-base": self._get_istio_base_config(cluster_name),
"istio-istiod": self._get_istio_istiod_config(cluster_name)
}
if addon not in addon_configs:
raise Exception(f"Unknown addon: {addon}")
config = addon_configs[addon]
# Add-on-specific installation logic
# (in practice, apply Helm charts or manifests)
print(f"Installing {addon} with config: {json.dumps(config, indent=2)}")
def _get_cluster_autoscaler_config(self, cluster_name: str) -> Dict[str, Any]:
"""Cluster Autoscaler configuration."""
return {
"clusterName": cluster_name,
"nodeGroups": {
"autoDiscovery": {
"clusterName": cluster_name,
"tags": ["k8s.io/cluster-autoscaler/enabled", f"k8s.io/cluster-autoscaler/{cluster_name}"]
}
},
"resourceLimits": {
"maxNodesTotal": 1000,
"cores": {"min": 0, "max": 5000},
"memory": {"min": 0, "max": "10000Gi"}
},
"scaleDownDelayAfterAdd": "10m",
"scaleDownUnneededTime": "10m",
"scaleDownUtilizationThreshold": 0.5
}
def _get_alb_controller_config(self, cluster_name: str) -> Dict[str, Any]:
"""AWS Load Balancer Controller configuration."""
return {
"clusterName": cluster_name,
"serviceAccount": {
"create": True,
"name": "aws-load-balancer-controller",
"annotations": {
"eks.amazonaws.com/role-arn": f"arn:aws:iam::ACCOUNT_ID:role/{cluster_name}-alb-controller-role"
}
},
"region": self.cluster_specs[cluster_name].region,
"vpcId": "vpc-xxxxxxxxx" # in practice, look up the real VPC ID
}
def _get_external_dns_config(self, cluster_name: str) -> Dict[str, Any]:
"""ExternalDNS configuration."""
return {
"provider": "aws",
"aws": {
"region": self.cluster_specs[cluster_name].region,
"zoneType": "public"
},
"domainFilters": ["example.com"],
"serviceAccount": {
"create": True,
"annotations": {
"eks.amazonaws.com/role-arn": f"arn:aws:iam::ACCOUNT_ID:role/{cluster_name}-external-dns-role"
}
}
}
def _get_cert_manager_config(self, cluster_name: str) -> Dict[str, Any]:
"""cert-manager configuration."""
return {
"installCRDs": True,
"serviceAccount": {
"create": True,
"annotations": {
"eks.amazonaws.com/role-arn": f"arn:aws:iam::ACCOUNT_ID:role/{cluster_name}-cert-manager-role"
}
},
"clusterIssuer": {
"letsencrypt-prod": {
"server": "https://acme-v02.api.letsencrypt.org/directory",
"email": "admin@example.com"
}
}
}
def _get_prometheus_config(self, cluster_name: str) -> Dict[str, Any]:
"""Prometheus configuration."""
return {
"prometheus": {
"prometheusSpec": {
"storageSpec": {
"volumeClaimTemplate": {
"spec": {
"storageClassName": "gp3",
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": "50Gi"}}
}
}
},
"retention": "30d"
}
},
"grafana": {
"enabled": True,
"persistence": {
"enabled": True,
"storageClassName": "gp3",
"size": "10Gi"
}
},
"alertmanager": {
"enabled": True,
"config": {
"global": {"slack_api_url": "https://hooks.slack.com/services/..."},
"route": {"group_by": ["alertname"], "receiver": "web.hook"},
"receivers": [{"name": "web.hook", "slack_configs": [{"channel": "#alerts"}]}]
}
}
}
def _get_istio_base_config(self, cluster_name: str) -> Dict[str, Any]:
"""Istio base configuration."""
return {
"defaultRevision": "default",
"base": {
"enableCRDTemplates": False,
"validationURL": ""
}
}
def _get_istio_istiod_config(self, cluster_name: str) -> Dict[str, Any]:
"""Istio istiod configuration."""
return {
"revision": "default",
"pilot": {
"env": {
"EXTERNAL_ISTIOD": False
}
},
"global": {
"meshID": "mesh1",
"network": cluster_name,
"cluster": cluster_name
}
}
async def upgrade_cluster(self, cluster_name: str, new_version: str) -> bool:
"""Upgrade a cluster."""
print(f"Upgrading cluster {cluster_name} to version {new_version}")
try:
# 1. Upgrade the control plane
await self._upgrade_control_plane(cluster_name, new_version)
# 2. Upgrade the node pools
await self._upgrade_node_pools(cluster_name, new_version)
# 3. Upgrade the add-ons
await self._upgrade_addons(cluster_name)
print(f"Cluster {cluster_name} upgrade completed")
return True
except Exception as e:
print(f"Cluster upgrade failed: {str(e)}")
return False
async def _upgrade_control_plane(self, cluster_name: str, new_version: str):
"""Upgrade the control plane."""
# Update the KubeadmControlPlane resource
try:
control_plane = self.custom_api.get_namespaced_custom_object(
group="controlplane.cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="kubeadmcontrolplanes",
name=f"{cluster_name}-control-plane"
)
# Bump the version
control_plane["spec"]["version"] = new_version
# Apply the patch
self.custom_api.patch_namespaced_custom_object(
group="controlplane.cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="kubeadmcontrolplanes",
name=f"{cluster_name}-control-plane",
body=control_plane
)
# Wait for the upgrade to complete
await self._wait_for_control_plane_upgrade(cluster_name, new_version)
except ApiException as e:
raise Exception(f"Failed to upgrade control plane: {str(e)}")
async def _upgrade_node_pools(self, cluster_name: str, new_version: str):
"""Upgrade the node pools."""
try:
# List all MachineDeployments
machine_deployments = self.custom_api.list_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="machinedeployments",
label_selector=f"cluster.x-k8s.io/cluster-name={cluster_name}"
)
for md in machine_deployments["items"]:
# Bump the version
md["spec"]["template"]["spec"]["version"] = new_version
# Apply as a rolling update
self.custom_api.patch_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="machinedeployments",
name=md["metadata"]["name"],
body=md
)
# Wait for the upgrade to complete
await self._wait_for_node_pool_upgrade(md["metadata"]["name"], new_version)
except ApiException as e:
raise Exception(f"Failed to upgrade node pools: {str(e)}")
async def _wait_for_control_plane_upgrade(self, cluster_name: str, new_version: str):
"""Wait for the control plane upgrade."""
max_wait_time = 1800 # wait up to 30 minutes
start_time = time.time()
while time.time() - start_time < max_wait_time:
try:
control_plane = self.custom_api.get_namespaced_custom_object(
group="controlplane.cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="kubeadmcontrolplanes",
name=f"{cluster_name}-control-plane"
)
status = control_plane.get("status", {})
if (status.get("version") == new_version and
status.get("ready") and
status.get("updatedReplicas") == status.get("replicas")):
print(f"Control plane upgrade to {new_version} completed")
return
await asyncio.sleep(30)
except ApiException as e:
if e.status != 404:
raise
raise Exception("Control plane upgrade timed out")
async def _wait_for_node_pool_upgrade(self, deployment_name: str, new_version: str):
"""Wait for a node pool upgrade."""
max_wait_time = 1800 # wait up to 30 minutes
start_time = time.time()
while time.time() - start_time < max_wait_time:
try:
md = self.custom_api.get_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="machinedeployments",
name=deployment_name
)
status = md.get("status", {})
if (status.get("readyReplicas") == status.get("replicas") and
status.get("updatedReplicas") == status.get("replicas")):
print(f"Node pool {deployment_name} upgrade to {new_version} completed")
return
await asyncio.sleep(30)
except ApiException as e:
if e.status != 404:
raise
raise Exception(f"Node pool {deployment_name} upgrade timed out")
async def _upgrade_addons(self, cluster_name: str):
"""Upgrade add-ons."""
# Add-on-specific upgrade logic
# (in practice, Helm chart upgrades)
print(f"Upgrading addons for cluster {cluster_name}")
async def delete_cluster(self, cluster_name: str) -> bool:
"""Delete a cluster."""
print(f"Deleting cluster: {cluster_name}")
try:
# 1. Clean up add-ons
await self._cleanup_addons(cluster_name)
# 2. Clean up worker nodes
await self._cleanup_worker_nodes(cluster_name)
# 3. Delete cluster resources
await self._delete_cluster_resources(cluster_name)
# 4. Clean up local state
self.cluster_specs.pop(cluster_name, None)
self.cluster_statuses.pop(cluster_name, None)
print(f"Cluster {cluster_name} deletion completed")
return True
except Exception as e:
print(f"Cluster deletion failed: {str(e)}")
return False
async def _cleanup_addons(self, cluster_name: str):
"""Clean up add-ons."""
# Add-on-specific cleanup logic
print(f"Cleaning up addons for cluster {cluster_name}")
async def _cleanup_worker_nodes(self, cluster_name: str):
"""Clean up worker nodes."""
# Delete the MachineDeployments
print(f"Cleaning up worker nodes for cluster {cluster_name}")
async def _delete_cluster_resources(self, cluster_name: str):
"""Delete cluster resources."""
# Delete the Cluster resource
try:
self.custom_api.delete_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="clusters",
name=cluster_name
)
print(f"Deleted cluster resource: {cluster_name}")
except ApiException as e:
if e.status != 404:
raise
async def list_clusters(self) -> List[ClusterStatus]:
"""List clusters."""
statuses = []
try:
clusters = self.custom_api.list_namespaced_custom_object(
group="cluster.x-k8s.io",
version="v1beta1",
namespace="default",
plural="clusters"
)
for cluster in clusters["items"]:
cluster_name = cluster["metadata"]["name"]
# Fetch cluster status
try:
status = await self._get_cluster_status(cluster_name)
statuses.append(status)
except Exception as e:
print(f"Failed to get status for cluster {cluster_name}: {str(e)}")
except ApiException as e:
print(f"Failed to list clusters: {str(e)}")
return statuses
async def get_cluster_info(self, cluster_name: str) -> Dict[str, Any]:
"""Fetch detailed cluster information."""
try:
status = await self._get_cluster_status(cluster_name)
spec = self.cluster_specs.get(cluster_name)
# Cost estimation
cost_estimate = await self._calculate_cluster_cost(cluster_name)
# Health score calculation
health_score = await self._calculate_health_score(cluster_name)
return {
"name": cluster_name,
"status": status.__dict__,
"spec": spec.__dict__ if spec else None,
"cost_estimate": cost_estimate,
"health_score": health_score,
"recommendations": await self._get_optimization_recommendations(cluster_name)
}
except Exception as e:
raise Exception(f"Failed to get cluster info: {str(e)}")
async def _calculate_cluster_cost(self, cluster_name: str) -> Dict[str, float]:
"""Estimate cluster cost."""
# In practice, query the AWS Cost Explorer API;
# a simplified estimate is returned here
return {
"monthly_compute": 5000.0,
"monthly_storage": 500.0,
"monthly_networking": 300.0,
"monthly_total": 5800.0
}
async def _calculate_health_score(self, cluster_name: str) -> float:
"""Calculate the cluster health score."""
# Composite health score across several metrics;
# in practice, query Prometheus
return 85.5
async def _get_optimization_recommendations(self, cluster_name: str) -> List[str]:
"""Optimization recommendations."""
return [
"Consider using spot instances for non-critical workloads",
"Enable cluster autoscaler for cost optimization",
"Review resource requests and limits for overprovisioning"
]
# Usage example
async def demonstrate_cluster_management():
"""Cluster management demo."""
manager = ClusterLifecycleManager("/path/to/management/kubeconfig")
# Define the production cluster spec
prod_cluster_spec = ClusterSpec(
name="prod-us-east",
tier=ClusterTier.PRODUCTION,
region="us-east-1",
kubernetes_version="v1.28.5",
control_plane_config={
"replicas": 3,
"instance_type": "c5.xlarge",
"root_volume_size": 120
},
node_pools=[
{
"name": "system",
"machine_type": "c5.large",
"min_nodes": 3,
"max_nodes": 5,
"root_volume_size": 80
},
{
"name": "application",
"machine_type": "c5.2xlarge",
"min_nodes": 5,
"max_nodes": 50,
"root_volume_size": 100
},
{
"name": "ml-workload",
"machine_type": "p3.2xlarge",
"min_nodes": 0,
"max_nodes": 20,
"spot_instances": True,
"spot_max_price": "1.50"
}
],
networking_config={
"vpc_cidr": "10.0.0.0/16",
"pod_cidr": "192.168.0.0/16",
"service_cidr": "10.128.0.0/12"
},
addons=[
"cluster-autoscaler",
"aws-load-balancer-controller",
"external-dns",
"cert-manager",
"prometheus-operator",
"istio-base",
"istio-istiod"
],
backup_config={
"enabled": True,
"schedule": "daily",
"retention": "30d"
},
monitoring_config={
"prometheus": True,
"grafana": True,
"jaeger": True
},
security_config={
"pod_security_policy": True,
"network_policy": True,
"encryption_at_rest": True,
"audit_logging": True
}
)
# Create the cluster
cluster_name = await manager.create_cluster(prod_cluster_spec)
print(f"Created cluster: {cluster_name}")
# Fetch cluster details
cluster_info = await manager.get_cluster_info(cluster_name)
print(f"Cluster info: {json.dumps(cluster_info, indent=2, default=str)}")
# Upgrade the cluster
await manager.upgrade_cluster(cluster_name, "v1.28.6")
# List the clusters
clusters = await manager.list_clusters()
print(f"Active clusters: {len(clusters)}")
# Run with:
# asyncio.run(demonstrate_cluster_management())
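Before handing a spec like the one above to the provisioner, it is worth validating that the VPC, pod, and service CIDRs do not overlap; a small sketch using the standard `ipaddress` module (the helper name is ours):

```python
import ipaddress

def find_cidr_overlaps(cidrs: dict) -> list:
    """Return pairs of named CIDR blocks that overlap each other."""
    nets = {name: ipaddress.ip_network(cidr) for name, cidr in cidrs.items()}
    names = sorted(nets)
    overlaps = []
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            if nets[a].overlaps(nets[b]):
                overlaps.append((a, b))
    return overlaps

networking_config = {
    "vpc_cidr": "10.0.0.0/16",
    "pod_cidr": "192.168.0.0/16",
    "service_cidr": "10.128.0.0/12",
}
print(find_cidr_overlaps(networking_config))  # [] -- the ranges above are disjoint
```

A check like this fits naturally at the top of `create_cluster`, failing fast before any cloud resources are created.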
4. Enterprise Security Hardening
4.1 Pod Security Standards and Policy-Based Governance
# enterprise-security-policies.yaml
apiVersion: v1
kind: Namespace
metadata:
name: critical-workloads
labels:
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
compliance.enterprise.io/level: critical
data-classification: confidential
---
apiVersion: v1
kind: Namespace
metadata:
name: production-workloads
labels:
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
compliance.enterprise.io/level: high
---
apiVersion: v1
kind: Namespace
metadata:
name: development-workloads
labels:
pod-security.kubernetes.io/enforce: baseline
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
compliance.enterprise.io/level: medium
---
# OPA Gatekeeper policies
apiVersion: templates.gatekeeper.sh/v1beta1
kind: ConstraintTemplate
metadata:
name: requiredsecuritycontext
spec:
crd:
spec:
names:
kind: RequiredSecurityContext
validation:
openAPIV3Schema:
type: object
properties:
runAsNonRoot:
type: boolean
runAsUser:
type: integer
fsGroup:
type: integer
supplementalGroups:
type: array
items:
type: integer
targets:
- target: admission.k8s.gatekeeper.sh
rego: |
package requiredsecuritycontext
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
not container.securityContext.runAsNonRoot
msg := sprintf("Container <%v> must run as non-root user", [container.name])
}
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
container.securityContext.runAsUser < 1000
msg := sprintf("Container <%v> must run as user ID >= 1000", [container.name])
}
violation[{"msg": msg}] {
not input.review.object.spec.securityContext.fsGroup
msg := "Pod must specify fsGroup"
}
violation[{"msg": msg}] {
input.review.object.spec.securityContext.fsGroup < 1000
msg := "Pod fsGroup must be >= 1000"
}
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
container.securityContext.privileged
msg := sprintf("Container <%v> cannot run in privileged mode", [container.name])
}
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
capability := container.securityContext.capabilities.add[_]
# classic array-membership check ("in" would require future.keywords)
capability == ["SYS_ADMIN", "NET_ADMIN", "SYS_TIME"][_]
msg := sprintf("Container <%v> cannot add dangerous capability <%v>", [container.name, capability])
}
---
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: RequiredSecurityContext
metadata:
name: must-run-as-nonroot
spec:
match:
excludedNamespaces: ["kube-system", "kube-public", "gatekeeper-system"]
kinds:
- apiGroups: [""]
kinds: ["Pod"]
namespaces: ["production-workloads", "critical-workloads"]
parameters:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
---
# Image registry policy
apiVersion: templates.gatekeeper.sh/v1beta1
kind: ConstraintTemplate
metadata:
name: allowedimageregistries
spec:
crd:
spec:
names:
kind: AllowedImageRegistries
validation:
openAPIV3Schema:
type: object
properties:
registries:
type: array
items:
type: string
exemptions:
type: array
items:
type: object
properties:
namespace:
type: string
image:
type: string
targets:
- target: admission.k8s.gatekeeper.sh
rego: |
package allowedimageregistries
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
not is_allowed_registry(container.image)
not is_exempted(container.image)
msg := sprintf("Container image <%v> not from allowed registry", [container.image])
}
is_allowed_registry(image) {
registry := input.parameters.registries[_]
startswith(image, registry)
}
is_exempted(image) {
exemption := input.parameters.exemptions[_]
exemption.namespace == input.review.object.metadata.namespace
exemption.image == image
}
---
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: AllowedImageRegistries
metadata:
name: enterprise-image-policy
spec:
match:
excludedNamespaces: ["kube-system"]
kinds:
- apiGroups: [""]
kinds: ["Pod"]
- apiGroups: ["apps"]
kinds: ["Deployment", "ReplicaSet", "StatefulSet", "DaemonSet"]
parameters:
registries:
- "company-registry.example.com/"
- "gcr.io/distroless/"
- "registry.k8s.io/"
exemptions:
- namespace: "monitoring"
image: "prom/prometheus:latest"
- namespace: "logging"
image: "elastic/elasticsearch:8.5.0"
---
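Because the Rego policy above matches registries by plain string prefix, the same check is easy to replicate in Python when auditing image lists offline; a sketch mirroring `is_allowed_registry` and the exemption lookup (the function name and constants are ours):

```python
# Mirrors the parameters of the AllowedImageRegistries constraint above.
ALLOWED_REGISTRIES = [
    "company-registry.example.com/",
    "gcr.io/distroless/",
    "registry.k8s.io/",
]
EXEMPTIONS = [{"namespace": "monitoring", "image": "prom/prometheus:latest"}]

def image_allowed(namespace: str, image: str) -> bool:
    """Mirror of the Gatekeeper logic: registry prefix match or exact exemption."""
    if any(image.startswith(reg) for reg in ALLOWED_REGISTRIES):
        return True
    return any(e["namespace"] == namespace and e["image"] == image
               for e in EXEMPTIONS)

print(image_allowed("monitoring", "prom/prometheus:latest"))    # True (exempted)
print(image_allowed("default", "docker.io/library/nginx:1.25"))  # False (blocked)
```

Note that prefix matching is deliberate: it admits every image under `gcr.io/distroless/` without enumerating tags.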
# Network policies
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: production-workloads
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: web-to-api-communication
namespace: production-workloads
spec:
podSelector:
matchLabels:
app: web-frontend
policyTypes:
- Egress
egress:
- to:
- podSelector:
matchLabels:
app: api-backend
ports:
- protocol: TCP
port: 8080
- to:
- namespaceSelector:
matchLabels:
name: shared-services
podSelector:
matchLabels:
app: database
ports:
- protocol: TCP
port: 5432
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: api-backend-policy
namespace: production-workloads
spec:
podSelector:
matchLabels:
app: api-backend
policyTypes:
- Ingress
- Egress
ingress:
- from:
- podSelector:
matchLabels:
app: web-frontend
- podSelector:
matchLabels:
app: mobile-api-gateway
ports:
- protocol: TCP
port: 8080
egress:
- to:
- namespaceSelector:
matchLabels:
name: shared-services
ports:
- protocol: TCP
port: 5432
- protocol: TCP
port: 6379
- to: [] # DNS resolution
ports:
- protocol: UDP
port: 53
---
# RBAC policies
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: enterprise-platform-admin
rules:
- apiGroups: ["*"]
resources: ["*"]
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: enterprise-namespace-admin
rules:
- apiGroups: [""]
resources: ["pods", "services", "configmaps", "secrets", "persistentvolumeclaims"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets", "statefulsets", "daemonsets"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
- apiGroups: ["networking.k8s.io"]
resources: ["networkpolicies", "ingresses"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
- apiGroups: ["autoscaling"]
resources: ["horizontalpodautoscalers"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: enterprise-developer
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/status"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["services", "configmaps"]
verbs: ["get", "list", "create", "update", "patch"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list", "create", "update", "patch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list"] # secrets are read-only for developers
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: enterprise-readonly
rules:
- apiGroups: ["*"]
resources: ["*"]
verbs: ["get", "list", "watch"]
---
# Service accounts and role bindings
apiVersion: v1
kind: ServiceAccount
metadata:
name: platform-admin
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: platform-admin-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: enterprise-platform-admin
subjects:
- kind: ServiceAccount
name: platform-admin
namespace: kube-system
- kind: User
name: admin@company.com
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: namespace-admin-production
namespace: production-workloads
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: enterprise-namespace-admin
subjects:
- kind: User
name: team-lead-prod@company.com
apiGroup: rbac.authorization.k8s.io
- kind: Group
name: production-team
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: developers-production
namespace: production-workloads
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: enterprise-developer
subjects:
- kind: Group
name: developers
apiGroup: rbac.authorization.k8s.io
---
# Secrets encryption configuration
# Note: the kube-apiserver reads this file from disk via
# --encryption-provider-config; it is stored as a Secret here only for
# distribution to managed control planes.
apiVersion: v1
kind: Secret
metadata:
name: encryption-config
namespace: kube-system
type: Opaque
stringData: # plain YAML, so stringData (not base64-encoded data) is required
encryption-config.yaml: |
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
- resources:
- secrets
- configmaps
- persistentvolumes
providers:
- aescbc:
keys:
- name: key1
secret: <base64-encoded-32-byte-key>
- identity: {}
---
# Audit policy
apiVersion: v1
kind: ConfigMap
metadata:
name: audit-policy
namespace: kube-system
data:
audit-policy.yaml: |
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
# Log every request that touches secrets
- level: RequestResponse
resources:
- group: ""
resources: ["secrets"]
# Log RBAC changes
- level: RequestResponse
resources:
- group: "rbac.authorization.k8s.io"
resources: ["*"]
# Log network policy changes
- level: RequestResponse
resources:
- group: "networking.k8s.io"
resources: ["networkpolicies"]
# Log exec and attach requests
- level: Request
resources:
- group: ""
resources: ["pods/exec", "pods/attach", "pods/portforward"]
# Do not log system components
- level: None
users:
- "system:kube-proxy"
- "system:kube-controller-manager"
- "system:kube-scheduler"
- "system:node-controller"
- "system:volume-scheduler"
# By default, log metadata only
- level: Metadata
---
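The kube-apiserver evaluates audit policy rules top to bottom and applies the first match, which is why the `Metadata` catch-all must come last. A simplified Python model of that first-match behavior (the matching logic is our simplification, not the apiserver's full semantics):

```python
def audit_level(rules, user, group, resource):
    """Return the audit level of the first matching rule (first-match-wins)."""
    for rule in rules:
        if "users" in rule and user not in rule["users"]:
            continue
        if "resources" in rule:
            if not any(s["group"] == group and
                       (resource in s["resources"] or "*" in s["resources"])
                       for s in rule["resources"]):
                continue
        return rule["level"]
    return "Metadata"  # implicit default in our model

rules = [
    {"level": "RequestResponse", "resources": [{"group": "", "resources": ["secrets"]}]},
    {"level": "None", "users": ["system:kube-proxy"]},
    {"level": "Metadata"},
]
print(audit_level(rules, "alice", "", "secrets"))            # RequestResponse
print(audit_level(rules, "system:kube-proxy", "", "pods"))   # None
```

Reordering the rules so the catch-all comes first would silently downgrade the secrets logging, so treat rule order as part of the policy's correctness.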
# Falco runtime security rules
apiVersion: v1
kind: ConfigMap
metadata:
name: falco-rules
namespace: falco-system
data:
k8s_audit_rules.yaml: |
- rule: Create Privileged Pod
desc: Detect creation of privileged pods
condition: >
kevt and pod and kcreate and
ka.req.pod.containers.privileged intersects (true)
output: >
Privileged pod created (user=%ka.user.name verb=%ka.verb
pod=%ka.req.pod.name namespace=%ka.req.pod.namespace
image=%ka.req.pod.spec.containers[*].image)
priority: WARNING
tags: [k8s, privileged]
- rule: Create Sensitive Mount Pod
desc: Detect creation of pods with sensitive mounts
condition: >
kevt and pod and kcreate and
ka.req.pod.volumes.hostpath intersects (/proc, /var/run/docker.sock, /var/lib/kubelet, /etc)
output: >
Pod created with sensitive host mount (user=%ka.user.name verb=%ka.verb
pod=%ka.req.pod.name namespace=%ka.req.pod.namespace
mount=%ka.req.pod.spec.volumes[*].hostPath.path)
priority: WARNING
tags: [k8s, mount]
- rule: Create NodePort Service
desc: Detect creation of NodePort services
condition: >
kevt and service and kcreate and
ka.req.service.type=NodePort
output: >
NodePort service created (user=%ka.user.name verb=%ka.verb
service=%ka.req.service.name namespace=%ka.req.service.namespace
port=%ka.req.service.spec.ports[*].nodePort)
priority: INFO
tags: [k8s, network]
falco_rules.yaml: |
- rule: Write below binary dir
desc: Detect writing to binary directories
condition: >
bin_dir and evt.dir = < and open_write and
not package_mgmt_procs and
not exe_running_docker_save and
not python_running_get_pip
output: >
File below a known binary directory opened for writing
(user=%user.name command=%proc.cmdline file=%fd.name
parent=%proc.pname pcmdline=%proc.pcmdline)
priority: ERROR
tags: [filesystem, mitre_persistence]
- rule: Netcat Remote Code Execution in Container
desc: Detect netcat spawning a shell
condition: >
spawned_process and container and
((proc.name = "nc" and (proc.args contains "-e" or proc.args contains "-c")) or
(proc.name = "ncat" and (proc.args contains "-e" or proc.args contains "-c" or proc.args contains "--sh-exec")))
output: >
Netcat runs inside container that spawns shell
(user=%user.name container_id=%container.id image=%container.image.repository
proc=%proc.name parent=%proc.pname cmdline=%proc.cmdline)
priority: WARNING
tags: [network, shell, mitre_execution]
- rule: Launch Suspicious Network Tool in Container
desc: Detect network tools launched inside container
condition: >
spawned_process and container and
network_tool_procs
output: >
Network tool launched in container
(user=%user.name container_id=%container.id image=%container.image.repository
proc=%proc.name parent=%proc.pname cmdline=%proc.cmdline)
priority: NOTICE
tags: [network, mitre_discovery, mitre_exfiltration]
---
# Scanner integration (Trivy Operator)
# Note: illustrative only - VulnerabilityReports are normally generated by the
# operator itself; scan behavior is configured via the trivy-operator ConfigMap.
apiVersion: aquasecurity.github.io/v1alpha1
kind: VulnerabilityReport
metadata:
name: scan-policy
namespace: trivy-system
spec:
scanner:
name: Trivy
vendor: Aqua Security
version: "0.35.0"
registry:
server: "company-registry.example.com"
updateStrategy: "automatic"
scanJobTemplate:
spec:
template:
spec:
restartPolicy: Never
containers:
- name: trivy
image: aquasec/trivy:0.35.0 # pin the scanner version instead of latest
command:
- trivy
args:
- image
- --format=json
- --exit-code=0
- --no-progress
- --severity=UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL
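Once a scan job like the one above emits JSON, downstream tooling typically tallies findings per severity. A small sketch over the `Results[].Vulnerabilities[].Severity` shape of `trivy image --format=json` output (the sample report below is fabricated):

```python
import json
from collections import Counter

def severity_counts(report_json: str) -> Counter:
    """Count vulnerabilities per severity across all scan results."""
    report = json.loads(report_json)
    counts = Counter()
    for result in report.get("Results", []):
        # "Vulnerabilities" can be null in Trivy output, hence the `or []`
        for vuln in result.get("Vulnerabilities") or []:
            counts[vuln["Severity"]] += 1
    return counts

sample = json.dumps({"Results": [
    {"Vulnerabilities": [{"Severity": "HIGH"}, {"Severity": "CRITICAL"}]},
    {"Vulnerabilities": [{"Severity": "HIGH"}]},
]})
print(severity_counts(sample))  # Counter({'HIGH': 2, 'CRITICAL': 1})
```

A gate such as "fail the pipeline when any CRITICAL count is non-zero" is then a one-line check on the returned counter.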
4.2 Implementing Zero Trust Networking
# zero_trust_network.py
import asyncio
import json
import yaml
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import ipaddress
from enum import Enum
class TrustLevel(Enum):
UNTRUSTED = "untrusted"
LIMITED = "limited"
TRUSTED = "trusted"
PRIVILEGED = "privileged"
class NetworkZone(Enum):
DMZ = "dmz"
APPLICATION = "application"
DATABASE = "database"
MANAGEMENT = "management"
EXTERNAL = "external"
@dataclass
class ServiceIdentity:
"""Service identity"""
namespace: str
service_account: str
labels: Dict[str, str]
trust_level: TrustLevel
zone: NetworkZone
allowed_communications: Set[str]
@dataclass
class NetworkSegment:
"""Network segment"""
name: str
zone: NetworkZone
cidr_blocks: List[str]
trust_level: TrustLevel
ingress_rules: List[Dict[str, Any]]
egress_rules: List[Dict[str, Any]]
class ZeroTrustNetworkController:
"""Zero Trust network controller"""
def __init__(self):
config.load_incluster_config()
self.k8s_client = client.ApiClient()
self.networking_api = client.NetworkingV1Api()
self.core_api = client.CoreV1Api()
# Service mesh (Istio) API
self.custom_api = client.CustomObjectsApi()
# Network segment definitions
self.network_segments = self._initialize_network_segments()
# Service identity registry
self.service_identities = {}
# Communication matrix
self.communication_matrix = self._initialize_communication_matrix()
def _initialize_network_segments(self) -> Dict[str, NetworkSegment]:
"""Initialize the network segments"""
segments = {}
# DMZ zone - entry point for external traffic
segments["dmz"] = NetworkSegment(
name="dmz",
zone=NetworkZone.DMZ,
cidr_blocks=["10.0.0.0/24"],
trust_level=TrustLevel.UNTRUSTED,
ingress_rules=[
{
"from_external": True,
"ports": [80, 443],
"protocols": ["TCP"]
}
],
egress_rules=[
{
"to_zone": NetworkZone.APPLICATION,
"ports": [8080, 8443],
"protocols": ["TCP"]
}
]
)
# Application zone
segments["application"] = NetworkSegment(
name="application",
zone=NetworkZone.APPLICATION,
cidr_blocks=["10.0.1.0/24", "10.0.2.0/24"],
trust_level=TrustLevel.LIMITED,
ingress_rules=[
{
"from_zone": NetworkZone.DMZ,
"ports": [8080, 8443],
"protocols": ["TCP"]
},
{
"from_zone": NetworkZone.APPLICATION,
"ports": [8080, 8443, 9090],
"protocols": ["TCP"]
}
],
egress_rules=[
{
"to_zone": NetworkZone.DATABASE,
"ports": [3306, 5432, 6379],
"protocols": ["TCP"]
},
{
"to_zone": NetworkZone.APPLICATION,
"ports": [8080, 8443, 9090],
"protocols": ["TCP"]
}
]
)
# Database zone
segments["database"] = NetworkSegment(
name="database",
zone=NetworkZone.DATABASE,
cidr_blocks=["10.0.10.0/24"],
trust_level=TrustLevel.TRUSTED,
ingress_rules=[
{
"from_zone": NetworkZone.APPLICATION,
"ports": [3306, 5432, 6379],
"protocols": ["TCP"]
}
],
egress_rules=[
{
"to_external": True,
"ports": [53],
"protocols": ["UDP"] # DNS only
}
]
)
# Management zone
segments["management"] = NetworkSegment(
name="management",
zone=NetworkZone.MANAGEMENT,
cidr_blocks=["10.0.100.0/24"],
trust_level=TrustLevel.PRIVILEGED,
ingress_rules=[
{
"from_specific_ips": ["10.0.200.0/24"], # administrator network
"ports": [22, 443, 6443],
"protocols": ["TCP"]
}
],
egress_rules=[
{
"to_any": True,
"ports": [80, 443, 22, 6443],
"protocols": ["TCP"]
}
]
)
return segments
def _initialize_communication_matrix(self) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
"""Initialize the communication matrix"""
return {
# The web frontend may call the API backend
"web-frontend": {
"api-backend": [
{"port": 8080, "protocol": "TCP", "purpose": "API calls"}
],
"auth-service": [
{"port": 8443, "protocol": "TCP", "purpose": "Authentication"}
]
},
# The API backend may access the database and cache
"api-backend": {
"database": [
{"port": 5432, "protocol": "TCP", "purpose": "Database queries"}
],
"cache": [
{"port": 6379, "protocol": "TCP", "purpose": "Cache operations"}
],
"message-queue": [
{"port": 5672, "protocol": "TCP", "purpose": "Async messaging"}
]
},
# The auth service reaches the user database
"auth-service": {
"user-database": [
{"port": 5432, "protocol": "TCP", "purpose": "User authentication"}
],
"ldap-server": [
{"port": 389, "protocol": "TCP", "purpose": "Directory lookup"},
{"port": 636, "protocol": "TCP", "purpose": "Secure LDAP"}
]
},
# The monitoring system scrapes metrics from all services
"prometheus": {
"web-frontend": [
{"port": 9090, "protocol": "TCP", "purpose": "Metrics collection"}
],
"api-backend": [
{"port": 9090, "protocol": "TCP", "purpose": "Metrics collection"}
],
"database": [
{"port": 9187, "protocol": "TCP", "purpose": "Database metrics"}
]
}
}
async def register_service_identity(self, service_identity: ServiceIdentity) -> str:
"""Register a service identity"""
identity_key = f"{service_identity.namespace}/{service_identity.service_account}"
# Store the identity
self.service_identities[identity_key] = service_identity
# Issue an mTLS certificate
await self._create_service_certificate(service_identity)
# Create network policies
await self._create_network_policies(service_identity)
# Create Istio security policies
await self._create_istio_policies(service_identity)
print(f"Registered service identity: {identity_key}")
return identity_key
async def _create_service_certificate(self, identity: ServiceIdentity):
"""Issue an mTLS certificate for the service"""
# Request a certificate via cert-manager
certificate_spec = {
"apiVersion": "cert-manager.io/v1",
"kind": "Certificate",
"metadata": {
"name": f"{identity.service_account}-tls",
"namespace": identity.namespace
},
"spec": {
"secretName": f"{identity.service_account}-tls-secret",
"duration": "8760h", # 1 year
"renewBefore": "720h", # renew 30 days before expiry
"subject": {
"organizations": ["company.com"],
"organizationalUnits": [identity.zone.value]
},
"commonName": f"{identity.service_account}.{identity.namespace}.svc.cluster.local",
"dnsNames": [
f"{identity.service_account}",
f"{identity.service_account}.{identity.namespace}",
f"{identity.service_account}.{identity.namespace}.svc",
f"{identity.service_account}.{identity.namespace}.svc.cluster.local"
],
"issuerRef": {
"name": "internal-ca-issuer",
"kind": "ClusterIssuer",
"group": "cert-manager.io"
}
}
}
try:
self.custom_api.create_namespaced_custom_object( # CustomObjectsApi is synchronous, so no await
group="cert-manager.io",
version="v1",
namespace=identity.namespace,
plural="certificates",
body=certificate_spec
)
print(f"Certificate created for {identity.service_account}")
except ApiException as e:
if e.status != 409: # ignore if it already exists
raise
async def _create_network_policies(self, identity: ServiceIdentity):
"""Create network policies"""
# Default-deny policy
default_deny_policy = {
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": f"{identity.service_account}-default-deny",
"namespace": identity.namespace
},
"spec": {
"podSelector": {
"matchLabels": {
"app": identity.service_account
}
},
"policyTypes": ["Ingress", "Egress"]
}
}
# Policy permitting only approved communications
allowed_ingress = []
allowed_egress = []
# Look up allowed connections in the communication matrix
service_comms = self.communication_matrix.get(identity.service_account, {})
for target_service, connections in service_comms.items():
for connection in connections:
egress_rule = {
"to": [
{
"podSelector": {
"matchLabels": {
"app": target_service
}
}
}
],
"ports": [
{
"protocol": connection["protocol"],
"port": connection["port"]
}
]
}
allowed_egress.append(egress_rule)
# Reverse direction (traffic entering this service)
for source_service, targets in self.communication_matrix.items():
if identity.service_account in targets:
connections = targets[identity.service_account]
for connection in connections:
ingress_rule = {
"from": [
{
"podSelector": {
"matchLabels": {
"app": source_service
}
}
}
],
"ports": [
{
"protocol": connection["protocol"],
"port": connection["port"]
}
]
}
allowed_ingress.append(ingress_rule)
# Allow DNS (required by every service)
dns_egress = {
"to": [],
"ports": [
{"protocol": "UDP", "port": 53},
{"protocol": "TCP", "port": 53}
]
}
allowed_egress.append(dns_egress)
# Create the allow policy
if allowed_ingress or allowed_egress:
allow_policy = {
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": f"{identity.service_account}-allow",
"namespace": identity.namespace
},
"spec": {
"podSelector": {
"matchLabels": {
"app": identity.service_account
}
},
"policyTypes": ["Ingress", "Egress"],
"ingress": allowed_ingress,
"egress": allowed_egress
}
}
try:
self.networking_api.create_namespaced_network_policy( # synchronous client call, no await
namespace=identity.namespace,
body=allow_policy
)
print(f"Network policy created for {identity.service_account}")
except ApiException as e:
if e.status != 409:
raise
async def _create_istio_policies(self, identity: ServiceIdentity):
"""Create Istio security policies"""
# PeerAuthentication - enforce mTLS
peer_auth_policy = {
"apiVersion": "security.istio.io/v1beta1",
"kind": "PeerAuthentication",
"metadata": {
"name": f"{identity.service_account}-mtls",
"namespace": identity.namespace
},
"spec": {
"selector": {
"matchLabels": {
"app": identity.service_account
}
},
"mtls": {
"mode": "STRICT"
}
}
}
# AuthorizationPolicy - access control
authz_rules = []
# Allow access only from approved sources
for source_service, targets in self.communication_matrix.items():
if identity.service_account in targets:
connections = targets[identity.service_account]
for connection in connections:
rule = {
"from": [
{
"source": {
"principals": [f"cluster.local/ns/{identity.namespace}/sa/{source_service}"]
}
}
],
"to": [
{
"operation": {
"ports": [str(connection["port"])]
}
}
]
}
authz_rules.append(rule)
authorization_policy = {
"apiVersion": "security.istio.io/v1beta1",
"kind": "AuthorizationPolicy",
"metadata": {
"name": f"{identity.service_account}-authz",
"namespace": identity.namespace
},
"spec": {
"selector": {
"matchLabels": {
"app": identity.service_account
}
},
"rules": authz_rules # caution: an ALLOW policy with no rules denies everything, while [{}] matches (allows) ALL traffic
}
}
try:
# Create the PeerAuthentication (CustomObjectsApi is synchronous, so no await)
self.custom_api.create_namespaced_custom_object(
group="security.istio.io",
version="v1beta1",
namespace=identity.namespace,
plural="peerauthentications",
body=peer_auth_policy
)
# Create the AuthorizationPolicy
self.custom_api.create_namespaced_custom_object(
group="security.istio.io",
version="v1beta1",
namespace=identity.namespace,
plural="authorizationpolicies",
body=authorization_policy
)
print(f"Istio policies created for {identity.service_account}")
except ApiException as e:
if e.status != 409:
raise
async def verify_communication(self, source_identity: str, target_identity: str,
port: int, protocol: str = "TCP") -> Dict[str, Any]:
"""Verify whether a communication path is allowed"""
source = self.service_identities.get(source_identity)
target = self.service_identities.get(target_identity)
if not source or not target:
return {
"allowed": False,
"reason": "Source or target identity not found",
"policy_violations": []
}
violations = []
# 1. Check trust levels
if source.trust_level == TrustLevel.UNTRUSTED and target.trust_level in [TrustLevel.TRUSTED, TrustLevel.PRIVILEGED]:
violations.append("Untrusted source cannot access trusted target")
# 2. Check zone-to-zone communication rules
source_segment = self.network_segments.get(source.zone.value)
target_segment = self.network_segments.get(target.zone.value)
zone_allowed = False
if source_segment and target_segment:
for egress_rule in source_segment.egress_rules:
if ("to_zone" in egress_rule and egress_rule["to_zone"] == target.zone and
port in egress_rule.get("ports", []) and
protocol in egress_rule.get("protocols", [])):
zone_allowed = True
break
if not zone_allowed:
violations.append(f"Zone-level communication not allowed from {source.zone} to {target.zone}")
# 3. Check the service-level communication matrix
service_allowed = False
source_service = source.service_account
target_service = target.service_account
if source_service in self.communication_matrix:
target_comms = self.communication_matrix[source_service].get(target_service, [])
for comm in target_comms:
if comm["port"] == port and comm["protocol"] == protocol:
service_allowed = True
break
if not service_allowed:
violations.append(f"Service-level communication not allowed from {source_service} to {target_service}:{port}/{protocol}")
# 4. Time-based access control (illustrative placeholder)
# A real implementation could apply more elaborate time-based rules here
return {
"allowed": len(violations) == 0,
"reason": "Communication allowed" if len(violations) == 0 else "Policy violations detected",
"policy_violations": violations,
"source_trust_level": source.trust_level.value,
"target_trust_level": target.trust_level.value,
"source_zone": source.zone.value,
"target_zone": target.zone.value
}
async def monitor_network_traffic(self) -> Dict[str, Any]:
"""Monitor network traffic"""
# In practice this would pull data from Istio telemetry or a network
# monitoring tool; simulated data is returned here.
suspicious_activities = []
# 1. Detect unauthorized communication attempts
unauthorized_attempts = [
{
"source": "production-workloads/web-frontend",
"target": "database/user-database:5432/TCP",
"timestamp": "2026-01-24T10:30:00Z",
"blocked": True,
"reason": "Direct database access from frontend"
},
{
"source": "development-workloads/test-app",
"target": "production-workloads/api-backend:8080/TCP",
"timestamp": "2026-01-24T10:35:00Z",
"blocked": True,
"reason": "Cross-environment communication"
}
]
suspicious_activities.extend(unauthorized_attempts)
# 2. Detect anomalous traffic patterns
anomalous_patterns = [
{
"source": "production-workloads/api-backend",
"target": "external/suspicious-domain.com:443/TCP",
"timestamp": "2026-01-24T10:40:00Z",
"blocked": False,
"reason": "Communication with suspicious external domain",
"risk_level": "HIGH"
}
]
suspicious_activities.extend(anomalous_patterns)
# 3. Traffic statistics
traffic_stats = {
"total_connections": 15847,
"allowed_connections": 15835,
"blocked_connections": 12,
"mTLS_coverage": 0.987,
"policy_compliance": 0.995
}
return {
"monitoring_period": "last_hour",
"traffic_statistics": traffic_stats,
"suspicious_activities": suspicious_activities,
"security_score": self._calculate_security_score(traffic_stats, suspicious_activities),
"recommendations": self._get_security_recommendations(suspicious_activities)
}
def _calculate_security_score(self, traffic_stats: Dict[str, Any],
suspicious_activities: List[Dict[str, Any]]) -> float:
"""Calculate the security score (out of 100)"""
# Up to 30 points for mTLS coverage
mtls_score = traffic_stats["mTLS_coverage"] * 30
# Up to 30 points for policy compliance
compliance_score = traffic_stats["policy_compliance"] * 30
# Blocked-connection ratio (lower is better)
block_rate = traffic_stats["blocked_connections"] / traffic_stats["total_connections"]
block_score = max(0, 20 - (block_rate * 1000)) # a 1% block rate costs 10 points
# Deduction for suspicious activities (2 points each, capped at 20)
suspicion_penalty = min(20, len(suspicious_activities) * 2)
final_score = mtls_score + compliance_score + block_score + (20 - suspicion_penalty)
return round(final_score, 1)
def _get_security_recommendations(self, suspicious_activities: List[Dict[str, Any]]) -> List[str]:
"""Generate security recommendations"""
recommendations = []
if suspicious_activities:
recommendations.append("Review and investigate suspicious network activities")
# Multiple accesses to external domains
external_accesses = [act for act in suspicious_activities if "external/" in act.get("target", "")]
if len(external_accesses) > 2:
recommendations.append("Implement egress gateway for external traffic control")
# Many blocked internal communications
internal_blocks = [act for act in suspicious_activities if act.get("blocked") and "external/" not in act.get("target", "")]
if len(internal_blocks) > 5:
recommendations.append("Review internal communication policies for over-restriction")
return recommendations
async def update_communication_matrix(self, source_service: str, target_service: str,
connections: List[Dict[str, Any]]) -> bool:
"""Update the communication matrix"""
if source_service not in self.communication_matrix:
self.communication_matrix[source_service] = {}
self.communication_matrix[source_service][target_service] = connections
# Refresh policies for every affected service
affected_identities = []
for identity_key, identity in self.service_identities.items():
if identity.service_account in [source_service, target_service]:
affected_identities.append(identity)
for identity in affected_identities:
await self._create_network_policies(identity)
await self._create_istio_policies(identity)
print(f"Communication matrix updated: {source_service} -> {target_service}")
return True
async def get_security_posture(self) -> Dict[str, Any]:
"""Report the overall security posture"""
# Statistics over registered service identities
identity_stats = {
"total_identities": len(self.service_identities),
"trust_level_distribution": {},
"zone_distribution": {}
}
for identity in self.service_identities.values():
trust_level = identity.trust_level.value
zone = identity.zone.value
identity_stats["trust_level_distribution"][trust_level] = \
identity_stats["trust_level_distribution"].get(trust_level, 0) + 1
identity_stats["zone_distribution"][zone] = \
identity_stats["zone_distribution"].get(zone, 0) + 1
# Network policy statistics
policy_stats = await self._get_policy_statistics()
# Traffic monitoring results
traffic_monitoring = await self.monitor_network_traffic()
return {
"identity_statistics": identity_stats,
"policy_statistics": policy_stats,
"traffic_monitoring": traffic_monitoring,
"compliance_status": {
"zero_trust_score": traffic_monitoring["security_score"],
"mtls_coverage": traffic_monitoring["traffic_statistics"]["mTLS_coverage"],
"policy_compliance": traffic_monitoring["traffic_statistics"]["policy_compliance"]
},
"recommendations": traffic_monitoring["recommendations"]
}
async def _get_policy_statistics(self) -> Dict[str, Any]:
"""Collect policy statistics"""
# Kubernetes NetworkPolicy count
try:
network_policies = self.networking_api.list_network_policy_for_all_namespaces()
network_policy_count = len(network_policies.items)
except ApiException:
network_policy_count = 0
# Istio security policy counts
try:
peer_auth_policies = self.custom_api.list_cluster_custom_object(
group="security.istio.io",
version="v1beta1",
plural="peerauthentications"
)
peer_auth_count = len(peer_auth_policies.get("items", []))
except ApiException:
peer_auth_count = 0
try:
authz_policies = self.custom_api.list_cluster_custom_object(
group="security.istio.io",
version="v1beta1",
plural="authorizationpolicies"
)
authz_count = len(authz_policies.get("items", []))
except ApiException:
authz_count = 0
return {
"network_policies": network_policy_count,
"peer_authentication_policies": peer_auth_count,
"authorization_policies": authz_count,
"communication_rules": sum(len(targets) for targets in self.communication_matrix.values())
}
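The weighting in `_calculate_security_score` is easy to check by hand: with 98.7% mTLS coverage, 99.5% compliance, 12 blocked connections out of 15,847, and 3 suspicious activities, the four components come to roughly 29.6 + 29.9 + 19.2 + 14, i.e. about 92.7. A standalone re-implementation of the same formula for offline testing (the function name is ours):

```python
def security_score(mtls_coverage, policy_compliance, blocked, total, suspicious_count):
    """Same weighting as _calculate_security_score: 30 + 30 + 20 + 20 points."""
    mtls_score = mtls_coverage * 30
    compliance_score = policy_compliance * 30
    block_rate = blocked / total
    block_score = max(0, 20 - block_rate * 1000)   # a 1% block rate costs 10 points
    suspicion_penalty = min(20, suspicious_count * 2)
    return round(mtls_score + compliance_score + block_score + (20 - suspicion_penalty), 1)

print(security_score(0.987, 0.995, 12, 15847, 3))  # ~92.7
print(security_score(1.0, 1.0, 0, 1000, 0))        # 100.0 (perfect posture)
```

Note that the block-rate term dominates quickly: a 2% block rate alone zeroes out its 20-point bucket.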
# Usage example
async def demonstrate_zero_trust_network():
"""Zero Trust network demo"""
controller = ZeroTrustNetworkController()
# Register service identities
web_frontend_identity = ServiceIdentity(
namespace="production-workloads",
service_account="web-frontend",
labels={"app": "web-frontend", "tier": "frontend"},
trust_level=TrustLevel.LIMITED,
zone=NetworkZone.DMZ,
allowed_communications={"api-backend", "auth-service"}
)
api_backend_identity = ServiceIdentity(
namespace="production-workloads",
service_account="api-backend",
labels={"app": "api-backend", "tier": "backend"},
trust_level=TrustLevel.TRUSTED,
zone=NetworkZone.APPLICATION,
allowed_communications={"database", "cache", "message-queue"}
)
database_identity = ServiceIdentity(
namespace="production-workloads",
service_account="database",
labels={"app": "database", "tier": "data"},
trust_level=TrustLevel.PRIVILEGED,
zone=NetworkZone.DATABASE,
allowed_communications=set()
)
# Register the services
await controller.register_service_identity(web_frontend_identity)
await controller.register_service_identity(api_backend_identity)
await controller.register_service_identity(database_identity)
# 통신 검증
comm_check = await controller.verify_communication(
source_identity="production-workloads/web-frontend",
target_identity="production-workloads/api-backend",
port=8080,
protocol="TCP"
)
print(f"Communication check result: {json.dumps(comm_check, indent=2)}")
# 보안 상태 조회
security_posture = await controller.get_security_posture()
print(f"Security posture: {json.dumps(security_posture, indent=2)}")
# 실행
# asyncio.run(demonstrate_zero_trust_network())
5. Advanced Monitoring and Observability
5.1 Distributed Tracing and Metrics Collection
# advanced-monitoring-stack.yaml
# Prometheus Operator configuration
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: enterprise-prometheus
  namespace: monitoring
spec:
  replicas: 2
  retention: 30d
  storage:
    volumeClaimTemplate:
      spec:
        storageClassName: fast-ssd
        resources:
          requests:
            storage: 500Gi
  serviceAccountName: prometheus
  serviceMonitorSelector:
    matchLabels:
      monitoring: enabled
  ruleSelector:
    matchLabels:
      prometheus: enterprise
  resources:
    requests:
      memory: 8Gi
      cpu: 4
    limits:
      memory: 16Gi
      cpu: 8
  additionalScrapeConfigs:
    name: additional-scrape-configs
    key: prometheus-additional.yaml
  remoteWrite:
    - url: "https://prometheus-remote-write.company.com/api/v1/write"
      basicAuth:
        username:
          name: prometheus-remote-write-auth
          key: username
        password:
          name: prometheus-remote-write-auth
          key: password
      writeRelabelConfigs:
        - sourceLabels: [__name__]
          regex: 'kubernetes_pod_.*|kubernetes_node_.*'
          action: drop
  thanos:
    image: thanosio/thanos:v0.32.0
    objectStorageConfig:
      name: thanos-objstore-config
      key: thanos.yaml
---
# Thanos object storage configuration
apiVersion: v1
kind: Secret
metadata:
  name: thanos-objstore-config
  namespace: monitoring
stringData:
  thanos.yaml: |
    type: S3
    config:
      bucket: "prometheus-thanos-storage"
      endpoint: "s3.amazonaws.com"
      region: "us-east-1"
      access_key: ""
      secret_key: ""
      insecure: false
      signature_version2: false
      encrypt_sse: false
      put_user_metadata: {}
      http_config:
        idle_conn_timeout: 90s
        response_header_timeout: 2m
      trace:
        enable: false
      part_size: 134217728
---
# Grafana deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grafana
  namespace: monitoring
spec:
  replicas: 2
  selector:
    matchLabels:
      app: grafana
  template:
    metadata:
      labels:
        app: grafana
    spec:
      serviceAccountName: grafana
      securityContext:
        runAsNonRoot: true
        runAsUser: 65534
        fsGroup: 65534
      containers:
        - name: grafana
          image: grafana/grafana:10.0.0
          ports:
            - containerPort: 3000
          env:
            - name: GF_SECURITY_ADMIN_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: grafana-credentials
                  key: admin-password
            - name: GF_AUTH_GENERIC_OAUTH_ENABLED
              value: "true"
            - name: GF_AUTH_GENERIC_OAUTH_NAME
              value: "Company SSO"
            - name: GF_AUTH_GENERIC_OAUTH_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: grafana-oauth
                  key: client-id
            - name: GF_AUTH_GENERIC_OAUTH_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: grafana-oauth
                  key: client-secret
            - name: GF_AUTH_GENERIC_OAUTH_AUTH_URL
              value: "https://auth.company.com/oauth2/authorize"
            - name: GF_AUTH_GENERIC_OAUTH_TOKEN_URL
              value: "https://auth.company.com/oauth2/token"
            - name: GF_AUTH_GENERIC_OAUTH_API_URL
              value: "https://auth.company.com/oauth2/userinfo"
          volumeMounts:
            - name: grafana-storage
              mountPath: /var/lib/grafana
            - name: grafana-config
              mountPath: /etc/grafana
          resources:
            requests:
              memory: 1Gi
              cpu: 500m
            limits:
              memory: 2Gi
              cpu: 1
      volumes:
        - name: grafana-storage
          persistentVolumeClaim:
            claimName: grafana-storage
        - name: grafana-config
          configMap:
            name: grafana-config
---
# Jaeger distributed tracing
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: enterprise-jaeger
  namespace: observability
spec:
  strategy: production
  storage:
    type: elasticsearch
    elasticsearch:
      nodeCount: 3
      redundancyPolicy: SingleRedundancy
      storage:
        storageClassName: fast-ssd
        size: 200Gi
      resources:
        requests:
          memory: 4Gi
          cpu: 1
        limits:
          memory: 8Gi
          cpu: 2
  collector:
    maxReplicas: 10
    resources:
      requests:
        memory: 1Gi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 1
  query:
    replicas: 3
    resources:
      requests:
        memory: 512Mi
        cpu: 250m
      limits:
        memory: 1Gi
        cpu: 500m
---
# OpenTelemetry Collector
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: enterprise-otel-collector
  namespace: observability
spec:
  mode: daemonset
  config: |
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317
          http:
            endpoint: 0.0.0.0:4318
      prometheus:
        config:
          global:
            scrape_interval: 15s
          scrape_configs:
            - job_name: 'kubernetes-pods'
              kubernetes_sd_configs:
                - role: pod
              relabel_configs:
                - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
                  action: keep
                  regex: true
                - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
                  action: replace
                  target_label: __metrics_path__
                  regex: (.+)
                - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
                  action: replace
                  regex: ([^:]+)(?::\d+)?;(\d+)
                  # $$ escapes $ so the collector's env-var expansion leaves $1:$2 intact
                  replacement: $$1:$$2
                  target_label: __address__
      jaeger:
        protocols:
          grpc:
            endpoint: 0.0.0.0:14250
          thrift_http:
            endpoint: 0.0.0.0:14268
      zipkin:
        endpoint: 0.0.0.0:9411
    processors:
      batch:
        timeout: 1s
        send_batch_size: 1024
      memory_limiter:
        check_interval: 1s
        limit_mib: 512
      resource:
        attributes:
          - key: cluster.name
            value: "production-cluster"
            action: upsert
          - key: environment
            value: "production"
            action: upsert
      metricstransform:
        transforms:
          - include: "(.*)"
            match_type: regexp
            action: update
            new_name: "k8s_$${1}"
    exporters:
      prometheus:
        endpoint: "0.0.0.0:8889"
        namespace: "otel"
      jaeger:
        endpoint: "http://enterprise-jaeger-collector.observability:14250"
        tls:
          insecure: true
      elasticsearch:
        endpoints: ["https://elasticsearch.observability:9200"]
        index: "otel-traces-{2006.01.02}"
        tls:
          ca_file: /etc/ssl/certs/ca-bundle.crt
      otlp:
        endpoint: "https://otlp-gateway.company.com:4317"
        headers:
          authorization: "Bearer ${OTEL_AUTH_TOKEN}"
    extensions:
      health_check:
        endpoint: 0.0.0.0:13133
      pprof:
        endpoint: 0.0.0.0:1777
      zpages:
        endpoint: 0.0.0.0:55679
    service:
      extensions: [health_check, pprof, zpages]
      pipelines:
        traces:
          receivers: [otlp, jaeger, zipkin]
          processors: [memory_limiter, resource, batch]
          exporters: [jaeger, elasticsearch, otlp]
        metrics:
          receivers: [otlp, prometheus]
          processors: [memory_limiter, resource, metricstransform, batch]
          exporters: [prometheus, otlp]
        logs:
          receivers: [otlp]
          processors: [memory_limiter, resource, batch]
          exporters: [elasticsearch, otlp]
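The relabeling step in the `prometheus` receiver above rewrites each pod's scrape address so that the port from the `prometheus.io/port` annotation replaces any port already present in `__address__`. A rough Python sketch of that rewrite, using the same regex (the real Prometheus relabeler anchors the regex against the `;`-joined source labels, so this is an approximation for illustration only):

```python
import re

# Same pattern as the relabel rule: host, an optional existing port,
# then the annotation port after the ';' separator.
ADDRESS_RE = re.compile(r"([^:]+)(?::\d+)?;(\d+)")

def rewrite_address(address: str, annotation_port: str) -> str:
    """Join __address__ and the annotation port the way the relabel rule does."""
    joined = f"{address};{annotation_port}"  # source_labels are ';'-joined
    return ADDRESS_RE.sub(r"\1:\2", joined)
```

For example, `rewrite_address("10.1.2.3:9102", "8080")` yields `"10.1.2.3:8080"`: the annotation port wins over the discovered container port.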
---
# Alertmanager configuration
apiVersion: monitoring.coreos.com/v1
kind: Alertmanager
metadata:
  name: enterprise-alertmanager
  namespace: monitoring
spec:
  replicas: 3
  storage:
    volumeClaimTemplate:
      spec:
        storageClassName: fast-ssd
        resources:
          requests:
            storage: 10Gi
  configSecret: alertmanager-config
  resources:
    requests:
      memory: 512Mi
      cpu: 250m
    limits:
      memory: 1Gi
      cpu: 500m
---
apiVersion: v1
kind: Secret
metadata:
  name: alertmanager-config
  namespace: monitoring
stringData:
  alertmanager.yml: |
    global:
      smtp_smarthost: 'smtp.company.com:587'
      smtp_from: 'alerts@company.com'
      slack_api_url: 'https://hooks.slack.com/services/...'
    route:
      group_by: ['cluster', 'service', 'severity']
      group_wait: 30s
      group_interval: 5m
      repeat_interval: 24h
      receiver: 'default-receiver'
      routes:
        - match:
            severity: critical
          receiver: 'critical-alerts'
          group_wait: 10s
          group_interval: 1m
          repeat_interval: 1h
        - match:
            severity: warning
          receiver: 'warning-alerts'
          group_wait: 1m
          group_interval: 10m
          repeat_interval: 24h
        - match:
            alertname: DeadMansSwitch
          receiver: 'deadmansswitch'
          repeat_interval: 5m
    inhibit_rules:
      - source_match:
          severity: 'critical'
        target_match:
          severity: 'warning'
        equal: ['cluster', 'service']
    receivers:
      - name: 'default-receiver'
        slack_configs:
          - channel: '#alerts'
            title: 'Kubernetes Alert'
            text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
      - name: 'critical-alerts'
        slack_configs:
          - channel: '#critical-alerts'
            title: 'CRITICAL: {{ .GroupLabels.service }}'
            text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
            actions:
              - type: button
                text: 'Runbook :green_book:'
                url: '{{ (index .Alerts 0).Annotations.runbook_url }}'
        pagerduty_configs:
          - routing_key: 'YOUR_PAGERDUTY_KEY'
            description: 'Critical alert in {{ .GroupLabels.cluster }}'
        email_configs:
          - to: 'oncall@company.com'
            subject: 'CRITICAL: {{ .GroupLabels.service }} Alert'
            body: |
              {{ range .Alerts }}
              Alert: {{ .Annotations.summary }}
              Description: {{ .Annotations.description }}
              Graph: {{ .GeneratorURL }}
              {{ end }}
      - name: 'warning-alerts'
        slack_configs:
          - channel: '#alerts'
            title: 'Warning: {{ .GroupLabels.service }}'
            text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
      - name: 'deadmansswitch'
        slack_configs:
          - channel: '#monitoring'
            title: 'DeadMansSwitch'
            text: 'Alerting pipeline is working'
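The routing tree above is first-match-wins: an alert with `severity: critical` goes to `critical-alerts` even if later routes would also match, and anything unmatched falls through to the root receiver. A simplified, illustrative model of that dispatch logic in Python (real Alertmanager additionally supports `continue`, regex matchers, and nested routes, all omitted here):

```python
# First-match-wins dispatch over a flat list of child routes, falling
# back to the root receiver when nothing matches.
def route_receiver(labels: dict, routes: list, default: str = "default-receiver") -> str:
    for route in routes:
        if all(labels.get(k) == v for k, v in route["match"].items()):
            return route["receiver"]
    return default

# Child routes in the same order as the alertmanager.yml above.
ROUTES = [
    {"match": {"severity": "critical"}, "receiver": "critical-alerts"},
    {"match": {"severity": "warning"}, "receiver": "warning-alerts"},
    {"match": {"alertname": "DeadMansSwitch"}, "receiver": "deadmansswitch"},
]
```

For example, `route_receiver({"severity": "critical", "alertname": "KubernetesNodeNotReady"}, ROUTES)` returns `"critical-alerts"`, while a label set matching no route falls back to `"default-receiver"`.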
---
# Custom PrometheusRule
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: kubernetes-enterprise-rules
  namespace: monitoring
  labels:
    prometheus: enterprise
spec:
  groups:
    - name: kubernetes.resources
      rules:
        - alert: KubernetesPodCrashLooping
          expr: rate(kube_pod_container_status_restarts_total[5m]) * 60 * 5 > 0
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Pod crash looping"
            description: "Pod {{ $labels.namespace }}/{{ $labels.pod }} is crash looping"
            runbook_url: "https://runbooks.company.com/kubernetes/pod-crash-looping"
        - alert: KubernetesNodeNotReady
          expr: kube_node_status_condition{condition="Ready",status="true"} == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Node not ready"
            description: "Node {{ $labels.node }} has been not ready for more than 5 minutes"
            runbook_url: "https://runbooks.company.com/kubernetes/node-not-ready"
        - alert: KubernetesMemoryPressure
          expr: kube_node_status_condition{condition="MemoryPressure",status="true"} == 1
          for: 2m
          labels:
            severity: warning
          annotations:
            summary: "Node memory pressure"
            description: "Node {{ $labels.node }} has memory pressure"
            runbook_url: "https://runbooks.company.com/kubernetes/memory-pressure"
        - alert: KubernetesDiskPressure
          expr: kube_node_status_condition{condition="DiskPressure",status="true"} == 1
          for: 2m
          labels:
            severity: warning
          annotations:
            summary: "Node disk pressure"
            description: "Node {{ $labels.node }} has disk pressure"
            runbook_url: "https://runbooks.company.com/kubernetes/disk-pressure"
        # The legacy OutOfDisk node condition was removed from Kubernetes;
        # PIDPressure is the remaining node condition worth alerting on.
        - alert: KubernetesPIDPressure
          expr: kube_node_status_condition{condition="PIDPressure",status="true"} == 1
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Node PID pressure"
            description: "Node {{ $labels.node }} is running out of process IDs"
            runbook_url: "https://runbooks.company.com/kubernetes/pid-pressure"
        - alert: KubernetesJobFailed
          expr: kube_job_status_failed > 0
          for: 0m
          labels:
            severity: warning
          annotations:
            summary: "Job failed"
            description: "Job {{ $labels.namespace }}/{{ $labels.job_name }} failed"
            runbook_url: "https://runbooks.company.com/kubernetes/job-failed"
        - alert: KubernetesPersistentVolumeClaimPending
          expr: kube_persistentvolumeclaim_status_phase{phase="Pending"} == 1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "PVC pending"
            description: "PersistentVolumeClaim {{ $labels.namespace }}/{{ $labels.persistentvolumeclaim }} is pending"
            runbook_url: "https://runbooks.company.com/kubernetes/pvc-pending"
    - name: kubernetes.applications
      rules:
        - alert: KubernetesDeploymentReplicasMismatch
          expr: kube_deployment_spec_replicas != kube_deployment_status_replicas_available
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Deployment replicas mismatch"
            description: "Deployment {{ $labels.namespace }}/{{ $labels.deployment }} has fewer available replicas than the {{ $value }} desired"
            runbook_url: "https://runbooks.company.com/kubernetes/deployment-replicas-mismatch"
        - alert: KubernetesStatefulSetReplicasMismatch
          expr: kube_statefulset_status_replicas_ready != kube_statefulset_status_replicas
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "StatefulSet replicas mismatch"
            description: "StatefulSet {{ $labels.namespace }}/{{ $labels.statefulset }} has {{ $value }} ready replicas, which does not match the expected count"
            runbook_url: "https://runbooks.company.com/kubernetes/statefulset-replicas-mismatch"
        - alert: KubernetesHpaScalingAbility
          expr: kube_horizontalpodautoscaler_status_condition{condition="AbleToScale", status="false"} == 1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "HPA scaling disabled"
            description: "HPA {{ $labels.namespace }}/{{ $labels.horizontalpodautoscaler }} is unable to scale"
            runbook_url: "https://runbooks.company.com/kubernetes/hpa-scaling-disabled"
    - name: istio.service-mesh
      rules:
        - alert: IstioHighRequestLatency
          expr: histogram_quantile(0.99, sum(rate(istio_request_duration_milliseconds_bucket[1m])) by (destination_service_name, le)) > 1000
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High request latency in service mesh"
            description: "Service {{ $labels.destination_service_name }} has 99th percentile latency above 1s"
            runbook_url: "https://runbooks.company.com/istio/high-latency"
        - alert: IstioHighErrorRate
          expr: sum(rate(istio_requests_total{response_code!~"2.."}[1m])) by (destination_service_name) / sum(rate(istio_requests_total[1m])) by (destination_service_name) > 0.05
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "High error rate in service mesh"
            description: "Service {{ $labels.destination_service_name }} has error rate above 5%"
            runbook_url: "https://runbooks.company.com/istio/high-error-rate"
        - alert: IstioMTLSDisabled
          expr: sum(istio_requests_total{security_policy!="mutual_tls"}) by (destination_service_name) > 0
          for: 0m
          labels:
            severity: warning
          annotations:
            summary: "mTLS disabled for service"
            description: "Service {{ $labels.destination_service_name }} is not using mutual TLS"
            runbook_url: "https://runbooks.company.com/istio/mtls-disabled"
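The `IstioHighErrorRate` expression computes, per destination service, the fraction of requests with a non-2xx response code. The same threshold logic written out as a plain Python function, for illustration only (the metric labels and the 5% threshold come from the rule above):

```python
# Fraction of non-2xx requests, mirroring the IstioHighErrorRate expr:
# sum(rate(...{response_code!~"2.."})) / sum(rate(...)).
def error_rate(requests_by_code: dict) -> float:
    """requests_by_code maps response_code -> request rate (req/s)."""
    total = sum(requests_by_code.values())
    if total == 0:
        return 0.0  # no traffic, no error rate
    errors = sum(v for code, v in requests_by_code.items()
                 if not code.startswith("2"))
    return errors / total

# The alert fires when this exceeds 0.05 for 2 minutes.
```

For example, a service seeing 95 req/s of 200s and 5 req/s of 500s sits exactly at the 5% boundary, so the alert would not yet fire.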
---
# ServiceMonitor example
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: application-metrics
  namespace: monitoring
  labels:
    monitoring: enabled
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: web-application
  endpoints:
    - port: metrics
      path: /metrics
      interval: 30s
      scrapeTimeout: 10s
  namespaceSelector:
    matchNames:
      - production-workloads
      - staging-workloads
---
# PodMonitor for lower-level monitoring
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: istio-proxy-metrics
  namespace: monitoring
  labels:
    monitoring: enabled
spec:
  selector:
    matchLabels:
      app: istio-proxy
  podMetricsEndpoints:
    - port: http-monitoring
      path: /stats/prometheus
      interval: 30s
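For the ServiceMonitor above to find anything, the application must expose a `/metrics` endpoint on a port named `metrics`, serving the Prometheus text exposition format. A minimal stdlib-only sketch of such an endpoint (in practice you would use an official client library such as `prometheus_client`; the metric name `app_requests_total` and port 9090 are illustrative):

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

REQUESTS_TOTAL = 0  # simple process-wide counter

def render_metrics() -> str:
    """Render the counter in the Prometheus text exposition format."""
    return (
        "# HELP app_requests_total Total HTTP requests handled.\n"
        "# TYPE app_requests_total counter\n"
        f"app_requests_total {REQUESTS_TOTAL}\n"
    )

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        global REQUESTS_TOTAL
        REQUESTS_TOTAL += 1  # count every request, including scrapes
        if self.path == "/metrics":
            body = render_metrics().encode()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

# To serve on the port the monitor scrapes:
# HTTPServer(("0.0.0.0", 9090), MetricsHandler).serve_forever()
```

The Service fronting these pods would then name its 9090 port `metrics`, matching the ServiceMonitor's `port: metrics` endpoint.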
Closing Thoughts
Advancing container orchestration is not merely a technical evolution; it is a core capability of modern enterprise IT infrastructure. As of 2026, Kubernetes has moved beyond being a simple container scheduler to become the foundation of cloud native platforms.
Key elements for successful enterprise Kubernetes adoption:
- Architecture design: cluster topology and node configuration aligned with business requirements
- Security hardening: network security and policy-based governance grounded in Zero Trust principles
- Operations automation: cluster lifecycle management and optimized workload scheduling
- Monitoring: comprehensive observability for performance optimization and proactive problem prevention
With the growing management complexity of multi-cluster environments and rising security requirements, automated operations and policy-based management matter more than ever. Going forward, a flexible and robust container platform that can absorb growing AI/ML workloads, the spread of edge computing, and increasingly strict regulatory requirements will determine competitive advantage.