How change default channel on Spring Integration Flow with Java DSL -
i not understand well, can change error channel integration flow. need handle exceptions invalidaccesstokenexception can thrown in subflow inside router.
what i've tried handle exceptions default channel "errorchannel" by:
@bean public integrationflow errorflow() {    return integrationflows.from("errorchannel")        .handle("errorservice", "handleerror")        .get(); } this error treated method following signature:
void handleerror(message<exception> exception) but default behavior persists, still shows trace console.
so question is: in java dsl how can configure error channel? possible map group of exceptions particular error channel make management service of group more cohesive?.
the configuration of integration flow explain below:
@configuration @integrationcomponentscan public class infrastructureconfiguration {      private static logger logger = loggerfactory.getlogger(infrastructureconfiguration.class);      @autowired     private ifacebookservice facebookservice;      @autowired     private iinstagramservice instagramservice;      @autowired     private iyoutubeservice youtubeservice;      /**      * pollers builder factory can used configure common bean definitions or       * created integrationflowbuilder eip-methods      */     @bean(name = pollermetadata.default_poller)     public pollermetadata poller() {         return pollers.fixeddelay(10, timeunit.seconds).get();     }      @bean     public taskexecutor taskexecutor() {         threadpooltaskexecutor executor = new threadpooltaskexecutor();         executor.setcorepoolsize(5);         executor.setmaxpoolsize(10);         executor.setqueuecapacity(25);         return executor;     }      /**      * mongodbmessagesource instance of messagesource returns message payload       * result of execution of query      */     @bean     @autowired     public messagesource<object> mongomessagesource(mongodbfactory mongo) {         mongodbmessagesource messagesource = new mongodbmessagesource(mongo, new literalexpression("{}"));         messagesource.setexpectsingleresult(false);         messagesource.setentityclass(userentity.class);         messagesource.setcollectionnameexpression(new literalexpression("users"));         return messagesource;     }      @bean     @serviceactivator(inputchannel = "storechannel")     public messagehandler mongodbadapter(mongodbfactory mongo) throws exception {         mongodbstoringmessagehandler adapter = new mongodbstoringmessagehandler(mongo);         adapter.setcollectionnameexpression(new literalexpression("comments"));         return adapter;     }      @bean     public integrationflow errorflow() {         return integrationflows.from("errorchannel")                 .handle("errorservice", "handleerror")                 .get();     }       @bean     @autowired     public integrationflow processusers(mongodbfactory mongo, pollermetadata poller) {         return integrationflows.from(mongomessagesource(mongo), c -> c.poller(poller))                 .<list<userentity>, map<objectid, list<socialmediaentity>>>transform(userentitieslist                         -> userentitieslist.stream().collect(collectors.tomap(userentity::getid, userentity::getsocialmedia))                 )                 .split(new abstractmessagesplitter() {                     @override                     protected object splitmessage(message<?> msg) {                         return ((map<objectid, list<socialmediaentity>>) msg.getpayload()).entryset();                     }                 })                 .channel("directchannel_1")                 .enrichheaders(s -> s.headerexpressions(h -> h.put("user-id", "payload.key")))                 .split(new abstractmessagesplitter() {                     @override                     protected object splitmessage(message<?> msg) {                         return ((entry<objectid, list<socialmediaentity>>) msg.getpayload()).getvalue();                     }                 })                 .channel(messagechannels.executor("executorchannel", this.taskexecutor()))                 .<socialmediaentity, socialmediatypeenum>route(p -> p.gettype(),                         m                         -> m.subflowmapping(socialmediatypeenum.facebook,                                  sf -> sf.handle(socialmediaentity.class, (p, h) -> facebookservice.getcomments(p.getaccesstoken())))                             .subflowmapping(socialmediatypeenum.youtube,                                  sf -> sf.handle(socialmediaentity.class, (p, h) -> youtubeservice.getcomments(p.getaccesstoken())))                             .subflowmapping(socialmediatypeenum.instagram,                                  sf -> sf.handle(socialmediaentity.class, (p, h) -> instagramservice.getcomments(p.getaccesstoken())))                 )                 .channel("directchannel_2")                 .aggregate()                 .channel("directchannel_3")                 .<list<list<commententity>>, list<commententity>>transform(comments ->                          comments.stream().flatmap(list::stream).collect(collectors.tolist()))                 .aggregate()                 .channel("directchannel_4")                 .<list<list<commententity>>, list<commententity>>transform(comments ->                          comments.stream().flatmap(list::stream).collect(collectors.tolist()))                 .channel("storechannel")                 .get();     }       @postconstruct     protected void init(){         assert.notnull(facebookservice, "the facebook service can not null");         assert.notnull(instagramservice, "the instagram service can not null");         assert.notnull(youtubeservice, "the youtube service can not null");     }  } an example of exception can launched in of social networking services this:
public class invalidaccesstokenexception extends runtimeexception {      private socialmediatypeenum socialmediatype;     private string accesstoken;      public invalidaccesstokenexception(socialmediatypeenum socialmediatype, string accesstoken) {         this.socialmediatype = socialmediatype;         this.accesstoken = accesstoken;     }      public socialmediatypeenum getsocialmediatype() {         return socialmediatype;     }      public string getaccesstoken() {         return accesstoken;     } } is possible bind exception particular error channel?.
thanks in advance.
i have tried changing error channel using poolspec follows:
@bean(name = pollermetadata.default_poller) public pollermetadata poller() {     return pollers.fixeddelay(10, timeunit.seconds)           .errorchannel("customerrorchannel")          .get(); } but messages still continue go default channel 'errorchannel'.
here excerpt of log messages:
2017-07-25 20:20:51.922 debug 3268 --- [ taskexecutor-4] o.s.i.channel.publishsubscribechannel    : presend on channel 'errorchannel', message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e20089, accesstoken=maite_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20094, correlationid=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}] 2017-07-25 20:20:51.923 debug 3268 --- [ taskexecutor-5] o.s.integration.handler.logginghandler   : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008c, accesstoken=david_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20095, correlationid=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}] 2017-07-25 20:20:51.922 debug 3268 --- [ taskexecutor-3] o.s.integration.handler.logginghandler   : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008f, accesstoken=elena_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20096, correlationid=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}] 2017-07-25 20:20:51.923 debug 3268 --- [ taskexecutor-4] o.s.integration.handler.logginghandler   : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e20089, accesstoken=maite_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20094, correlationid=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}] 2017-07-25 20:20:51.927 error 3268 --- [ taskexecutor-3] o.s.integration.handler.logginghandler   : org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008f, accesstoken=elena_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20096, correlationid=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}]     @ org.springframework.integration.dsl.lambdamessageprocessor.processmessage(lambdamessageprocessor.java:130)     @ org.springframework.integration.handler.serviceactivatinghandler.handlerequestmessage(serviceactivatinghandler.java:89) i have tried 2 new approaches change default error channel:
- create custom errorhandler error channel , declare in poolspec: - @bean public messagechannel customerrorchannel() { return messagechannels.direct("customerrorchannel").get(); } - @bean public errorhandler errorhandler() { messagepublishingerrorhandler messagepublishingerrorhandler = new messagepublishingerrorhandler(); messagepublishingerrorhandler.setdefaulterrorchannel(customerrorchannel()); return messagepublishingerrorhandler; } - @bean(name = pollermetadata.default_poller) public pollermetadata poller() { return pollers.fixeddelay(10, timeunit.seconds) .errorhandler(errorhandler()) .get(); } 
the error messages still going default channel 'errorchannel'.
- adding messageheaders.error_channel header explicitly indicate error channel: - .enrichheaders(s -> s.headerexpressions(h -> h.put("user-id", "payload.key")) .header(messageheaders.error_channel, "customerrorchannel") ) 
if approach works, error messages directed "customerrorchannel".
yes, can that. there errormessageexceptiontyperouter similar task. so, able route invalidaccesstokenexception specific channel.
also aware pollerspec can supplied errorchannel(), don't need worry exceptions go default errorchannel.
update
ok. after code investigation see this:
.channel(messagechannels.executor("executorchannel", this.taskexecutor())) that means shift message different thread and, therefore, exception there far away try...catch of poller's algorithm send custom customerrorchannel.
the executorchannel has logic:
    if (!(this.executor instanceof errorhandlingtaskexecutor)) {         errorhandler errorhandler = new messagepublishingerrorhandler(                 new beanfactorychannelresolver(this.getbeanfactory()));         this.executor = new errorhandlingtaskexecutor(this.executor, errorhandler);     } where messagepublishingerrorhandler based on errorchannel default. can here declaration of similar bean taskexecutor() bean , injection customerrorchannel messagepublishingerrorhandler.
another option should work here messagepublishingerrorhandler errorchannel header population upstream executorchannel definition.
Comments
Post a Comment