본문 바로가기
관리자

Programming-[Backend]/Kafka

[검증 중] Kafka - testcontainer : Transaction 묶기, graceful shutdown, 수동 커밋, await

728x90
반응형

 

아직 대략적으로만 경험했고, 확실하지는 않은 내용이라 다른 참조 문서들을 참고해야하는 내용이다.

 

 

1. Transaction 묶기

 

ref) https://gunju-ko.github.io/kafka/spring-kafka/2018/03/31/Spring-KafkaTransaction.html

kafka는 기본적으로 비동기적인 메시지 발행을 하므로 트랜잭션을 끝까지 보장하지 않는다. 반드시 트랜잭션이 보장되도록 할려면 executeInTransaction 메서드를 사용해야한다. 이 방식은 KafkaTransactionManager를 거치는 방식은 아니고, KafkaTemplate만 이용해서 바로 트랜잭션을 묶어준다.

kafkaTemplate.executeInTransaction(
    t -> t.send(yourTopicName, yourMessageKey, yourMessageValue)
    );

 

DB 트랜잭션과 같이 묶어주는 방식도 있다는데, 이건 실무에서 좀 더 경험해봐야겠다.

 

 

2. Graceful shutdown

 

ref) https://velog.io/@yellowsunn/%EC%8A%A4%ED%94%84%EB%A7%81-%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8%EB%8A%94-graceful-shutdown%EC%9D%84-%EC%A7%80%EC%9B%90%ED%95%98%EB%8A%94%EA%B0%80

graceful shutdown을 설정해주면 consumer에서 메시지가 처리되고 있는 도중에 SIGTERM, SIGINT 등이 발생하면 이미 처리하고 있던 메시지는 모두 처리하고 offset을 커밋하고 나서 처리하게 한다.

 

spring.kafka.listener.immediate-stop 옵션을 false로 처리하고, server 설정에 shutdown을 graceful로 준다.

server:
  port: 8081
  shutdown: graceful

 

 

3. 수동 커밋

 

ref) https://dkswnkk.tistory.com/744

https://mycup.tistory.com/437

 

 

카프카의 기본 설정은 자동 커밋이라서, 다음 상황들에서 메시지가 유실되거나 중복될 수 있다.

  • 자동 커밋의 주기(auto.commit.interval.ms)보다 consumer 처리 속도가 느린 경우 => 처리는 안됐는데 커밋은 해버려서 오프셋만 늘어나므로 메시지 유실
  • 한 번에 폴링하는 메시지 개수(max.poll.records), 예를 들어 100개라면 그 중 30개만 처리되었는데 애플리케이션이 종료되거나 리밸런싱이 일어나는 경우 => offset 커밋은 안된 상태이므로 컨슈머가 재할당된 이후 오프셋을 0으로 바라보고 있기 때문에 이전에 처리한 30개 메시지에 대해서 다시 처리하므로 메시지 중복

메시지 유실은 데드레터를 수동 재처리, 메시지 중복은 멱등성 보장 로직을 사용함으로써 해결한다. 그러나 사전에 이런 문제들을 방지하기 위해서 수동 커밋의 방법도 있다.

 

 

다음 옵션 값을 false로 바꿔준다.

  • enable.auto.commit = false

 

try-catch-finally를 사용하여 직접 Acknowledgement를 처리해준다.

@KafkaListener(
    topics = {"yourTopicName"},
    groupId = "yourGroupId")
public void youeMethodName(
    DataFromMessage dataFromMessage, Acknowledgment acknowledgment) {
  try {
    yourService.yourServiceMethod(dataFromMessage);
  } finally {
    acknowledgment.acknowledge();
  }
}

 

좀 더 자세히 알아보고 처리해야한다.

 

 

 

4. Await()

 

ref) https://testcontainers.com/guides/testing-spring-boot-kafka-listener-using-testcontainers/#_implement_kafka_listener

이 부분은 testcontainer를 사용해서 kafka 메시지를 테스트 할 때 사용하는 메서드이다. testcontainer docs에 있는 대로 await를 사용하여 테스트하면 kafkaTemplate에 의해 생성된 메시지를 consumer가 소모할 때까지 기다린 다음 결과를 검증할 수 있다.

 

@Test
  void shouldHandleProductPriceChangedEvent() {
    ProductPriceChangedEvent event = new ProductPriceChangedEvent(
      "P100",
      new BigDecimal("14.50")
    );

    kafkaTemplate.send("product-price-changes", event.productCode(), event);

    await()
      .pollInterval(Duration.ofSeconds(3))
      .atMost(10, SECONDS)
      .untilAsserted(() -> {
        Optional<Product> optionalProduct = productRepository.findByCode(
          "P100"
        );
        assertThat(optionalProduct).isPresent();
        assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
        assertThat(optionalProduct.get().getPrice())
          .isEqualTo(new BigDecimal("14.50"));
      });
  }
728x90
반응형