본문 바로가기
  • A space that records me :)
기술/RabbitMQ

[Spring Boot] RabbitMQ 사용

by yjkim_97 2021. 1. 6.

RabbitMQ란 ? 

2020/11/19 - [IT story] - [RabbitMQ] RabbitMQ란?

 

[RabbitMQ] RabbitMQ란?

2020 KT CPN 시스템 FM 서버 간에 노티 처리를 위해 사용 대표적으로 한 서버에서 캐시가 변동되면, 다른 서버에도 동일한 작업을 하도록 하기 위함이다. RabbitMQ 계념 RabbitMQ란? AMQP 프로토콜을 구현

yjkim97.tistory.com

 

이번 포스트에서는 RabbitMQ를 이용하여 물려있는 서버에게 노티를 보내주는 로직을 구현해 보았다.


RabbitMQ 서버는 이미 구현되어 있다고 가장한다.

 

config.properties.xml

spring.rabbitmq.host= #host
spring.rabbitmq.port= #port
spring.rabbitmq.username= #user name
spring.rabbitmq.password= #password
spring.rabbitmq.virtualhost= #virtualhost name

 

Configuration

RabbitMqConfig.java 생성

RabbitMQ 사용을 위한 환경설정파일이다.

package com.kt.tbb.iptv.coupon.config;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.AnnotatedTypeMetadata;

import com.kt.tbb.iptv.coupon.framework.CpnConstants;
import com.kt.tbb.iptv.coupon.framework.event.CpnEventNotifier;
import com.kt.tbb.iptv.coupon.framework.event.CpnEventReceiver;
import com.kt.tbb.iptv.coupon.framework.event.listeners.BatchCanceledListener;
import com.kt.tbb.iptv.coupon.framework.event.listeners.BatchLeaderChangedListener;
import com.kt.tbb.iptv.coupon.framework.event.listeners.CacheChangedListener;
import com.kt.tbb.iptv.coupon.framework.event.listeners.ProgramCacheListener;

import net.sf.ehcache.CacheManager;

@Configuration
@Conditional(RabbitMqConfig.NotificationMqConfigCondition.class)
public class RabbitMqConfig implements InitializingBean
{
    @Value("${cpn.process.name:cpnqueue.prefix}")
    private String processName;

    public static final class NotificationMqConfigCondition implements Condition
    {
        static Boolean turnTheCpnEventOn = null;

        @Override
        public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata)
        {
            if (turnTheCpnEventOn != null)
            {
                return turnTheCpnEventOn.booleanValue();
            }
            Properties properties = CpnConstants.getConfigProperties();
            String property = properties.getProperty("turn.cpn.event.off");
            turnTheCpnEventOn = StringUtils.equalsIgnoreCase(property, "false") == true;
            return turnTheCpnEventOn;
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception //NOSONAR
    {
        NotificationMqConfigCondition.turnTheCpnEventOn = null;
    }

    // ============ Exchange 설정 - 메시지 라우팅 패턴 설정 =============
    @Bean
    public FanoutExchange declareFanoutExchange()
    {
        return new FanoutExchange(CpnConstants.NOTIFICATION_EXCHANGE_NAME, false, false);
    }

    // ============ Consumer 설정 - 메시지를 받는 쪽 ====================
    private class ReceiverConfig
    {
        // 현재 서버가 사용할 큐의 이름을 정의한다.
        @Bean
        public Queue notificationReceiveQueue()
        {
            return new AnonymousQueue(new Base64UrlNamingStrategy(processName + "-"));
        }
        
        // Exchange에서 사용될 패턴을 설정한다.
        @Bean
        public Binding fanoutClientBinding(FanoutExchange fanout, Queue notificationReceiveQueue)
        {
            return BindingBuilder.bind(notificationReceiveQueue).to(fanout);
        }
        
        // 리시버 클래서 Bean등록
        @Bean
        public CpnEventReceiver receiver()
        {
            return new CpnEventReceiver();
        }
    }

    @Bean
    public CpnEventNotifier notificationMessageSender()
    {
        return new CpnEventNotifier();
    }

    private class EventListenerConfig
    {
        @Bean
        public CacheChangedListener cacheChangedListener(CacheManager ehcache)
        {
            return new CacheChangedListener(ehcache);
        }

        @Bean
        public BatchCanceledListener batchCanceledListener()
        {
            return new BatchCanceledListener();
        }

        @Bean
        public BatchLeaderChangedListener batchLeaderChangedListener()
        {
            return new BatchLeaderChangedListener();
        }
        
        
        @Bean
        public ProgramCacheListener programCacheListener(CacheManager ehcache)
        {
            return new ProgramCacheListener(ehcache);
        }

    }
}
FanoutExchange
Exchange
Exchange가 큐에 바인딩하기 위한 fanout 규칙을 정의한다. Exchange에 연결된 모든 큐에 바인딩한다.

ReceiveConfig
Queue

지정된 이름으로 Queue를 등록시킨다. 서로다른 이름으로 여러개의 큐를 등록할 수 있다. 
Binding
Exchange가 Queue에게 메시지를 전달하기 위한 룰이다. 빈으로 등록한 Queue와 Exchange를 바인딩하면서 Exchange에서 사용될 패턴을 설정해주는 것으로 여기서는 fanout (연결된 모든 큐)패턴을 사용한다.

https://docs.spring.io/spring-amqp/docs/current/api/org/springframework/amqp/core/FanoutExchange.html

 

FanoutExchange (Spring AMQP 2.3.2 API)

 

docs.spring.io

 

Message Listener

EventReceiver.java

Queue의 이름을 기반으로 Consumer가 메시지를 받는 리스너이다.

package com.kt.tbb.iptv.coupon.framework.event;

import java.util.List;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;

public class CpnEventReceiver
{
    private static final Logger logger = LoggerFactory.getLogger(CpnEventReceiver.class);

    @Autowired(required = false)
    private List<CpnEventListener> listeners;

    // RabbitListener 어노테이션으로 큐를 바인딩한다.
    @RabbitListener(queues = "#{notificationReceiveQueue.name}")
    public void receive(String message)
    {
        logger.debug("received message:{}", message);
        
        // 큐에 접근하여 메시지를 가져온후 처리 =======================
        if (ObjectUtils.isEmpty(this.listeners))
        {
            return;
        }

        CpnEvent notificationMessage = CpnEventConverter.fromJson(message);

        if (notificationMessage == null)
        {
            logger.warn("Invalid message was arrived. message : {}", message);
            return;
        }

        for (CpnEventListener l : listeners)
        {
            if (l.isAcceptable(notificationMessage.getEventType()))
            {
                l.messageReceived(notificationMessage);
            }
        }
    }
}
@RabbitListener
RabbitListener 어노테이션을 사용하여 해당 이름을 가진 큐에서 메시지를 가져오도록 설정해준다.

본문의 시스템에서의 메시지는 사용자정의된 클래스의 정보를 담고있는 JsonString형태로 이루어져있다.
이 CpnEventReceiver 클래스에서 Queue에서 메시지를 가져와 해당되는 알맞은 작업을 처리한다.

 

'기술 > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] RabbitMQ 기본 개념  (0) 2020.11.19