侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 297 篇文章
  • 累计创建 134 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

ActiveMQ环境搭建及使用教程

孔子说JAVA
2022-07-04 / 0 评论 / 0 点赞 / 142 阅读 / 8,643 字 / 正在检测是否收录...

MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。用来帮助实现高可用、高性能、可伸缩、易用和安全的企业级面向消息服务的系统。由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可运行。

1、下载ActiveMQ

官网下载地址:https://activemq.apache.org/components/classic/download/

进入下载地址后,可以看到如下的界面,是最近2个版本的下载链接,都包括了Windows和Linux操作系统的安装包,我们根据需要下载即可。

image

  • 注意:每个版本都有JDK版本的要求,需要根据自己本地的JDK版本进行选择。

image-1656635974037

2、安装ActiveMQ

在安装之前要先确保安装机器上已安装了JRE,并确保JRE版本与ActiveMQ的版本兼容。如果您计划重新编译源代码,则需要安装JDK。ActiveMQ与JRE版本对应关系如下:

  • ActiveMQ版本 <=5.10.0, Java 运行时环境 (JRE) 版本:1.6
  • ActiveMQ版本 <5.15.0, Java 运行时环境 (JRE) 版本:1.7
  • ActiveMQ版本 >=5.15.0, Java 运行时环境 (JRE) 版本:>=1.8
  • ActiveMQ版本 >=5.17.1, Java 运行时环境 (JRE) 版本:>=11

2.1 Windows环境下安装ActiveMQ

  1. 将下载的 ZIP 文件解压缩到您选择的目录中,解压即安装。

image-1656636703639

  1. 启动 ActiveMQ

cmd打开dos窗口,切换到安装目录下的bin目录,输入命令:activemq start,运行ActiveMQ。

image-1656636901645

等窗口中输出ActiveMQ的控制台访问地址 http://127.0.0.1:8161/ 时,表示ActiveMQ启动成功。

image-1656636974003

  1. 访问 ActiveMQ

打开管理登录界面
- 网址:http://127.0.0.1:8161/http://127.0.0.1:8161/admin/
- 用户名:admin
- 密码:admin
- Java程序连接端口:61616,如果要修改61616端口,可以修改文件conf>activemq.xml。

image-1656637148674

登录后的界面如下:

image-1656637164577

注意:在配置文件中保存着用户名和密码的信息,可以根据需要改动。

  • 配置文件位置: apache-activemq-5.16.5\conf
  • 配置文件:jetty-realm.properties文件

image-1656637254218

  1. 停止运行ActiveMQ

停止运行ActiveMQ,只需要在dos下按下ctrl+c即可(或直接关闭dos窗口)。

2.2 Linux服务器上安装ActiveMQ

在Linux系统中,我们一般将安装软件在/ursr/local/这个目录中,我们将下载的activemq的tar包 apache-activemq-5.16.5-bin.tar.gz 上传到这个目录下面,可以使用命令或XFTP工具。

  1. 将apache-activemq-5.16.5-bin.tar.gz上传至服务器 /ursr/local/ 目录下。

  2. 解压此文件 tar zxvf apache-activemq-5.16.5-bin.tar.gz

  3. 解压完成后需要修改 apache-activemq-5.16.5 文件夹的权限,如果你一直使用root用户,此步骤可以不执行。

chmod 777 apache-activemq-5.16.5
  1. 进入apache-activemq-5.16.5\bin目录,启动ActiveMQ有2种方式:
    • 方式一:前台方式启动,在bin目录下输入命令 ./activemq console 启动。
    • 方式二:后台方式启动,在bin目录下输入命令 ./activemq start 启动。已经启动服务可以通过命令 ./activemq restart 重新启动。

出现下图说明启动成功。

image-1656637905639

也可以通过命令 ./activemq status 查看activeMQ 是不是启动的状态。

image-1656638112333

指定日志文件输出位置

ActiveMQ日志默认的位置是在:%activemq安装目录 %/data/activemq.log,我们也可以在启动的时候指定输出日志的位置,启动时指定日志输出文件:./activemq start > /usr/local/log/activemq.log

  1. 测试

在本地Windows机器上打开管理界面
- 网址:http://172.19.82.206:8161/http://172.19.82.206:8161/admin/
- 用户名:admin
- 密码:admin
- 注意:这个IP是给linux服务器的IP
- Java程序连接端口:61616,如果要修改61616端口,可以修改文件conf>activemq.xml。

我们发现是无法访问管理页面。这里有2点原因:

  • 原因1:Linux要先把8161这个端口暴露出来,现在是被防火墙阻挡了。
  • 原因2:ActiveMQ在 conf/jetty.xml 文件中限制了ActiveMQ只能本地访问。

原因1的解决方案:

我这里用的是Ubuntu系统,使用 ufw status 查看发现8161端口没有在允许规则之内,所以执行 ufw allow 8161 命令添加规则,再使用 ufw status 查看,可以看到已添加规则。

image-1656638842482

原因2的解决方案:

修改ActiveMQ conf/jetty.xml文件,将jettyPort项设置为虚拟机本地ip即可。修改完成后通过 ./activemq restart 命令重启,这时服务机和本地物理机就都可使用ip访问ActiveMQ管理页面了。

image-1656639096999

在本地机器上访问服务器管理界面:http://172.19.82.206:8161/

image-1656639253843

  1. 停止运行ActiveMQ

(1)前台方式启动使用:Ctrl+C来终止 ActiveMQ

(2)后台方式启动使用:先cd到activeMQ的bin目录,然后执行 ./activemq stop 命令即可停止运行ActiveMQ

image-1656637840942

3、ActiveMQ的使用

  1. 点击Mange ActiveMQ broker进入主页面

在登录后的主页面点击 Mange ActiveMQ broker 进入管理主页面

image-1656639908086

ActiveMQ管理主页面

image-1656640020001

  1. 点对点消息列表

image-1656640108016

列表各列含义如下:

  • Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
  • Number Of Consumers :消费者 这个是消费者端的消费者数量
  • Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
  • Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。

4、ActiveMQ工作流程

ActiveMQ消息形式:topic(非安全的),queue(安全的),topic和queue区别如下:

  • topic:不安全 、没有状态的、一对多 、数据容易丢失的、传输速率高。
  • queue:安全、有状态的、一对一、数据不容易丢失的、传输速率低。

工作流程:

a用户把消息发送到 mq 的消息池里,mq负责接受这条消息,如果接到消息了,把连接断开了。mq有一个监控的功能,监控b用户是否上线。如果b用户上线了,把a用户的消息发送给b用户。b用户会把消息的返回值告诉连接池,完了b用户和连接池就断开了。mq接受到了b用户给它的返回值。(a用户只是把消息发过去了,还没有接收到信息 )连接池时刻监视a用户是否在线,如果a用户上线了再把返回值发送给a用户。发送完成之后连接断开了。

  • topic消息形式工作流程:首先用户a把发送消息到消息池里, mq会存放你的消息,监控到b用户上线,将这条消息发过去。发送过去后,mq会把这条消息删掉,(如果b在返回的时候断掉了,那消息就丢了)。如果返回没断监控a用户上线,把消息返回给a。

  • queue消息形式工作流程:首先用户a把发送消息到消息池里, mq会存放你的消息,监控到b用户上线,将这条消息发过去。发送过去后,mq不会把这条消息删掉,会一直保留消息池里。当把消息给b,b接收到消息。mq接收到b的返回值,mq把b的返回值给a之后,才会把这个消息删掉。

(简单地说:queue会保存消息确认a收到了才会删掉,topic不会保存消息)

5、连接异常处理

ActiveMQ消息发送时异常

Could not connect to broker URL: nio://192.168.200.130:61618. Reason: java.net.ConnectExcept

先确保消息中间件可以正常在网页显示,不能显示的先看看,是否windows和虚拟器相互是否ping通,再开启61616,8161端口号

  • 可能是虚拟机端口号未打开,之前用的tcp协议,端口号是打开的,切换到nio时没有及时打开新端口61618,
  • firewall-cmd --add-port=61618/tcp --permanent 开启端口号
  • firewall-cmd --reload 刷新防火墙
  • activemq配置文件中没有加入nio协议的配置

5.1 传输协议

ActiveMQ支持的client-broker通讯协议有: TCP、 NIO、 UDP、 SSL、Http(s)、 VM。

配置文件

配置Transport Connector的文件在activeMQ安装目录的conf/activemq.xml中的 <transportConnectors> 标签之内。

image-1656641078613

在上文给出的配置信息中,URI描述信息的头部都的是采用协议名称,例如:

  • 描述amqp协议的监听端口时,采用的URI描述格式为amqp:/…;
  • 描述Stomp协议的监听端口时,采用的URI描述格式为stomp//…。
  • 唯独在进行openwire协议描述时,URI头采用的tcp://…。这是因为ActiveMQ中默认的消息协议就是openwire。

控制台界面

在MQ控制台界面可查看到:

image-1656641192915

5.2 修改配置

在修改之前,先备份

cp activemq.xml activemq.xml.bk

打开activemq.xml

vim activemq.xml

设置显示行号

:set nu

根据需求进行修改,这里新增NIO协议支持。在 <transportConnectors> 中新增如下内容,然后保存退出。

<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>

image-1656641838359

开放61618端口

ufw allow 61618

5.3 启动ActiveMQ

cd 到mq的bin目录下,执行命令:./activemq start。登录管理控制台查看,看到已支持NIO。

image-1656641905239

6、代码测试

6.1 测试生产者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce {
    public static final String ACTIVEMQ_URL = "nio://192.168.150.101:61618";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1 创建连接工厂,按照给定的url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session
        // 参数1:事务
        // 参数2:签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(队列还是主题,这里选择队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 6 通过使用messageProducer生产3条消息发送到MQ队列中
        for (int i = 0; i < 3; i++) {
            // 7 创建消息 按照要求写好的消息
            TextMessage textMessage = session.createTextMessage("msg----" + i);//一个字符串
            textMessage.setStringProperty("c01", "vip");
            // 8 通过messageProducer发给mq
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println("---------消息发布到MQ完成--------");
    }
}

6.2 测试消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "nio://192.168.150.101:61618";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        // 1 创建连接工厂,按照给定的url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session
        // 参数1:事务
        // 参数2:签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(队列还是主题,这里选择队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // 6 处理消息

        /*
        方式1:同步阻塞方式(receive())
              订阅者或接收者调用MessageConsumer的receive()方法来接收消息,
              receive方法在能够接收到消息之前(或超时之前)将一直堵塞。
              receive(xxxx),如果添加参数表示等待一定时间,超过时间将关闭。
         */

//        while (true) {
//            TextMessage textMessage = (TextMessage) messageConsumer.receive();
//            if (textMessage!=null) {
//                System.out.println("-------消费者接收到消息: " + textMessage.getText());
//            } else {
//                break;
//            }
//        }

        /*
        方式2: 异步非阻塞方式(监听器onMessage())
        订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,
        当消息到达后,系统自动调用监听器MessageListener的onMessage(Message message)方法。
         */

        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof  TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("-------消费者接收到消息: " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();  // 保证控制台不灭,消费者启动之后将一直等待。如果关闭的话,消费者启动后还没监听到就立刻关闭了。

        // 7 资源关闭
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

附录:防火墙相关命令

Ubuntu防火墙相关命令:

1)查看防火墙状态     sudo ufw status
2)开启防火墙        sudo ufw enable
3)关闭防火墙        sudo ufw disable
4)重启防火墙        sudo ufw reload
5)设置外来访问默认权限 sudo ufw default deny(//拒接所有外来访问,本机能正常访问外部)

Ubuntu防火墙端口相关命令:

1)开放普通端口        sudo ufw allow 端口
2)关闭普通端口        sudo ufw delete allow 端口
3)开放规定协议的端口    sudo ufw allow 端口/tcp  (//指定开放端口的tcp协议)
4)关闭指定协议端口      sudo ufw delete allow 端口/tcp
5)开放限定ip地址所有端口  sudo ufw allow from 192.168.121.1(指定ip为192.168.121.1的计算机操作所有端口)
6)开放限定ip地址指定端口  sudo ufw allow from 192.168.121.2 to any port 3306(// 开放指定ip为192.168.121.2的计算机访问本机的3306端口)

Centos7防火墙相关命令:

# 1查看防火墙状态
systemctl status firewalld

# 2开启防火墙
systemctl start firewalld

# 3关闭防火墙
systemctl stop firewalld

# 4其他命令
重启防火墙,重启 firewall 服务        systemctl restart firewalld
查看 firewall 服务是否开机启动       systemctl is-enabled firewalld
开机时自动启动 firewall 服务        systemctl enable firewalld.service
开机时自动禁用 firewall 服务        systemctl disable firewalld.service

Centos7防火墙端口相关命令:

# 1查看 firewall-cmd 状态
firewall-cmd --state

# 2 查看已打开的所有端口
firewall-cmd --zone=public --list-ports

# 3查看想开的端口是否已开
firewall-cmd --query-port=端口/tcp

# 4开启指定端口
firewall-cmd --zone=public --add-port=端口/tcp --permanent

# 5开启一个端口,–permanent 永久生效,没有此参数重启后失效
firewall-cmd --reload
必须重新加载才能生效

# 6关闭指定端口
firewall-cmd --zone=public --remove-port=端口/tcp --permanent

–zone #作用域
–add-port=80/tcp #添加端口,格式为:端口/通讯协议
–permanent #永久生效,没有此参数重启后失效
0

评论区