Record
record는 kafka의 데이터 통신을 위한 객체이며 이를 위해 필요한 값들을 저장하고 있다. record는 프로듀서가 메세지를 보낼 때 사용하는 producer-record와 컨슈머가 메세지를 받을 때 사용하는 consumer-record로 나뉜다.
Producer Record
topic
record가 어느 topic으로 전송되어야하는지를 저장하는 값이며 레코드 생성시 초기화한다.
partition
record가 어느 partition으로 전송되어야하는지를 저장하는 값이며 레코드 생성시 초기화해 파티션을 명시하거나, partitioner가 round-robin 방식 또는 hash-key를 이용해 목적지 partition데이터를 세팅한다.
headers
record에 부가적인 정보를 담기 위한 부분이다. key-value 형식의 데이터를 추가할 수 있으며 http-header를 생각하면 편하다.
key
partitioner에서 key값을 이용해 메세지를 분리하기 위해 메세지 생성시 입력하는 값이다. key값을 입력하면 partitioner는 key값에 맞는 partition으로 record를 전송하고 같은 key를 가지는 record는 모두 같은 partition으로 전송된다. 별도의 key값을 설정하지 않으면 null값을 가지게 되고 partitioner는 key값이 null이라면 record를 round-robin으로 partition에 분배한다.
value
실질적인 메세지 데이터를 저장하는 공간이다. 제네릭을 사용하기 때문에 float, byte[], string 등 다양한 형태로 데이터 저장이 가능하다. 데이터 형식에 맞춰 적절한 직렬화 포맷을 설정해줘야하고 별도의 직렬화 클래스를 만들어 사용할 수도 있다. 다만, producer의 key/value 직렬화 포맷과 consumer의 key/value 역직렬화 포맷이 동일해야하기 때문에 유의해야한다. 보통 StringSerializer를 사용한다고 한다.
timestamp
타임스탬프의 경우 스트림 프로세싱에 활용하기 위한 시간을 저장하는 용도로 사용된다. 추가적인 설정을 하지 않으면 producer-record가 생성된 시간으로 저장되며 message.timestamp.type 옵션을 이용해 브로커 적재 시간을 세팅할 수도 있다.
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
...
}
Consumer Record
consumer record는 아래 코드를 보면 producer record에 비해 많은 값이 저장되는 것을 볼 수 있는데 대부분 producer record와 동일한 값이 저장되어 있다. producer record와 동일한 부분은 제외하고 알아보자.
offset
partition에서 메세지의 위치를 나타낸다. offset은 파티션 내부의 고유한 값으로 설정되기 때문에 producer record에서는 볼 수 없었지만, consumer record는 partition에서 가져온 객체이기 때문에 offset값이 설정되어 있다.
timestampType
producer record에서 언급한 것처럼 timestamp는 옵션값을 이용해 record 생성시 또는 broker 적재시의 timestamp를 세팅할 수 있다. timestampType은 옵션의 type을 명시하는 값이다.
serializedKeySize & serializedValueSize
producer record 단에서 직렬화한 key/value의 크기를 저장해놓은 값이다. 해당 값은 consumer record로의 역직렬화시 사용된다.
leader epoch
leader epoch은 partition의 leader가 변경횟수를 기록하는 값으로 leader가 변경될때마다 증가하는 값이다. consumer는 leader epoch 정보를 통해 현재 리더가 언제 변경되었는지 파악할 수 있다. 이를 통해 consumer는 offset을 재조정하거나 데이터를 재처리해야 하는 시점을 판단할 수 있다.
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
...
}
Spring에서 Producer/Consumer Record 사용
Producer Record
@Component
@RequiredArgsConstructor
@Slf4j(topic = "KafkaProducer")
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* basic record produce
* @param topic topic name
* @param message produce message
* */
public void produce(String topic, String message) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
SendResult<String, String> result = kafkaTemplate.send(record).get();
log.info("{}", result);
}
...
}
Consumer Record
@Component
@Slf4j(topic = "KafkaConsumer")
public class KafkaConsumer {
/**
* demo topic consume (auto commit)
* @param records consumed message
* */
@KafkaListener(topics = KafkaConst.KAFKA_TOPIC_DEMO)
public void consumeDemo(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
}
}
...
}