1.安装
下载地址:
注意:4.8.0版本有bug,启动会失败,原因不明,换4.6.0成功
http://rocketmq.apache.org/docs/quick-start/
控制面板和链接示例地址:
https://github.com/apache/rocketmq-externals/releases
面板是springboot项目,打包之前需要修改端口和rocketmq的ip地址和端口。在properties文件里面
2.启动
先启动 ./mqnamesrv
再启动 ./mqbroker
./mqbroker -n localhost:9876 //需要指定nameserver的地址
启动之前需要修改几个配置文件
runserver.sh和runbroker.sh文件里面的内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
//修改为,因为rocketmq需要的内存要求太高了
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
3.关闭
bin/mqshutdown broker
bin/mqshutdown namesrv
4.服务生产者
4.1依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
4.2配置文件
rocketmq:
name-server: 192.168.109.131:9876 #rocketMQ服务的地址
producer:
group: shop-order # 生产者组
4.3发送代码
//注入mq模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
//发送消息
rocketMQTemplate.convertAndSend("order-topic", order); //第一个参数是主题,第二个是内容
5.服务消费者
5.1依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency
5.2配置文件
rocketmq:
name-server: 192.168.109.131:9876
5.3接收消息
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
}
}
6.普通消息和顺序消息
rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new
SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息",
"xxxx");
2021年12月2日更新
收发普通消息(三种方式)
阿里云消息队列RocketMQ版提供三种方式来发送普通消息:同步发送、异步发送和单向(Oneway)发送。本文介绍了每种发送方式的原理、使用场景、示例代码,以及三种发送方式的对比;此外还提供了订阅普通消息的示例代码。
同步发送
- 原理同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
- 应用场景此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
- 示例代码
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/**
*创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
*如果不想开启消息轨迹,可以按照如下方式创建:
*DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
/**
*设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
*设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
//消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
//在应用退出前,销毁Producer对象。
//注意:如果不销毁也没有问题。
producer.shutdown();
}
}
异步发送
- 原理异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。阿里云消息队列RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
- 应用场景异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
- 示例代码
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQAsyncProducer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
* 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override public void onSuccess(SendResult result) {
// 消费发送成功。
System.out.println("send message success. msgId= " + result.getMsgId());
}
@Override public void onException(Throwable throwable) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println("send message failed.");
throwable.printStackTrace();
}
});
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 在应用退出前,销毁Producer对象。
// 注意:如果不销毁也没有问题。
producer.shutdown();
}
}
单向(Oneway)发送
- 原理发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
- 示例代码
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOnewayProducer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
* 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 在应用退出前,销毁Producer对象。
// 注意:如果不销毁也没有问题。
producer.shutdown();
}
}
三种发送方式的对比
下表概括了三者的特点和主要区别。
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
订阅普通消息
订阅普通消息的方式只有以下一种。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Consumer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
//设置为阿里云消息队列RocketMQ版实例的接入点。
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
//阿里云上消息轨迹需要设置为CLOUD方式,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
consumer.setAccessChannel(AccessChannel.CLOUD);
// 设置为您在阿里云消息队列RocketMQ版控制台上创建的Topic。
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Comments | NOTHING