Tridion Object Cache – Apache MQ-based invalidation

The code below is based on the dd4t-cachechannel jar file available here. This topic is discussed further in a question asked on Stack Exchange here.

Tridion object cache invalidation using the JMSCacheChannelConnector provided with Tridion CD sends messages in binary form, which the .NET-based DD4T 2.0 client is not able to understand. The jar file linked above acts as a deployer extension and converts the binary messages into text messages so that any .NET client can understand them. The problem is that, once a message has been converted into a text message, the Tridion object cache subscriber in the cd_cache jar starts throwing the error “Ignoring unexpected message type”.

To solve this issue, we need to modify the subscriber in cd_cache as well, so that it is able to understand text messages. Below is the full code listing of the changes made in the dd4t-cachechannel jar file.




package org.dd4t.cache;

import com.fasterxml.jackson.core.JsonProcessingException;



import com.tridion.cache.CacheChannelEventListener;
import com.tridion.cache.CacheEvent;
import com.tridion.cache.CacheException;
import com.tridion.cache.JMSCacheChannelConnector;
import com.tridion.configuration.Configuration;
import com.tridion.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


import java.util.List;
import java.util.Properties;



public class TextJMSCacheChannelConnector extends JMSCacheChannelConnector {



private static Logger LOG = LoggerFactory.getLogger(TextJMSCacheChannelConnector.class);
private volatile boolean isValid = false;
private boolean isClosed = false;
private void verifyOpenState() throws IllegalStateException
{



if (this.isClosed) {
throw new IllegalStateException("Method was called on closed instance");
}



/*throw new ArithmeticException("You can\'t divide by zero!"); */ }
private MessageListener jmsTopicListener = new MessageListener()
{
public void onMessage(Message message)
{
handleJmsMessage(message);
}
};
private ExceptionListener jmsExceptionListener = new ExceptionListener()
{
public void onException(JMSException e)
{
handleJmsException(e);
}
};
private void handleJmsException(JMSException exception)
{
if (!this.isClosed) {
LOG.error("JMS Exception occurred. Attempting setting up JMS connectivity again", exception);
}
if (this.isValid)
{
this.isValid = false;
fireDisconnect();
}
}
private CacheChannelEventListener listener = emptyListener;
private static CacheChannelEventListener emptyListener = new CacheChannelEventListener()
{
public void handleRemoteEvent(CacheEvent event) {}



public void handleDisconnect() {}



public void handleConnect() {}
};
public void setListener(CacheChannelEventListener listener)
{
LOG.debug("Setting listner");
this.listener = (listener != null ? listener : emptyListener);
}
private void fireDisconnect()
{
if (!this.isClosed) {
this.listener.handleDisconnect();
}
}
public void validate()
throws CacheException
{



try {
verifyOpenState();
} catch (IllegalStateException e1) {
throw new CacheException("The conneciton is closed");
}



if (!this.isValid) {
try
{
this.client.cleanupIgnoringErrors();
this.client.connect(this.jmsTopicListener, this.jmsExceptionListener);
this.isValid = true;
this.listener.handleConnect();
}
catch (JMSException e)
{
this.client.cleanupIgnoringErrors();
throw new CacheException("JMS Exception occurred. Attempting setting up JMS connectivity later", e);
}
catch (NamingException e)
{
this.client.cleanupIgnoringErrors();
throw new CacheException("Unable to initialize JMS CacheChannelConnector", e);
}
}
}



protected void handleJmsMessage(Message msg)
{
LOG.debug("-----------------------------------------------------------------");
try {
if (msg instanceof TextMessage)
{
TextMessage textMessage = (TextMessage) msg;
Object payload;
payload =CacheEventSerializer.deSerialize(textMessage.getText());



if ((payload instanceof CacheEvent))
{
if (!this.isClosed) {



this.listener.handleRemoteEvent((CacheEvent)payload);
}
else
{
LOG.debug("Aborting, Listener is already closed");
}
}
else
{
LOG.debug("Payload is not an instance of cacheEvent");
}
}
else
{ LOG.debug("Handle JMS Object Message");
super.handleJmsMessage(msg);
}
} catch (Exception e) {



LOG.error(e.getMessage());
}
LOG.debug("-----------------------------------------------------------------");
}



public void configure(Configuration configuration) throws ConfigurationException {
LOG.info("Loading TextJMSCacheChannelConnector");
Properties jndiContextProperties = null;
if (configuration.hasChild("JndiContext"))
{
Configuration jndiConfig = configuration.getChild("JndiContext");
jndiContextProperties = new Properties();
List configs = jndiConfig.getChildrenByName("Property");
for (Configuration config : configs)
{
String propertyKey = config.getAttribute("Name");
String propertyValue = config.getAttribute("Value");
jndiContextProperties.setProperty(propertyKey, propertyValue);
LOG.debug("JMS Connector JNDI Property '{}' set with value '{}'",propertyKey,propertyValue);
}
}
String topicName = configuration.getAttribute("Topic", "TridionCacheChannel");
String topicConnectionFactoryName = configuration.getAttribute("TopicConnectionFactory", "TopicConnectionFactory");



LOG.debug("JMS Connector TopicConnectionFactory name is {}. Topic is: {}",topicConnectionFactoryName, topicName);



String strategy = configuration.getAttribute("Strategy", "AsyncJMS11");



LOG.debug("JMS strategy is: {} ", strategy);



if (("AsyncJMS11".equals(strategy)) || ("AsyncJMS11MDB".equals(strategy))) {
this.client = new TextJMS11Approach(jndiContextProperties, topicConnectionFactoryName, topicName, "AsyncJMS11MDB".equals(strategy));
} else if ("SyncJMS11".equals(strategy)) {
this.client = new SynchronousJMS11Approach(jndiContextProperties, topicConnectionFactoryName, topicName);
} else if (("AsyncJMS10".equals(strategy)) || ("AsyncJMS10MDB".equals(strategy))) {
this.client = new TextJMS10Approach(jndiContextProperties, topicConnectionFactoryName, topicName, "AsyncJMS10MDB".equals(strategy));
} else {
throw new ConfigurationException("Unknown 'Strategy':" + strategy + " for the JMS Connector");
}



}



public class TextJMS11Approach extends JMSCacheChannelConnector.JMS11Approach {
private TopicConnection topicConnection = null;
private TopicSession topicPublisherSession = null;
private TopicPublisher topicPublisher = null;
private TopicSubscriber topicSubscriber = null;
private TopicSession topicSubscriberSession = null;



protected TextMessage publicationTextMessage;



public TextJMS11Approach(Properties jndiProperties, String factoryName, String topicName, boolean isMDBMode) {
super(jndiProperties, factoryName, topicName, isMDBMode);
}



public void connect(MessageListener messageListener, ExceptionListener exceptionListener) throws JMSException, NamingException {
Context jndiContext = this.jndiProperties != null ? new InitialContext(this.jndiProperties) : new InitialContext();
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory)jndiContext.lookup(this.topicConnectionFactoryName);
Topic topic = (Topic)jndiContext.lookup(this.topicName);



this.topicConnection = topicConnectionFactory.createTopicConnection();
if (!this.isMDBMode) {
try
{
this.topicConnection.setExceptionListener(exceptionListener);
}
catch (JMSException e)
{
TextJMSCacheChannelConnector.LOG.error("setExceptionListener failed. Most likely due to container restrictions. In these environments the MDB com.tridion.cache.JMSBean must be setup instead", e);
}
}
this.topicConnection.start();
if (!this.isMDBMode) {
try
{
this.topicSubscriberSession = this.topicConnection.createTopicSession(false, 1);
this.topicSubscriber = this.topicSubscriberSession.createSubscriber(topic, null, true);
this.topicSubscriber.setMessageListener(messageListener);
}
catch (JMSException e)
{
TextJMSCacheChannelConnector.LOG.error("setMessageListener failed. Most likely due to container restrictions. In these environments the MDB com.tridion.cache.JMSBean must be setup instead", e);
}
}
this.topicPublisherSession = this.topicConnection.createTopicSession(false, 1);
this.topicPublisher = this.topicPublisherSession.createPublisher(topic);
this.publicationTextMessage = this.topicPublisherSession.createTextMessage();
LOG.debug("Connected to queue, with topic: {} ", topic);
}



public void broadcastEvent(CacheEvent event) throws JMSException {
try {
String serialized = CacheEventSerializer.serialize(event);
this.publicationTextMessage.setText(serialized);
this.topicPublisher.publish(this.publicationTextMessage);
LOG.debug("Published event: {}", serialized);
} catch (JsonProcessingException e) {
LOG.error("Cannot serialize cache event into JSON", e);
}
}
}



public class TextJMS10Approach extends JMSCacheChannelConnector.JMS10Approach {
public TextJMS10Approach(Properties jndiProperties, String factoryName, String topicName, boolean isMDBMode) {
super(jndiProperties, factoryName, topicName, isMDBMode);
}
}
public class TextSynchronousJMS11Approach extends JMSCacheChannelConnector.SynchronousJMS11Approach {
public TextSynchronousJMS11Approach (Properties jndiProperties, String factoryName, String topicName) {
super(jndiProperties, factoryName, topicName);
}
}
}



<hr />



package org.dd4t.cache;


import java.io.IOException;


import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tridion.cache.CacheEvent;



/**
 * Converts {@link CacheEvent}s to and from their JSON text representation.
 */
public class CacheEventSerializer {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Serializes a cache event to a JSON string.
     *
     * @param cacheEvent the event to serialize
     * @return the JSON representation produced by the shared {@link ObjectMapper}
     * @throws JsonProcessingException if the event cannot be written as JSON
     */
    public static String serialize(final CacheEvent cacheEvent) throws JsonProcessingException {
        return OBJECT_MAPPER.writeValueAsString(cacheEvent);
    }

    /**
     * Builds a {@link CacheEvent} from its JSON text representation.
     *
     * <p>The JSON is first bound to a {@link Dd4tCacheEvent}, whose
     * {@code regionPath}, {@code key} and {@code type} are then used to
     * construct the cache event.
     *
     * @param textmessage the JSON text, e.g. the body of a JMS text message
     * @return the cache event described by the JSON
     * @throws JsonParseException   if the text is not well-formed JSON
     * @throws JsonMappingException if the JSON cannot be bound to {@link Dd4tCacheEvent}
     * @throws IOException          if the text is {@code null} or contains no event
     */
    public static CacheEvent deSerialize(final String textmessage) throws JsonParseException, JsonMappingException, IOException {
        // A message without a body would otherwise fail inside the mapper with
        // an unchecked exception; report it through the declared IOException.
        if (textmessage == null) {
            throw new IOException("Cannot deserialize a cache event from a null text message");
        }

        final Dd4tCacheEvent dd4tCacheEvent = OBJECT_MAPPER.readValue(textmessage, Dd4tCacheEvent.class);
        // The mapper yields null for the JSON literal `null`; avoid the NPE below.
        if (dd4tCacheEvent == null) {
            throw new IOException("Text message does not contain a cache event: " + textmessage);
        }

        return new CacheEvent(dd4tCacheEvent.regionPath, dd4tCacheEvent.key, dd4tCacheEvent.type);
    }
}


<hr />


package org.dd4t.cache;


import com.fasterxml.jackson.annotation.JsonProperty;


/**
 * Plain JSON-bound holder for the three values of a cache event.
 *
 * <p>Each field is mapped to the JSON property of the same name. The class
 * relies on the implicit public no-argument constructor.
 */
public class Dd4tCacheEvent {

    /** Value of the {@code regionPath} JSON property. */
    @JsonProperty("regionPath")
    public String regionPath;

    /** Value of the {@code key} JSON property. */
    @JsonProperty("key")
    public String key;

    /** Value of the {@code type} JSON property. */
    @JsonProperty("type")
    public int type;
}


As you can see in the code, the validate() method is overridden in the class TextJMSCacheChannelConnector so that we can provide our own handleJmsMessage(Message msg) method. This method checks whether the message is a text message; if so, it converts it into a CacheEvent object and passes it to the handleRemoteEvent() method of the CacheChannel class to invalidate the cached object.

If the message is not a text message it will follow the normal flow.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s