티스토리 뷰

Spring

WebSocket 적용 및 테스트

토마's 2019. 12. 24. 14:17

최근 적용한 WebSocket에 대해서 포스팅 해보려고 합니다.

시작하며

최근 프로젝트 개발 및 유지보수 작업을 진행하면서, 알림 팝업창을 띄우는 방식에서 이슈가 있다는 것을 느꼈습니다. 그건 바로 폴링 방식으로 매번 프론트엔드 소스에서 신규 데이터가 있는지를 체크 하고 조회하는 형태로 구현이 되어 있었고, 바로 아래와 같이 신규 데이터가 있던 없던 간에 1초 마다 API를 call 하는 형태였습니다. 

 

 

이런 형태의 폴링 방식은 부하를 가져올 것이라는 것을 예상했고, 이번 고도화 작업을 통해 웹 소켓(WebSocket) + STOMP를 적용해 신규 데이터가 있을 때만, 수신하고 있는 유저에게 팝업을 띄워주는 방식으로 적용하는 과정에 대해서 이야기 해보려고 합니다.

구성 단계

기본적으로 웹 소켓을 사용할 수 있도록 디펜던시를 pom.xml에 추가하고,

<!-- https://mvnrepository.com/artifact/org.springframework/spring-websocket -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-messaging -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/javax-websocket-client-impl -->
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>javax-websocket-client-impl</artifactId>
</dependency>

WebSocket 구성을 위한 config 파일을 만들어 WebSocket을 사용할 수 있도록 구성했습니다.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
 
    // https://www.baeldung.com/spring-security-websockets
    // https://supawer0728.github.io/2018/03/30/spring-websocket/
 
    /**
     * Configure message broker.
     *
     * @param config the config
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // eg. To invoke @MessageMapping("/test"), send message's destination is /app/test
        config.setApplicationDestinationPrefixes("/app");
 
        // eg. Subscribe message's destination is /topic/{value} and use @SendTo annotation in controller
        // eg. Subscribe message's destination is /user/queue/{value} and use @SendToUser annotation in controller
        config.enableSimpleBroker("/topic", "/queue");
    }
 
    /**
     * Register stomp endpoints.
     *
     * @param registry the registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").addInterceptors(new HttpHandshakeInterceptor());
        registry.addEndpoint("/websocket").setAllowedOrigins("*").addInterceptors(new HttpHandshakeInterceptor()).withSockJS();
    }
 
    /**
     * Configure web socket transport.
     *
     * @param registration the registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(160 * 64 * 1024);    // Max incoming message size, default : 64 * 1024
        registration.setSendTimeLimit(20 * 10000);            // default : 10 * 10000
        registration.setSendBufferSizeLimit(10 * 512 * 1024); // Max outgoing buffer size, default : 512 * 1024
    }
}

여기서 클라이언트에서 End point를 /websocket으로 해서 구독을 할 수 있도록 설정했고, 다들 아시겠지만 구독 중인 전체 사용자 대상으로 통신을 전달할 /topic과 단일로 연결된 대상에게 통신을 전달할 /queue를 구독할 수 있도록 설정했습니다. 또한, WebSecurityConfig에서 End Point 가 접근할 수 있도록 허용하도록 코드를 수정했습니다.

... (생략)
.antMatchers("/websocket/**").permitAll()
... (생략)

그리고 기본적으로 Interaction 프로세스가 어떠한 방법으로 진행될지에 대해 먼저 정리를 한번 해주는게 좋을 것 같아 UML(Unified Modeling language)을 이용해 그려보았습니다.

 

위와 같은 형태로 유저가 로그인을 통해 websocket 커넥션이 맺어지게 되면, 로그인 인증을 했던 토큰에 대한 검증을 진행 후, 해당 유저와의 통신을 위한 통로(destination)를 전달하고 유저는 해당 통로를 이용해 구독(subscribe)을 하게 됩니다. 그리고 이벤트가 발생하면 해당 통로를 통해 이벤트 데이터를 전달하게 됩니다.

 

기본 구성은 끝났으니, 웹 소켓이 연결이 되었을 때, 연결 요청에 대한 전처리 및 후처리를 담당하는 핸드셰이크 인터셉터를 만들어 요청의 세션 정보를 같이 넘겨주도록 했습니다.

public class HttpHandshakeInterceptor implements HandshakeInterceptor {
 
    /**
     * Before handshake boolean.
     *
     * @param request    the request
     * @param response   the response
     * @param wsHandler  the ws handler
     * @param attributes the attributes
     * @return the boolean
     * @throws Exception the exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                                   Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpSession session = servletRequest.getServletRequest().getSession();
 
            attributes.put("sessionId", session.getId());
        }
 
        return true;
    }
 
    /**
     * After handshake.
     *
     * @param request   the request
     * @param response  the response
     * @param wsHandler the ws handler
     * @param exception the exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // nothing to do.
    }
}

그리고나서, 웹 소켓 연결이 이루어 지거나, 연결이 끊어지는 등의 이벤트가 발생했을 때의 처리 과정, 유저 세션 정보 등록, 세션 정보가 유효할 때의 정보를 조회 및 특정 사용자에게 메시지를 전달하기 위한 헤더를 만들어주는 메소드 등을 다음과 같이 리스너(Listener)를 통해서 구현을 진행했습니다.

@Component
public class EventListener {
 
    private static final Logger logger = LoggerFactory.getLogger(EventListener.class);
 
    private static Map<String, BrowserSession> browserSessionMap = new ConcurrentHashMap<String, BrowserSession>();
 
    /**
     * Handle session connected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
 
        //GenericMessage msg = (GenericMessage) headerAccessor.getMessageHeaders().get("simpConnectMessage");
 
        logger.info("Received a new web socket connection. Session ID : [{}]", headerAccessor.getSessionId());
    }
 
    /**
     * Handle session disconnected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketDisConnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
 
        String sessionId = findBrowserSessionId(headerAccessor.getSessionId());
        if(sessionId != null) {
            browserSessionMap.remove(headerAccessor.getSessionId());
        }
 
        logger.info("Web socket session closed. Message : [{}]", event.getMessage());
    }
 
    /**
     * Find session id by session id.
     *
     * @param sessionId
     * @return
     */
    public String findBrowserSessionId(String sessionId) {
        String session = null;
 
        for (Map.Entry<String, BrowserSession> entry : browserSessionMap.entrySet()) {
            if (entry.getKey().equals(sessionId)) {
                session = entry.getKey();
            }
        }
 
        return session;
    }
 
    /**
     * Register browser session.
     *
     * @param browserSession the browser session
     * @param sessionId      the session id
     */
    public synchronized void registerBrowserSession(BrowserSession browserSession, String sessionId) {
        browserSessionMap.put(sessionId, browserSession);
    }
 
    /**
     * Find session ids by user name list.
     *
     * @param username the member id
     * @return the list
     */
    public List<String> findSessionIdsByMemberId(String username) {
        List<String> sessionIdList = new ArrayList<String>();
 
        for (Map.Entry<String, BrowserSession> entry : browserSessionMap.entrySet()) {
            if (entry.getValue().getUserId().equals(username)) {
                sessionIdList.add(entry.getKey());
            }
        }
 
        return sessionIdList;
    }
 
    /**
     * Create headers message headers.
     *
     * @param sessionId the session id
     * @return the message headers
     */
    public MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
 
        return headerAccessor.getMessageHeaders();
    }
}

클라이언트에서 연결이 이루어졌을 때, 세션에 대한 핸들링 처리를 하기 위해  Session Handler를 만들었으며 afterConnected를 구현해 연결이 이루어졌을 때 구독을 하도록 설정했습니다. 기본적으로 StompSessionHandlerAdaptor를 상속 받아서 afterConnected, handlerException, getPayloadType, handleFrame 등을 override 해야 됩니다.

@Component
public class SessionHandler extends StompSessionHandlerAdapter {
 
    /**
     * The constant logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class);
 
    /**
     * The Session.
     */
    private StompSession session;
 
    /**
     * After connected.
     *
     * @param session          the session
     * @param connectedHeaders the connected headers
     */
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        logger.info("New session established : " + session.getSessionId());
        this.session = session;
 
        subscribe(WS_QUEUE_USER + WS_QUEUE_REPLY);
        subscribe(WS_QUEUE_USER + WS_QUEUE_ERROR);
    }
 
    /**
     * Handle exception.
     *
     * @param session   the session
     * @param command   the command
     * @param headers   the headers
     * @param payload   the payload
     * @param exception the exception
     */
    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        logger.error("Got an exception. Reason : {}", exception.getMessage());
        logger.error("StompHeaders : [{}], Payload : [{}]", headers, new String(payload));
    }
 
    /**
     * Gets payload type.
     *
     * @param headers the headers
     * @return the payload type
     */
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return SimpleJsonResponse.class;
    }
 
    /**
     * Handle frame.
     *
     * @param headers the headers
     * @param payload the payload
     */
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        SimpleJsonResponse msg = (SimpleJsonResponse) payload;
        logger.info("Received Message : [{}]", msg);
    }
 
    /**
     * Subscribe.
     *
     * @param destination the destination
     */
    public synchronized void subscribe(String destination) {
        session.subscribe(destination, this);
        logger.debug("[{}] Subscribed.", destination);
    }
}

마지막으로 웹 소켓 통신 처리를 위한 컨트롤러(Controller)를 만들어, 요청에 대한 처리를 할 수 있도록 구성 해줍니다. 로그인을 하게 되면 새로운 웹 소켓 통신을 할 수 있는 통로를 클라이언트로 반환해주면서 해당 통로로 연결된 유저는 응답 값을 받을 수 있도록 설정했습니다.

@Controller
public class WebSocketController {
 
    /**
     * The constant logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);
 
    /**
     * The Session listener.
     */
    @Autowired
    private EcotrolEventListener sessionListener;
 
    @Autowired
    private MemberService memberService;
 
    @Autowired
    private JwtTokenUtil jwtTokenUtil;
 
    @MessageMapping("/test")
    public void test(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) {
        logger.info("Init - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message);
    }
 
    /**
     * <pre>
     * 로그인 한 사용자가 웹 소켓 통신을 할 수 있도록 url 정보를 리턴한다.
     * e.g) /user/queue/alarm/{uuid}
     * </pre>
     *
     * @param headerAccessor
     * @param message
     * @return
     */
    @MessageMapping("/login")
    @SendToUser(WS_QUEUE_ALARM)
    public SimpleJsonResponse login(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) {
        logger.info("Login - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message.toString());
 
        SimpleJsonResponse response = new SimpleJsonResponse(WS_CODE_LOGIN_RESPONSE);
        if (message.getResultCode() == WS_CODE_LOGIN) {
            try {
                String authToken = (String) message.getData();
 
                if ("REFRESH_TOKEN".equals(jwtTokenUtil.getAuthFromToken(authToken))) {
                    throw new Exception("token is not authorized.");
                }
 
                String username = jwtTokenUtil.getUsernameFromToken(authToken);
 
                MemberManagement member = memberService.getMember(username);
 
                if (member == null) {
                    throw new Exception("member does not exists.");
                }
 
                String uuid = headerAccessor.getSessionId();
 
                BrowserSession browserSession = new BrowserSession();
                browserSession.setSessionId(headerAccessor.getSessionId());
                browserSession.setUserId(member.getId());
                browserSession.setUuid(uuid);
 
                // register session
                sessionListener.registerBrowserSession(browserSession, headerAccessor.getSessionId());
 
                response.setStatus(true);
                response.setData(WS_QUEUE_USER + WS_QUEUE_ALARM + "/" + uuid);
            } catch (Exception e) {
                logger.error("Unhandled exception occurred while handle /app/login.", e);
 
                response.setStatus(false);
                response.setResultMessage(e.getMessage());
            }
        } else {
            response.setStatus(false);
            response.setResultMessage("Message code must be [" + WS_CODE_LOGIN + "] for /app/login.");
        }
 
        return response;
    }
 
    /**
     * Heartbeat.
     *
     * @param message the message
     * @throws Exception the exception
     */
    @MessageMapping("/heartbeat")
    public void heartbeat(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) throws Exception {
        logger.info("Heartbeat - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message);
    }
 
    /**
     * Handle exception string.
     *
     * @param exception
     * @return
     */
    @MessageExceptionHandler
    @SendToUser(WS_QUEUE_ERROR)
    public String handleException(Throwable exception) {
        return exception.getMessage();
    }
}

테스트

우선 백엔드 자바 코드 상으로 연결이 제대로 되는지 테스트를 진행하기 위해 간단한 메인 Method가 포함된 클래스를 다음과 같이 작성하고 서버 실행 후, 메인 Method를 실행해봤습니다. 기본적으로 로컬 상의 테스트이기 때문에 웹 소켓 연결 주소로는 "ws://localhost:8080/websocket" 으로 End Point 까지 구성한 대로 설정해주었습니다.

public class WebSocketClient {
 
    private static final String URL = "ws://localhost:8080/websocket";
 
    public static void main(String[] args) {
        org.springframework.web.socket.client.WebSocketClient client = new StandardWebSocketClient();
        WebSocketStompClient stompClient = new WebSocketStompClient(client);
        stompClient.setMessageConverter(new MappingJackson2MessageConverter());
 
        stompClient.connect(URL, new EcotrolSessionHandler());
 
        new Scanner(System.in).nextLine(); // Don't close immediately.
    }
}

그 결과, 다음과 같이 SessionHandler에서 로그로 찍어준 코드가 찍히면서, 새로운 세션 연결이 이루졌다는 것을 확인해보실 수 있습니다.

메시지가 정상적으로 전송이 되는지 정확히 테스트를 진행하기 위해 다음과 같이 SessionHandler의 afterConnected 메소드에 메시지 Object를 하나 만들고, 해당 메시지를 전송하도록 테스트 코드를 추가하고 테스트를 진행하면 아래와 같이 클라이언트에서 응답 값을 받아서 확인할 수 있습니다.

public void sendAlarmToMember(Member member) {
        List<String> sessionList = sessionListener.findSessionIdsByMemberId(member.getId());
 
        for (String sessionId : sessionList) {
            Message message = new Message("Today ME", "Hello world! :)");
 
            logger.info("Send Alarm To member : [{}], message : [{}]", member.getId(), message);
 
            // set websocket response message.
            SimpleJsonResponse response = new SimpleJsonResponse();
            response.setResultCode(WS_CODE_COMMAND_RESPONSE);
            response.setStatus(true);
            response.setData(message);
 
            // send websocket message
            simpMessagingTemplate.convertAndSendToUser(sessionId, WS_QUEUE_ALARM + "/" + sessionId,
                    response, sessionListener.createHeaders(sessionId));
        }
}

마치며

이것으로 웹소켓 적용 및 테스트에 대한 포스팅을 마치도록 하겠습니다.