当前位置导航:炫浪网>>网络学院>>编程开发>>JAVA教程>>Java入门

Java消息服务基础


  在不同系统之间交换信息的一大障碍是如何在精确交换和格式化数据方面取得一致。Java Message Service( Java消息服务,简称JMS)通过提供一种与J2EE应用程序或传统系统交互的方法部分的解决了这个问题。
  JMS的通用接口集合以异步方式发送或接收消息。异步方式接收消息显然是使用间断网络连接的客户机,诸如移动电话和PDA的最好的选择。另外, JMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。
  Java消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。JMS规范并不要求供应商同时支持这两种消息模型,但开发者应该熟悉这两种消息模型的优势与缺点。
  P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。
  Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。
  JMS通过允许创建持久订阅来简化时间相关性,即使消息预订者未激活也可以接收到消息。此外,使用持久订阅还可通过队列提供灵活性和可靠性,而仍然允许消息被发给许多的接收者。
  Topic Subscriber topic Subscriber =
  topicSession.createDurableSubscriber(topic, subscriptionName);
  Connection对象表示了到两种消息模型中的任一种的消息系统的连接。服务器端和客户机端对象要求管理创建的JMS连接的状态。连接是由Connection Factory创建的并且通过JNDI查寻定位。
  //取得用于 P2P的 QueueConnectionFactory
  QueueConnectionFactory = queueConnectionFactory( );
  Context messaging = new InitialContext( );
  QueueConnectionFactory = (QueueConnectionFactory)
  Messaging.lookup(“QueueConnectionFactory”);
  //取得用于 pub/sub的 TopicConnectionFactory
  TopicConnectonFactory topicConnectionFactory;
  Context messaging = new InitialContext();
  topicConnectionFactory = (TopicConnectionFactory)
  messaging.lookup(“TopicConnectionFactory”);
  注意:用于P2P的代码和用于PublishSubscribe的代码非常相似。
  如果session被标记为transactional的话,确认消息就通过确认和校正来自动地处理。如果session没有标记为 transactional,你有三个用于消息确认的选项。
  · AUTO_ACKNOWLEDGE session将自动地确认收到一则消息。
  · CLIENT_ACKNOWLEDGE 客户端程序将确认收到一则消息,调用这则消息的确认方法。
  · DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能会出错。这种确认的方式只应当用于消息消费程序可以容忍潜在的副本消息存在的情况。
  queueSession = queueConnection.createQueueSession(false, session.AUTO_ACKNOWLEDGE);//P2P
  topicSession = topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); //Pub-Sub
  注意:在本例中,一个session目的从连结中创建,非值指出session是non-transactional的,并且 session将自动地确认收到一则消息。
  JMS现在有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。
  虽然 JMS规范并不需要JMS供应商实现消息的优先级路线,但是它需要递送加快的消息优先于普通级别的消息。JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说:
  topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub
  或
  queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);//P2P
  这个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是10000 (以毫秒度量 )。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设置生存周期是有用的。
  JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  · StreamMessage -- Java原始值的数据流
  · MapMessage--一套名称-值对
  · TextMessage--一个字符串对象
  · ObjectMessage--一个序列化的 Java对象
  · BytesMessage--一个未解释字节的数据流
  JMS应用程序接口提供用于创建每种类型消息和设置荷载的方法例如,为了在一个队列创建并发送一个TextMessage实例,你可以使用下列语句:
  TextMessage message = queueSession.createTextMessage(); message.setText(textMsg);
  以异步方式接收消息,需要创建一个消息监听器然后注册一个或多个使用MessageConsumer的JMS MessageListener接口。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage方法的监听者那里。
  import javax.jms.*;
  public class ExampleListener implements MessageListener {
  //把消息强制转化为TextMessage格式
  public void onMessage(Message message) {
  TextMessage textMsg = null;
  // 打开并处理这段消息
  }
  }
  当我们创建QueueReceiver和TopicSubscriber时,我们传递消息选择器字符串:
  //P2P QueueReceiver
  QueueReceiver receiver;
  receiver = session.createReceiver(queue, selector);
  //Pub-Sub TopicSubscriber
  TopicSubscriber subscriber;
  subscriber = session.createSubscriber(topic, selector);
  为了启动消息的交付,不论是Pub/Sub还是P2P,都需要调用start方法。
  TopicConnection.start( ); //pub-sub
  QueueConnection.start( ); //P2P
  TopicConnection.start ( );// pub-sub
  QueueConnection.start ( );// P2P
  当一条消息被捕捉时,这条消息做为一条必须被强制转化为适当消息类型的普通Message对象到达。这是一个被用来提取或打开消息内容的getter方法。下列代码片段使用StreamMessage类型。
  private void unPackMessage (Message message) {
  String eName;
  String position;
  double rate;
  StreamMessage message;
  Message = session.createStreamMessage( );
  //注意下面的代码必须按照我给出的顺序书写
  message.writeString(eName);
  message.writeString(position);
  message.writeDouble(rate);
  //实现处理消息的必要的程序逻辑
  }
  停止消息的传递,无论是Pub/Sub还是P2P,都调用stop方法。
  TopicConnection.start( ); //pub-sub
  QueueConnection.start( ); //P2P
  TopicConnection.start ( );// pub-sub
  QueueConnection.start ( );// P2P
  其他的J2EE组件--servlet或EJB--可以当作消息生产者;然而,它们可能只能同步操作,这可能是因为它们的请求-应答的性质决定的。虽然XML目前还不是被支持的消息类型,发送一个XML文件和创建一条文本类型消息以及把XML文件添加到消息的有效负载都一样简单,都是以非专有的方式传送数据。值得注意的是,一些JMS供应厂商已经提供了可用的XML消息类型。但是使用非标准的消息类型可能会出现可移植性问题。
  String reportData; //reportData内容为XML 文档
  TextMessage message;
  message = session.createTextMessage();
  message.setText (reportData);
  消息驱动组件(MDB)是一个当消息到达时被容器调用的异步消息消费程序。和entity和session EJB不同,MDB没有本地和远程接口并且是匿名的;它们对于客户是不可见的。MDB是JMS系统的一部分,作为消费者实现服务器上的商业逻辑程序。 一个客户程序可能通过使用JNDI定位一个与MDB相关联的JMS。 例如:
  Context initialContext = new InitialContext();
  Queue reportInfoQueue = (javax.jms.Queue)initialContext.lookup
  (“java:comp/env/jms/reportInfoQueue”);
  MDB是由Bean类和相应的XML部署描述符组成。 Bean 类实现MessageDriveBean 接口:
  import javax.ejb.*;
  import jms.Message.*;
  public interface MessageDriveBean {
  public void ejbCreate();
  public void ejbRemove();
  public void setMessageDrivenContext(MessageDrivenContext ctx);
  }
  消息监听器接口:
  import javax.jms.*;
  public interface MessageListener {
  public void onMessage( );
  }
  部署描述符
  <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/j2ee/dtds/ejb-jar_2_0.d
相关内容
赞助商链接