1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
| // TODO: 새로운 패턴의 프로토타입 구현
// 1. Event Publisher 구현
@Component
public class DistributedConsistencyEventPublisher implements ConsistencyEventPublisher {
private final EventStore eventStore;
private final EventBus eventBus;
@Override
public <T> void publishEvent(String aggregateId, String eventType, T eventData, List<String> targetServices) {
// TODO: 구현
// 1. 이벤트 생성 및 저장
ConsistencyEvent<T> event = createEvent(aggregateId, eventType, eventData, targetServices);
eventStore.save(event);
// 2. 이벤트 발행
try {
eventBus.publish(event);
eventStore.markAsPublished(event.getId());
} catch (Exception e) {
eventStore.markAsFailed(event.getId());
scheduleRetry(event);
}
}
private <T> ConsistencyEvent<T> createEvent(String aggregateId, String eventType, T eventData, List<String> targetServices) {
// TODO: 이벤트 객체 생성
return null;
}
private void scheduleRetry(ConsistencyEvent<?> event) {
// TODO: 재시도 스케줄링
}
}
// 2. Event Handler 기본 구현
public abstract class AbstractConsistencyEventHandler<T> implements ConsistencyEventHandler<T> {
@Override
public void handleEvent(ConsistencyEvent<T> event) {
String serviceName = getServiceName();
if (!event.getTargetServices().contains(serviceName)) {
return; // 이 서비스 대상이 아님
}
try {
// 멱등성 확인
if (isAlreadyProcessed(event.getId())) {
markAsProcessed(event, ProcessingStatus.DUPLICATE);
return;
}
// 비즈니스 로직 실행
ProcessingResult result = processEvent(event.getEventData());
if (result.isSuccessful()) {
markAsProcessed(event, ProcessingStatus.SUCCESS);
} else {
markAsProcessed(event, ProcessingStatus.FAILED);
scheduleRetry(event, result.getRetryDelay());
}
} catch (Exception e) {
markAsProcessed(event, ProcessingStatus.ERROR);
handleProcessingError(event, e);
}
}
protected abstract ProcessingResult processEvent(T eventData);
protected abstract boolean isAlreadyProcessed(String eventId);
protected abstract void markAsProcessed(ConsistencyEvent<T> event, ProcessingStatus status);
private void scheduleRetry(ConsistencyEvent<T> event, Duration delay) {
// TODO: 재시도 스케줄링
}
private void handleProcessingError(ConsistencyEvent<T> event, Exception e) {
// TODO: 에러 처리 및 알림
}
}
// 3. 구체적인 사용 예시
@Service
public class UserConsistencyService {
private final ConsistencyEventPublisher eventPublisher;
@Transactional
public void updateUser(User user) {
// 1. 로컬 데이터 업데이트
User savedUser = userRepository.save(user);
// 2. 일관성 이벤트 발행
List<String> targetServices = Arrays.asList(
"order-service",
"billing-service",
"shipping-service"
);
eventPublisher.publishEvent(
savedUser.getId().toString(),
"UserUpdated",
savedUser,
targetServices
);
}
}
// 각 서비스별 이벤트 핸들러
@Component
public class OrderServiceUserEventHandler extends AbstractConsistencyEventHandler<User> {
@Override
protected ProcessingResult processEvent(User userData) {
// TODO: 주문 서비스에서 사용자 정보 업데이트
try {
orderCustomerService.updateCustomerInfo(userData);
return ProcessingResult.success();
} catch (Exception e) {
return ProcessingResult.retry(Duration.ofMinutes(5));
}
}
@Override
public String getServiceName() {
return "order-service";
}
@Override
public boolean canHandle(ConsistencyEvent<?> event) {
return "UserUpdated".equals(event.getEventType());
}
// ... 다른 메서드들 구현
}
|