Error handling in Spring RabbitMQ


I am trying to integrate a RabbitMQ broker into our Spring application. I am able to consume messages, but I need to add error handling. The listener consumes a message and applies business logic to it, which includes DB writes. The business logic can throw an exception.

When such an exception occurs, I need the following:

  1. The DB writes are rolled back.
  2. A row is written to an error table in the DB, indicating that the message failed.
  3. The message is not re-queued.

What I have done so far:

Requirement #1 - I have added a txManager in config.xml and annotated the listener's listen() method with @Transactional.

Requirement #2 - I have added an error handler with a custom implementation of DefaultExceptionStrategy.

Requirement #3 - I have set defaultRequeueRejected=false.

But when a BusinessRuntimeException is thrown from the listener, the error handler is not getting invoked. I don't know what I am missing. Is the error handler invoked only for certain exceptions?

config.xml

<tx:annotation-driven transaction-manager="txManager" />

<bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="allowCustomIsolationLevels" value="true" />
</bean>

<rabbit:connection-factory id="rabbitConnectionFactory"/>

<rabbit:template id="rabbitTemplate"
                 connection-factory="rabbitConnectionFactory"
                 message-converter="jsonMessageConverter"
                 channel-transacted="true"/>

<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>

RabbitMQConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    private MessageConverter jsonMessageConverter;

    @Bean
    public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() {
        SimpleRabbitListenerContainerFactory listenerContainer = new SimpleRabbitListenerContainerFactory();
        listenerContainer.setConnectionFactory(rabbitConnectionFactory);
        listenerContainer.setMessageConverter(jsonMessageConverter);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        listenerContainer.setChannelTransacted(true);
        listenerContainer.setDefaultRequeueRejected(false);
        listenerContainer.setErrorHandler(errorHandler());
        return listenerContainer;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new ExceptionStrategy());
    }
}

ExceptionStrategy.java

public class ExceptionStrategy extends DefaultExceptionStrategy {

    @Autowired
    private Dao daoBean;

    @Override
    public boolean isFatal(Throwable t) {

        if (t instanceof BusinessRuntimeException) {
            BusinessRuntimeException businessException = (BusinessRuntimeException) t;
            // DB call
            daoBean.updateRecordStaus();
            return true;
        }

        if (t instanceof ListenerExecutionFailedException) {
            ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
            logger.error(
                    "Failed to process inbound message from queue "
                            + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                            + "; failed message: " + lefe.getFailedMessage(),
                    t);
        }
        return super.isFatal(t);
    }
}
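One thing that may be worth checking, offered as a sketch rather than a confirmed diagnosis: Spring AMQP's listener container wraps an exception thrown by the listener in a ListenerExecutionFailedException before passing it to the error handler. If that is what happens here, the throwable received by isFatal would not itself be a BusinessRuntimeException, and the business exception would sit in the cause chain instead. The stdlib-only example below shows one way to search the cause chain. The CauseChain helper and the stand-in BusinessRuntimeException class are hypothetical names introduced for illustration only.

```java
import java.util.Optional;

// Hypothetical stand-in for the application's own exception type.
class BusinessRuntimeException extends RuntimeException {
    BusinessRuntimeException(String message) {
        super(message);
    }
}

// Hypothetical helper: looks for an exception of a given type anywhere
// in a throwable's cause chain, including the throwable itself.
final class CauseChain {

    private static final int MAX_DEPTH = 32; // guard against cyclic cause chains

    private CauseChain() {
    }

    static <T extends Throwable> Optional<T> find(Throwable t, Class<T> type) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

Inside isFatal, a check such as CauseChain.find(t, BusinessRuntimeException.class).isPresent() could then replace the direct instanceof test. Separately, because the strategy is created with new in the configuration class, it may be worth confirming that daoBean is actually injected; fields marked @Autowired are only populated on objects that Spring manages.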

