Error handling in Spring RabbitMQ -
I am trying to integrate a RabbitMQ broker into our Spring application. I am able to consume messages, but I need to add error handling. The listener consumes a message and applies business logic to it, which includes DB writes. The business logic can throw an exception.
In case of these exceptions, I need the following:
- Roll back the DB writes.
- Write to an error table in the DB, indicating the message failure.
- The message should not be re-queued.
For:
Requirement #1 - I have added a txManager in config.xml and annotated the listener's listen() method with @Transactional.
Requirement #2 - I have added an error handler with a custom implementation of DefaultExceptionStrategy.
Requirement #3 - I have set defaultRequeueRejected=false.
But when a BusinessRuntimeException is thrown from the listener, the error handler is not getting invoked. I don't know what I am missing. Is the error handler invoked only for certain exceptions?
config.xml
<!-- Enables @Transactional processing, driven by the transaction manager declared below. -->
<tx:annotation-driven transaction-manager="txmanager" />

<!-- JTA transaction manager; the class name and property name are case-sensitive. -->
<bean id="txmanager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="allowCustomIsolationLevels" value="true" />
</bean>

<!-- RabbitMQ connection factory shared by the template and the admin below. -->
<rabbit:connection-factory id="rabbitconnectionfactory"/>

<!-- Template using a transacted channel; "jsonmessageconverter" is a bean declared outside this snippet. -->
<rabbit:template id="rabbittemplate" connection-factory="rabbitconnectionfactory"
                 message-converter="jsonmessageconverter" channel-transacted="true"/>

<rabbit:admin id="rabbitadmin" connection-factory="rabbitconnectionfactory"/>
rabbitmqconfiguration.java
/**
 * Java configuration for the RabbitMQ listener infrastructure.
 *
 * <p>Declares the listener container factory (auto-ack, transacted channel, rejected
 * messages not re-queued) together with the error handler it uses.
 */
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    private MessageConverter jsonMessageConverter;

    /**
     * Builds the container factory used by the annotated listeners.
     *
     * @return a factory configured with the shared connection factory, the JSON message
     *     converter, a transacted channel, no re-queue on rejection and the custom error handler
     */
    @Bean
    public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() {
        SimpleRabbitListenerContainerFactory listenerContainer =
                new SimpleRabbitListenerContainerFactory();
        listenerContainer.setConnectionFactory(rabbitConnectionFactory);
        listenerContainer.setMessageConverter(jsonMessageConverter);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        listenerContainer.setChannelTransacted(true);
        listenerContainer.setDefaultRequeueRejected(false);
        listenerContainer.setErrorHandler(errorHandler());
        return listenerContainer;
    }

    /**
     * Exposes the exception strategy as a Spring bean.
     *
     * <p>The strategy declares an {@code @Autowired} field. Creating it with a bare
     * {@code new} inside {@link #errorHandler()} would bypass dependency injection and
     * leave that field {@code null}; registering it as a bean lets Spring inject it.
     *
     * @return the Spring-managed exception strategy
     */
    @Bean
    public ExceptionStrategy exceptionStrategy() {
        return new ExceptionStrategy();
    }

    /**
     * Creates the error handler invoked when a listener throws.
     *
     * @return a conditional-rejecting handler backed by the Spring-managed strategy
     */
    @Bean
    public ErrorHandler errorHandler() {
        // Inter-bean call: resolves to the singleton created by exceptionStrategy().
        return new ConditionalRejectingErrorHandler(exceptionStrategy());
    }
}
exceptionstrategy.java
public class exceptionstrategy extends defaultexceptionstrategy { @autowired private dao daobean; @override public boolean isfatal(throwable t) { if (t instanceof businessruntimeexception) { businessruntimeexception businessexception = (businessruntimeexception) t; //db call daobean.updaterecordstaus(); return true; } if (t instanceof listenerexecutionfailedexception) { listenerexecutionfailedexception lefe = (listenerexecutionfailedexception) t; logger.error( "failed process inbound message queue " + lefe.getfailedmessage().getmessageproperties().getconsumerqueue() + "; failed message: " + lefe.getfailedmessage(), t); } return super.isfatal(t); }}
Comments
Post a Comment