1.五种消息类型介绍
普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序, 但是生产消费都是并行进行的,单机性能可达十万级别的TPS。
分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保 存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
全局有序消息:如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息 就是单分区,所有消息都遵循FIFO(先进先出)的原则。
延迟消息:消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消 费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中只需要在发送消息时设置延迟级别即可实现。
事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败 时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交 (Commit)消息或者回滚(Rollback)消息优雅地实现分布式事务。
消息类型 | 优点 | 缺点 | 备注 |
---|---|---|---|
普通消息(并发消息) | 性能最好 | 消息的生产和消费都是无序 | 大部分场景适用 |
分区有序消息 | 单分区中消息有序,单机发送TPS万级别 | 单点问题。如果Broker宕机,则会导致发送失败 | 大部分有序消息场景适用 |
全局有序消息 | 类似传统的Queue,全部消息有序,单机发送TPS千级别 | 单点问题。如果Broker宕机,则会导致发送失败 | 极少场景使用 |
延迟消息 | RocketMQ自身支持,不需要额外使用组件,支持延迟特性 | 不能根据任意时间延迟,使用范围受限。Broker随着延迟级别增大支持越多,CPU压力越大延迟时间不准确 | 非精确、延迟级别不多的场景,非常方便使用 |
事务消息 | Rocket MQ自身支持,不需要额外使用组件支持事务特性 | RocketMQ事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效 | 简单事务处理可以使用 |
2.
//普通消息类型 @Slf4j public class MessageType1 { public static void main(String[] args) { //DefaultMQProducer用于发送非事务消息 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); //注册NameServer地址 producer.setNamesrvAddr("172.16.247.3:9876"); //异步发送失败后Producer自动重试2次 producer.setRetryTimesWhenSendAsyncFailed(2); try { //启动生产者实例 producer.start(); //消息数据 String data = "{\"title\":\"2022年第四季度汇总数据\"}"; //消息主题 Message message = new Message("tax-data", "2022S4", "3333", data.getBytes()); //发送结果 SendResult result = producer.send(message); log.info("Broker响应:" + result); } catch (Exception e) { e.printStackTrace(); } finally { try { //关闭连接 producer.shutdown(); log.info("连接已关闭"); } catch (Exception e) { e.printStackTrace(); } } } }
代码返回结果:
16:58:40.090 [main] INFO com.fblinux.rocketmq.mtype.MessageType1 - Broker响应: SendResult [ sendStatus=SEND_OK, msgId=7F000001AA3018B4AAC231FDDA4C0000, offsetMsgId=AC10F70300002A9F0000000000234061, messageQueue=MessageQueue [topic=tax-data, brokerName=broker-a, queueId=13], queueOffset=625 ]
返回结果解析:
sendStatus:发送状态,SEND_OK代表成功
msgId:消息由RocketMQ分配的全局唯一Id,由 producer客户端生成,调用方法MessageClientIDSetter.createUniqID()生成全局唯一的Id
offsetMsgId:Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的偏移量,然后会生成一个Id
messageQueue:消息队列内容
- topic:主题名称
- brokerName:broker服务器名字,在RocketMQ xxx.propertites配置文件中brokerName项定义
- queueId:queueId队列Id,默认会初始化4个(0-3)
3.有序消息
假设没有有序消息时有什么问题?
如果某一笔业务分为多条普通消息同时发送,消费者无法保证按生产者预期的顺序进行消费, 进而导致代码逻辑错误。
举例:一个电商下单流程,分为创建订单、扣减库存、加积分。要是消费者无序消费就可能是先扣减库存发货,再给用户加积分,最后在创建订单扣款,导致业务逻辑出现问题。
3.1.分区有序消息
分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序。
RocketMQ 有序消息需要两点调整:
- 生产者端要求按id等唯一标识分配消息队列
- 消费者端采用专用的监听器保证对队列的单线程应用
发送分区顺序消息
@Slf4j //发送分区顺序消息 public class MessageType2 { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.31.103:9876"); producer.setRetryTimesWhenSendAsyncFailed(2); try { producer.start(); Integer id = 4465; String data = "{\"id\":" + id + " , + \"title\":\"2022年第四季度汇总数据\"}"; Message message = new Message("tax-data", "2022S4", id.toString(), data.getBytes(RemotingHelper.DEFAULT_CHARSET)); //分区有序消息最大的区别便是调用send方法时,需要实现MessageQueueSelector接口,确定使用哪个队列投递消息 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { log.info("当前队列数量:" + mqs.size() + ",明细:" + mqs.toString()); log.info("Message对象:" + msg.toString()); int dataId = Integer.parseInt(msg.getKeys()); int index = dataId % mqs.size(); MessageQueue messageQueue = mqs.get(index); log.info("分区队列:" + messageQueue); return messageQueue; } }, null); log.info("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.shutdown(); System.out.println("连接已关闭"); } catch (Exception e) { e.printStackTrace(); } } } }
3.2.全局有序消息
在实现MessageQueueSelector接口时,固定选择某个队列就代表全局有序。注意:这里的全局有序 代表broker中全局有序。如果消息被分发到不同的broker中,不保证有序,当然这种使用方法是错误的。
3.3.消费者端调整
面对有序消息场景,消费者端最大的变化是registerMessageListener监听器要实例化MessageListenerOrderly对象,用于为每一个队列分配唯一的连接(线程)进行消费。
4.延迟消息
使用起来不灵活,内部规定了18个延时队列
延迟消息是指消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。 在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中 只需要在发送消息时设置延迟级别即可实现。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个 数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。
4.1.底层原理
1、修改消息Topic名称和队列信息
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之 后将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
2、转发消息到延迟主题的CosumeQueue中
CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息 进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
3、延迟服务消费SCHEDULE_TOPIC_XXXX消息
Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。
4、将信息重新存储到CommitLog中
在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储
5、将消息投递到目标Topic中
这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。
6、消费者消费目标topic中的数据
4.2.
//发送延迟消息 public class MessageType3 { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("172.16.247.3:9876"); producer.setRetryTimesWhenSendAsyncFailed(2); try { producer.start(); long id = 4466L; String data = "{\"id\":" + id + " , + \"title\":\"2022年第四季度汇总数据\"}"; Message message = new Message("tax-data", "2022S4", data.getBytes(RemotingHelper.DEFAULT_CHARSET)); //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h message.setDelayTimeLevel(3); SendResult result = producer.send(message); System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.shutdown(); System.out.println("连接已关闭"); } catch (Exception e) { e.printStackTrace(); } } } }
5.事务消息
5.1.事物消息解决的问题
先看一个工作场景,
订单ID1030被创建后要保存到数据库,同时该1030订单通过MQ投 递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致,要怎么做?
这里有两种常见做法:
第一种,先写库,再发送数据
//伪代码 //插⼊1030号订单 orderDao.insert(1030,order); //向1030号订单新增3条订单明细,10081-10083, orderDetailDao.insert(10081,1030,orderDetail1); orderDetailDao.insert(10082,1030,orderDetail2); orderDetailDao.insert(10083,1030,orderDetail3); //向MQ发送数据,如果数据发送失败 SendResult result = producer.send(orderMessage) if(result.getState().equals("SEND_OK"))){ connection.commit(); }else{ connection.rollback(); }
如果⽣产者发送消息时,因为⽹络原因导致10秒消息才返回SendResult结果,这就意味这10秒内数据 库事务⽆法提交,⼤量并发下,数据库连接资源会在这10秒内迅速耗尽,后续请求进⼊连接池等待状 态,最终导致系统停⽌响应。
第二种,先发消息,在写库
//伪代码 //向MQ发送数据,如果数据发送失败 SendResult result = producer.send(orderMessage) if(result.getState().equals("SEND_OK"))){ //插入1030号订单 orderDao.insert(1030,order); //向1030号订单新增3条订单明细,10081-10083, orderDetailDao.insert(10081,1030,orderDetail1); orderDetailDao.insert(10082,1030,orderDetail2); orderDetailDao.insert(10083,1030,orderDetail3); connection.commit; }
问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为1030订单自动设 置了“快递信息”,可是如果后续orderDao向数据库插入数据产生异常导致业务失败。我们还需 要再次发送“取消1030订单”的消息把下游1030订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。
那有没有什么方式可以既不阻塞数据库事务,也能保证最终一致性呢?有,RocketMQ提供了事务消息可以保障应用本地事务与MQ最终一致性。
5.2.代码实现
发出事务消息代码:
@Data @Slf4j public class MessageType4 { public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { //事务消息一定要使用TransactionMQProducer事务生产者创建 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name1"); //从NameServer获取配置数据 producer.setNamesrvAddr("172.16.247.3:9876"); //CachedThreadPool线程池用于回查事务数据 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("check-transaction-thread"); return thread; } }); //将生产者与线程池绑定 producer.setExecutorService(cachedThreadPool); //绑定事务监听器,用于执行代码 TransactionListener transactionListener = new OrderTransactionListenerImpl(); producer.setTransactionListener(transactionListener); //启动生产者 producer.start(); //创建消息对象 Message msg = new Message("order", "order-1030", "1030", "1030订单与明细的完整JSON数据(略)".getBytes()); //一定要调用sendMessageInTransaction发送事务消息 //参数1:消息对象 //参数2:其他参数,目前用不到 producer.sendMessageInTransaction(msg, null); } }
处理本地事务业务代码:
@Data @Slf4j public class OrderTransactionListenerImpl implements TransactionListener { @Override //执行本地事务代码 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("正在执行本地事务,订单编号:" + msg.getKeys()); /* 伪代码 try{ //插入1030号订单 orderDao.insert(1030,order); //向1030号订单新增3条订单明细,10081-10083, orderDetailDao.insert(10081,1030,orderDetail1); orderDetailDao.insert(10082,1030,orderDetail2); orderDetailDao.insert(10083,1030,orderDetail3); connection.commit(); //返回Commit,消费者可以消费1030订单消息 return LocalTransactionState.COMMIT_MESSAGE; }catch(Exception e){ //返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息 connection.rollback(); return LocalTransactionState.ROLLBACK_MESSAGE; } */ log.info("模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW"); return LocalTransactionState.UNKNOW; } @Override //回查本地事务处理状态 public LocalTransactionState checkLocalTransaction(MessageExt msg) { String keys = msg.getKeys(); log.info("触发回查,正在检查" + keys + "订单状态"); /* 伪代码 Order order = orderDao.selectById(1030); if(order != null){ //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息 return LocalTransactionState.COMMIT_MESSAGE; }else{ //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息 return LocalTransactionState.ROLLBACK_MESSAGE; } */ log.info("回查结果," + keys + "订单已入库,发送Commit指令"); return LocalTransactionState.COMMIT_MESSAGE; } }
5.3.实验了解RocketMQ事务执行过程
5.3.1.标准流程:
1、producer.sendMessageInTransaction(msg, null); 执行成功
此时1030订单消息已被发送到MQ服务器(Broker),不过该消息在Broker此时状态为“half-message”,相当于存储在MQ中的“临时消息”,此状态下消息无法被投递给消费者。
OrderTransactionListenerImpl.executeLocalTransaction()执行本地事务。
当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交commit,同executeLocalTransaction方法返回COMMIT_MESSAGE,生产者会再次向MQ服务器发送一个commit提交消息,此前在Broker中保存1030订单消息状态就从“half-message”变为”已提交”,broker将消息发给下游的消费者处理。
5.3.2.异常流程1:
producer.sendMessageInTransaction(msg, null); 执行失败,抛出异常
此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。
5.3.3.异常流程2:
producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行失败
此时本地事务执行rollback回滚,数据库数据被撤销,同时executeLocalTransaction方法返 回ROLLBACK_MESSAGE代表回滚,生产者会再次向MQ服务器发送一个rollback回滚消息,此前在Broker中保存1030订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与MQ消息一致。
5.3.4.异常流程3:
producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行成功,但给Broker返回Commit消息时断网了,导致broker无法收到提交指令。
此时本地数据库订单数据已入库,但MQ因为断网无法收到生产者的发来的“commit”消息,1030订单数据一直处于“half message”的状态,消息无法被投递到消费者,本地事务与MQ消息的一致性被破坏。
RocketMQ为了解决这个问题,设计了回查机制,对于broker中的half message,每过一小段时间 就自动尝试与生产者通信,试图调用通
public LocalTransactionState checkLocalTransaction(MessageExt msg) { String keys = msg.getKeys(); log.info("触发回查,正在检查" + keys + "订单状态"); /* 伪代码 Order order = orderDao.selectById(1030); if(order != null){ //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息 return LocalTransactionState.COMMIT_MESSAGE; }else{ //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息 return LocalTransactionState.ROLLBACK_MESSAGE; } */ log.info("回查结果," + keys + "订单已入库,发送Commit指令"); return LocalTransactionState.COMMIT_MESSAGE; }
5.4.
转载请注明:西门飞冰的博客 » RocketMQ 五种消息类型实践