본문 바로가기
관리자

Programming-[Backend]/Spring

[TIL] spring WebFlux, WebClient - 비동기 polling

728x90
반응형

https://www.vinsguru.com/category/spring/spring-webflux/page/6/

배경

서버에 요청이 들어오면 다른 서버에 비동기적으로 polling 요청을 보내야하는 상황. 처음에는 생각나는대로 while문과 try-catch 문을 사용해 구현했으나, 리팩토링 하다보니 Spring WebFlux를 사용하면 간결한 문법과 함께 polling 시에 벌어지는 여러 상황들을 대처할 수 있는 메서드들이 주어져서 편하게 코딩할 수 있다는 것을 알게되었다.

 

코드

 

@Async
protected CompletableFuture<ResponseEntity<String>> pollingAuthResult(String authReqId,
    String realmId, String clientId, String expiresIn, String interval) {

  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
  LinkedMultiValueMap<String, String> body = createBodyForPolling(authReqId, clientId);
  String tokenEndpointUri = keycloakUrl + realmId + oidcAuthTokenSuffix;

  int intervalInt = Integer.parseInt(interval) + 1;
  int maxDuration = Integer.parseInt(expiresIn);
  long maxIterations = (long) Math.ceil((double) maxDuration / intervalInt);

  return Flux.interval(Duration.ofSeconds(intervalInt))
      .take(maxIterations)
      .flatMap(attempt -> WebClient.create()
          .post()
          .uri(tokenEndpointUri)
          .headers(httpHeaders -> httpHeaders.addAll(headers))
          .bodyValue(body)
          .retrieve()
          .toEntity(String.class)
          .onErrorResume(WebClientResponseException.class, ex -> {
            HttpStatusCode statusCode = ex.getStatusCode();
            String responseBody = ex.getResponseBodyAsString();
            return Mono.just(ResponseEntity.status(statusCode).body(responseBody));
          })
      )
      .doOnNext(response -> {
        if (response.getStatusCode() == HttpStatus.OK) {
          log.info("Successful response: {}", response.getBody());
        } else if (response.getStatusCode() == HttpStatus.BAD_REQUEST) {
          log.info("User Authentication pending... response: {}", response.getBody());
        } else {
          log.warn("Unexpected response status: {}", response.getStatusCode());
        }
      })
      .filter(response -> response.getStatusCode() == HttpStatus.OK)
      .next()
      .switchIfEmpty(Mono.defer(() -> {
        log.warn("Polling timed out after {} seconds.", maxDuration);
        return Mono.just(ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body("Polling timed out."));
      })).toFuture();
}

 

 

 

각 부분 설명

 

Flux

Flux.interval(Duration.ofSeconds(intervalInt))

 

이런식으로 작성하면 일정간 간격으로 반복 이벤트를 생성하는 Flux 스트림을 만든다.

 

 

.take(maxIterations)

 

take 구문은 반복 횟수를 제한한다.

 

.flatMap(attempt -> WebClient.create()
    .post()
    .uri(tokenEndpointUri)
    .headers(httpHeaders -> httpHeaders.addAll(headers))
    .bodyValue(body)
    .retrieve()
    .toEntity(String.class)
    .onErrorResume(WebClientResponseException.class, ex -> {
      HttpStatusCode statusCode = ex.getStatusCode();
      String responseBody = ex.getResponseBodyAsString();
      return Mono.just(ResponseEntity.status(statusCode).body(responseBody));
    })
)

 

  • .flatMap을 통해 Stream마다 WebClient를 생성하고 요청을 보낸다.
  • toEntity()를 통해 응답을 String으로 변환한다.
  • .onErrorResume으로 예외가 발생했을 때 상태코드, body 정보를 받는다.
  • Mono.just()를 통해 오류 정보를 포함하여 Mono 형태로 반환하면, 이후 체이닝된 메서드로 넘어간다. 오류 정보를 포함한 채로 넘기기 때문에 다음 메서드들에서 이 정보들을 활용할 수 있게 된다. Mono인 이유는 단일값을 반환하기 때문이다.

 

 

.doOnNext(response -> {
  if (response.getStatusCode() == HttpStatus.OK) {
    log.info("Successful response: {}", response.getBody());
  } else if (response.getStatusCode() == HttpStatus.BAD_REQUEST) {
    log.info("User Authentication pending... response: {}", response.getBody());
  } else {
    log.warn("Unexpected response status: {}", response.getStatusCode());
  }
})

 

doOnNext를 통해 스트림의 요소를 소비하기 전에 로깅 등의 사이드 효과를 수행한다.

 

 

.filter(response -> response.getStatusCode() == HttpStatus.OK)
.next()

 

filter를 통해 200 OK인 응답만 걸러낸다. next()를 통해 스트림을 종료한다.

 

.switchIfEmpty(Mono.defer(() -> {
  log.warn("Polling timed out after {} seconds.", maxDuration);
  return Mono.just(ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body("Polling timed out."));
})).toFuture()

 

switchIfEmpty는 스트림에서 처리된 응답이 없을 경우에 실행된다. 비어있을 경우 스트림을 대체하기 위해 Mono.just() 타입으로 응답을 생성한다.

 

메서드의 최종적인 타입이 비동기 메서드에서 사용하는 CompletableFuture 형태이므로 타입을 맞추기 위해서 .toFuture() 메서드를 적용한다.

728x90
반응형