본문 바로가기
관리자

Programming-[Backend]/Kafka

[카프카 핵심가이드] 7. 멱등적 프로듀서와 트랜잭션

728x90
반응형

카프카 핵심 가이드 2E: 대규모 실시간 데이터와 스트림 처리
그웬 샤피라 , 토드 팔리노 , 라지니 시바람 , 크리트 페티 저자(글) · 이동진 번역
제이펍 · 2023년 04월 14일


 

 

1. 멱등적 프로듀서

 

 

  • m1, m2, m1, m2, m3 -> 중복 발생
  • enable.idempotence=true
  • producer ID 요청 후, 해당 pid와 레코드 별 seq 넘버를 추가해서 중복 메시지를 없애고 순서 보장

 

트랜잭션 필요 상황 가정

 

 

  • '이체' 애플리케이션
  • Alice -> Bob 에게 이체하는 상황
  • 전송 토픽 이벤트를 가져와서 출력 토픽 2개에 레코드를 쌓는다. (Alice -10$ 인출, Bob +10$ 입금)
  • 위의 트랜잭션이 완료되어야 오프셋 커밋이 찍힌다.

 

트랜잭션 없이 시스템 장애 발생

 

  • Alice -> -10$ 인출 시점에 애플리케이션이 실패한다고 가정
  • 애플리케이션 실패로 오프셋 커밋이 되지 않고, consumer가 전송 토픽을 다시 읽게되는 중복과정이 발생
  • 결과적으로 Alice -> -10$ 이벤트가 2번 발생

 

 

트랜잭션 적용 후 시스템 장애

 

  • 고유한 transactional.id 를 부여
  • 이 애플리케이션 시작 시점에, 트랜잭션을 조정할 **트랜잭션 코디네이터**를 찾음
  • __transaction_state_topic 이라는 내부 토픽이 존재함
  • 코디네이터가 결정되면, 코디네이터는 트랜잭션 애플리케이션에 대한 고유 produce ID, epoch를 생성 하여 애플리케이션에 전달
  • 출력 토픽으로 이벤트를 쓰기전에 트랜잭션 애플리케이션은 코디네이터한테 대상 파티션을 알려줘야함.

 

  • 코디네이터는 이 트랜잭션의 일부가 될 파티션을 내부 토픽에 지속적으로 저장
  • 이 정보가 저장되면, 애플리케이션은 출력 토픽에 이벤트를 작성할 수 있음.

 

 

 

  • 이 시점에 아까와 같이 이벤트가 실패했다고 가정
  • 처음으로 트랜잭션 코디네이터를 통해 producer ID를 요청함.
  • 코디네이터는 이전 인스턴스에서 보류중인 트랜잭션이 있음을 알고있음.
  • 그리고 동일한 transactional.id를 가진 producer ID를 등록하려 했다는 것을 보고 새로운 인스턴스네! 라고 파악
  • 트랜잭션 코디네이터가 먼저 할 일은 이전 인스턴스의 보류 중인 모든 트랜션을 중단해야함.
  • 따라서, 내부 트랜잭션 로그에 중단 마커 (A)를 추가
  • 그리고, 트랜잭션이 데이터를 작성했던 **각 파티션에도 중단 마커 (A)** 를 추가
  • 위 작업이 완료된 후, 코디네이터는 producer ID의 epoch를 증가 시킨다 pid e0 -> pid e1
    • epoch를 증가시켜서 이전 인스턴스 epoch는 차단함 
  • 이제 consumer가 출력 토픽을 읽으려고 하면 중단 이벤트임을 인지하고 무시하게됨


트랜잭션 적용 후 시스템 성공

  • 입력 토픽까지 읽고, 그 다음 출력 토픽에 쓰기전에 추가해야하는 파티션 세트를 코디네이터에게 알려줘야함.
  • 위 경우에는 인출 파티션(P0), 입금 파티션(P1), 내부 오프셋 토픽 파티션(P7) 을 알려주게됨
  • 코디네이터는 이 3개의 파티션을 트랜잭션 로그에 기록함.
  • 이제 애플리케이션은 위의 3개의 파티션에 레코드를 쓸 수 있게됨.
  • 레코드 쓰기 작업이 완료되면, 애플리케이션은 트랜잭션 코디네이터에게 커밋 요청을 보냄 (커밋해줘!)
  • 트랜잭션 코디네이터가 내부 토픽에 커밋 마커 (C) 를 찍음
  • 내부 커밋 마커가 찍힌 이후, 나머지 3개의 출력 토픽에 동일한 커밋 마커를 추가 함
  • 이 시점에 커밋 정보는 read_committed 모드로 읽는 **consumer 에게 노출** 된다.


### read_commited 모드 알아보기

  • High watermark = 66
  • 오프셋 64인 레코드는 아직 커밋되지 않은 상태라 보류중 (커밋 또는 중단 마커 없음)
  • 브로커는 마지막 안정 오프셋 (last stable offset, LSO)를 알고있음
  • LSO : 첫 번째 열려있는 보류 중인 트랜잭션의 오프셋 (64)
  • LSO 이전의 오프셋은 모두 상태가 결정되었다고 할 수 있음 -> consumer에게 노출 가능함.
  • 따라서, 응답을 줄 때 LSO 이전의 오프셋과 중단 마커를 무시하라는 메타데이터 정보를 추가적으로 전달

  


chapter 08. '정확히 한 번' 의미 구조


정확히 한 번

  • 멱등적 프로듀서 : 프로듀서 재시도로 인해 발생하는 중복 방지
  • * 트랜젝션 : 스트림 처리 애플리케이션에서 '정확히 한 번' 처리를 보장


멱등적 프로듀서


멱등적 서비스: 동일 작업을 여러 번 실행해도 한 번 실행한 것과 결과가 같은 서비스

-- 멱등 X
1. UPDATE t SET x=x+1 where y=5
-- 멱등 O
2. UPDATE t SET x=18 where y=5

 

 

 


멱등적 프로듀서의 작동 원리

  • 고유한 프로듀서 ID와 시퀀스 넘버를 가짐
  • 브로커가 이전에 받은 적이 있는 메시지를 받게되면, 적절한 에러를 발생
    •  프로듀서에 로깅되고 지표에도 반영되지만, 예외가 발생한 것은 아니라 사용자에게 경고를 보내지는 않음
    •  RequestMetrics 유형의 ErrorsPerSec 지표값에 기록됨


작동 실패 시, 멱등적 프로듀서 처리


1. 프로듀서 재시작

  • 멱등적 프로듀서 기능이 켜져있다면, 프로듀서 초기화 과정에서 브로커로부터 프로듀서 ID를 생성받음
  • 트랜젝션 기능이 꺼져있다면, 프로듀서 초기화할 때마다 완전히 새로운 ID가 생성됨
  • 따라서 새 프로듀서가 기존 프로듀서가 이미 전송한 메시지를 다시 전송할 경우, 브로커는 중복이 발생한지 모름.


2. 브로커 장애

  • 리더는 새 메시지가 쓰여질 때마다 인-메모리 프로듀서 상태에 최근 5개의 시퀀스 넘버를 업데이트함.
  • 리더 브로커 장애로 팔로워가 리더가 된 시점에 메모리에 최근 5개의 시퀀스 넘버를 가지고 있음.
    •  따라서 아무 이슈나 지연 없이 새로운 메시지 유효성 검증 재개
  • 예전 리더가 다시 돌아오면?
    •  스냅샷 파일에서 최신 상태를 읽어오고, 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트함.


멱등적 프로듀서의 한계

  • 프로듀서 내부 로직으로 인한 재시도(프로듀서, 네트워크, 브로커 에러)가 발생할 경우 생기는 중복만 방지함.
  • producer.send() 두 번 호출하면 중복이 발생함.

 


멱등적 프로듀서 사용법

enable.idempotence=true


위의 기능을 활성화 하면 다음처럼 동작함.

  • 프로듀서 ID를 받아오기 위해 프로듀서 시동 시 API 호출
  • 레코드 배치에 프로듀서 ID와, 첫 메시지의 시퀀스 포함
  • 레코드 배치의 시퀀스 넘버를 검증해서 메시지 중복을 방지
  • 장애가 발생하더라도 파티션에 쓰여지는 메시지들의 순서는 보장됨.


트랜잭션

트랜잭션이 해결하는 문제

  • 상황 가정
    •  원본 토픽으로부터 이벤트를 읽어서 처리 후, 결과를 다른 토픽에 쓴다.


1. 애플리케이션 크래시로 인한 재처리

  • 결과를 다른 토픽에 썼는데, 원본 토픽 입력 오프셋이 커밋되기 전에 애플리케이션이 크래시 발생!
  • 컨슈머 리밸런스가 발생하고, 컨슈머가 읽고있던 파티션들은 다른 컨슈머로 재할당 됨.
  • 할당받은 컨슈머가 마지막 커밋 오프셋부터 레코드를 읽기 시작함.
  • 중복 처리 발생.


2. 좀비 애플리케이션에 의해 발생하는 재처리

  • 애플리케이션이 레코드 배치를 읽어온 직후 바로 연결이 끊어진 상황.
  • 1번과 동일하게 새로운 컨슈머가 할당받아 처리 후
  • 멈췄던 애플리케이션이 다시 살아남.
  • 마지막으로 읽어왔던 레코드 배치를 처리하느라 중복 발생.
    •  새로 카프카를 폴링하거나, 하트비트로 자기가 죽었다는걸 판정받기 전까지 실행 가능.


트랜잭션은 어떻게 '정확히 한 번'을 보장하는가?

  • 트랜잭션적 프로듀서를 사용해야함.
  • 일반 프로듀서와 트랜잭션적 프로듀서 차이점
    •  transactional.id 설정
    •  initTransactions() 호출해서 초기화
  • transactional.id는 재시작 하더라도 값이 유지됨.
  • 이미 존재하는 transactional.id 프로듀서가 initTransactions()을 다시 호출하면 이전에 쓰던 producer.id 값을 할당해줌.


좀비펜싱

  •   프로듀서가 초기화를 위해 initTransaction()을 호출하면 transactional.id에 에포크 값을 증가시킴.
  •   동일한 transactional.id를 가지더라도, 에포크 값이 낮은 프로듀서가 요청을 보낼 경우 FencedProducer 에러 발생

 

  • 컨슈머에 격리수준이 올바르게 설정되어 있지 않은 경우, 기대하는 '정확히 한 번' 보장이 이루어지지 않음.
  • 컨슈머에 isolation.level 설정하기
    •  read_committed
    •  read_uncommitted (default)


트랜잭션으로 해결할 수 없는 문제들


1. 스트림 처리에 있어서의 부수 효과

  • 이메일 발송, REST API 호출, 파일 쓰기 등의 외부 작업의 중복은 카프카 트랜잭션이 처리할 수 없음.

 

2. 카프카 토픽에서 읽어서 DB에 쓰는 경우

  • 하나의 트랜잭션에서 외부 디비에 결과를 쓰고 카프카에는 오프셋을 커밋할 수 있도록 하는 매커니즘은 없음.

 

3. DB에서 읽고, 카프카에 쓰고, 다시 다른 DB에 쓰기

 

4. 한 클러스터에서 다른 클러스터로 복제

 

5. 발행/구독 패턴

  • 오프셋 커밋 로직에 따라 컨슈머들이 메시지를 한 번 이상 처리하게 되는 경우
  • 따라서, read_commited 설정이 반드시 되어야 함.


트랜잭션 사용법

  • 카프카 스트림즈
    •   processing.guarantee
    •   exactly_once 이나 exactly_once_beta


카프카

    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalld);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");'
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");



트랜잭션 ID와 펜싱

  • 트랜잭션 ID는 동일 애플리케이션 인스턴스가 재시작 했을 때는 일관적으로 유지되어야 하고,
  • 서로 다른 애플리케이션 인스턴스에 대해서는 서로 달라야 함.
  • 카프카 2.5에서 트랙잭션 ID와 **컨슈머 그룹 메타데이터**를 함께 사용하는 펜싱을 도입

  • 프로듀서 B가 다음 세대의 컨슈머 그룹에서 온 것을 확인 후 문제없이 작업을 처리함


트랜잭션의 작동 원리


트랜잭션 2단계 커밋 (two-phase commit)


1. 현재 진행중인 트랜잭션이 존재함을 로그에 기록한댜. 연관된 파티션들 역시 함께 기록한다.

2. 로그에 커밋 혹은 중단 시도를 기록한다. (일단 로그에 기록이 남으면 최종적으로는 커밋되거나 중단되어야 한다.)

3. 모든 파티션에 트랜잭션 마커를 쓴다.

4. 트랜책션이 종료되었음을 로그에 쓴다.


__transaction_state 라는 이름의 내부 토픽을 사용하여 로그를 작성


위의 알고리즘을 통해 수행되는 과정

1. 프로듀서 -> initTransaction()을 호출해서 자신이 트랜잭션 프로듀서임을 등록

2. initTransaction() API는 코디네이터에 새 트랜잭션 ID를 등록하거나, 기존 트랜잭션 ID의 에포크 값을 증가시킴
3. beginTransaction() 프로듀서에 현재 진행중인 트랜잭션이 있음을 알려줌.

4. 프로듀서가 새로운 파티션으로 레코드를 전송할 때 브로커에 AddPartitionsToTxn 요청을 보냄으로써

   현재 이 프로듀서에 진행중인 트랜잭션이 있고, 해당 레코드가 트랜잭션의 일부임을 알림

   해당 정보는 트랜잭션 로그에 기록됨.

5. 쓰기 작업이 완료되고 커밋할 준비가 되면, 트랜잭션에서 처리한 레코드들의 오프셋부터 커밋

6. sendOffsetsTOTransaction() 을 호출하면 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 ID가 포함된 요청이 전송

7. commitTransaction() 이나 abortTransaction() 을 호출 하면 트랜잭션 코디네이터에 EndTxn 요청이 전송

8. 트랜잭션 코디네이터는 트랜잭션 로그에 커밋 혹은 중단 시도를 기록함

  •   transaction.timeout.ms에 설정된 시간 내에 커밋, 중단 둘 다 안되면 코디네이터가 자동으로 트랜잭션을 중단

   
트랜잭션 성능

  • 트랜잭션 ID 등록 요청은 한 번만 있음.
  • 파티션 등록은 각 트랜잭션마다 파티션별로 한 번씩만 이루어짐.
  • 트랜잭션 커밋 요청이 전송되면 각 파티션에 커밋 마커가 추가됨.
  • 이 모든 과정은 동기적으로 진행되어 트랜잭션이 완료되거나 실패할 때까지 데이터는 전송되지 않음.
  • 따라서 많은 메시지를 트랜잭션에 포함시킬수록 오버헤드는 줄어들며 전체 처리량이 증가함.
  • 컨슈머는 커밋 마커를 읽어오는 작업에 약간의 오버헤드가 있음.
  • read_committed 모드에서는 아직 커밋되지 않은 트랜잭션의 메시지가 반환되지 않아 종단 지연이 길어질 수 있음.
  • 하지만, 컨슈머는 완료되지 않은 트랜잭션의 메시지를 버퍼링할 필요가 없어 추가적인 작업은 없음.

 

728x90
반응형