服务器 频道

JMS服务器openJms入门

编译后运行接收信息服务,可以看到接收到并打印之前发送的消息,再看看控制台,发现queue1的消息队列变为0,消息已被读取,消息发送和接收到此结束。

  上篇openJms介绍 (一) 提到了openJms的构建及消息的发送和接收,这篇主要了解消息的发布和订阅。JMS 的发布/订阅模型定义了如何向一个内容节点发布和订阅消息,内容节点也叫主题(topic),主题是为发布者(publisher)和订阅者 (subscribe) 提供传输的中介。发布/订阅模型使发布者和订阅者之间不需要直接通讯(如RMI)就可保证消息的传送,有效解决系统间耦合问题(当然有这个需要才行),还有就是提供了一对一、一对多的通讯方式,比较灵活。

  先介绍JMS里2个概念,持久订阅模式和非持久订阅模式,其实也是发布/订阅模型在可靠性上提供的2种方式:

  非持久订阅模式:只有当客户端处于激活状态,也就是和JMS 服务器保持连接的状态下,才能接收到发送到某个Topic的消息,而当客户端处于离线状态时,则这个时间段发到Topic的消息将会永远接收不到。

  持久订阅模式:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS 服务器会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS 服务器时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息,即消息永远能接收到。

  下面我们就接着来看openJms在发布/订阅模式上的表现,由于篇幅关系,在这里只讲述非持久订阅模式,持久订阅模式可以根据JMS的标准来试。

  消息发布的代码如下:

  package javayou.demo.openjms;

  import java.util.*;

  import javax.jms.*;

  import javax.naming.*;

  public class TopicPublish {

  public static void main(String[] args) {

  try {

  //取得JNDI上下文和连接

  Hashtable properties = new Hashtable();

  properties.put(

  Context.INITIAL_CONTEXT_FACTORY,

  "org.exolab.jms.jndi.InitialContextFactory");

  //openJms默认的端口是1099

  properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");

  Context context = new InitialContext(properties);

  //获得JMS Topic连接队列工厂

  TopicConnectionFactory factory =

  (TopicConnectionFactory) context.lookup(

  "JmsTopicConnectionFactory");

  //创建一个Topic连接,并启动

  TopicConnection topicConnection = factory.createTopicConnection();

  topicConnection.start();

  //创建一个Topic会话,并设置自动应答

  TopicSession topicSession =

  topicConnection.createTopicSession(false,

  Session.AUTO_ACKNOWLEDGE);

  //lookup 得到 topic1

  Topic topic = (Topic) context.lookup("topic1");

  //用Topic会话生成Topic发布器

  TopicPublisher topicPublisher = topicSession.createPublisher(topic);

  //发布消息到Topic

  System.out.println("消息发布到Topic");

  TextMessage message = topicSession.createTextMessage

  ("你好,欢迎定购Topic类消息");

  topicPublisher.publish(message);

  //资源清除,代码略 ... ...

  } catch (NamingException e) {

  e.printStackTrace();

  } catch (JMSException e) {

  e.printStackTrace();

  }

  }

  }

  而订阅消息的接收有同步的和异步2种,他们分别使用receive()和onMessage(Message message)方法来接收消息,具体代码:

  同步接收:

  package javayou.demo.openjms;

  import java.util.*;

  import javax.jms.*;

  import javax.naming.*;

  public class TopicSubscribeSynchronous {

  public static void main(String[] args) {

  try {

  System.out.println("定购消息接收启动:");

  //取得JNDI上下文和连接

  Hashtable properties = new Hashtable();

  properties.put(Context.INITIAL_CONTEXT_FACTORY,

  "org.exolab.jms.jndi.InitialContextFactory");

  properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");

  Context context = new InitialContext(properties);

  //获得Topic工厂和Connection

  TopicConnectionFactory factory =

  (TopicConnectionFactory) context.lookup(

  "JmsTopicConnectionFactory");

  TopicConnection topicConnection = factory.createTopicConnection();

  topicConnection.start();

0
相关文章