How change default channel on Spring Integration Flow with Java DSL -
i not understand well, can change error channel integration flow. need handle exceptions invalidaccesstokenexception can thrown in subflow inside router.
what i've tried handle exceptions default channel "errorchannel" by:
@bean public integrationflow errorflow() { return integrationflows.from("errorchannel") .handle("errorservice", "handleerror") .get(); }
this error treated method following signature:
void handleerror(message<exception> exception)
but default behavior persists, still shows trace console.
so question is: in java dsl how can configure error channel? possible map group of exceptions particular error channel make management service of group more cohesive?.
the configuration of integration flow explain below:
@configuration @integrationcomponentscan public class infrastructureconfiguration { private static logger logger = loggerfactory.getlogger(infrastructureconfiguration.class); @autowired private ifacebookservice facebookservice; @autowired private iinstagramservice instagramservice; @autowired private iyoutubeservice youtubeservice; /** * pollers builder factory can used configure common bean definitions or * created integrationflowbuilder eip-methods */ @bean(name = pollermetadata.default_poller) public pollermetadata poller() { return pollers.fixeddelay(10, timeunit.seconds).get(); } @bean public taskexecutor taskexecutor() { threadpooltaskexecutor executor = new threadpooltaskexecutor(); executor.setcorepoolsize(5); executor.setmaxpoolsize(10); executor.setqueuecapacity(25); return executor; } /** * mongodbmessagesource instance of messagesource returns message payload * result of execution of query */ @bean @autowired public messagesource<object> mongomessagesource(mongodbfactory mongo) { mongodbmessagesource messagesource = new mongodbmessagesource(mongo, new literalexpression("{}")); messagesource.setexpectsingleresult(false); messagesource.setentityclass(userentity.class); messagesource.setcollectionnameexpression(new literalexpression("users")); return messagesource; } @bean @serviceactivator(inputchannel = "storechannel") public messagehandler mongodbadapter(mongodbfactory mongo) throws exception { mongodbstoringmessagehandler adapter = new mongodbstoringmessagehandler(mongo); adapter.setcollectionnameexpression(new literalexpression("comments")); return adapter; } @bean public integrationflow errorflow() { return integrationflows.from("errorchannel") .handle("errorservice", "handleerror") .get(); } @bean @autowired public integrationflow processusers(mongodbfactory mongo, pollermetadata poller) { return integrationflows.from(mongomessagesource(mongo), c -> c.poller(poller)) .<list<userentity>, map<objectid, list<socialmediaentity>>>transform(userentitieslist -> userentitieslist.stream().collect(collectors.tomap(userentity::getid, userentity::getsocialmedia)) ) .split(new abstractmessagesplitter() { @override protected object splitmessage(message<?> msg) { return ((map<objectid, list<socialmediaentity>>) msg.getpayload()).entryset(); } }) .channel("directchannel_1") .enrichheaders(s -> s.headerexpressions(h -> h.put("user-id", "payload.key"))) .split(new abstractmessagesplitter() { @override protected object splitmessage(message<?> msg) { return ((entry<objectid, list<socialmediaentity>>) msg.getpayload()).getvalue(); } }) .channel(messagechannels.executor("executorchannel", this.taskexecutor())) .<socialmediaentity, socialmediatypeenum>route(p -> p.gettype(), m -> m.subflowmapping(socialmediatypeenum.facebook, sf -> sf.handle(socialmediaentity.class, (p, h) -> facebookservice.getcomments(p.getaccesstoken()))) .subflowmapping(socialmediatypeenum.youtube, sf -> sf.handle(socialmediaentity.class, (p, h) -> youtubeservice.getcomments(p.getaccesstoken()))) .subflowmapping(socialmediatypeenum.instagram, sf -> sf.handle(socialmediaentity.class, (p, h) -> instagramservice.getcomments(p.getaccesstoken()))) ) .channel("directchannel_2") .aggregate() .channel("directchannel_3") .<list<list<commententity>>, list<commententity>>transform(comments -> comments.stream().flatmap(list::stream).collect(collectors.tolist())) .aggregate() .channel("directchannel_4") .<list<list<commententity>>, list<commententity>>transform(comments -> comments.stream().flatmap(list::stream).collect(collectors.tolist())) .channel("storechannel") .get(); } @postconstruct protected void init(){ assert.notnull(facebookservice, "the facebook service can not null"); assert.notnull(instagramservice, "the instagram service can not null"); assert.notnull(youtubeservice, "the youtube service can not null"); } }
an example of exception can launched in of social networking services this:
public class invalidaccesstokenexception extends runtimeexception { private socialmediatypeenum socialmediatype; private string accesstoken; public invalidaccesstokenexception(socialmediatypeenum socialmediatype, string accesstoken) { this.socialmediatype = socialmediatype; this.accesstoken = accesstoken; } public socialmediatypeenum getsocialmediatype() { return socialmediatype; } public string getaccesstoken() { return accesstoken; } }
is possible bind exception particular error channel?.
thanks in advance.
i have tried changing error channel using poolspec follows:
@bean(name = pollermetadata.default_poller) public pollermetadata poller() { return pollers.fixeddelay(10, timeunit.seconds) .errorchannel("customerrorchannel") .get(); }
but messages still continue go default channel 'errorchannel'.
here excerpt of log messages:
2017-07-25 20:20:51.922 debug 3268 --- [ taskexecutor-4] o.s.i.channel.publishsubscribechannel : presend on channel 'errorchannel', message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e20089, accesstoken=maite_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20094, correlationid=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}] 2017-07-25 20:20:51.923 debug 3268 --- [ taskexecutor-5] o.s.integration.handler.logginghandler : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008c, accesstoken=david_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20095, correlationid=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}] 2017-07-25 20:20:51.922 debug 3268 --- [ taskexecutor-3] o.s.integration.handler.logginghandler : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008f, accesstoken=elena_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20096, correlationid=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}] 2017-07-25 20:20:51.923 debug 3268 --- [ taskexecutor-4] o.s.integration.handler.logginghandler : _org.springframework.integration.errorlogger.handler received message: errormessage [payload=org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e20089, accesstoken=maite_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20094, correlationid=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}] 2017-07-25 20:20:51.927 error 3268 --- [ taskexecutor-3] o.s.integration.handler.logginghandler : org.springframework.messaging.messagehandlingexception: nested exception sanchez.sanchez.sergio.exception.invalidaccesstokenexception, failedmessage=genericmessage [payload=socialmediaentity{id=59778bf93681ac0cc4e2008f, accesstoken=elena_access_token_facebook, type=facebook, invalidtoken=false}, headers={sequencenumber=1, sequencedetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionname=users, sequencesize=3, user-id=59778bf93681ac0cc4e20096, correlationid=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}] @ org.springframework.integration.dsl.lambdamessageprocessor.processmessage(lambdamessageprocessor.java:130) @ org.springframework.integration.handler.serviceactivatinghandler.handlerequestmessage(serviceactivatinghandler.java:89)
i have tried 2 new approaches change default error channel:
create custom errorhandler error channel , declare in poolspec:
@bean public messagechannel customerrorchannel() { return messagechannels.direct("customerrorchannel").get(); }
@bean public errorhandler errorhandler() { messagepublishingerrorhandler messagepublishingerrorhandler = new messagepublishingerrorhandler(); messagepublishingerrorhandler.setdefaulterrorchannel(customerrorchannel()); return messagepublishingerrorhandler; }
@bean(name = pollermetadata.default_poller) public pollermetadata poller() { return pollers.fixeddelay(10, timeunit.seconds) .errorhandler(errorhandler()) .get(); }
the error messages still going default channel 'errorchannel'.
adding messageheaders.error_channel header explicitly indicate error channel:
.enrichheaders(s -> s.headerexpressions(h -> h.put("user-id", "payload.key")) .header(messageheaders.error_channel, "customerrorchannel") )
if approach works, error messages directed "customerrorchannel".
yes, can that. there errormessageexceptiontyperouter
similar task. so, able route invalidaccesstokenexception
specific channel.
also aware pollerspec
can supplied errorchannel()
, don't need worry exceptions go default errorchannel
.
update
ok. after code investigation see this:
.channel(messagechannels.executor("executorchannel", this.taskexecutor()))
that means shift message different thread and, therefore, exception there far away try...catch
of poller's algorithm send custom customerrorchannel
.
the executorchannel
has logic:
if (!(this.executor instanceof errorhandlingtaskexecutor)) { errorhandler errorhandler = new messagepublishingerrorhandler( new beanfactorychannelresolver(this.getbeanfactory())); this.executor = new errorhandlingtaskexecutor(this.executor, errorhandler); }
where messagepublishingerrorhandler
based on errorchannel
default. can here declaration of similar bean taskexecutor()
bean , injection customerrorchannel
messagepublishingerrorhandler
.
another option should work here messagepublishingerrorhandler
errorchannel
header population upstream executorchannel
definition.
Comments
Post a Comment