실시간 데이터 스트림 처리에서 윈도잉(Windowing) 기법은 데이터 분석과 집계의 핵심이 되는 기술이다. 본 가이드에서는 윈도잉의 기본 개념부터 다양한 윈도우 유형, 실제 적용 사례, 그리고 성능 최적화 전략까지 체계적으로 설명할 것이다. 이 글을 통해 스트림 데이터 환경에서 윈도잉 기법이 왜 중요한지, 그리고 어떻게 활용할 수 있는지 명확하게 이해할 수 있을 것이다.
윈도잉 기법이란?
윈도잉(Windowing) 기법은 연속적인 데이터 스트림에서 특정 시간 범위나 개수 범위 내의 데이터를 그룹화하여 처리하는 기술이다. 마치 창문(window)을 통해 데이터를 바라보는 것과 같아서, 이 창문은 시간이나 개수에 따라 이동하며 창문 안에 들어온 데이터들만을 대상으로 분석과 처리를 수행한다.
기본 개념
윈도잉의 핵심 아이디어는 연속적인 데이터 스트림에서 특정 범위의 데이터만을 선택적으로 처리하는 것이다. 이는 마치 시간의 흐름에 따라 움직이는 창문(window)을 통해 데이터를 바라보는 것과 같다.
윈도잉의 핵심 요소
- 윈도우 크기 (Window Size): 한 번에 처리할 데이터의 범위
- 윈도우 슬라이드 (Window Slide): 윈도우가 이동하는 간격
- 윈도우 함수 (Window Function): 윈도우 내 데이터에 적용할 연산
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| # 윈도잉의 기본 개념을 보여주는 예시
from collections import deque
import time
class BasicWindow:
def __init__(self, window_size):
self.window_size = window_size
self.data_window = deque(maxlen=window_size)
def add_data(self, data_point):
"""새로운 데이터를 윈도우에 추가"""
self.data_window.append(data_point)
return self.process_window()
def process_window(self):
"""윈도우 내 데이터 처리"""
if not self.data_window:
return None
return {
'count': len(self.data_window),
'average': sum(self.data_window) / len(self.data_window),
'max': max(self.data_window),
'min': min(self.data_window)
}
# 사용 예시
window = BasicWindow(window_size=5)
data_stream = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
for data in data_stream:
result = window.add_data(data)
if result:
print(f"윈도우 처리 결과: {result}")
|
윈도잉의 동작 원리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| 시간 축: [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
↑
현재 시점
윈도우 크기 3인 경우:
- 시점 3: [1, 2, 3] 처리
- 시점 4: [2, 3, 4] 처리
- 시점 5: [3, 4, 5] 처리
- ...
윈도우 크기 5인 경우:
- 시점 5: [1, 2, 3, 4, 5] 처리
- 시점 6: [2, 3, 4, 5, 6] 처리
- 시점 7: [3, 4, 5, 6, 7] 처리
- ...
|
윈도잉의 장점
- 메모리 효율성: 전체 스트림을 저장하지 않고 필요한 부분만 유지
- 실시간 처리: 최신 데이터에 집중하여 빠른 응답
- 확장성: 데이터 양이 증가해도 일정한 메모리 사용량
- 유연성: 다양한 윈도우 크기와 함수로 다양한 분석 가능
윈도잉의 핵심 특징
메모리 효율성 (Memory Efficiency)
윈도잉의 가장 큰 장점은 무한한 스트림 데이터를 모두 저장할 필요 없이 필요한 부분만 처리하는 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| # 메모리 효율성 예시
class MemoryEfficientWindow:
def __init__(self, window_size):
self.window_size = window_size
self.data = deque(maxlen=window_size) # 최대 크기 제한
def add_data(self, value):
self.data.append(value)
# 윈도우가 가득 차면 자동으로 가장 오래된 데이터 제거
return self.calculate_stats()
def calculate_stats(self):
return {
'count': len(self.data),
'sum': sum(self.data),
'avg': sum(self.data) / len(self.data) if self.data else 0
}
# 무한 스트림에서도 메모리 사용량이 일정함
window = MemoryEfficientWindow(1000)
for i in range(1000000): # 100만 개 데이터
result = window.add_data(i)
# 메모리 사용량은 항상 1000개 데이터만큼만 유지됨
|
실시간 처리 (Real-time Processing)
윈도잉은 최신 데이터에 집중하여 빠른 응답 시간을 보장한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class RealTimeWindow:
def __init__(self, window_duration_seconds):
self.window_duration = window_duration_seconds
self.events = deque()
def add_event(self, event):
current_time = time.time()
# 오래된 이벤트 제거
while self.events and self.events[0].timestamp < current_time - self.window_duration:
self.events.popleft()
self.events.append(event)
# 즉시 처리 결과 반환
return self.process_recent_events()
def process_recent_events(self):
# 최신 이벤트들만 처리하여 빠른 응답
recent_events = list(self.events)[-10:] # 최근 10개만
return {
'recent_count': len(recent_events),
'total_in_window': len(self.events)
}
|
패턴 인식 (Pattern Recognition)
윈도잉(windowing)은 지정된 시간 또는 개수 범위 내의 데이터만을 집중적으로 분석하기 때문에, 데이터 스트림에서 발생하는 변화나 트렌드를 빠르게 포착할 수 있다.
이 방식은 전체 데이터가 아닌 최근 데이터 집합만을 대상으로 하므로, 노이즈에 덜 민감하고, 시계열 데이터의 상승·하락·변동 패턴을 실시간으로 감지할 수 있다.
또한, 윈도우 내 데이터의 평균, 분산, 이동평균 등 다양한 통계적 특징을 계산함으로써, 이상치(anomaly)나 급격한 변화(trend shift)도 효과적으로 탐지할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| class PatternRecognitionWindow:
def __init__(self, window_size):
self.window_size = window_size
self.data = deque(maxlen=window_size)
def detect_trend(self, new_value):
self.data.append(new_value)
if len(self.data) < 3:
return "insufficient_data"
# 최근 3개 값으로 트렌드 분석
recent = list(self.data)[-3:]
if recent[0] < recent[1] < recent[2]:
return "increasing"
elif recent[0] > recent[1] > recent[2]:
return "decreasing"
else:
return "fluctuating"
def detect_anomaly(self, new_value, threshold=2.0):
self.data.append(new_value)
if len(self.data) < 10:
return False
# Z-score 기반 이상값 탐지
values = list(self.data)
mean = sum(values) / len(values)
std_dev = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5
if std_dev == 0:
return False
z_score = abs((new_value - mean) / std_dev)
return z_score > threshold
|
리소스 최적화 (Resource Optimization)
윈도잉은 처리해야 할 데이터 양을 제한하여 시스템 부하를 감소시킨다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| class ResourceOptimizedWindow:
def __init__(self, max_events_per_second=1000):
self.max_events = max_events_per_second
self.events = deque()
self.last_processing_time = time.time()
def add_event(self, event):
current_time = time.time()
self.events.append(event)
# 처리 빈도 제한으로 CPU 부하 감소
if current_time - self.last_processing_time >= 1.0: # 1초마다 처리
result = self.process_batch()
self.last_processing_time = current_time
return result
return None
def process_batch(self):
# 배치 처리로 효율성 향상
batch = list(self.events)
self.events.clear()
return {
'processed_count': len(batch),
'avg_value': sum(e.value for e in batch) / len(batch) if batch else 0
}
|
확장성 (Scalability)
윈도잉은 데이터 양이 증가해도 일정한 성능을 유지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| class ScalableWindow:
def __init__(self, window_size):
self.window_size = window_size
self.sum = 0
self.count = 0
self.data = deque(maxlen=window_size)
def add_value(self, value):
# O(1) 시간 복잡도로 확장 가능한 처리
if len(self.data) == self.window_size:
old_value = self.data.popleft()
self.sum -= old_value
self.count -= 1
self.data.append(value)
self.sum += value
self.count += 1
return {
'average': self.sum / self.count,
'count': self.count
}
|
윈도잉의 필요성과 중요성
왜 윈도잉이 필요한가?
현대의 데이터 환경에서는 초당 수만 건의 이벤트가 발생하는 경우가 많다. 이러한 대용량 실시간 데이터를 효과적으로 처리하기 위해서는 윈도잉 기법이 필수적이다.
실제 시나리오와 윈도잉의 필요성
1. 금융 거래 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| # 초당 수천 건의 주식 거래 데이터
class StockTradeAnalyzer:
def __init__(self):
self.price_window = deque(maxlen=100) # 최근 100개 거래
self.volume_window = deque(maxlen=1000) # 최근 1000개 거래
def process_trade(self, trade):
self.price_window.append(trade.price)
self.volume_window.append(trade.volume)
# 실시간 가격 변동 분석
price_volatility = self.calculate_volatility(list(self.price_window))
# 거래량 급증 탐지
volume_surge = self.detect_volume_surge(list(self.volume_window))
return {
'current_price': trade.price,
'price_volatility': price_volatility,
'volume_surge': volume_surge
}
|
2. 웹 로그 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| # 초당 수만 건의 웹사이트 접속 로그
class WebLogAnalyzer:
def __init__(self, window_minutes=5):
self.window_minutes = window_minutes
self.requests = deque()
self.error_threshold = 0.1 # 10% 에러율 임계값
def add_request(self, request):
current_time = request.timestamp
# 5분 윈도우 밖의 요청 제거
while self.requests and self.requests[0].timestamp < current_time - self.window_minutes * 60:
self.requests.popleft()
self.requests.append(request)
# 실시간 성능 분석
return self.analyze_performance()
def analyze_performance(self):
if not self.requests:
return {}
total_requests = len(self.requests)
error_requests = sum(1 for req in self.requests if req.status_code >= 400)
error_rate = error_requests / total_requests
return {
'request_rate': total_requests / (self.window_minutes * 60), # 요청/초
'error_rate': error_rate,
'is_alert': error_rate > self.error_threshold
}
|
3. IoT 센서 모니터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| # 초당 수백 건의 센서 데이터
class SensorMonitor:
def __init__(self, sensor_id, window_size=60):
self.sensor_id = sensor_id
self.window_size = window_size
self.readings = deque(maxlen=window_size)
self.alert_threshold = 2.0 # Z-score 임계값
def add_reading(self, value, timestamp):
self.readings.append({'value': value, 'timestamp': timestamp})
if len(self.readings) >= 10: # 최소 데이터 필요
anomaly = self.detect_anomaly(value)
trend = self.analyze_trend()
return {
'sensor_id': self.sensor_id,
'current_value': value,
'is_anomaly': anomaly,
'trend': trend,
'avg_value': sum(r['value'] for r in self.readings) / len(self.readings)
}
return None
def detect_anomaly(self, current_value):
values = [r['value'] for r in self.readings]
mean = sum(values) / len(values)
std_dev = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5
if std_dev == 0:
return False
z_score = abs((current_value - mean) / std_dev)
return z_score > self.alert_threshold
|
4. 소셜 미디어 트렌드 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| # 초당 수만 건의 포스트와 댓글
class SocialMediaAnalyzer:
def __init__(self, hashtag, window_hours=1):
self.hashtag = hashtag
self.window_hours = window_hours
self.posts = deque()
self.engagement_metrics = {}
def add_post(self, post):
current_time = post.timestamp
# 1시간 윈도우 밖의 포스트 제거
while self.posts and self.posts[0].timestamp < current_time - self.window_hours * 3600:
old_post = self.posts.popleft()
self.update_engagement_metrics(old_post, remove=True)
self.posts.append(post)
self.update_engagement_metrics(post, remove=False)
return self.analyze_trends()
def update_engagement_metrics(self, post, remove=False):
factor = -1 if remove else 1
self.engagement_metrics['total_likes'] = self.engagement_metrics.get('total_likes', 0) + factor * post.likes
self.engagement_metrics['total_shares'] = self.engagement_metrics.get('total_shares', 0) + factor * post.shares
self.engagement_metrics['total_comments'] = self.engagement_metrics.get('total_comments', 0) + factor * post.comments
def analyze_trends(self):
return {
'hashtag': self.hashtag,
'post_count': len(self.posts),
'engagement_rate': (self.engagement_metrics.get('total_likes', 0) +
self.engagement_metrics.get('total_shares', 0) +
self.engagement_metrics.get('total_comments', 0)) / max(len(self.posts), 1),
'trending_score': self.calculate_trending_score()
}
|
윈도잉이 없을 때의 문제점
- 메모리 부족: 무한한 데이터 스트림을 모두 저장하려면 무한한 메모리가 필요
- 처리 지연: 전체 데이터를 처리해야 하므로 실시간 응답 불가능
- 리소스 낭비: 오래된 데이터도 계속 처리해야 하는 비효율성
- 확장성 한계: 데이터 양이 증가하면 시스템이 포화됨
윈도잉의 장점
- 메모리 사용량 제한: 윈도우 크기에 따라 메모리 사용량이 일정하게 유지됨
- 실시간 분석: 최신 데이터에 대한 즉각적인 인사이트 제공
- 확장성: 데이터 양이 증가해도 일정한 성능 유지
- 유연성: 다양한 윈도우 유형으로 다양한 분석 요구사항 충족
윈도잉 기법의 주요 유형
시간 기반 윈도우 (Time-based Windows)
시간을 기준으로 윈도우를 정의하는 가장 일반적인 방식이다.
고정 시간 윈도우 (Fixed Time Windows)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| import time
from datetime import datetime, timedelta
class FixedTimeWindow:
def __init__(self, window_duration_seconds):
self.window_duration = window_duration_seconds
self.current_window_start = None
self.window_data = []
def process_event(self, event):
event_time = event.timestamp
# 새로운 윈도우 시작 확인
if (self.current_window_start is None or
event_time >= self.current_window_start + self.window_duration):
# 이전 윈도우 처리
if self.window_data:
result = self.process_window(self.window_data)
print(f"윈도우 처리 완료: {result}")
# 새 윈도우 시작
self.current_window_start = event_time
self.window_data = [event]
else:
self.window_data.append(event)
def process_window(self, data):
return {
'window_start': self.current_window_start,
'event_count': len(data),
'avg_value': sum(e.value for e in data) / len(data) if data else 0
}
|
슬라이딩 시간 윈도우 (Sliding Time Windows)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| class SlidingTimeWindow:
def __init__(self, window_duration, slide_interval):
self.window_duration = window_duration
self.slide_interval = slide_interval
self.events = deque()
def add_event(self, event):
current_time = event.timestamp
# 윈도우 밖의 이벤트 제거
while self.events and self.events[0].timestamp < current_time - self.window_duration:
self.events.popleft()
self.events.append(event)
# 슬라이드 간격에 따른 처리
if len(self.events) > 0:
return self.process_window(list(self.events))
def process_window(self, events):
return {
'window_size': len(events),
'time_span': events[-1].timestamp - events[0].timestamp if len(events) > 1 else 0,
'avg_value': sum(e.value for e in events) / len(events)
}
|
개수 기반 윈도우 (Count-based Windows)
데이터 개수를 기준으로 윈도우를 정의하는 방식이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| class CountBasedWindow:
def __init__(self, window_size):
self.window_size = window_size
self.data_buffer = deque(maxlen=window_size)
def add_data(self, data):
self.data_buffer.append(data)
if len(self.data_buffer) == self.window_size:
return self.process_full_window(list(self.data_buffer))
return None
def process_full_window(self, window_data):
return {
'count': len(window_data),
'average': sum(window_data) / len(window_data),
'max': max(window_data),
'min': min(window_data),
'std_dev': self.calculate_std_dev(window_data)
}
def calculate_std_dev(self, data):
if len(data) < 2:
return 0
mean = sum(data) / len(data)
variance = sum((x - mean) ** 2 for x in data) / len(data)
return variance ** 0.5
|
세션 윈도우 (Session Windows)
사용자 활동이나 세션을 기준으로 윈도우를 정의하는 방식이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| class SessionWindow:
def __init__(self, session_timeout):
self.session_timeout = session_timeout
self.active_sessions = {} # session_id -> (last_activity, events)
def process_event(self, event):
session_id = event.session_id
current_time = event.timestamp
if session_id in self.active_sessions:
last_activity, events = self.active_sessions[session_id]
# 세션 타임아웃 확인
if current_time - last_activity > self.session_timeout:
# 세션 종료 및 처리
self.process_session(session_id, events)
events = []
events.append(event)
self.active_sessions[session_id] = (current_time, events)
else:
# 새 세션 시작
self.active_sessions[session_id] = (current_time, [event])
def process_session(self, session_id, events):
print(f"세션 {session_id} 처리: {len(events)}개 이벤트")
return {
'session_id': session_id,
'event_count': len(events),
'duration': events[-1].timestamp - events[0].timestamp if len(events) > 1 else 0
}
|
실제 구현 예제
실시간 모니터링 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
| import random
from datetime import datetime
import threading
import time
class RealTimeMonitor:
def __init__(self, window_size=100):
self.window_size = window_size
self.data_window = deque(maxlen=window_size)
self.alert_threshold = 2.0 # Z-score 임계값
self.lock = threading.Lock()
def add_metric(self, metric_value):
with self.lock:
self.data_window.append(metric_value)
if len(self.data_window) >= 10: # 최소 데이터 필요
return self.detect_anomaly(metric_value)
return False
def detect_anomaly(self, current_value):
values = list(self.data_window)
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
std_dev = variance ** 0.5
if std_dev == 0:
return False
z_score = abs((current_value - mean) / std_dev)
return z_score > self.alert_threshold
def get_statistics(self):
with self.lock:
if not self.data_window:
return {}
values = list(self.data_window)
return {
'count': len(values),
'average': sum(values) / len(values),
'max': max(values),
'min': min(values),
'std_dev': self.calculate_std_dev(values)
}
def calculate_std_dev(self, values):
if len(values) < 2:
return 0
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
return variance ** 0.5
# 사용 예시
def simulate_monitoring():
monitor = RealTimeMonitor(window_size=50)
def generate_data():
while True:
# 정상 데이터 (평균 100, 표준편차 10)
value = random.normalvariate(100, 10)
# 가끔 이상값 생성
if random.random() < 0.05: # 5% 확률로 이상값
value = random.normalvariate(200, 20)
is_anomaly = monitor.add_metric(value)
if is_anomaly:
print(f"🚨 이상값 감지: {value:.2f}")
time.sleep(0.1) # 100ms 간격
# 백그라운드에서 데이터 생성
thread = threading.Thread(target=generate_data, daemon=True)
thread.start()
# 주기적으로 통계 출력
while True:
stats = monitor.get_statistics()
if stats:
print(f"📊 통계: {stats}")
time.sleep(5)
|
금융 거래 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| from dataclasses import dataclass
from datetime import datetime
@dataclass
class Trade:
timestamp: datetime
symbol: str
price: float
volume: int
trade_type: str # 'buy' or 'sell'
class TradingAnalyzer:
def __init__(self, time_window_minutes=5):
self.time_window = time_window_minutes * 60 # 초 단위
self.trades = deque()
self.volume_profile = {} # 가격대별 거래량
def add_trade(self, trade):
current_time = trade.timestamp
# 윈도우 밖의 거래 제거
while self.trades and self.trades[0].timestamp < current_time - self.time_window:
old_trade = self.trades.popleft()
self.update_volume_profile(old_trade, remove=True)
self.trades.append(trade)
self.update_volume_profile(trade, remove=False)
return self.calculate_metrics()
def update_volume_profile(self, trade, remove=False):
price_level = round(trade.price, 2) # 가격을 0.01 단위로 반올림
if price_level not in self.volume_profile:
self.volume_profile[price_level] = 0
if remove:
self.volume_profile[price_level] -= trade.volume
if self.volume_profile[price_level] <= 0:
del self.volume_profile[price_level]
else:
self.volume_profile[price_level] += trade.volume
def calculate_metrics(self):
if not self.trades:
return {}
prices = [trade.price for trade in self.trades]
volumes = [trade.volume for trade in self.trades]
# 거래량 가중 평균 가격 (VWAP)
total_value = sum(p * v for p, v in zip(prices, volumes))
total_volume = sum(volumes)
vwap = total_value / total_volume if total_volume > 0 else 0
return {
'vwap': vwap,
'price_volatility': self.calculate_volatility(prices),
'total_volume': total_volume,
'trade_count': len(self.trades),
'avg_trade_size': total_volume / len(self.trades) if self.trades else 0,
'volume_profile': dict(sorted(self.volume_profile.items()))
}
def calculate_volatility(self, prices):
if len(prices) < 2:
return 0
returns = [(prices[i] - prices[i-1]) / prices[i-1] for i in range(1, len(prices))]
mean_return = sum(returns) / len(returns)
variance = sum((r - mean_return) ** 2 for r in returns) / len(returns)
return variance ** 0.5
|
웹 로그 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| @dataclass
class WebRequest:
timestamp: datetime
ip_address: str
method: str
url: str
status_code: int
response_time: float
user_agent: str
class WebLogAnalyzer:
def __init__(self, window_minutes=10):
self.window_minutes = window_minutes
self.requests = deque()
self.error_threshold = 0.1 # 10% 에러율 임계값
self.response_time_threshold = 2.0 # 2초 응답시간 임계값
def add_request(self, request):
current_time = request.timestamp
# 윈도우 밖의 요청 제거
while self.requests and self.requests[0].timestamp < current_time - self.window_minutes * 60:
self.requests.popleft()
self.requests.append(request)
return self.analyze_performance()
def analyze_performance(self):
if not self.requests:
return {}
total_requests = len(self.requests)
error_requests = sum(1 for req in self.requests if req.status_code >= 400)
slow_requests = sum(1 for req in self.requests if req.response_time > self.response_time_threshold)
error_rate = error_requests / total_requests
slow_rate = slow_requests / total_requests
# IP별 요청 수
ip_counts = {}
for req in self.requests:
ip_counts[req.ip_address] = ip_counts.get(req.ip_address, 0) + 1
# URL별 요청 수
url_counts = {}
for req in self.requests:
url_counts[req.url] = url_counts.get(req.url, 0) + 1
return {
'request_rate': total_requests / (self.window_minutes * 60), # 요청/초
'error_rate': error_rate,
'slow_request_rate': slow_rate,
'avg_response_time': sum(req.response_time for req in self.requests) / total_requests,
'is_alert': error_rate > self.error_threshold or slow_rate > 0.2,
'top_ips': sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)[:5],
'top_urls': sorted(url_counts.items(), key=lambda x: x[1], reverse=True)[:5]
}
|
성능 최적화 전략
메모리 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| class OptimizedWindow:
def __init__(self, window_size):
self.window_size = window_size
self.sum = 0
self.count = 0
self.data = deque(maxlen=window_size)
def add_value(self, value):
if len(self.data) == self.window_size:
# 윈도우가 가득 찬 경우, 가장 오래된 값 제거
old_value = self.data.popleft()
self.sum -= old_value
self.count -= 1
self.data.append(value)
self.sum += value
self.count += 1
def get_average(self):
return self.sum / self.count if self.count > 0 else 0
def get_statistics(self):
if not self.data:
return {}
return {
'count': self.count,
'average': self.get_average(),
'max': max(self.data),
'min': min(self.data)
}
|
병렬 처리 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| from concurrent.futures import ThreadPoolExecutor
import threading
class ParallelWindowProcessor:
def __init__(self, num_workers=4):
self.executor = ThreadPoolExecutor(max_workers=num_workers)
self.lock = threading.Lock()
self.windows = {}
def process_window(self, window_id, data):
with self.lock:
if window_id not in self.windows:
self.windows[window_id] = []
self.windows[window_id].extend(data)
# 병렬로 윈도우 처리
future = self.executor.submit(self.analyze_window, window_id, data)
return future
def analyze_window(self, window_id, data):
# 윈도우 데이터 분석 로직
return {
'window_id': window_id,
'count': len(data),
'average': sum(data) / len(data) if data else 0,
'processed_at': datetime.now()
}
def shutdown(self):
self.executor.shutdown(wait=True)
|
지연 데이터 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| class LateDataHandler:
def __init__(self, allowed_lateness_seconds=300): # 5분 지연 허용
self.allowed_lateness = allowed_lateness_seconds
self.pending_windows = {}
self.window_duration = 60 # 1분 윈도우
def process_event(self, event):
current_time = event.timestamp
window_start = self.get_window_start(current_time)
# 지연 데이터 확인
if current_time < window_start - self.allowed_lateness:
# 너무 오래된 데이터는 무시
print(f"지연 데이터 무시: {event}")
return None
# 윈도우에 데이터 추가
if window_start not in self.pending_windows:
self.pending_windows[window_start] = []
self.pending_windows[window_start].append(event)
# 윈도우 완료 확인
if self.is_window_complete(window_start):
return self.finalize_window(window_start)
def get_window_start(self, timestamp):
# 윈도우 시작 시간 계산
return timestamp - (timestamp % self.window_duration)
def is_window_complete(self, window_start):
# 윈도우 완료 조건 확인
current_time = time.time()
return current_time >= window_start + self.window_duration + self.allowed_lateness
def finalize_window(self, window_start):
if window_start in self.pending_windows:
data = self.pending_windows.pop(window_start)
return self.process_window_data(data)
return None
def process_window_data(self, data):
return {
'window_data': data,
'count': len(data),
'processed_at': datetime.now()
}
|
실무 적용 사례: Apache Kafka vs Apache Flink
Apache Kafka와 Apache Flink는 모두 스트림 데이터 처리에서 중요한 역할을 하지만, 각각 다른 목적과 특징을 가지고 있다. 윈도잉 기법 관점에서 두 플랫폼의 차이점을 살펴보자.
Apache Kafka: 메시지 브로커 (Message Broker)
Kafka의 핵심 역할:
- 데이터 전송: 프로듀서와 컨슈머 간의 메시지 전달
- 데이터 저장: 디스크에 메시지를 영구 저장
- 확장성: 수평적 확장으로 높은 처리량 지원
Kafka에서의 윈도잉:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
| from kafka import KafkaConsumer, KafkaProducer
import json
import time
from collections import deque
class KafkaWindowProcessor:
def __init__(self, bootstrap_servers, input_topic, output_topic):
self.consumer = KafkaConsumer(
input_topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest'
)
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.output_topic = output_topic
# Kafka는 자체 윈도잉 기능이 없으므로 직접 구현
self.window_data = deque(maxlen=1000) # 최대 1000개 메시지
self.window_start_time = None
self.window_duration = 300 # 5분 윈도우
def process_messages(self):
for message in self.consumer:
event = {
'timestamp': message.value.get('timestamp', time.time()),
'value': message.value.get('value', 0),
'source': message.value.get('source', 'unknown')
}
result = self.process_with_window(event)
if result:
# 윈도우 처리 결과를 출력 토픽으로 전송
self.producer.send(self.output_topic, result)
print(f"윈도우 처리 결과 전송: {result}")
def process_with_window(self, event):
current_time = event['timestamp']
# 윈도우 시작 시간 설정
if self.window_start_time is None:
self.window_start_time = current_time
# 윈도우가 완료되었는지 확인
if current_time >= self.window_start_time + self.window_duration:
# 윈도우 처리
result = self.process_window()
# 새 윈도우 시작
self.window_start_time = current_time
self.window_data.clear()
self.window_data.append(event)
return result
else:
# 윈도우에 데이터 추가
self.window_data.append(event)
return None
def process_window(self):
if not self.window_data:
return None
values = [event['value'] for event in self.window_data]
return {
'window_start': self.window_start_time,
'window_end': self.window_start_time + self.window_duration,
'count': len(values),
'average': sum(values) / len(values),
'max': max(values),
'min': min(values)
}
def close(self):
self.consumer.close()
self.producer.close()
|
Kafka의 윈도잉 특징:
- ❌ 자체 윈도잉 기능 없음: 개발자가 직접 구현해야 함
- ✅ 높은 처리량: 초당 수십만 메시지 처리 가능
- ✅ 내구성: 디스크 저장으로 데이터 손실 방지
- ❌ 복잡한 윈도잉 로직: 상태 관리, 워터마크 등 직접 구현
Apache Flink: 스트림 처리 엔진 (Stream Processing Engine)
Flink의 핵심 역할:
- 데이터 처리: 복잡한 스트림 처리 로직 실행
- 윈도잉: 내장된 다양한 윈도잉 기능 제공
- 상태 관리: 분산 환경에서 상태를 안전하게 관리
Flink에서의 윈도잉:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
| # Flink 윈도잉 예시 (Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TimeWindow, CountWindow, SessionWindow
from pyflink.common.time import Time
from pyflink.datastream.functions import WindowFunction
from pyflink.common.typeinfo import Types
class FlinkWindowingExample:
def __init__(self):
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(4) # 병렬 처리 설정
def run_time_windowing(self):
"""시간 기반 윈도잉"""
# 소켓에서 데이터 스트림 읽기
stream = self.env.socket_text_stream("localhost", 9999)
# 데이터 파싱
parsed_stream = stream.map(
lambda x: self.parse_event(x),
output_type=Types.ROW([Types.STRING(), Types.DOUBLE(), Types.LONG()])
)
# 시간 기반 윈도우 (5초)
time_windowed = parsed_stream \
.key_by(lambda x: x[0]) \
.window(TimeWindow.of(Time.seconds(5))) \
.apply(self.process_time_window)
time_windowed.print("Time Window Results")
return time_windowed
def run_count_windowing(self):
"""개수 기반 윈도잉"""
stream = self.env.socket_text_stream("localhost", 9999)
parsed_stream = stream.map(
lambda x: self.parse_event(x),
output_type=Types.ROW([Types.STRING(), Types.DOUBLE(), Types.LONG()])
)
# 개수 기반 윈도우 (100개)
count_windowed = parsed_stream \
.key_by(lambda x: x[0]) \
.count_window(100) \
.apply(self.process_count_window)
count_windowed.print("Count Window Results")
return count_windowed
def run_session_windowing(self):
"""세션 윈도잉"""
stream = self.env.socket_text_stream("localhost", 9999)
parsed_stream = stream.map(
lambda x: self.parse_event(x),
output_type=Types.ROW([Types.STRING(), Types.DOUBLE(), Types.LONG()])
)
# 세션 윈도우 (30초 타임아웃)
session_windowed = parsed_stream \
.key_by(lambda x: x[0]) \
.window(SessionWindow.with_gap(Time.seconds(30))) \
.apply(self.process_session_window)
session_windowed.print("Session Window Results")
return session_windowed
def parse_event(self, line):
"""이벤트 파싱"""
parts = line.split(',')
return (parts[0], float(parts[1]), int(parts[2]))
def process_time_window(self, key, window, events):
"""시간 윈도우 처리"""
values = [event[1] for event in events]
return {
'window_type': 'time',
'key': key,
'window_start': window.get_start(),
'window_end': window.get_end(),
'count': len(values),
'average': sum(values) / len(values) if values else 0,
'max': max(values) if values else 0,
'min': min(values) if values else 0
}
def process_count_window(self, key, window, events):
"""개수 윈도우 처리"""
values = [event[1] for event in events]
return {
'window_type': 'count',
'key': key,
'window_id': window.get_id(),
'count': len(values),
'average': sum(values) / len(values) if values else 0
}
def process_session_window(self, key, window, events):
"""세션 윈도우 처리"""
values = [event[1] for event in events]
return {
'window_type': 'session',
'key': key,
'session_start': window.get_start(),
'session_end': window.get_end(),
'session_duration': window.get_end() - window.get_start(),
'count': len(values),
'average': sum(values) / len(values) if values else 0
}
def execute(self):
"""모든 윈도잉 예제 실행"""
self.run_time_windowing()
self.run_count_windowing()
self.run_session_windowing()
self.env.execute("Flink Windowing Examples")
# 사용 예시
if __name__ == "__main__":
flink_example = FlinkWindowingExample()
flink_example.execute()
|
Flink의 윈도잉 특징:
- ✅ 내장 윈도잉 기능: 시간, 개수, 세션 윈도우 등 다양한 유형 지원
- ✅ 워터마크: 지연 데이터 처리와 윈도우 완료 보장
- ✅ 상태 관리: 체크포인트와 상태 백업으로 장애 복구
- ✅ 복잡한 연산: 조인, 집계, 패턴 매칭 등 고급 기능
- ❌ 상대적으로 낮은 처리량: 복잡한 처리로 인한 오버헤드
Kafka vs Flink 비교표
특징 | Apache Kafka | Apache Flink |
---|
주요 역할 | 메시지 브로커 | 스트림 처리 엔진 |
윈도잉 기능 | ❌ 직접 구현 필요 | ✅ 내장 기능 제공 |
처리량 | 매우 높음 (수십만/초) | 높음 (수만/초) |
지연 시간 | 매우 낮음 (밀리초) | 낮음 (초 단위) |
상태 관리 | ❌ 직접 구현 | ✅ 내장 기능 |
장애 복구 | ✅ 자동 복구 | ✅ 체크포인트 기반 |
확장성 | 매우 높음 | 높음 |
복잡한 연산 | ❌ 제한적 | ✅ 풍부한 기능 |
학습 곡선 | 낮음 | 높음 |
실제 아키텍처에서의 활용
Kafka + Flink 조합 (권장):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| # Kafka로 데이터 수집, Flink로 처리하는 아키텍처
class KafkaFlinkArchitecture:
def __init__(self):
self.kafka_source = KafkaConsumer(
'raw-data-topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# Flink 환경 설정
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(4)
def create_flink_kafka_source(self):
"""Kafka를 Flink 소스로 사용"""
return self.env \
.add_source(self.kafka_source) \
.map(lambda x: self.parse_event(x))
def process_with_flink_windowing(self):
"""Flink 윈도잉으로 처리"""
stream = self.create_flink_kafka_source()
# 복잡한 윈도잉 처리
processed = stream \
.key_by(lambda x: x['key']) \
.window(TimeWindow.of(Time.seconds(60))) \
.apply(self.complex_window_function)
return processed
def complex_window_function(self, key, window, events):
"""복잡한 윈도우 함수"""
# 통계 계산
values = [event['value'] for event in events]
# 이상값 탐지
mean = sum(values) / len(values)
std_dev = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5
anomalies = [v for v in values if abs(v - mean) > 2 * std_dev]
return {
'key': key,
'window_start': window.get_start(),
'window_end': window.get_end(),
'count': len(values),
'average': mean,
'std_dev': std_dev,
'anomaly_count': len(anomalies),
'anomalies': anomalies
}
|
결론:
- Kafka: 데이터 전송과 저장에 특화, 높은 처리량과 낮은 지연 시간
- Flink: 복잡한 스트림 처리와 윈도잉에 특화, 풍부한 분석 기능
- 실무: Kafka로 데이터 수집 → Flink로 처리하는 하이브리드 아키텍처가 일반적
결론
윈도잉 기법은 현대 데이터 처리 시스템에서 필수적인 기술이다. 시간 기반, 개수 기반, 세션 기반 등 다양한 윈도우 유형을 상황에 맞게 선택하고, 메모리 최적화와 병렬 처리를 통해 효율적인 시스템을 구축할 수 있다.
실시간 모니터링, 금융 거래 분석, 웹 로그 분석 등 다양한 분야에서 윈도잉 기법이 활용되고 있으며, AI/ML과의 통합, 엣지 컴퓨팅과의 결합을 통해 더욱 발전할 것이다.
윈도잉 기법을 효과적으로 활용하기 위해서는 도메인에 맞는 적절한 윈도우 유형 선택, 메모리 및 성능 최적화, 지연 데이터 처리 전략 수립이 중요하다. 이러한 요소들을 고려하여 설계된 윈도잉 시스템은 대용량 실시간 데이터를 효율적으로 처리할 수 있는 강력한 도구가 될 것이다.
참고 자료
- Apache Flink 공식 문서: Windowing
- Apache Kafka Streams: Windowing
- Apache Spark Streaming: Window Operations
- Google Cloud Dataflow: Windowing
- AWS Kinesis: Windowing
- 네이버 D2: 실시간 데이터 스트림 처리와 윈도잉 기법