Kafka Producer
kafka 에서 topic에 메세지를 보내는 주체를 producer라고 한다. producer는 kafka-console-producer를 사용할 수도 있지만 보통 프로듀서 애플리케이션으로 구현해 사용한다. kafka에서 지원하는 공식 언어는 java다. python, javascript, golang 등의 언어를 사용해 애플리케이션을 구축할 수도 있지만 공식 지원 언어가 아니기 때문에 구조적/성능적 이슈가 존재할 수 있으니 주의해야한다.
kafka library를 이용해 구현한 프로듀서 애플리케이션은 아래와 같은 구조를 가진다. producer record를 사용해 메세지를 전송하고 직렬화 후 partitioner가 producer record를 어떤 partition으로 보낼지 결정해 record에 set한다. 이 후, 설정한 배치사이즈만큼 producer-record가 accumulator에 배치 형태로 저장되고 sender가 accumulator의 데이터를 받아와 leader-partition에 전송한다.
Record
Partitioner & Accumulator
partitioner는 record가 들어오면 어떤 partition으로 record를 전송할지 결정해 record의 partition값을 세팅하는 역할을 한다. kafka library는 기본적으로 UniformStickyPartitioner(deafult)와 RoundRobinPartitioner를 제공하며 사용자 지정 partitioner class를 생성해 사용할 수 있다. 별도의 partition을 지정하지 않을 경우 default로 UniformStickyPartitioner를 사용한다. UniformStickyPartitioner은 기존 RoundRobinPartitioner를 개선한 partitioner인데 record를 accumulator에 보내는 과정에서 더 좋은 성능을 보여준다.
UniformStickyPartitioner (default)
UniformStickyPartitioner의 경우 message key가 null이 아니라면 key값과 partition을 매칭해 레코드를 전송한다. hash key를 이용하기 때문에 이후에 같은 key값이 들어온다면 같은 partition을 매칭한다. message key가 null인 경우에는 RoundRobin개념을 이용해 우선순위(#)가 낮은 buffer부터 record를 균일하게 매칭한다. 아래 그림을 보면 accumulator의 buffer에 record를 batch size만큼 몰아서 채우고 다 채웠다면 다음 buffer로 이동해 데이터를 batch size만큼 적재해 전송한다.
RoundRobinPartitioner
RoundRobinPartitioner또한 message key가 null이 아니라면 key값과 partition을 매칭해 레코드를 전송한다. 마찬가지로 hash key를 이용하기 때문에 이후에 같은 key값이 들어온다면 같은 partition을 매칭한다. RoundRobinPartitioner 또한 message key가 null인 경우에는 RoundRobin개념을 이용해 record를 partition과 균일하게 매칭한다. 다만, UniformStickyPartitioner와 다른점은 buffer에 batch size 만큼의 데이터가 적재될 때까지 기다리는 것이 아니라 데이터를 하나씩 넣고 partition을 switch하는 방식으로 데이터를 적재해 buffer가 채워지면 sender로 데이터를 전달한다.
Custom Partitioner
Kafka Library에서는 CustomPartitioner 구성을 위해 Partitioner 인터페이스를 제공한다. 아래와 같이 오버라이드 메서드를 통해서 메세지 키 또는 메세지 값에 따른 Partition 지정 로직을 구현할 수 있다.
public class CustomPartitioner implements Partitioner {
@Override
public int partition(
String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster
) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
Producer Options
Essential
bootstrap.servers
카프카 브로커의 호스트:포트를 지정한다. `,`를 이용해 복수의 브로커를 클러스터로 연결한다.
key.serializer & value.serializer
레코드의 키/값을 직렬화하는 클래스를 지정한다. 보통 StringSerializer를 사용하고 상황에 맞게 다른 Serializer를 사용한다.
Optional
acks
producer가 전송한 데이터가 브로커들에게 정상적으로 저장되었는지 전송 성공 여부를 확인하는 옵션 값이다. 0, 1, 2(all) 중 하나로 설정할 수 있고 기본값은 1이다. acks를 0으로 설정하면 전달 여부를 확인하지 않는다는 의미이고 1로 설정하면 Record가 리더 파티션에만 성공적으로 전달됐는지 확인한다는 의미이다. 마지막으로 acks=all은 리더 파티션을 포함한 ISR 만큼의 파티션에 Record가 잘 전달됐는지 확인한다. default acks=1이고 일반적으로 acks=1을 사용한다.
linger.ms
Accumulator Buffer에 Batch Size 만큼의 Record가 적재되기까지 기다리는 최소 시간을 의미한다. Accumulator에 Record를 적재하는 time limit이라고 보면 될 것 같다. 기본값은 0ms이고 0이면 바로 Sender에 Record를 전송한다.
retries
브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정하는 옵션이다. 기본값은 Integer.MAX_VALUE(2^31-1)이다.
max.in.flight.requests.per.connection
한 번에 요청하는 최대 커넥션 갯수이다. Sender와 Broker사이에 연결할 최대 TCP Connection 수를 말하고. 기본값은 5로 설정되어 있다.
partitioner.class
파티셔너를 지정하는 옵션이다. 기본값은 UniformStickyPartitioner(2.5.0 이상)이고 RoundRobinPartitioner나 CustomPartitioner를 구현해 대체할 수 있다.
enable.idempotence
멱등성 프로듀서로 동작할지 결정하는 옵션값이다. 기본값은 false이다.
transactional.id
Record를 트랜젝션 단위로 묶을지 결정한다.
Producer Callback Future
Kafka에서는 send() 요청에 대한 응답으로 동기/비동기 처리를 지원한다. 그 중 결과의 비동기 처리를 지원하는 객체가 Future객체이다. Kafka 2.x.x 버전에서는 ListenableFuture 인터페이스를 구현한 Callback 클래스를 만들어 비동기처리를 구현할 수 있고 3.x.x 버전에서는 ListenableFuture가 deprecated되고 CompletableFuture를 사용해 send() 결과값의 비동기처리를 구현할 수 있다. 결과값에 대한 동기 처리는 send().get()을 이용해 SendResult 객체를 받아 처리할 수 있다.
ListenableFuture Example
// KafkaCallback.java
public class KafkaCallback implements ListenableFutureCallback {
@Override
public void onSuccess(Object result) {...}
@Override
public void onFailure(Throwable error) {...}
}
// KafkaProducer.java
public class KafkaProducer {
/**
* basic record produce *
* @param topic topic name
* @param message produce message
* */
public void produce(String topic, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
// attach callback
kafkaTemplate.send(record).addCallback(new KafkaCallback());
}
}
CompletableFuture Example
/**
* basic record produce
* @param topic topic name
* @param message produce message
* */
public void produce(String topic, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
// synchronous send - timeout 10s
try {
SendResult<String, String> result = kafkaTemplate.send(record).get(10, TimeUnit.SECONDS);
handleResult(result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
handleResult(e);
}
// asynchronous send
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.whenComplete(this::handleResult);
}
멱등성 프로듀서 (Idempotence Producer)
멱등성이란 여러 번 연산을 수행하더라도 동일한 결과를 나타내는 것을 말한다. 멱등성 프로듀서는 특정 이슈에 의해 동일한 레코드를 파티션에 여러 번 전송하더라도 카프카 클러스터에는 단 한 번만 전달(exactly once delivery)함을 보장한다.
# kafka 3.0.0 부터는 default가 true이고 acks=all 과 retreis가 Integer.MAX_VALUE로 옵션이 필수적으로 설정, 고정된다.
enable.idempotence = true
멱등성 프로듀서가 단 한 번만 전달이 가능한 이유는 브로커에 데이터를 전달할때 프로듀서의 고유한 ID값인 프로듀서 아이디(PID)와 레코드의 전달 번호인 시퀀스 아이디(SID)를 함께 전달하기 때문이다. 브로커는 요청이 들어오면 PID와 SID를 확인해 동일한 메세지의 적재 요청이 오더라도 중복 데이터를 선별해 정확히 한번만 브로커에 적재되도록 동작한다.
멱등성 프로듀서는 한계가 존재한다. 프로듀서 애플리케이션이 재시작된다면 PID값이 달라지는데 이러한 경우 브로커 입장에서 다른 프로듀서가 보낸 데이터로 판단하기 때문에 exactly once delivery를 보장할 수 없다. 멱등성 프로듀서의 경우 메모리에 있는 데이터를 읽고 쓰고 확인하는 과정에서 비용이 발생하기 때문에 멱등성 프로듀서를 사용하기전에 잘 생각해서 사용해야한다. 카프카 3점대 부터는 그래도 최적화가 많이 되어서 부하는 분명히 있지만 쓸만하다고 한다.
트랜잭션 프로듀서와 컨슈머 (Transaction Producer & Consumer)
복수의 레코드를 하나의 트랜잭션으로 묶어서 전송하는 동작을 지원하는 producer이다. 트랜잭션으로 묶인 레코드들은 트랜잭션의 특징에 따라 모두 처리되거나 모두 처리되지 않는 all or nothing의 성격을 가진다.
# transaction producer
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID());
# transaction consumer
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");