作者:Eddy  历史版本:1  最后编辑:龚清  更新时间:2024-11-20 15:41

适用版本:v3.3.6+;

dependency

producer依赖相关:

        <dependency>
            <groupId>com.lc.ibps.message</groupId>
            <artifactId>ibps-message-producer-rabbit</artifactId>
        </dependency>

        <!--
        <dependency>
            <groupId>com.lc.ibps.message</groupId>
            <artifactId>ibps-message-producer-kafka</artifactId>
        </dependency>
        -->

consumer依赖相关:

<dependency>
   <groupId>com.lc.ibps.message</groupId>
   <artifactId>ibps-message-consumer-rabbit</artifactId>
</dependency>

<!--
<dependency>
   <groupId>com.lc.ibps.message</groupId>
   <artifactId>ibps-message-consumer-kafka</artifactId>
</dependency>
-->

配置说明

平台目前支持的有rabbitmq和kafka,通过依赖选择使用哪种,文档以rabbitmq为例。消费者通过message中的MessageType来判断由哪个handler处理消息,该handler需要实现IMessageQueueHandler或者ICommandQueueHandler接口。

conf/ibps-message-consumer.xml:

<!-- mq 消息发送实现类注册 -->
<bean id="messageHandlerList" class="java.util.ArrayList">
   <constructor-arg>
      <list>
         <ref bean="innerMessageQueueHandler" />
         <ref bean="mailMessageQueueHandler" />
         <ref bean="smsMessageQueueHandler" />
         <ref bean="wechatMessageQueueHandler" />
      </list>
   </constructor-arg>
</bean>

<util:map id="messageQueueHandlers" map-class="java.util.HashMap">
   <entry key="innerMessageQueueHandler" value-ref="innerMessageQueueHandler"/>
   <entry key="mailMessageQueueHandler" value-ref="mailMessageQueueHandler"/>
   <entry key="smsMessageQueueHandler" value-ref="smsMessageQueueHandler"/>
   <entry key="wechatMessageQueueHandler" value-ref="wechatMessageQueueHandler"/>
</util:map>

<util:map id="commandQueueHandlers" map-class="java.util.HashMap">
   <entry key="removeCacheCommandQueueHandler" value-ref="removeCacheCommandQueueHandler"/>
   <entry key="syncDatasourceCommandQueueHandler" value-ref="syncDatasourceCommandQueueHandler"/>
</util:map>

API说明

IMessageQueueProducer:

    /**
     * 发送消息到队列中
     * @param message 
     */
    public void push(T message);

MessageQueueProductorUtil:

/**
 * 发送多种消息到消息队列
 *
 * @param msgTypes 消息类型集合,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
 * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
 * @param receiverNames    接收人姓名集合,与接收人集合一一对应
 * @param subject  消息标题
 * @param content  消息内容
 * @param contentType  消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
 */
public static void send(List<String> msgTypes, List<String> receivers, List<String> receiverNames, 
      String subject, String content, String contentType)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param msgType    消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
     */
    public static void send(String msgType, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param msgType    消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param typeKey     消息模版标识
     * @param vars     消息变量
     */
    public static void sendByTemplate(String msgType, List<String> receivers, List<String> receiverNames, 
            String typeKey, Map<String, Object> vars) 

    /**
     * 发送多种消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgTypes    消息类型集合,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
     */
    public static void send(String sender, String senderName, 
            List<String> msgTypes, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType) {
        if(BeanUtils.isEmpty(msgTypes))

    /**
     * 发送指定类型消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgType    消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
     */
    public static void send(String sender, String senderName, 
            String msgType, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType,
            String typeKey, Map<String, Object> vars)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgType    消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param typeKey     消息模版标识
     * @param vars     消息变量
     */
    public static void sendByTemplate(String sender, String senderName, 
            String msgType, List<String> receivers, List<String> receiverNames, 
            String typeKey, Map<String, Object> vars) {
        if (StringUtil.isBlank(typeKey)) 

IQueueConsumer:

/**
 * 消费队列中的消息
 * @param message 
 */
public void popup(M message);

使用示例

MessageQueueProductorUtil: // producer

MessageQueueProductorUtil.send("-1", "System", MessageType.MAIL.value(), 
                receivers, null, subject, content, 
                ContentType.HTML.name(), null, null);

InnerMessageQueueHandler:// consumer

...
    @Override
    public void send(M message) {
        logger.debug(">>>>>>>>>>>>>>>>>>>>>starting to send {} message>>>>>>>>>>>>>>>>>>", getMessageType());

        if(BeanUtils.isEmpty(message) 
                || BeanUtils.isEmpty(message.getSenderId()) 
                || BeanUtils.isEmpty(message.getReceivers())
                || BeanUtils.isEmpty(message.getTemplate())
                )
        {
            return;
        }

        MessageQueueConsumerUtil.transfer(message);

        if (innerMessageDomain != null) {
            InnerMessageVo vo = new InnerMessageVo();
            vo.setSubject(message.getTemplate().getSubject());
            vo.setContent(message.getTemplate().getContent());
            vo.setIsPublic(InnerMessageVo.IS_PUBLIC_NO);
            vo.setCanreply(InnerMessagePo.IS_REPLY_NO);
            vo.setMessageType(InnerMessagePo.TYPE_SYSTEM);
            vo.setReceiverId(StringUtil.join(message.getReceivers(), StringPool.COMMA));
            vo.setReceiver(StringUtil.join(message.getReceiverNames(), StringPool.COMMA));

            innerMessageDomain.send(vo, message.getSenderId(), message.getSenderName());

            logger.debug("<<<<<<<<<<<<<<<<<<<<<ending to send {} message<<<<<<<<<<<<<<<<<<<<<", getMessageType());
        }
    }

...