This commit is contained in:
ehlxr 2018-01-16 18:22:27 +08:00
parent 3fb067f709
commit 8572148c15
11 changed files with 314 additions and 0 deletions

View File

@ -97,5 +97,13 @@
<orderEntry type="library" scope="RUNTIME" name="Maven: net.bytebuddy:byte-buddy:1.6.14" level="project" /> <orderEntry type="library" scope="RUNTIME" name="Maven: net.bytebuddy:byte-buddy:1.6.14" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: net.bytebuddy:byte-buddy-agent:1.6.14" level="project" /> <orderEntry type="library" scope="RUNTIME" name="Maven: net.bytebuddy:byte-buddy-agent:1.6.14" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.5" level="project" /> <orderEntry type="library" name="Maven: org.objenesis:objenesis:2.5" level="project" />
<orderEntry type="library" name="Maven: io.reactivex:rxjava:1.3.0" level="project" />
<orderEntry type="library" name="Maven: io.reactivex.rxjava2:rxjava:2.1.8" level="project" />
<orderEntry type="library" name="Maven: org.reactivestreams:reactive-streams:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:22.0" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14" level="project" />
</component> </component>
</module> </module>

15
pom.xml
View File

@ -243,6 +243,21 @@
<artifactId>powermock-api-mockito2</artifactId> <artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version> <version>${powermock.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.8</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<finalName>java-utils</finalName> <finalName>java-utils</finalName>

Binary file not shown.

View File

@ -0,0 +1,19 @@
package me.ehlxr;
/**
* Created by lixiangrong on 2018/1/16.
* 可变参数
*/
public class VarArgsTest {
private static void m1(String s, String... ss) {
for (String s1 : ss) {
System.out.println(s1);
}
}
public static void main(String[] args) {
m1("");
m1("aaa");
m1("aaa", "bbb");
}
}

View File

@ -0,0 +1,34 @@
package me.ehlxr.reactive;
import rx.Observable;
import rx.Subscriber;
/**
* Created by lixiangrong on 2018/1/16.
*/
public class HelloRxJava {
public static void main(String[] args) {
hello("wer", "wd");
}
private static void hello(String... names) {
Observable.from(names).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Completed!");
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onNext(String strings) {
System.out.println("same hello " + strings);
}
});
}
}

View File

@ -0,0 +1,22 @@
package me.ehlxr.reactive;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
/**
* Created by lixiangrong on 2018/1/16.
*/
public class SchedulerTest {
public static void main(String[] args) throws InterruptedException {
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
}
}

View File

@ -0,0 +1,81 @@
package me.ehlxr.reactive;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author lixiangrong
* @date 2018/1/16
* <p>
* Future<V> 是一个泛型接口如果一个可运行的函数实现 Callable Runable 的类在一个线程中运行
* 利用 Future<V> 可以用它的 get() 方法返回 V 类型的结果 注意 get() 会阻塞当前线程
* <p>
* 问题
* get() 顺序会影响出结果时间关键 get 的阻塞
* 如果能按这些线程出结果的时间序列把数据结果交给后面的线程并行加工处理CPU就不用阻塞在 get() 但编程无疑会很复杂
*/
public class TestFuture {
// https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
private static ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String[] args) {
TestFuture test = new TestFuture();
test.testTaskRunning("fa", 300);
test.testAsyncTaskRunning();
executor.shutdown();
}
private void testTaskRunning(String name, Integer waiting) {
System.out.println("Prepare for execution" + name);
long startTime = System.currentTimeMillis();
Future<String> fa = executor.submit(
() -> {
try {
Thread.sleep(waiting);
} catch (Exception e) {
e.printStackTrace();
}
return String.format("service exec time: %d", waiting);
});
System.out.println("Start execute " + (System.currentTimeMillis() - startTime) + "ms");
try {
String s = fa.get(); // Future to Blocked
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("End execute " + (System.currentTimeMillis() - startTime) + "ms");
}
private void testAsyncTaskRunning() {
System.out.println("Prepare for execution composite task");
long startTime = System.currentTimeMillis();
Future<String> fa = executor.submit(new TimeConsumingService("fa", 200, new String[]{}));
Future<String> fb = executor.submit(new TimeConsumingService("fb", 400, new String[]{}));
System.out.println("Start execute " + (System.currentTimeMillis() - startTime) + "ms");
try {
// What will happen when change line fc and fd ?
Future<String> fc = executor.submit(new TimeConsumingService("fc", 400, new String[]{fa.get()}));
Future<String> fd = executor.submit(new TimeConsumingService("fd", 200, new Object[]{fb.get()}));
Future<String> fe = executor.submit(new TimeConsumingService("fe", 200, new Object[]{fb.get()}));
System.out.println(fc.get());
System.out.println(fd.get());
System.out.println(fe.get());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("End execute " + (System.currentTimeMillis() - startTime) + "ms");
}
}

View File

@ -0,0 +1,26 @@
package me.ehlxr.reactive;
/**
* Created by lixiangrong on 2018/1/16.
*/
public class TestLambda {
public static void main(String[] args) {
System.out.println("=== RunnableTest ===");
// Anonymous Runnable
Runnable r1 = new Runnable() {
@Override
public void run() {
System.out.println("Hello world one!");
}
};
// Lambda Runnable
Runnable r2 = () -> System.out.println("Hello world two!");
// Run em!
r1.run();
r2.run();
}
}

View File

@ -0,0 +1,60 @@
package me.ehlxr.reactive;
import rx.Observable;
import rx.schedulers.Schedulers;
/**
* Created by lixiangrong on 2018/1/16.
*/
public class TestRx {
public static void main(String[] args) {
TestRx testRx = new TestRx();
testRx.testAsyncCompositeJoin();
// testRx.testAsyncCompositeJoin();
}
private void testAsyncCompositeJoin() {
System.out.println("Prepare for executionAsync Composite Join");
long startTime = System.currentTimeMillis();
// Tasks oa -> oc, both in the same thread 1.
Observable<String> oa = Observable.just("oa").observeOn(Schedulers.io()).flatMap(
soa -> Observable.fromCallable(new TimeConsumingService("fa", 1000, new String[]{}))
);
Observable<String> oc = oa.flatMap(
res -> {
System.out.println(res);
System.out.println("Executed At " + (System.currentTimeMillis() - startTime) + "ms");
return Observable.fromCallable(
new TimeConsumingService("fc", 2000, new String[]{res}));
});
// tasks ob -> (od,oe), ob, od, oe have special thread 2,3,4.
Observable<String> ob = Observable.just("ob").observeOn(Schedulers.io()).flatMap(
sob -> Observable.fromCallable(new TimeConsumingService("fb", 2000, new String[]{}))
);
Observable<String> od_oe = ob.flatMap(
res -> {
System.out.println(res);
System.out.println("Executed At " + (System.currentTimeMillis() - startTime) + "ms");
Observable<String> od = Observable.just("od").observeOn(Schedulers.io()).flatMap(
sod -> Observable.fromCallable(new TimeConsumingService("fd", 1000, new String[]{res}))
);
Observable<String> oe = Observable.just("oe").observeOn(Schedulers.io()).flatMap(
sod -> Observable.fromCallable(new TimeConsumingService("fe", 1000, new String[]{res}))
);
return Observable.merge(od, oe);
});
System.out.println("Observable build " + (System.currentTimeMillis() - startTime) + "ms");
// tasks join oc,(od_oe) and subscribe
Observable.merge(oc, od_oe).toBlocking().subscribe(
res -> {
System.out.println(res);
System.out.println("Executed At " + (System.currentTimeMillis() - startTime) + "ms");
});
System.out.println("End executed: " + (System.currentTimeMillis() - startTime) + "ms");
}
}

View File

@ -0,0 +1,23 @@
package me.ehlxr.reactive;
import rx.Observable;
/**
* Created by lixiangrong on 2018/1/16.
* 链式函数式处理
*/
public class TestRx01 {
public static void main(String[] args) {
hello(1, 2, 3, 4, 5);
}
private static void hello(Integer... integers) {
Observable<Integer> workflow = Observable.from(integers)
.filter(i -> (i < 10) && (i > 0))
.map(i -> i * 2);
workflow.subscribe(i -> System.out.print(i + "! "));
System.out.println();
workflow.subscribe(i -> System.out.print(i + "! "));
}
}

View File

@ -0,0 +1,26 @@
package me.ehlxr.reactive;
import java.util.Arrays;
import java.util.concurrent.Callable;
/**
* Created by lixiangrong on 2018/1/16.
*/
public class TimeConsumingService implements Callable<String> {
private String service_name;
private int wait_ms;
private Object[] depandencies;
TimeConsumingService(String name, Integer waiting, Object[] depandencies) {
this.service_name = name;
this.wait_ms = waiting;
this.depandencies = depandencies;
}
@Override
public String call() throws Exception {
Thread.sleep(wait_ms);
return String.format("service %s exec time is: %d, depandencies: %s", service_name, wait_ms, Arrays.toString(depandencies));
}
}