본문 바로가기
🌱 Spring

안 읽은 채팅 구현기 : STOMP의 ChannelInterceptor 를 사용하여

by iirin 2023. 10. 16.
이 글은 당근마켓을 모티브로 한 프로젝트 Secondhand 구현시 이슈 사항을 정리한 글입니다.

 

들어가며

Secondhand 프로젝트를 진행할 때 가장 마지막에 구현했던 기능인 ‘아직 안 읽은 채팅’에 대해 백엔드 개발자 2명이 한 고민과 구현 과정을 정리해보았습니다.

채팅 시스템은 처음 이 프로젝트를 시작할 당시에 BE는 물론 FE, iOS에게도 굉장히 큰 과제였습니다. 유저간 실시간 채팅을 어떻게 구현할 것이며, 어떻게 DB에 저장할 것인가도 어려운 미션이었는데요. (이것에 대해서는 추후에 따로 글을 적어보기로…) 산넘어 산… “안 읽은 채팅을 어떻게 구현할 것인가”는 다른 의미로 어려웠던 미션이었습니다.

전자는 “새로운 기술을 어떻게 도입할 것인가”라는 고민에 가까웠다면 후자는 채팅 도메인에 대한 해석과 구현 방법에 시간을 많이 할애한 것 같습니다. 복잡한 문제를 어떻게 쪼갤지에 대해서는 다양한 방법이 있습니다. 그리고 그 중에서 하나를 매번 선택해야 하는 것이죠.

 

발단 : 요구사항 파악

이미지 출처 : https://zdnet.co.kr/view/?no=20210528110514 정확하지는 않지만, 기획안은 코드스쿼드 저작권을 확인하지 못해 첨부는 못하고… 비슷한 웹뷰로 첨부합니다.

채팅 리스트에 보면 채팅방으로 접근할 수 있는 리스트 외에도 해당 회원이 마지막으로 보낸 메시지와 아직 내가 확인하지 않은 ‘안 읽은 메시지 개수’가 카운트됨을 확인할 수 있습니다.

그리고 정확하지는 않지만 대다수의 채팅 메신저가 그렇듯이 마지막 메시지의 발송 시간에 따라 정렬되고 있습니다.

이 리스트에서 원하는 채팅방을 클릭하면 채팅 내역을 확인할 수 있는데요. 이 채팅 내역을 확인하면 안 읽은 메시지는 0이 되어야 합니다.

 

안 읽은 채팅에 대한 고민

어떻게 동작해야 하는가?

요구사항 분석시에 언급한 동작방식을 정리하면 다음과 같습니다.

  • 사용자가 상대방에게 메시지를 보냈는데 상대방이 바로 읽지 않으면 ‘안 읽은 메시지’가 됩니다. 카운트가 1 늘어나야 합니다.
  • 사용자가 채팅방 리스트를 볼 때 안읽은 메시지가 각 채팅방별로 숫자로 표현되어야 합니다.
  • 사용자가 채팅방에 들어가면 안 읽은 메시지는 0으로 초기화 되어야 합니다.

 

어떻게 구현해야 하는가?

이쯤 되어서 안 읽은 채팅을 어떻게 구현할 것인가를 선택해야 되었는데요.

안 읽은 채팅을 팀원과 함께 상의하다가 다양한 방식으로 구현할 수 있고 그것에 대한 장단점을 분석하였습니다.

💡 방법 1. 사용자의 마지막 접속 시간과 채팅방 메시지 발송 시각을 비교합니다.

채팅방으로 채팅 메시지를 조회하여 마지막 접속시간 이후에 보내진 메시지를 카운트하여 보내는 방식입니다.

- 장점 : 데이터 정규화로 불필요한 데이터를 저장하지 않아도 되고, 데이터 정합성이 높습니다.
- 단점 : 매 채팅방 리스트를 조회할 때 사용자 채팅방의 모든 로그를 조회해야 합니다. 채팅 메시지가 많다면 시간이 꽤 오래 걸릴 수 있습니다.
💡 방법 2. 안 읽은 채팅을 저장하는 별도의 컬럼 혹은 테이블로 저장합니다.

별도의 컬럼 혹은 테이블을 DB에 저장하두고 매번 메시지를 수신할 때마다 갱신시켜줍니다. 유저가 접속하면 0으로 초기화합니다.

- 장점 : 매번 채팅방 로그를 모두 조회하여 비교해야한다는 부담감을 줄일 수 있습니다. 조회에 이점을 갖습니다.
- 단점 : 메시지 수신시 매번 UPDATE 가 일어납니다. 같은 트랜잭션에 있을 경우 UPDATE 로직의 실패가 동일 트랜잭션에 영향을 줄 수 있습니다. 데이터 정합성이 지켜지지 않을 수 있습니다.
 
💡 방법 3. 사용자의 마지막 읽은 채팅 메시지 PK를 저장합니다.

방법1과 방법2의 절충안이었습니다. 그러나 두 방법의 단점을 동시에 가지게 된다는 것이 제일 큰 단점이라는 분석으로 바로 열외시켰습니다.
 
 

이 방법들 중 최종적으로 선택한 방법은 두 번째, 안 읽은 채팅 개수를 저장하는 별도의 테이블을 만드는 것이었습니다.

그 이유는 채팅을 보내는 횟수보다 조회 횟수가 월등히 많다는 것입니다.

이 데이터는 채팅방 리스트를 조회할 때마다 일어나야 하고 비효율적으로 구현하는 경우 연쇄 쿼리가 N개에 채팅방에 대해 1개씩 일어날 수 있습니다. 그래서 데이터베이스를 일부 비정규화하여 중복된 데이터가 저장되더라도 조회에 유리한 로직으로 구현해보자고 팀원과 합의하였습니다.

그리고 이 안 읽은 채팅 매번 채팅방 입장할 때마다 초기화되어서 약간 오류가 있어도 치명적이지 않다는 판단도 있었습니다.

 

고민했던 것과 구현 방식

어떻게 저장할까?

Secondhand는 현재 RDB로 MySQL을 사용하고 있습니다. 그러나 이 채팅 Metainfo는 MongoDB에 채팅 메시지와 함께 저장했습니다.

그 이유로는 구현해보지 못한 도메인이라 개발 단계에서는 저장 데이터가 매번 바뀔텐데 MySQL에 저장하면해 중간에 컬럼이 바뀔 때마다 스키마도 매번 수정해주어야 하기 때문에 개발 시간이 오래 소요될 것 같다는 것이 가장 컸습니다.

MongoDB는 Document 로 저장되기 때문에 중간에 데이터 저장 형식이 바뀌더라도 부담없이 바로 적용시킬 수 있습니다.

  • 저장한 결과입니다.
{
  "_id": "6c5879d3-455a-472a-9be6-eef45dcbf51b",
  "participants": {
    "info": {
      "member1": {
        "memberId": "member1",
        "isConnected": true, # 현재 접속 여부
        "messageStock": 0 # 안읽은 메시지 개수
      },
      "newpow": {
        "memberId": "newpow",
        "isConnected": false,
        "messageStock": 1
      }
    }
  },
  "lastMessage": "test message15", # 마지막 메시지
  "updateAt": {
    "$date": "2023-09-02T06:57:40.595Z" # 마지막 메시지 갱신 날짜. 이걸로 어플리케이션에서 정렬하여 API를 반환합니다.
  },
  "_class": "com.team5.secondhand.chat.chatroom.domain.Chatroom"
}

 

채팅 구현 환경 점검

현재 프로젝트 내에서 실시간 채팅은 STOMP와 Redis pub/sub 을 혼합하여 구현하였습니다. STOMP가 클라이언트의 구독과 메시지 발송을 받고 Redis를 통해 pub하면 이를 핸들링하여 클라이언트에게 메시지가 발송되는 형태인데요.

STOMP 이 메시지를 받고 처리하기 전에 핸들링을 할 수 있는 ChannelInterceptor 를 제공합니다. 이를 이용하여 채팅방 구독 여부를 확인할 수가 있어 각 메시지 상태에 따라 채팅 metainfo를 갱신해주기로 했습니다.

그리고 매번 사용자가 메시지를 보낼 때마다 안읽은 메시지도 더해지도록 구현하기 위해 SpringEvent 를 이용하였습니다.

    @Async
    @EventListener
    public void chatBubbleArrivedEventHandler(ChatBubbleArrivedEvent event) throws NotChatroomMemberException {
        ChatBubble chatBubble = event.getChatBubble();
        Chatroom chatroom = getChatroom(chatBubble.getRoomId());
        chatroom.updateLastMessage(chatBubble);
        Chatroom saveChatroom = metaInfoRepository.save(chatroom);
    }

 

 

구현 코드

채팅 metainfo 도메인 구현

이제 채팅 metainfo를 저장하기 위한 도메인을 구현해보았습니다. 이번에는 테이블 구조를 먼저 짜기보다는 로직에 필요한 도메인을 구성하고 이를 MongoDB에 저장하다가 후에 MySQL 에 저장하도록 수정하기로 했습니다.

public class ChatroomMetaInfo implements Serializable {
    private String chatroomId; // chatroom 테이블 pk 참조, 식별관계
    private Participants participants = new Participants(new ConcurrentHashMap<>()); // 참가자 정보 저장
    private String lastMessage; // 채팅 마지막 메시지
    private Instant updateAt;   // 마지막 업데이트 날짜

    public boolean updateLastMessage (ChatBubble chatBubble) throws NotChatroomMemberException {
        this.lastMessage = chatBubble.getMessage();
        this.updateAt = Instant.now();
        return participants.getMessage(chatBubble.getReceiver());
    }

		// 사용자가 들어오고 나갈때마다 사용자 정보를 업데이트 합니다.
    public boolean enter(String memberId) {
        return participants.enter(memberId);
    }

    public boolean exit(String memberId) {
        return participants.exit(memberId);
    }

		// 생성자 getter 등 생략
}

 

사용자 정보는 Map 형태로 만들었고, 이를 관리하기 위한 일급컬렉션으로 구현하였습니다. 사용자 닉네임이나 PK로 바로 조회할 수 있기 때문에 Map 형태로 구현한 것입니다. 안읽은 메세지나 접속 여부를 참가자별로 저장해두어 채팅방에 사람이 더 들어가는 비즈니스 요구가 있을 때 확장 가능하도록 구성하였습니다.

public class Participants implements Serializable {

    private Map<String, ParticipantInfo> info = new ConcurrentHashMap<>();

    public boolean getMessage(Long receiver) {
        ParticipantInfo member;
        if((member=info.get(receiver))==null) {
            return false;
        }
        member.plusBubble();
        return true;
    }

    public boolean enter(String memberId) {
        ParticipantInfo participantInfo;
        if ((participantInfo = info.get(memberId))==null) {
            return false;
        }

        participantInfo.connect();
        info.put(memberId, participantInfo);
        return true;
    }

    public boolean exit(String memberId) {
        ParticipantInfo participantInfo;
        if ((participantInfo = info.get(memberId))==null) {
            return false;
        }

        participantInfo.disconnect();
        info.put(memberId, participantInfo);
        return true;
    }

		// 생성자 getter 생략
}
public class ParticipantInfo implements Serializable {
    private String memberId;
    private Instant lastDisconnectedAt;
    private Boolean isConnected;
    private Integer messageStock; // 안읽은 메시지 개수

    public void plusBubble() {
        this.messageStock ++;
    }

    public void connect() {
        this.isConnected = true;
        this.messageStock = 0;
    }

    public void disconnect() {
        this.isConnected = false;
        this.messageStock = 0;
        this.lastDisconnectedAt = Instant.now();
    }

		// 생성자 getter 생략
}

 

Message pre handler 구현

사용자의 메시지 header command를 읽고 서비스를 호출합니다.

command 종류에 대해선 다음 링크를 참조 👉 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/messaging/simp/stomp/StompHeaderAccessor.html

public class StompMessageProcessor implements ChannelInterceptor {

    private final JwtService jwtService;
    private final SessionStorege sessionStorege;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        handleMessage(headerAccessor);
        return message;
    }

    private void handleMessage(StompHeaderAccessor headerAccessor) {
        if (headerAccessor == null || headerAccessor.getCommand() == null) {
            throw new MessageDeliveryException(ErrorType.BAD_REQUEST.getMessage());
        }

        switch (headerAccessor.getCommand()) {
            case CONNECT:
                String memberId = getMemberIdByToken(headerAccessor.getFirstNativeHeader("Authorization"));
                sessionStorege.saveSession(headerAccessor.getSessionId(), memberId); // 세션에 사용자 접속 여부를 저장합니다.
                break;
            case SUBSCRIBE: // 채팅방 구독시 로직 발생
                enterToChatRoom(headerAccessor); // 이벤트 발생
                break;
            case UNSUBSCRIBE: // 채팅방 구독 해지시 로직 발생
                exitToChatRoom(headerAccessor); // 이벤트 발생
                break;
            case DISCONNECT:
                sessionStorege.deleteSession(headerAccessor.getSessionId());
                break;
						// 그 외는 커맨드는 무시
        }
    }

    // 생략
}

 

이벤트 리스너 구현

채팅방 생성시 metainfo 가 함께 구현되고, 채팅 메시지가 도착하면 안읽은 메시지가 1씩 오르는 로직은 다른 도메인에서 직접 서비스를 참조하지 않고 이벤트를 사용하여 비동기로 처리하였습니다.

@Service
@RequiredArgsConstructor
public class ChatroomMetaEventListener {

    private final ApplicationEventPublisher eventPublisher;
    private final ChatroomMetaRepository metaInfoRepository;

		// 채팅방 생성시 metainfo 도 함께 생성되도록 합니다.
    @Async
    @EventListener
    public void chatroomCreatedEventHandler(ChatroomCreatedEvent event) { 
        Chatroom chatroom = Chatroom.init(event.getInfo());
        metaInfoRepository.save(chatroom);
    }

		// 채팅메시지 도착시 안읽은 메시지와 마지막 메시지를 업데이트 합니다.
    @Async
    @EventListener
    public void chatBubbleArrivedEventHandler(ChatBubbleArrivedEvent event) throws NotChatroomMemberException {
        ChatBubble chatBubble = event.getChatBubble();
        Chatroom chatroom = getChatroom(chatBubble.getRoomId());
        chatroom.updateLastMessage(chatBubble);
    }

    public Chatroom getChatroom(String chatroomId) {
        return metaInfoRepository.findById(chatroomId).orElseThrow();
    }

}

 

한계 및 개선할 것

동시성 문제

채팅이라는 도메인의 특성상 빠르게 입력과 조회가 이루어져야 합니다. DB에 바로 접근하는 현재의 로직상 동시성 이슈로 lost update가 우려됩니다.

이를 위해 채팅 metadata를 캐싱하고 주기적으로 DB에 업데이트 하는 로직으로 변경하는 것이 서비스 안정성에 좋을 것 같습니다.

 

데이터 정합성 문제

별도 트랜잭션이기 때문에 비동기 이벤트 로직 진행중 예외가 발생하였을 때 기존 데이터와 일치하지 않는 데이터 정합성 오류가 있을 수 있습니다. 현재 구현단계에서는 매번 초기화되는 데이터이기 때문에 치명적이지 않다고 판단하였지만, 장기적으로 서비스한다고 하였을 때 이에 대한 대응이 필요합니다. 이런 오류 하나하나가 사용자의 어플리케이션에 대한 신뢰감을 떨어뜨릴 수 있기 때문입니다.

 

ChannelInterceptor 의 역할에 대한 문제

구현하다보니 ChannelInterceptor 외에 다른 방법이 없나 고민이 들었습니다. 일부 로직의 구현인데 모든 웹소켓 통신을 pre handle하고 있는 점이 마음에 걸립니다. 특히 현재는 채팅방 구독과 구독 해지로 채팅방 입퇴장을 감지하고 있지만, 만약 토픽이 다양해진다면 구독과 해지 로직이 반드시 채팅방에만 쓰이지 않수도 있기 때문입니다. 일단 기능 구현은 했지만 고민이 많이 남습니다.

 


Refs.