A Guide to Building Real-Time Data Streaming Architectures - Large-Scale Data Processing Strategies for 2026

Data Streaming, Apache Kafka, Apache Flink, Pulsar, Real-Time Processing, Big Data, Event-Driven, Microservices


1. Overview of Real-Time Data Streaming

1.1 Why Streaming Data Matters

Real-time data processing has become a core competitive advantage for modern businesses. Domains such as customer behavior analysis, fraud detection, and real-time recommendation systems demand responses within milliseconds.

# streaming_architecture_overview.py
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
import time
import json

class StreamingPattern(Enum):
    EVENT_SOURCING = "event_sourcing"
    CQRS = "cqrs"
    SAGA = "saga"
    EVENT_STREAMING = "event_streaming"
    CHANGE_DATA_CAPTURE = "cdc"

class DataVelocity(Enum):
    LOW = "< 1K events/sec"
    MEDIUM = "1K - 100K events/sec"
    HIGH = "100K - 1M events/sec"
    EXTREME = "> 1M events/sec"

@dataclass
class StreamingRequirements:
    throughput: DataVelocity
    latency_requirement: str  # "< 1ms", "< 10ms", "< 100ms", "< 1s"
    data_durability: bool
    exactly_once_delivery: bool
    multi_region: bool
    compliance_requirements: List[str]

class StreamingArchitectureDesigner:
    def __init__(self):
        self.architecture_patterns = {
            DataVelocity.LOW: {
                "recommended_stack": ["Kafka", "Kafka Streams", "PostgreSQL"],
                "deployment": "Single region, 3-node cluster",
                "cost_estimate": "$500-2000/month"
            },
            DataVelocity.MEDIUM: {
                "recommended_stack": ["Kafka", "Flink", "ClickHouse", "Redis"],
                "deployment": "Multi-AZ, 6-9 node cluster",
                "cost_estimate": "$2000-10000/month"
            },
            DataVelocity.HIGH: {
                "recommended_stack": ["Kafka/Pulsar", "Flink", "ScyllaDB", "Redis Cluster"],
                "deployment": "Multi-region, 15+ node cluster",
                "cost_estimate": "$10000-50000/month"
            },
            DataVelocity.EXTREME: {
                "recommended_stack": ["Pulsar", "Flink", "FoundationDB", "Hazelcast"],
                "deployment": "Global deployment, 50+ node cluster",
                "cost_estimate": "$50000+/month"
            }
        }

    def design_architecture(self, requirements: StreamingRequirements) -> Dict[str, Any]:
        """Design an architecture from the given requirements."""
        # Copy so repeated calls do not mutate the shared pattern table
        base_architecture = dict(self.architecture_patterns[requirements.throughput])

        # Adjust for the latency requirement
        if "1ms" in requirements.latency_requirement:
            base_architecture["additional_components"] = ["In-memory processing", "Edge computing"]
            base_architecture["network_optimization"] = "Dedicated network, SR-IOV"

        # Exactly-once delivery requirement
        if requirements.exactly_once_delivery:
            base_architecture["transaction_support"] = True
            base_architecture["additional_complexity"] = "High"

        # Multi-region requirement
        if requirements.multi_region:
            base_architecture["replication"] = "Cross-region async replication"
            base_architecture["conflict_resolution"] = "Last-writer-wins with timestamps"

        return {
            "architecture": base_architecture,
            "estimated_complexity": self._calculate_complexity(requirements),
            "implementation_timeline": self._estimate_timeline(requirements),
            "operational_considerations": self._get_operational_notes(requirements)
        }

    def _calculate_complexity(self, requirements: StreamingRequirements) -> str:
        complexity_score = 0

        if requirements.throughput in [DataVelocity.HIGH, DataVelocity.EXTREME]:
            complexity_score += 3
        elif requirements.throughput == DataVelocity.MEDIUM:
            complexity_score += 2
        else:
            complexity_score += 1

        if requirements.exactly_once_delivery:
            complexity_score += 2
        if requirements.multi_region:
            complexity_score += 2
        if requirements.compliance_requirements:
            complexity_score += 1

        if complexity_score <= 3:
            return "Low"
        elif complexity_score <= 6:
            return "Medium"
        elif complexity_score <= 9:
            return "High"
        else:
            return "Very High"

    def _estimate_timeline(self, requirements: StreamingRequirements) -> str:
        base_weeks = {
            DataVelocity.LOW: 4,
            DataVelocity.MEDIUM: 8,
            DataVelocity.HIGH: 16,
            DataVelocity.EXTREME: 24
        }

        weeks = base_weeks[requirements.throughput]

        if requirements.exactly_once_delivery:
            weeks += 4
        if requirements.multi_region:
            weeks += 6
        if requirements.compliance_requirements:
            weeks += 2

        return f"{weeks} weeks"

    def _get_operational_notes(self, requirements: StreamingRequirements) -> List[str]:
        notes = ["Monitor lag and throughput continuously"]

        if requirements.throughput in [DataVelocity.HIGH, DataVelocity.EXTREME]:
            notes.extend([
                "Implement automated scaling policies",
                "Set up advanced monitoring with custom metrics",
                "Plan for disaster recovery scenarios"
            ])

        if requirements.exactly_once_delivery:
            notes.append("Monitor transaction coordinator performance")

        if requirements.multi_region:
            notes.extend([
                "Monitor cross-region replication lag",
                "Plan for network partition scenarios"
            ])

        return notes

# Usage example
requirements = StreamingRequirements(
    throughput=DataVelocity.HIGH,
    latency_requirement="< 10ms",
    data_durability=True,
    exactly_once_delivery=True,
    multi_region=True,
    compliance_requirements=["GDPR", "SOX"]
)

designer = StreamingArchitectureDesigner()
architecture_design = designer.design_architecture(requirements)

print("Streaming Architecture Design:")
print(json.dumps(architecture_design, indent=2))

1.2 Streaming vs. Batch Processing

| Aspect | Stream processing | Batch processing |
|---|---|---|
| Latency | milliseconds to seconds | minutes to hours |
| Data volume | continuous, unbounded | fixed, bounded |
| Complexity | high | low |
| Cost | high | low |
| Use cases | real-time analytics, alerting | reporting, ETL |
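The difference shows up clearly in code. The sketch below contrasts the two models on the same computation (the names are illustrative, not from any framework): the batch version counts over a finished, bounded dataset after the fact, while the streaming version updates its state per event and can answer at any moment.

```python
from typing import Dict, Iterable, Tuple

def batch_page_views(events: Iterable[Tuple[str, float]]) -> Dict[str, int]:
    """Batch: iterate over the complete, bounded dataset after the run ends."""
    counts: Dict[str, int] = {}
    for page, _ts in events:
        counts[page] = counts.get(page, 0) + 1
    return counts

class StreamingPageViews:
    """Streaming: update state per event; results are available continuously."""
    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}

    def on_event(self, page: str, _ts: float) -> int:
        self.counts[page] = self.counts.get(page, 0) + 1
        return self.counts[page]  # current count, available immediately

events = [("/home", 1.0), ("/home", 2.0), ("/cart", 3.0)]
print(batch_page_views(events))   # complete answer, only after the run
stream = StreamingPageViews()
for page, ts in events:
    stream.on_event(page, ts)     # partial answers, event by event
print(stream.counts)
```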

2. Apache Kafka in Depth

2.1 Configuring a High-Performance Kafka Cluster

# kafka-cluster-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-cluster-config
data:
  server.properties: |
    # Broker settings
    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://kafka-broker-1:9092,SSL://kafka-broker-1:9093

    # Performance tuning
    num.network.threads=8
    num.io.threads=16
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600

    # Log settings
    log.dirs=/kafka-logs
    num.partitions=12
    default.replication.factor=3
    min.insync.replicas=2

    # Compression (note: log.compression.type is not a valid broker property)
    compression.type=snappy

    # Retention policy
    log.retention.hours=168
    log.retention.bytes=1073741824
    log.segment.bytes=1073741824

    # Note: batch.size, linger.ms, buffer.memory, enable.idempotence and
    # max.in.flight.requests.per.connection are producer-client settings,
    # not broker properties; set them in the producer configuration
    # (see the Python producer in section 2.2)

    # Transaction state log settings
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-cluster
spec:
  serviceName: kafka-headless
  replicas: 6  # 6 brokers for high availability
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.6.0  # pin a specific version rather than :latest
        ports:
        - containerPort: 9092
        - containerPort: 9093
        env:
        - name: POD_NAME  # needed so $(POD_NAME) below resolves
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper:2181"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(POD_NAME).kafka-headless:9092"
        - name: KAFKA_JVM_PERFORMANCE_OPTS
          value: "-Xms6g -Xmx6g -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
        volumeMounts:
        - name: kafka-storage
          mountPath: /kafka-logs
        resources:
          requests:
            memory: "8Gi"
            cpu: "2"
          limits:
            memory: "12Gi"
            cpu: "4"
  volumeClaimTemplates:
  - metadata:
      name: kafka-storage
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: "high-iops-ssd"
      resources:
        requests:
          storage: 1Ti

2.2 Advanced Kafka Producer/Consumer Implementation

# advanced_kafka_client.py
import asyncio
import json
import logging
import time
import uuid
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Callable, Any

from kafka import KafkaProducer, KafkaConsumer

@dataclass
class StreamingEvent:
    event_id: str
    event_type: str
    timestamp: float
    payload: Dict[str, Any]
    metadata: Optional[Dict[str, Any]] = None

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode('utf-8')

    @classmethod
    def from_bytes(cls, data: bytes) -> 'StreamingEvent':
        return cls(**json.loads(data.decode('utf-8')))

class HighThroughputProducer:
    def __init__(self, bootstrap_servers: List[str], **config):
        self.config = {
            'bootstrap_servers': bootstrap_servers,
            'batch_size': 65536,  # 64KB batches
            'linger_ms': 5,       # wait up to 5ms to fill a batch
            'compression_type': 'snappy',
            'acks': 'all',        # wait for all in-sync replicas
            'retries': 2147483647,  # retry indefinitely
            'max_in_flight_requests_per_connection': 5,
            'enable_idempotence': True,
            'value_serializer': lambda x: x.to_bytes() if isinstance(x, StreamingEvent) else x,
            'key_serializer': lambda x: x.encode('utf-8') if x else None,
            **config
        }

        self.producer = KafkaProducer(**self.config)
        self.start_time = time.time()  # used for throughput calculation
        self.metrics = {
            'messages_sent': 0,
            'messages_failed': 0,
            'total_latency': 0.0,
            'batch_count': 0
        }
        self.callbacks = defaultdict(list)

    def send_event(self, topic: str, event: StreamingEvent,
                   partition: Optional[int] = None,
                   callback: Optional[Callable] = None) -> None:
        """Send a single event asynchronously."""
        start_time = time.time()

        future = self.producer.send(
            topic,
            value=event,
            key=event.event_id,
            partition=partition
        )

        def on_success(record_metadata):
            latency = time.time() - start_time
            self.metrics['messages_sent'] += 1
            self.metrics['total_latency'] += latency

            if callback:
                callback(record_metadata, None)

        def on_error(exception):
            self.metrics['messages_failed'] += 1
            if callback:
                callback(None, exception)

        future.add_callback(on_success)
        future.add_errback(on_error)

    def send_batch(self, topic: str, events: List[StreamingEvent],
                   partition_key_func: Optional[Callable] = None) -> List:
        """Send a batch of events."""
        futures = []

        for event in events:
            partition = None
            if partition_key_func:
                partition = partition_key_func(event)

            future = self.producer.send(
                topic,
                value=event,
                key=event.event_id,
                partition=partition
            )
            futures.append(future)

        # Force the batch to be sent now
        self.producer.flush()
        self.metrics['batch_count'] += 1

        return futures

    def get_metrics(self) -> Dict[str, Any]:
        """Return performance metrics."""
        total_messages = self.metrics['messages_sent'] + self.metrics['messages_failed']
        avg_latency = (
            self.metrics['total_latency'] / max(self.metrics['messages_sent'], 1)
        )

        return {
            'total_messages': total_messages,
            'success_rate': self.metrics['messages_sent'] / max(total_messages, 1),
            'average_latency_ms': avg_latency * 1000,
            'batches_sent': self.metrics['batch_count'],
            'throughput_msg_per_sec': total_messages / max(time.time() - self.start_time, 1) if hasattr(self, 'start_time') else 0
        }

    def close(self):
        """Flush and close the producer."""
        self.producer.flush()
        self.producer.close()

class StreamingEventProcessor:
    def __init__(self, bootstrap_servers: List[str], group_id: str, **config):
        self.config = {
            'bootstrap_servers': bootstrap_servers,
            'group_id': group_id,
            'auto_offset_reset': 'latest',
            'enable_auto_commit': False,  # commit offsets manually
            'max_poll_records': 1000,     # records per poll
            'session_timeout_ms': 30000,
            'heartbeat_interval_ms': 3000,
            'fetch_min_bytes': 50000,     # at least 50KB per fetch
            'fetch_max_wait_ms': 500,     # wait at most 500ms
            'value_deserializer': lambda x: StreamingEvent.from_bytes(x),
            'key_deserializer': lambda x: x.decode('utf-8') if x else None,
            **config
        }

        self.consumer = KafkaConsumer(**self.config)
        self.event_handlers = {}
        self.middleware = []
        self.metrics = {
            'messages_processed': 0,
            'messages_failed': 0,
            'processing_time': 0.0
        }
        self.running = False

    def register_handler(self, event_type: str, handler: Callable[[StreamingEvent], Any]):
        """Register a handler for an event type."""
        self.event_handlers[event_type] = handler

    def add_middleware(self, middleware: Callable[[StreamingEvent], StreamingEvent]):
        """Add middleware (filtering, transformation, etc.)."""
        self.middleware.append(middleware)

    def subscribe(self, topics: List[str]):
        """Subscribe to topics."""
        self.consumer.subscribe(topics)

    def process_events(self):
        """Main event-processing loop."""
        self.running = True

        try:
            while self.running:
                # Poll a batch of messages
                message_batch = self.consumer.poll(timeout_ms=1000, max_records=1000)

                if not message_batch:
                    continue

                # Process the batch
                self._process_batch(message_batch)

                # Commit offsets manually (after processing: at-least-once)
                self.consumer.commit()

        except KeyboardInterrupt:
            logging.info("Shutting down consumer...")
        finally:
            self.consumer.close()

    def _process_batch(self, message_batch):
        """Process a batch of messages in parallel."""
        processed_count = 0
        failed_count = 0

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []

            for topic_partition, messages in message_batch.items():
                for message in messages:
                    future = executor.submit(self._process_single_event, message.value)
                    futures.append(future)

            # Wait for all events to complete
            for future in futures:
                try:
                    future.result(timeout=30)  # 30-second timeout
                    processed_count += 1
                except Exception as e:
                    logging.error(f"Event processing failed: {e}")
                    failed_count += 1

        self.metrics['messages_processed'] += processed_count
        self.metrics['messages_failed'] += failed_count

    def _process_single_event(self, event: StreamingEvent):
        """Run a single event through the middleware chain and its handler."""
        start_time = time.time()

        try:
            # Apply the middleware chain
            processed_event = event
            for middleware in self.middleware:
                processed_event = middleware(processed_event)
                if processed_event is None:
                    return  # event was filtered out

            # Dispatch to the handler registered for this event type
            if processed_event.event_type in self.event_handlers:
                handler = self.event_handlers[processed_event.event_type]
                handler(processed_event)
            else:
                logging.warning(f"No handler for event type: {processed_event.event_type}")

        finally:
            self.metrics['processing_time'] += time.time() - start_time

    def stop(self):
        """Stop the processing loop."""
        self.running = False

# Real-time stream analytics
class StreamAnalytics:
    def __init__(self, window_size_seconds: int = 60):
        self.window_size = window_size_seconds
        self.windows = defaultdict(lambda: defaultdict(list))
        self.aggregates = defaultdict(dict)

    def add_event(self, event: StreamingEvent):
        """Add an event and maintain its time window."""
        current_window = int(event.timestamp // self.window_size)

        # Place the event into its window
        self.windows[event.event_type][current_window].append(event)

        # Evict old windows to bound memory use
        self._cleanup_old_windows(current_window)

        # Update real-time aggregates
        self._update_aggregates(event.event_type, current_window)

    def _cleanup_old_windows(self, current_window: int):
        """Evict windows older than the retention horizon."""
        cutoff_window = current_window - 10  # keep only the last 10 windows

        for event_type in list(self.windows.keys()):
            windows_to_delete = [
                w for w in self.windows[event_type].keys()
                if w < cutoff_window
            ]
            for w in windows_to_delete:
                del self.windows[event_type][w]

    def _update_aggregates(self, event_type: str, window: int):
        """Update aggregate metrics for a window."""
        events = self.windows[event_type][window]

        self.aggregates[event_type][window] = {
            'count': len(events),
            'rate_per_second': len(events) / self.window_size,
            'avg_payload_size': sum(len(str(e.payload)) for e in events) / len(events),
            'unique_sources': len(set((e.metadata or {}).get('source', 'unknown') for e in events))
        }

    def get_real_time_metrics(self, event_type: str) -> Dict[str, Any]:
        """Return real-time metrics for an event type."""
        current_window = int(time.time() // self.window_size)
        recent_windows = range(current_window - 5, current_window + 1)

        total_count = 0
        total_rate = 0

        for window in recent_windows:
            if window in self.aggregates[event_type]:
                metrics = self.aggregates[event_type][window]
                total_count += metrics['count']
                total_rate += metrics['rate_per_second']

        return {
            'event_type': event_type,
            'total_events_5min': total_count,
            'average_rate': total_rate / len(recent_windows),
            'current_window_events': self.aggregates[event_type].get(current_window, {}).get('count', 0)
        }

# Usage example
async def streaming_pipeline_example():
    # Set up a high-throughput producer
    producer = HighThroughputProducer(
        bootstrap_servers=['localhost:9092'],
        client_id='high-throughput-producer'
    )

    # Set up the event processor
    processor = StreamingEventProcessor(
        bootstrap_servers=['localhost:9092'],
        group_id='analytics-consumer-group'
    )

    # Real-time analytics engine
    analytics = StreamAnalytics(window_size_seconds=60)

    # Register event handlers
    def handle_user_action(event: StreamingEvent):
        analytics.add_event(event)
        print(f"Processed user action: {event.event_type}")

    def handle_system_event(event: StreamingEvent):
        analytics.add_event(event)
        print(f"Processed system event: {event.event_type}")

    processor.register_handler('user_action', handle_user_action)
    processor.register_handler('system_event', handle_system_event)

    # Filtering middleware
    def spam_filter(event: StreamingEvent) -> Optional[StreamingEvent]:
        if 'spam' in event.payload.get('content', '').lower():
            return None  # filtered as spam
        return event

    processor.add_middleware(spam_filter)

    # Subscribe to topics
    processor.subscribe(['user-events', 'system-events'])

    # Generate and send example events
    for i in range(1000):
        event = StreamingEvent(
            event_id=str(uuid.uuid4()),
            event_type='user_action',
            timestamp=time.time(),
            payload={
                'user_id': f'user_{i % 100}',
                'action': 'page_view',
                'page': f'/page_{i % 10}'
            },
            metadata={'source': 'web_app'}
        )

        producer.send_event('user-events', event)

        if i % 100 == 0:
            await asyncio.sleep(0.1)  # throttle the load

    # Print metrics
    print("Producer metrics:", producer.get_metrics())
    print("Analytics metrics:", analytics.get_real_time_metrics('user_action'))

    # Clean up
    producer.close()
    processor.stop()

# To run:
# asyncio.run(streaming_pipeline_example())
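The StreamAnalytics class above assigns events to fixed-size (tumbling) windows by integer-dividing the event timestamp by the window size. Reduced to its core, the mechanism looks like this; a minimal standalone sketch, independent of Kafka and the classes above:

```python
from collections import defaultdict
from typing import Dict

class TumblingWindowCounter:
    """Counts events in fixed, non-overlapping time windows."""
    def __init__(self, window_size_seconds: int = 60):
        self.window_size = window_size_seconds
        self.counts: Dict[int, int] = defaultdict(int)

    def add(self, timestamp: float) -> int:
        # Integer division maps every timestamp to exactly one window index
        window = int(timestamp // self.window_size)
        self.counts[window] += 1
        return window

    def rate(self, window: int) -> float:
        """Average events per second within one window."""
        return self.counts[window] / self.window_size

counter = TumblingWindowCounter(window_size_seconds=60)
for ts in [0.0, 10.0, 59.9, 60.0, 61.5]:  # three events land in window 0, two in window 1
    counter.add(ts)
print(dict(counter.counts))  # → {0: 3, 1: 2}
print(counter.rate(0))       # → 0.05
```

Because `59.9 // 60 == 0` but `60.0 // 60 == 1`, window boundaries are exact; an event can never straddle two tumbling windows, which is what makes per-window counts safe to aggregate.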

3. Real-Time Stream Processing with Apache Flink

3.1 Flink Cluster Configuration and Optimization

# flink-cluster.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  flink-conf.yaml: |
    # Cluster settings
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 16

    # Memory settings
    jobmanager.memory.process.size: 2g
    taskmanager.memory.process.size: 8g
    taskmanager.memory.flink.size: 6g

    # Checkpoint settings
    state.backend: rocksdb
    state.checkpoints.dir: s3://flink-checkpoints/
    state.savepoints.dir: s3://flink-savepoints/
    execution.checkpointing.interval: 60000  # 1 minute
    execution.checkpointing.timeout: 600000  # 10 minutes

    # RocksDB tuning
    state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
    state.backend.rocksdb.compaction.level.use-dynamic-size: true
    state.backend.rocksdb.compaction.level.target-file-size-base: 256mb

    # Network buffers
    taskmanager.memory.network.fraction: 0.15
    taskmanager.network.memory.buffers-per-channel: 2
    taskmanager.network.memory.floating-buffers-per-gate: 8

    # Metrics reporter
    metrics.reporters: influx
    metrics.reporter.influx.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
    metrics.reporter.influx.host: influxdb
    metrics.reporter.influx.port: 8086
    metrics.reporter.influx.db: flink-metrics

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 2  # standby JobManager for high availability (requires Flink HA configuration)
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.18-scala_2.12-java11
        command: ["/docker-entrypoint.sh", "jobmanager"]
        ports:
        - containerPort: 6123
        - containerPort: 8081
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_PROPERTIES
          value: |
            jobmanager.rpc.address: flink-jobmanager
            blob.server.port: 6124
            query.server.port: 6125
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 6  # 6 TaskManagers
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.18-scala_2.12-java11
        command: ["/docker-entrypoint.sh", "taskmanager"]
        ports:
        - containerPort: 6122
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "8"
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            memory: "12Gi"
            cpu: "6"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
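The checkpoint interval chosen above (60 s) directly bounds how much work must be redone after a failure: on recovery, Flink replays everything since the last completed checkpoint, plus whatever arrives while the job restarts. A rough back-of-the-envelope sketch of that trade-off (the rates and recovery time are illustrative assumptions):

```python
def worst_case_replay(events_per_sec: float,
                      checkpoint_interval_s: float,
                      recovery_time_s: float) -> int:
    """Upper bound on events reprocessed after a failure: everything since
    the last completed checkpoint, plus what arrives during recovery."""
    return int(events_per_sec * (checkpoint_interval_s + recovery_time_s))

# With the 60 s interval configured above, 100K events/s and ~30 s recovery:
print(worst_case_replay(100_000, 60, 30))  # → 9000000
```

Shortening the interval reduces replay volume but increases checkpointing overhead (RocksDB snapshots, S3 uploads), which is why intervals in the tens of seconds are a common middle ground.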

3.2 Complex Event Processing (CEP)

// ComplexEventProcessor.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;

// Event data model
public class UserEvent {
    public String userId;
    public String eventType;
    public long timestamp;
    public Map<String, Object> properties;

    // constructor, getters and setters omitted
}

public class FraudAlert {
    public String userId;
    public String alertType;
    public long timestamp;
    public String description;
    public double riskScore;

    // constructor, getters and setters omitted
}

public class ComplexEventProcessor {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing (every minute)
        env.enableCheckpointing(60000);

        // Watermark strategy (allow events up to 5 seconds late)
        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.timestamp);

        // Create the user-event stream from Kafka
        DataStream<UserEvent> userEventStream = env
                .addSource(new UserEventKafkaSource())
                .assignTimestampsAndWatermarks(watermarkStrategy);

        // 1. Real-time fraud-detection patterns
        DataStream<FraudAlert> fraudAlerts = detectFraudPatterns(userEventStream);

        // 2. User session analysis
        DataStream<UserSession> userSessions = analyzeUserSessions(userEventStream);

        // 3. Feature computation for the real-time recommender
        DataStream<UserFeatures> userFeatures = calculateUserFeatures(userEventStream);

        // Emit results to their respective sinks
        fraudAlerts.addSink(new FraudAlertKafkaSink());
        userSessions.addSink(new UserSessionElasticsearchSink());
        userFeatures.addSink(new UserFeatureRedisSink());

        env.execute("Complex Event Processing Pipeline");
    }

    // Fraud detection via complex event patterns
    private static DataStream<FraudAlert> detectFraudPatterns(DataStream<UserEvent> userEventStream) {

        // Pattern 1: rapid successive login attempts (5 or more within 5 minutes)
        Pattern<UserEvent, ?> rapidLoginPattern = Pattern
                .<UserEvent>begin("first_login")
                .where(SimpleCondition.of(event -> "login".equals(event.eventType)))
                .next("subsequent_logins")
                .where(SimpleCondition.of(event -> "login".equals(event.eventType)))
                .times(4)  // 4 more after the first
                .within(Time.minutes(5));

        PatternStream<UserEvent> rapidLoginPatternStream = CEP.pattern(
                userEventStream.keyBy(event -> event.userId),
                rapidLoginPattern
        );

        DataStream<FraudAlert> rapidLoginAlerts = rapidLoginPatternStream.select(
                new PatternSelectFunction<UserEvent, FraudAlert>() {
                    @Override
                    public FraudAlert select(Map<String, List<UserEvent>> pattern) throws Exception {
                        UserEvent firstEvent = pattern.get("first_login").get(0);
                        return new FraudAlert(
                                firstEvent.userId,
                                "RAPID_LOGIN_ATTEMPTS",
                                Instant.now().toEpochMilli(),
                                "5 login attempts within 5 minutes",
                                0.8
                        );
                    }
                }
        );

        // Pattern 2: geographically impossible travel
        DataStream<FraudAlert> geoImpossibleAlerts = userEventStream
                .keyBy(event -> event.userId)
                .process(new GeographicalAnomalyDetector())
                .filter(alert -> alert != null);

        // Pattern 3: abnormal transaction pattern
        Pattern<UserEvent, ?> suspiciousTransactionPattern = Pattern
                .<UserEvent>begin("large_transaction")
                .where(SimpleCondition.of(event ->
                    "transaction".equals(event.eventType) &&
                    (Double) event.properties.get("amount") > 10000))
                .followedBy("frequent_small_transactions")
                .where(SimpleCondition.of(event ->
                    "transaction".equals(event.eventType) &&
                    (Double) event.properties.get("amount") < 100))
                .times(10)
                .within(Time.hours(1));

        PatternStream<UserEvent> suspiciousTransactionStream = CEP.pattern(
                userEventStream.keyBy(event -> event.userId),
                suspiciousTransactionPattern
        );

        DataStream<FraudAlert> transactionAlerts = suspiciousTransactionStream.select(
                new PatternSelectFunction<UserEvent, FraudAlert>() {
                    @Override
                    public FraudAlert select(Map<String, List<UserEvent>> pattern) throws Exception {
                        UserEvent largeTransaction = pattern.get("large_transaction").get(0);
                        return new FraudAlert(
                                largeTransaction.userId,
                                "SUSPICIOUS_TRANSACTION_PATTERN",
                                Instant.now().toEpochMilli(),
                                "Large transaction followed by many small transactions",
                                0.9
                        );
                    }
                }
        );

        // Union all fraud alerts
        return rapidLoginAlerts
                .union(geoImpossibleAlerts)
                .union(transactionAlerts);
    }

    // Geographic anomaly detection function
    public static class GeographicalAnomalyDetector extends KeyedProcessFunction<String, UserEvent, FraudAlert> {

        private ValueState<Location> lastLocationState;
        private ValueState<Long> lastTimestampState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastLocationState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastLocation", Location.class)
            );
            lastTimestampState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastTimestamp", Types.LONG)
            );
        }

        @Override
        public void processElement(UserEvent event, Context ctx, Collector<FraudAlert> out) throws Exception {
            if (!"login".equals(event.eventType)) {
                return;
            }

            Location currentLocation = extractLocation(event);
            Location lastLocation = lastLocationState.value();
            Long lastTimestamp = lastTimestampState.value();

            if (lastLocation != null && lastTimestamp != null) {
                double distance = calculateDistance(lastLocation, currentLocation);
                long timeDiff = event.timestamp - lastTimestamp;

                // Guard against division by zero when two logins share a timestamp
                if (timeDiff > 0) {
                    double impliedSpeed = distance / (timeDiff / 1000.0 / 3600.0); // km/h

                    // Treat implied travel faster than 1000 km/h as fraudulent
                    if (impliedSpeed > 1000) {
                        out.collect(new FraudAlert(
                                event.userId,
                                "GEOGRAPHICAL_IMPOSSIBILITY",
                                event.timestamp,
                                String.format("Impossible travel: %.2f km in %.2f hours (%.2f km/h)",
                                        distance, timeDiff / 1000.0 / 3600.0, impliedSpeed),
                                0.95
                        ));
                    }
                }
            }

            lastLocationState.update(currentLocation);
            lastTimestampState.update(event.timestamp);
        }

        private Location extractLocation(UserEvent event) {
            Map<String, Object> props = event.properties;
            return new Location(
                    (Double) props.get("latitude"),
                    (Double) props.get("longitude")
            );
        }

        private double calculateDistance(Location loc1, Location loc2) {
            // Distance calculation using the Haversine formula
            final int R = 6371; // Earth radius (km)

            double latDistance = Math.toRadians(loc2.latitude - loc1.latitude);
            double lonDistance = Math.toRadians(loc2.longitude - loc1.longitude);

            double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
                    + Math.cos(Math.toRadians(loc1.latitude)) * Math.cos(Math.toRadians(loc2.latitude))
                    * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);

            double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

            return R * c;
        }
    }

    // User session analysis
    private static DataStream<UserSession> analyzeUserSessions(DataStream<UserEvent> userEventStream) {
        return userEventStream
                .keyBy(event -> event.userId)
                // 30-minute tumbling window; a fixed window is used here for simplicity,
                // EventTimeSessionWindows.withGap() would model true sessions
                .window(TumblingEventTimeWindows.of(Time.minutes(30)))
                .process(new SessionAnalysisFunction());
    }

    public static class SessionAnalysisFunction extends ProcessWindowFunction<UserEvent, UserSession, String, TimeWindow> {

        @Override
        public void process(String userId, Context context, Iterable<UserEvent> elements, Collector<UserSession> out) {

            UserSession session = new UserSession();
            session.userId = userId;
            session.windowStart = context.window().getStart();
            session.windowEnd = context.window().getEnd();

            // Compute session metrics
            int eventCount = 0;
            long firstEventTime = Long.MAX_VALUE;
            long lastEventTime = Long.MIN_VALUE;
            Map<String, Integer> eventTypeCounts = new HashMap<>();
            Set<String> uniquePages = new HashSet<>();

            for (UserEvent event : elements) {
                eventCount++;
                firstEventTime = Math.min(firstEventTime, event.timestamp);
                lastEventTime = Math.max(lastEventTime, event.timestamp);

                eventTypeCounts.merge(event.eventType, 1, Integer::sum);

                if (event.properties.containsKey("page")) {
                    uniquePages.add((String) event.properties.get("page"));
                }
            }

            session.eventCount = eventCount;
            session.sessionDuration = lastEventTime - firstEventTime;
            session.uniquePageViews = uniquePages.size();
            session.eventTypeDistribution = eventTypeCounts;

            // Compute anomaly score
            session.anomalyScore = calculateAnomalyScore(session);

            out.collect(session);
        }

        private double calculateAnomalyScore(UserSession session) {
            double score = 0.0;

            // Unusually high event count
            if (session.eventCount > 1000) {
                score += 0.3;
            }

            // Unusually long session (over 1 hour)
            if (session.sessionDuration > 3600000) {
                score += 0.2;
            }

            // Bot-like behavior pattern (API calls without any page views)
            int apiCalls = session.eventTypeDistribution.getOrDefault("api_call", 0);
            int pageViews = session.eventTypeDistribution.getOrDefault("page_view", 0);

            if (apiCalls > 100 && pageViews == 0) {
                score += 0.5;
            }

            return Math.min(score, 1.0);
        }
    }

    // Real-time user feature computation
    private static DataStream<UserFeatures> calculateUserFeatures(DataStream<UserEvent> userEventStream) {
        return userEventStream
                .keyBy(event -> event.userId)
                .process(new UserFeatureCalculator());
    }

    public static class UserFeatureCalculator extends KeyedProcessFunction<String, UserEvent, UserFeatures> {

        private MapState<String, Integer> hourlyActivityState;
        private MapState<String, Double> categoryPreferencesState;
        private ValueState<Long> lastActiveTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            hourlyActivityState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("hourlyActivity", Types.STRING, Types.INT)
            );
            categoryPreferencesState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("categoryPreferences", Types.STRING, Types.DOUBLE)
            );
            lastActiveTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastActiveTime", Types.LONG)
            );
        }

        @Override
        public void processElement(UserEvent event, Context ctx, Collector<UserFeatures> out) throws Exception {
            // Update hourly activity pattern (UTC hour from the event timestamp;
            // Instant has no getHour(), so convert to a ZonedDateTime first)
            String hour = String.valueOf(
                    Instant.ofEpochMilli(event.timestamp).atZone(java.time.ZoneOffset.UTC).getHour());
            Integer currentCount = hourlyActivityState.get(hour);
            hourlyActivityState.put(hour, (currentCount == null ? 0 : currentCount) + 1);

            // Update category preferences
            if (event.properties.containsKey("category")) {
                String category = (String) event.properties.get("category");
                Double currentScore = categoryPreferencesState.get(category);
                categoryPreferencesState.put(category, (currentScore == null ? 0.0 : currentScore) + 1.0);
            }

            // Update last-active timestamp
            lastActiveTimeState.update(event.timestamp);

            // Emit a feature vector for a sample of events (the timestamp modulo
            // check approximates ~10% sampling; a counter ValueState would be
            // needed to emit on exactly every 10th event)
            if (event.timestamp % 10 == 0) {
                UserFeatures features = buildUserFeatures(event.userId);
                out.collect(features);
            }
        }

        private UserFeatures buildUserFeatures(String userId) throws Exception {
            UserFeatures features = new UserFeatures();
            features.userId = userId;
            features.timestamp = Instant.now().toEpochMilli();

            // Hourly activity pattern
            Map<String, Integer> hourlyActivity = new HashMap<>();
            for (Map.Entry<String, Integer> entry : hourlyActivityState.entries()) {
                hourlyActivity.put(entry.getKey(), entry.getValue());
            }
            features.hourlyActivityPattern = hourlyActivity;

            // Category preferences
            Map<String, Double> categoryPrefs = new HashMap<>();
            for (Map.Entry<String, Double> entry : categoryPreferencesState.entries()) {
                categoryPrefs.put(entry.getKey(), entry.getValue());
            }
            features.categoryPreferences = categoryPrefs;

            // Active-user flag: last activity within the past 24 hours
            Long lastActiveTime = lastActiveTimeState.value();
            features.isActiveUser = lastActiveTime != null &&
                    (Instant.now().toEpochMilli() - lastActiveTime) < 86400000;

            return features;
        }
    }
}

// Supporting data classes
class Location {
    public double latitude;
    public double longitude;

    public Location(double latitude, double longitude) {
        this.latitude = latitude;
        this.longitude = longitude;
    }
}

class UserSession {
    public String userId;
    public long windowStart;
    public long windowEnd;
    public int eventCount;
    public long sessionDuration;
    public int uniquePageViews;
    public Map<String, Integer> eventTypeDistribution;
    public double anomalyScore;
}

class UserFeatures {
    public String userId;
    public long timestamp;
    public Map<String, Integer> hourlyActivityPattern;
    public Map<String, Double> categoryPreferences;
    public boolean isActiveUser;
}
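The Haversine distance and impossible-travel rule used by GeographicalAnomalyDetector above can be sanity-checked outside Flink. A minimal Python sketch of the same logic (same 6371 km Earth radius and 1000 km/h threshold as the Java code; the coordinates below are illustrative):

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two coordinates, in kilometers."""
    r = 6371  # Earth radius (km), matching the Java code above
    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    a = (math.sin(d_lat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(d_lon / 2) ** 2)
    return r * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

def is_impossible_travel(loc1, loc2, time_diff_ms, max_speed_kmh=1000):
    """True if the implied travel speed between two logins exceeds the threshold."""
    if time_diff_ms <= 0:  # identical timestamps cannot yield a finite speed
        return False
    distance = haversine_km(*loc1, *loc2)
    speed_kmh = distance / (time_diff_ms / 1000.0 / 3600.0)
    return speed_kmh > max_speed_kmh

seoul, new_york = (37.5665, 126.9780), (40.7128, -74.0060)
print(is_impossible_travel(seoul, new_york, 30 * 60 * 1000))  # True: ~11,000 km in 30 minutes
```

Note the `time_diff_ms <= 0` guard: without it, two login events carrying the same timestamp would cause a division by zero.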

4. Advanced Apache Pulsar Usage

4.1 Multi-Tenant Pulsar Cluster Configuration

# pulsar-cluster.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-config
data:
  bookkeeper.conf: |
    # BookKeeper settings
    zkServers=zookeeper:2181
    journalDirectories=/pulsar/data/bookkeeper/journal
    ledgerDirectories=/pulsar/data/bookkeeper/ledgers

    # Performance tuning
    flushInterval=100
    ensembleSize=5
    writeQuorumSize=3
    ackQuorumSize=2

    # Storage settings
    dbStorage_writeCacheMaxSizeMb=512
    dbStorage_readAheadCacheMaxSizeMb=256

    # Netty settings
    nettyMaxFrameSizeBytes=5242880
    serverTcpNoDelay=true

  broker.conf: |
    # Basic settings
    zookeeperServers=zookeeper:2181
    configurationStoreServers=zookeeper:2181
    clusterName=pulsar-cluster

    # Multi-tenancy settings
    authenticationEnabled=true
    authorizationEnabled=true
    superUserRoles=admin,proxy

    # Broker performance settings
    numIOThreads=16
    numWorkerThreads=32
    maxConcurrentLookupRequest=50000
    maxConcurrentTopicLoadRequest=5000

    # Memory management
    managedLedgerDefaultEnsembleSize=5
    managedLedgerDefaultWriteQuorum=3
    managedLedgerDefaultAckQuorum=2
    managedLedgerCacheSizeMB=2048
    managedLedgerCacheEvictionWatermark=0.25

    # Message settings
    maxMessageSize=5242880
    # retain messages for 7 days (trailing comments are not valid in .conf values)
    defaultRetentionTimeInMinutes=10080
    defaultRetentionSizeInMB=1000

    # Functions settings
    functionsWorkerEnabled=true
    # cluster name for the functions worker (normally set in functions_worker.yml)
    pulsarFunctionsCluster=pulsar-cluster

  proxy.conf: |
    # Proxy settings
    zookeeperServers=zookeeper:2181
    configurationStoreServers=zookeeper:2181
    clusterName=pulsar-cluster

    # Authentication settings
    authenticationEnabled=true
    authorizationEnabled=true
    forwardAuthorizationCredentials=true

    # Performance settings
    maxConcurrentInboundConnections=10000
    maxConcurrentLookupRequests=50000

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: pulsar-bookkeeper
spec:
  serviceName: pulsar-bookkeeper-headless
  replicas: 5
  selector:
    matchLabels:
      app: pulsar-bookkeeper
  template:
    metadata:
      labels:
        app: pulsar-bookkeeper
    spec:
      containers:
      - name: bookkeeper
        image: apachepulsar/pulsar:3.0.0
        command: ["bin/bookkeeper", "bookie"]
        ports:
        - containerPort: 3181
        env:
        - name: BOOKIE_MEM
          value: "-Xms2g -Xmx2g -XX:+UseG1GC"
        volumeMounts:
        - name: bookkeeper-data
          mountPath: /pulsar/data
        - name: pulsar-config
          mountPath: /pulsar/conf
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "6Gi"
            cpu: "4"
      volumes:
      - name: pulsar-config
        configMap:
          name: pulsar-config
  volumeClaimTemplates:
  - metadata:
      name: bookkeeper-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: "high-iops-ssd"
      resources:
        requests:
          storage: "500Gi"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pulsar-broker
spec:
  replicas: 6
  selector:
    matchLabels:
      app: pulsar-broker
  template:
    metadata:
      labels:
        app: pulsar-broker
    spec:
      containers:
      - name: broker
        image: apachepulsar/pulsar:3.0.0
        command: ["bin/pulsar", "broker"]
        ports:
        - containerPort: 6650
        - containerPort: 8080
        env:
        - name: PULSAR_MEM
          value: "-Xms4g -Xmx4g -XX:+UseG1GC"
        volumeMounts:
        - name: pulsar-config
          mountPath: /pulsar/conf
        resources:
          requests:
            memory: "6Gi"
            cpu: "3"
          limits:
            memory: "8Gi"
            cpu: "4"
      volumes:
      - name: pulsar-config
        configMap:
          name: pulsar-config
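Once the cluster is up, tenants and namespaces for the multi-tenant setup are provisioned via `pulsar-admin` or the broker's REST admin API. A small sketch that builds the corresponding REST calls (the `/admin/v2/tenants` and `/admin/v2/namespaces` paths come from Pulsar's admin API; the broker URL, tenant, and role names below are placeholders):

```python
import json

ADMIN_BASE = "http://pulsar-broker:8080/admin/v2"  # placeholder broker admin URL

def create_tenant_call(tenant, admin_roles, allowed_clusters):
    """Build the PUT request that provisions a tenant."""
    body = {"adminRoles": admin_roles, "allowedClusters": allowed_clusters}
    return "PUT", f"{ADMIN_BASE}/tenants/{tenant}", json.dumps(body)

def create_namespace_call(tenant, namespace):
    """Build the PUT request that creates a namespace under a tenant."""
    return "PUT", f"{ADMIN_BASE}/namespaces/{tenant}/{namespace}", None

# Example: an 'analytics' tenant confined to the cluster configured above
method, url, body = create_tenant_call("analytics", ["analytics-admin"], ["pulsar-cluster"])
print(method, url, body)
method, url, _ = create_namespace_call("analytics", "clickstream")
print(method, url)
```

Issuing these requests (e.g. with `requests.put`) additionally requires a token for one of the `superUserRoles` configured in broker.conf, since `authorizationEnabled=true`; the rough CLI equivalent is `pulsar-admin tenants create analytics ...`.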

4.2 Advanced Pulsar Functions Implementation

// AdvancedPulsarFunctions.java
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateContext;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Real-time anomaly detection function
public class AnomalyDetectionFunction implements Function<String, String> {

    private static final String METRICS_STATE_KEY = "user_metrics";
    private static final String THRESHOLD_STATE_KEY = "threshold";
    private static final double ANOMALY_THRESHOLD = 3.0; // 3 sigma

    @Override
    public String process(String input, Context context) throws Exception {
        // Parse the JSON payload
        Map<String, Object> event = parseJson(input);
        String userId = (String) event.get("userId");
        String metricName = (String) event.get("metricName");
        double value = ((Number) event.get("value")).doubleValue();
        long timestamp = ((Number) event.get("timestamp")).longValue();

        // Per-user metric history.
        // NOTE: state access is simplified for illustration; the actual Pulsar
        // Functions Context API stores state as ByteBuffer via putState()/getState(),
        // so the List<Double> shown here would need explicit (de)serialization.
        StateContext stateContext = context.getStateContext(userId);

        String metricsKey = METRICS_STATE_KEY + "_" + metricName;
        List<Double> historicalValues = stateContext.getState(metricsKey);

        if (historicalValues == null) {
            historicalValues = new ArrayList<>();
        }

        // Append the new value (keep at most the last 100)
        historicalValues.add(value);
        if (historicalValues.size() > 100) {
            historicalValues.remove(0);
        }

        // Compute statistics once enough samples exist
        if (historicalValues.size() >= 10) { // at least 10 samples required
            double mean = calculateMean(historicalValues);
            double stdDev = calculateStdDev(historicalValues, mean);
            double zScore = Math.abs((value - mean) / stdDev);

            if (zScore > ANOMALY_THRESHOLD) {
                // Anomaly detected!
                Map<String, Object> anomalyAlert = new HashMap<>();
                anomalyAlert.put("userId", userId);
                anomalyAlert.put("metricName", metricName);
                anomalyAlert.put("value", value);
                anomalyAlert.put("expectedRange", Arrays.asList(
                    mean - 2 * stdDev, mean + 2 * stdDev
                ));
                anomalyAlert.put("zScore", zScore);
                anomalyAlert.put("timestamp", timestamp);
                anomalyAlert.put("severity", zScore > 5.0 ? "CRITICAL" : "HIGH");

                // Persist updated state
                stateContext.put(metricsKey, historicalValues);

                return toJson(anomalyAlert);
            }
        }

        // Persist updated state
        stateContext.put(metricsKey, historicalValues);

        return null; // normal values produce no output
    }

    private double calculateMean(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    private double calculateStdDev(List<Double> values, double mean) {
        double variance = values.stream()
                .mapToDouble(value -> Math.pow(value - mean, 2))
                .average()
                .orElse(0.0);
        return Math.sqrt(variance);
    }

    // JSON parsing/serialization helper methods
    private Map<String, Object> parseJson(String json) {
        // JSON parsing logic (use Jackson or a similar library in practice;
        // simplified here for the example)
        return new HashMap<>();
    }

    private String toJson(Map<String, Object> map) {
        // JSON serialization logic
        return "";
    }
}

// Real-time aggregation function
public class RealTimeAggregationFunction implements Function<String, String> {

    private static final String AGGREGATION_STATE_KEY = "aggregation_data";
    private static final long WINDOW_SIZE_MS = 60000; // 1-minute window

    @Override
    public String process(String input, Context context) throws Exception {
        Map<String, Object> event = parseJson(input);
        String groupKey = (String) event.get("groupKey");
        String metric = (String) event.get("metric");
        double value = ((Number) event.get("value")).doubleValue();
        long timestamp = ((Number) event.get("timestamp")).longValue();

        // NOTE: simplified state access; the real Pulsar Functions Context API
        // stores state as ByteBuffer via putState()/getState()
        StateContext stateContext = context.getStateContext(groupKey);

        // Determine the current window
        long windowStart = (timestamp / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;

        // Look up the aggregate data for this window
        String aggregationKey = AGGREGATION_STATE_KEY + "_" + windowStart;
        Map<String, Object> windowData = stateContext.getState(aggregationKey);

        if (windowData == null) {
            windowData = new HashMap<>();
            windowData.put("windowStart", windowStart);
            windowData.put("windowEnd", windowStart + WINDOW_SIZE_MS);
            windowData.put("count", 0L);
            windowData.put("sum", 0.0);
            windowData.put("min", Double.MAX_VALUE);
            windowData.put("max", -Double.MAX_VALUE); // Double.MIN_VALUE is the smallest positive double, not the most negative
            windowData.put("values", new ArrayList<Double>());
        }

        // Update aggregates
        long count = (Long) windowData.get("count") + 1;
        double sum = (Double) windowData.get("sum") + value;
        double min = Math.min((Double) windowData.get("min"), value);
        double max = Math.max((Double) windowData.get("max"), value);

        @SuppressWarnings("unchecked")
        List<Double> values = (List<Double>) windowData.get("values");
        values.add(value);

        windowData.put("count", count);
        windowData.put("sum", sum);
        windowData.put("min", min);
        windowData.put("max", max);
        windowData.put("avg", sum / count);
        windowData.put("values", values);

        // Percentile computation
        if (values.size() >= 10) {
            Collections.sort(values);
            windowData.put("p50", calculatePercentile(values, 50));
            windowData.put("p95", calculatePercentile(values, 95));
            windowData.put("p99", calculatePercentile(values, 99));
        }

        // Persist state
        stateContext.put(aggregationKey, windowData);

        // Check whether this event's window has already closed
        // (simplified: fires whenever an event arrives for a past window)
        long currentWindow = (System.currentTimeMillis() / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
        if (windowStart < currentWindow) {
            // Emit the completed window's data
            Map<String, Object> result = new HashMap<>();
            result.put("groupKey", groupKey);
            result.put("metric", metric);
            result.putAll(windowData);

            // Prune stale window data
            cleanupOldWindows(stateContext, currentWindow);

            return toJson(result);
        }

        return null;
    }

    private double calculatePercentile(List<Double> sortedValues, int percentile) {
        int index = (int) Math.ceil(sortedValues.size() * percentile / 100.0) - 1;
        return sortedValues.get(Math.max(0, Math.min(index, sortedValues.size() - 1)));
    }

    private void cleanupOldWindows(StateContext stateContext, long currentWindow) {
        // Delete window data older than one hour
        long cutoffTime = currentWindow - 3600000L; // 1 hour in ms

        // A real implementation would enumerate all state keys and prune those
        // older than cutoffTime; omitted here for brevity
    }

    private Map<String, Object> parseJson(String json) { return new HashMap<>(); }
    private String toJson(Map<String, Object> map) { return ""; }
}

// ML-based prediction function
public class MLPredictionFunction implements Function<String, String> {

    private MLModel model; // pre-trained model

    @Override
    public void initialize(Context context) throws Exception {
        // Load the model; getUserConfigValue returns an Optional, so unwrap it explicitly
        this.model = loadModel(
                context.getUserConfigValue("model_path")
                        .map(Object::toString)
                        .orElseThrow(() -> new IllegalArgumentException("model_path is not configured")));
    }

    @Override
    public String process(String input, Context context) throws Exception {
        Map<String, Object> event = parseJson(input);

        // Extract the feature vector
        double[] features = extractFeatures(event);

        // Run model inference
        double prediction = model.predict(features);
        double confidence = model.getPredictionConfidence(features);

        // Build the prediction result
        Map<String, Object> result = new HashMap<>();
        result.put("inputId", event.get("id"));
        result.put("prediction", prediction);
        result.put("confidence", confidence);
        result.put("timestamp", System.currentTimeMillis());
        result.put("modelVersion", model.getVersion());

        // Emit only high-confidence predictions
        if (confidence > 0.8) {
            return toJson(result);
        }

        return null;
    }

    private double[] extractFeatures(Map<String, Object> event) {
        // Extract ML features from the event
        List<Double> features = new ArrayList<>();

        // Numeric features
        features.add(((Number) event.getOrDefault("feature1", 0.0)).doubleValue());
        features.add(((Number) event.getOrDefault("feature2", 0.0)).doubleValue());

        // Encode the categorical feature
        String category = (String) event.get("category");
        features.addAll(oneHotEncode(category));

        return features.stream().mapToDouble(Double::doubleValue).toArray();
    }

    private List<Double> oneHotEncode(String category) {
        // One-hot encoding logic
        Map<String, Integer> categoryMap = Map.of(
            "A", 0, "B", 1, "C", 2
        );

        List<Double> encoded = Arrays.asList(0.0, 0.0, 0.0);
        if (categoryMap.containsKey(category)) {
            encoded.set(categoryMap.get(category), 1.0);
        }

        return encoded;
    }

    private MLModel loadModel(String modelPath) {
        // Model-loading logic (ONNX, TensorFlow Lite, etc.)
        return new DummyMLModel();
    }

    private Map<String, Object> parseJson(String json) { return new HashMap<>(); }
    private String toJson(Map<String, Object> map) { return ""; }
}

// Dummy ML model used for the example
class DummyMLModel implements MLModel {
    public double predict(double[] features) { return Math.random(); }
    public double getPredictionConfidence(double[] features) { return Math.random(); }
    public String getVersion() { return "1.0.0"; }
}

interface MLModel {
    double predict(double[] features);
    double getPredictionConfidence(double[] features);
    String getVersion();
}
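The 3-sigma rule in AnomalyDetectionFunction is straightforward to verify outside Pulsar. A Python sketch of the same capped-history z-score check (100-sample history, minimum 10 samples, threshold 3.0, matching the Java constants; unlike the Java version, this variant scores each value against the history before appending it):

```python
from collections import deque
import statistics

class ZScoreDetector:
    """Sliding-history z-score anomaly detector (3-sigma rule)."""

    def __init__(self, threshold=3.0, max_history=100, min_samples=10):
        self.threshold = threshold
        self.min_samples = min_samples
        self.history = deque(maxlen=max_history)  # old samples fall off automatically

    def observe(self, value):
        """Return the z-score if `value` is anomalous, otherwise None."""
        z_score = None
        if len(self.history) >= self.min_samples:
            mean = statistics.fmean(self.history)
            std = statistics.pstdev(self.history)
            if std > 0:  # a constant history cannot yield a z-score
                z = abs(value - mean) / std
                if z > self.threshold:
                    z_score = z
        self.history.append(value)
        return z_score

detector = ZScoreDetector()
for v in [10, 11, 9, 10, 12, 10, 11, 9, 10, 11]:
    detector.observe(v)          # warm-up: builds the baseline
print(detector.observe(100))     # a spike far outside the baseline is flagged
```

The `std > 0` guard matters in practice: a user whose metric never varies would otherwise trigger a division by zero, which the Java version above does not protect against either.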

5. Streaming Data Quality Management

5.1 Data Quality Monitoring System

# data_quality_monitoring.py
import asyncio
import json
import time
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import statistics
import re
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

class QualityCheckType(Enum):
    SCHEMA_VALIDATION = "schema_validation"
    DATA_COMPLETENESS = "data_completeness"
    DATA_FRESHNESS = "data_freshness"
    VALUE_RANGE = "value_range"
    PATTERN_MATCHING = "pattern_matching"
    DUPLICATE_DETECTION = "duplicate_detection"
    REFERENTIAL_INTEGRITY = "referential_integrity"
    STATISTICAL_ANOMALY = "statistical_anomaly"

@dataclass
class QualityCheckResult:
    check_type: QualityCheckType
    passed: bool
    message: str
    score: float  # 0.0 - 1.0
    details: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

@dataclass
class DataQualityRule:
    rule_id: str
    check_type: QualityCheckType
    description: str
    config: Dict[str, Any]
    severity: str = "MEDIUM"  # LOW, MEDIUM, HIGH, CRITICAL
    enabled: bool = True

class StreamingDataQualityMonitor:
    def __init__(self):
        self.quality_rules = {}
        self.quality_metrics = defaultdict(lambda: {
            'total_records': 0,
            'passed_checks': 0,
            'failed_checks': 0,
            'quality_score': 1.0,
            'check_history': deque(maxlen=1000)
        })
        self.schema_cache = {}
        self.reference_data = {}
        self.statistical_profiles = defaultdict(lambda: {
            'mean': None,
            'std': None,
            'min': None,
            'max': None,
            'percentiles': {},
            'value_counts': defaultdict(int),
            'null_rate': 0.0
        })

    def add_quality_rule(self, rule: DataQualityRule):
        """Register a quality rule"""
        self.quality_rules[rule.rule_id] = rule

    def remove_quality_rule(self, rule_id: str):
        """Remove a quality rule"""
        if rule_id in self.quality_rules:
            del self.quality_rules[rule_id]

    def check_data_quality(self, stream_name: str, record: Dict[str, Any]) -> List[QualityCheckResult]:
        """Run all registered data quality checks against a record"""
        results = []

        # Evaluate every enabled rule
        for rule_id, rule in self.quality_rules.items():
            if not rule.enabled:
                continue

            result = self._execute_quality_check(rule, record, stream_name)
            results.append(result)

        # Update quality metrics
        self._update_quality_metrics(stream_name, results)

        return results

    def _execute_quality_check(self, rule: DataQualityRule, record: Dict[str, Any],
                               stream_name: str) -> QualityCheckResult:
        """Execute a single quality check"""

        if rule.check_type == QualityCheckType.SCHEMA_VALIDATION:
            return self._check_schema_validation(rule, record)

        elif rule.check_type == QualityCheckType.DATA_COMPLETENESS:
            return self._check_data_completeness(rule, record)

        elif rule.check_type == QualityCheckType.DATA_FRESHNESS:
            return self._check_data_freshness(rule, record)

        elif rule.check_type == QualityCheckType.VALUE_RANGE:
            return self._check_value_range(rule, record)

        elif rule.check_type == QualityCheckType.PATTERN_MATCHING:
            return self._check_pattern_matching(rule, record)

        elif rule.check_type == QualityCheckType.DUPLICATE_DETECTION:
            return self._check_duplicate_detection(rule, record, stream_name)

        elif rule.check_type == QualityCheckType.REFERENTIAL_INTEGRITY:
            return self._check_referential_integrity(rule, record)

        elif rule.check_type == QualityCheckType.STATISTICAL_ANOMALY:
            return self._check_statistical_anomaly(rule, record, stream_name)

        else:
            return QualityCheckResult(
                check_type=rule.check_type,
                passed=False,
                message=f"Unknown check type: {rule.check_type}",
                score=0.0
            )

    def _check_schema_validation(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """Schema validation"""
        required_fields = rule.config.get('required_fields', [])
        optional_fields = rule.config.get('optional_fields', [])
        field_types = rule.config.get('field_types', {})

        passed = True
        issues = []
        score = 1.0

        # Required-field check
        for field in required_fields:
            if field not in record:
                passed = False
                issues.append(f"Missing required field: {field}")
                score -= 0.2

        # Data-type check
        for field, expected_type in field_types.items():
            if field in record:
                if not self._check_field_type(record[field], expected_type):
                    passed = False
                    issues.append(f"Field {field} has incorrect type. Expected {expected_type}")
                    score -= 0.1

        # Disallowed-field check (strict schema)
        if rule.config.get('strict_schema', False):
            allowed_fields = set(required_fields + optional_fields)
            extra_fields = set(record.keys()) - allowed_fields
            if extra_fields:
                passed = False
                issues.append(f"Unexpected fields: {list(extra_fields)}")
                score -= 0.1

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message="; ".join(issues) if issues else "Schema validation passed",
            score=max(0.0, score),
            details={'issues': issues}
        )

    def _check_data_completeness(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """Data completeness check"""
        fields_to_check = rule.config.get('fields', list(record.keys()))
        max_null_ratio = rule.config.get('max_null_ratio', 0.1)

        null_count = 0
        total_count = len(fields_to_check)

        for field in fields_to_check:
            value = record.get(field)
            if value is None or value == "" or (isinstance(value, str) and value.strip() == ""):
                null_count += 1

        null_ratio = null_count / total_count if total_count > 0 else 0
        passed = null_ratio <= max_null_ratio
        # Guard against a zero threshold to avoid division by zero
        score = (max(0.0, 1.0 - (null_ratio / max_null_ratio))
                 if max_null_ratio > 0 else (1.0 if passed else 0.0))

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message=f"Null ratio: {null_ratio:.2%} (threshold: {max_null_ratio:.2%})",
            score=score,
            details={
                'null_count': null_count,
                'total_fields': total_count,
                'null_ratio': null_ratio
            }
        )

    def _check_data_freshness(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """Data freshness check"""
        timestamp_field = rule.config.get('timestamp_field', 'timestamp')
        max_age_seconds = rule.config.get('max_age_seconds', 300)  # 5 minutes

        if timestamp_field not in record:
            return QualityCheckResult(
                check_type=rule.check_type,
                passed=False,
                message=f"Timestamp field '{timestamp_field}' not found",
                score=0.0
            )

        try:
            record_timestamp = record[timestamp_field]
            if isinstance(record_timestamp, str):
                record_timestamp = datetime.fromisoformat(record_timestamp.replace('Z', '+00:00')).timestamp()
            elif isinstance(record_timestamp, datetime):
                record_timestamp = record_timestamp.timestamp()

            current_time = time.time()
            age_seconds = current_time - record_timestamp

            passed = age_seconds <= max_age_seconds
            score = max(0.0, 1.0 - (age_seconds / max_age_seconds)) if max_age_seconds > 0 else 1.0

            return QualityCheckResult(
                check_type=rule.check_type,
                passed=passed,
                message=f"Data age: {age_seconds:.1f}s (threshold: {max_age_seconds}s)",
                score=score,
                details={
                    'age_seconds': age_seconds,
                    'max_age_seconds': max_age_seconds
                }
            )

        except Exception as e:
            return QualityCheckResult(
                check_type=rule.check_type,
                passed=False,
                message=f"Error parsing timestamp: {str(e)}",
                score=0.0
            )

    def _check_value_range(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """Value range check"""
        field_ranges = rule.config.get('field_ranges', {})

        passed = True
        issues = []
        score = 1.0

        for field, range_config in field_ranges.items():
            if field not in record:
                continue

            value = record[field]
            min_value = range_config.get('min')
            max_value = range_config.get('max')
            allowed_values = range_config.get('allowed_values')

            if min_value is not None and value < min_value:
                passed = False
                issues.append(f"Field {field}: {value} < {min_value}")
                score -= 0.2

            if max_value is not None and value > max_value:
                passed = False
                issues.append(f"Field {field}: {value} > {max_value}")
                score -= 0.2

            if allowed_values is not None and value not in allowed_values:
                passed = False
                issues.append(f"Field {field}: {value} not in allowed values")
                score -= 0.2

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message="; ".join(issues) if issues else "Value range check passed",
            score=max(0.0, score),
            details={'issues': issues}
        )

    def _check_pattern_matching(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """패턴 매칭 검사"""
        field_patterns = rule.config.get('field_patterns', {})

        passed = True
        issues = []
        score = 1.0

        for field, pattern in field_patterns.items():
            if field not in record:
                continue

            value = str(record[field])

            if not re.match(pattern, value):
                passed = False
                issues.append(f"Field {field} doesn't match pattern {pattern}")
                score -= 0.3

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message="; ".join(issues) if issues else "Pattern matching passed",
            score=max(0.0, score),
            details={'issues': issues}
        )

    def _check_duplicate_detection(self, rule: DataQualityRule, record: Dict[str, Any],
                                   stream_name: str) -> QualityCheckResult:
        """중복 검사"""
        key_fields = rule.config.get('key_fields', ['id'])
        window_size = rule.config.get('window_size', 3600)  # 1시간

        # 키 생성
        key_values = [str(record.get(field, '')) for field in key_fields]
        record_key = '|'.join(key_values)

        # 중복 체크용 상태 관리 (실제로는 Redis나 다른 저장소 사용)
        duplicate_cache_key = f"{stream_name}_duplicates"
        if not hasattr(self, '_duplicate_cache'):
            self._duplicate_cache = defaultdict(lambda: deque(maxlen=10000))

        current_time = time.time()

        # Evict entries older than the window
        cache = self._duplicate_cache[duplicate_cache_key]
        while cache and cache[0][1] < current_time - window_size:
            cache.popleft()

        # Check for a duplicate key
        is_duplicate = any(entry[0] == record_key for entry in cache)

        if not is_duplicate:
            cache.append((record_key, current_time))

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=not is_duplicate,
            message="Duplicate detected" if is_duplicate else "No duplicate found",
            score=0.0 if is_duplicate else 1.0,
            details={
                'record_key': record_key,
                'is_duplicate': is_duplicate
            }
        )

    def _check_referential_integrity(self, rule: DataQualityRule, record: Dict[str, Any]) -> QualityCheckResult:
        """참조 무결성 검사"""
        reference_checks = rule.config.get('reference_checks', [])

        passed = True
        issues = []
        score = 1.0

        for check in reference_checks:
            field = check['field']
            reference_table = check['reference_table']

            if field not in record:
                continue

            value = record[field]

            # Look up the value in the reference table
            if reference_table not in self.reference_data:
                passed = False
                issues.append(f"Reference table {reference_table} not found")
                score -= 0.5
                continue

            if value not in self.reference_data[reference_table]:
                passed = False
                issues.append(f"Field {field}: {value} not found in {reference_table}")
                score -= 0.3

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message="; ".join(issues) if issues else "Referential integrity check passed",
            score=max(0.0, score),
            details={'issues': issues}
        )

    def _check_statistical_anomaly(self, rule: DataQualityRule, record: Dict[str, Any],
                                   stream_name: str) -> QualityCheckResult:
        """통계적 이상 탐지"""
        numeric_fields = rule.config.get('numeric_fields', [])
        z_threshold = rule.config.get('z_threshold', 3.0)

        passed = True
        issues = []
        score = 1.0

        for field in numeric_fields:
            if field not in record:
                continue

            try:
                value = float(record[field])
                profile = self.statistical_profiles[f"{stream_name}_{field}"]

                # Update the running profile
                self._update_statistical_profile(profile, value)

                # Detect anomalies (only once enough data has accumulated)
                if profile['mean'] is not None and profile['std'] is not None and profile['std'] > 0:
                    z_score = abs((value - profile['mean']) / profile['std'])

                    if z_score > z_threshold:
                        passed = False
                        issues.append(f"Field {field}: z-score {z_score:.2f} > {z_threshold}")
                        score -= min(0.5, (z_score - z_threshold) / z_threshold * 0.3)

            except ValueError:
                passed = False
                issues.append(f"Field {field}: cannot convert to numeric")
                score -= 0.2

        return QualityCheckResult(
            check_type=rule.check_type,
            passed=passed,
            message="; ".join(issues) if issues else "No statistical anomalies detected",
            score=max(0.0, score),
            details={'issues': issues}
        )

    def _update_statistical_profile(self, profile: Dict, value: float):
        """통계 프로파일 업데이트"""
        if not hasattr(self, '_profile_values'):
            self._profile_values = defaultdict(lambda: deque(maxlen=1000))

        # 값 추가
        profile_key = id(profile)  # 프로파일 식별자
        values = self._profile_values[profile_key]
        values.append(value)

        # 통계 계산
        if len(values) >= 10:
            profile['mean'] = statistics.mean(values)
            profile['std'] = statistics.stdev(values) if len(values) > 1 else 0
            profile['min'] = min(values)
            profile['max'] = max(values)

    def _update_quality_metrics(self, stream_name: str, results: List[QualityCheckResult]):
        """품질 메트릭 업데이트"""
        metrics = self.quality_metrics[stream_name]

        metrics['total_records'] += 1

        passed_count = sum(1 for result in results if result.passed)
        failed_count = len(results) - passed_count

        metrics['passed_checks'] += passed_count
        metrics['failed_checks'] += failed_count

        # Compute the overall quality score
        if results:
            current_score = sum(result.score for result in results) / len(results)
            # Update the quality score with an exponential moving average
            alpha = 0.1
            metrics['quality_score'] = (1 - alpha) * metrics['quality_score'] + alpha * current_score

        # Append to history
        metrics['check_history'].append({
            'timestamp': time.time(),
            'results': results,
            'quality_score': current_score if results else 1.0
        })

    def _check_field_type(self, value: Any, expected_type: str) -> bool:
        """필드 타입 검사"""
        type_mapping = {
            'string': str,
            'int': int,
            'float': (int, float),
            'bool': bool,
            'list': list,
            'dict': dict
        }

        if expected_type not in type_mapping:
            return True  # unknown types pass by default

        expected_python_type = type_mapping[expected_type]
        return isinstance(value, expected_python_type)

    def get_quality_metrics(self, stream_name: Optional[str] = None) -> Dict[str, Any]:
        """품질 메트릭 조회"""
        if stream_name:
            return dict(self.quality_metrics[stream_name])
        else:
            return dict(self.quality_metrics)

    def get_quality_report(self, stream_name: str, hours: int = 24) -> Dict[str, Any]:
        """품질 리포트 생성"""
        metrics = self.quality_metrics[stream_name]

        # Filter history to the requested time window
        cutoff_time = time.time() - (hours * 3600)
        recent_history = [
            entry for entry in metrics['check_history']
            if entry['timestamp'] >= cutoff_time
        ]

        if not recent_history:
            return {
                'stream_name': stream_name,
                'period_hours': hours,
                'no_data': True
            }

        # Analyze the quality trend
        quality_scores = [entry['quality_score'] for entry in recent_history]

        # Aggregate failed checks by type
        failed_checks_by_type = defaultdict(int)
        for entry in recent_history:
            for result in entry['results']:
                if not result.passed:
                    failed_checks_by_type[result.check_type.value] += 1

        return {
            'stream_name': stream_name,
            'period_hours': hours,
            'total_records': len(recent_history),
            'average_quality_score': statistics.mean(quality_scores),
            'min_quality_score': min(quality_scores),
            'max_quality_score': max(quality_scores),
            'quality_trend': quality_scores[-10:],  # last 10 points
            'failed_checks_by_type': dict(failed_checks_by_type),
            'current_quality_score': metrics['quality_score']
        }

# Usage example
def setup_data_quality_monitoring():
    monitor = StreamingDataQualityMonitor()

    # Schema validation rule
    schema_rule = DataQualityRule(
        rule_id="user_event_schema",
        check_type=QualityCheckType.SCHEMA_VALIDATION,
        description="User event schema validation",
        config={
            'required_fields': ['user_id', 'event_type', 'timestamp'],
            'optional_fields': ['properties', 'metadata'],
            'field_types': {
                'user_id': 'string',
                'event_type': 'string',
                'timestamp': 'float',
                'properties': 'dict'
            },
            'strict_schema': False
        },
        severity="CRITICAL"
    )

    # Data completeness rule
    completeness_rule = DataQualityRule(
        rule_id="data_completeness",
        check_type=QualityCheckType.DATA_COMPLETENESS,
        description="Check for null/empty values",
        config={
            'fields': ['user_id', 'event_type'],
            'max_null_ratio': 0.05  # at most 5%
        },
        severity="HIGH"
    )

    # Data freshness rule
    freshness_rule = DataQualityRule(
        rule_id="data_freshness",
        check_type=QualityCheckType.DATA_FRESHNESS,
        description="Check data freshness",
        config={
            'timestamp_field': 'timestamp',
            'max_age_seconds': 300  # 5 minutes
        },
        severity="MEDIUM"
    )

    # Value range rule
    range_rule = DataQualityRule(
        rule_id="value_ranges",
        check_type=QualityCheckType.VALUE_RANGE,
        description="Check value ranges",
        config={
            'field_ranges': {
                'age': {'min': 0, 'max': 150},
                'rating': {'min': 1, 'max': 5},
                'status': {'allowed_values': ['active', 'inactive', 'pending']}
            }
        },
        severity="MEDIUM"
    )

    # Pattern matching rule
    pattern_rule = DataQualityRule(
        rule_id="pattern_matching",
        check_type=QualityCheckType.PATTERN_MATCHING,
        description="Check field patterns",
        config={
            'field_patterns': {
                'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
                'phone': r'^\+?1?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}$'
            }
        },
        severity="LOW"
    )

    # Duplicate detection rule
    duplicate_rule = DataQualityRule(
        rule_id="duplicate_detection",
        check_type=QualityCheckType.DUPLICATE_DETECTION,
        description="Detect duplicate records",
        config={
            'key_fields': ['user_id', 'event_type', 'timestamp'],
            'window_size': 3600  # 1 hour
        },
        severity="MEDIUM"
    )

    # Statistical anomaly detection rule
    anomaly_rule = DataQualityRule(
        rule_id="statistical_anomaly",
        check_type=QualityCheckType.STATISTICAL_ANOMALY,
        description="Detect statistical anomalies",
        config={
            'numeric_fields': ['session_duration', 'page_views', 'transaction_amount'],
            'z_threshold': 3.5
        },
        severity="HIGH"
    )

    # Register all rules
    for rule in [schema_rule, completeness_rule, freshness_rule, range_rule,
                 pattern_rule, duplicate_rule, anomaly_rule]:
        monitor.add_quality_rule(rule)

    return monitor

# Run real-time quality monitoring
async def real_time_quality_monitoring():
    monitor = setup_data_quality_monitoring()

    # Sample data stream
    sample_records = [
        {
            'user_id': 'user_123',
            'event_type': 'page_view',
            'timestamp': time.time(),
            'properties': {'page': '/home'},
            'session_duration': 150.5,
            'age': 25,
            'email': 'user@example.com'
        },
        {
            'user_id': 'user_456',
            'event_type': 'click',
            'timestamp': time.time() - 1000,  # stale data
            'properties': {'button': 'signup'},
            'session_duration': 2000.0,  # outlier
            'age': -5,  # out of range
            'email': 'invalid-email'  # bad pattern
        }
    ]

    for record in sample_records:
        results = monitor.check_data_quality('user_events', record)

        print(f"Record: {record['user_id']}")
        for result in results:
            status = "PASS" if result.passed else "FAIL"
            print(f"  {result.check_type.value}: {status} (score: {result.score:.2f}) - {result.message}")
        print()

    # Generate a quality report
    quality_report = monitor.get_quality_report('user_events', hours=1)
    print("Quality Report:", json.dumps(quality_report, indent=2))

# Run
# asyncio.run(real_time_quality_monitoring())
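The statistical profile above keeps a bounded deque of recent values and recomputes mean and standard deviation on every update. For unbounded streams, Welford's online algorithm maintains both in a single pass with O(1) memory. A minimal, self-contained sketch (the `WelfordProfile` class is illustrative, not part of the monitor above):

```python
# Welford's online algorithm: running mean/variance without storing samples.
class WelfordProfile:
    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, value: float):
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)

    @property
    def std(self) -> float:
        # Sample standard deviation; needs at least 2 observations
        if self.count < 2:
            return 0.0
        return (self.m2 / (self.count - 1)) ** 0.5

profile = WelfordProfile()
for v in [10.0, 12.0, 11.0, 13.0, 9.0]:
    profile.update(v)
print(round(profile.mean, 2), round(profile.std, 2))  # → 11.0 1.58
```

This drops the 1,000-sample cap entirely; if the distribution drifts over time, pair it with periodic resets or an exponentially weighted variant.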

6. Performance Optimization and Operations

6.1 Streaming System Performance Tuning
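One detail to keep in mind while reading the tuner below: its `_percentile` helper uses the nearest-rank method, which is cheap to compute but can jump sharply between adjacent ranks on small samples. A self-contained illustration (the latency values are made up):

```python
def percentile_nearest_rank(sorted_data, pct):
    # Nearest-rank percentile, mirroring the tuner's _percentile helper:
    # index = floor(pct/100 * n), clamped to the last element.
    if not sorted_data:
        return 0.0
    index = int((pct / 100.0) * len(sorted_data))
    return sorted_data[min(index, len(sorted_data) - 1)]

latencies = sorted([12.0, 15.0, 11.0, 80.0, 14.0, 13.0, 16.0, 17.0, 18.0, 200.0])
p50 = percentile_nearest_rank(latencies, 50)
p95 = percentile_nearest_rank(latencies, 95)
print(p50, p95)  # → 16.0 200.0
```

With only 10 samples, P95 lands on the single worst value; interpolating estimators such as `statistics.quantiles` or `numpy.percentile` behave more smoothly on small windows.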

# streaming_performance_tuner.py
import asyncio
import gc
import json
import math
import statistics
import time
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Any

import numpy as np
import psutil

@dataclass
class PerformanceMetrics:
    timestamp: float
    throughput_msg_per_sec: float
    latency_p50_ms: float
    latency_p95_ms: float
    latency_p99_ms: float
    cpu_usage_percent: float
    memory_usage_mb: float
    gc_collections: int
    active_connections: int
    queue_depth: int
    error_rate: float

class StreamingPerformanceTuner:
    def __init__(self):
        self.metrics_history = deque(maxlen=1000)
        self.performance_targets = {
            'throughput_min': 10000,  # msg/sec
            'latency_p95_max': 100.0,  # ms
            'cpu_usage_max': 80.0,     # %
            'memory_usage_max': 8192,  # MB
            'error_rate_max': 0.01     # 1%
        }

        self.tuning_parameters = {
            'batch_size': 1000,
            'worker_threads': psutil.cpu_count(),
            'buffer_size': 100000,
            'compression_level': 1,
            'flush_interval_ms': 5,
            'prefetch_count': 1000
        }

        self.auto_tune_enabled = True
        self.tuning_history = []

    def collect_performance_metrics(self, application_metrics: Dict[str, Any]) -> PerformanceMetrics:
        """성능 메트릭 수집"""
        # 시스템 메트릭
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_info = psutil.virtual_memory()
        memory_usage_mb = (memory_info.total - memory_info.available) / (1024 * 1024)

        # Application metrics
        throughput = application_metrics.get('throughput_msg_per_sec', 0.0)
        latencies = application_metrics.get('latencies_ms', [])
        error_rate = application_metrics.get('error_rate', 0.0)
        active_connections = application_metrics.get('active_connections', 0)
        queue_depth = application_metrics.get('queue_depth', 0)

        # Compute latency percentiles
        if latencies:
            latencies_sorted = sorted(latencies)
            p50 = self._percentile(latencies_sorted, 50)
            p95 = self._percentile(latencies_sorted, 95)
            p99 = self._percentile(latencies_sorted, 99)
        else:
            p50 = p95 = p99 = 0.0

        # GC information
        gc_stats = gc.get_stats()
        gc_collections = sum(stat['collections'] for stat in gc_stats)

        metrics = PerformanceMetrics(
            timestamp=time.time(),
            throughput_msg_per_sec=throughput,
            latency_p50_ms=p50,
            latency_p95_ms=p95,
            latency_p99_ms=p99,
            cpu_usage_percent=cpu_percent,
            memory_usage_mb=memory_usage_mb,
            gc_collections=gc_collections,
            active_connections=active_connections,
            queue_depth=queue_depth,
            error_rate=error_rate
        )

        self.metrics_history.append(metrics)

        if self.auto_tune_enabled:
            self._auto_tune_parameters(metrics)

        return metrics

    def _percentile(self, sorted_data: List[float], percentile: int) -> float:
        """백분위수 계산"""
        if not sorted_data:
            return 0.0
        index = int((percentile / 100.0) * len(sorted_data))
        return sorted_data[min(index, len(sorted_data) - 1)]

    def _auto_tune_parameters(self, current_metrics: PerformanceMetrics):
        """자동 파라미터 튜닝"""
        tuning_actions = []

        # Throughput below target
        if current_metrics.throughput_msg_per_sec < self.performance_targets['throughput_min']:
            if current_metrics.cpu_usage_percent < 70:
                # CPU headroom available, so push for more throughput
                if self.tuning_parameters['batch_size'] < 5000:
                    new_batch_size = min(5000, int(self.tuning_parameters['batch_size'] * 1.2))
                    tuning_actions.append(f"Increase batch_size: {self.tuning_parameters['batch_size']} -> {new_batch_size}")
                    self.tuning_parameters['batch_size'] = new_batch_size

                if self.tuning_parameters['worker_threads'] < psutil.cpu_count() * 2:
                    new_threads = min(psutil.cpu_count() * 2, self.tuning_parameters['worker_threads'] + 2)
                    tuning_actions.append(f"Increase worker_threads: {self.tuning_parameters['worker_threads']} -> {new_threads}")
                    self.tuning_parameters['worker_threads'] = new_threads

        # P95 latency above target
        if current_metrics.latency_p95_ms > self.performance_targets['latency_p95_max']:
            # Shorten the flush interval
            if self.tuning_parameters['flush_interval_ms'] > 1:
                new_flush_interval = max(1, int(self.tuning_parameters['flush_interval_ms'] * 0.8))
                tuning_actions.append(f"Decrease flush_interval_ms: {self.tuning_parameters['flush_interval_ms']} -> {new_flush_interval}")
                self.tuning_parameters['flush_interval_ms'] = new_flush_interval

            # Shrink the batch size to favor latency
            if self.tuning_parameters['batch_size'] > 100:
                new_batch_size = max(100, int(self.tuning_parameters['batch_size'] * 0.8))
                tuning_actions.append(f"Decrease batch_size for latency: {self.tuning_parameters['batch_size']} -> {new_batch_size}")
                self.tuning_parameters['batch_size'] = new_batch_size

        # CPU usage above target
        if current_metrics.cpu_usage_percent > self.performance_targets['cpu_usage_max']:
            # Lower the compression level
            if self.tuning_parameters['compression_level'] > 0:
                new_compression = max(0, self.tuning_parameters['compression_level'] - 1)
                tuning_actions.append(f"Decrease compression_level: {self.tuning_parameters['compression_level']} -> {new_compression}")
                self.tuning_parameters['compression_level'] = new_compression

            # Reduce worker threads
            if self.tuning_parameters['worker_threads'] > psutil.cpu_count():
                new_threads = max(psutil.cpu_count(), self.tuning_parameters['worker_threads'] - 2)
                tuning_actions.append(f"Decrease worker_threads: {self.tuning_parameters['worker_threads']} -> {new_threads}")
                self.tuning_parameters['worker_threads'] = new_threads

        # Memory usage above target
        if current_metrics.memory_usage_mb > self.performance_targets['memory_usage_max']:
            # Shrink the buffer
            if self.tuning_parameters['buffer_size'] > 10000:
                new_buffer_size = max(10000, int(self.tuning_parameters['buffer_size'] * 0.8))
                tuning_actions.append(f"Decrease buffer_size: {self.tuning_parameters['buffer_size']} -> {new_buffer_size}")
                self.tuning_parameters['buffer_size'] = new_buffer_size

            # Reduce the prefetch count
            if self.tuning_parameters['prefetch_count'] > 100:
                new_prefetch = max(100, int(self.tuning_parameters['prefetch_count'] * 0.8))
                tuning_actions.append(f"Decrease prefetch_count: {self.tuning_parameters['prefetch_count']} -> {new_prefetch}")
                self.tuning_parameters['prefetch_count'] = new_prefetch

        # Deep queue (backpressure building up)
        if current_metrics.queue_depth > 1000:
            # Speed up processing
            if self.tuning_parameters['batch_size'] < 3000:
                new_batch_size = min(3000, int(self.tuning_parameters['batch_size'] * 1.1))
                tuning_actions.append(f"Increase batch_size for queue: {self.tuning_parameters['batch_size']} -> {new_batch_size}")
                self.tuning_parameters['batch_size'] = new_batch_size

        # Log the tuning actions
        if tuning_actions:
            tuning_event = {
                'timestamp': time.time(),
                'metrics': current_metrics.__dict__,
                'actions': tuning_actions,
                'new_parameters': self.tuning_parameters.copy()
            }
            self.tuning_history.append(tuning_event)

            print(f"Auto-tuning performed at {time.ctime()}:")
            for action in tuning_actions:
                print(f"  - {action}")

    def analyze_performance_trends(self, window_minutes: int = 30) -> Dict[str, Any]:
        """성능 트렌드 분석"""
        if len(self.metrics_history) < 2:
            return {"insufficient_data": True}

        # Filter metrics to the time window
        cutoff_time = time.time() - (window_minutes * 60)
        recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time]

        if len(recent_metrics) < 2:
            return {"insufficient_recent_data": True}

        # Compute trends
        throughput_values = [m.throughput_msg_per_sec for m in recent_metrics]
        latency_p95_values = [m.latency_p95_ms for m in recent_metrics]
        cpu_values = [m.cpu_usage_percent for m in recent_metrics]
        memory_values = [m.memory_usage_mb for m in recent_metrics]
        error_rates = [m.error_rate for m in recent_metrics]

        def calculate_trend(values):
            if len(values) < 2:
                return 0.0
            return (values[-1] - values[0]) / len(values)  # average change per sample in the window

        return {
            'window_minutes': window_minutes,
            'sample_count': len(recent_metrics),
            'trends': {
                'throughput_trend': calculate_trend(throughput_values),
                'latency_trend': calculate_trend(latency_p95_values),
                'cpu_trend': calculate_trend(cpu_values),
                'memory_trend': calculate_trend(memory_values),
                'error_rate_trend': calculate_trend(error_rates)
            },
            'current_stats': {
                'throughput_avg': statistics.mean(throughput_values),
                'latency_p95_avg': statistics.mean(latency_p95_values),
                'cpu_avg': statistics.mean(cpu_values),
                'memory_avg': statistics.mean(memory_values),
                'error_rate_avg': statistics.mean(error_rates)
            },
            'performance_score': self._calculate_performance_score(recent_metrics[-1])
        }

    def _calculate_performance_score(self, metrics: PerformanceMetrics) -> float:
        """성능 점수 계산 (0-100)"""
        score = 100.0

        # Throughput component (30%)
        throughput_ratio = min(1.0, metrics.throughput_msg_per_sec / self.performance_targets['throughput_min'])
        score -= (1 - throughput_ratio) * 30

        # Latency component (25%)
        if metrics.latency_p95_ms > 0:
            latency_penalty = max(0, (metrics.latency_p95_ms - self.performance_targets['latency_p95_max']) / self.performance_targets['latency_p95_max'])
            score -= min(25, latency_penalty * 25)

        # CPU usage component (20%)
        cpu_penalty = max(0, (metrics.cpu_usage_percent - self.performance_targets['cpu_usage_max']) / 100)
        score -= min(20, cpu_penalty * 20)

        # Memory usage component (15%)
        memory_penalty = max(0, (metrics.memory_usage_mb - self.performance_targets['memory_usage_max']) / self.performance_targets['memory_usage_max'])
        score -= min(15, memory_penalty * 15)

        # Error rate component (10%)
        error_penalty = metrics.error_rate / self.performance_targets['error_rate_max']
        score -= min(10, error_penalty * 10)

        return max(0.0, score)

    def get_tuning_recommendations(self) -> List[Dict[str, Any]]:
        """튜닝 권장사항 생성"""
        if len(self.metrics_history) < 5:
            return [{"message": "Insufficient data for recommendations"}]

        latest_metrics = self.metrics_history[-1]
        recommendations = []

        # Throughput improvements
        if latest_metrics.throughput_msg_per_sec < self.performance_targets['throughput_min'] * 0.8:
            recommendations.append({
                'priority': 'HIGH',
                'category': 'throughput',
                'recommendation': 'Increase batch size and consider adding more worker threads',
                'rationale': f'Current throughput ({latest_metrics.throughput_msg_per_sec:.0f} msg/s) is below 80% of target',
                'suggested_parameters': {
                    'batch_size': min(5000, self.tuning_parameters['batch_size'] * 2),
                    'worker_threads': min(psutil.cpu_count() * 2, self.tuning_parameters['worker_threads'] + 4)
                }
            })

        # Latency improvements
        if latest_metrics.latency_p95_ms > self.performance_targets['latency_p95_max'] * 1.2:
            recommendations.append({
                'priority': 'HIGH',
                'category': 'latency',
                'recommendation': 'Reduce batch size and flush interval for lower latency',
                'rationale': f'P95 latency ({latest_metrics.latency_p95_ms:.1f}ms) exceeds target by 20%',
                'suggested_parameters': {
                    'batch_size': max(100, self.tuning_parameters['batch_size'] // 2),
                    'flush_interval_ms': max(1, self.tuning_parameters['flush_interval_ms'] // 2)
                }
            })

        # Resource usage optimization
        if latest_metrics.cpu_usage_percent > self.performance_targets['cpu_usage_max']:
            recommendations.append({
                'priority': 'MEDIUM',
                'category': 'resource_optimization',
                'recommendation': 'Optimize CPU usage by adjusting compression and thread count',
                'rationale': f'CPU usage ({latest_metrics.cpu_usage_percent:.1f}%) exceeds target',
                'suggested_parameters': {
                    'compression_level': max(0, self.tuning_parameters['compression_level'] - 1),
                    'worker_threads': max(psutil.cpu_count(), self.tuning_parameters['worker_threads'] - 2)
                }
            })

        if latest_metrics.memory_usage_mb > self.performance_targets['memory_usage_max']:
            recommendations.append({
                'priority': 'MEDIUM',
                'category': 'memory_optimization',
                'recommendation': 'Reduce buffer sizes and implement memory pooling',
                'rationale': f'Memory usage ({latest_metrics.memory_usage_mb:.0f}MB) exceeds target',
                'suggested_parameters': {
                    'buffer_size': max(10000, self.tuning_parameters['buffer_size'] // 2),
                    'prefetch_count': max(100, self.tuning_parameters['prefetch_count'] // 2)
                }
            })

        # Reliability improvements
        if latest_metrics.error_rate > self.performance_targets['error_rate_max']:
            recommendations.append({
                'priority': 'CRITICAL',
                'category': 'reliability',
                'recommendation': 'Investigate error causes and implement circuit breaker pattern',
                'rationale': f'Error rate ({latest_metrics.error_rate:.2%}) exceeds acceptable threshold',
                'suggested_actions': [
                    'Review error logs for patterns',
                    'Implement exponential backoff for retries',
                    'Add health checks and circuit breakers'
                ]
            })

        return recommendations

    def export_performance_report(self, format: str = "json") -> str:
        """성능 리포트 내보내기"""
        if not self.metrics_history:
            return json.dumps({"error": "No performance data available"})

        latest_metrics = self.metrics_history[-1]
        trends = self.analyze_performance_trends()
        recommendations = self.get_tuning_recommendations()

        report = {
            'timestamp': time.time(),
            'report_type': 'streaming_performance_report',
            'current_metrics': latest_metrics.__dict__,
            'performance_targets': self.performance_targets,
            'current_parameters': self.tuning_parameters,
            'trends_analysis': trends,
            'tuning_recommendations': recommendations,
            'tuning_history': self.tuning_history[-10:],  # last 10 tuning events
            'performance_score': self._calculate_performance_score(latest_metrics)
        }

        if format.lower() == "json":
            return json.dumps(report, indent=2, default=str)
        else:
            # CSV or other formats could be supported here
            return json.dumps(report, indent=2, default=str)

# Load test simulator
class LoadTestSimulator:
    def __init__(self, target_throughput: int = 10000):
        self.target_throughput = target_throughput
        self.current_load = 0
        self.error_injection_rate = 0.0

    async def simulate_variable_load(self, duration_minutes: int = 60):
        """가변 부하 시뮬레이션"""
        print(f"Starting {duration_minutes}-minute load test simulation...")

        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)

        tuner = StreamingPerformanceTuner()

        while time.time() < end_time:
            elapsed_ratio = (time.time() - start_time) / (duration_minutes * 60)

            # Vary the load along a sine wave
            load_multiplier = 1 + 0.5 * math.sin(elapsed_ratio * 4 * math.pi)  # two full cycles
            current_throughput = int(self.target_throughput * load_multiplier)

            # Inject random errors
            if elapsed_ratio > 0.7:  # start injecting errors at the 70% mark
                self.error_injection_rate = min(0.05, (elapsed_ratio - 0.7) * 0.1)

            # Simulated application metrics
            latencies = [
                max(1, np.random.lognormal(3, 0.5)) for _ in range(100)
            ]  # log-normal draw for a realistic latency distribution

            app_metrics = {
                'throughput_msg_per_sec': current_throughput + np.random.normal(0, current_throughput * 0.1),
                'latencies_ms': latencies,
                'error_rate': self.error_injection_rate + np.random.uniform(0, 0.01),
                'active_connections': np.random.randint(50, 200),
                'queue_depth': max(0, np.random.randint(0, 1000) + (current_throughput - 8000) // 10)
            }

            # Collect metrics and apply tuning
            metrics = tuner.collect_performance_metrics(app_metrics)

            print(f"Time: {elapsed_ratio:.1%}, "
                  f"Throughput: {metrics.throughput_msg_per_sec:.0f}, "
                  f"P95 Latency: {metrics.latency_p95_ms:.1f}ms, "
                  f"CPU: {metrics.cpu_usage_percent:.1f}%, "
                  f"Score: {tuner._calculate_performance_score(metrics):.1f}")

            await asyncio.sleep(10)  # 10-second interval

        # Generate the final report
        final_report = tuner.export_performance_report()
        print("\n=== Final Performance Report ===")
        print(final_report)

        return tuner

# Example run
async def performance_optimization_demo():
    simulator = LoadTestSimulator(target_throughput=15000)
    tuner = await simulator.simulate_variable_load(duration_minutes=30)

    # Print recommendations
    recommendations = tuner.get_tuning_recommendations()
    print("\n=== Tuning Recommendations ===")
    for rec in recommendations:
        print(f"Priority: {rec['priority']}")
        print(f"Category: {rec['category']}")
        print(f"Recommendation: {rec['recommendation']}")
        print(f"Rationale: {rec['rationale']}")
        if 'suggested_parameters' in rec:
            print(f"Suggested Parameters: {rec['suggested_parameters']}")
        print()

# Run
# asyncio.run(performance_optimization_demo())
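The tuner's parameters are client-agnostic, but if the pipeline produces to Kafka they map naturally onto producer settings. A sketch, assuming kafka-python style option names (`batch_size` in bytes, `linger_ms`, `compression_type`, `buffer_memory`); the function name and the average-message-size conversion factor are illustrative, not part of the tuner above:

```python
def to_kafka_producer_config(tuning: dict, avg_msg_bytes: int = 512) -> dict:
    # Translate message-count parameters into Kafka's byte-denominated settings.
    compression = 'lz4' if tuning.get('compression_level', 0) > 0 else None
    return {
        'batch_size': tuning['batch_size'] * avg_msg_bytes,      # bytes per batch
        'linger_ms': tuning['flush_interval_ms'],                # max wait before a send
        'compression_type': compression,
        'buffer_memory': tuning['buffer_size'] * avg_msg_bytes,  # total send buffer
    }

params = {'batch_size': 1000, 'flush_interval_ms': 5,
          'compression_level': 1, 'buffer_size': 100000}
print(to_kafka_producer_config(params))
```

Re-deriving this config whenever the auto-tuner changes parameters keeps the producer aligned with the tuner's decisions; in practice a producer restart or a dynamic reconfiguration step is needed to apply the new values.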

Closing Remarks

Real-time data streaming architecture has become core infrastructure for modern business. With technologies such as Apache Kafka, Flink, and Pulsar, you can build scalable, reliable streaming systems.

Core principles for building a successful streaming architecture:

  1. Design for scalability first: plan for horizontal scaling from the start
  2. Data quality management: build real-time data validation and monitoring systems
  3. Performance optimization: improve continuously through monitoring and auto-tuning
  4. Failure recovery: establish high-availability and disaster-recovery strategies
  5. Security: strengthen protection with data encryption and access control

Streaming systems are complex, but with sound design and operations they deliver tangible business value. In a business environment where real-time decision-making matters more every year, that value will only grow.

Have any questions?

If you have any questions, feel free to reach out anytime.