作者:Eddy 历史版本:1 最后编辑:龚清 更新时间:2024-11-20 15:41
适用版本:v3.3.6+;
dependency
producer依赖相关:
<dependency>
<groupId>com.lc.ibps.message</groupId>
<artifactId>ibps-message-producer-rabbit</artifactId>
</dependency>
<!--
<dependency>
<groupId>com.lc.ibps.message</groupId>
<artifactId>ibps-message-producer-kafka</artifactId>
</dependency>
-->
consumer依赖相关:
<dependency>
<groupId>com.lc.ibps.message</groupId>
<artifactId>ibps-message-consumer-rabbit</artifactId>
</dependency>
<!--
<dependency>
<groupId>com.lc.ibps.message</groupId>
<artifactId>ibps-message-consumer-kafka</artifactId>
</dependency>
-->
配置说明
平台目前支持的有rabbitmq和kafka,通过依赖选择使用哪种,文档以rabbitmq为例。消费者通过message中的MessageType来判断由哪个handler处理消息,该handler需要实现IMessageQueueHandler或者ICommandQueueHandler接口。
conf/ibps-message-consumer.xml:
<!-- mq 消息发送实现类注册 -->
<bean id="messageHandlerList" class="java.util.ArrayList">
<constructor-arg>
<list>
<ref bean="innerMessageQueueHandler" />
<ref bean="mailMessageQueueHandler" />
<ref bean="smsMessageQueueHandler" />
<ref bean="wechatMessageQueueHandler" />
</list>
</constructor-arg>
</bean>
<util:map id="messageQueueHandlers" map-class="java.util.HashMap">
<entry key="innerMessageQueueHandler" value-ref="innerMessageQueueHandler"/>
<entry key="mailMessageQueueHandler" value-ref="mailMessageQueueHandler"/>
<entry key="smsMessageQueueHandler" value-ref="smsMessageQueueHandler"/>
<entry key="wechatMessageQueueHandler" value-ref="wechatMessageQueueHandler"/>
</util:map>
<util:map id="commandQueueHandlers" map-class="java.util.HashMap">
<entry key="removeCacheCommandQueueHandler" value-ref="removeCacheCommandQueueHandler"/>
<entry key="syncDatasourceCommandQueueHandler" value-ref="syncDatasourceCommandQueueHandler"/>
</util:map>
API说明
IMessageQueueProducer:
/**
* 发送消息到队列中
* @param message
*/
public void push(T message);
MessageQueueProductorUtil:
/**
* 发送多种消息到消息队列
*
* @param msgTypes 消息类型集合,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param subject 消息标题
* @param content 消息内容
* @param contentType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
*/
public static void send(List<String> msgTypes, List<String> receivers, List<String> receiverNames,
String subject, String content, String contentType)
/**
* 发送指定类型消息到消息队列
*
* @param msgType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param subject 消息标题
* @param content 消息内容
* @param contentType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
*/
public static void send(String msgType, List<String> receivers, List<String> receiverNames,
String subject, String content, String contentType)
/**
* 发送指定类型消息到消息队列
*
* @param msgType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param typeKey 消息模版标识
* @param vars 消息变量
*/
public static void sendByTemplate(String msgType, List<String> receivers, List<String> receiverNames,
String typeKey, Map<String, Object> vars)
/**
* 发送多种消息到消息队列
*
* @param sender 发送人
* @param senderName 发送人姓名
* @param msgTypes 消息类型集合,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param subject 消息标题
* @param content 消息内容
* @param contentType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
*/
public static void send(String sender, String senderName,
List<String> msgTypes, List<String> receivers, List<String> receiverNames,
String subject, String content, String contentType) {
if(BeanUtils.isEmpty(msgTypes))
/**
* 发送指定类型消息到消息队列
*
* @param sender 发送人
* @param senderName 发送人姓名
* @param msgType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param subject 消息标题
* @param content 消息内容
* @param contentType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.ContentType}
*/
public static void send(String sender, String senderName,
String msgType, List<String> receivers, List<String> receiverNames,
String subject, String content, String contentType,
String typeKey, Map<String, Object> vars)
/**
* 发送指定类型消息到消息队列
*
* @param sender 发送人
* @param senderName 发送人姓名
* @param msgType 消息类型,{@link com.lc.ibps.cloud.mq.core.constants.MessageType}
* @param receivers 接收人集合,用户ID/邮箱/手机号码/微信号
* @param receiverNames 接收人姓名集合,与接收人集合一一对应
* @param typeKey 消息模版标识
* @param vars 消息变量
*/
public static void sendByTemplate(String sender, String senderName,
String msgType, List<String> receivers, List<String> receiverNames,
String typeKey, Map<String, Object> vars) {
if (StringUtil.isBlank(typeKey))
IQueueConsumer:
/**
* 消费队列中的消息
* @param message
*/
public void popup(M message);
使用示例
MessageQueueProductorUtil: // producer
MessageQueueProductorUtil.send("-1", "System", MessageType.MAIL.value(),
receivers, null, subject, content,
ContentType.HTML.name(), null, null);
InnerMessageQueueHandler:// consumer
...
@Override
public void send(M message) {
logger.debug(">>>>>>>>>>>>>>>>>>>>>starting to send {} message>>>>>>>>>>>>>>>>>>", getMessageType());
if(BeanUtils.isEmpty(message)
|| BeanUtils.isEmpty(message.getSenderId())
|| BeanUtils.isEmpty(message.getReceivers())
|| BeanUtils.isEmpty(message.getTemplate())
)
{
return;
}
MessageQueueConsumerUtil.transfer(message);
if (innerMessageDomain != null) {
InnerMessageVo vo = new InnerMessageVo();
vo.setSubject(message.getTemplate().getSubject());
vo.setContent(message.getTemplate().getContent());
vo.setIsPublic(InnerMessageVo.IS_PUBLIC_NO);
vo.setCanreply(InnerMessagePo.IS_REPLY_NO);
vo.setMessageType(InnerMessagePo.TYPE_SYSTEM);
vo.setReceiverId(StringUtil.join(message.getReceivers(), StringPool.COMMA));
vo.setReceiver(StringUtil.join(message.getReceiverNames(), StringPool.COMMA));
innerMessageDomain.send(vo, message.getSenderId(), message.getSenderName());
logger.debug("<<<<<<<<<<<<<<<<<<<<<ending to send {} message<<<<<<<<<<<<<<<<<<<<<", getMessageType());
}
}
...