배경
서버에서 polling을 해야하는 상황이 생겼다. 아키텍처상 여러 서버들이 통신을 주고 받는데, 일렬로 구성된 3가지 서버에서 중간 서버의 코드를 작성해야하는 상황이였다. 구체적으로는 keycloak과 중간서버가 사용자의 인증을 기다려야하는 상태였다(아래 그림에서 External Auth Server).
External Auth Server는 Keycloak으로부터 인증 요청이 오면, 일단 201로 응답을 한 뒤 AD에 요청을 보내고 polling을 통해 AD의 승인 여부를 기다린다. polling하는 코드는 대략 아래와 같다.
@Async
protected CompletableFuture<ResponseEntity<String>> pollingAuthResult(String authReqId, String realmId, String clientId, String expiresIn){
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
LinkedMultiValueMap<String, String> body = new LinkedMultiValueMap<>();
body.add("grant_type", GrantTypeEnum.CIBA.getValue());
body.add("auth_req_id", authReqId);
body.add("client_id", clientId);
body.add("client_secret", ClientIdSecretMappingEnum.getClientSecretByClientId(clientId));
HttpEntity<MultiValueMap<String, String>> pollingRequestEntity = new HttpEntity<>(body, headers);
return CompletableFuture.supplyAsync(() -> {
int elapsedTime = 0;
int interval = 5;
int maxDuration = Integer.parseInt(expiresIn);
while (elapsedTime < maxDuration) {
try {
String tokenEndpointUrl = keycloakUrl + realmId + oidcAuthTokenSuffix;
ResponseEntity<String> authResponse = restApiService.sendRestRequest(tokenEndpointUrl, HttpMethod.POST, pollingRequestEntity, String.class);
if (authResponse.getStatusCode() == HttpStatus.OK) {
log.info("CIBA accept response: {}", authResponse.getBody());
}
return authResponse;
} catch (HttpClientErrorException e) {
log.info("Polling pending... Exception: {}", e.getMessage());
try {
Thread.sleep(interval * 1000L);
elapsedTime += interval;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
} catch (Exception e) {
log.error("Error occurred while polling: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error occurred while polling");
}
}
return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body("Polling timed out.");
});
}
배운점
CompletableFuture와 supplyAsync
CompletableFuture
비동기 작업을 지원하며 해당 작업용 스레드는 비동기용 스레드로 따로 호출된다. 다시 말해 톰캣 등 스프링 서버를 구동하는 스레드와는 별개의 스레드를 호출하여 비동기 작업을 진행한다.
CompletableFuture.completedFuture 메서드를 통해 비동기 작업을 진행한 것처럼 결과를 리턴할 수 있다.
supplyAsync(runAsync)
함수형 인터페이스인 Supplier 또는 Runnable 타입을 받아와서 비동기 작업의 결과를 처리하는 함수를 작성할 수 있다.
또한 메서드가 끝난 뒤 .thenApply() 등의 메서드를 체이닝하여 결과가 끝난 뒤 다른 결과를 반환, thenAccept로 결과를 받아서 사용하는 등의 .then을 활용한 여러 메서드를 제공할 수 있다.
future.thenApply(result -> result + "!");
예외 처리는 .exceptionally, .handle 메서드를 체이닝하여 처리할 수 있다.
future.handle((result, ex) -> ex != null ? "Error" : result);
@Async와 스레드 관리
위 CompletableFuture를 리턴하는 함수 위에 @Async를 사용하면 비동기로 처리하는 메서드임을 표시할 수 있고, 아래와 같이 AsyncConfig를 통해 @Async 어노테이션이 달린 메서드들을 위한 스레드만 따로 설정할 수도 있다.
@Configuration
public class AsyncConfig {
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}
해당 스레드는 메인 스레드풀의 스레드와는 별개로 작동한다. 그리고 Queue를 두어 MaxPoolSize를 넘었을 때 대기하도록 설정할 수도 있다.
스레드의 사이즈나 실행 시간 등은 실행하는 서버 인스턴스의 resource와 실제 환경에서의 부하에 따라 조절이 필요하다.
'Programming-[Backend] > Java' 카테고리의 다른 글
[TIL] JVM HeapSize, HeapDumpPath 설정 (0) | 2025.01.06 |
---|---|
Virtual Thread 기초 (0) | 2024.12.08 |
[TIL] Docker image JVM Heap 크기 및 옵션 설정, buildpack-gradle bootBuildImage, Packeto buildpack (0) | 2024.07.26 |
자바 기초 강의 정리 - 8. 클래스 패스, JAR (0) | 2024.07.15 |
자바 기초 강의 정리 - 7. 리플렉션, 어노테이션, 클래스 로더 (0) | 2024.07.14 |