RxJS - Chained Observable after takeWhile completes is not being called?
I have the following methods, which should be called like this:

- registerDomain should be called and should return an operationId.
- After 10 seconds, getOperationDetail should be called, passing in the operationId.
- getOperationDetail should be called every 10 seconds until 'successful' is returned.
- Once getOperationDetail finishes, createRecordSets should be called.
- Finally, getChangeStatus should be called until it returns 'insync'.
- If any of the API calls throws an exception, how can I handle the error on the client side?
The code below calls registerDomain and getOperationDetail, but after getOperationDetail completes it does not move on to createRecordSets.

    registerDomain(domain) {
      return this._adminService.registerDomain(domain)
        .concatMap(operation => this.getOperationDetail(operation.operationId))
        .concatMap(() => this._adminService.createRecordSets(domain));
    }

    getOperationDetail(operationId) {
      return Observable.interval(10000)
        .mergeMap(() => this._adminService.getOperationDetail(operationId))
        .takeWhile((info) => info.status.value !== 'successful');
    }

    createRecordSets(caseWebsiteUrl) {
      return this._adminService.createRecordSets(caseWebsiteUrl.url)
        .concatMap(registerId => this.getChangeStatus(registerId));
    }

    getChangeStatus(registerId) {
      return Observable.interval(5000)
        .mergeMap(() => this._adminService.getChange(registerId))
        .takeWhile((info) => info.changeInfo.status.value !== 'insync');
    }
I updated getOperationDetail to use the first() operator:

    getOperationDetail(operationId) {
      return Observable.interval(3000)
        .mergeMap(() => this._adminService.getOperationDetail(operationId))
        .first((info) => info.status.value === 'successful');
    }
Now it does in fact call createRecordSets. However, after createRecordSets, it continues to call getOperationDetail about 13 times and only then calls getChangeStatus. The way I was looking at it, I thought it would:

- Call getOperationDetail until it receives success.
- Call createRecordSets one time.
- Call getChangeStatus until it receives 'insync'.
- Done.

Why the additional calls?
I changed registerDomain to look like this:

    registerDomain(domain) {
      return this._adminService.registerDomain(domain)
        .concatMap(operation => this.getOperationDetail(operation.operationId))
        .concatMap((op) => this.createRecordSets(op));
    }

Before, I had .concatMap((op) => this.createRecordSets(op)) chained right after this.getOperationDetail. Once I moved it outside of that, it started working as expected. I am unsure why, though. Can someone explain?
When takeWhile() meets a value that does not satisfy the specified criteria, it completes the observable without propagating that value. This means the next chained operator will not receive the value and will not invoke its callback.

Suppose that in your example the first two calls to this._adminService.getOperationDetail(...) result in a non-successful status and the third call succeeds. This means the observable returned by getOperationDetail() would produce only two info values, each of them having a non-successful status. Also, and this might be important, the next chained concatMap() operator would invoke its callback for each of those non-successful values, meaning createRecordSets() would be called twice. I suppose you might want to avoid that.
I suggest you use the first() operator instead:

    getOperationDetail(operationId) {
      return Observable.interval(10000)
        .concatMap(() => this._adminService.getOperationDetail(operationId))
        // Emit only the first response whose status is 'successful'.
        .first(info => info.status.value === 'successful');
    }
This way getOperationDetail() will produce only a single "successful" value, as soon as this._adminService.getOperationDetail(operationId) succeeds. The first() operator emits the first value of the source observable that matches the specified condition and then completes.
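The difference between the two operators can be sketched without RxJS by modelling a run of poll results as a plain array. This is only an illustration: takeWhileValues, firstValue and the polls data are hypothetical names invented here, and the helpers are array stand-ins rather than the real operators.

```javascript
// Hypothetical statuses returned by three successive polls.
const polls = ['pending', 'pending', 'successful'];

// Array stand-in for takeWhile(predicate): pass values through while the
// predicate holds, then stop WITHOUT emitting the value that failed it.
function takeWhileValues(values, predicate) {
  const emitted = [];
  for (const value of values) {
    if (!predicate(value)) break;
    emitted.push(value);
  }
  return emitted;
}

// Array stand-in for first(predicate): emit only the first matching value.
// (What the real operator does when nothing matches is not modelled here.)
function firstValue(values, predicate) {
  const match = values.find(predicate);
  return match === undefined ? [] : [match];
}

const viaTakeWhile = takeWhileValues(polls, s => s !== 'successful');
const viaFirst = firstValue(polls, s => s === 'successful');

console.log(viaTakeWhile); // [ 'pending', 'pending' ]
console.log(viaFirst);     // [ 'successful' ]
```

With the takeWhile-style version, a downstream callback would run once per 'pending' value and never see 'successful'. With the first-style version it runs exactly once, on the successful value.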
As for error handling, the catch() or retry() operators might be useful.
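As a rough illustration of what those two operators do, here is a synchronous, plain-JavaScript analogue. It is not RxJS code: withRetry, withCatch and flakyGetOperationDetail are hypothetical helpers made up for this sketch. In RxJS itself, retry(count) resubscribes to the source after an error, and the function passed to catch() has to return an observable rather than a plain value.

```javascript
// Hypothetical flaky call: throws on the first two attempts, then succeeds.
let attempts = 0;
function flakyGetOperationDetail() {
  attempts += 1;
  if (attempts < 3) {
    throw new Error('temporary failure on attempt ' + attempts);
  }
  return { status: { value: 'successful' } };
}

// Analogue of retry(count): run the call again after an error, up to
// `count` extra times, then rethrow the last error.
function withRetry(fn, count) {
  let lastError;
  for (let i = 0; i <= count; i += 1) {
    try {
      return fn();
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// Analogue of catch(handler): replace an error with a fallback value.
function withCatch(fn, handler) {
  try {
    return fn();
  } catch (err) {
    return handler(err);
  }
}

// Two retries are enough here because the third attempt succeeds.
const recovered = withRetry(flakyGetOperationDetail, 2);
console.log(recovered.status.value); // successful

// When every attempt fails, the error reaches the catch-style handler.
const fallback = withCatch(
  () => withRetry(() => { throw new Error('always fails'); }, 2),
  err => ({ status: { value: 'failed' }, reason: err.message })
);
console.log(fallback.reason); // always fails
```

In the observable chain from the question, the equivalent would be to place retry() before catch(), so that the fallback is only used once the retries are exhausted.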
Update:

The unexpected behavior you have faced (getOperationDetail() keeps being called after first() completes) seems to be a bug in RxJS. As described in this issue,

"every take-ish operator (one that completes earlier than its source observable) will keep subscribing to the source observable when combined with an operator that prolongs the subscription (here switchMap)."

Both first() and takeWhile() are examples of such take-ish operators, and the operators that "prolong" the subscription are, for example, switchMap(), concatMap() and mergeMap(). In the following example, numbers keep being logged while the inner observable of concatMap() is emitting values:

    var takeish$ = Rx.Observable.interval(200)
      // will keep logging until the inner observable of `concatMap` is completed
      .do(x => console.log(x))
      .takeWhile(x => x < 2);

    var source = takeish$
      .concatMap(x => Rx.Observable.interval(200).take(10))
      .subscribe();

It looks like this can be worked around by turning the observable that contains such a take-ish operator into a higher-order observable, in a similar way to what you've done:

    var takeish$ = Rx.Observable.interval(200)
      // will log 0, 1, 2
      .do(x => console.log(x))
      .takeWhile(x => x < 2);

    var source = Rx.Observable.of(null)
      .switchMap(() => takeish$)
      .concatMap(x => Rx.Observable.interval(200).take(1))
      .subscribe();

Update 2:

It seems the bug described above still exists as of RxJS version 5.4.2. It affects, for example, whether or not the source observable of the first() operator is unsubscribed when first() meets the specified condition. When first() is followed by concatMap(), its source observable is not unsubscribed and keeps emitting values until the inner observable of concatMap() completes. In your case this means that this._adminService.getOperationDetail() will keep being called until the observable returned by createRecordSets() has completed.

Here's your example, simplified to illustrate the behavior:

    function registerDomain() {
      return Rx.Observable.of("operation")
        .concatMap(() => getOperationDetail()
          .concatMap(() => Rx.Observable.interval(200).take(5)));
    }

    function getOperationDetail() {
      return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
    }

    registerDomain().subscribe();

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/rx.js"></script>
If we expand the inner observable of the first concatMap() operator, we get the following observable:

    Rx.Observable.interval(100)
      .do(x => console.log(x))
      .first(x => x === 2)
      .concatMap(() => Rx.Observable.interval(200).take(5));

Notice that first() is followed by concatMap(), which prevents the source observable of the first() operator (i.e. interval(100).do(x => console.log(x))) from being unsubscribed. Values keep being logged (or, in your case, service calls keep being sent) until the inner observable of concatMap() (i.e. interval(200).take(5)) completes.

If we modify the example above and move the second concatMap() out of the inner observable of the first concatMap(), first() is no longer chained with it and unsubscribes from the source observable as soon as the condition is satisfied. The interval then stops emitting values and no more numbers are logged (or no more service requests are sent):

    function registerDomain() {
      return Rx.Observable.of("operation")
        .concatMap(() => getOperationDetail())
        .concatMap(() => Rx.Observable.interval(200).take(5));
    }

    function getOperationDetail() {
      return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
    }

    registerDomain().subscribe();

The inner observable in this case can be expanded to:

    Rx.Observable.interval(100)
      .do(x => console.log(x))
      .first(x => x === 2)

Notice that first() is no longer followed by concatMap().
It is worth mentioning that in both cases the observable returned by registerDomain() produces the same values, and if we move the logging from the do() operator to subscribe(), the same values are written to the console in both cases:

    registerDomain().subscribe(x => console.log(x));