Tuesday, 8 March 2016

JMS Topic using Spring and Active MQ

Reference: http://half-wit4u.blogspot.in/2014/09/jms-topic-using-spring-and-active-mq.html
A JMS client is an application that uses the services of a message broker. There are two types of clients: consumers and producers. Destinations are where messages are stored for clients; a destination is either a queue or a topic.

In the publish/subscribe model, the client that produces messages is called the publisher and the client that consumes them is called the subscriber.
A topic is the destination to which a publisher publishes messages. Subscribers subscribe to the topic to consume them. More than one subscriber can subscribe to the same topic, and each message is delivered to every subscriber.
We will split the implementation into two parts, publisher and subscriber. First we will create a topic and publish a message (XML as a String) to it.
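
To make the fan-out behaviour of a topic concrete before turning to the Spring configuration, here is a minimal in-memory sketch. The class names (TopicFanOutDemo, InMemoryTopic) are hypothetical and are not part of JMS or ActiveMQ; the sketch only imitates the idea that one published message reaches every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TopicFanOutDemo {

    // Hypothetical in-memory stand-in for a topic; not a JMS class.
    static class InMemoryTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        // Register a subscriber callback.
        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        // Deliver one copy of the message to every current subscriber.
        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Publishes one message to two subscribers and returns what each received.
    static List<String> run() {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> received = new ArrayList<>();
        topic.subscribe(m -> received.add("subscriber-1 got " + m));
        topic.subscribe(m -> received.add("subscriber-2 got " + m));
        topic.publish("hello");
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a queue, by contrast, each message would be handed to only one of the consumers.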

ActiveMQ fully supports Spring for configuring both clients and the message broker, so we will use the <amq:> namespace tags in our implementation.
We will create the Spring JMS template configuration in our Spring context file in the following steps.

Step 1: Configure Connection Factory
<amq:connectionFactory id="connectionFactory"
  brokerURL="tcp://localhost:61616" closeTimeout="10" />
<bean id="pooledJmsConnectionFactory"
  class="org.apache.activemq.pool.PooledConnectionFactory"
  init-method="start" destroy-method="stop">
  <property name="maxConnections" value="15" />
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="expiryTimeout" value="-1" />
  <property name="maximumActive" value="100" />
</bean>
Step 2: Configure JMS destination

<bean id="mailDestination"
  class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg value="mail.topic" />
</bean>
Step 3: Configure a JMS Template bean
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
 <property name="defaultDestination" ref="mailDestination" />
 <!-- Topic setting -->
 <property name="pubSubDomain" value="true"/>
</bean>
Step 4: Define producer bean

<bean id="producer" class="com.springtraining.jmstopic.MessageProducerBean">
 <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

Note: JmsTemplate is designed for use with Java EE containers, which provide connection pooling as standardized by the Java EE specifications. Outside such a container, every call to JmsTemplate.send() creates and destroys all the JMS resources it needs (connection, session, and producer).
So, outside a Java EE container, you should use a pooled connection factory when sending messages with JmsTemplate.
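
The cost difference behind this note can be illustrated with a small simulation. This is only a sketch under stated assumptions: FakeConnection and SimplePool are hypothetical stand-ins, not ActiveMQ classes, and the real PooledConnectionFactory does considerably more. The sketch just counts how many "connections" get created for the same number of sends.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PoolingSketch {

    // Hypothetical stand-in for an expensive resource such as a broker connection.
    static class FakeConnection {
        static int created = 0;

        FakeConnection() {
            created++;
        }
    }

    // Minimal pool: hands out an idle instance if one exists, otherwise creates one.
    static class SimplePool {
        private final Deque<FakeConnection> idle = new ArrayDeque<>();

        FakeConnection borrow() {
            FakeConnection connection = idle.poll();
            return (connection != null) ? connection : new FakeConnection();
        }

        void release(FakeConnection connection) {
            idle.push(connection);
        }
    }

    // Opens a fresh connection for every send, as an unpooled factory would.
    static int createdWithoutPool(int sends) {
        FakeConnection.created = 0;
        for (int i = 0; i < sends; i++) {
            new FakeConnection();
        }
        return FakeConnection.created;
    }

    // Borrows and returns a connection for every send.
    static int createdWithPool(int sends) {
        FakeConnection.created = 0;
        SimplePool pool = new SimplePool();
        for (int i = 0; i < sends; i++) {
            FakeConnection connection = pool.borrow();
            pool.release(connection);
        }
        return FakeConnection.created;
    }

    public static void main(String[] args) {
        System.out.println("without pool: " + createdWithoutPool(100));
        System.out.println("with pool:    " + createdWithPool(100));
    }
}
```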

Putting these pieces together, our producer context XML file looks like this.
appProdContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:jaxrs="http://cxf.apache.org/jaxrs"
 xmlns:oxm="http://www.springframework.org/schema/oxm"
 xmlns="http://www.springframework.org/schema/beans"
 xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:jms="http://www.springframework.org/schema/jms"
 default-lazy-init="false"
 xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://activemq.apache.org/schema/core 
   http://activemq.apache.org/schema/core/activemq-core.xsd
   http://www.springframework.org/schema/jms 
   http://www.springframework.org/schema/jms/spring-jms.xsd
   http://cxf.apache.org/jaxrs
   http://cxf.apache.org/schemas/jaxrs.xsd">

  <amq:connectionFactory id="connectionFactory"
    brokerURL="tcp://localhost:61616" closeTimeout="10" />

  <bean id="pooledJmsConnectionFactory"
    class="org.apache.activemq.pool.PooledConnectionFactory"
    init-method="start" destroy-method="stop">
    <!-- <property name="maxConnections" value="15" /> -->
    <property name="connectionFactory" ref="connectionFactory" />
    <!-- <property name="expiryTimeout" value="-1" />
    <property name="maximumActive" value="100" /> -->
  </bean>

  <bean id="mailDestination"
     class="org.apache.activemq.command.ActiveMQTopic">
   <constructor-arg value="mail.topic" />
  </bean>
     
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
   <property name="defaultDestination" ref="mailDestination" />
   <!-- Topic setting -->
   <property name="pubSubDomain" value="true"/>
  </bean>

  <bean id="producer" class="com.springtraining.jmstopic.MessageProducerBean">
   <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>

</beans>

We will now define our producer bean, which uses the Spring JMS template to send a message to the destination topic.

MessageProducerBean.java

package com.springtraining.jmstopic;

import java.nio.charset.StandardCharsets;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.support.JmsGatewaySupport;

public class MessageProducerBean extends JmsGatewaySupport {

  // Receives a String and sends it to the
  // destination topic as a BytesMessage.
  public void sendMessage(final String myMessage) {
    getJmsTemplate().send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        // Encode explicitly as UTF-8 rather than the platform default charset
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(myMessage.getBytes(StandardCharsets.UTF_8));
        return message;
      }
    });
  }

}
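
Because the producer ships the XML as raw bytes, both sides have to agree on the character encoding. The following is a minimal sketch, assuming UTF-8 on both ends; the class PayloadEncodingSketch is hypothetical and uses only the JDK, with no JMS involved.

```java
import java.nio.charset.StandardCharsets;

class PayloadEncodingSketch {

    // What the sending side does before writing the bytes into the message.
    static byte[] encode(String xml) {
        return xml.getBytes(StandardCharsets.UTF_8);
    }

    // What a receiving side has to do to get the same text back.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String xml = "<message>caf\u00e9</message>";
        byte[] payload = encode(xml);
        // The accented character takes two bytes in UTF-8, so the byte count
        // is one larger than the character count.
        System.out.println(xml.length() + " chars, " + payload.length + " bytes");
        System.out.println(decode(payload).equals(xml));
    }
}
```

Calling getBytes() without a charset uses the platform default, which may differ between the machine that publishes and the one that consumes.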


To send an XML message to the destination topic, we will use a simple main class, ProducerTest.
ProducerTest.java
package com.springtraining.jmstopic;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


public class ProducerTest {
  public static void main(String[] args) {
    ApplicationContext context =
        new ClassPathXmlApplicationContext("appProdContext.xml");
    MessageProducerBean mp = (MessageProducerBean) context.getBean("producer");
    /*String msg = "<messageobject><mailId>hsinay@gmail.com</mailId>"+
      "<message>Hello, This is mail from hsinay@gmail.com</message></messageobject>";*/
    String msg = "<MessageObject><mailId>hsinay@gmail.com</mailId>"+
        "<message>Hello, This is mail from Yanish</message></MessageObject>";
    mp.sendMessage(msg);
    System.out.println("Message sent to destination");
  }

}

Now we will implement our subscriber, which consumes the XML and converts it to a MessageObject using Spring OXM (Object/XML Mapping).

Our MessageObject class looks like this:
MessageObject.java
package com.springtraining.jmstopic;

import java.io.Serializable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlAccessorType(XmlAccessType.FIELD)
@XmlRootElement(name = "MessageObject")
public class MessageObject implements Serializable {

  @XmlElement(name = "mailId")
  private String mailId;

  @XmlElement(name = "message")
  private String message;

  public String getMailId() {
    return mailId;
  }

  public void setMailId(String mailId) {
    this.mailId = mailId;
  }

  public String getMessage() {
    return message;
  }

  public void setMessage(String message) {
    this.message = message;
  }

}

Just as we configured the publisher, we will configure the subscriber in our appConsumerContext.xml file.

Step 1: Configure Connection Factory

<amq:connectionFactory id="connectionFactory"
  brokerURL="tcp://localhost:61616" closeTimeout="10" />
<bean id="pooledJmsConnectionFactory"
  class="org.apache.activemq.pool.PooledConnectionFactory"
  init-method="start" destroy-method="stop">
  <property name="maxConnections" value="15" />
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="expiryTimeout" value="-1" />
  <property name="maximumActive" value="100" />
</bean>
Step 2: Declare OXM JAXB Marshaller bean

<oxm:jaxb2-marshaller id="myMarshaller">
 <oxm:class-to-be-bound name="com.springtraining.jmstopic.MessageObject" />
</oxm:jaxb2-marshaller>
Step 3: Declare Message Converter bean


<bean id="myMessageConverter"
 class="com.springtraining.jmstopic.MyMarshallingMessageConverter">
 <property name="marshaller" ref="myMarshaller" />
 <property name="unmarshaller" ref="myMarshaller" />
</bean>
We register MyMarshallingMessageConverter, a subclass of MarshallingMessageConverter, and configure it to use the JAXB2 marshaller as both marshaller and unmarshaller. The Spring message listener container uses this converter bean to convert each incoming message, which contains XML, into a MessageObject using Spring OXM with JAXB.
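
To show what this conversion amounts to, here is a minimal sketch that maps the same XML payload onto an object. It deliberately swaps in the JDK's DOM parser instead of JAXB and Spring OXM, so it is not what the converter does internally; XmlToObjectSketch and ParsedMail are hypothetical names that mirror the two fields of MessageObject.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

class XmlToObjectSketch {

    // Hypothetical plain holder mirroring MessageObject's two fields.
    static class ParsedMail {
        final String mailId;
        final String message;

        ParsedMail(String mailId, String message) {
            this.mailId = mailId;
            this.message = message;
        }
    }

    // Parses the XML bytes and copies the two child elements into a ParsedMail.
    static ParsedMail parse(byte[] payload) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(payload));
            String mailId = doc.getElementsByTagName("mailId").item(0).getTextContent();
            String message = doc.getElementsByTagName("message").item(0).getTextContent();
            return new ParsedMail(mailId, message);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid XML payload", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<MessageObject><mailId>user@example.com</mailId>"
                + "<message>Hello</message></MessageObject>";
        ParsedMail mail = parse(xml.getBytes(StandardCharsets.UTF_8));
        System.out.println(mail.mailId + " / " + mail.message);
    }
}
```

With JAXB the element-to-field mapping comes from the annotations on MessageObject, so no hand-written lookup like the one above is needed.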

Step 4: Declare Consumer bean 

<bean id="consumer" class="com.springtraining.jmstopic.MessageConsumerBean" />
Step 5: Define JMS Listener 


<jms:listener-container connection-factory="pooledJmsConnectionFactory"
  acknowledge="auto" message-converter="myMessageConverter"
  destination-type="topic" >
  <jms:listener destination="mail.topic" ref="consumer"
   method="onMessage" />
</jms:listener-container>
A message listener container receives messages from a JMS queue or topic. Here we create a message listener container that delegates each message to the onMessage() method of the consumer bean.
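
The delegation to a named method on a plain bean can be pictured roughly as a reflective call. The sketch below is an illustration under that assumption, not Spring's actual implementation; ListenerDelegationSketch, MailConsumer and dispatch are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ListenerDelegationSketch {

    // Hypothetical POJO listener with no JMS dependency.
    public static class MailConsumer {
        final List<String> seen = new ArrayList<>();

        public void onMessage(String body) {
            seen.add(body);
        }
    }

    // Looks up a public one-argument method by name and invokes it with the
    // already-converted payload.
    static void dispatch(Object target, String methodName, Object payload) {
        try {
            Method method = target.getClass().getMethod(methodName, payload.getClass());
            method.invoke(target, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot deliver to " + methodName, e);
        }
    }

    public static void main(String[] args) {
        MailConsumer consumer = new MailConsumer();
        dispatch(consumer, "onMessage", "hello");
        System.out.println(consumer.seen);
    }
}
```

This is why the consumer bean does not need to implement any JMS interface: the container only needs the bean reference and the method name from the configuration.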

So our final appConsumerContext.xml will look like
appConsumerContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:jaxrs="http://cxf.apache.org/jaxrs"
 xmlns:oxm="http://www.springframework.org/schema/oxm"
 xmlns="http://www.springframework.org/schema/beans"
 xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:jms="http://www.springframework.org/schema/jms"
 default-lazy-init="false"
 xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/oxm 
   http://www.springframework.org/schema/oxm/spring-oxm.xsd
   http://activemq.apache.org/schema/core 
   http://activemq.apache.org/schema/core/activemq-core.xsd
   http://www.springframework.org/schema/jms 
   http://www.springframework.org/schema/jms/spring-jms.xsd
   http://cxf.apache.org/jaxrs
   http://cxf.apache.org/schemas/jaxrs.xsd">

   <amq:connectionFactory id="connectionFactory"
    brokerURL="tcp://localhost:61616" closeTimeout="10" />

    <bean id="pooledJmsConnectionFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
     init-method="start" destroy-method="stop">
      <!-- <property name="maxConnections" value="15" /> -->
      <property name="connectionFactory" ref="connectionFactory" />
<!--       <property name="expiryTimeout" value="-1" />
      <property name="maximumActive" value="100" /> -->
    </bean>

    <oxm:jaxb2-marshaller id="myMarshaller">
     <oxm:class-to-be-bound name="com.springtraining.jmstopic.MessageObject" />
    </oxm:jaxb2-marshaller>

    <!-- jaxb used for converting xml to object -->
    <bean id="myMessageConverter"
          class="com.springtraining.jmstopic.MyMarshallingMessageConverter">
     <property name="marshaller" ref="myMarshaller" />
     <property name="unmarshaller" ref="myMarshaller" />
    </bean>

    <bean id="consumer" class="com.springtraining.jmstopic.MessageConsumerBean" />

    <!-- destination-type="durableTopic" -->
    <jms:listener-container connection-factory="pooledJmsConnectionFactory"
      acknowledge="auto" message-converter="myMessageConverter"
      destination-type="topic">
      <jms:listener destination="mail.topic" ref="consumer"
        method="onMessage" />
    </jms:listener-container>

</beans>

With this configuration file in place, it is time to look at our message converter class and message consumer class.
MyMarshallingMessageConverter.java
package com.springtraining.jmstopic;

import javax.jms.JMSException;
import javax.jms.Message;
import org.springframework.jms.support.converter.MarshallingMessageConverter;
import org.springframework.jms.support.converter.MessageConversionException;

public class MyMarshallingMessageConverter
    extends MarshallingMessageConverter {

  @Override
  public Object fromMessage(Message message)
      throws JMSException, MessageConversionException {
    // Log the destination the message arrived on, then delegate to the parent
    System.out.println(message.getJMSDestination());
    return super.fromMessage(message);
  }

}

MessageConsumerBean.java
package com.springtraining.jmstopic;

public class MessageConsumerBean {

  public void onMessage(MessageObject message) {
    try {
      System.out.println("Mail # " + message.getMailId() + " received."
          + "Message:" + message.getMessage());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}
We will use this simple ConsumerTest class to initialize the application context in its main method, which starts the listener on the topic.
ConsumerTest.java
package com.springtraining.jmstopic;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerTest {
  public static void main(String[] args) {
    ApplicationContext context =
        new ClassPathXmlApplicationContext("appConsumerContext.xml");
    System.out.println("Consumer listening !!!!!!");
  }

}

To run both the consumer and the publisher from a single Eclipse launch, use the following class, which starts the consumer first and then the producer.
To.java
package com.springtraining.jmstopic;

public class To {

  public static void main(String... args) {
    // Start the consumer first, then publish the message
    ConsumerTest.main(args);
    ProducerTest.main(args);
  }

}
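
The order of the two calls matters because the listener is configured with destination-type="topic", that is, a non-durable subscription: a message published while nobody is subscribed is not kept for later subscribers. A minimal in-memory sketch of that behaviour follows; SubscriptionOrderSketch and NonDurableTopic are hypothetical names and no broker is involved.

```java
import java.util.ArrayList;
import java.util.List;

class SubscriptionOrderSketch {

    // Hypothetical non-durable topic: a message reaches only the subscribers
    // registered at the moment it is published and is not retained afterwards.
    static class NonDurableTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    // Returns how many messages the subscriber ends up with.
    static int received(boolean subscribeBeforePublish) {
        NonDurableTopic topic = new NonDurableTopic();
        List<String> inbox = subscribeBeforePublish ? topic.subscribe() : null;
        topic.publish("hello");
        if (inbox == null) {
            inbox = topic.subscribe(); // too late: the message is already gone
        }
        return inbox.size();
    }

    public static void main(String[] args) {
        System.out.println("subscribed first: " + received(true));
        System.out.println("subscribed late:  " + received(false));
    }
}
```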

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.sarf</groupId>
  <artifactId>JMSMailSystem</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>JMSMailSystem</name>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-war-plugin</artifactId>
        <configuration>
          <warName>JMSMailSystem</warName>
          <outputDirectory>D:\Server\jboss-as-7.1.0.Final\standalone\deployments</outputDirectory>
        </configuration>
      </plugin>
    </plugins>
    <finalName>JMSMailSystem</finalName>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-asm</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>2.2.3</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
      <version>2.5</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>3.2.2.RELEASE</version>
    </dependency>

    <!-- <dependency> <groupId>javax.transaction</groupId> <artifactId>jta</artifactId>
      <version>1.1</version> </dependency> -->
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.6</version>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.7.0</version>
    </dependency>

  </dependencies>
  <repositories>
    <repository>
      <id>repository.jboss.org-public</id>
      <name>JBoss repository</name>
      <url>https://repository.jboss.org/nexus/content/groups/public</url>
    </repository>

  </repositories>
</project>

