您当前的位置: 首页 >  Java

wespten

暂无认证

  • 0浏览

    0关注

    899博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

JMS-JAVA消息服务

wespten 发布时间:2018-08-26 12:03:25 ,浏览量:0

JMS

JAVA消息服务,是应用程序接口中关于面向消息中间件的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信。

JMS消息两种类型

点对点 消息发送给一个单独的使用者。点对点消息往往与队列相关联

发布/订阅 支持事件驱动模型,消息生产者与消费者都参与消息的传递。生产者发布事件而使用者订阅感兴趣的事件,并使用事件。

安装ActiveMQ开源消息总线

http://activemq.apache.org/activemq-5111-release.html

ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin

ActiveMQ  点对点消息实现

消息生产者

Session.AUTO_ACKNOWLEDGE 

Receive消息确认的三种方式

Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage

方法成功返回的时候,会话自动确认客户收到的消息。

Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。

Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * @author Administrator
 *
 */
public class JMSProducer {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
	private static final int SENDNUM=10; // 发送的消息数量
	
	public static void main(String[] args) {
		
		ConnectionFactory connectionFactory; // 连接工厂
		Connection connection = null; // 连接
		Session session; // 会话 接受或者发送消息的线程
		Destination destination; // 消息的目的地
		MessageProducer messageProducer; // 消息生产者
		
		// 实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		
		try {
			connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
			connection.start(); // 启动连接
			session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session 第一个参数是否有事物,第二个参数消息确认的方式自确认
			destination=session.createQueue("FirstQueue1"); // 创建消息队列
			messageProducer=session.createProducer(destination); // 创建消息生产者
			sendMessage(session, messageProducer); // 发送消息
			session.commit();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 发送消息
	 * @param session
	 * @param messageProducer
	 * @throws Exception
	 */
	public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
		for(int i=0;i            
关注
打赏
1665965058
查看更多评论
0.0421s