Study/Spring boot

[Spring boot] WebClient 사용해보기 - 비동기 통신 subscribe()

Anna-Jin 2023. 11. 11. 14:01
반응형

들어가며

WebClient는 RestTemplate의 대안으로서 급부상된 WebFlux의 라이브러리이다. 

Spring MVC에서도 지원하는 WebClient는 RestTemplate과 비교했을 때 가장 큰 장점이 `비동기 통신을 지원한다`는 점이 아닐까 싶다. 물론 그 외에도 함수형 프로그래밍 스타일을 지원한다는 점, 다양한 설정과 에러처리를 지원한다는 점 등도 장점으로 꼽을 수 있겠다.

 

WebClient의 최대 장점인 비동기 통신을 지금까지 사용하기 어렵고 쓸 데가 없다는 이유만으로 동기 방식을 고수해왔었는데 이번 회사 프로젝트를 하면서 비동기 통신이 필요해진 시점이 생겼다. 이번 기회에 WebClient의 비동기 통신을 구현해보았고 이번 포스팅의 주제로 삼아보고자 한다.

 

 

처음 비동기 통신을 구현해보았기 때문에 다소 부족할 수 있다는 점 참고 바란다.


비동기 vs 동기

비동기와 동기의 차이점은 예전에 글로 정리한 적이 있었다. 

 

2022.05.30 - [Study/CS] - [CS] 동기 & 비동기 vs 블로킹 & 논블로킹

 

[CS] 동기 & 비동기 vs 블로킹 & 논블로킹

들어가며 예전에 Petpular 프로젝트 때 사용해본 Spring WebClient에서 동기 & 비동기 / 블로킹 & 논블로킹의 개념이 등장했었다. WebClient는 논블로킹방식으로 작동하고 block() 메소드를 이용해서 블로킹

annajin.tistory.com

 

동기는 한 작업이 완료될 때까지 다음 작업을 기다리는 방식이고

비동기는 한 작업을 실행한 후에도 다음 작업을 기다리지 않고 계속 진행하는 방식이다.

 

RestTemplate은 동기 방식으로 동작하는 하는 HTTP 클라이언트 라이브러리이고,

WebClient는 비동기 방식으로 동작하는 라이브러리이다.

 

 

Subscribe()

아래 포스팅의 ConnectStep 부분을 보면 block() 메소드를 사용하여 응답을 동기로 받는다고 작성해두었다.

 

2023.08.26 - [Study/Spring boot] - [Spring boot] WebClient 사용해보기 - 모듈화 1-2

 

[Spring boot] WebClient 사용해보기 - 모듈화 1-2

들어가며 2023.08.25 - [Study/Spring boot] - [Spring boot] WebClient 사용해보기 2-1 이전글에서는 내가 어떻게 웹클라이언트를 사용하고 있는 지에 대해 간단하게 설명했다. 이번 글에서는 코드를 하나씩 보

annajin.tistory.com

 

 

이번에는 위의 게시글 내용을 베이스로 subscribe()를 사용해서 응답을 비동기로 받을 수 있도록 구현해보자.

 

ConnectStepImpl

@Log4j2
@RequiredArgsConstructor
public class ConnectStepImpl implements ConnectStep {

    private final WebClient.RequestHeadersSpec<?> methodType;
    private Object response;

   
    /**
     * WebClient의 header와 response class 설정 후 subscribe로 호출<br>
     * 헤더와 응답값이 존재할 때 사용. (ex. 일반적인 api 호출)<br>
     * 예외처리 포함
     * @return {@link ResponseStep}
     */
    @Override
    public Mono<?> connectSubscribe(Map<String, String> headers, Class<?> responseType) {
        try {
            return this.methodType
                    .headers(httpHeaders -> httpHeaders.setAll(headers == null || headers.isEmpty() ? new HashMap<>() : headers))
                    .retrieve()
                    .onStatus(HttpStatusCode::isError, clientResponse ->
                            clientResponse.bodyToMono(String.class)
                                    .flatMap(msg -> Mono.error(new ApiException(ResponseCode.INTERNAL_SERVER_ERROR, msg))))
                    .bodyToMono(responseType)
                    .retryWhen(Retry.fixedDelay(3, java.time.Duration.ofSeconds(1))
                            .doBeforeRetry(before -> log.info("Retry: {} | {}", before.totalRetries(), before.failure())));
        } catch (Exception e) {
            throw new ApiException(ResponseCode.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * WebClient의 header와 response class 설정 후 subscribe로 호출<br>
     * 헤더와 응답값이 존재하지 않을 때 사용. (ex. Google chat webhook api 호출)<br>
     * 예외처리 포함
     * @return {@link ResponseStep}
     */
    @Override
    public Mono<String> connectSubscribe() {
        try {
            return this.methodType
                    .retrieve()
                    .onStatus(HttpStatusCode::isError, clientResponse ->
                            clientResponse.bodyToMono(String.class)
                                    .flatMap(msg -> Mono.error(new ApiException(ResponseCode.INTERNAL_SERVER_ERROR, msg))))
                    .bodyToMono(String.class)
                    .retryWhen(Retry.fixedDelay(3, java.time.Duration.ofSeconds(1))
                            .doBeforeRetry(before -> log.info("Retry: {} | {}", before.totalRetries(), before.failure())));
        } catch (Exception e) {
            throw new ApiException(ResponseCode.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }
}

 

 

subscribe()는 WebFlux에서 제공하는 객체인 Mono 혹은 Flux를 사용한다. 이번 포스팅에서는 Mono 객체만을 사용하기로 한다.

block()을 사용할 때와 다른 점은 ConnectStepImpl 클래스에서 subscribe()를 호출하지 않는다는 점과 return값이 ResponseStep이 아니라는 점이다.

 

단순히 데이터의 후가공이나 logging이 필요하지 않다면 이 단계에서 subscribe()를 호출해도 상관 없다.

하지만 예제에서는 메소드의 모듈화를 위해 확장성을 고려해야했으므로 응답 호출을 후 단계로 미뤘다.

 

이 점에 주의하자.

 

Mono가 뭐고 Flux가 무엇인지에 대해서 공부하려면 WebFlux의 세계까지 들어가야한다. 이번에는 단순한 api 비동기 호출에 대해 작성하고 있으므로 이 부분은 건너뛰도록 한다.

여담이지만 처음 개발 공부를 시작했을 때 WebFlux를 완벽하게 공부한 다음 WebClient를 사용하려고 2주의 시간을 끙끙댄 적이 있었다. 지금 생각해보면 너무 비효율적인 공부방식이었던 것 같다. 개발은 하면서 공부하는게 최고라고 생각한다!

 

 

 

WebClientConnector

@Component
@RequiredArgsConstructor
public class WebClientConnector {
    private final ApiWebClientBuilder webClientBuilder;
    private final ApiStatics statics;


    /**
     * Google Chat WebHook 메세지 전송
     * @param requestBody {@link WebHook}
     */
    public Mono<String> callGoogleChat(WebHook requestBody) {
        ApiStatics.GoogleChat googleChat = statics.getGoogleChat();

        return webClientBuilder.request()
                .post(googleChat.getUrl(), requestBody)
                .connectSubscribe();
    }
}

 

 

WebClientConnector 객체는 API를 호출해주는 역할을 한다. 간단하게 Google Chat에 메시지를 보내는 Webhook API를 호출하는 예제를 작성해보았다.

 

구현체를 보면 ResponseStep 메소드를 사용하지 않은 것을 확인할 수 있다. 

위에서 return값을 ResponseStep 객체로 하지 않은 이유가, .connectSubscribe()메소드 이후 체이닝에서는 ResponseStep의 메소드를 사용할 수 없도록 강제하기 위함이었다.

 

 

GoogleChatService

@Log4j2
@Service
@RequiredArgsConstructor
public class GoogleChatService {

    private final WebClientConnector webClientConnector;

    /**
     * 비동기로 Google Chat WebHook 메세지 전송<br>
     * retry 3회 시도
     * @param title         card header 타이틀
     * @param messageHeader card body 상단 메세지
     * @param message       card body 하단 메세지, 복수 메세지 - divider 추가
     */
    public void sendGoogleChatMultipleMessage(String title, String messageHeader, List<String> message) {
        ResponseCode responseCode = ResponseCode.GOOGLE_CHAT_ERROR;

        webClientConnector.callGoogleChat(WebHookMessageBuilder.createGoogleChatMultipleMessage(title, messageHeader, message))
                .flatMap(response -> Mono.just(ResponseEntity.ok().body(ApiResponse.success())))
                .onErrorResume(error -> Mono.just(ResponseEntity.status(responseCode.getStatus()).body(ApiResponse.error(responseCode, "메세지 전송 실패 : " + error.getCause().getMessage()))))
                .subscribe(
                        response -> log.info("메세지 전송 완료 : {}", response),
                        error -> log.error("메세지 전송 실패 : {}", error.getMessage())
                );
    }

    /**
     * 비동기로 Google Chat WebHook 메세지 전송<br>
     * retry 3회 시도
     * @param title         card header 타이틀
     * @param messageHeader card body 상단 메세지
     * @param message       card body 하단 메세지, 단일 메세지
     */
    public void sendGoogleChatMultipleMessage(String title, String messageHeader, String message) {
        ResponseCode responseCode = ResponseCode.GOOGLE_CHAT_ERROR;

        webClientConnector.callGoogleChat(WebHookMessageBuilder.createGoogleChatSingleMessage(title, messageHeader, message))
                .flatMap(response -> Mono.just(ResponseEntity.ok().body(ApiResponse.success())))
                .onErrorResume(error -> Mono.just(ResponseEntity.status(responseCode.getStatus()).body(ApiResponse.error(responseCode, "메세지 전송 실패 : " + error.getCause().getMessage()))))
                .subscribe(
                        response -> log.info("메세지 전송 완료 : {}", response),
                        error -> log.error("메세지 전송 실패 : {}", error.getMessage())
                );
    }
}

 

 

앞서 구현한 로직들을 호출하고, 응답을 받아 로깅하는 Service이다. ConnectStepImpl에서 subscribe()를 사용하지 않은 이유는 이 곳에서 확인할 수 있다.

최종 응답 데이터를 비동기 스트림 내에서 가공하고 처리하기 위함이다. 만약 모듈화된 Step 내부에 subscribe()가 존재한다면 각 서비스마다 connectStep을 새롭게 구현해야할 것이다.

 

  • flatMap() : 각 입력 요소를 새로운 스트림으로 변환한다음 각각의 스트림을 하나로 합쳐주는 역할을 한다. 쉽게 말하면 각각의 요청을 하나로 모아주는 역할을 한다고 생각하면 된다. 예제에서는 여러개의 스트림이 존재하지 않지만 비동기 응답에 대한 데이터를 처리하기 위하여 사용했다. 
  • onErrorResume() : 스트림 내에서 발생한 에러를 처리한다.
  • subscribe() : 비동기 응답을 받아 처리한다. 첫번째 인자로는 정상 응답, 두번째 인자로는 에러 응답을 처리할 수 있다.

 

 


마치며

이렇게 간단하게 비동기 처리를 구현해보았다. 

사실 WebFlux의 세계는 Spring MVC만큼이나 방대하고 알아야할 것, 사용할 수 있는 수단이 많지만 초보 개발자가 단기간에 습득할 수 있는 분야는 아니라고 생각하기에 차근차근 공부하고 포스팅해나가면서 알아가는게 좋지 않을까 싶다.

 

 

반응형