Skip to content

Commit aca8d03

Browse files
author
Piotr Bobinski
committed
More exercises
1 parent 7ba5e3a commit aca8d03

29 files changed

+167
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import rx.Observable
4+
import spock.lang.Specification
5+
6+
7+
class BasicObservableTest extends Specification {
8+
9+
def "Emit a series of values then terminate"() {
10+
when:
11+
Observable<String> s = Observable.just("one", "two", "three")
12+
s.subscribe(
13+
{ v -> println "onNext: " + v },
14+
{ e -> println "onError: " + e },
15+
{ println "Completed" }
16+
)
17+
then:
18+
s != null
19+
}
20+
21+
def "Observable.empty emits single onCompleted and nothing else"() {
22+
when:
23+
Observable<String> s = Observable.empty()
24+
s.subscribe(
25+
{ v -> println "onNext: " + v },
26+
{ e -> println "onError: " + e },
27+
{ println "Completed" }
28+
)
29+
then:
30+
s != null
31+
}
32+
33+
def "just() caches one value, which might not be preferred solution"() {
34+
when:
35+
Observable<Long> time = Observable.just(System.currentTimeMillis())
36+
time.subscribe { v -> println "Emit: " + v }
37+
Thread.sleep(1000)
38+
time.subscribe { v -> println "Emit: " + v }
39+
then:
40+
time != null
41+
}
42+
43+
def "using defer() might solve this problem"() {
44+
when:
45+
Observable<Long> defer = Observable.defer { Observable.just(System.currentTimeMillis()) }
46+
defer.subscribe { v -> println "Emit: " + v }
47+
Thread.sleep(1000)
48+
defer.subscribe { v -> println "Emit: " + v }
49+
50+
then:
51+
defer != null
52+
}
53+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package main
2+
3+
import rx.exceptions.OnErrorNotImplementedException
4+
import rx.subjects.ReplaySubject
5+
import spock.lang.Specification
6+
7+
8+
class ErrorHandlingTest extends Specification {
9+
10+
def "onError action appeared in the event stream"() {
11+
when:
12+
ReplaySubject<Integer> s = ReplaySubject.create()
13+
s.subscribe(
14+
{ v -> println "onNext: " + v },
15+
{ e -> println "onError: " + e }
16+
)
17+
s.onNext(0)
18+
s.onError(new Exception("Oops"))
19+
20+
then:
21+
s != null
22+
}
23+
24+
def "onError action appeared in the event stream but no onError action was provided to the subscribe"() {
25+
when:
26+
ReplaySubject<Integer> s = ReplaySubject.create()
27+
s.subscribe(
28+
{ v -> println "onNext: " + v },
29+
// { e -> println "onError: " + e }
30+
)
31+
s.onNext(0)
32+
s.onError(new Exception("Oops"))
33+
34+
then:
35+
s != null
36+
thrown OnErrorNotImplementedException
37+
}
38+
39+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package main
2+
3+
import rx.subjects.PublishSubject
4+
import spock.lang.Specification
5+
6+
7+
class PublishSubjectTest extends Specification {
8+
9+
def "When value is pushed to PublishSubject it pushes it down to subscribers at the given moment"() {
10+
when:
11+
PublishSubject<Integer> s = PublishSubject.create()
12+
s.onNext(0)
13+
s.subscribe{ v -> println "Event:" + v }
14+
s.onNext(1)
15+
s.onNext(2)
16+
17+
then:
18+
s != null
19+
}
20+
21+
22+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import rx.Subscription
4+
import rx.functions.Action0
5+
import rx.subjects.ReplaySubject
6+
import rx.subjects.Subject
7+
import rx.subscriptions.Subscriptions
8+
import spock.lang.Specification
9+
10+
11+
class UnsubscribingTest extends Specification {
12+
13+
def "Unsubscribe will cause stop receiving values"() {
14+
when:
15+
Subject<Integer, Integer> values = ReplaySubject.create()
16+
def subscription = values.subscribe(
17+
{ v -> println v },
18+
{ e -> println e },
19+
{ println "Done" }
20+
)
21+
values.onNext(0)
22+
values.onNext(1)
23+
subscription.unsubscribe()
24+
values.onNext(2)
25+
then:
26+
values != null
27+
}
28+
29+
def "Unsubscribing one obs does not interfere with obs on the same observable"() {
30+
when:
31+
Subject<Integer, Integer> s = ReplaySubject.create()
32+
def subscription1 = s.subscribe { v -> println "First:" + v }
33+
def subscription2 = s.subscribe { v -> println "Second:" + v }
34+
35+
s.onNext(0)
36+
subscription2.unsubscribe()
37+
s.onNext(1)
38+
39+
then:
40+
s != null
41+
!subscription1.isUnsubscribed()
42+
subscription2.isUnsubscribed()
43+
}
44+
45+
def "Create takes an Action that will be performed upon unsubscription "() {
46+
when:
47+
Subscription s = Subscriptions.create { println "Clean" }
48+
s.unsubscribe()
49+
then:
50+
s.isUnsubscribed()
51+
}
52+
53+
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)