activeMQ进阶与优化(一) weir 2015-11-08 10:03:47.0 java,分布式 4947 说起activeMQ他的知识体系还真不少,涉及到的技术规范也很多。我对他的总结就是传递消息,异步处理和持久化内容。 传递消息这个范围就广了而且还没有语言的限制,不管你是用什么语言写的系统我都可以处理,单单这一项特性就够牛的了。 异步处理就是像ajax一样提交了我就不管了,让后台服务器去处理完了之后给我结果就行了,我总不能傻等着页面反应吧,这种伪装方式可以很好的骗过用户,让他感觉系统很流畅。 持久化内容就是我发出的消息要记录下来作为证据或后续操作用或回滚操作用等等,这当然是一件好事情。 我么还有一些词汇来形容这样的软件作用叫消息中间件。其实这个东西的出现离不开分布式和跨语言服务这些概念,在这里不展开说明,只是想说明一个问题就是数据消息的传递也是需要规范的,在java里面我们叫JMS,它是j2ee的规范只有借口没有实现,所以才有了activeMQ这样实现的中间件出现,当然还有付费的我们就不说了。 由于activeMQ的内容是在很多,所以也不可能面面俱到,我们还是从最简单的入手,慢慢一步步来: 下载并安装ActiveMQ服务器端 1:从http://activemq.apache.org下载最新的ActiveMQ 2:直接解压,然后拷贝到你要安装的位置就好了 启动运行 1:普通启动:到ActiveMQ/bin下面,./activemq start 2:启动并指定日志文件./activemq start > /tmp/activemqlog 检查是否已经启动 ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服务,执行以下命令以便检验是否已经成功启动ActiveMQ服务: 1:比如查看61616端口是否打开: netstat -an | grep 61616 2:也可以直接查看控制台输出或者日志文件 3:还可以直接访问ActiveMQ的管理页面:http://localhost:8161/admin/ 默认的用户名和密码是admin/admin 关闭ActiveMQ,可以用./activemq stop 暴力点的可以用ps -ef | grep activemq 来得到进程号,然后kill掉 发送消息: public class JmsSend { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for(int i=0; i<3; i++) { TextMessage message = session.createTextMessage("message--"+i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); } } 接收消息: public class JmsReceiver { public static void main(String[] args) throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消息:" + message.getText()); } session.close(); connection.close(); } } 从这个例子可以看出来,activeMQ必须有发送者和接受者组成,换句话说就是客户端和服务端 本来我想把JMS的一些概念也拿出来,但是我发现太多了,这些东西也不会太大变化,网上也有很多资料,其实这东西架子基本不会太大变化,就那么多东西,中间件要做的就是简化我们的开发流程,尽量简单化,我这里只写一下最基本的流程: 1:创建一个JMS connection factory 2:通过connection factory来创建JMS connection 3:启动JMS connection 4:通过connection创建JMS session 5:创建JMS destination 6:创建JMS producer,或者创建JMS message,并设置destination 7:创建JMS consumer,或者是注册一个JMS message listener 8:发送或者接受JMS message(s) 9:关闭所有的JMS资源(connection, session, producer, consumer等) 你说这东西有什么可说的,很固定的格式。 对于持久化和非持久化,更容易理解了,就是消息保存与不保存的问题。 activeMQ支持的传输协议有: TCP:(1) TCP协议传输可靠性高,稳定性强 (2)高效性:字节流方式传递,效率很高 (3)有效性、可用性:应用广泛,支持任何平台 NIO: (1)可能有大量的Client去链接到Broker上 一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此, NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议 (2)可能对于Broker有一个很迟钝的网络传输 NIO比TCP提供更好的性能 UDP: (1)TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复 制和丢失的。UDP,另一方面,它是不会保证数据包的传递的 (2)TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样 确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可 靠性之说 从上面可以得出:TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不 怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP SSL: 1:连接的URI形式:ssl://hostname:port?key=value 2:Transport Connector配置示例: <transportConnectors> <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true" /> </transportConnectors> HTTP/HTTPS:像web和email等服务需要通过防火墙来访问的,Http可以使用这种场合 VM: 1:VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连 接不是socket连接,而是直接的方法调用。 2:第一个创建VM连接的客户会启动一个embed VM broker,接下来所有使用相同的 broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭 的时候,这个broker也会自动关闭。 3:连接的URI形式:vm://brokerName?key=value 4:Java中嵌入的方式: vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=fal se , 定义了一个嵌入的broker名称为embededbroker以及配置了一个 tcptransprotconnector在监听端口6000上 5:使用一个加载一个配置文件来启动broker vm://localhost?brokerConfig=xbean:activemq.xml 下面还是写一下JMS的一些概念: JMS Java Message Service,Java消息服务,是Java EE中的一个技术。 JMS定义了Java 中访问消息中间件的接口,并没有给予实现,实现JMS 接口的消息 中间件称为JMS Provider,例如ActiveMQ JMS provider:实现JMS接口和规范的消息中间件 JMS message:JMS的消息,JMS消息由以下三部分组成: 1:消息头:每个消息头字段都有相应的getter和setter方法 2:消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性 3:消息体:封装具体的消息数据 JMS producer:消息生产者,创建和发送JMS消息的客户端应用 JMS consumer:消息消费者,接收和处理JMS消息的客户端应用 消息的消费可以采用以下两种方法之一: 1:同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive 方法可 以一直阻塞到消息到达。 2:异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作 JMS domains:消息传递域,JMS规范中定义了两种消息传递域:点对点(point-topoint, 简写成PTP)消息传递域和发布/订阅消息传递域(publish/subscribe,简写 成pub/sub) 1:点对点消息传递域的特点如下: (1)每个消息只能有一个消费者 (2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消 息的时候是否处于运行状态,它都可以提取消息。 2:发布/订阅消息传递域的特点如下: (1)每个消息可以有多个消费者 (2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它 订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间 上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。 3:在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中, 目的地被称为主题(topic) Connection factory:连接工厂,用来创建连接对象,以连接到JMS的provider JMS Connection:封装了客户与JMS 提供者之间的一个虚拟的连接 JMS Session:是生产和消费消息的一个单线程上下文 会话用于创建消息生产者(producer)、消息消费者(consumer)和消息 (message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送 和接收被组合到了一个原子操作中。 Destination:消息发送到的目的地 Acknowledge:签收 Transaction:事务 JMS client:用来收发消息的Java应用 Non-JMS client:使用JMS provider本地API写的应用,用来替换JMS API实现收 发消息的功能,通常会提供其他的一些特性,比如:CORBA、RMI等。 Administered objects:预定义的JMS对象,通常在provider规范中有定义,提 供给JMS客户端来访问,比如: ConnectionFactory和Destination JMS 消息由以下几部分组成:消息头,属性和消息体 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如下: 1:JMSDestination:由send方法设置 2:JMSDeliveryMode:由send方法设置 3:JMSExpiration:由send方法设置 4:JMSPriority:由send方法设置 5:JMSMessageID:由send方法设置 6:JMSTimestamp:由客户端设置 7:JMSCorrelationID :由客户端设置 8:JMSReplyTo :由客户端设置 9:JMSType :由客户端设置 10:JMSRedelivered:由JMS Provider设置 标准的JMS 消息头包含以下属性: 1:JMSDestination:消息发送的目的地:主要是指Queue和Topic,自动分配 2:JMSDeliveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的 消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,该消 息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传送 一次,这意味这服务器出现故障,该消息将永远丢失。自动分配 3:JMSExpiration:消息过期时间,等于Destination 的send 方法中的 timeToLive值加上发送时刻的GMT 时间值。如果timeToLive 值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间 之后消息还没有被发送到目的地,则该消息被清除。自动分配 4:JMSPriority:消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消 息。JMS 不要求JMS Provider 严格按照这十个优先级发送消息,但必须保证加 急消息要先于普通消息到达。默认是4级。自动分配 5:JMSMessageID:唯一识别每个消息的标识,由JMS Provider 产生。自动分配 6:JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的。它是消息被 发送和消费者实际接收的时间差。自动分配 7:JMSCorrelationID:用来连接到另外一个消息,典型的应用是在回复消息中连接 到原消息。在大多数情况下,JMSCorrelationID用于将一条消息标记为对 JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何 值,不仅仅是JMSMessageID。由开发者设置 8:JMSReplyTo:提供本消息回复消息的目的地址。由开发者设置 9:JMSType:消息类型的识别符。由开发者设置 10:JMSRedelivered:如果一个客户端收到一个设置了JMSRedelivered属性的消 息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收 (acknowledged)。如果该消息被重新传送,JMSRedelivered=true反之, JMSRedelivered =false。自动设置 消息体,JMS API定义了5种消息体格式,也叫消息类型,可以使用不同形式发送 接收数据,并可以兼容现有的消息格式。包括:TextMessage、MapMessage、 BytesMessage、StreamMessage和ObjectMessage 消息属性,包含以下三种类型的属性: 1:应用程序设置和添加的属性,比如: Message.setStringProperty(“username”,username); 2:JMS定义的属性 使用“JMSX”作为属性名的前缀, connection.getMetaData().getJMSXPropertyNames(), 方法返回所有连接支持 的JMSX 属性的名字。 3:JMS供应商特定的属性 JMS定义的属性如下: 1:JMSXUserID:发送消息的用户标识,发送时提供商设置 2:JMSXAppID:发送消息的应用标识,发送时提供商设置 3:JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,… ,发送时 提供商设置 4:JMSXGroupID:消息所在消息组的标识,由客户端设置 5:JMSXGroupSeq:组内消息的序号第一个消息是1,第二个是2,…,由客户端设置 6:JMSXProducerTXID :产生消息的事务的事务标识,发送时提供商设置 7:JMSXConsumerTXID :消费消息的事务的事务标识,接收时提供商设置 8:JMSXRcvTimestamp :JMS 转发消息到消费者的时间,接收时提供商设置 9:JMSXState:假定存在一个消息仓库,它存储了每个消息的单独拷贝,且这些消 息从原始消息被发送时开始。每个拷贝的状态有:1(等待),2(准备),3 (到期)或4(保留)。由于状态与生产者和消费者无关,所以它不是由它们来 提供。它只和在仓库中查找消息相关,因此JMS没有提供这种API。由提供商设置 JMS的可靠性机制: 消息接收确认 JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常 包含三个阶段:客户接收消息、客户处理消息和消息被确认。 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话 中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参 数有以下三个可选值: Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从 MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。 Session.CLIENT_ACKNOWLEDGE:客户通过调用消息的acknowledge方法确认消 息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息 将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10 个消 息,然后确认第5 个消息,那么所有10 个消息都被确认。 Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered字段设置为true 消息持久性,JMS 支持以下两种消息提交模式: PERSISTENT:指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失 NON_PERSISTENT:不要求JMS provider持久保存消息 消息优先级 可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分 10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要 注意的是,JMS provider并不一定保证按照优先级的顺序提交消息 消息过期 可以设置消息在一定时间后过期,默认是永不过期 消息的临时目的地 可以通过会话上的createTemporaryQueue 方法和createTemporaryTopic 方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。 只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息 持久订阅 首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的 createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须 是一个topic。第二个参数是订阅的名称。 JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建 持久订阅的客户或者任何其它客户,使用相同的连接工厂和连接的客户ID,相同 的主题和相同的订阅名,再次调用会话上的createDurableSubscriber方法,那 么该持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所 发布的消息。 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一 直保留,直到应用程序调用会话上的unsubscribe方法。 本地事务 在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被 发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的 所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调 用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事 务。 需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同 一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发 送操作才会真正执行。 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。 JMS的PTP模型 JMS PTP(Point-to-Point)模型定义了客户端如何向队列发送消息,从队列接收 消息,以及浏览队列中的消息。 PTP模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队 列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包 含各种消息,JMS Provider 提供工具管理队列的创建、删除。 PTP的一些特点: 1:如果在Session 关闭时,有一些消息已经被收到,但还没有被签收 (acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再 次接收 2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在 队列中,不会被接收到 3:队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢 失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势 JMS的Pub/Sub模型 JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic 主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者 (subscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要 接触即可保证消息的传送。 Pub/Sub的一些特点: 1:消息订阅分为非持久订阅和持久订阅 非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能 收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会 丢失,永远不会收到。 持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线 时,JMS Provider会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。 2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息不会被接收 3:非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重 新派送一个未签收的消息。 4:当所有的消息必须被接收,则用持久订阅。当丢失消息能够被容忍,则用非持久订阅