이전 글 보러가기
개요
기존 프로젝트에서 스포츠 게임별로 사용자들이 실시간 소통을 할 수 있는 채팅 기능을 개발했다. 개발 기간이 짧아 러닝커브를 고려해 채팅 데이터를 전달하는 브로커를 STOMP에서 기본적으로 제공하는 인메모리 브로커를 사용했다. 인메모리 브로커를 사용했기 때문에 서버의 스케일아웃에 대비할 수 없었다. 프로젝트 마무리 후 개인적으로 채팅 데이터 저장과 스케일아웃이 가능한 서버 아키텍처로 개선해보고자 했다. 해당 글에서는 기존 채팅 서비스에서 몽고디비와 카프카를 활용해 실시간 채팅 아키텍처를 개선한 과정에 대해 다룬다.
실시간 채팅 아키텍처
개선 전 서버 스케일아웃 시 발생하는 문제에 대한 아키텍처이다. 스케일아웃된 서버를 운영할 경우 사용자들의 요청은 서로 다른 서버에 도달할 것이고 웹소켓 연결 또한 서로 다른 서버와 맺을 것이다. 여기서 발생하는 문제점은 사용자가 특정 채팅방에 메시지를 보낸다고 할때 메시지 발신자와 동일한 서버를 구독중인 사용자들에게는 메시지가 잘 도착하겠지만, 같은 채팅방이라도 다른 서버를 구독중인 사용자에게는 메시지가 전송되지 않는다는 것이다. 인메모리 브로커는 서버마다 독립적으로 운영되기 때문에 다른 서버의 브로커는 메시지를 전달받지 못했기 때문이다.
아래는 개발에 들어가기 전 설계한 인메모리 브로커의 문제점을 해결한 아키텍처이다. 인메모리 브로커 대신 외부 메시지큐를 사용함으로써 사용자들이 서로 다른 서버와 연결되어 있더라도 컨슈머 그룹을 각각 고유하게 설정해 같은 토픽을 구독하고 있다면 메시지를 consume 할 수 있게 하였다. 또한, 채팅 메시지 기록을 위해 데이터의 삽입/삭제에 유리한 NoSQL을 사용하고자 했다. 가용 기술 중 Redis와 MongoDB가 있었고 해당 프로젝트에서는 JSON 데이터 저장에 적합한 MongoDB를 사용하기로 결정했다.
카프카 클러스터 구성
카프카의 고가용성을 활용하고 싶었기 때문에 추후 트래픽이 많아질 것이라는 가정하에 3개의 브로커를 이용해 카프카 클러스터를 구성해 운영해보기로 결정했다. 토픽은 아래 명령어를 이용해 생성했다. 채팅 서버 4개를 각각 고유한 컨슈머 그룹으로 운영 할 예정이기 때문에 4개의 파티션을 가진 토픽을 생성했다. 각 파티션은 1개의 컨슈머에게만 데이터를 전송할 수 있기 때문에 파티션 갯수보다 컨슈머의 갯수가 많으면 특정 서버는 데이터를 컨슈밍 하지 못하기 때문에 최소 파티션 갯수를 4개로 생각했고, 추후 브로커나 컨슈머에 부하가 생긴다면 상황에 맞게 파티션 또는 컨슈머를 늘려서 운영하는게 좋을 것이라고 판단했다.
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9094,localhost:9096
--topic CHAT-MESSAGE --partitions 4 --replication-factor 3
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9094,localhost:9096 --topic C
HAT-MESSAGE --describe
Topic: CHAT-MESSAGE TopicId: pxWFAeigR6C87iFefMqiSw PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: CHAT-MESSAGE Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: CHAT-MESSAGE Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: CHAT-MESSAGE Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: CHAT-MESSAGE Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
아래는 카프카 프로듀서와 컨슈머에 대한 서버측 설정이다. 해당 프로젝트는 채팅 아키텍처에 대한 개선을 주제로 하기 때문에 설정값은 현재 카프카에 대해 이해하고 있는 부분 내에서 필요한 최소한의 설정만 해두었다. 안정성과 성능상의 이점을 같이 챙기고 싶었기 때문에 프로듀서의 ACKS는 1로 설정하였고 다른 프로듀서 설정값들은 기본 옵션을 사용한다. 컨슈머 설정에서는 메시지 유실을 방지하기 위한 오프셋 초기화 설정을 명시했고 리밸런싱을 감지하기 위한 핸들러를 추가해 주었다.
@Configuration
public class KafkaProducerConfig {
@Value("${kafka_bootstrap_servers}")
private String bootstrapServers;
private final String[] acks = {"0", "1", "all"};
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(kafkaProducerFactory());
}
private ProducerFactory<String, String> kafkaProducerFactory() {
Map<String, Object> props = new HashMap<>();
// kafka cluster address setting
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// serializer setting
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// producer acknowledge setting
props.put(ProducerConfig.ACKS_CONFIG, acks[1]);
return new DefaultKafkaProducerFactory<>(props);
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka_bootstrap_servers}")
private String bootstrapServers;
@Value("${kafka_consumer_group}")
private String consumerGroup;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// fetcher를 이용해 batch로 record를 읽어들이기 위한 설정 (ConsumerRecords 사용 가능)
factory.setBatchListener(true);
// RebalanceListener 등록
factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// kafka cluster address setting
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// deserializer setting
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumer group setting
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
// earliest : 가장 처음부터 읽기, latest : 가장 마지막부터 읽기
// consumer offset 을 사용할 수 없는 상태이거나 offset 정보를 찾을 수 없을떄의 option
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// auto commit(true), manual commit(false)
/* props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); */
// record를 읽어들이는 최소 byte size setting (default: 1byte)
/* props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);*/
// 최소 byte size 만큼의 record가 fetcher에 적재되기까지 기다리는 최대 대기 시간(milli-seconds)
/* props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1024);*/
// 토픽 자동 생성 설정
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
}
몽고디비 구성
채팅 데이터를 저장할 몽고디비는 채팅방을 중심으로 데이터를 저장하기로 결정했다. 채팅방별로 채팅방의 기본 정보 및 채팅방에 속한 사용자를 조회할 수 있는 컬렉션과 채팅 메시지를 저장할 수 있는 컬렉션을 만들었다. 또한, 조회 성능을 높이기 위해 특정 사용자가 어떤 채팅방에 속해있는지에 대한 정보를 저장하는 컬렉션을 별도로 추가해주었다. 몽고디비의 각 컬렉션에 저장할 수 있는 데이터의 용량은 물리적으로 제한이 없어서 페타바이트 단위로도 데이터를 저장할 수 있다고 한다. 따라서 채팅방에서 발생하는 메시지 데이터를 단일 컬렉션에 저장하기로 결정하였다. 데이터가 많아져 부하가 발생할 경우를 대비해 인덱싱을 걸어두었고, 추후 샤딩을 이용해 데이터를 분산 저장해 해결할 수 있다고 생각했다.
사용자를 중심으로 채팅 데이터를 저장하는 방안도 생각해봤지만, 특정 채팅방의 대화 내용을 조회하기 위해 소속된 사용자들을 조회하고 사용자들의 채팅 데이터 중에서 채팅방에 연관된 데이터를 가져오는것은 매우 비효율적일 것이라고 생각했다. 사용자들이 많아질수록 채팅 데이터는 기하급수적으로 늘어날 것이다. 특정 메시지를 몇 명의 사용자가 읽었는지 또는 어떤 메시지에 어떤 답글이 달렸는지 등 기능이 많아지고 데이터 구조가 복잡해질 것이며 이를 저장하기 위해서는 많은 방면에서의 최적화와 엄청난 규모의 스토리지와 필요할 것이라고 생각한다. 자체 인프라를 운영하는 큰 규모의 기업들이 새삼 대단하다고 느꼈다.
/* chat_rooms collection */
{
"_id": "6797142af785a1210c1ae969",
"name": "개발자 모임",
"userIds": [1, 10],
"createdAt": "2024-03-01 12:34:56"
}
/* chat_messages collection */
{
"_id": "67971934d5722060f6a9268a",
"roomId": "6797142af785a1210c1ae969",
"userId": 1,
"username": "사용자1",
"content": "안녕하세요",
"timestamp": "2024-03-01 13:00:00"
}
/* user_chat_rooms collection */
{
"_id": 1, // 사용자 ID
"chatRoomIds": [
{
"roomId": "6797142af785a1210c1ae969",
"joinedAt": "2024-03-01 12:34:56"
},
{
"roomId": "679715cddcb9c82cff618f2b",
"joinedAt": "2024-03-01 12:34:56"
}
]
}
개선된 실시간 채팅 서비스 구현
먼저 스프링 서버와 클라이언트의 웹소켓 통신을 위한 설정 클래스로 기존에 사용하던 인메모리 브로커를 주석처리 했다. 웹소켓 연결은 서버별로 동일하게 이루어져야 하기 때문에 다른 설정은 변경사항이 없고, 메지시를 중계하는 브로커로 카프카를 사용함으로써 해당 설정이 필요없어졌기 때문이다. 또한, 인증된 사용자만 채팅 서비스를 이용할 수 있도록 하기 위해 웹소켓 연결 시 JWT를 검증할 수 있는 인터셉터를 추가했다.
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
/* wss://{host}:{port}/websocket 경로로 handshake 및 connection 연결 */
registry
.addEndpoint("/ws/chat")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
/* default in-memory broker 사용 및 메시지 발행 경로 prefix 설정 */
/* 외부 브로커 사용 시 아래 설정 제거 */
// config.enableSimpleBroker("/topic");
/* 메시지 발행 전 내부 컨트롤러로 전달 */
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
/* 웹소켓 연결 시 JWT 인증 처리 */
registration.interceptors(stompChannelInterceptor);
}
}
@Component
@RequiredArgsConstructor
@Slf4j(topic = "StompChannelInterceptor")
public class StompChannelInterceptor implements ChannelInterceptor {
private final UserRepository userRepository;
private final JwtProvider jwtProvider;
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
/* 웹소켓 연결 요청 시 JWT 검증 */
if(StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> authorization = accessor.getNativeHeader(SecurityConst.AUTHORIZATION_HEADER);
if(!Objects.isNull(authorization) && !authorization.isEmpty()) {
try {
/* 인증 정보 조회 및 사용자 조회 */
String accessToken = jwtProvider.getAccessToken(authorization.get(0));
Optional<UserEntity> user = userRepository.findByEmail(jwtProvider.getEmail(accessToken));
/* 사용자 정보 조회 실패 시 예외처리 */
if(user.isEmpty()) return buildErrorResponse(Api4xxErrorCode.NOT_FOUND_AUTHENTICATION.getMessage());
} catch (ApiException e) {
return buildErrorResponse(e.getMessage());
}
} else {
/* 요청 헤더를 찾을 수 없을 시 예외처리 */
return buildErrorResponse(Api4xxErrorCode.NOT_FOUND_AUTHENTICATION.getMessage());
}
}
return ChannelInterceptor.super.preSend(message, channel);
}
private Message<?> buildErrorResponse(String errorMessage) {
log.error("{}", errorMessage);
StompHeaderAccessor errorAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
errorAccessor.setMessage(errorMessage);
return MessageBuilder.createMessage(new byte[0], errorAccessor.getMessageHeaders());
}
}
클라이언트로부터 들어온 메시지를 처리하기 위한 데이터 클래스와 발행 요청을 처리할 컨트롤러 코드이다. 카프카를 이용해 분산된 서버에 메시지를 중계해야 하고 채팅 데이터를 별도로 저장해야하기 때문에 메시지가 컨트롤러를 경유할 수 있도록 "/app"을 시작 경로로 매핑해 메시지를 받아와야한다. 클라이언트로부터 메시지 발행 요청이 들어오면 모든 서버에 이를 알리기 위해 채팅 토픽에 메시지를 프로듀싱하고 몽고디비에 데이터를 저장한다. 메시지가 프로듀싱되면 각각 고유한 컨슈머그룹에 속한 모든 서버는 카프카 컨슈머를 통해 메세지를 수신받을 것이고 특정 채팅방을 구독중인 클라이언트에게 메시지를 전송한다.
@Getter
public class ChatMessageRequest {
private String roomId;
private Long userId;
private String username;
private String content;
private String timestamp;
}
@RestController
@RequiredArgsConstructor
@Slf4j(topic = "MessageController")
public class MessageController {
private final ChatMessageRepository chatMessageRepository;
private final KafkaProducer kafkaProducer;
private final SimpMessagingTemplate messagingTemplate;
/* wss://{host}:{port}/app/chat 경로로 메시지 수신 */
@MessageMapping("/chat")
public void publishMessage(@RequestBody ChatMessageRequest request) {
/* 발행 요청 데이터 프로듀싱 */
ChatMessageCollection message = ChatMapper.toMessageCollection(request);
kafkaProducer.produce(KafkaConst.TOPIC_NAME_CHAT, JsonUtil.toJson(message));
/* MongoDB 채팅 데이터 저장 */
chatMessageRepository.save(message);
}
@KafkaListener(topics = KafkaConst.TOPIC_NAME_CHAT)
public void consumeMessage(ConsumerRecords<String, String> records) {
try {
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
log.info("message consumed - topic: {}, message: {}", KafkaConst.TOPIC_NAME_CHAT, message);
/* 채팅방 구독 사용자에게 메시지 전송 */
ChatMessageCollection messageCollection = JsonUtil.fromJson(message, ChatMessageCollection.class);
String stompTopic = "/topic/chat/" + messageCollection.getRoomId();
messagingTemplate.convertAndSend(stompTopic, message);
log.info("message sent success to [{}]", stompTopic);
}
} catch (MessagingException e) {
log.error("message send failed: {}", e.getMessage());
}
}
}
[2025/01/27 17:08:41.758] [ INFO] [clientInboundChannel-45] [KafkaProducer] SendResult [producerRecord=ProducerRecord(topic=CHAT-MESSAGE, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"roomId":"679715cddcb9c82cff618f2b","userId":1,"username":"사용자1","content":"안녕하세요","timestamp":"2025-01-27 17:08:41"}, timestamp=null), recordMetadata=CHAT-MESSAGE-1@19]
[2025/01/27 17:08:41.785] [ INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [MessageController] message consumed - topic: CHAT-MESSAGE, message: {"roomId":"679715cddcb9c82cff618f2b","userId":1,"username":"사용자1","content":"안녕하세요","timestamp":"2025-01-27 17:08:41"}
[2025/01/27 17:08:41.786] [ INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [MessageController] message sent success to [/topic/chat/679715cddcb9c82cff618f2b]
마지막으로 클라이언트 코드이다. 이번 글에서는 서버의 스케일아웃을 위해 카프카와 몽고디비를 적용해 서버코드를 변경했을뿐 클라이언트단의 코드는 서버의 스케일아웃과 관계없이 하나의 호스트로만 요청을 보내면 된다. 따라서, 클라이언트 코드는 이전 버전과 변경된 부분이 없으며 아래는 코드가 너무 길어져 HTML, 기타 함수 등을 제외한 중요 코드만 추려서 올렸다.
class ChatContainer extends Component {
constructor(props) {
super(props);
...
this.state = {
userId: 0,
username: '',
messages: [],
sendMessage: '',
};
}
stompClient = new StompJs.Client({
brokerURL: "ws://{host}/ws/chat",
connectHeaders: {Authorization: jwtUtil.getAccessToken()}
});
componentDidMount() {
/* 사용자 정보 조회 */
this.httpGetUser().then(() => {
/* 채팅방 메시지 리스트 조회 */
this.httpGetMessages();
/* 웹소켓 연결 */
this.handleStompConnect();
});
}
componentWillUnmount() {
if (this.stompClient) {
const result = this.stompClient.deactivate();
console.log('deactivate', result);
}
}
httpGetUser() {...}
httpGetMessages() {...}
handleStompConnect() {
const { roomId } = this.props;
/* 웹소켓 연결 시도 */
this.stompClient.activate();
const connectionTimeout = setTimeout(() => {
alert("Connection timeout");
const deactivate= this.stompClient.deactivate();
console.log(deactivate);
}, 5000);
this.stompClient.onConnect = (frame) => {
/* 웹소켓 연결 성공 시 타임아웃 해제 */
clearTimeout(connectionTimeout);
console.log("Connected: " + frame);
/* 현재 채팅방 메시지 구독 */
this.stompClient.subscribe(`/topic/chat/${roomId}`, (message) => {
console.log(message.body);
const receivedMessage = JSON.parse(message.body);
this.setState((prevState) => ({
messages: [...prevState.messages, receivedMessage],
}));
});
};
}
handleSend() {
const { roomId } = this.props;
const { userId, username, sendMessage } = this.state;
this.stompClient.publish({
destination: `/app/chat`,
body: JSON.stringify({
roomId: roomId,
userId: userId,
username: username,
content: sendMessage,
})
});
this.setState({sendMessage: ''});
}
handleKeyDown(e) {...}
handleMessageChange(e) {...}
render() {...}
}
export default ChatContainer;
실시간 채팅 아키텍처를 개선하며 느낀점
스케일아웃에 대비한 실시간 채팅 서비스 개발은 코드상으로는 많은 추가가 이루어지지는 않았지만 새로운 기술을 적용하고 데이터의 구조와 흐름을 구성하는데 많은 시간이 들었던 것 같다. 단순히 기능을 구현하고 성능을 향상을 위한 코드만을 작성할뿐만 아니라 현재 개발하는 서비스의 아키텍처를 어떻게 구축해야 하는지와, 추후 서비스 규모가 커졌을때를 대비할 수 있는 소프트웨어 엔지니어로서의 시야를 얻은 것 같아 매우 뿌듯했다. 또한, 개발 전 설계를 구체적으로 해놓으니 실제 개발은 막히는 부분 없이 손쉽게 할 수 있었다. 프로젝트가 복잡해질수록 초기 설계를 잘 해놓고 들어가야한다는 깨달음을 얻은 좋은 경험이었다.