데이터 메시 아키텍처 구축 가이드 - 2026년 분산 데이터 관리 전략

데이터 메시, Data Mesh, 분산 데이터, 데이터 거버넌스, 도메인 기반 설계, 셀프 서비스, 데이터 플랫폼

데이터 메시 아키텍처 구축 가이드 - 2026년 분산 데이터 관리 전략

1. 데이터 메시 개념과 필요성

1.1 데이터 메시란 무엇인가?

데이터 메시(Data Mesh)는 도메인 중심의 분산 데이터 아키텍처 패러다임입니다. 중앙 집중식 데이터 레이크나 웨어하우스 대신, 각 비즈니스 도메인이 자체 데이터를 소유하고 관리하는 접근 방식입니다.

# data_mesh_principles.py
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
import json
from abc import ABC, abstractmethod

class DataMeshPrinciple(Enum):
    """The four founding principles of data mesh.

    Each value is a stable, serialization-friendly string identifier.
    """
    DOMAIN_OWNERSHIP = "domain_ownership"        # domains own their data end to end
    DATA_AS_PRODUCT = "data_as_product"          # data is treated as a product with SLAs
    SELF_SERVE_PLATFORM = "self_serve_platform"  # teams provision their own infrastructure
    FEDERATED_GOVERNANCE = "federated_governance"  # governance is federated, not centralized

@dataclass
class DataDomain:
    """A bounded business domain that owns and publishes its own data.

    The attributes map one-to-one onto the dictionary produced by
    ``to_dict``: identity, ownership, team membership, the data products
    the domain publishes, and its DDD bounded context.
    """
    domain_id: str
    name: str
    description: str
    business_area: str
    domain_owner: str
    team_members: List[str]
    data_products: List[str]
    bounded_context: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the domain as a plain dictionary.

        Nested lists/dicts are returned by reference (no copy), matching
        the attribute objects themselves.
        """
        return dict(
            domain_id=self.domain_id,
            name=self.name,
            description=self.description,
            business_area=self.business_area,
            domain_owner=self.domain_owner,
            team_members=self.team_members,
            data_products=self.data_products,
            bounded_context=self.bounded_context,
        )

@dataclass
class DataProduct:
    """A data product published by a domain: data served with an explicit contract."""
    product_id: str                     # globally unique product identifier
    name: str                           # human-readable product name
    description: str                    # what the product provides
    domain_id: str                      # owning domain; must be registered first
    data_steward: str                   # person accountable for the product
    sla_requirements: Dict[str, Any]    # e.g. availability, latency, rate_limit
    quality_metrics: Dict[str, Any]     # e.g. completeness, accuracy, freshness targets
    schema_definition: Dict[str, Any]   # serialization format, version and fields
    access_patterns: List[str]          # supported consumption modes (api, stream, batch, ...)
    discovery_metadata: Dict[str, Any]  # tags, classification, glossary terms for search

class DataMeshArchitecture:
    """Registry and control plane for a data mesh.

    Tracks registered domains, their data products, governance policies,
    and the platform infrastructure bootstrapped for each of them.
    """

    def __init__(self):
        self.domains = {}              # domain_id -> DataDomain
        self.data_products = {}        # product_id -> DataProduct
        self.governance_policies = {}  # reserved for federated governance rules
        self.platform_services = {}    # domain_id / product_id -> infrastructure info

    def register_domain(self, domain: DataDomain) -> str:
        """Register a domain and bootstrap its platform infrastructure.

        Returns:
            The domain's id, for caller convenience.
        """
        self.domains[domain.domain_id] = domain

        # Per-domain initial setup (storage, compute, security, monitoring).
        self._setup_domain_infrastructure(domain)

        return domain.domain_id

    def register_data_product(self, product: DataProduct) -> str:
        """Register a data product under an already-registered domain.

        Raises:
            ValueError: if the product references an unknown domain.
        """
        if product.domain_id not in self.domains:
            raise ValueError(f"Domain {product.domain_id} not found")

        self.data_products[product.product_id] = product

        # Automatic per-product setup (API endpoint, catalog entry, access control).
        self._setup_data_product_infrastructure(product)

        return product.product_id

    def _setup_domain_infrastructure(self, domain: DataDomain):
        """Record the standard infrastructure bundle for a new domain."""
        infrastructure = {
            'data_storage': f"s3://datamesh-{domain.domain_id}/",
            'compute_resources': {
                'spark_cluster': f"spark-{domain.domain_id}",
                'kubernetes_namespace': f"datamesh-{domain.domain_id}"
            },
            'security': {
                'iam_role': f"DataMeshDomain-{domain.domain_id}",
                'encryption_key': f"datamesh-{domain.domain_id}-key"
            },
            'monitoring': {
                'dashboard': f"https://monitoring.datamesh.com/domain/{domain.domain_id}",
                'alerts': f"alerts-{domain.domain_id}"
            }
        }

        self.platform_services[domain.domain_id] = infrastructure

    def _setup_data_product_infrastructure(self, product: DataProduct):
        """Record the standard infrastructure bundle for a new data product."""
        product_infrastructure = {
            'api_endpoint': f"https://api.datamesh.com/products/{product.product_id}",
            'data_catalog_entry': {
                'schema_registry': f"schema-{product.product_id}",
                'lineage_tracking': True,
                'quality_monitoring': True
            },
            'access_control': {
                'consumer_groups': [],
                'rate_limits': product.sla_requirements.get('rate_limit', '1000/min')
            }
        }

        if product.product_id not in self.platform_services:
            self.platform_services[product.product_id] = {}

        self.platform_services[product.product_id].update(product_infrastructure)

    def discover_data_products(self, search_criteria: Dict[str, Any]) -> List[DataProduct]:
        """Return all data products matching every entry in *search_criteria*."""
        return [product for product in self.data_products.values()
                if self._matches_criteria(product, search_criteria)]

    def _matches_criteria(self, product: DataProduct, criteria: Dict[str, Any]) -> bool:
        """Check a product against search criteria with AND semantics.

        Supported keys: 'domain' (exact domain_id match), 'tags' (any
        overlap with the product's discovery tags), 'business_area'
        (business area of the owning domain). Unknown keys are ignored.
        """
        for key, value in criteria.items():
            if key == 'domain':
                if product.domain_id != value:
                    return False
            elif key == 'tags':
                product_tags = product.discovery_metadata.get('tags', [])
                if not any(tag in product_tags for tag in value):
                    return False
            elif key == 'business_area':
                domain = self.domains.get(product.domain_id)
                # Fix: a product whose owning domain is missing can never
                # satisfy a business_area filter (previously it silently
                # matched because the None-domain check skipped the test).
                if domain is None or domain.business_area != value:
                    return False
            # Additional search criteria can be added here...

        return True

    def get_mesh_topology(self) -> Dict[str, Any]:
        """Summarize the mesh as domains, products, and relationships."""
        topology = {
            'domains': [],
            'data_products': [],
            'relationships': []
        }

        # Count products per domain in one pass instead of rescanning all
        # products for every domain (was O(domains * products)).
        product_counts: Dict[str, int] = {}
        for product in self.data_products.values():
            product_counts[product.domain_id] = product_counts.get(product.domain_id, 0) + 1

        for domain in self.domains.values():
            topology['domains'].append({
                'id': domain.domain_id,
                'name': domain.name,
                'business_area': domain.business_area,
                'product_count': product_counts.get(domain.domain_id, 0)
            })

        for product in self.data_products.values():
            topology['data_products'].append({
                'id': product.product_id,
                'name': product.name,
                'domain_id': product.domain_id,
                'consumers': len(product.access_patterns)
            })

        # Product-to-product relationships would come from a data lineage
        # system in a real deployment.

        return topology

# Usage example
def setup_sample_data_mesh():
    """Build a small demo mesh: two domains, each with one data product."""
    mesh = DataMeshArchitecture()

    # Declare the sample domains, then register them in one pass.
    sample_domains = [
        DataDomain(
            domain_id="customer",
            name="Customer Domain",
            description="Customer data and analytics",
            business_area="Customer Experience",
            domain_owner="alice@company.com",
            team_members=["alice@company.com", "bob@company.com"],
            data_products=["customer-profile", "customer-behavior"],
            bounded_context={
                "entities": ["Customer", "CustomerProfile", "CustomerPreference"],
                "events": ["CustomerRegistered", "ProfileUpdated", "PreferenceChanged"]
            }
        ),
        DataDomain(
            domain_id="order",
            name="Order Domain",
            description="Order processing and fulfillment",
            business_area="Operations",
            domain_owner="charlie@company.com",
            team_members=["charlie@company.com", "diana@company.com"],
            data_products=["order-events", "fulfillment-metrics"],
            bounded_context={
                "entities": ["Order", "OrderItem", "Shipment"],
                "events": ["OrderPlaced", "OrderShipped", "OrderDelivered"]
            }
        ),
    ]
    for sample_domain in sample_domains:
        mesh.register_domain(sample_domain)

    # Declare the sample data products, then register them in one pass.
    sample_products = [
        DataProduct(
            product_id="customer-profile",
            name="Customer Profile Data Product",
            description="Comprehensive customer profile information",
            domain_id="customer",
            data_steward="alice@company.com",
            sla_requirements={
                "availability": "99.9%",
                "latency": "< 100ms",
                "rate_limit": "10000/min"
            },
            quality_metrics={
                "completeness": "> 95%",
                "accuracy": "> 99%",
                "freshness": "< 1 hour"
            },
            schema_definition={
                "format": "avro",
                "version": "1.0",
                "fields": [
                    {"name": "customer_id", "type": "string"},
                    {"name": "email", "type": "string"},
                    {"name": "demographics", "type": "record"},
                    {"name": "preferences", "type": "record"}
                ]
            },
            access_patterns=["real-time-api", "batch-export", "stream"],
            discovery_metadata={
                "tags": ["customer", "profile", "pii"],
                "classification": "confidential",
                "business_glossary": ["customer", "demographic", "preference"]
            }
        ),
        DataProduct(
            product_id="order-events",
            name="Order Events Stream",
            description="Real-time order events for analytics",
            domain_id="order",
            data_steward="charlie@company.com",
            sla_requirements={
                "availability": "99.99%",
                "latency": "< 10ms",
                "rate_limit": "50000/min"
            },
            quality_metrics={
                "completeness": "> 99%",
                "accuracy": "> 99.9%",
                "freshness": "< 1 second"
            },
            schema_definition={
                "format": "json",
                "version": "2.0",
                "fields": [
                    {"name": "order_id", "type": "string"},
                    {"name": "customer_id", "type": "string"},
                    {"name": "event_type", "type": "enum"},
                    {"name": "timestamp", "type": "datetime"},
                    {"name": "event_data", "type": "record"}
                ]
            },
            access_patterns=["kafka-stream", "webhook", "batch-export"],
            discovery_metadata={
                "tags": ["order", "events", "real-time"],
                "classification": "internal",
                "business_glossary": ["order", "transaction", "fulfillment"]
            }
        ),
    ]
    for sample_product in sample_products:
        mesh.register_data_product(sample_product)

    return mesh

# Build the sample data mesh and print its topology as JSON.
mesh = setup_sample_data_mesh()
topology = mesh.get_mesh_topology()
print("Data Mesh Topology:")
print(json.dumps(topology, indent=2))

1.2 전통적 아키텍처 vs 데이터 메시

구분 | 전통적 아키텍처 | 데이터 메시
데이터 소유권 | 중앙 데이터 팀 | 도메인 팀
확장성 | 수직 확장 | 수평 확장
거버넌스 | 중앙 집중식 | 연합형
기술 선택 | 표준화 | 도메인별 최적화
변화 속도 | 느림 | 빠름

2. 셀프 서비스 데이터 플랫폼 구축

2.1 플랫폼 아키텍처 설계

# self_serve_platform.py
from typing import Dict, List, Any, Optional, Protocol
from dataclasses import dataclass
import asyncio
import kubernetes
from kubernetes import client, config
import yaml
import json
import time
from abc import ABC, abstractmethod

class ResourceProvisioner(Protocol):
    """Structural interface for resource provisioners.

    Any class exposing these two async methods satisfies the protocol —
    no inheritance required (PEP 544 structural typing).
    """
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Create the resource described by *request*; return connection info."""
        ...

    async def deprovision(self, resource_id: str) -> bool:
        """Tear down the resource; return True on success."""
        ...

@dataclass
class PlatformService:
    """A catalog entry describing one service the platform can provision."""
    service_id: str                      # unique catalog key
    name: str                            # human-readable service name
    description: str                     # what the service provides
    category: str  # compute, storage, analytics, governance
    config_template: Dict[str, Any]      # defaults; '${domain_id}' is substituted at request time
    resource_requirements: Dict[str, Any]  # sizing used for approval decisions (cpu_cores, memory_gb, ...)
    dependencies: List[str]              # service_ids that must be provisioned first

@dataclass
class ProvisioningRequest:
    """A domain team's request to provision one platform service."""
    request_id: str               # unique request identifier
    domain_id: str                # requesting domain; scopes the created resource
    service_type: str             # service_id from the platform catalog
    configuration: Dict[str, Any]  # user overrides applied on top of the config template
    requester: str                # who asked for the resource
    approval_required: bool = False  # force manual approval regardless of heuristics

class SelfServeDataPlatform:
    """Self-serve data platform for domain teams.

    Exposes a catalog of platform services (storage, compute, streaming,
    governance), resolves inter-service dependencies, provisions
    resources through pluggable provisioners, and tracks approvals and
    monitoring data for everything it created.
    """

    def __init__(self):
        self.service_catalog = {}      # service_id -> PlatformService
        self.provisioners = {}         # service_id -> provisioner instance
        self.active_resources = {}     # resource_id -> provisioned resource metadata
        self.approval_workflows = {}   # approval_id -> approval workflow state

        # Kubernetes client setup: prefer in-cluster credentials, fall
        # back to the local kubeconfig for development machines.
        try:
            config.load_incluster_config()  # when running inside the cluster
        except Exception:
            # Narrowed from a bare `except:` so SystemExit and
            # KeyboardInterrupt are no longer swallowed.
            config.load_kube_config()  # local development environment

        self.k8s_apps_v1 = client.AppsV1Api()
        self.k8s_core_v1 = client.CoreV1Api()

        self._initialize_service_catalog()
        self._initialize_provisioners()

    def _initialize_service_catalog(self):
        """Populate the built-in service catalog."""

        # Data storage service
        self.service_catalog['object-storage'] = PlatformService(
            service_id='object-storage',
            name='Object Storage',
            description='S3-compatible object storage with automatic backup',
            category='storage',
            config_template={
                'bucket_name': '${domain_id}-data',
                'retention_days': 365,
                'encryption': True,
                'backup_enabled': True,
                'access_logging': True
            },
            resource_requirements={
                'storage_gb': '1000',
                'iops': '3000'
            },
            dependencies=[]
        )

        # Spark computing cluster
        self.service_catalog['spark-cluster'] = PlatformService(
            service_id='spark-cluster',
            name='Spark Computing Cluster',
            description='Auto-scaling Spark cluster for data processing',
            category='compute',
            config_template={
                'cluster_name': '${domain_id}-spark',
                'driver_cores': 2,
                'driver_memory': '4g',
                'executor_cores': 2,
                'executor_memory': '4g',
                'max_executors': 10,
                'auto_scaling': True
            },
            resource_requirements={
                'cpu_cores': '20',
                'memory_gb': '80'
            },
            dependencies=['object-storage']
        )

        # Stream-processing service
        self.service_catalog['kafka-cluster'] = PlatformService(
            service_id='kafka-cluster',
            name='Kafka Streaming Platform',
            description='Managed Kafka cluster for real-time data streaming',
            category='analytics',
            config_template={
                'cluster_name': '${domain_id}-kafka',
                'partitions': 12,
                'replication_factor': 3,
                'retention_hours': 168,  # 7 days
                'compression_type': 'snappy',
                'security_protocol': 'SSL'
            },
            resource_requirements={
                'cpu_cores': '12',
                'memory_gb': '24',
                'storage_gb': '500'
            },
            dependencies=[]
        )

        # Data catalog service
        self.service_catalog['data-catalog'] = PlatformService(
            service_id='data-catalog',
            name='Data Catalog & Discovery',
            description='Automated data catalog with schema registry',
            category='governance',
            config_template={
                'catalog_name': '${domain_id}-catalog',
                'auto_discovery': True,
                'schema_evolution': True,
                'lineage_tracking': True,
                'data_profiling': True
            },
            resource_requirements={
                'cpu_cores': '4',
                'memory_gb': '8'
            },
            dependencies=['object-storage']
        )

        # API gateway
        self.service_catalog['api-gateway'] = PlatformService(
            service_id='api-gateway',
            name='Data API Gateway',
            description='API gateway for data product access',
            category='governance',
            config_template={
                'gateway_name': '${domain_id}-api',
                'rate_limiting': True,
                'authentication': 'oauth2',
                'monitoring': True,
                'caching': True
            },
            resource_requirements={
                'cpu_cores': '2',
                'memory_gb': '4'
            },
            dependencies=[]
        )

    def _initialize_provisioners(self):
        """Wire each catalog service to its resource provisioner."""
        self.provisioners['object-storage'] = S3StorageProvisioner()
        self.provisioners['spark-cluster'] = SparkClusterProvisioner()
        self.provisioners['kafka-cluster'] = KafkaClusterProvisioner()
        self.provisioners['data-catalog'] = DataCatalogProvisioner()
        self.provisioners['api-gateway'] = APIGatewayProvisioner()

    async def request_service(self, request: ProvisioningRequest) -> str:
        """Request provisioning of a catalog service.

        Returns:
            A resource id, or "approval:<id>" when the request was
            routed into an approval workflow instead of provisioned.

        Raises:
            ValueError: if the requested service type is not in the catalog.
        """

        if request.service_type not in self.service_catalog:
            raise ValueError(f"Unknown service type: {request.service_type}")

        service = self.service_catalog[request.service_type]

        # Route to the approval workflow when required.
        if request.approval_required or self._requires_approval(request):
            approval_id = await self._submit_for_approval(request)
            return f"approval:{approval_id}"

        # Provision immediately.
        return await self._provision_service(request, service)

    async def _provision_service(self, request: ProvisioningRequest,
                                service: PlatformService) -> str:
        """Provision a service: dependencies, configuration, resource creation.

        On failure the partial provisioning is rolled back and the
        original exception is re-raised.
        """

        try:
            # Make sure dependency services exist first.
            await self._ensure_dependencies(request.domain_id, service.dependencies)

            # Resolve template variables and user overrides.
            resolved_config = self._process_configuration(request, service)

            # Create the resource through the matching provisioner.
            provisioner = self.provisioners[request.service_type]
            resource_info = await provisioner.provision({
                'request_id': request.request_id,
                'domain_id': request.domain_id,
                'configuration': resolved_config,
                'resource_requirements': service.resource_requirements
            })

            # Register the resource.
            resource_id = f"{request.domain_id}-{request.service_type}"
            self.active_resources[resource_id] = {
                'resource_id': resource_id,
                'request_id': request.request_id,
                'service_type': request.service_type,
                'domain_id': request.domain_id,
                'configuration': resolved_config,
                'resource_info': resource_info,
                'created_at': time.time(),
                'status': 'active'
            }

            return resource_id

        except Exception:
            # Roll back, then `raise` (not `raise e`) so the original
            # traceback is preserved.
            await self._rollback_provisioning(request)
            raise

    async def _rollback_provisioning(self, request: ProvisioningRequest) -> None:
        """Best-effort cleanup after a failed provisioning attempt.

        This method was referenced but never defined, which turned every
        provisioning failure into an AttributeError. It now removes any
        partially registered resource and asks the provisioner to tear it
        down; rollback errors are reported but never raised, so they
        cannot mask the original failure.
        """
        resource_id = f"{request.domain_id}-{request.service_type}"
        self.active_resources.pop(resource_id, None)

        provisioner = self.provisioners.get(request.service_type)
        if provisioner is not None:
            try:
                await provisioner.deprovision(resource_id)
            except Exception as cleanup_error:
                print(f"Rollback of {resource_id} failed: {cleanup_error}")

    def _process_configuration(self, request: ProvisioningRequest,
                              service: PlatformService) -> Dict[str, Any]:
        """Resolve the service's config template and apply user overrides.

        Only a shallow copy of the template is taken, so nested
        structures are shared with the catalog entry.
        """
        # Renamed local (was `config`) to stop shadowing the module-level
        # kubernetes `config` import.
        resolved = service.config_template.copy()

        # Substitute template variables.
        for key, value in resolved.items():
            if isinstance(value, str) and '${' in value:
                value = value.replace('${domain_id}', request.domain_id)
                resolved[key] = value

        # User-supplied settings override template defaults.
        resolved.update(request.configuration)

        return resolved

    async def _ensure_dependencies(self, domain_id: str, dependencies: List[str]):
        """Auto-provision any dependency services not yet active for the domain."""
        for dep_service in dependencies:
            dep_resource_id = f"{domain_id}-{dep_service}"

            if dep_resource_id not in self.active_resources:
                # Provision the missing dependency on behalf of the system.
                dep_request = ProvisioningRequest(
                    request_id=f"auto-{dep_resource_id}",
                    domain_id=domain_id,
                    service_type=dep_service,
                    configuration={},
                    requester="system",
                    approval_required=False
                )

                await self._provision_service(dep_request, self.service_catalog[dep_service])

    def _requires_approval(self, request: ProvisioningRequest) -> bool:
        """Decide whether a request must go through manual approval."""
        service = self.service_catalog[request.service_type]

        # Approval is driven by the service's declared resource needs.
        cpu_cores = int(service.resource_requirements.get('cpu_cores', '0'))
        memory_gb = int(service.resource_requirements.get('memory_gb', '0'))

        # Large resource requests need sign-off.
        if cpu_cores > 50 or memory_gb > 100:
            return True

        # Governance-category services always need sign-off.
        if service.category == 'governance':
            return True

        return False

    async def _submit_for_approval(self, request: ProvisioningRequest) -> str:
        """Open an approval workflow for the request and return its id."""
        approval_id = f"approval-{request.request_id}"

        self.approval_workflows[approval_id] = {
            'approval_id': approval_id,
            'request': request,
            'status': 'pending',
            'submitted_at': time.time(),
            'approvers': ['platform-admin@company.com'],
            'approval_notes': []
        }

        # A real deployment would open a ticket in an approval system
        # (Jira, ServiceNow, ...).
        print(f"Approval request {approval_id} submitted for {request.service_type}")

        return approval_id

    async def approve_request(self, approval_id: str, approver: str,
                            approved: bool, notes: str = "") -> Optional[str]:
        """Approve or reject a pending request.

        Returns:
            The provisioned resource id on approval, None on rejection.

        Raises:
            ValueError: if the approval id is unknown.
        """
        if approval_id not in self.approval_workflows:
            raise ValueError(f"Approval {approval_id} not found")

        workflow = self.approval_workflows[approval_id]

        if approved:
            workflow['status'] = 'approved'
            workflow['approved_by'] = approver
            workflow['approved_at'] = time.time()

            # Provision the approved request.
            request = workflow['request']
            service = self.service_catalog[request.service_type]
            resource_id = await self._provision_service(request, service)

            return resource_id
        else:
            workflow['status'] = 'rejected'
            workflow['rejected_by'] = approver
            workflow['rejection_reason'] = notes

            return None

    async def get_domain_resources(self, domain_id: str) -> List[Dict[str, Any]]:
        """Return metadata for every active resource owned by *domain_id*."""
        return [resource for resource in self.active_resources.values()
                if resource['domain_id'] == domain_id]

    async def get_service_catalog(self) -> Dict[str, PlatformService]:
        """Return a shallow copy of the service catalog."""
        return self.service_catalog.copy()

    async def monitor_resources(self, domain_id: str) -> Dict[str, Any]:
        """Collect monitoring information for all of a domain's resources."""
        domain_resources = await self.get_domain_resources(domain_id)

        monitoring_data = {
            'domain_id': domain_id,
            'total_resources': len(domain_resources),
            'resource_health': {},
            'cost_estimate': 0.0,
            'usage_metrics': {}
        }

        for resource in domain_resources:
            service_type = resource['service_type']

            # Pull health info from each provisioner that supports it.
            if service_type in self.provisioners:
                provisioner = self.provisioners[service_type]
                if hasattr(provisioner, 'get_monitoring_info'):
                    monitoring_info = await provisioner.get_monitoring_info(resource['resource_id'])
                    monitoring_data['resource_health'][resource['resource_id']] = monitoring_info

        return monitoring_data

# Concrete provisioner implementations
class S3StorageProvisioner:
    """S3 object-storage provisioner.

    Reference implementation: it synthesizes identifiers rather than
    calling real AWS APIs. In production this is where a Terraform /
    AWS CDK run would create the bucket, lifecycle rules and access
    logging described by the configuration.
    """

    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision an S3 bucket plus access credentials for a domain.

        Args:
            request: Must contain 'configuration' (with 'bucket_name')
                and 'domain_id'.

        Returns:
            Identifiers for the bucket, access key and IAM policy.
        """
        config = request['configuration']
        domain_id = request['domain_id']

        # The original built a `terraform_config` dict here that was never
        # used; it has been removed. The real IaC invocation (with the
        # lifecycle, encryption and logging settings) belongs at this point.
        return {
            'bucket_arn': f"arn:aws:s3:::{config['bucket_name']}",
            'bucket_url': f"s3://{config['bucket_name']}/",
            'access_key_id': f"AKIA{domain_id.upper()}EXAMPLE",
            'policy_arn': f"arn:aws:iam::123456789012:policy/{domain_id}-s3-policy"
        }

    async def deprovision(self, resource_id: str) -> bool:
        """Remove the storage resources (placeholder: logs and succeeds)."""
        # In production: terraform destroy or AWS API delete calls.
        print(f"Deprovisioning S3 storage: {resource_id}")
        return True

class SparkClusterProvisioner:
    """Spark cluster provisioner.

    Provisions a domain-scoped Spark environment on Kubernetes: a
    namespace, a ServiceAccount, and (in a full deployment) a
    SparkApplication custom resource managed by the Spark Operator.
    """

    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision a Kubernetes-based Spark cluster.

        Args:
            request: Must contain 'configuration' (cluster_name, driver/
                executor sizing, auto_scaling, max_executors) and 'domain_id'.

        Returns:
            Cluster name, namespace, UI URLs, and the ServiceAccount
            used by the driver and executor pods.
        """
        config = request['configuration']
        domain_id = request['domain_id']

        # SparkApplication manifest for the Spark Operator.
        # NOTE(review): this manifest is built but never submitted to the
        # cluster in this method — presumably a CustomObjectsApi create
        # call is intended here; confirm before relying on it.
        spark_operator_config = {
            'apiVersion': 'sparkoperator.k8s.io/v1beta2',
            'kind': 'SparkApplication',
            'metadata': {
                'name': config['cluster_name'],
                'namespace': f"datamesh-{domain_id}"
            },
            'spec': {
                'type': 'Scala',
                'mode': 'cluster',
                'image': 'gcr.io/spark-operator/spark:v3.1.1',
                'driver': {
                    'cores': config['driver_cores'],
                    'coreLimit': f"{config['driver_cores']}",
                    'memory': config['driver_memory'],
                    'serviceAccount': f"spark-{domain_id}"
                },
                'executor': {
                    'cores': config['executor_cores'],
                    'coreLimit': f"{config['executor_cores']}",
                    'memory': config['executor_memory'],
                    'instances': 2,
                    'serviceAccount': f"spark-{domain_id}"
                },
                'dynamicAllocation': {
                    'enabled': config['auto_scaling'],
                    'maxExecutors': config['max_executors']
                }
            }
        }

        # Create the namespace for this domain.
        await self._create_namespace(f"datamesh-{domain_id}")

        # Create the ServiceAccount the Spark pods will run as.
        await self._create_service_account(f"spark-{domain_id}", f"datamesh-{domain_id}")

        return {
            'cluster_name': config['cluster_name'],
            'namespace': f"datamesh-{domain_id}",
            'spark_ui_url': f"https://spark-ui.datamesh.com/{domain_id}/",
            'history_server_url': f"https://spark-history.datamesh.com/{domain_id}/",
            'service_account': f"spark-{domain_id}"
        }

    async def _create_namespace(self, namespace: str):
        """Create a Kubernetes namespace (idempotent: 409 conflict is ignored)."""
        k8s_core_v1 = client.CoreV1Api()

        namespace_manifest = client.V1Namespace(
            metadata=client.V1ObjectMeta(name=namespace)
        )

        try:
            k8s_core_v1.create_namespace(body=namespace_manifest)
        except client.rest.ApiException as e:
            if e.status != 409:  # 409 Conflict = already exists; ignore
                raise

    async def _create_service_account(self, service_account: str, namespace: str):
        """Create a Kubernetes ServiceAccount (idempotent: 409 conflict is ignored)."""
        k8s_core_v1 = client.CoreV1Api()

        sa_manifest = client.V1ServiceAccount(
            metadata=client.V1ObjectMeta(
                name=service_account,
                namespace=namespace
            )
        )

        try:
            k8s_core_v1.create_namespaced_service_account(
                namespace=namespace,
                body=sa_manifest
            )
        except client.rest.ApiException as e:
            if e.status != 409:  # 409 Conflict = already exists; ignore
                raise

class KafkaClusterProvisioner:
    """Kafka cluster provisioner (Strimzi operator based).

    Reference implementation: it composes the Strimzi `Kafka` custom
    resource and returns the endpoints a deployment would expose.
    """

    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision a Kafka cluster via the Strimzi Kafka Operator.

        Args:
            request: Must contain 'configuration' (cluster_name,
                replication_factor, retention_hours, compression_type)
                and 'domain_id'.

        Returns:
            Bootstrap servers plus schema-registry and UI endpoints.
        """
        config = request['configuration']
        domain_id = request['domain_id']

        # Strimzi Kafka custom resource.
        # NOTE(review): this manifest is built but never applied to the
        # cluster here — presumably a CustomObjectsApi create call is
        # intended; confirm before relying on it.
        kafka_cluster_config = {
            'apiVersion': 'kafka.strimzi.io/v1beta2',
            'kind': 'Kafka',
            'metadata': {
                'name': config['cluster_name'],
                'namespace': f"datamesh-{domain_id}"
            },
            'spec': {
                'kafka': {
                    'version': '3.0.0',
                    'replicas': config['replication_factor'],
                    'listeners': [
                        {
                            'name': 'plain',
                            'port': 9092,
                            'type': 'internal',
                            'tls': False
                        },
                        {
                            'name': 'tls',
                            'port': 9093,
                            'type': 'internal',
                            'tls': True
                        }
                    ],
                    'config': {
                        'offsets.topic.replication.factor': config['replication_factor'],
                        'transaction.state.log.replication.factor': config['replication_factor'],
                        'transaction.state.log.min.isr': 2,
                        'default.replication.factor': config['replication_factor'],
                        'min.insync.replicas': 2,
                        'inter.broker.protocol.version': '3.0',
                        'log.retention.hours': config['retention_hours'],
                        'compression.type': config['compression_type']
                    },
                    'storage': {
                        'type': 'persistent-claim',
                        'size': '100Gi',
                        'class': 'fast-ssd'
                    }
                },
                'zookeeper': {
                    'replicas': 3,
                    'storage': {
                        'type': 'persistent-claim',
                        'size': '10Gi',
                        'class': 'fast-ssd'
                    }
                },
                'entityOperator': {
                    'topicOperator': {},
                    'userOperator': {}
                }
            }
        }

        return {
            'cluster_name': config['cluster_name'],
            'bootstrap_servers': f"{config['cluster_name']}-kafka-bootstrap.{domain_id}:9092",
            'schema_registry_url': f"https://schema-registry.{domain_id}.datamesh.com",
            'kafka_ui_url': f"https://kafka-ui.{domain_id}.datamesh.com"
        }

    async def deprovision(self, resource_id: str) -> bool:
        """Tear down the Kafka cluster (placeholder: logs and succeeds).

        Added so this class satisfies the ResourceProvisioner protocol,
        matching S3StorageProvisioner.
        """
        print(f"Deprovisioning Kafka cluster: {resource_id}")
        return True

class DataCatalogProvisioner:
    """Data catalog provisioner (Apache Atlas based).

    Reference implementation: it composes the Helm values for an Atlas
    deployment and returns the endpoints, without actually running Helm.
    """

    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision an Apache Atlas data catalog for a domain.

        Args:
            request: Must contain 'domain_id'. 'configuration' is read
                but currently unused.

        Returns:
            Catalog UI, API, schema-registry and lineage endpoints.
        """
        config = request['configuration']  # NOTE(review): currently unused — confirm intended
        domain_id = request['domain_id']

        # Helm values for an Apache Atlas deployment.
        # NOTE(review): built but never passed to Helm in this method.
        atlas_values = {
            'atlas': {
                'domain': domain_id,
                'elasticsearch': {
                    'enabled': True,
                    'storage': '50Gi'
                },
                'kafka': {
                    'enabled': True,
                    'external_kafka': f"{domain_id}-kafka"
                },
                'hbase': {
                    'enabled': True,
                    'storage': '100Gi'
                }
            },
            'ingress': {
                'enabled': True,
                'hostname': f"catalog.{domain_id}.datamesh.com"
            }
        }

        return {
            'catalog_url': f"https://catalog.{domain_id}.datamesh.com",
            'api_endpoint': f"https://catalog.{domain_id}.datamesh.com/api/atlas",
            'schema_registry_url': f"https://catalog.{domain_id}.datamesh.com/schema",
            'lineage_ui_url': f"https://catalog.{domain_id}.datamesh.com/lineage"
        }

    async def deprovision(self, resource_id: str) -> bool:
        """Tear down the catalog deployment (placeholder: logs and succeeds).

        Added so this class satisfies the ResourceProvisioner protocol,
        matching S3StorageProvisioner.
        """
        print(f"Deprovisioning data catalog: {resource_id}")
        return True

class APIGatewayProvisioner:
    """Provisioner for a domain-scoped Kong API gateway."""

    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision a Kong gateway and return its public endpoints.

        Args:
            request: must contain 'configuration' (with 'gateway_name',
                optionally 'rate_limit') and 'domain_id'.
        """
        config = request['configuration']
        domain_id = request['domain_id']

        # KongIngress manifest: proxy timeouts plus route matching rules.
        timeout_ms = 10000
        kong_config = {
            'apiVersion': 'configuration.konghq.com/v1',
            'kind': 'KongIngress',
            'metadata': {
                'name': config['gateway_name'],
                'namespace': f"datamesh-{domain_id}",
            },
            'proxy': {
                'path': f"/{domain_id}/api/",
                'connect_timeout': timeout_ms,
                'read_timeout': timeout_ms,
                'send_timeout': timeout_ms,
            },
            'route': {
                'methods': ['GET', 'POST', 'PUT', 'DELETE'],
                'protocols': ['https'],
                'strip_path': True,
            },
        }

        # Rate limit is caller-configurable with a 1000/minute default.
        return {
            'gateway_url': f"https://api.datamesh.com/{domain_id}/",
            'admin_ui_url': f"https://kong-admin.{domain_id}.datamesh.com",
            'rate_limit': config.get('rate_limit', '1000/minute'),
            'api_key_endpoint': f"https://api.datamesh.com/{domain_id}/auth/key",
        }

# 사용 예제
async def demonstrate_self_serve_platform():
    """Walk through the self-serve platform: catalog, provisioning, monitoring."""
    platform = SelfServeDataPlatform()

    # List every service the platform can provision.
    available = await platform.get_service_catalog()
    print("Available Services:")
    for service_id, service in available.items():
        print(f"- {service.name}: {service.description}")

    # Request a Spark cluster for the customer domain.
    request = ProvisioningRequest(
        request_id="req-001",
        domain_id="customer",
        service_type="spark-cluster",
        configuration={
            'max_executors': 20,  # deliberately above the default
            'driver_memory': '8g'
        },
        requester="alice@company.com"
    )

    resource_id = await platform.request_service(request)
    print(f"Spark cluster provisioned: {resource_id}")

    # Inspect what the customer domain currently owns.
    owned = await platform.get_domain_resources("customer")
    print(f"Customer domain has {len(owned)} active resources")

    # Pull the live monitoring snapshot for the domain.
    snapshot = await platform.monitor_resources("customer")
    print(f"Domain monitoring: {json.dumps(snapshot, indent=2)}")

# 실행
# asyncio.run(demonstrate_self_serve_platform())

3. 연합형 거버넌스 구현

3.1 거버넌스 프레임워크 설계

# federated_governance.py
import hashlib
import json
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any, Optional, Set

class GovernanceLevel(Enum):
    # Scope at which a governance policy applies.
    GLOBAL = "global"    # organization-wide
    DOMAIN = "domain"    # a single data domain
    PRODUCT = "product"  # a single data product

class PolicyType(Enum):
    # Categories of governance policy; each type maps to a dedicated engine.
    DATA_CLASSIFICATION = "data_classification"
    ACCESS_CONTROL = "access_control"
    DATA_QUALITY = "data_quality"
    RETENTION = "retention"
    PRIVACY = "privacy"
    COMPLIANCE = "compliance"

@dataclass
class GovernancePolicy:
    """A governance policy: a named, versioned set of rules applied at some scope."""
    policy_id: str
    name: str
    description: str
    policy_type: PolicyType
    level: GovernanceLevel
    scope: Dict[str, Any]  # domain, product, or global scope
    rules: List[Dict[str, Any]]  # each rule carries rule_id, description, conditions, action
    enforcement_level: str  # "advisory", "warning", "blocking"
    owner: str  # contact responsible for this policy
    created_at: float  # epoch seconds
    version: str

@dataclass
class PolicyViolation:
    """A single detected breach of a governance policy."""
    violation_id: str
    policy_id: str  # id of the violated policy
    resource_id: str  # resource on which the violation was detected
    violation_type: str
    description: str
    severity: str  # one of 'critical' | 'high' | 'medium' | 'low'
    detected_at: float  # epoch seconds
    resolved: bool = False

class PolicyEngine(ABC):
    """Abstract interface for policy engines.

    Fix: the original declared @abstractmethod without inheriting from ABC,
    so the abstract contract was never enforced and PolicyEngine could be
    instantiated directly. Deriving from ABC makes instantiation of an
    engine without evaluate_policy a TypeError, as intended.
    """

    @abstractmethod
    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Evaluate *policy* against *context*; return any detected violations."""
        pass

class FederatedGovernanceFramework:
    """Federated governance framework.

    Keeps policy registries at three scopes (global / domain / product),
    dispatches evaluation to a per-PolicyType engine, accumulates detected
    violations, and produces dashboard-style summaries.

    NOTE(review): relies on ``collections.defaultdict`` being imported at
    module scope for the domain/product registries below.
    """

    def __init__(self):
        # Policy registries keyed by scope.
        self.global_policies = {}
        self.domain_policies = defaultdict(dict)   # domain_id -> {policy_id: policy}
        self.product_policies = defaultdict(dict)  # product_id -> {policy_id: policy}
        self.policy_engines = {}  # PolicyType -> engine instance
        self.violations = []      # accumulated PolicyViolation records
        self.governance_roles = {}       # not used by the methods in this class
        self.compliance_frameworks = {}  # not used by the methods in this class

        self._initialize_policy_engines()
        self._setup_default_policies()

    def _initialize_policy_engines(self):
        """Wire exactly one engine per policy type."""
        self.policy_engines[PolicyType.DATA_CLASSIFICATION] = DataClassificationEngine()
        self.policy_engines[PolicyType.ACCESS_CONTROL] = AccessControlEngine()
        self.policy_engines[PolicyType.DATA_QUALITY] = DataQualityEngine()
        self.policy_engines[PolicyType.RETENTION] = RetentionEngine()
        self.policy_engines[PolicyType.PRIVACY] = PrivacyEngine()
        self.policy_engines[PolicyType.COMPLIANCE] = ComplianceEngine()

    def _setup_default_policies(self):
        """Register the organization-wide default policies."""

        # Global data-classification policy: auto-flag PII and financial data.
        data_classification_policy = GovernancePolicy(
            policy_id="global-data-classification",
            name="Global Data Classification Policy",
            description="Organization-wide data classification standards",
            policy_type=PolicyType.DATA_CLASSIFICATION,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "pii-classification",
                    "description": "Automatically classify PII data",
                    "conditions": {
                        "field_patterns": ["email", "phone", "ssn", "credit_card"],
                        "data_patterns": ["\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b"]
                    },
                    "action": "classify_as_pii"
                },
                {
                    "rule_id": "financial-data",
                    "description": "Classify financial data",
                    "conditions": {
                        "field_patterns": ["account_number", "routing_number", "balance"],
                        "table_patterns": ["payment", "transaction", "account"]
                    },
                    "action": "classify_as_financial"
                }
            ],
            enforcement_level="blocking",
            owner="chief-data-officer@company.com",
            created_at=time.time(),
            version="1.0"
        )

        # Global access-control policy: PII restriction + cross-domain approval.
        access_control_policy = GovernancePolicy(
            policy_id="global-access-control",
            name="Global Access Control Policy",
            description="Organization-wide access control rules",
            policy_type=PolicyType.ACCESS_CONTROL,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "pii-access-restriction",
                    "description": "Restrict PII data access",
                    "conditions": {
                        "data_classification": "pii",
                        "user_role": "!data_protection_officer"
                    },
                    "action": "deny_access"
                },
                {
                    "rule_id": "cross-domain-access",
                    "description": "Cross-domain data access requires approval",
                    "conditions": {
                        "source_domain": "ne:target_domain"
                    },
                    "action": "require_approval"
                }
            ],
            enforcement_level="blocking",
            owner="security-team@company.com",
            created_at=time.time(),
            version="1.0"
        )

        # Global data-quality policy: completeness and freshness thresholds.
        data_quality_policy = GovernancePolicy(
            policy_id="global-data-quality",
            name="Global Data Quality Standards",
            description="Minimum data quality requirements",
            policy_type=PolicyType.DATA_QUALITY,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "completeness-threshold",
                    "description": "Data completeness must exceed 95%",
                    "conditions": {
                        "metric": "completeness",
                        "threshold": 0.95,
                        "operator": "gte"
                    },
                    "action": "flag_quality_issue"
                },
                {
                    "rule_id": "freshness-requirement",
                    "description": "Real-time data must be less than 1 hour old",
                    "conditions": {
                        "data_type": "real_time",
                        "age_hours": 1,
                        "operator": "gt"
                    },
                    "action": "flag_freshness_issue"
                }
            ],
            enforcement_level="warning",
            owner="data-quality-team@company.com",
            created_at=time.time(),
            version="1.0"
        )

        # Register the defaults.
        self.register_policy(data_classification_policy)
        self.register_policy(access_control_policy)
        self.register_policy(data_quality_policy)

    def register_policy(self, policy: GovernancePolicy) -> str:
        """Store *policy* in the registry matching its level; return its id.

        Raises:
            ValueError: for DOMAIN/PRODUCT policies whose scope lacks the
                corresponding domain_id/product_id key.
        """

        if policy.level == GovernanceLevel.GLOBAL:
            self.global_policies[policy.policy_id] = policy
        elif policy.level == GovernanceLevel.DOMAIN:
            domain_id = policy.scope.get('domain_id')
            if not domain_id:
                raise ValueError("Domain policy must specify domain_id in scope")
            self.domain_policies[domain_id][policy.policy_id] = policy
        elif policy.level == GovernanceLevel.PRODUCT:
            product_id = policy.scope.get('product_id')
            if not product_id:
                raise ValueError("Product policy must specify product_id in scope")
            self.product_policies[product_id][policy.policy_id] = policy

        return policy.policy_id

    def evaluate_compliance(self, resource_id: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate every applicable policy against *context*.

        Collects global policies plus any domain/product policies matching
        context['domain_id'] / context['product_id'], runs each through its
        type's engine, records violations, and returns a per-policy result
        map with an overall compliance score (fraction of compliant policies;
        1.0 when no policies apply).
        """

        domain_id = context.get('domain_id')
        product_id = context.get('product_id')

        applicable_policies = []

        # Global policies always apply.
        applicable_policies.extend(self.global_policies.values())

        # Domain-scoped policies, if the context names a known domain.
        if domain_id and domain_id in self.domain_policies:
            applicable_policies.extend(self.domain_policies[domain_id].values())

        # Product-scoped policies, if the context names a known product.
        if product_id and product_id in self.product_policies:
            applicable_policies.extend(self.product_policies[product_id].values())

        violations = []
        policy_results = {}

        for policy in applicable_policies:
            engine = self.policy_engines.get(policy.policy_type)
            if engine:
                policy_violations = engine.evaluate_policy(policy, context)
                violations.extend(policy_violations)

                policy_results[policy.policy_id] = {
                    'policy_name': policy.name,
                    'compliance_status': 'compliant' if not policy_violations else 'violated',
                    'violations': len(policy_violations),
                    'enforcement_level': policy.enforcement_level
                }

        # Persist violations on the framework for later reporting.
        self.violations.extend(violations)

        # Overall compliance score: share of evaluated policies with no violations.
        total_policies = len(applicable_policies)
        compliant_policies = sum(1 for result in policy_results.values()
                               if result['compliance_status'] == 'compliant')

        compliance_score = (compliant_policies / total_policies) if total_policies > 0 else 1.0

        return {
            'resource_id': resource_id,
            'compliance_score': compliance_score,
            'total_policies_evaluated': total_policies,
            'policy_results': policy_results,
            'violations': [v.__dict__ for v in violations],
            'enforcement_actions': self._determine_enforcement_actions(violations)
        }

    def _determine_enforcement_actions(self, violations: List[PolicyViolation]) -> List[str]:
        """Map violations to enforcement actions via their policies' levels."""
        actions = []

        blocking_violations = [v for v in violations
                             if self._get_policy_enforcement_level(v.policy_id) == "blocking"]
        warning_violations = [v for v in violations
                            if self._get_policy_enforcement_level(v.policy_id) == "warning"]

        # Any blocking violation halts access and alerts governance.
        if blocking_violations:
            actions.append("block_access")
            actions.append("notify_governance_team")

        # Warnings are logged and routed to the domain owner.
        if warning_violations:
            actions.append("log_warning")
            actions.append("notify_domain_owner")

        return actions

    def _get_policy_enforcement_level(self, policy_id: str) -> str:
        """Look up a policy's enforcement level across all registries.

        Search order: global, then domain, then product; unknown policies
        default to "advisory".
        """
        # Check global policies first.
        if policy_id in self.global_policies:
            return self.global_policies[policy_id].enforcement_level

        # Then domain policies.
        for domain_policies in self.domain_policies.values():
            if policy_id in domain_policies:
                return domain_policies[policy_id].enforcement_level

        # Then product policies.
        for product_policies in self.product_policies.values():
            if policy_id in product_policies:
                return product_policies[policy_id].enforcement_level

        return "advisory"

    def create_domain_policy(self, domain_id: str, policy_template: str,
                           customizations: Dict[str, Any]) -> GovernancePolicy:
        """Derive, customize, and register a domain-scoped policy from a
        global template.

        Raises:
            ValueError: if *policy_template* is not a known template name.
        """

        base_policy = self._get_policy_template(policy_template)

        # Clone the template into a domain-scoped policy.
        # NOTE(review): rules.copy() is a shallow copy — the rule dicts are
        # shared with the global template, so _apply_rule_customizations
        # below mutates the template's rules too. Confirm whether that is
        # intended; a deep copy would isolate the domain policy.
        custom_policy = GovernancePolicy(
            policy_id=f"{domain_id}-{policy_template}",
            name=f"{domain_id.title()} {base_policy.name}",
            description=f"Domain-specific policy for {domain_id}",
            policy_type=base_policy.policy_type,
            level=GovernanceLevel.DOMAIN,
            scope={"domain_id": domain_id},
            rules=base_policy.rules.copy(),
            enforcement_level=customizations.get('enforcement_level', base_policy.enforcement_level),
            owner=customizations.get('owner', f"domain-owner-{domain_id}@company.com"),
            created_at=time.time(),
            version="1.0"
        )

        # Apply per-rule condition/action overrides, if provided.
        if 'rule_customizations' in customizations:
            self._apply_rule_customizations(custom_policy, customizations['rule_customizations'])

        self.register_policy(custom_policy)

        return custom_policy

    def _get_policy_template(self, template_name: str) -> GovernancePolicy:
        """Resolve a template name to one of the registered global policies."""
        templates = {
            'access_control': self.global_policies.get('global-access-control'),
            'data_quality': self.global_policies.get('global-data-quality'),
            'data_classification': self.global_policies.get('global-data-classification')
        }

        template = templates.get(template_name)
        if not template:
            raise ValueError(f"Unknown policy template: {template_name}")

        return template

    def _apply_rule_customizations(self, policy: GovernancePolicy,
                                 customizations: Dict[str, Any]):
        """Merge per-rule overrides (keyed by rule_id) into *policy*'s rules in place."""
        for rule in policy.rules:
            rule_id = rule['rule_id']
            if rule_id in customizations:
                rule_customization = customizations[rule_id]

                # Merge extra/overriding conditions into the rule.
                if 'conditions' in rule_customization:
                    rule['conditions'].update(rule_customization['conditions'])

                # Replace the rule's action outright.
                if 'action' in rule_customization:
                    rule['action'] = rule_customization['action']

    def get_governance_dashboard(self, domain_id: Optional[str] = None) -> Dict[str, Any]:
        """Build a dashboard snapshot: violation stats, policy counts, trends."""

        # Violations, optionally narrowed to one domain.
        filtered_violations = self.violations
        if domain_id:
            # Domain-level filtering would need a resource->domain mapping;
            # not implemented here, so all violations are reported.
            pass

        # Violation statistics broken down by state and severity.
        violation_stats = {
            'total_violations': len(filtered_violations),
            'open_violations': len([v for v in filtered_violations if not v.resolved]),
            'violations_by_severity': {
                'critical': len([v for v in filtered_violations if v.severity == 'critical']),
                'high': len([v for v in filtered_violations if v.severity == 'high']),
                'medium': len([v for v in filtered_violations if v.severity == 'medium']),
                'low': len([v for v in filtered_violations if v.severity == 'low'])
            },
            'violations_by_type': {}
        }

        # Policy counts per scope.
        policy_stats = {
            'total_policies': len(self.global_policies) +
                            sum(len(dp) for dp in self.domain_policies.values()) +
                            sum(len(pp) for pp in self.product_policies.values()),
            'global_policies': len(self.global_policies),
            'domain_policies': sum(len(dp) for dp in self.domain_policies.values()),
            'product_policies': sum(len(pp) for pp in self.product_policies.values())
        }

        return {
            'domain_id': domain_id,
            'generated_at': time.time(),
            'violation_stats': violation_stats,
            'policy_stats': policy_stats,
            'compliance_trends': self._calculate_compliance_trends(domain_id),
            'top_violations': self._get_top_violations(filtered_violations),
            'policy_effectiveness': self._calculate_policy_effectiveness()
        }

    def _calculate_compliance_trends(self, domain_id: Optional[str]) -> Dict[str, Any]:
        """Return compliance trend data.

        Placeholder: a real implementation would analyze time-series data;
        this returns fixed illustrative values.
        """
        return {
            'weekly_compliance_scores': [0.85, 0.87, 0.89, 0.91, 0.88],
            'trend_direction': 'improving',
            'key_improvements': ['data_quality', 'access_control'],
            'areas_of_concern': ['data_retention']
        }

    def _get_top_violations(self, violations: List[PolicyViolation]) -> List[Dict[str, Any]]:
        """Return up to 10 unresolved violations, sorted by severity then recency.

        NOTE(review): severity is compared lexicographically as a string, so
        with reverse=True the order is 'medium' > 'low' > 'high' > 'critical'
        — critical violations sort LAST. Likely unintended; a severity-rank
        map would fix it.
        """
        open_violations = [v for v in violations if not v.resolved]
        sorted_violations = sorted(open_violations,
                                 key=lambda x: (x.severity, x.detected_at), reverse=True)

        return [v.__dict__ for v in sorted_violations[:10]]

    def _calculate_policy_effectiveness(self) -> Dict[str, Any]:
        """Return policy-effectiveness metrics.

        Placeholder: a real implementation would analyze per-policy violation
        trends; this returns fixed illustrative values.
        """
        return {
            'most_effective_policies': ['global-data-classification', 'domain-access-control'],
            'least_effective_policies': ['global-retention'],
            'policy_coverage': 0.92,
            'enforcement_rate': 0.87
        }

# 구체적인 정책 엔진 구현
class DataClassificationEngine(PolicyEngine):
    """Policy engine verifying data-classification rules.

    Fix over the original: both private helpers referenced an undefined
    `context` name (NameError whenever a violation was built); the
    evaluation context is now passed through explicitly.
    """

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Run each known classification rule against the schema in *context*."""
        violations = []

        schema = context.get('schema', {})
        data_sample = context.get('data_sample', {})

        for rule in policy.rules:
            if rule['rule_id'] == 'pii-classification':
                violations.extend(self._check_pii_classification(
                    policy, rule, schema, data_sample, context))
            elif rule['rule_id'] == 'financial-data':
                violations.extend(self._check_financial_classification(
                    policy, rule, schema, data_sample, context))

        return violations

    def _check_pii_classification(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                schema: Dict[str, Any], data_sample: Dict[str, Any],
                                context: Dict[str, Any]) -> List[PolicyViolation]:
        """Flag schema fields that look like PII but are not classified 'pii'."""
        violations = []

        field_patterns = rule['conditions']['field_patterns']
        # data_patterns are read but not yet applied to data_sample here.
        data_patterns = rule['conditions']['data_patterns']

        # Field-name heuristic: any configured pattern occurring in the name.
        for field_name in schema.get('fields', []):
            if any(pattern in field_name.lower() for pattern in field_patterns):
                current_classification = schema.get('classification', {}).get(field_name)
                if current_classification != 'pii':
                    violations.append(PolicyViolation(
                        violation_id=f"pii-{field_name}-{int(time.time())}",
                        policy_id=policy.policy_id,
                        resource_id=context.get('resource_id', 'unknown'),
                        violation_type='missing_pii_classification',
                        description=f"Field '{field_name}' appears to contain PII but is not classified as such",
                        severity='high',
                        detected_at=time.time()
                    ))

        return violations

    def _check_financial_classification(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                      schema: Dict[str, Any], data_sample: Dict[str, Any],
                                      context: Dict[str, Any]) -> List[PolicyViolation]:
        """Flag tables whose names look financial but lack 'financial' classification."""
        violations = []

        # field_patterns is read but only the table-level check is implemented.
        field_patterns = rule['conditions']['field_patterns']
        table_patterns = rule['conditions']['table_patterns']

        table_name = context.get('table_name', '').lower()

        # Table-name heuristic: any configured pattern occurring in the name.
        if any(pattern in table_name for pattern in table_patterns):
            current_classification = schema.get('table_classification')
            if current_classification != 'financial':
                violations.append(PolicyViolation(
                    violation_id=f"financial-table-{int(time.time())}",
                    policy_id=policy.policy_id,
                    resource_id=context.get('resource_id', 'unknown'),
                    violation_type='missing_financial_classification',
                    description=f"Table '{table_name}' appears to contain financial data but is not classified as such",
                    severity='high',
                    detected_at=time.time()
                ))

        return violations

class AccessControlEngine(PolicyEngine):
    """Policy engine enforcing access-control rules."""

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Dispatch each recognized rule to its dedicated check."""
        found: List[PolicyViolation] = []
        user_ctx = context.get('user_context', {})
        req = context.get('access_request', {})

        # rule_id -> checker; unknown rule ids are ignored.
        checks = {
            'pii-access-restriction': self._check_pii_access,
            'cross-domain-access': self._check_cross_domain_access,
        }
        for rule in policy.rules:
            check = checks.get(rule['rule_id'])
            if check is not None:
                found.extend(check(policy, rule, user_ctx, req))

        return found

    def _check_pii_access(self, policy: GovernancePolicy, rule: Dict[str, Any],
                         user_context: Dict[str, Any], access_request: Dict[str, Any]) -> List[PolicyViolation]:
        """Only a data_protection_officer may access PII-classified data."""
        classification = access_request.get('data_classification')
        role = user_context.get('role')

        # Compliant: not PII, or the privileged role is requesting.
        if classification != 'pii' or role == 'data_protection_officer':
            return []

        return [PolicyViolation(
            violation_id=f"pii-access-{user_context.get('user_id', 'unknown')}-{int(time.time())}",
            policy_id=policy.policy_id,
            resource_id=access_request.get('resource_id', 'unknown'),
            violation_type='unauthorized_pii_access',
            description=f"User with role '{role}' attempted to access PII data",
            severity='critical',
            detected_at=time.time()
        )]

    def _check_cross_domain_access(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                 user_context: Dict[str, Any], access_request: Dict[str, Any]) -> List[PolicyViolation]:
        """Cross-domain access requires an explicit approval flag."""
        src = user_context.get('domain')
        dst = access_request.get('domain')
        approved = access_request.get('has_approval', False)

        # Compliant: same domain, or the request carries an approval.
        if src == dst or approved:
            return []

        return [PolicyViolation(
            violation_id=f"cross-domain-{src}-{dst}-{int(time.time())}",
            policy_id=policy.policy_id,
            resource_id=access_request.get('resource_id', 'unknown'),
            violation_type='unapproved_cross_domain_access',
            description=f"Cross-domain access from '{src}' to '{dst}' without approval",
            severity='medium',
            detected_at=time.time()
        )]

class DataQualityEngine(PolicyEngine):
    """Policy engine checking data-quality metrics.

    Fix over the original: _check_completeness referenced an undefined
    `context` name when building a violation (NameError whenever the
    threshold was missed); the context is now passed in explicitly.
    """

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Run each known quality rule against the metrics in *context*."""
        violations = []

        quality_metrics = context.get('quality_metrics', {})

        for rule in policy.rules:
            if rule['rule_id'] == 'completeness-threshold':
                violations.extend(self._check_completeness(policy, rule, quality_metrics, context))
            elif rule['rule_id'] == 'freshness-requirement':
                violations.extend(self._check_freshness(policy, rule, context))

        return violations

    def _check_completeness(self, policy: GovernancePolicy, rule: Dict[str, Any],
                          quality_metrics: Dict[str, Any],
                          context: Dict[str, Any]) -> List[PolicyViolation]:
        """Flag datasets whose completeness metric falls below the rule's threshold."""
        violations = []

        # Missing metric is treated as 0 (always a violation).
        completeness = quality_metrics.get('completeness', 0)
        threshold = rule['conditions']['threshold']

        if completeness < threshold:
            violations.append(PolicyViolation(
                violation_id=f"completeness-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=context.get('resource_id', 'unknown'),
                violation_type='low_data_completeness',
                description=f"Data completeness ({completeness:.2%}) below threshold ({threshold:.2%})",
                severity='medium',
                detected_at=time.time()
            ))

        return violations

    def _check_freshness(self, policy: GovernancePolicy, rule: Dict[str, Any],
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        """Flag real-time data older than the rule's age threshold (hours)."""
        violations = []

        data_type = context.get('data_type')
        data_age_hours = context.get('data_age_hours', 0)
        threshold_hours = rule['conditions']['age_hours']

        # Only real-time data carries a freshness requirement.
        if data_type == 'real_time' and data_age_hours > threshold_hours:
            violations.append(PolicyViolation(
                violation_id=f"freshness-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=context.get('resource_id', 'unknown'),
                violation_type='stale_data',
                description=f"Real-time data is {data_age_hours:.1f} hours old (threshold: {threshold_hours} hours)",
                severity='high',
                detected_at=time.time()
            ))

        return violations

class RetentionEngine(PolicyEngine):
    """Policy engine for data-retention rules."""

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Placeholder: retention checks are not implemented, so no violations."""
        return []

class PrivacyEngine(PolicyEngine):
    """Policy engine for privacy regulations (e.g. GDPR, CCPA)."""

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Placeholder: privacy checks are not implemented, so no violations."""
        return []

class ComplianceEngine(PolicyEngine):
    """Policy engine for regulatory-compliance rules."""

    def evaluate_policy(self, policy: GovernancePolicy,
                       context: Dict[str, Any]) -> List[PolicyViolation]:
        """Placeholder: compliance checks are not implemented, so no violations."""
        return []

# 사용 예제
def demonstrate_federated_governance():
    """Walk through the federated governance flow: policy creation,
    compliance evaluation, and the governance dashboard."""
    framework = FederatedGovernanceFramework()

    # Derive a customer-domain access-control policy from the global template.
    domain_policy = framework.create_domain_policy(
        domain_id="customer",
        policy_template="access_control",
        customizations={
            'enforcement_level': 'blocking',
            'owner': 'customer-domain-owner@company.com',
            'rule_customizations': {
                'cross-domain-access': {
                    'conditions': {
                        'requires_manager_approval': True
                    }
                }
            }
        }
    )

    print(f"Created domain policy: {domain_policy.policy_id}")

    # Evaluation context deliberately containing several violations.
    evaluation_context = {
        'domain_id': 'customer',
        'product_id': 'customer-profile',
        'resource_id': 'customer-profile-db',
        'schema': {
            'fields': ['customer_id', 'email', 'phone_number', 'address'],
            'classification': {
                'customer_id': 'identifier',
                'email': 'pii',  # correctly classified
                'phone_number': 'identifier',  # not classified as PII -> violation
                'address': 'pii'
            }
        },
        'user_context': {
            'user_id': 'john.doe@company.com',
            'role': 'data_analyst',
            'domain': 'customer'
        },
        'access_request': {
            'resource_id': 'customer-profile-db',
            'data_classification': 'pii',
            'domain': 'customer',
            'has_approval': False
        },
        'quality_metrics': {
            'completeness': 0.92,  # below the 95% threshold
            'accuracy': 0.98
        },
        'data_type': 'real_time',
        'data_age_hours': 2.5  # freshness violation
    }

    result = framework.evaluate_compliance('customer-profile-db', evaluation_context)

    print("\nCompliance Evaluation Result:")
    print(json.dumps(result, indent=2))

    # Domain-scoped governance dashboard.
    dashboard = framework.get_governance_dashboard(domain_id="customer")
    print("\nGovernance Dashboard:")
    print(json.dumps(dashboard, indent=2))

4. 데이터 프로덕트 관리

4.1 데이터 프로덕트 라이프사이클

# data_product_management.py
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
import time
import json
import uuid
from abc import ABC, abstractmethod

class ProductStage(Enum):
    # Lifecycle stage of a data product, from idea to retirement.
    IDEATION = "ideation"
    DEVELOPMENT = "development"
    TESTING = "testing"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"

class ProductType(Enum):
    # Kind of data product by how its data is produced/consumed.
    ANALYTICAL = "analytical"
    OPERATIONAL = "operational"
    REFERENCE = "reference"
    DERIVED = "derived"

@dataclass
class DataProductMetrics:
    """Operational metrics tracked for one data product."""
    usage_count_daily: int = 0
    unique_consumers: int = 0
    average_query_time_ms: float = 0.0
    data_freshness_score: float = 1.0  # 1.0 = perfectly fresh
    quality_score: float = 1.0  # 1.0 = no quality issues
    availability_percentage: float = 100.0
    sla_compliance_rate: float = 1.0  # 1.0 = full SLA compliance

@dataclass
class DataProductContract:
    """A consumption contract between a producer and a consumer of a data product."""
    contract_id: str
    consumer_id: str
    producer_id: str
    sla_terms: Dict[str, Any]
    usage_limits: Dict[str, Any]
    data_access_patterns: List[str]
    pricing_model: Optional[Dict[str, Any]] = None  # None = free/internal use
    created_at: float = field(default_factory=time.time)  # epoch seconds
    expires_at: Optional[float] = None  # None = no expiry

@dataclass
class DataProductVersion:
    """One released version of a data product."""
    version_id: str
    version_number: str  # semantic version, e.g. "1.0.0"
    schema_version: str
    breaking_changes: bool
    changelog: List[Dict[str, Any]]  # entries with type/description/timestamp
    backward_compatible: bool
    created_at: float  # epoch seconds

class DataProductLifecycleManager:
    """Manages the lifecycle of data products in a data mesh.

    Responsibilities: product registration, semantic-versioned schema
    evolution, data contracts between producers and consumers, health
    monitoring against SLA policies, and portfolio reporting.

    All state lives in in-memory dicts; a production implementation would
    back these with a catalog / metadata store.
    """

    def __init__(self):
        self.products = {}                        # product_id -> DataProduct
        self.product_versions = defaultdict(list) # product_id -> [DataProductVersion]
        self.contracts = {}                       # contract_id -> DataProductContract
        self.metrics_history = defaultdict(list)  # product_id -> [{'timestamp', 'metrics'}]
        self.lifecycle_policies = {}

        self._setup_default_policies()

    def _setup_default_policies(self):
        """Install the default lifecycle policies (retention, deprecation, SLA)."""
        self.lifecycle_policies = {
            'version_retention': {
                'max_versions': 10,   # keep at most this many versions per product
                'min_versions': 3,    # never prune below this many
                'retention_days': 365
            },
            'deprecation': {
                'notice_period_days': 90,
                'grace_period_days': 30,
                'auto_deprecate_unused_days': 180
            },
            'sla_enforcement': {
                'availability_threshold': 99.0,     # percent
                'performance_threshold_ms': 1000,   # average query time
                'quality_threshold': 0.95           # quality score floor
            }
        }

    def create_data_product(self, product_definition: DataProduct) -> str:
        """Register a new data product and publish its initial 1.0.0 version.

        Args:
            product_definition: fully-populated product descriptor.

        Returns:
            The registered product_id.

        Raises:
            ValueError: if required fields, schema, or SLA terms are missing.
        """
        self._validate_product_definition(product_definition)

        self.products[product_definition.product_id] = product_definition

        # Every product starts at semantic version 1.0.0.
        initial_version = DataProductVersion(
            version_id=f"{product_definition.product_id}-v1.0.0",
            version_number="1.0.0",
            schema_version="1.0.0",
            breaking_changes=False,
            changelog=[{
                'type': 'creation',
                'description': 'Initial product version',
                'timestamp': time.time()
            }],
            backward_compatible=True,
            created_at=time.time()
        )

        self.product_versions[product_definition.product_id].append(initial_version)

        self._setup_product_infrastructure(product_definition)

        return product_definition.product_id

    def _validate_product_definition(self, product: DataProduct):
        """Validate that a product definition carries all mandatory attributes.

        Raises ValueError naming the first missing field, schema, or SLA term.
        """
        required_fields = ['product_id', 'name', 'domain_id', 'data_steward']

        # Loop variable is named `field_name` (not `field`) to avoid shadowing
        # the `dataclasses.field` imported at module level.
        for field_name in required_fields:
            if not getattr(product, field_name, None):
                raise ValueError(f"Required field '{field_name}' is missing")

        if not product.schema_definition:
            raise ValueError("Schema definition is required")

        required_sla_fields = ['availability', 'latency']
        for field_name in required_sla_fields:
            if field_name not in product.sla_requirements:
                raise ValueError(f"SLA requirement '{field_name}' is missing")

    def _setup_product_infrastructure(self, product: DataProduct):
        """Attach standard endpoint/monitoring/docs/security config to a product.

        The config is stored as a dynamically-added `infrastructure_config`
        attribute on the DataProduct instance (the dataclass does not declare
        it; this works because DataProduct is not slotted).
        """
        infrastructure_config = {
            'api_endpoints': {
                'rest_api': f"https://api.datamesh.com/products/{product.product_id}/data",
                'graphql_api': f"https://api.datamesh.com/products/{product.product_id}/graphql",
                'streaming_api': f"wss://stream.datamesh.com/products/{product.product_id}"
            },
            'monitoring': {
                'metrics_endpoint': f"https://monitoring.datamesh.com/products/{product.product_id}",
                'health_check': f"https://api.datamesh.com/products/{product.product_id}/health",
                'dashboard_url': f"https://dashboard.datamesh.com/products/{product.product_id}"
            },
            'documentation': {
                'api_docs': f"https://docs.datamesh.com/products/{product.product_id}/api",
                'schema_docs': f"https://docs.datamesh.com/products/{product.product_id}/schema",
                'usage_guide': f"https://docs.datamesh.com/products/{product.product_id}/guide"
            },
            'security': {
                'access_policies': f"datamesh:product:{product.product_id}:*",
                'encryption': 'AES-256',
                'authentication': 'OAuth2'
            }
        }

        product.infrastructure_config = infrastructure_config

    def evolve_product_schema(self, product_id: str, new_schema: Dict[str, Any],
                             breaking_changes: bool = False) -> str:
        """Publish a new schema version for a product.

        Semantic versioning: breaking change -> major bump; added fields ->
        minor bump; otherwise patch bump. Assumes existing version numbers
        are well-formed "major.minor.patch" strings.

        Returns the version_id of the newly-created version.

        Raises:
            ValueError: if the product is unknown.
        """
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")

        product = self.products[product_id]
        current_versions = self.product_versions[product_id]
        latest_version = max(current_versions, key=lambda v: v.created_at)

        current_version_parts = latest_version.version_number.split('.')
        major, minor, patch = map(int, current_version_parts)

        if breaking_changes:
            new_version = f"{major + 1}.0.0"
        else:
            # Additive field changes are minor; anything else is a patch.
            schema_changes = self._analyze_schema_changes(product.schema_definition, new_schema)
            if schema_changes['has_new_fields']:
                new_version = f"{major}.{minor + 1}.0"
            else:
                new_version = f"{major}.{minor}.{patch + 1}"

        new_version_obj = DataProductVersion(
            version_id=f"{product_id}-v{new_version}",
            version_number=new_version,
            schema_version=new_version,
            breaking_changes=breaking_changes,
            changelog=self._generate_schema_changelog(product.schema_definition, new_schema),
            backward_compatible=not breaking_changes,
            created_at=time.time()
        )

        self.product_versions[product_id].append(new_version_obj)

        product.schema_definition = new_schema

        # Enforce the version-retention policy after each publish.
        self._cleanup_old_versions(product_id)

        if breaking_changes:
            self._notify_breaking_change(product_id, new_version_obj)

        return new_version_obj.version_id

    def _analyze_schema_changes(self, old_schema: Dict[str, Any],
                              new_schema: Dict[str, Any]) -> Dict[str, bool]:
        """Diff two schemas and report which categories of change occurred.

        Compares the key sets and declared types under each schema's
        'fields' mapping. 'has_constraint_changes' is reserved and currently
        never set True.
        """
        changes = {
            'has_new_fields': False,
            'has_removed_fields': False,
            'has_type_changes': False,
            'has_constraint_changes': False
        }

        old_fields = set(old_schema.get('fields', {}).keys())
        new_fields = set(new_schema.get('fields', {}).keys())

        changes['has_new_fields'] = bool(new_fields - old_fields)
        changes['has_removed_fields'] = bool(old_fields - new_fields)

        # Any single type change on a shared field is enough; stop early.
        common_fields = old_fields & new_fields
        for field_name in common_fields:
            old_field_type = old_schema['fields'][field_name].get('type')
            new_field_type = new_schema['fields'][field_name].get('type')
            if old_field_type != new_field_type:
                changes['has_type_changes'] = True
                break

        return changes

    def _generate_schema_changelog(self, old_schema: Dict[str, Any],
                                 new_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build structured changelog entries describing old -> new schema deltas."""
        changelog = []

        old_fields = set(old_schema.get('fields', {}).keys())
        new_fields = set(new_schema.get('fields', {}).keys())

        # Added fields.
        for field_name in new_fields - old_fields:
            changelog.append({
                'type': 'field_added',
                'description': f"Added new field: {field_name}",
                'field_name': field_name,
                'timestamp': time.time()
            })

        # Removed fields.
        for field_name in old_fields - new_fields:
            changelog.append({
                'type': 'field_removed',
                'description': f"Removed field: {field_name}",
                'field_name': field_name,
                'timestamp': time.time()
            })

        # Type changes on fields present in both schemas.
        common_fields = old_fields & new_fields
        for field_name in common_fields:
            old_type = old_schema['fields'][field_name].get('type')
            new_type = new_schema['fields'][field_name].get('type')
            if old_type != new_type:
                changelog.append({
                    'type': 'type_changed',
                    'description': f"Field {field_name} type changed from {old_type} to {new_type}",
                    'field_name': field_name,
                    'old_type': old_type,
                    'new_type': new_type,
                    'timestamp': time.time()
                })

        return changelog

    def _cleanup_old_versions(self, product_id: str):
        """Trim stored versions down to the retention policy's maximum.

        Bug fix: the previous implementation pruned all the way down to
        `min_versions` whenever `max_versions` was exceeded, discarding up
        to (max - min) versions the retention policy said to keep. Now only
        the oldest surplus versions beyond `max_versions` are removed, and
        the count never drops below `min_versions`. Versions pinned by an
        active contract are never removed.
        """
        versions = self.product_versions[product_id]
        policy = self.lifecycle_policies['version_retention']

        if len(versions) > policy['max_versions']:
            sorted_versions = sorted(versions, key=lambda v: v.created_at)
            keep = max(policy['max_versions'], policy['min_versions'])
            surplus = len(versions) - keep

            for version in sorted_versions[:surplus]:
                if not self._is_version_in_use(product_id, version.version_id):
                    versions.remove(version)

    def _is_version_in_use(self, product_id: str, version_id: str) -> bool:
        """Return True if any contract pins this exact version of the product."""
        for contract in self.contracts.values():
            if (contract.producer_id == product_id and
                contract.sla_terms.get('version') == version_id):
                return True
        return False

    def _notify_breaking_change(self, product_id: str, new_version: DataProductVersion):
        """Notify every consumer contracted to this product of a breaking change.

        Builds a notice with a 90-day deprecation window and a migration-guide
        URL. Actual delivery (email/Slack) is stubbed with a print.
        """
        affected_contracts = [
            contract for contract in self.contracts.values()
            if contract.producer_id == product_id
        ]

        for contract in affected_contracts:
            notification = {
                'type': 'breaking_change_notice',
                'product_id': product_id,
                'new_version': new_version.version_number,
                'consumer_id': contract.consumer_id,
                'notice_date': time.time(),
                'deprecation_date': time.time() + (90 * 24 * 3600),  # 90 days out
                'migration_guide': f"https://docs.datamesh.com/products/{product_id}/migration/{new_version.version_number}"
            }

            # Stub: a real implementation would send email/Slack/etc.
            print(f"Breaking change notification sent to {contract.consumer_id}")

    def create_data_contract(self, producer_id: str, consumer_id: str,
                           contract_terms: Dict[str, Any]) -> str:
        """Create a data contract between a producer product and a consumer.

        Args:
            producer_id: product_id of the producing data product.
            consumer_id: identifier of the consuming team/system.
            contract_terms: optional keys 'sla_terms', 'usage_limits',
                'access_patterns' (defaults to ['api']), 'pricing_model',
                'expires_at'.

        Returns:
            The generated contract_id.
        """
        contract_id = f"contract-{uuid.uuid4().hex[:8]}"

        contract = DataProductContract(
            contract_id=contract_id,
            consumer_id=consumer_id,
            producer_id=producer_id,
            sla_terms=contract_terms.get('sla_terms', {}),
            usage_limits=contract_terms.get('usage_limits', {}),
            data_access_patterns=contract_terms.get('access_patterns', ['api']),
            pricing_model=contract_terms.get('pricing_model'),
            expires_at=contract_terms.get('expires_at')
        )

        self.contracts[contract_id] = contract

        self._setup_contract_access(contract)

        return contract_id

    def _setup_contract_access(self, contract: DataProductContract):
        """Derive access configuration from a contract (API-gateway update stub)."""
        access_config = {
            'consumer_id': contract.consumer_id,
            'producer_id': contract.producer_id,
            'allowed_endpoints': contract.data_access_patterns,
            'rate_limits': contract.usage_limits,
            'sla_monitoring': True,
            'audit_logging': True
        }

        # Stub: a real implementation would push this to the API gateway.
        print(f"Access configured for contract {contract.contract_id}")

    def monitor_product_health(self, product_id: str) -> Dict[str, Any]:
        """Collect current metrics for a product and evaluate its health.

        Returns a dict with the health score (0-100), current metrics, SLA
        violations, recommendations, and a trend analysis. Metrics here are
        hard-coded stub values; a real implementation would query the
        monitoring system.

        Raises:
            ValueError: if the product is unknown.
        """
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")

        # Stub metrics; replace with a monitoring-system query in production.
        current_metrics = DataProductMetrics(
            usage_count_daily=1250,
            unique_consumers=45,
            average_query_time_ms=85.5,
            data_freshness_score=0.95,
            quality_score=0.98,
            availability_percentage=99.8,
            sla_compliance_rate=0.99
        )

        self.metrics_history[product_id].append({
            'timestamp': time.time(),
            'metrics': current_metrics
        })

        health_score = self._calculate_health_score(current_metrics)

        sla_violations = self._check_sla_violations(product_id, current_metrics)

        return {
            'product_id': product_id,
            'health_score': health_score,
            'current_metrics': current_metrics.__dict__,
            'sla_violations': sla_violations,
            'recommendations': self._generate_health_recommendations(current_metrics),
            'trend_analysis': self._analyze_metric_trends(product_id)
        }

    def _calculate_health_score(self, metrics: DataProductMetrics) -> float:
        """Compute a weighted 0-100 health score from a metrics snapshot."""
        weights = {
            'availability': 0.3,
            'performance': 0.2,
            'quality': 0.2,
            'freshness': 0.15,
            'sla_compliance': 0.15
        }

        scores = {
            'availability': min(metrics.availability_percentage / 100.0, 1.0),
            # Linear penalty up to the 1-second mark, floored at 0.
            'performance': max(0, 1.0 - (metrics.average_query_time_ms / 1000.0)),
            'quality': metrics.quality_score,
            'freshness': metrics.data_freshness_score,
            'sla_compliance': metrics.sla_compliance_rate
        }

        weighted_score = sum(scores[key] * weights[key] for key in scores)
        return round(weighted_score * 100, 2)  # scale to 0-100

    def _check_sla_violations(self, product_id: str,
                             metrics: DataProductMetrics) -> List[Dict[str, Any]]:
        """Compare metrics against the SLA-enforcement policy; list violations."""
        violations = []
        policy = self.lifecycle_policies['sla_enforcement']

        if metrics.availability_percentage < policy['availability_threshold']:
            violations.append({
                'type': 'availability_violation',
                'description': f"Availability ({metrics.availability_percentage:.1f}%) below threshold ({policy['availability_threshold']}%)",
                'severity': 'high',
                'current_value': metrics.availability_percentage,
                'threshold': policy['availability_threshold']
            })

        if metrics.average_query_time_ms > policy['performance_threshold_ms']:
            violations.append({
                'type': 'performance_violation',
                'description': f"Average query time ({metrics.average_query_time_ms:.1f}ms) exceeds threshold ({policy['performance_threshold_ms']}ms)",
                'severity': 'medium',
                'current_value': metrics.average_query_time_ms,
                'threshold': policy['performance_threshold_ms']
            })

        if metrics.quality_score < policy['quality_threshold']:
            violations.append({
                'type': 'quality_violation',
                'description': f"Quality score ({metrics.quality_score:.2f}) below threshold ({policy['quality_threshold']})",
                'severity': 'high',
                'current_value': metrics.quality_score,
                'threshold': policy['quality_threshold']
            })

        return violations

    def _generate_health_recommendations(self, metrics: DataProductMetrics) -> List[str]:
        """Suggest improvements based on which metrics look weak."""
        recommendations = []

        if metrics.availability_percentage < 99.0:
            recommendations.append("Consider implementing redundancy and failover mechanisms")

        if metrics.average_query_time_ms > 500:
            recommendations.append("Optimize query performance or consider caching")

        if metrics.quality_score < 0.95:
            recommendations.append("Review and improve data quality processes")

        if metrics.data_freshness_score < 0.9:
            recommendations.append("Optimize data ingestion pipeline for better freshness")

        if metrics.unique_consumers < 5:
            recommendations.append("Consider improving discoverability and documentation")

        return recommendations

    def _analyze_metric_trends(self, product_id: str) -> Dict[str, Any]:
        """Classify usage/performance/quality trends from recent history.

        With fewer than two snapshots, returns {"insufficient_data": True}.
        Only the usage trend is currently computed (±10% band); performance
        and quality default to 'stable'.
        """
        history = self.metrics_history[product_id]

        if len(history) < 2:
            return {"insufficient_data": True}

        # Look at up to the last 7 snapshots.
        recent_history = history[-7:] if len(history) >= 7 else history

        trends = {
            'usage_trend': 'stable',
            'performance_trend': 'stable',
            'quality_trend': 'stable'
        }

        # Simple first-vs-last comparison; a real system would do regression.
        if len(recent_history) >= 2:
            first_usage = recent_history[0]['metrics'].usage_count_daily
            last_usage = recent_history[-1]['metrics'].usage_count_daily

            if last_usage > first_usage * 1.1:
                trends['usage_trend'] = 'increasing'
            elif last_usage < first_usage * 0.9:
                trends['usage_trend'] = 'decreasing'

        return trends

    def get_product_portfolio_dashboard(self, domain_id: Optional[str] = None) -> Dict[str, Any]:
        """Build a portfolio summary, optionally filtered to one domain.

        Aggregates product counts by stage/type (when those attributes are
        present), average health score, consumer counts, top products by
        usage, health alerts, and a contract summary.
        """
        filtered_products = []
        for product in self.products.values():
            if domain_id is None or product.domain_id == domain_id:
                filtered_products.append(product)

        portfolio_stats = {
            'total_products': len(filtered_products),
            'products_by_stage': {stage.value: 0 for stage in ProductStage},
            'products_by_type': {ptype.value: 0 for ptype in ProductType},
            'total_consumers': 0,
            'average_health_score': 0.0
        }

        health_scores = []

        for product in filtered_products:
            # DataProduct does not declare 'stage'/'product_type'; count them
            # only when set dynamically on the instance.
            if hasattr(product, 'stage'):
                portfolio_stats['products_by_stage'][product.stage.value] += 1

            if hasattr(product, 'product_type'):
                portfolio_stats['products_by_type'][product.product_type.value] += 1

            # Best-effort: skip products whose health check fails.
            try:
                health_data = self.monitor_product_health(product.product_id)
                health_scores.append(health_data['health_score'])
            except Exception:
                pass

        if health_scores:
            portfolio_stats['average_health_score'] = sum(health_scores) / len(health_scores)

        # Distinct consumers across all contracts on the filtered products.
        unique_consumers = set()
        for contract in self.contracts.values():
            if any(p.product_id == contract.producer_id for p in filtered_products):
                unique_consumers.add(contract.consumer_id)
        portfolio_stats['total_consumers'] = len(unique_consumers)

        return {
            'domain_id': domain_id,
            'generated_at': time.time(),
            'portfolio_stats': portfolio_stats,
            'top_products_by_usage': self._get_top_products_by_usage(filtered_products),
            'health_alerts': self._get_health_alerts(filtered_products),
            'contract_summary': self._get_contract_summary(filtered_products)
        }

    def _get_top_products_by_usage(self, products: List[DataProduct]) -> List[Dict[str, Any]]:
        """Return up to 5 products ranked by latest daily usage count."""
        product_usage = []

        for product in products:
            if product.product_id in self.metrics_history:
                latest_metrics = self.metrics_history[product.product_id][-1]['metrics']
                product_usage.append({
                    'product_id': product.product_id,
                    'name': product.name,
                    'daily_usage': latest_metrics.usage_count_daily,
                    'unique_consumers': latest_metrics.unique_consumers
                })

        return sorted(product_usage, key=lambda x: x['daily_usage'], reverse=True)[:5]

    def _get_health_alerts(self, products: List[DataProduct]) -> List[Dict[str, Any]]:
        """List products with SLA violations, worst (most violations) first."""
        alerts = []

        for product in products:
            # Best-effort: a failing health check shouldn't sink the dashboard.
            try:
                health_data = self.monitor_product_health(product.product_id)
                if health_data['sla_violations']:
                    alerts.append({
                        'product_id': product.product_id,
                        'name': product.name,
                        'health_score': health_data['health_score'],
                        'violations': health_data['sla_violations']
                    })
            except Exception:
                continue

        return sorted(alerts, key=lambda x: len(x['violations']), reverse=True)

    def _get_contract_summary(self, products: List[DataProduct]) -> Dict[str, Any]:
        """Summarize contract counts (total / active / expiring within 30 days)."""
        product_ids = [p.product_id for p in products]
        related_contracts = [
            contract for contract in self.contracts.values()
            if contract.producer_id in product_ids
        ]

        return {
            'total_contracts': len(related_contracts),
            'active_contracts': len([c for c in related_contracts
                                   if c.expires_at is None or c.expires_at > time.time()]),
            'expiring_soon': len([c for c in related_contracts
                                if c.expires_at and c.expires_at - time.time() < 30 * 24 * 3600])
        }

# 사용 예제
def demonstrate_product_management():
    """데이터 프로덕트 관리 데모"""

    manager = DataProductLifecycleManager()

    # 데이터 프로덕트 생성
    customer_profile_product = DataProduct(
        product_id="customer-profile-v2",
        name="Enhanced Customer Profile",
        description="Comprehensive customer profile with ML-driven insights",
        domain_id="customer",
        data_steward="alice@company.com",
        sla_requirements={
            "availability": "99.9%",
            "latency": "< 100ms",
            "freshness": "< 1 hour"
        },
        quality_metrics={
            "completeness": "> 95%",
            "accuracy": "> 99%"
        },
        schema_definition={
            "version": "1.0.0",
            "fields": {
                "customer_id": {"type": "string", "required": True},
                "profile_data": {"type": "object", "required": True},
                "preferences": {"type": "object", "required": False},
                "ml_insights": {"type": "object", "required": False}
            }
        },
        access_patterns=["rest-api", "graphql", "streaming"],
        discovery_metadata={
            "tags": ["customer", "profile", "ml-insights"],
            "business_context": "360-degree customer view"
        }
    )

    product_id = manager.create_data_product(customer_profile_product)
    print(f"Created data product: {product_id}")

    # 스키마 진화
    new_schema = {
        "version": "1.1.0",
        "fields": {
            "customer_id": {"type": "string", "required": True},
            "profile_data": {"type": "object", "required": True},
            "preferences": {"type": "object", "required": False},
            "ml_insights": {"type": "object", "required": False},
            "risk_score": {"type": "number", "required": False}  # 새 필드 추가
        }
    }

    new_version_id = manager.evolve_product_schema(product_id, new_schema, breaking_changes=False)
    print(f"Schema evolved: {new_version_id}")

    # 데이터 계약 생성
    contract_id = manager.create_data_contract(
        producer_id=product_id,
        consumer_id="marketing-analytics-team",
        contract_terms={
            'sla_terms': {
                'availability': 99.5,
                'max_response_time_ms': 200,
                'version': new_version_id
            },
            'usage_limits': {
                'requests_per_hour': 10000,
                'concurrent_connections': 50
            },
            'access_patterns': ['rest-api', 'graphql'],
            'expires_at': time.time() + (365 * 24 * 3600)  # 1년 후 만료
        }
    )
    print(f"Created data contract: {contract_id}")

    # 프로덕트 헬스 모니터링
    health_data = manager.monitor_product_health(product_id)
    print(f"\nProduct Health Score: {health_data['health_score']}")
    print(f"SLA Violations: {len(health_data['sla_violations'])}")

    # 포트폴리오 대시보드
    dashboard = manager.get_product_portfolio_dashboard(domain_id="customer")
    print(f"\nPortfolio Dashboard:")
    print(json.dumps(dashboard, indent=2, default=str))

# 실행
# demonstrate_product_management()

마무리

데이터 메시는 단순한 기술적 변화가 아닌 조직의 데이터 문화를 근본적으로 바꾸는 패러다임 시프트입니다. 2026년 현재, 많은 기업들이 중앙 집중식 데이터 아키텍처의 한계를 인식하고 데이터 메시로 전환하고 있습니다.

성공적인 데이터 메시 구축을 위한 핵심 요소:

  1. 도메인 중심 사고: 비즈니스 도메인을 중심으로 한 데이터 소유권과 책임 분산
  2. 프로덕트 마인드셋: 데이터를 프로덕트로 관리하는 사고 전환
  3. 플랫폼 투자: 도메인 팀이 자율적으로 활용할 수 있는 셀프 서비스 플랫폼
  4. 연합형 거버넌스: 중앙 통제와 도메인 자율성의 균형

데이터 메시는 복잡하고 장기적인 여정이지만, 올바른 접근 방식과 점진적인 구현을 통해 조직의 데이터 역량을 혁신적으로 향상시킬 수 있습니다. 특히 대규모 조직에서는 데이터의 확장성, 품질, 그리고 비즈니스 가치 창출 측면에서 큰 이점을 제공합니다.

앞으로는 AI와 머신러닝의 발전과 함께 더욱 지능화된 데이터 메시 플랫폼이 등장할 것이며, 이는 데이터 기반 의사결정을 더욱 빠르고 정확하게 만들어 줄 것입니다.

궁금한 점이 있으신가요?

문의사항이 있으시면 언제든지 연락주세요.