() {
+ @Override
+ public void onCompleted() {
+ System.out.println("Completed!");
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onNext(String strings) {
+ System.out.println("same hello " + strings);
+ }
+
+ });
+ }
+}
diff --git a/src/main/java/me/ehlxr/reactive/SchedulerTest.java b/src/main/java/me/ehlxr/reactive/SchedulerTest.java
new file mode 100644
index 0000000..01472f6
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/SchedulerTest.java
@@ -0,0 +1,22 @@
+package me.ehlxr.reactive;
+
+
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
+/**
+ * Created by lixiangrong on 2018/1/16.
+ */
+public class SchedulerTest {
+ public static void main(String[] args) throws InterruptedException {
+ Flowable.fromCallable(() -> {
+ Thread.sleep(1000); // imitate expensive computation
+ return "Done";
+ })
+ .subscribeOn(Schedulers.io())
+ .observeOn(Schedulers.single())
+ .subscribe(System.out::println, Throwable::printStackTrace);
+
+ Thread.sleep(2000); // <--- wait for the flow to finish
+ }
+}
diff --git a/src/main/java/me/ehlxr/reactive/TestFuture.java b/src/main/java/me/ehlxr/reactive/TestFuture.java
new file mode 100644
index 0000000..4529ef8
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/TestFuture.java
@@ -0,0 +1,81 @@
+package me.ehlxr.reactive;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * @author lixiangrong
+ * @date 2018/1/16
+ *
+ * Future 是一个泛型接口,如果一个可运行的函数(实现 Callable 或 Runable 的类)在一个线程中运行,
+ * 利用 Future 可以用它的 get() 方法返回 V 类型的结果。 注意, get() 会阻塞当前线程
+ *
+ * 问题
+ * get() 顺序会影响出结果时间,关键 get 的阻塞;
+ * 如果能按这些线程出结果的时间序列,把数据结果交给后面的线程并行加工处理,CPU就不用阻塞在 get() 了。但编程无疑会很复杂。
+ */
+public class TestFuture {
+ // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
+ private static ExecutorService executor = Executors.newCachedThreadPool();
+
+ public static void main(String[] args) {
+ TestFuture test = new TestFuture();
+ test.testTaskRunning("fa", 300);
+ test.testAsyncTaskRunning();
+ executor.shutdown();
+ }
+
+ private void testTaskRunning(String name, Integer waiting) {
+ System.out.println("Prepare for execution:" + name);
+ long startTime = System.currentTimeMillis();
+
+ Future fa = executor.submit(
+ () -> {
+ try {
+ Thread.sleep(waiting);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return String.format("service exec time: %d", waiting);
+ });
+
+ System.out.println("Start execute: " + (System.currentTimeMillis() - startTime) + "ms");
+
+ try {
+ String s = fa.get(); // Future to Blocked
+ System.out.println(s);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("End execute: " + (System.currentTimeMillis() - startTime) + "ms");
+
+ }
+
+ private void testAsyncTaskRunning() {
+ System.out.println("Prepare for execution: composite task");
+ long startTime = System.currentTimeMillis();
+
+ Future fa = executor.submit(new TimeConsumingService("fa", 200, new String[]{}));
+ Future fb = executor.submit(new TimeConsumingService("fb", 400, new String[]{}));
+
+ System.out.println("Start execute: " + (System.currentTimeMillis() - startTime) + "ms");
+
+ try {
+ // What will happen when change line fc and fd ?
+ Future fc = executor.submit(new TimeConsumingService("fc", 400, new String[]{fa.get()}));
+ Future fd = executor.submit(new TimeConsumingService("fd", 200, new Object[]{fb.get()}));
+ Future fe = executor.submit(new TimeConsumingService("fe", 200, new Object[]{fb.get()}));
+ System.out.println(fc.get());
+ System.out.println(fd.get());
+ System.out.println(fe.get());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("End execute: " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/me/ehlxr/reactive/TestLambda.java b/src/main/java/me/ehlxr/reactive/TestLambda.java
new file mode 100644
index 0000000..89ad69a
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/TestLambda.java
@@ -0,0 +1,26 @@
+package me.ehlxr.reactive;
+
+/**
+ * Created by lixiangrong on 2018/1/16.
+ */
+public class TestLambda {
+ public static void main(String[] args) {
+
+ System.out.println("=== RunnableTest ===");
+
+ // Anonymous Runnable
+ Runnable r1 = new Runnable() {
+ @Override
+ public void run() {
+ System.out.println("Hello world one!");
+ }
+ };
+
+ // Lambda Runnable
+ Runnable r2 = () -> System.out.println("Hello world two!");
+
+ // Run em!
+ r1.run();
+ r2.run();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/me/ehlxr/reactive/TestRx.java b/src/main/java/me/ehlxr/reactive/TestRx.java
new file mode 100644
index 0000000..2e366a7
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/TestRx.java
@@ -0,0 +1,60 @@
+package me.ehlxr.reactive;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+/**
+ * Created by lixiangrong on 2018/1/16.
+ */
+public class TestRx {
+ public static void main(String[] args) {
+ TestRx testRx = new TestRx();
+ testRx.testAsyncCompositeJoin();
+ // testRx.testAsyncCompositeJoin();
+ }
+
+ private void testAsyncCompositeJoin() {
+ System.out.println("Prepare for execution:Async Composite Join");
+ long startTime = System.currentTimeMillis();
+
+ // Tasks oa -> oc, both in the same thread 1.
+ Observable oa = Observable.just("oa").observeOn(Schedulers.io()).flatMap(
+ soa -> Observable.fromCallable(new TimeConsumingService("fa", 1000, new String[]{}))
+ );
+ Observable oc = oa.flatMap(
+ res -> {
+ System.out.println(res);
+ System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
+ return Observable.fromCallable(
+ new TimeConsumingService("fc", 2000, new String[]{res}));
+ });
+
+ // tasks ob -> (od,oe), ob, od, oe have special thread 2,3,4.
+ Observable ob = Observable.just("ob").observeOn(Schedulers.io()).flatMap(
+ sob -> Observable.fromCallable(new TimeConsumingService("fb", 2000, new String[]{}))
+ );
+ Observable od_oe = ob.flatMap(
+ res -> {
+ System.out.println(res);
+ System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
+ Observable od = Observable.just("od").observeOn(Schedulers.io()).flatMap(
+ sod -> Observable.fromCallable(new TimeConsumingService("fd", 1000, new String[]{res}))
+ );
+ Observable oe = Observable.just("oe").observeOn(Schedulers.io()).flatMap(
+ sod -> Observable.fromCallable(new TimeConsumingService("fe", 1000, new String[]{res}))
+ );
+ return Observable.merge(od, oe);
+ });
+
+ System.out.println("Observable build: " + (System.currentTimeMillis() - startTime) + "ms");
+
+ // tasks join oc,(od_oe) and subscribe
+ Observable.merge(oc, od_oe).toBlocking().subscribe(
+ res -> {
+ System.out.println(res);
+ System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
+ });
+
+ System.out.println("End executed: " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+}
diff --git a/src/main/java/me/ehlxr/reactive/TestRx01.java b/src/main/java/me/ehlxr/reactive/TestRx01.java
new file mode 100644
index 0000000..eb9defd
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/TestRx01.java
@@ -0,0 +1,23 @@
+package me.ehlxr.reactive;
+
+
+import rx.Observable;
+
+/**
+ * Created by lixiangrong on 2018/1/16.
+ * 链式函数式处理
+ */
+public class TestRx01 {
+ public static void main(String[] args) {
+ hello(1, 2, 3, 4, 5);
+ }
+
+ private static void hello(Integer... integers) {
+ Observable workflow = Observable.from(integers)
+ .filter(i -> (i < 10) && (i > 0))
+ .map(i -> i * 2);
+ workflow.subscribe(i -> System.out.print(i + "! "));
+ System.out.println();
+ workflow.subscribe(i -> System.out.print(i + "! "));
+ }
+}
diff --git a/src/main/java/me/ehlxr/reactive/TimeConsumingService.java b/src/main/java/me/ehlxr/reactive/TimeConsumingService.java
new file mode 100644
index 0000000..acac4d9
--- /dev/null
+++ b/src/main/java/me/ehlxr/reactive/TimeConsumingService.java
@@ -0,0 +1,26 @@
+package me.ehlxr.reactive;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
+/**
+ * Created by lixiangrong on 2018/1/16.
+ */
+public class TimeConsumingService implements Callable {
+
+ private String service_name;
+ private int wait_ms;
+ private Object[] depandencies;
+
+ TimeConsumingService(String name, Integer waiting, Object[] depandencies) {
+ this.service_name = name;
+ this.wait_ms = waiting;
+ this.depandencies = depandencies;
+ }
+
+ @Override
+ public String call() throws Exception {
+ Thread.sleep(wait_ms);
+ return String.format("service %s exec time is: %d, depandencies: %s", service_name, wait_ms, Arrays.toString(depandencies));
+ }
+}
\ No newline at end of file