Data Mesh Architecture Implementation Guide - Distributed Data Management Strategy for 2026
1. Data Mesh Concepts and Motivation
1.1 What Is a Data Mesh?
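A data mesh is a domain-oriented, decentralized data architecture paradigm. Instead of relying on a centralized data lake or warehouse, each business domain owns and manages its own data and serves it to the rest of the organization as a data product. The approach rests on four principles, which the `DataMeshPrinciple` enum below encodes: domain ownership, data as a product, a self-serve data platform, and federated governance.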
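Treating data as a product means consumers can check what they receive against the schema the owning domain publishes. The sketch below is a minimal, hypothetical validator, assuming a simplified list of `name`/`type` field pairs like the `schema_definition` entries used in this guide and a hand-written type mapping (`TYPE_MAP` is an assumption, not part of any library). A production contract would normally be enforced with the schema tooling for the declared format (Avro, JSON Schema, and so on).

```python
from typing import Any, Dict, List

# Hypothetical mapping from the simplified type names used in this guide's
# schema examples to Python types; "record" is treated as a nested dict.
TYPE_MAP = {
    "string": str,
    "record": dict,
    "int": int,
    "double": float,
}


def validate_record(schema_fields: List[Dict[str, str]], record: Dict[str, Any]) -> List[str]:
    """Return contract violations for one record; an empty list means it conforms."""
    errors: List[str] = []
    for field in schema_fields:
        name, type_name = field["name"], field["type"]
        if name not in record:
            errors.append(f"missing field: {name}")
            continue
        expected = TYPE_MAP.get(type_name)
        if expected is None:
            errors.append(f"unsupported type for {name}: {type_name}")
        elif not isinstance(record[name], expected):
            actual = type(record[name]).__name__
            errors.append(f"{name}: expected {type_name}, got {actual}")
    return errors


fields = [
    {"name": "customer_id", "type": "string"},
    {"name": "preferences", "type": "record"},
]
print(validate_record(fields, {"customer_id": "c-1", "preferences": {}}))  # []
print(validate_record(fields, {"customer_id": 42}))
# ['customer_id: expected string, got int', 'missing field: preferences']
```

Extra fields in a record are not flagged here; whether they should be is a contract design decision for the owning domain.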
# data_mesh_principles.py
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
import json
class DataMeshPrinciple(Enum):
    DOMAIN_OWNERSHIP = "domain_ownership"
    DATA_AS_PRODUCT = "data_as_product"
    SELF_SERVE_PLATFORM = "self_serve_platform"
    FEDERATED_GOVERNANCE = "federated_governance"
@dataclass
class DataDomain:
    """Definition of a data domain"""
    domain_id: str
    name: str
    description: str
    business_area: str
    domain_owner: str
    team_members: List[str]
    data_products: List[str]
    bounded_context: Dict[str, Any]
    def to_dict(self) -> Dict[str, Any]:
        return {
            'domain_id': self.domain_id,
            'name': self.name,
            'description': self.description,
            'business_area': self.business_area,
            'domain_owner': self.domain_owner,
            'team_members': self.team_members,
            'data_products': self.data_products,
            'bounded_context': self.bounded_context
        }
@dataclass
class DataProduct:
    """Definition of a data product"""
    product_id: str
    name: str
    description: str
    domain_id: str
    data_steward: str
    sla_requirements: Dict[str, Any]
    quality_metrics: Dict[str, Any]
    schema_definition: Dict[str, Any]
    access_patterns: List[str]
    discovery_metadata: Dict[str, Any]
class DataMeshArchitecture:
    """Manages the data mesh: domains, data products, policies, and platform services"""
    def __init__(self):
        self.domains = {}
        self.data_products = {}
        self.governance_policies = {}
        self.platform_services = {}
    def register_domain(self, domain: DataDomain) -> str:
        """Register a domain"""
        self.domains[domain.domain_id] = domain
        # Set up the domain's initial infrastructure
        self._setup_domain_infrastructure(domain)
        return domain.domain_id
    def register_data_product(self, product: DataProduct) -> str:
        """Register a data product"""
        # The owning domain must already be registered
        if product.domain_id not in self.domains:
            raise ValueError(f"Domain {product.domain_id} not found")
        self.data_products[product.product_id] = product
        # Set up the product's infrastructure automatically
        self._setup_data_product_infrastructure(product)
        return product.product_id
    def _setup_domain_infrastructure(self, domain: DataDomain):
        """Set up per-domain infrastructure (resource names are placeholders)"""
        infrastructure = {
            'data_storage': f"s3://datamesh-{domain.domain_id}/",
            'compute_resources': {
                'spark_cluster': f"spark-{domain.domain_id}",
                'kubernetes_namespace': f"datamesh-{domain.domain_id}"
            },
            'security': {
                'iam_role': f"DataMeshDomain-{domain.domain_id}",
                'encryption_key': f"datamesh-{domain.domain_id}-key"
            },
            'monitoring': {
                'dashboard': f"https://monitoring.datamesh.com/domain/{domain.domain_id}",
                'alerts': f"alerts-{domain.domain_id}"
            }
        }
        self.platform_services[domain.domain_id] = infrastructure
    def _setup_data_product_infrastructure(self, product: DataProduct):
        """Set up per-product infrastructure (endpoints are placeholders)"""
        product_infrastructure = {
            'api_endpoint': f"https://api.datamesh.com/products/{product.product_id}",
            'data_catalog_entry': {
                'schema_registry': f"schema-{product.product_id}",
                'lineage_tracking': True,
                'quality_monitoring': True
            },
            'access_control': {
                'consumer_groups': [],
                'rate_limits': product.sla_requirements.get('rate_limit', '1000/min')
            }
        }
        # Note: product entries share platform_services with domain entries,
        # so product IDs must not collide with domain IDs
        if product.product_id not in self.platform_services:
            self.platform_services[product.product_id] = {}
        self.platform_services[product.product_id].update(product_infrastructure)
    def discover_data_products(self, search_criteria: Dict[str, Any]) -> List[DataProduct]:
        """Return the data products that satisfy every search criterion"""
        results = []
        for product in self.data_products.values():
            if self._matches_criteria(product, search_criteria):
                results.append(product)
        return results
    def _matches_criteria(self, product: DataProduct, criteria: Dict[str, Any]) -> bool:
        """Check a product against the criteria (a 'tags' criterion matches if any tag overlaps)"""
        for key, value in criteria.items():
            if key == 'domain':
                if product.domain_id != value:
                    return False
            elif key == 'tags':
                product_tags = product.discovery_metadata.get('tags', [])
                if not any(tag in product_tags for tag in value):
                    return False
            elif key == 'business_area':
                domain = self.domains.get(product.domain_id)
                if domain and domain.business_area != value:
                    return False
            # Further criteria can be added here; unknown keys are currently ignored
        return True
    def get_mesh_topology(self) -> Dict[str, Any]:
        """Build a serializable view of the mesh topology for visualization"""
        topology = {
            'domains': [],
            'data_products': [],
            'relationships': []
        }
        for domain in self.domains.values():
            topology['domains'].append({
                'id': domain.domain_id,
                'name': domain.name,
                'business_area': domain.business_area,
                'product_count': len([p for p in self.data_products.values()
                                      if p.domain_id == domain.domain_id])
            })
        for product in self.data_products.values():
            topology['data_products'].append({
                'id': product.product_id,
                'name': product.name,
                'domain_id': product.domain_id,
                # Number of supported access patterns (not the number of consumers)
                'access_pattern_count': len(product.access_patterns)
            })
        # Relationships between data products are left empty here;
        # in practice they would come from a data lineage system
        return topology
# Usage example
def setup_sample_data_mesh():
    """Set up a sample data mesh"""
    mesh = DataMeshArchitecture()
    # Create domains
    customer_domain = DataDomain(
        domain_id="customer",
        name="Customer Domain",
        description="Customer data and analytics",
        business_area="Customer Experience",
        domain_owner="alice@company.com",
        team_members=["alice@company.com", "bob@company.com"],
        data_products=["customer-profile", "customer-behavior"],
        bounded_context={
            "entities": ["Customer", "CustomerProfile", "CustomerPreference"],
            "events": ["CustomerRegistered", "ProfileUpdated", "PreferenceChanged"]
        }
    )
    order_domain = DataDomain(
        domain_id="order",
        name="Order Domain",
        description="Order processing and fulfillment",
        business_area="Operations",
        domain_owner="charlie@company.com",
        team_members=["charlie@company.com", "diana@company.com"],
        data_products=["order-events", "fulfillment-metrics"],
        bounded_context={
            "entities": ["Order", "OrderItem", "Shipment"],
            "events": ["OrderPlaced", "OrderShipped", "OrderDelivered"]
        }
    )
    mesh.register_domain(customer_domain)
    mesh.register_domain(order_domain)
    # Create data products
    customer_profile_product = DataProduct(
        product_id="customer-profile",
        name="Customer Profile Data Product",
        description="Comprehensive customer profile information",
        domain_id="customer",
        data_steward="alice@company.com",
        sla_requirements={
            "availability": "99.9%",
            "latency": "< 100ms",
            "rate_limit": "10000/min"
        },
        quality_metrics={
            "completeness": "> 95%",
            "accuracy": "> 99%",
            "freshness": "< 1 hour"
        },
        schema_definition={
            "format": "avro",
            "version": "1.0",
            "fields": [
                {"name": "customer_id", "type": "string"},
                {"name": "email", "type": "string"},
                {"name": "demographics", "type": "record"},
                {"name": "preferences", "type": "record"}
            ]
        },
        access_patterns=["real-time-api", "batch-export", "stream"],
        discovery_metadata={
            "tags": ["customer", "profile", "pii"],
            "classification": "confidential",
            "business_glossary": ["customer", "demographic", "preference"]
        }
    )
    order_events_product = DataProduct(
        product_id="order-events",
        name="Order Events Stream",
        description="Real-time order events for analytics",
        domain_id="order",
        data_steward="charlie@company.com",
        sla_requirements={
            "availability": "99.99%",
            "latency": "< 10ms",
            "rate_limit": "50000/min"
        },
        quality_metrics={
            "completeness": "> 99%",
            "accuracy": "> 99.9%",
            "freshness": "< 1 second"
        },
        schema_definition={
            "format": "json",
            "version": "2.0",
            "fields": [
                {"name": "order_id", "type": "string"},
                {"name": "customer_id", "type": "string"},
                {"name": "event_type", "type": "enum"},
                {"name": "timestamp", "type": "datetime"},
                {"name": "event_data", "type": "record"}
            ]
        },
        access_patterns=["kafka-stream", "webhook", "batch-export"],
        discovery_metadata={
            "tags": ["order", "events", "real-time"],
            "classification": "internal",
            "business_glossary": ["order", "transaction", "fulfillment"]
        }
    )
    mesh.register_data_product(customer_profile_product)
    mesh.register_data_product(order_events_product)
    return mesh
# Build the data mesh and inspect its topology
mesh = setup_sample_data_mesh()
topology = mesh.get_mesh_topology()
print("Data Mesh Topology:")
print(json.dumps(topology, indent=2))
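`discover_data_products` scans every registered product on each query, which is fine for a small catalog. As the mesh grows, an inverted index from tag to product IDs keeps tag lookups independent of catalog size. A minimal sketch, with a hypothetical `TagIndex` helper that is not part of the classes above and that mirrors the "match any tag" semantics of `_matches_criteria`:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


class TagIndex:
    """Hypothetical inverted index: tag -> IDs of the data products carrying it."""

    def __init__(self) -> None:
        self._by_tag: Dict[str, Set[str]] = defaultdict(set)

    def add(self, product_id: str, tags: Iterable[str]) -> None:
        """Index a product under each of its discovery tags."""
        for tag in tags:
            self._by_tag[tag].add(product_id)

    def search_any(self, tags: Iterable[str]) -> List[str]:
        """Return products that carry at least one of the tags, sorted for stable output."""
        hits: Set[str] = set()
        for tag in tags:
            # .get() does not trigger the defaultdict factory, so lookups never add keys
            hits |= self._by_tag.get(tag, set())
        return sorted(hits)


index = TagIndex()
index.add("customer-profile", ["customer", "profile", "pii"])
index.add("order-events", ["order", "events", "real-time"])
print(index.search_any(["pii", "events"]))  # ['customer-profile', 'order-events']
```

The index has to be updated whenever a product is registered or its tags change, so in practice it would be maintained next to the `data_products` registry.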
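1.2 Traditional Architecture vs. Data Mesh
| Aspect | Traditional architecture | Data mesh |
|---|---|---|
| Data ownership | Central data team | Domain teams |
| Scalability | Vertical scaling | Horizontal scaling |
| Governance | Centralized | Federated |
| Technology choices | Standardized | Optimized per domain |
| Pace of change | Slow | Fast |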
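The governance row can be made concrete with one rule that federated setups often adopt: global policy sets a floor, and domains or products may tighten it but not loosen it. The sketch below assumes that rule and the three enforcement levels (`advisory`, `warning`, `blocking`) used in section 3; `effective_enforcement` is a hypothetical helper. Note that the framework in section 3 instead lets a domain policy carry whatever level its customizations specify.

```python
from typing import Optional

# Enforcement levels ordered from weakest to strictest (assumed ordering)
ENFORCEMENT_ORDER = ["advisory", "warning", "blocking"]


def effective_enforcement(global_level: str,
                          domain_level: Optional[str] = None,
                          product_level: Optional[str] = None) -> str:
    """Return the strictest level, so lower layers can only tighten the global floor."""
    levels = [lvl for lvl in (global_level, domain_level, product_level) if lvl is not None]
    for lvl in levels:
        if lvl not in ENFORCEMENT_ORDER:
            raise ValueError(f"unknown enforcement level: {lvl}")
    # max() by position in the ordering picks the strictest level
    return max(levels, key=ENFORCEMENT_ORDER.index)


print(effective_enforcement("warning", "advisory"))  # warning
print(effective_enforcement("warning", "blocking"))  # blocking
```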
2. Building a Self-Serve Data Platform
2.1 Platform Architecture Design
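The service catalog in the platform code uses `${domain_id}` placeholders in its configuration templates, which is exactly the placeholder syntax of Python's standard `string.Template`. A minimal sketch, assuming configs are plain dicts and lists, of a hypothetical `render_config` helper that substitutes placeholders recursively (including nested values); `safe_substitute` leaves unknown placeholders in place instead of raising `KeyError`:

```python
from string import Template
from typing import Any, Dict


def render_config(template: Dict[str, Any], **variables: str) -> Dict[str, Any]:
    """Return a new config with ${name} placeholders replaced in all string values."""
    def render(value: Any) -> Any:
        if isinstance(value, str):
            # Unknown placeholders such as ${env} are left untouched
            return Template(value).safe_substitute(variables)
        if isinstance(value, dict):
            return {key: render(item) for key, item in value.items()}
        if isinstance(value, list):
            return [render(item) for item in value]
        return value  # numbers, booleans, None pass through unchanged
    return render(template)


config_template = {
    "bucket_name": "${domain_id}-data",
    "retention_days": 365,
    "tags": ["${domain_id}", "${env}"],
}
print(render_config(config_template, domain_id="customer"))
# {'bucket_name': 'customer-data', 'retention_days': 365, 'tags': ['customer', '${env}']}
```

Because it builds new containers, the catalog's template is never mutated, which avoids the aliasing risk of a shallow `dict.copy()` when templates contain nested values.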
# self_serve_platform.py
from typing import Dict, List, Any, Optional, Protocol
from dataclasses import dataclass
import asyncio
import json
import time
from kubernetes import client, config
class ResourceProvisioner(Protocol):
    """Interface that every resource provisioner implements"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        ...
    async def deprovision(self, resource_id: str) -> bool:
        ...
@dataclass
class PlatformService:
    """Definition of a platform service"""
    service_id: str
    name: str
    description: str
    category: str  # compute, storage, analytics, governance
    config_template: Dict[str, Any]
    resource_requirements: Dict[str, Any]
    dependencies: List[str]
@dataclass
class ProvisioningRequest:
    """Request to provision a resource"""
    request_id: str
    domain_id: str
    service_type: str
    configuration: Dict[str, Any]
    requester: str
    approval_required: bool = False
class SelfServeDataPlatform:
    """Self-serve data platform"""
    def __init__(self):
        self.service_catalog = {}
        self.provisioners = {}
        self.active_resources = {}
        self.approval_workflows = {}
        # Configure the Kubernetes client
        try:
            config.load_incluster_config()  # when running inside the cluster
        except config.ConfigException:
            config.load_kube_config()  # local development environment
        self.k8s_apps_v1 = client.AppsV1Api()
        self.k8s_core_v1 = client.CoreV1Api()
        self._initialize_service_catalog()
        self._initialize_provisioners()
    def _initialize_service_catalog(self):
        """Initialize the service catalog"""
        # Data storage service
        self.service_catalog['object-storage'] = PlatformService(
            service_id='object-storage',
            name='Object Storage',
            description='S3-compatible object storage with automatic backup',
            category='storage',
            config_template={
                'bucket_name': '${domain_id}-data',
                'retention_days': 365,
                'encryption': True,
                'backup_enabled': True,
                'access_logging': True
            },
            resource_requirements={
                'storage_gb': '1000',
                'iops': '3000'
            },
            dependencies=[]
        )
        # Spark compute cluster
        self.service_catalog['spark-cluster'] = PlatformService(
            service_id='spark-cluster',
            name='Spark Computing Cluster',
            description='Auto-scaling Spark cluster for data processing',
            category='compute',
            config_template={
                'cluster_name': '${domain_id}-spark',
                'driver_cores': 2,
                'driver_memory': '4g',
                'executor_cores': 2,
                'executor_memory': '4g',
                'max_executors': 10,
                'auto_scaling': True
            },
            resource_requirements={
                'cpu_cores': '20',
                'memory_gb': '80'
            },
            dependencies=['object-storage']
        )
        # Stream processing service
        self.service_catalog['kafka-cluster'] = PlatformService(
            service_id='kafka-cluster',
            name='Kafka Streaming Platform',
            description='Managed Kafka cluster for real-time data streaming',
            category='analytics',
            config_template={
                'cluster_name': '${domain_id}-kafka',
                'partitions': 12,
                'replication_factor': 3,
                'retention_hours': 168,  # 7 days
                'compression_type': 'snappy',
                'security_protocol': 'SSL'
            },
            resource_requirements={
                'cpu_cores': '12',
                'memory_gb': '24',
                'storage_gb': '500'
            },
            dependencies=[]
        )
        # Data catalog service
        self.service_catalog['data-catalog'] = PlatformService(
            service_id='data-catalog',
            name='Data Catalog & Discovery',
            description='Automated data catalog with schema registry',
            category='governance',
            config_template={
                'catalog_name': '${domain_id}-catalog',
                'auto_discovery': True,
                'schema_evolution': True,
                'lineage_tracking': True,
                'data_profiling': True
            },
            resource_requirements={
                'cpu_cores': '4',
                'memory_gb': '8'
            },
            dependencies=['object-storage']
        )
        # API gateway
        self.service_catalog['api-gateway'] = PlatformService(
            service_id='api-gateway',
            name='Data API Gateway',
            description='API gateway for data product access',
            category='governance',
            config_template={
                'gateway_name': '${domain_id}-api',
                'rate_limiting': True,
                'authentication': 'oauth2',
                'monitoring': True,
                'caching': True
            },
            resource_requirements={
                'cpu_cores': '2',
                'memory_gb': '4'
            },
            dependencies=[]
        )
    def _initialize_provisioners(self):
        """Initialize one provisioner per service type"""
        self.provisioners['object-storage'] = S3StorageProvisioner()
        self.provisioners['spark-cluster'] = SparkClusterProvisioner()
        self.provisioners['kafka-cluster'] = KafkaClusterProvisioner()
        self.provisioners['data-catalog'] = DataCatalogProvisioner()
        self.provisioners['api-gateway'] = APIGatewayProvisioner()
    async def request_service(self, request: ProvisioningRequest) -> str:
        """Handle a service provisioning request"""
        # Make sure the requested service exists
        if request.service_type not in self.service_catalog:
            raise ValueError(f"Unknown service type: {request.service_type}")
        service = self.service_catalog[request.service_type]
        # Route to the approval workflow when approval is required
        if request.approval_required or self._requires_approval(request):
            approval_id = await self._submit_for_approval(request)
            return f"approval:{approval_id}"
        # Otherwise provision immediately
        return await self._provision_service(request, service)
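    async def _provision_service(self, request: ProvisioningRequest,
                                 service: PlatformService) -> str:
        """Provision a service and register the resulting resource"""
        try:
            # Make sure dependent services exist, provisioning them if needed
            await self._ensure_dependencies(request.domain_id, service.dependencies)
            # Resolve configuration values (named to avoid shadowing kubernetes.config)
            resolved_config = self._process_configuration(request, service)
            # Create the resource through its provisioner
            provisioner = self.provisioners[request.service_type]
            resource_info = await provisioner.provision({
                'request_id': request.request_id,
                'domain_id': request.domain_id,
                'configuration': resolved_config,
                'resource_requirements': service.resource_requirements
            })
            # Register the resource
            resource_id = f"{request.domain_id}-{request.service_type}"
            self.active_resources[resource_id] = {
                'resource_id': resource_id,
                'request_id': request.request_id,
                'service_type': request.service_type,
                'domain_id': request.domain_id,
                'configuration': resolved_config,
                'resource_info': resource_info,
                'created_at': time.time(),
                'status': 'active'
            }
            return resource_id
        except Exception:
            # Roll back on failure, then re-raise the original error with its traceback
            await self._rollback_provisioning(request)
            raise
    async def _rollback_provisioning(self, request: ProvisioningRequest):
        """Best-effort cleanup after a failed provisioning attempt"""
        resource_id = f"{request.domain_id}-{request.service_type}"
        self.active_resources.pop(resource_id, None)
        provisioner = self.provisioners.get(request.service_type)
        deprovision = getattr(provisioner, 'deprovision', None)
        if deprovision is not None:
            try:
                await deprovision(resource_id)
            except Exception as cleanup_error:
                # Report, but do not mask, the original provisioning error
                print(f"Rollback failed for {resource_id}: {cleanup_error}")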
    def _process_configuration(self, request: ProvisioningRequest,
                               service: PlatformService) -> Dict[str, Any]:
        """Resolve the config template: substitute variables, then apply user overrides"""
        resolved = service.config_template.copy()
        # Substitute template variables (only ${domain_id} is supported here)
        for key, value in resolved.items():
            if isinstance(value, str) and '${' in value:
                resolved[key] = value.replace('${domain_id}', request.domain_id)
        # User-supplied settings override template values
        resolved.update(request.configuration)
        return resolved
    async def _ensure_dependencies(self, domain_id: str, dependencies: List[str]):
        """Provision any dependent services that are not yet active"""
        for dep_service in dependencies:
            dep_resource_id = f"{domain_id}-{dep_service}"
            if dep_resource_id not in self.active_resources:
                # Provision the dependency automatically (this bypasses the approval workflow)
                dep_request = ProvisioningRequest(
                    request_id=f"auto-{dep_resource_id}",
                    domain_id=domain_id,
                    service_type=dep_service,
                    configuration={},
                    requester="system",
                    approval_required=False
                )
                await self._provision_service(dep_request, self.service_catalog[dep_service])
    def _requires_approval(self, request: ProvisioningRequest) -> bool:
        """Decide whether a request needs approval"""
        service = self.service_catalog[request.service_type]
        # Decide based on the service's resource requirements
        cpu_cores = int(service.resource_requirements.get('cpu_cores', '0'))
        memory_gb = int(service.resource_requirements.get('memory_gb', '0'))
        # Large resource requests need approval
        if cpu_cores > 50 or memory_gb > 100:
            return True
        # Governance services always need approval
        if service.category == 'governance':
            return True
        return False
    async def _submit_for_approval(self, request: ProvisioningRequest) -> str:
        """Submit a request to the approval workflow"""
        approval_id = f"approval-{request.request_id}"
        self.approval_workflows[approval_id] = {
            'approval_id': approval_id,
            'request': request,
            'status': 'pending',
            'submitted_at': time.time(),
            'approvers': ['platform-admin@company.com'],
            'approval_notes': []
        }
        # In practice, this would open a ticket in an approval system (Jira, ServiceNow, etc.)
        print(f"Approval request {approval_id} submitted for {request.service_type}")
        return approval_id
    async def approve_request(self, approval_id: str, approver: str,
                              approved: bool, notes: str = "") -> Optional[str]:
        """Approve or reject a pending request"""
        if approval_id not in self.approval_workflows:
            raise ValueError(f"Approval {approval_id} not found")
        workflow = self.approval_workflows[approval_id]
        # Guard against provisioning the same request twice
        if workflow['status'] != 'pending':
            raise ValueError(f"Approval {approval_id} is already {workflow['status']}")
        if approved:
            workflow['status'] = 'approved'
            workflow['approved_by'] = approver
            workflow['approved_at'] = time.time()
            # Provision the approved request
            request = workflow['request']
            service = self.service_catalog[request.service_type]
            resource_id = await self._provision_service(request, service)
            return resource_id
        else:
            workflow['status'] = 'rejected'
            workflow['rejected_by'] = approver
            workflow['rejection_reason'] = notes
            return None
    async def get_domain_resources(self, domain_id: str) -> List[Dict[str, Any]]:
        """List the active resources of a domain"""
        resources = []
        for resource in self.active_resources.values():
            if resource['domain_id'] == domain_id:
                resources.append(resource)
        return resources
    async def get_service_catalog(self) -> Dict[str, PlatformService]:
        """Return a copy of the service catalog"""
        return self.service_catalog.copy()
    async def monitor_resources(self, domain_id: str) -> Dict[str, Any]:
        """Collect monitoring information for a domain's resources"""
        domain_resources = await self.get_domain_resources(domain_id)
        # cost_estimate and usage_metrics are placeholders; nothing populates them yet
        monitoring_data = {
            'domain_id': domain_id,
            'total_resources': len(domain_resources),
            'resource_health': {},
            'cost_estimate': 0.0,
            'usage_metrics': {}
        }
        for resource in domain_resources:
            service_type = resource['service_type']
            # Collect monitoring info from provisioners that support it
            if service_type in self.provisioners:
                provisioner = self.provisioners[service_type]
                if hasattr(provisioner, 'get_monitoring_info'):
                    monitoring_info = await provisioner.get_monitoring_info(resource['resource_id'])
                    monitoring_data['resource_health'][resource['resource_id']] = monitoring_info
        return monitoring_data
# Example provisioner implementations
class S3StorageProvisioner:
    """S3 storage provisioner"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision an S3 bucket and related resources"""
        config = request['configuration']
        domain_id = request['domain_id']
        # Desired bucket settings, to be applied through Terraform or the AWS CDK
        terraform_config = {
            'bucket_name': config['bucket_name'],
            'versioning_enabled': True,
            'encryption_enabled': config['encryption'],
            'lifecycle_rules': [
                {
                    'id': 'archive_old_data',
                    'transition_days': 30,
                    'storage_class': 'GLACIER'
                }
            ],
            'access_logging': {
                'enabled': config['access_logging'],
                'target_bucket': f"{config['bucket_name']}-logs"
            }
        }
        # Placeholder: a real implementation would call Terraform or the AWS APIs here.
        # The values below are illustrative, not real ARNs or credentials.
        provisioned_resources = {
            'bucket_arn': f"arn:aws:s3:::{config['bucket_name']}",
            'bucket_url': f"s3://{config['bucket_name']}/",
            'access_key_id': f"AKIA{domain_id.upper()}EXAMPLE",
            'policy_arn': f"arn:aws:iam::123456789012:policy/{domain_id}-s3-policy"
        }
        return provisioned_resources
    async def deprovision(self, resource_id: str) -> bool:
        """Remove the resource"""
        # In practice: terraform destroy or AWS API calls
        print(f"Deprovisioning S3 storage: {resource_id}")
        return True
class SparkClusterProvisioner:
    """Spark cluster provisioner"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision a Kubernetes-based Spark environment for the Spark Operator"""
        config = request['configuration']
        domain_id = request['domain_id']
        # SparkApplication manifest for the Spark Operator; submitting it (for example via
        # client.CustomObjectsApi().create_namespaced_custom_object) is omitted here
        spark_operator_config = {
            'apiVersion': 'sparkoperator.k8s.io/v1beta2',
            'kind': 'SparkApplication',
            'metadata': {
                'name': config['cluster_name'],
                'namespace': f"datamesh-{domain_id}"
            },
            'spec': {
                'type': 'Scala',
                'mode': 'cluster',
                'image': 'gcr.io/spark-operator/spark:v3.1.1',
                'driver': {
                    'cores': config['driver_cores'],
                    'coreLimit': f"{config['driver_cores']}",
                    'memory': config['driver_memory'],
                    'serviceAccount': f"spark-{domain_id}"
                },
                'executor': {
                    'cores': config['executor_cores'],
                    'coreLimit': f"{config['executor_cores']}",
                    'memory': config['executor_memory'],
                    'instances': 2,
                    'serviceAccount': f"spark-{domain_id}"
                },
                'dynamicAllocation': {
                    'enabled': config['auto_scaling'],
                    'maxExecutors': config['max_executors']
                }
            }
        }
        # Create the namespace
        await self._create_namespace(f"datamesh-{domain_id}")
        # Create the ServiceAccount used by the driver and executors
        await self._create_service_account(f"spark-{domain_id}", f"datamesh-{domain_id}")
        return {
            'cluster_name': config['cluster_name'],
            'namespace': f"datamesh-{domain_id}",
            'spark_ui_url': f"https://spark-ui.datamesh.com/{domain_id}/",
            'history_server_url': f"https://spark-history.datamesh.com/{domain_id}/",
            'service_account': f"spark-{domain_id}"
        }
    async def _create_namespace(self, namespace: str):
        """Create a Kubernetes namespace if it does not already exist"""
        # Note: the official Kubernetes client is synchronous, so this call blocks the event loop
        k8s_core_v1 = client.CoreV1Api()
        namespace_manifest = client.V1Namespace(
            metadata=client.V1ObjectMeta(name=namespace)
        )
        try:
            k8s_core_v1.create_namespace(body=namespace_manifest)
        except client.rest.ApiException as e:
            if e.status != 409:  # 409 Conflict means it already exists, so ignore it
                raise
    async def _create_service_account(self, service_account: str, namespace: str):
        """Create a Kubernetes ServiceAccount if it does not already exist"""
        k8s_core_v1 = client.CoreV1Api()
        sa_manifest = client.V1ServiceAccount(
            metadata=client.V1ObjectMeta(
                name=service_account,
                namespace=namespace
            )
        )
        try:
            k8s_core_v1.create_namespaced_service_account(
                namespace=namespace,
                body=sa_manifest
            )
        except client.rest.ApiException as e:
            if e.status != 409:  # 409 Conflict means it already exists, so ignore it
                raise
class KafkaClusterProvisioner:
    """Kafka cluster provisioner"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision a Kafka cluster through the Strimzi Kafka Operator"""
        config = request['configuration']
        domain_id = request['domain_id']
        # Strimzi Kafka custom resource (applying it to the cluster is omitted here)
        kafka_cluster_config = {
            'apiVersion': 'kafka.strimzi.io/v1beta2',
            'kind': 'Kafka',
            'metadata': {
                'name': config['cluster_name'],
                'namespace': f"datamesh-{domain_id}"
            },
            'spec': {
                'kafka': {
                    'version': '3.0.0',
                    'replicas': config['replication_factor'],
                    'listeners': [
                        {
                            'name': 'plain',
                            'port': 9092,
                            'type': 'internal',
                            'tls': False
                        },
                        {
                            'name': 'tls',
                            'port': 9093,
                            'type': 'internal',
                            'tls': True
                        }
                    ],
                    'config': {
                        'offsets.topic.replication.factor': config['replication_factor'],
                        'transaction.state.log.replication.factor': config['replication_factor'],
                        'transaction.state.log.min.isr': 2,
                        'default.replication.factor': config['replication_factor'],
                        'min.insync.replicas': 2,
                        'inter.broker.protocol.version': '3.0',
                        'log.retention.hours': config['retention_hours'],
                        'compression.type': config['compression_type']
                    },
                    'storage': {
                        'type': 'persistent-claim',
                        'size': '100Gi',
                        'class': 'fast-ssd'
                    }
                },
                'zookeeper': {
                    'replicas': 3,
                    'storage': {
                        'type': 'persistent-claim',
                        'size': '10Gi',
                        'class': 'fast-ssd'
                    }
                },
                'entityOperator': {
                    'topicOperator': {},
                    'userOperator': {}
                }
            }
        }
        return {
            'cluster_name': config['cluster_name'],
            # Strimzi names the bootstrap Service "<cluster>-kafka-bootstrap"; it lives in the
            # cluster's namespace, datamesh-<domain_id>, as set in the metadata above
            'bootstrap_servers': f"{config['cluster_name']}-kafka-bootstrap.datamesh-{domain_id}:9092",
            'schema_registry_url': f"https://schema-registry.{domain_id}.datamesh.com",
            'kafka_ui_url': f"https://kafka-ui.{domain_id}.datamesh.com"
        }
    async def deprovision(self, resource_id: str) -> bool:
        """Remove the Kafka cluster"""
        # In practice: delete the Kafka custom resource
        print(f"Deprovisioning Kafka cluster: {resource_id}")
        return True
class DataCatalogProvisioner:
    """Data catalog provisioner"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision an Apache Atlas-based data catalog"""
        config = request['configuration']
        domain_id = request['domain_id']
        # Helm values for deploying Apache Atlas (illustrative; the chart install is omitted)
        atlas_values = {
            'atlas': {
                'domain': domain_id,
                'elasticsearch': {
                    'enabled': True,
                    'storage': '50Gi'
                },
                'kafka': {
                    'enabled': True,
                    'external_kafka': f"{domain_id}-kafka"
                },
                'hbase': {
                    'enabled': True,
                    'storage': '100Gi'
                }
            },
            'ingress': {
                'enabled': True,
                'hostname': f"catalog.{domain_id}.datamesh.com"
            }
        }
        return {
            'catalog_url': f"https://catalog.{domain_id}.datamesh.com",
            'api_endpoint': f"https://catalog.{domain_id}.datamesh.com/api/atlas",
            'schema_registry_url': f"https://catalog.{domain_id}.datamesh.com/schema",
            'lineage_ui_url': f"https://catalog.{domain_id}.datamesh.com/lineage"
        }
    async def deprovision(self, resource_id: str) -> bool:
        """Remove the data catalog"""
        # In practice: uninstall the Helm release
        print(f"Deprovisioning data catalog: {resource_id}")
        return True
class APIGatewayProvisioner:
    """API gateway provisioner"""
    async def provision(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Provision routing on the Kong API Gateway"""
        config = request['configuration']
        domain_id = request['domain_id']
        # KongIngress resource (applying it to the cluster is omitted here)
        kong_config = {
            'apiVersion': 'configuration.konghq.com/v1',
            'kind': 'KongIngress',
            'metadata': {
                'name': config['gateway_name'],
                'namespace': f"datamesh-{domain_id}"
            },
            'proxy': {
                'path': f"/{domain_id}/api/",
                # Kong timeouts are in milliseconds; the write timeout field is write_timeout
                'connect_timeout': 10000,
                'read_timeout': 10000,
                'write_timeout': 10000
            },
            'route': {
                'methods': ['GET', 'POST', 'PUT', 'DELETE'],
                'protocols': ['https'],
                'strip_path': True
            }
        }
        return {
            'gateway_url': f"https://api.datamesh.com/{domain_id}/",
            'admin_ui_url': f"https://kong-admin.{domain_id}.datamesh.com",
            'rate_limit': config.get('rate_limit', '1000/minute'),
            'api_key_endpoint': f"https://api.datamesh.com/{domain_id}/auth/key"
        }
    async def deprovision(self, resource_id: str) -> bool:
        """Remove the gateway configuration"""
        # In practice: delete the KongIngress and related routes
        print(f"Deprovisioning API gateway: {resource_id}")
        return True
# Usage example
async def demonstrate_self_serve_platform():
    """Self-serve platform demo (needs in-cluster credentials or a local kubeconfig)"""
    platform = SelfServeDataPlatform()
    # List the service catalog
    catalog = await platform.get_service_catalog()
    print("Available Services:")
    for service_id, service in catalog.items():
        print(f"- {service.name}: {service.description}")
    # Request a Spark cluster (its object-storage dependency is provisioned automatically)
    spark_request = ProvisioningRequest(
        request_id="req-001",
        domain_id="customer",
        service_type="spark-cluster",
        configuration={
            'max_executors': 20,  # higher than the template default
            'driver_memory': '8g'
        },
        requester="alice@company.com"
    )
    resource_id = await platform.request_service(spark_request)
    print(f"Spark cluster provisioned: {resource_id}")
    # List the domain's resources
    customer_resources = await platform.get_domain_resources("customer")
    print(f"Customer domain has {len(customer_resources)} active resources")
    # Fetch monitoring information
    monitoring = await platform.monitor_resources("customer")
    print(f"Domain monitoring: {json.dumps(monitoring, indent=2)}")
# Run
# asyncio.run(demonstrate_self_serve_platform())
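`_ensure_dependencies` provisions missing dependencies recursively through `_provision_service`, so a cycle in the catalog's `dependencies` lists would recurse until Python raises `RecursionError`. Computing a provisioning order up front surfaces cycles explicitly. A minimal sketch using the standard library's `graphlib` (Python 3.9+), with a hypothetical `provisioning_order` helper that takes a plain `service -> dependencies` mapping:

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def provisioning_order(dependencies: Dict[str, List[str]], target: str) -> List[str]:
    """Return the target and its transitive dependencies, dependencies first."""
    # Collect only the services reachable from the target
    needed, stack = set(), [target]
    while stack:
        svc = stack.pop()
        if svc in needed:
            continue
        needed.add(svc)
        stack.extend(dependencies.get(svc, []))
    # TopologicalSorter expects node -> predecessors, which matches service -> dependencies
    sorter = TopologicalSorter({svc: dependencies.get(svc, []) for svc in needed})
    return list(sorter.static_order())  # raises CycleError on circular dependencies


deps = {
    "object-storage": [],
    "spark-cluster": ["object-storage"],
    "data-catalog": ["object-storage"],
    "kafka-cluster": [],
}
print(provisioning_order(deps, "spark-cluster"))  # ['object-storage', 'spark-cluster']
```

The platform could then provision services in this order and fail fast with a clear `CycleError` instead of recursing.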
3. Implementing Federated Governance
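Federated governance works when every domain runs the same automated checks. For example, the global quality policy defined in this section expresses its completeness rule as a metric, a threshold of `0.95`, and the operator `gte`. A minimal sketch, assuming completeness means the share of non-null cells over the listed fields (a common but not universal definition), with hypothetical `completeness` and `check_threshold` helpers:

```python
import operator
from typing import Any, Dict, List

# Comparison operators keyed by the names used in the rule "conditions" blocks
OPERATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "eq": operator.eq,
}


def completeness(records: List[Dict[str, Any]], fields: List[str]) -> float:
    """Share of (record, field) cells that are present and not None."""
    total = len(records) * len(fields)
    if total == 0:
        return 1.0  # nothing to check counts as complete
    filled = sum(1 for r in records for f in fields if r.get(f) is not None)
    return filled / total


def check_threshold(value: float, threshold: float, op: str) -> bool:
    """True when `value <op> threshold` holds, e.g. 0.97 gte 0.95."""
    return OPERATORS[op](value, threshold)


records = [{"id": "1", "email": "a@x.com"}, {"id": "2", "email": None}]
score = completeness(records, ["id", "email"])
print(score, check_threshold(score, 0.95, "gte"))  # 0.75 False
```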
3.1 Governance Framework Design
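The global classification policy flags PII in two ways: by field-name patterns and by value patterns such as an email regex. A minimal sketch of that two-step check, with a hypothetical `classify_pii` helper that receives sampled column values. Inside a character class, `|` is a literal pipe rather than alternation, so the top-level-domain part is written `[A-Za-z]{2,}`:

```python
import re
from typing import Dict, List, Set

# Field-name fragments taken from the pii-classification rule's field_patterns
PII_FIELD_PATTERNS = ["email", "phone", "ssn", "credit_card"]
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")


def classify_pii(sample: Dict[str, List[str]]) -> Set[str]:
    """Return column names flagged as PII by their name or by sampled values."""
    flagged: Set[str] = set()
    for column, values in sample.items():
        name = column.lower()
        if any(pattern in name for pattern in PII_FIELD_PATTERNS):
            flagged.add(column)  # name-based match
        elif any(EMAIL_RE.search(v) for v in values if isinstance(v, str)):
            flagged.add(column)  # value-based match
    return flagged


sample = {
    "customer_id": ["c-1"],
    "contact": ["alice@company.com"],
    "Email_Address": [],
    "notes": ["call me"],
}
print(sorted(classify_pii(sample)))  # ['Email_Address', 'contact']
```

Substring matching on names is deliberately coarse; a real engine would likely combine it with profiling results and manual review.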
# federated_governance.py
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, Set
from enum import Enum
from collections import defaultdict
import json
import time
from abc import ABC, abstractmethod
import hashlib
class GovernanceLevel(Enum):
    GLOBAL = "global"
    DOMAIN = "domain"
    PRODUCT = "product"
class PolicyType(Enum):
    DATA_CLASSIFICATION = "data_classification"
    ACCESS_CONTROL = "access_control"
    DATA_QUALITY = "data_quality"
    RETENTION = "retention"
    PRIVACY = "privacy"
    COMPLIANCE = "compliance"
@dataclass
class GovernancePolicy:
    """Definition of a governance policy"""
    policy_id: str
    name: str
    description: str
    policy_type: PolicyType
    level: GovernanceLevel
    scope: Dict[str, Any]  # domain, product, or global scope
    rules: List[Dict[str, Any]]
    enforcement_level: str  # "advisory", "warning", "blocking"
    owner: str
    created_at: float
    version: str
@dataclass
class PolicyViolation:
    """A detected policy violation"""
    violation_id: str
    policy_id: str
    resource_id: str
    violation_type: str
    description: str
    severity: str
    detected_at: float
    resolved: bool = False
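class PolicyEngine(ABC):
    """Policy engine interface (inherits ABC so @abstractmethod is enforced)"""
    @abstractmethod
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        """Evaluate a policy against a resource context and return any violations"""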
class FederatedGovernanceFramework:
    """Federated governance framework"""
    def __init__(self):
        self.global_policies = {}
        self.domain_policies = defaultdict(dict)
        self.product_policies = defaultdict(dict)
        self.policy_engines = {}
        self.violations = []
        self.governance_roles = {}
        self.compliance_frameworks = {}
        self._initialize_policy_engines()
        self._setup_default_policies()
    def _initialize_policy_engines(self):
        """Register one policy engine per policy type"""
        self.policy_engines[PolicyType.DATA_CLASSIFICATION] = DataClassificationEngine()
        self.policy_engines[PolicyType.ACCESS_CONTROL] = AccessControlEngine()
        self.policy_engines[PolicyType.DATA_QUALITY] = DataQualityEngine()
        self.policy_engines[PolicyType.RETENTION] = RetentionEngine()
        self.policy_engines[PolicyType.PRIVACY] = PrivacyEngine()
        self.policy_engines[PolicyType.COMPLIANCE] = ComplianceEngine()
    def _setup_default_policies(self):
        """Register the default global policies"""
        # Global data classification policy
        data_classification_policy = GovernancePolicy(
            policy_id="global-data-classification",
            name="Global Data Classification Policy",
            description="Organization-wide data classification standards",
            policy_type=PolicyType.DATA_CLASSIFICATION,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "pii-classification",
                    "description": "Automatically classify PII data",
                    "conditions": {
                        "field_patterns": ["email", "phone", "ssn", "credit_card"],
                        # Email address pattern ("|" inside [...] would match a literal pipe)
                        "data_patterns": [r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"]
                    },
                    "action": "classify_as_pii"
                },
                {
                    "rule_id": "financial-data",
                    "description": "Classify financial data",
                    "conditions": {
                        "field_patterns": ["account_number", "routing_number", "balance"],
                        "table_patterns": ["payment", "transaction", "account"]
                    },
                    "action": "classify_as_financial"
                }
            ],
            enforcement_level="blocking",
            owner="chief-data-officer@company.com",
            created_at=time.time(),
            version="1.0"
        )
        # Global access control policy
        access_control_policy = GovernancePolicy(
            policy_id="global-access-control",
            name="Global Access Control Policy",
            description="Organization-wide access control rules",
            policy_type=PolicyType.ACCESS_CONTROL,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "pii-access-restriction",
                    "description": "Restrict PII data access",
                    "conditions": {
                        "data_classification": "pii",
                        "user_role": "!data_protection_officer"
                    },
                    "action": "deny_access"
                },
                {
                    "rule_id": "cross-domain-access",
                    "description": "Cross-domain data access requires approval",
                    "conditions": {
                        "source_domain": "ne:target_domain"
                    },
                    "action": "require_approval"
                }
            ],
            enforcement_level="blocking",
            owner="security-team@company.com",
            created_at=time.time(),
            version="1.0"
        )
        # Global data quality policy
        data_quality_policy = GovernancePolicy(
            policy_id="global-data-quality",
            name="Global Data Quality Standards",
            description="Minimum data quality requirements",
            policy_type=PolicyType.DATA_QUALITY,
            level=GovernanceLevel.GLOBAL,
            scope={"organization": "all"},
            rules=[
                {
                    "rule_id": "completeness-threshold",
                    "description": "Data completeness must be at least 95%",
                    "conditions": {
                        "metric": "completeness",
                        "threshold": 0.95,
                        "operator": "gte"
                    },
                    "action": "flag_quality_issue"
                },
                {
                    "rule_id": "freshness-requirement",
                    "description": "Real-time data must be less than 1 hour old",
                    "conditions": {
                        "data_type": "real_time",
                        "age_hours": 1,
                        "operator": "gt"
                    },
                    "action": "flag_freshness_issue"
                }
            ],
            enforcement_level="warning",
            owner="data-quality-team@company.com",
            created_at=time.time(),
            version="1.0"
        )
        # Register the policies
        self.register_policy(data_classification_policy)
        self.register_policy(access_control_policy)
        self.register_policy(data_quality_policy)
    def register_policy(self, policy: GovernancePolicy) -> str:
        """Register a policy at its governance level (global, domain, or product)"""
        if policy.level == GovernanceLevel.GLOBAL:
            self.global_policies[policy.policy_id] = policy
        elif policy.level == GovernanceLevel.DOMAIN:
            domain_id = policy.scope.get('domain_id')
            if not domain_id:
                raise ValueError("Domain policy must specify domain_id in scope")
            self.domain_policies[domain_id][policy.policy_id] = policy
        elif policy.level == GovernanceLevel.PRODUCT:
            product_id = policy.scope.get('product_id')
            if not product_id:
                raise ValueError("Product policy must specify product_id in scope")
            self.product_policies[product_id][policy.policy_id] = policy
        return policy.policy_id
    def evaluate_compliance(self, resource_id: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a resource's compliance with every applicable policy"""
        domain_id = context.get('domain_id')
        product_id = context.get('product_id')
        applicable_policies = []
        # Global policies
        applicable_policies.extend(self.global_policies.values())
        # Domain policies
        if domain_id and domain_id in self.domain_policies:
            applicable_policies.extend(self.domain_policies[domain_id].values())
        # Product policies
        if product_id and product_id in self.product_policies:
            applicable_policies.extend(self.product_policies[product_id].values())
        violations = []
        policy_results = {}
        for policy in applicable_policies:
            engine = self.policy_engines.get(policy.policy_type)
            if engine:
                policy_violations = engine.evaluate_policy(policy, context)
                violations.extend(policy_violations)
                policy_results[policy.policy_id] = {
                    'policy_name': policy.name,
                    'compliance_status': 'compliant' if not policy_violations else 'violated',
                    'violations': len(policy_violations),
                    'enforcement_level': policy.enforcement_level
                }
        # Record the violations
        self.violations.extend(violations)
        # Overall compliance score over the policies that were actually evaluated
        # (a policy with no registered engine is skipped, not counted as a violation)
        total_policies = len(policy_results)
        compliant_policies = sum(1 for result in policy_results.values()
                                 if result['compliance_status'] == 'compliant')
        compliance_score = (compliant_policies / total_policies) if total_policies > 0 else 1.0
        return {
            'resource_id': resource_id,
            'compliance_score': compliance_score,
            'total_policies_evaluated': total_policies,
            'policy_results': policy_results,
            'violations': [v.__dict__ for v in violations],
            'enforcement_actions': self._determine_enforcement_actions(violations)
        }
    def _determine_enforcement_actions(self, violations: List[PolicyViolation]) -> List[str]:
        """Decide which enforcement actions the violations trigger"""
        actions = []
        blocking_violations = [v for v in violations
                               if self._get_policy_enforcement_level(v.policy_id) == "blocking"]
        warning_violations = [v for v in violations
                              if self._get_policy_enforcement_level(v.policy_id) == "warning"]
        if blocking_violations:
            actions.append("block_access")
            actions.append("notify_governance_team")
        if warning_violations:
            actions.append("log_warning")
            actions.append("notify_domain_owner")
        return actions
    def _get_policy_enforcement_level(self, policy_id: str) -> str:
        """Look up a policy's enforcement level (defaults to "advisory")."""
        # Global policies
        if policy_id in self.global_policies:
            return self.global_policies[policy_id].enforcement_level
        # Domain policies
        for domain_policies in self.domain_policies.values():
            if policy_id in domain_policies:
                return domain_policies[policy_id].enforcement_level
        # Product policies
        for product_policies in self.product_policies.values():
            if policy_id in product_policies:
                return product_policies[policy_id].enforcement_level
        return "advisory"
    def create_domain_policy(self, domain_id: str, policy_template: str,
                             customizations: Dict[str, Any]) -> GovernancePolicy:
        """Create a domain-specific policy from a global policy template."""
        base_policy = self._get_policy_template(policy_template)
        # Customize the policy. Each rule dict and its 'conditions' dict are copied so the
        # rule customizations below do not mutate the shared global template
        # (a plain rules.copy() would still share the rule dicts).
        custom_policy = GovernancePolicy(
            policy_id=f"{domain_id}-{policy_template}",
            name=f"{domain_id.title()} {base_policy.name}",
            description=f"Domain-specific policy for {domain_id}",
            policy_type=base_policy.policy_type,
            level=GovernanceLevel.DOMAIN,
            scope={"domain_id": domain_id},
            rules=[{**rule, 'conditions': dict(rule.get('conditions', {}))}
                   for rule in base_policy.rules],
            enforcement_level=customizations.get('enforcement_level', base_policy.enforcement_level),
            owner=customizations.get('owner', f"domain-owner-{domain_id}@company.com"),
            created_at=time.time(),
            version="1.0"
        )
        # Apply rule-level customizations
        if 'rule_customizations' in customizations:
            self._apply_rule_customizations(custom_policy, customizations['rule_customizations'])
        self.register_policy(custom_policy)
        return custom_policy
    def _get_policy_template(self, template_name: str) -> GovernancePolicy:
        """Look up the global policy that serves as a template."""
        templates = {
            'access_control': self.global_policies.get('global-access-control'),
            'data_quality': self.global_policies.get('global-data-quality'),
            'data_classification': self.global_policies.get('global-data-classification')
        }
        template = templates.get(template_name)
        if not template:
            raise ValueError(f"Unknown policy template: {template_name}")
        return template
    def _apply_rule_customizations(self, policy: GovernancePolicy,
                                   customizations: Dict[str, Any]):
        """Apply per-rule customizations, keyed by rule_id."""
        for rule in policy.rules:
            rule_id = rule['rule_id']
            if rule_id in customizations:
                rule_customization = customizations[rule_id]
                # Merge condition overrides
                if 'conditions' in rule_customization:
                    rule['conditions'].update(rule_customization['conditions'])
                # Replace the action
                if 'action' in rule_customization:
                    rule['action'] = rule_customization['action']
    def get_governance_dashboard(self, domain_id: Optional[str] = None) -> Dict[str, Any]:
        """Build governance dashboard data."""
        filtered_violations = self.violations
        if domain_id:
            # Domain filtering is not implemented: it needs a resource-to-domain mapping,
            # so all recorded violations are currently reported regardless of domain_id.
            pass
        # Count violations per violation type
        violations_by_type: Dict[str, int] = {}
        for v in filtered_violations:
            violations_by_type[v.violation_type] = violations_by_type.get(v.violation_type, 0) + 1
        # Violation statistics
        violation_stats = {
            'total_violations': len(filtered_violations),
            'open_violations': len([v for v in filtered_violations if not v.resolved]),
            'violations_by_severity': {
                'critical': len([v for v in filtered_violations if v.severity == 'critical']),
                'high': len([v for v in filtered_violations if v.severity == 'high']),
                'medium': len([v for v in filtered_violations if v.severity == 'medium']),
                'low': len([v for v in filtered_violations if v.severity == 'low'])
            },
            'violations_by_type': violations_by_type
        }
        # Policy statistics
        global_count = len(self.global_policies)
        domain_count = sum(len(dp) for dp in self.domain_policies.values())
        product_count = sum(len(pp) for pp in self.product_policies.values())
        policy_stats = {
            'total_policies': global_count + domain_count + product_count,
            'global_policies': global_count,
            'domain_policies': domain_count,
            'product_policies': product_count
        }
        return {
            'domain_id': domain_id,
            'generated_at': time.time(),
            'violation_stats': violation_stats,
            'policy_stats': policy_stats,
            'compliance_trends': self._calculate_compliance_trends(domain_id),
            'top_violations': self._get_top_violations(filtered_violations),
            'policy_effectiveness': self._calculate_policy_effectiveness()
        }
    def _calculate_compliance_trends(self, domain_id: Optional[str]) -> Dict[str, Any]:
        """Compliance trend analysis."""
        # Placeholder values: a real implementation would analyze time-series data
        return {
            'weekly_compliance_scores': [0.85, 0.87, 0.89, 0.91, 0.88],
            'trend_direction': 'improving',
            'key_improvements': ['data_quality', 'access_control'],
            'areas_of_concern': ['data_retention']
        }
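    def _get_top_violations(self, violations: List[PolicyViolation]) -> List[Dict[str, Any]]:
        """Return the 10 most important open violations."""
        # Severity is a string, so rank it explicitly; sorting the raw strings would
        # order them alphabetically (medium > low > high > critical).
        severity_rank = {'critical': 3, 'high': 2, 'medium': 1, 'low': 0}
        open_violations = [v for v in violations if not v.resolved]
        sorted_violations = sorted(open_violations,
                                   key=lambda v: (severity_rank.get(v.severity, -1), v.detected_at),
                                   reverse=True)
        return [v.__dict__ for v in sorted_violations[:10]]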
    def _calculate_policy_effectiveness(self) -> Dict[str, Any]:
        """Policy effectiveness analysis."""
        # Placeholder values: a real implementation would analyze violation trends per policy
        return {
            'most_effective_policies': ['global-data-classification', 'domain-access-control'],
            'least_effective_policies': ['global-retention'],
            'policy_coverage': 0.92,
            'enforcement_rate': 0.87
        }
# Concrete policy engine implementations
class DataClassificationEngine(PolicyEngine):
    """Policy engine for data classification."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        schema = context.get('schema', {})
        data_sample = context.get('data_sample', {})
        for rule in policy.rules:
            if rule['rule_id'] == 'pii-classification':
                violations.extend(self._check_pii_classification(policy, rule, schema, data_sample, context))
            elif rule['rule_id'] == 'financial-data':
                violations.extend(self._check_financial_classification(policy, rule, schema, data_sample, context))
        return violations
    def _check_pii_classification(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                  schema: Dict[str, Any], data_sample: Dict[str, Any],
                                  context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        field_patterns = rule['conditions']['field_patterns']
        data_patterns = rule['conditions']['data_patterns']  # not used yet (value-based detection)
        # Flag fields whose name matches a PII pattern but that are not classified as 'pii'
        for field_name in schema.get('fields', []):
            if any(pattern in field_name.lower() for pattern in field_patterns):
                current_classification = schema.get('classification', {}).get(field_name)
                if current_classification != 'pii':
                    violations.append(PolicyViolation(
                        violation_id=f"pii-{field_name}-{int(time.time())}",
                        policy_id=policy.policy_id,
                        resource_id=context.get('resource_id', 'unknown'),
                        violation_type='missing_pii_classification',
                        description=f"Field '{field_name}' appears to contain PII but is not classified as such",
                        severity='high',
                        detected_at=time.time()
                    ))
        return violations
    def _check_financial_classification(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                        schema: Dict[str, Any], data_sample: Dict[str, Any],
                                        context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        field_patterns = rule['conditions']['field_patterns']  # not used yet (field-level check)
        table_patterns = rule['conditions']['table_patterns']
        table_name = context.get('table_name', '').lower()
        # Flag tables whose name matches a financial pattern but are not classified as 'financial'
        if any(pattern in table_name for pattern in table_patterns):
            current_classification = schema.get('table_classification')
            if current_classification != 'financial':
                violations.append(PolicyViolation(
                    violation_id=f"financial-table-{int(time.time())}",
                    policy_id=policy.policy_id,
                    resource_id=context.get('resource_id', 'unknown'),
                    violation_type='missing_financial_classification',
                    description=f"Table '{table_name}' appears to contain financial data but is not classified as such",
                    severity='high',
                    detected_at=time.time()
                ))
        return violations
class AccessControlEngine(PolicyEngine):
    """Policy engine for access control."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        user_context = context.get('user_context', {})
        access_request = context.get('access_request', {})
        for rule in policy.rules:
            if rule['rule_id'] == 'pii-access-restriction':
                violations.extend(self._check_pii_access(policy, rule, user_context, access_request))
            elif rule['rule_id'] == 'cross-domain-access':
                violations.extend(self._check_cross_domain_access(policy, rule, user_context, access_request))
        return violations
    def _check_pii_access(self, policy: GovernancePolicy, rule: Dict[str, Any],
                          user_context: Dict[str, Any], access_request: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        data_classification = access_request.get('data_classification')
        user_role = user_context.get('role')
        # Only the data protection officer role may access PII
        if data_classification == 'pii' and user_role != 'data_protection_officer':
            violations.append(PolicyViolation(
                violation_id=f"pii-access-{user_context.get('user_id', 'unknown')}-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=access_request.get('resource_id', 'unknown'),
                violation_type='unauthorized_pii_access',
                description=f"User with role '{user_role}' attempted to access PII data",
                severity='critical',
                detected_at=time.time()
            ))
        return violations
    def _check_cross_domain_access(self, policy: GovernancePolicy, rule: Dict[str, Any],
                                   user_context: Dict[str, Any], access_request: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        source_domain = user_context.get('domain')
        target_domain = access_request.get('domain')
        has_approval = access_request.get('has_approval', False)
        # Access across domains requires explicit approval
        if source_domain != target_domain and not has_approval:
            violations.append(PolicyViolation(
                violation_id=f"cross-domain-{source_domain}-{target_domain}-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=access_request.get('resource_id', 'unknown'),
                violation_type='unapproved_cross_domain_access',
                description=f"Cross-domain access from '{source_domain}' to '{target_domain}' without approval",
                severity='medium',
                detected_at=time.time()
            ))
        return violations
class DataQualityEngine(PolicyEngine):
    """Policy engine for data quality."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        quality_metrics = context.get('quality_metrics', {})
        for rule in policy.rules:
            if rule['rule_id'] == 'completeness-threshold':
                violations.extend(self._check_completeness(policy, rule, quality_metrics, context))
            elif rule['rule_id'] == 'freshness-requirement':
                violations.extend(self._check_freshness(policy, rule, context))
        return violations
    def _check_completeness(self, policy: GovernancePolicy, rule: Dict[str, Any],
                            quality_metrics: Dict[str, Any],
                            context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        completeness = quality_metrics.get('completeness', 0)
        threshold = rule['conditions']['threshold']
        if completeness < threshold:
            violations.append(PolicyViolation(
                violation_id=f"completeness-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=context.get('resource_id', 'unknown'),
                violation_type='low_data_completeness',
                description=f"Data completeness ({completeness:.2%}) below threshold ({threshold:.2%})",
                severity='medium',
                detected_at=time.time()
            ))
        return violations
    def _check_freshness(self, policy: GovernancePolicy, rule: Dict[str, Any],
                         context: Dict[str, Any]) -> List[PolicyViolation]:
        violations = []
        data_type = context.get('data_type')
        data_age_hours = context.get('data_age_hours', 0)
        threshold_hours = rule['conditions']['age_hours']
        # The freshness requirement applies only to real-time data
        if data_type == 'real_time' and data_age_hours > threshold_hours:
            violations.append(PolicyViolation(
                violation_id=f"freshness-{int(time.time())}",
                policy_id=policy.policy_id,
                resource_id=context.get('resource_id', 'unknown'),
                violation_type='stale_data',
                description=f"Real-time data is {data_age_hours:.1f} hours old (threshold: {threshold_hours} hours)",
                severity='high',
                detected_at=time.time()
            ))
        return violations
class RetentionEngine(PolicyEngine):
    """Policy engine for data retention."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        # Retention policy evaluation logic (stub)
        return []
class PrivacyEngine(PolicyEngine):
    """Policy engine for privacy."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        # Privacy policy evaluation logic, e.g. GDPR, CCPA (stub)
        return []
class ComplianceEngine(PolicyEngine):
    """Policy engine for regulatory compliance."""
    def evaluate_policy(self, policy: GovernancePolicy,
                        context: Dict[str, Any]) -> List[PolicyViolation]:
        # Regulatory compliance policy evaluation logic (stub)
        return []
# Usage example
def demonstrate_federated_governance():
    """Federated governance demo"""
    governance = FederatedGovernanceFramework()
    # Create a domain-specific policy from the global access-control template
    customer_access_policy = governance.create_domain_policy(
        domain_id="customer",
        policy_template="access_control",
        customizations={
            'enforcement_level': 'blocking',
            'owner': 'customer-domain-owner@company.com',
            'rule_customizations': {
                'cross-domain-access': {
                    'conditions': {
                        'requires_manager_approval': True
                    }
                }
            }
        }
    )
    print(f"Created domain policy: {customer_access_policy.policy_id}")
    # Evaluate compliance
    context = {
        'domain_id': 'customer',
        'product_id': 'customer-profile',
        'resource_id': 'customer-profile-db',
        'schema': {
            'fields': ['customer_id', 'email', 'phone_number', 'address'],
            'classification': {
                'customer_id': 'identifier',
                'email': 'pii',  # correctly classified
                'phone_number': 'identifier',  # not classified as PII, so this is a violation
                'address': 'pii'
            }
        },
        'user_context': {
            'user_id': 'john.doe@company.com',
            'role': 'data_analyst',
            'domain': 'customer'
        },
        'access_request': {
            'resource_id': 'customer-profile-db',
            'data_classification': 'pii',
            'domain': 'customer',
            'has_approval': False
        },
        'quality_metrics': {
            'completeness': 0.92,  # below the completeness threshold
            'accuracy': 0.98
        },
        'data_type': 'real_time',
        'data_age_hours': 2.5  # too old for real-time data (freshness violation)
    }
    compliance_result = governance.evaluate_compliance('customer-profile-db', context)
    print("\nCompliance Evaluation Result:")
    print(json.dumps(compliance_result, indent=2))
    # Governance dashboard
    dashboard = governance.get_governance_dashboard(domain_id="customer")
    print("\nGovernance Dashboard:")
    print(json.dumps(dashboard, indent=2))
# Run
# demonstrate_federated_governance()
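Ranking the "top" violations for the dashboard needs an explicit severity order. Severity labels are strings, and sorting strings compares them alphabetically, so a plain `reverse=True` sort would put `medium` first and `critical` last. The following is a minimal standalone sketch of ranking open violations by severity and then by recency; the `Violation` dataclass is a simplified, hypothetical stand-in for `PolicyViolation` carrying only the fields the sort uses, and the numeric ranks are an assumption.

```python
from dataclasses import dataclass
from typing import Dict, List

# Assumed ordering: a higher number means more severe.
SEVERITY_RANK: Dict[str, int] = {"critical": 3, "high": 2, "medium": 1, "low": 0}


@dataclass
class Violation:
    """Hypothetical, simplified stand-in for PolicyViolation."""
    violation_id: str
    severity: str
    detected_at: float
    resolved: bool = False


def top_open_violations(violations: List[Violation], limit: int = 10) -> List[Violation]:
    """Unresolved violations, most severe first, newest first within a severity."""
    open_violations = [v for v in violations if not v.resolved]
    return sorted(
        open_violations,
        # Unknown severity labels get rank -1 and therefore sort last.
        key=lambda v: (SEVERITY_RANK.get(v.severity, -1), v.detected_at),
        reverse=True,
    )[:limit]


violations = [
    Violation("v1", "medium", detected_at=100.0),
    Violation("v2", "critical", detected_at=50.0),
    Violation("v3", "high", detected_at=200.0),
    Violation("v4", "critical", detected_at=150.0),
    Violation("v5", "low", detected_at=300.0, resolved=True),
]
print([v.violation_id for v in top_open_violations(violations)])
# → ['v4', 'v2', 'v3', 'v1']
```

The resolved `v5` is excluded even though it is the newest, and the two critical violations come first regardless of how old they are.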
4. Data Product Management
4.1 Data Product Lifecycle
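The `ProductStage` enum in the listing below names five lifecycle stages but does not say which moves between them are legal. One way to make that explicit is a small transition table. The sketch below is illustrative only: the allowed transitions (for example, testing may fall back to development, and deprecated is terminal) and the `transition` helper are assumptions, not part of the original design.

```python
from enum import Enum
from typing import Dict, Set


class ProductStage(Enum):
    IDEATION = "ideation"
    DEVELOPMENT = "development"
    TESTING = "testing"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"


# Hypothetical transition table: which stages may follow each stage.
ALLOWED_TRANSITIONS: Dict[ProductStage, Set[ProductStage]] = {
    ProductStage.IDEATION: {ProductStage.DEVELOPMENT, ProductStage.DEPRECATED},
    ProductStage.DEVELOPMENT: {ProductStage.TESTING, ProductStage.DEPRECATED},
    ProductStage.TESTING: {ProductStage.DEVELOPMENT, ProductStage.PRODUCTION},
    ProductStage.PRODUCTION: {ProductStage.DEPRECATED},
    ProductStage.DEPRECATED: set(),  # terminal stage
}


def transition(current: ProductStage, target: ProductStage) -> ProductStage:
    """Return the target stage if the move is allowed; otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Cannot move from {current.value} to {target.value}")
    return target


stage = ProductStage.IDEATION
for nxt in (ProductStage.DEVELOPMENT, ProductStage.TESTING, ProductStage.PRODUCTION):
    stage = transition(stage, nxt)
print(stage.value)  # production
```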
# data_product_management.py
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
import time
import json
import uuid
from abc import ABC, abstractmethod
# DataProduct is the dataclass defined in data_mesh_principles.py (section 1.1);
# this import assumes both modules live in the same package.
from data_mesh_principles import DataProduct
class ProductStage(Enum):
    IDEATION = "ideation"
    DEVELOPMENT = "development"
    TESTING = "testing"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"
class ProductType(Enum):
    ANALYTICAL = "analytical"
    OPERATIONAL = "operational"
    REFERENCE = "reference"
    DERIVED = "derived"
@dataclass
class DataProductMetrics:
    """Operational metrics for a data product"""
    usage_count_daily: int = 0
    unique_consumers: int = 0
    average_query_time_ms: float = 0.0
    data_freshness_score: float = 1.0
    quality_score: float = 1.0
    availability_percentage: float = 100.0
    sla_compliance_rate: float = 1.0
@dataclass
class DataProductContract:
    """Contract between a data product (producer) and a consumer"""
    contract_id: str
    consumer_id: str
    producer_id: str
    sla_terms: Dict[str, Any]
    usage_limits: Dict[str, Any]
    data_access_patterns: List[str]
    pricing_model: Optional[Dict[str, Any]] = None
    created_at: float = field(default_factory=time.time)
    expires_at: Optional[float] = None
@dataclass
class DataProductVersion:
    """A versioned snapshot of a data product's schema"""
    version_id: str
    version_number: str
    schema_version: str
    breaking_changes: bool
    changelog: List[Dict[str, Any]]
    backward_compatible: bool
    created_at: float
class DataProductLifecycleManager:
    """Manages the data product lifecycle"""
    def __init__(self):
        self.products = {}
        self.product_versions = defaultdict(list)
        self.contracts = {}
        self.metrics_history = defaultdict(list)
        self.lifecycle_policies = {}
        self._setup_default_policies()
    def _setup_default_policies(self):
        """Set up the default lifecycle policies."""
        self.lifecycle_policies = {
            'version_retention': {
                'max_versions': 10,
                'min_versions': 3,
                'retention_days': 365
            },
            'deprecation': {
                'notice_period_days': 90,
                'grace_period_days': 30,
                'auto_deprecate_unused_days': 180
            },
            'sla_enforcement': {
                'availability_threshold': 99.0,
                'performance_threshold_ms': 1000,
                'quality_threshold': 0.95
            }
        }
    def create_data_product(self, product_definition: DataProduct) -> str:
        """Create a data product with an initial 1.0.0 version."""
        # Validate the definition first
        self._validate_product_definition(product_definition)
        # Register the product
        self.products[product_definition.product_id] = product_definition
        # Create the initial version
        initial_version = DataProductVersion(
            version_id=f"{product_definition.product_id}-v1.0.0",
            version_number="1.0.0",
            schema_version="1.0.0",
            breaking_changes=False,
            changelog=[{
                'type': 'creation',
                'description': 'Initial product version',
                'timestamp': time.time()
            }],
            backward_compatible=True,
            created_at=time.time()
        )
        self.product_versions[product_definition.product_id].append(initial_version)
        # Provision the initial infrastructure
        self._setup_product_infrastructure(product_definition)
        return product_definition.product_id
    def _validate_product_definition(self, product: DataProduct):
        """Validate a product definition before registration."""
        required_fields = ['product_id', 'name', 'domain_id', 'data_steward']
        for field_name in required_fields:
            if not getattr(product, field_name, None):
                raise ValueError(f"Required field '{field_name}' is missing")
        # Schema validation
        if not product.schema_definition:
            raise ValueError("Schema definition is required")
        # SLA validation
        required_sla_fields = ['availability', 'latency']
        for sla_field in required_sla_fields:
            if sla_field not in product.sla_requirements:
                raise ValueError(f"SLA requirement '{sla_field}' is missing")
    def _setup_product_infrastructure(self, product: DataProduct):
        """Generate the product's infrastructure configuration."""
        infrastructure_config = {
            'api_endpoints': {
                'rest_api': f"https://api.datamesh.com/products/{product.product_id}/data",
                'graphql_api': f"https://api.datamesh.com/products/{product.product_id}/graphql",
                'streaming_api': f"wss://stream.datamesh.com/products/{product.product_id}"
            },
            'monitoring': {
                'metrics_endpoint': f"https://monitoring.datamesh.com/products/{product.product_id}",
                'health_check': f"https://api.datamesh.com/products/{product.product_id}/health",
                'dashboard_url': f"https://dashboard.datamesh.com/products/{product.product_id}"
            },
            'documentation': {
                'api_docs': f"https://docs.datamesh.com/products/{product.product_id}/api",
                'schema_docs': f"https://docs.datamesh.com/products/{product.product_id}/schema",
                'usage_guide': f"https://docs.datamesh.com/products/{product.product_id}/guide"
            },
            'security': {
                'access_policies': f"datamesh:product:{product.product_id}:*",
                'encryption': 'AES-256',
                'authentication': 'OAuth2'
            }
        }
        # Store the config on the product (DataProduct declares no such field,
        # so this attaches a new attribute to the instance)
        product.infrastructure_config = infrastructure_config
    def evolve_product_schema(self, product_id: str, new_schema: Dict[str, Any],
                              breaking_changes: bool = False) -> str:
        """Evolve a product's schema and record a new semantic version."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        product = self.products[product_id]
        current_versions = self.product_versions[product_id]
        latest_version = max(current_versions, key=lambda v: v.created_at)
        # Compute the new MAJOR.MINOR.PATCH version number
        major, minor, patch = map(int, latest_version.version_number.split('.'))
        if breaking_changes:
            new_version = f"{major + 1}.0.0"
        else:
            # Non-breaking change: bump MINOR if fields were added, otherwise PATCH.
            # Removed or retyped fields are not detected as breaking here; the caller
            # must pass breaking_changes=True for those.
            schema_changes = self._analyze_schema_changes(product.schema_definition, new_schema)
            if schema_changes['has_new_fields']:
                new_version = f"{major}.{minor + 1}.0"
            else:
                new_version = f"{major}.{minor}.{patch + 1}"
        # Create the new version record (changelog is diffed against the old schema)
        new_version_obj = DataProductVersion(
            version_id=f"{product_id}-v{new_version}",
            version_number=new_version,
            schema_version=new_version,
            breaking_changes=breaking_changes,
            changelog=self._generate_schema_changelog(product.schema_definition, new_schema),
            backward_compatible=not breaking_changes,
            created_at=time.time()
        )
        self.product_versions[product_id].append(new_version_obj)
        # Update the product's schema
        product.schema_definition = new_schema
        # Prune old versions according to the retention policy
        self._cleanup_old_versions(product_id)
        # Notify consumers about breaking changes
        if breaking_changes:
            self._notify_breaking_change(product_id, new_version_obj)
        return new_version_obj.version_id
    def _analyze_schema_changes(self, old_schema: Dict[str, Any],
                                new_schema: Dict[str, Any]) -> Dict[str, bool]:
        """Classify the differences between two schemas."""
        changes = {
            'has_new_fields': False,
            'has_removed_fields': False,
            'has_type_changes': False,
            'has_constraint_changes': False  # not detected yet
        }
        old_fields = set(old_schema.get('fields', {}).keys())
        new_fields = set(new_schema.get('fields', {}).keys())
        changes['has_new_fields'] = bool(new_fields - old_fields)
        changes['has_removed_fields'] = bool(old_fields - new_fields)
        # Check fields present in both schemas for type changes
        common_fields = old_fields & new_fields
        for field_name in common_fields:
            old_field_type = old_schema['fields'][field_name].get('type')
            new_field_type = new_schema['fields'][field_name].get('type')
            if old_field_type != new_field_type:
                changes['has_type_changes'] = True
                break
        return changes
    def _generate_schema_changelog(self, old_schema: Dict[str, Any],
                                   new_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate a changelog entry for each added, removed, or retyped field."""
        changelog = []
        old_fields = set(old_schema.get('fields', {}).keys())
        new_fields = set(new_schema.get('fields', {}).keys())
        # Added fields
        for field_name in new_fields - old_fields:
            changelog.append({
                'type': 'field_added',
                'description': f"Added new field: {field_name}",
                'field_name': field_name,
                'timestamp': time.time()
            })
        # Removed fields
        for field_name in old_fields - new_fields:
            changelog.append({
                'type': 'field_removed',
                'description': f"Removed field: {field_name}",
                'field_name': field_name,
                'timestamp': time.time()
            })
        # Type changes
        common_fields = old_fields & new_fields
        for field_name in common_fields:
            old_type = old_schema['fields'][field_name].get('type')
            new_type = new_schema['fields'][field_name].get('type')
            if old_type != new_type:
                changelog.append({
                    'type': 'type_changed',
                    'description': f"Field {field_name} type changed from {old_type} to {new_type}",
                    'field_name': field_name,
                    'old_type': old_type,
                    'new_type': new_type,
                    'timestamp': time.time()
                })
        return changelog
    def _cleanup_old_versions(self, product_id: str):
        """Prune old versions according to the retention policy."""
        versions = self.product_versions[product_id]
        policy = self.lifecycle_policies['version_retention']
        if len(versions) > policy['max_versions']:
            # Once max_versions is exceeded, remove the oldest versions and keep
            # the newest min_versions
            sorted_versions = sorted(versions, key=lambda v: v.created_at)
            versions_to_remove = sorted_versions[:-policy['min_versions']]
            for version in versions_to_remove:
                # Never remove a version that an active contract still references
                if not self._is_version_in_use(product_id, version.version_id):
                    versions.remove(version)
    def _is_version_in_use(self, product_id: str, version_id: str) -> bool:
        """Check whether any contract pins this version."""
        for contract in self.contracts.values():
            if (contract.producer_id == product_id and
                    contract.sla_terms.get('version') == version_id):
                return True
        return False
    def _notify_breaking_change(self, product_id: str, new_version: DataProductVersion):
        """Notify every consumer with a contract on this product about a breaking change."""
        notice_days = self.lifecycle_policies['deprecation']['notice_period_days']
        affected_contracts = [
            contract for contract in self.contracts.values()
            if contract.producer_id == product_id
        ]
        for contract in affected_contracts:
            notification = {
                'type': 'breaking_change_notice',
                'product_id': product_id,
                'new_version': new_version.version_number,
                'consumer_id': contract.consumer_id,
                'notice_date': time.time(),
                # Deprecation date follows the policy's notice period (90 days by default)
                'deprecation_date': time.time() + notice_days * 24 * 3600,
                'migration_guide': f"https://docs.datamesh.com/products/{product_id}/migration/{new_version.version_number}"
            }
            # In practice, deliver `notification` via email, Slack, etc.
            print(f"Breaking change notification sent to {contract.consumer_id}")
    def create_data_contract(self, producer_id: str, consumer_id: str,
                             contract_terms: Dict[str, Any]) -> str:
        """Create a data contract between a product and a consumer."""
        contract_id = f"contract-{uuid.uuid4().hex[:8]}"
        contract = DataProductContract(
            contract_id=contract_id,
            consumer_id=consumer_id,
            producer_id=producer_id,
            sla_terms=contract_terms.get('sla_terms', {}),
            usage_limits=contract_terms.get('usage_limits', {}),
            data_access_patterns=contract_terms.get('access_patterns', ['api']),
            pricing_model=contract_terms.get('pricing_model'),
            expires_at=contract_terms.get('expires_at')
        )
        self.contracts[contract_id] = contract
        # Grant access according to the contract terms
        self._setup_contract_access(contract)
        return contract_id
    def _setup_contract_access(self, contract: DataProductContract):
        """Configure access permissions based on the contract."""
        access_config = {
            'consumer_id': contract.consumer_id,
            'producer_id': contract.producer_id,
            'allowed_endpoints': contract.data_access_patterns,
            'rate_limits': contract.usage_limits,
            'sla_monitoring': True,
            'audit_logging': True
        }
        # In practice, push `access_config` to the API gateway
        print(f"Access configured for contract {contract.contract_id}")
    def monitor_product_health(self, product_id: str) -> Dict[str, Any]:
        """Collect current metrics and report the product's health."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        # Sample values; in practice these come from the monitoring system
        current_metrics = DataProductMetrics(
            usage_count_daily=1250,
            unique_consumers=45,
            average_query_time_ms=85.5,
            data_freshness_score=0.95,
            quality_score=0.98,
            availability_percentage=99.8,
            sla_compliance_rate=0.99
        )
        self.metrics_history[product_id].append({
            'timestamp': time.time(),
            'metrics': current_metrics
        })
        # Health score
        health_score = self._calculate_health_score(current_metrics)
        # SLA violation check
        sla_violations = self._check_sla_violations(product_id, current_metrics)
        return {
            'product_id': product_id,
            'health_score': health_score,
            'current_metrics': current_metrics.__dict__,
            'sla_violations': sla_violations,
            'recommendations': self._generate_health_recommendations(current_metrics),
            'trend_analysis': self._analyze_metric_trends(product_id)
        }
    def _calculate_health_score(self, metrics: DataProductMetrics) -> float:
        """Weighted health score on a 0-100 scale."""
        weights = {
            'availability': 0.3,
            'performance': 0.2,
            'quality': 0.2,
            'freshness': 0.15,
            'sla_compliance': 0.15
        }
        scores = {
            'availability': min(metrics.availability_percentage / 100.0, 1.0),
            # Linear penalty: 0 ms scores 1.0, 1000 ms (1 s) or slower scores 0
            'performance': max(0, 1.0 - (metrics.average_query_time_ms / 1000.0)),
            'quality': metrics.quality_score,
            'freshness': metrics.data_freshness_score,
            'sla_compliance': metrics.sla_compliance_rate
        }
        weighted_score = sum(scores[key] * weights[key] for key in scores)
        return round(weighted_score * 100, 2)  # scale to 0-100
    def _check_sla_violations(self, product_id: str,
                              metrics: DataProductMetrics) -> List[Dict[str, Any]]:
        """Compare current metrics against the SLA enforcement thresholds."""
        violations = []
        policy = self.lifecycle_policies['sla_enforcement']
        if metrics.availability_percentage < policy['availability_threshold']:
            violations.append({
                'type': 'availability_violation',
                'description': f"Availability ({metrics.availability_percentage:.1f}%) below threshold ({policy['availability_threshold']}%)",
                'severity': 'high',
                'current_value': metrics.availability_percentage,
                'threshold': policy['availability_threshold']
            })
        if metrics.average_query_time_ms > policy['performance_threshold_ms']:
            violations.append({
                'type': 'performance_violation',
                'description': f"Average query time ({metrics.average_query_time_ms:.1f}ms) exceeds threshold ({policy['performance_threshold_ms']}ms)",
                'severity': 'medium',
                'current_value': metrics.average_query_time_ms,
                'threshold': policy['performance_threshold_ms']
            })
        if metrics.quality_score < policy['quality_threshold']:
            violations.append({
                'type': 'quality_violation',
                'description': f"Quality score ({metrics.quality_score:.2f}) below threshold ({policy['quality_threshold']})",
                'severity': 'high',
                'current_value': metrics.quality_score,
                'threshold': policy['quality_threshold']
            })
        return violations
    def _generate_health_recommendations(self, metrics: DataProductMetrics) -> List[str]:
        """Suggest improvements based on the current metrics."""
        recommendations = []
        if metrics.availability_percentage < 99.0:
            recommendations.append("Consider implementing redundancy and failover mechanisms")
        if metrics.average_query_time_ms > 500:
            recommendations.append("Optimize query performance or consider caching")
        if metrics.quality_score < 0.95:
            recommendations.append("Review and improve data quality processes")
        if metrics.data_freshness_score < 0.9:
            recommendations.append("Optimize data ingestion pipeline for better freshness")
        if metrics.unique_consumers < 5:
            recommendations.append("Consider improving discoverability and documentation")
        return recommendations
    def _analyze_metric_trends(self, product_id: str) -> Dict[str, Any]:
        """Analyze metric trends across recent snapshots."""
        history = self.metrics_history[product_id]
        if len(history) < 2:
            return {"insufficient_data": True}
        # Use the 7 most recent snapshots (7 days if metrics are collected daily)
        recent_history = history[-7:]
        trends = {
            'usage_trend': 'stable',
            'performance_trend': 'stable',
            'quality_trend': 'stable'
        }
        # Simple heuristic: a change of more than ±10% in daily usage counts as a trend
        # (a production system would need more robust analysis)
        first_usage = recent_history[0]['metrics'].usage_count_daily
        last_usage = recent_history[-1]['metrics'].usage_count_daily
        if last_usage > first_usage * 1.1:
            trends['usage_trend'] = 'increasing'
        elif last_usage < first_usage * 0.9:
            trends['usage_trend'] = 'decreasing'
        return trends
    def get_product_portfolio_dashboard(self, domain_id: Optional[str] = None) -> Dict[str, Any]:
        """Build a portfolio dashboard, optionally filtered to one domain."""
        # Products in scope
        filtered_products = [product for product in self.products.values()
                             if domain_id is None or product.domain_id == domain_id]
        # Portfolio statistics
        portfolio_stats = {
            'total_products': len(filtered_products),
            'products_by_stage': {stage.value: 0 for stage in ProductStage},
            'products_by_type': {ptype.value: 0 for ptype in ProductType},
            'total_consumers': 0,
            'average_health_score': 0.0
        }
        health_scores = []
        for product in filtered_products:
            # Count by stage and type, if the product defines these optional attributes
            if hasattr(product, 'stage'):
                portfolio_stats['products_by_stage'][product.stage.value] += 1
            if hasattr(product, 'product_type'):
                portfolio_stats['products_by_type'][product.product_type.value] += 1
            # Collect health scores
            try:
                health_data = self.monitor_product_health(product.product_id)
                health_scores.append(health_data['health_score'])
            except ValueError:
                pass
        if health_scores:
            portfolio_stats['average_health_score'] = sum(health_scores) / len(health_scores)
        # Count distinct consumers holding a contract on any product in scope
        product_ids = {product.product_id for product in filtered_products}
        unique_consumers = {contract.consumer_id for contract in self.contracts.values()
                            if contract.producer_id in product_ids}
        portfolio_stats['total_consumers'] = len(unique_consumers)
        return {
            'domain_id': domain_id,
            'generated_at': time.time(),
            'portfolio_stats': portfolio_stats,
            'top_products_by_usage': self._get_top_products_by_usage(filtered_products),
            'health_alerts': self._get_health_alerts(filtered_products),
            'contract_summary': self._get_contract_summary(filtered_products)
        }
    def _get_top_products_by_usage(self, products: List[DataProduct]) -> List[Dict[str, Any]]:
        """Top 5 products by daily usage, based on the latest metrics snapshot."""
        product_usage = []
        for product in products:
            if product.product_id in self.metrics_history:
                latest_metrics = self.metrics_history[product.product_id][-1]['metrics']
                product_usage.append({
                    'product_id': product.product_id,
                    'name': product.name,
                    'daily_usage': latest_metrics.usage_count_daily,
                    'unique_consumers': latest_metrics.unique_consumers
                })
        return sorted(product_usage, key=lambda x: x['daily_usage'], reverse=True)[:5]
    def _get_health_alerts(self, products: List[DataProduct]) -> List[Dict[str, Any]]:
        """Products with SLA violations, most violations first."""
        alerts = []
        for product in products:
            try:
                health_data = self.monitor_product_health(product.product_id)
            except ValueError:
                continue
            if health_data['sla_violations']:
                alerts.append({
                    'product_id': product.product_id,
                    'name': product.name,
                    'health_score': health_data['health_score'],
                    'violations': health_data['sla_violations']
                })
        return sorted(alerts, key=lambda x: len(x['violations']), reverse=True)
    def _get_contract_summary(self, products: List[DataProduct]) -> Dict[str, Any]:
        """Summarize contracts on the given products."""
        product_ids = {p.product_id for p in products}
        related_contracts = [
            contract for contract in self.contracts.values()
            if contract.producer_id in product_ids
        ]
        now = time.time()
        thirty_days = 30 * 24 * 3600
        return {
            'total_contracts': len(related_contracts),
            'active_contracts': len([c for c in related_contracts
                                     if c.expires_at is None or c.expires_at > now]),
            # Only contracts that are still active but expire within 30 days
            'expiring_soon': len([c for c in related_contracts
                                  if c.expires_at is not None and 0 < c.expires_at - now < thirty_days])
        }
# Usage example
def demonstrate_product_management():
    """Data product management demo"""
    manager = DataProductLifecycleManager()
    # Create a data product
    customer_profile_product = DataProduct(
        product_id="customer-profile-v2",
        name="Enhanced Customer Profile",
        description="Comprehensive customer profile with ML-driven insights",
        domain_id="customer",
        data_steward="alice@company.com",
        sla_requirements={
            "availability": "99.9%",
            "latency": "< 100ms",
            "freshness": "< 1 hour"
        },
        quality_metrics={
            "completeness": "> 95%",
            "accuracy": "> 99%"
        },
        schema_definition={
            "version": "1.0.0",
            "fields": {
                "customer_id": {"type": "string", "required": True},
                "profile_data": {"type": "object", "required": True},
                "preferences": {"type": "object", "required": False},
                "ml_insights": {"type": "object", "required": False}
            }
        },
        access_patterns=["rest-api", "graphql", "streaming"],
        discovery_metadata={
            "tags": ["customer", "profile", "ml-insights"],
            "business_context": "360-degree customer view"
        }
    )
    product_id = manager.create_data_product(customer_profile_product)
    print(f"Created data product: {product_id}")
    # Evolve the schema: adding an optional field is a non-breaking change
    new_schema = {
        "version": "1.1.0",
        "fields": {
            "customer_id": {"type": "string", "required": True},
            "profile_data": {"type": "object", "required": True},
            "preferences": {"type": "object", "required": False},
            "ml_insights": {"type": "object", "required": False},
            "risk_score": {"type": "number", "required": False}  # newly added field
        }
    }
    new_version_id = manager.evolve_product_schema(product_id, new_schema, breaking_changes=False)
    print(f"Schema evolved: {new_version_id}")
    # Create a data contract
    contract_id = manager.create_data_contract(
        producer_id=product_id,
        consumer_id="marketing-analytics-team",
        contract_terms={
            'sla_terms': {
                'availability': 99.5,
                'max_response_time_ms': 200,
                'version': new_version_id
            },
            'usage_limits': {
                'requests_per_hour': 10000,
                'concurrent_connections': 50
            },
            'access_patterns': ['rest-api', 'graphql'],
            'expires_at': time.time() + (365 * 24 * 3600)  # expires in one year
        }
    )
    print(f"Created data contract: {contract_id}")
    # Monitor product health
    health_data = manager.monitor_product_health(product_id)
    print(f"\nProduct Health Score: {health_data['health_score']}")
    print(f"SLA Violations: {len(health_data['sla_violations'])}")
    # Portfolio dashboard for the customer domain
    dashboard = manager.get_product_portfolio_dashboard(domain_id="customer")
    print("\nPortfolio Dashboard:")
    print(json.dumps(dashboard, indent=2, default=str))
# Run
# demonstrate_product_management()
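In the demo, `breaking_changes=False` is passed by hand when the schema moves from 1.0.0 to 1.1.0. That decision could also be derived automatically. The code below is a minimal sketch of such a check. It assumes the `{"version": ..., "fields": {name: {"type": ..., "required": ...}}}` layout used in `schema_definition` above and assumes that consumers ignore fields they do not know about. Under those assumptions, removing a field, changing its type, or relaxing a required field to optional can break consumers, while adding fields cannot. `find_breaking_changes` and `bump_version` are hypothetical helpers, not methods of `DataProductLifecycleManager`.

```python
from typing import Any, Dict, List

# Assumed schema layout (from the demo): {"version": "x.y.z", "fields": {name: {"type": ..., "required": ...}}}
Schema = Dict[str, Any]


def find_breaking_changes(old: Schema, new: Schema) -> List[str]:
    """List changes in `new` that could break consumers written against `old`.

    Assumption: consumers ignore unknown fields and may rely on any field
    that `old` marked as required.
    """
    reasons: List[str] = []
    old_fields = old.get("fields", {})
    new_fields = new.get("fields", {})
    for name, old_spec in old_fields.items():
        new_spec = new_fields.get(name)
        if new_spec is None:
            reasons.append(f"field removed: {name}")
        elif new_spec.get("type") != old_spec.get("type"):
            reasons.append(f"type changed: {name}")
        elif old_spec.get("required", False) and not new_spec.get("required", False):
            # Consumers may have assumed this field is always present.
            reasons.append(f"field no longer required: {name}")
    # Fields that exist only in `new` are additive and therefore not breaking.
    return reasons


def bump_version(version: str, breaking: bool) -> str:
    """Semantic-versioning bump: major for breaking changes, minor otherwise."""
    major, minor, _patch = (int(part) for part in version.split("."))
    if breaking:
        return f"{major + 1}.0.0"
    return f"{major}.{minor + 1}.0"


old_schema = {
    "version": "1.0.0",
    "fields": {
        "customer_id": {"type": "string", "required": True},
        "preferences": {"type": "object", "required": False},
    },
}
new_schema = {
    "version": "1.1.0",
    "fields": {
        **old_schema["fields"],
        "risk_score": {"type": "number", "required": False},  # additive change
    },
}

changes = find_breaking_changes(old_schema, new_schema)
print(changes)  # []
print(bump_version(old_schema["version"], breaking=bool(changes)))  # 1.1.0
```

Returning a list of reasons, rather than a bare boolean, lets the producer attach an explanation to a major-version bump or to the notice sent to contract holders.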
Conclusion
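Data mesh is not just a technical change. It is a paradigm shift that fundamentally reshapes an organization's data culture. As of 2026, a growing number of companies have run into the limits of centralized data architectures, where a single central data team becomes a bottleneck as data sources and consumers multiply, and are moving toward data mesh.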
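Key elements of a successful data mesh:
- Domain-oriented thinking: ownership of and accountability for data are distributed across business domains
- Product mindset: data is managed as a product, with a named owner, SLAs, and quality metrics
- Platform investment: a self-serve platform that domain teams can use on their own
- Federated governance: a balance between central control and domain autonomy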
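One way to picture the balance in the last point is two layers of rules. Platform-wide policies apply to every data product, and each domain may add its own policies on top. The code below is a minimal sketch that assumes policies can be written as predicates over a product's metadata. `Policy`, `FederatedGovernance`, and the `pii-tagged` rule are hypothetical illustrations. The product fields mirror the `customer-profile-v2` demo above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

# A check inspects a data product's metadata and returns True when it complies.
Check = Callable[[Dict[str, Any]], bool]


@dataclass
class Policy:
    name: str
    check: Check


@dataclass
class FederatedGovernance:
    """Hypothetical sketch: platform-wide rules plus per-domain rules."""
    global_policies: List[Policy]
    domain_policies: Dict[str, List[Policy]]

    def violations(self, product: Dict[str, Any]) -> List[str]:
        # Every product must satisfy the global rules; its domain may add more.
        applicable = self.global_policies + self.domain_policies.get(product["domain_id"], [])
        return [p.name for p in applicable if not p.check(product)]


governance = FederatedGovernance(
    global_policies=[
        Policy("has-data-steward", lambda p: bool(p.get("data_steward"))),
        Policy("has-schema-version", lambda p: "version" in p.get("schema_definition", {})),
    ],
    domain_policies={
        # Hypothetical rule owned by the customer domain itself.
        "customer": [
            Policy("pii-tagged", lambda p: "pii" in p.get("discovery_metadata", {}).get("tags", [])),
        ],
    },
)

product = {
    "product_id": "customer-profile-v2",
    "domain_id": "customer",
    "data_steward": "alice@company.com",
    "schema_definition": {"version": "1.0.0"},
    "discovery_metadata": {"tags": ["customer", "profile", "ml-insights"]},
}
print(governance.violations(product))  # ['pii-tagged']
```

The central team owns only `global_policies`. Each domain edits its own entry in `domain_policies`, so domains can tighten rules without touching anyone else's.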
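Data mesh is a complex, long-term undertaking. With the right approach and an incremental rollout, however, it can substantially strengthen an organization's data capabilities. Large organizations in particular stand to gain in data scalability, data quality, and the business value they derive from data.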
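Looking ahead, advances in AI and machine learning are likely to produce more intelligent data mesh platforms. Such platforms should make data-driven decision-making faster and more accurate.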