본문 바로가기
관리자

Programming-[Backend]/Kafka

[카프카 핵심가이드] 5. 카프카 어드민

728x90
반응형

카프카 핵심 가이드 2E: 대규모 실시간 데이터와 스트림 처리
그웬 샤피라 , 토드 팔리노 , 라지니 시바람 , 크리트 페티 저자(글) · 이동진 번역
제이펍 · 2023년 04월 14일


 

5. 프로그램 내에서 코드로 카프카 관리하기

 

5.1 AdminClient 개요


5.1.1 비동기적이고 최종적 일관성을 가지는 API


AdminClient는 비동기 결과인 Future 객체를 `Result 객체`로 감싸서 작업이 끝날 때까지 대기하거나 작업 결과에 대해 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 가지고 있음

최종적 일관성(eventual consistency)
AdminClient API가 리턴하는 Future 객체의 완료 여부는 컨트롤러의 상태가 완전히 업데이트된 상태를 기준으로 한다. 즉 여러 브로커 중에서 리더인 컨트롤러 1개만 인지하면 Future 객체의 결과를 알 수 있다. 따라서 토픽을 생성하는 요청을 보냈을 때, 컨트롤러에서는 인지했는데 다른 브로커들로 메타데이터의 전파가 일어나는 동안 토픽 리스트를 확인하는 listTopics 요청을 보내면 다른 브로커들은 그 정보를 모를 수 있다. 정확히 그 타이밍이 언제가 될 것인지에 대해서는 보장이 불가하다.


5.1.2 옵션

AdminClient의 각 메서드가 특정한 `Options 객체`를 인자로 받는다.
ex) listTopcis 메서드는 ListTopicsOptions 객체를 인자로 받음

- timeoutMS: 모든 AdminClient 메서드가 갖고있는 매개변수. 클라이언트가 TimeoutException을 발생시키기전에 클러스터로부터 응답을 기다리는 시간을 조정함

 

5.1.3 수평 구조

모든 어드민 기능이 KafkaAdminClient에 구현되어 있음. 너무 크지만 하나의 인터페이스에만 의존하면 되므로 IDE 자동완성 등 편의성이 있음

 

5.1.4 추가 참고 사항

- 클러스터의 상태를 변경하는 모든 작업(create, delete, alter)은 컨트롤러에 의해 수행
- 클러스터의 상태를 읽기만 하는 작업(list, describe)는 아무 브로커에서나 수행될 수 있고 클라이언트 입장에서 보이는 가장 부하가 적은 브로커로 전달

 

 

5.2 AdminClient 사용법: 생성, 설정, 닫기



책에 있는 코드 정상작동 확인함

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaAdminClient {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    AdminClient admin = AdminClient.create(props);

    admin.close(Duration.ofSeconds(30)); //close의 인자로 전달된 Duration이 timeout
  }
}



close()메서드의 인자: timeout
timeout을 설정하지 않으면 클라이언트가 모든 진행 중인 작업이 완료될 때까지 기다림
timeout을 설정하면 진행 중인 작업이 있더라도 모든 진행 중인 작동을 멈추고 모든 자원을 해제함

 


5.2.1 client.dns.lookup



- 먼저 이해하기
로컬에서는 이런 식으로 설정함




bootStrap server 설정(브로커):

props.put(AdminClientConfig.*BOOTSTRAP_SERVERS_CONFIG*, "localhost:9092");

listener 설정:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-local:9092,PLAINTEXT_HOST://localhost:29092

 


AWS MSK




bootstrap-servers: AAA.amazonaws.com:9092, BBB.amazonaws.com:9092, CCC.amazonaws.com:9092

 

DNS 별칭으로 일일이 부트스트랩 서버 명들을 나열하지 않아도 될 수 있음.
그러나,
💡 → MSK의 경우 bootstrap-servers를 DNS 별칭이 아니라 쭉 늘어쓰고 로드밸런서도 앞단에 안두는게 일반적이라고 함. 자세히 알아봐야함


리스너 관련 프로토콜

 

  • PLAINTEXT: 보안적인 요구가 없는 텍스트기반의 통신
  • SSL(Secure Socket Layer): 클라이언트와 브로커간 암호화하여 보안 설정
  • SASL_SSL(Simple Authentication and Security Layer): SASL을 통해 SSL을 적용한 리스너. 클라이언트의 인증 및 권한 부여를 해줄 때 사용함


호스트(bootstrap-server)는 설정해야하므로 브로커로부터 호스트 정보를 받고, advertised.listeners 설정에 있는 호스트명을 기준으로 클라이언트 - 브로커 클러스터간 연결이 된다. 그러나 아래 두 가지의 경우 이 방식대로 작동안하는 경우가 생긴다.

1. DNS 별칭

위에서 살펴본대로 많은 브로커 호스트 주소가 있을 때, [all-brokers.hostname.com](http://all-brokers.hostname.com) 으로 DNS를 설정하여 부트스트랩 설정으로 줄 수 있다. 이런 경우인데, SASL을 사용해서 인증 처리를 하는 경우에 문제가 생길 수 있다. 클라이언트는 all-brokers.hostname.com 으로 인증을 할려고 하는데 서버의 보안 주체는 브로커 호스트( ex) broker2.hostname.com)일 수 있으므로 SASL은 인증을 거부하고 연결도 실패한다. 이럴 경우에 client.dns.lookup=resolve_canonical_bootstrap_servers_only로 설정해주면 DNS 별칭에 포함된 모든 브로커 이름을 부트스트랩 서버 목록에 넣어준 것과 동일하게 작동한다.



2. 다수의 IP 주소로 DNS 이름을 사용하는 경우

로드밸런서(Load Balancer, LB)를 여러 대 두어 장애를 피한다. 즉 ex.com이라는 도메인에 위 그림처럼 각 IP 주소를 갖는 LB가 연결된 경우인 것이다. 그런데 만약 1개의 LB의 IP 주소가 시간이 지남에 따라 변경됐을 경우, 클라이언트에 의해 해석된 IP 주소가 사용 불능으로 처리되어 버릴 수 있다. 그럼 카프카 클라이언트는 뒷 단의 Broker들은 멀쩡한데도 불구하고 연결에 실패할 수 있다. 따라서 클라이언트가 로드 밸런싱 계층의 고가용성을 활용할 수 있도록 client.dns.lookup=use_all_dns_ips를 사용하는 것이 강력히 권장된다.

 

 


5.2.2 request.timeout.ms

AdminClient의 응답을 기다릴 수 있는 시간의 최대값을 정의. 기본값은 120초.
책 내용: 브로커가 응답하는데 30초 이상 걸릴 경우, 확인 작업을건너뛰거나 일단 서버 기동을 계속한 뒤 나중에 토픽의 존재를 확인한다.

 

 

 

 

 

5.3 필수적인 토픽 관리 기능

 

토픽확인: listTopics()

ListTopicResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);



책에 나온 불완전한 코드를 내 프로젝트에 맞춰서 출력해봤음

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;

public class KafkaAdminClient {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    AdminClient admin = AdminClient.create(props);

    DescribeTopicsResult demoTopic = admin.describeTopics(
        List.of("pos-sync-legacy-master"));

    try {
      TopicDescription topicDescription = demoTopic.topicNameValues().get("pos-sync-legacy-master")
          .get();

      System.out.println("###" + topicDescription);

    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

    admin.close(Duration.ofSeconds(30));
  }
}





콘솔에 찍힌 로그: 파티션 3개가 출력되는 것 확인: TopicDescription 객체는 다음과 같음

###(name=pos-sync-legacy-master, internal=false, partitions=(**`partition=0,`** leader=localhost:29092 (id: 1 rack: null), replicas=localhost:29092 (id: 1 rack: null), isr=localhost:29092 (id: 1 rack: null)),(`partition=1,` leader=localhost:29092 (id: 1 rack: null), replicas=localhost:29092 (id: 1 rack: null), isr=localhost:29092 (id: 1 rack: null)),(`partition=2,` leader=localhost:29092 (id: 1 rack: null), replicas=localhost:29092 (id: 1 rack: null), isr=localhost:29092 (id: 1 rack: null)), authorizedOperations=null)




deleteTopics() 메서드
목록 인자로 넘겨준 토픽들을 삭제함. 돌이킬 수 없이 완전히 삭제되므로 데이터 유실 등에 특별히 주의해야한다.

 

 

 


5.4 설정 관리

ConfigResource 객체를 통해서 할 수 있음. 교재의 코드는 로그 압착 설정이 되지 않은 토픽을 확인하고 로그 압착 설정을 넣어주는 내용

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;

public class KafkaAdminClient {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    AdminClient admin = AdminClient.create(props);

    ConfigResource configResource = new ConfigResource(Type.TOPIC, "pos-sync-legacy-master");
    DescribeConfigsResult configsResult = admin.describeConfigs(
        Collections.singleton(configResource));
    Config configs = configsResult.all().get().get(configResource);

    //기본값이 아닌 설정을 출력
    configs.entries().stream().filter(
        entry -> !entry.isDefault())
        .forEach(System.out::println);

    //토픽 압착 설정 확인
    ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
        TopicConfig.CLEANUP_POLICY_COMPACT);
    if (!configs.entries().contains(compaction)) {
      // 압착 설정 되어있지 않은 경우 해줌
      Collection<AlterConfigOp> configOp = new ArrayList<>();
      configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
      HashMap<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
      alterConf.put(configResource, configOp);
      admin.incrementalAlterConfigs(alterConf).all().get();
      System.out.println("### 압착 했음!");
    } else {
      System.out.println("Topic " + "pos-sync-legacy-master" + "is compacted topic");
    }

    admin.close(Duration.ofSeconds(30));
  }
}




코드 실행 전 KafUI 화면



실행 후: compact 옵션 적용

 




5.5 컨슈머 그룹 관리

AdminClient를 사용해서 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정하는 방법을 살펴본다.

 

5.5.1 컨슈머 그룹 살펴보기

admin.listconsumerGroups().valid().get().forEach(System.out::println);

 

 




특정 그룹에 대해 상세 정보 살펴보기

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class KafkaAdminClient {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    AdminClient admin = AdminClient.create(props);

    ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(
            Collections.singleton("pos-sync-legacy-master-consumer"))
        .describedGroups().get("pos-sync-legacy-master-consumer").get();

    System.out.println("$$$$$" + groupDescription);

    admin.close(Duration.ofSeconds(30));
  }
}



출력 결과

$$$$$(groupId=pos-sync-legacy-master-consumer, isSimpleConsumerGroup=false, members=(memberId=consumer-pos-sync-legacy-master-consumer-5-82d14242-fc74-470c-8f3c-c56ac51b5c96, groupInstanceId=null, clientId=consumer-pos-sync-legacy-master-consumer-5, host=/192.168.65.1, assignment=(topicPartitions=pos-sync-legacy-master-1)),(memberId=consumer-pos-sync-legacy-master-consumer-6-7f7a085f-ec3a-44fc-a47f-860377833853, groupInstanceId=null, clientId=consumer-pos-sync-legacy-master-consumer-6, host=/192.168.65.1, assignment=(topicPartitions=pos-sync-legacy-master-2)),(memberId=consumer-pos-sync-legacy-master-consumer-4-6186099a-ce5c-4c70-9ccc-c6e48b0bddf3, groupInstanceId=null, clientId=consumer-pos-sync-legacy-master-consumer-4, host=/192.168.65.1, assignment=(topicPartitions=pos-sync-legacy-master-0)), partitionAssignor=range, state=Stable, coordinator=localhost:29092 (id: 1 rack: null), authorizedOperations=null)

 


커밋 정보 얻어오기

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaAdminClient {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    AdminClient admin = AdminClient.create(props);

    Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(
        "pos-sync-legacy-master-consumer").partitionsToOffsetAndMetadata().get();
    HashMap<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();

    for (TopicPartition tp : offsets.keySet()) {
      requestLatestOffsets.put(tp, OffsetSpec.latest());
    }

    Map<TopicPartition, ListOffsetsResultInfo> latestOffsets = admin.listOffsets(
        requestLatestOffsets).all().get();

    for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
      String topic = e.getKey().topic();
      int partition = e.getKey().partition();
      long offset = e.getValue().offset();
      long latestOffset = latestOffsets.get(e.getKey()).offset();
      System.out.println("$$$" + topic + partition + offset + latestOffset);
    }

    admin.close(Duration.ofSeconds(30));
  }
}

 

 



5.5.2 컨슈머 그룹 수정하기

책에서는 특정 컨슈머 그룹의 오프셋을 수정하는 것에 대해서 설명함. 특히 맨 처음 오프셋 값으로 리셋하는 과정을 설명한다.

- 카프카에서는 현재 작업이 돌아가고 잇는 컨슈머 그룹에 대한 오프셋을 수정하는 것을 허용하지 않는다. 따라서 이를 위해서는 컨슈머 애플리케이션을 꺼야한다.
오프셋을 리셋하고 맨 처음부터 다시 컨슈머그룹이 돌아가게 하면 저장된 상태가 깨질 수 있다. 메시지가 중복처리될 수도 있다.

책에서는 최초 offset 값을 찾는데, 내 생각엔 특정 시점의 오프셋 값을 찾아서 그때로 오프셋을 설정해주는게 유용할 것 같아서 GPT에 물어봄

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaOffsetSetter {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String bootstrapServers = "localhost:9092";
        String groupId = "your-consumer-group";
        String topic = "your-topic";
        long timestampToSeek = 1622505600000L; // 특정 시점의 타임스탬프 (예: 2023-01-01 00:00:00)

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", groupId);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        // 특정 시점의 오프셋 찾기
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .collect(Collectors.toList());

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition partition : partitions) {
            timestampsToSearch.put(partition, timestampToSeek);
        }

        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);
        Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
            partitionOffsets.put(entry.getKey(), entry.getValue().offset());
        }

        // AdminClient를 사용하여 컨슈머 그룹의 오프셋 설정
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        AdminClient adminClient = AdminClient.create(adminProps);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
            offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
        }

        AlterConsumerGroupOffsetsResult result = adminClient.alterConsumerGroupOffsets(groupId, offsetsToCommit);
        result.all().get();  // 설정이 완료될 때까지 대기

        consumer.close();
        adminClient.close();
    }
}



궁금했던 점

 




5.6 클러스터 메타데이터

admin.describeCluster(); 메서드 실행으로 가능함!

 


5.7 고급 어드민 작업

5.7.1 토픽에 파티션 추가하기

토픽의 파티션 수는 토픽이 생성될 때 처리하는게 보통이고, 파티션에 메시지가 순서대로 처리될 것을 기대하기 때문에 중간에 파티션 수를 늘리는 것은 위험할 수 있다. 또한 여러 토픽을 한 번에 확장할 경우 일부 토픽은 성공하고 나머지는 실패할 수도 있으므로 유의해야한다.

Map<String, NewPartitions> newPartitions = new HashMap<>);
newPartitions.put(TOPIC_NAME, NewPartitions. increaseTo(NUM_PARTITIONS+2));
admin.**createPartitions**(newPartitions).all).get();

 



5.7.2 토픽에서 레코드 삭제하기

deleteRecords 메서드: 호출 시점을 기준으로 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 해서 컨슈머가 접근 못하게 함

 

5.7.3 리더 선출

선호 리더 선출(preferred leader election)
선호 리더란 토픽이 처음 생성되었을 때 리더 레플리카였던 레플리카를 뜻함.
카프카는 기본적으로 여러 레플리카로 이루어진 파티션 중 리더 파티션을 5분마다 선출하여 순회시킴. 만약 auto.leader.rebalance.enable = false이거나 직접 선호 리더 선출 과정을 작동시키고 싶다면 AdminClient의 electLeader() 메서드를 호출하면 됨

언클린 리더 선출(unlcean leader election)
리더 파티션이 사용 불능 상태가 되었는데, 다른 레플리카 파티션들이 데이터가 없는 등의 이유로 리더를 못 맡는다면 리더 지정이 불가하여 파티션이 사용 불능 상태가 된다. 이 때 electLeader() 메서드를 호출하면 레플리카 파티션 중 하나를 그냥 리더로 선출한다. 이 경우 새 리더로 복제되지 않은 모든 이벤트들은 유실된다.

 


5.7.4 레플리카 재할당


파티션 레플리카의 위치를 변경하고 싶을 때 alterPartitionReassignments를 사용하면 된다. 다만, 파티션 이동 시 브로커간 대량의 데이터 복제를 초래할 수 있으므로 사용 가능한 네트워크 대역폭에 주의하고, 필요할 경우 쿼터를 설정해서 복제 작업을 스로틀링 해줘야한다.

 


5.8 테스트하기

MockAdminClient 클래스를 사용해서 테스트 한다는데, 책 내용으로는 정상적으로 실행안됨. GPT도 제대로 안 알려줘서 좀 더 깊게 봐야할듯

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

@RequiredArgsConstructor
public class KafkaAdminService {
  private final AdminClient adminClient;

  public void createTopic(String topicName, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {
    NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();
  }
}



import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

@SpringBootTest(classes = KafkaAdminService.class)
public class KafkaAdminServiceTest {

  @Autowired
  private KafkaAdminService kafkaAdminService;

  @BeforeEach
  public void setUp() {
    kafkaAdminService = new KafkaAdminService(new MockAdminClient());
  }

  @Test
  public void testCreateTopic() throws ExecutionException, InterruptedException {
    // Arrange
    String topicName = "test-topic";
    int numPartitions = 3;
    short replicationFactor = 1;

    // Mocking the createTopics method
    when(adminClient.createTopics(any())).thenReturn();

    // Act
    kafkaAdminService.createTopic(topicName, numPartitions, replicationFactor);

    // Assert
    ArgumentCaptor<NewTopic> topicCaptor = ArgumentCaptor.forClass(NewTopic.class);
    verify(adminClient, times(1)).createTopics(Collections.singleton(topicCaptor.capture()));

    NewTopic capturedTopic = topicCaptor.getValue();
    assertEquals(topicName, capturedTopic.name());
    assertEquals(numPartitions, capturedTopic.numPartitions());
    assertEquals(replicationFactor, capturedTopic.replicationFactor());
  }
}

 

 

 

 

728x90
반응형