流的构成
当我们使用一个流的时候,通常包括三个基本步骤:
获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,

有多种方式生成 Stream Source:
从 Collection 和数组
Collection.stream()
Collection.parallelStream()
Arrays.stream(T array) or Stream.of()
从 BufferedReader
java.io.BufferedReader.lines()
静态工厂
java.util.stream.IntStream.range()
java.nio.file.Files.walk()
自己构建
java.util.Spliterator
其它
Random.ints()
BitSet.stream()
Pattern.splitAsStream(java.lang.CharSequence)
JarFile.stream()
流的操作类型分为两种:
Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。
还有一种操作被称为 short-circuiting。用以指:
对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。
对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个 short-circuiting 操作是必要非充分条件。

需要注意的是,对于基本数值型,目前有三种对应的包装类型 Stream:
IntStream、LongStream、DoubleStream。当然我们也可以用 Stream、Stream >、Stream,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。
Java 8 中还没有提供其它数值型 Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种 Stream 进行。

流的操作
接下来,当把一个数据结构包装成 Stream 后,就要开始对里面的元素进行各类操作了。常见的操作可以归类如下。
Intermediate:
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
Terminal:
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
Short-circuiting:
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

Stream 的特性可以归纳为:
不是数据结构
它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
所有 Stream 的操作必须以 lambda 表达式为参数
不支持索引访问
你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。不过请参阅下一项。
很容易生成数组或者 List
惰性化
很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
Intermediate 操作永远是惰性化的。
并行能力
当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。
可以是无限的
集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。

Optional 的注意事项

调用 isPresent() 方法时
调用 get() 方法时
Optional 类型作为类/实例属性时
Optional 类型作为方法参数时
isPresent() 与 obj != null 无任何区别, 我们的生活依然在步步惊心. 而没有 isPresent() 作铺垫的 get() 调用在 IntelliJ IDEA 中会收到告警。调用 Optional.get() 前不事先用 isPresent() 检查值是否可用. 假如 Optional 不包含一个值, get() 将会抛出一个异常!
把 Optional 类型用作属性或是方法参数在 IntelliJ IDEA 中更是强力不推荐的!
使用任何像 Optional 的类型作为字段或方法参数都是不可取的. Optional 只设计为类库方法的, 可明确表示可能无值情况下的返回类型. Optional 类型不可被序列化, 用作字段类型会出问题的!!!

所以 Optional 中我们真正可依赖的应该是除了 isPresent() 和 get() 的其他方法:

//按照使用频率排序如下
public Optional map(Function<? super T, ? extends U> mapper)
public T orElse(T other)
public T orElseGet(Supplier<? extends T> other)
public void ifPresent(Consumer<? super T> consumer)
public Optional filter(Predicate<? super T> predicate)
public Optional flatMap(Function<? super T, Optional> mapper)
public T orElseThrow(Supplier<? extends X> exceptionSupplier)

Optional 的三种构造方式:
Optional.of(obj), Optional.ofNullable(obj) 和明确的 Optional.empty()

Optional.of(obj): 它要求传入的 obj 不能是 null 值的, 否则还没开始进入角色就倒在了 NullPointerException 异常上了.
Optional.ofNullable(obj): 它以一种智能的, 宽容的方式来构造一个 Optional 实例. 来者不拒, 传 null 进到就得到 Optional.empty(), 非 null 就调用 Optional.of(obj).
那是不是我们只要用 Optional.ofNullable(obj) 一劳永逸, 以不变应二变的方式来构造 Optional 实例就行了呢? 那也未必, 否则 Optional.of(obj) 何必如此暴露呢, 私有则可?

Optional.ofNullable(obj).isPresent()可以用来对集合进行快速判空。

JAVA并行流的性能“陷阱”

从java8开始,并行编程变得很容易,通过并行流(parallelStream),可以很轻松的实现多线程并行处理。但是,这里面有个性能“陷阱”,如果不注意,使用并行流的效果反而更差,这个陷阱是什么呢?

这个陷阱就是,并行流默认都是用同一个默认的ForkJoinPool,这个ForkJoinPool的线程数和CPU的核心数相同。如果是计算密集型的操作,直接使用是没有问题的,因为这个ForkJoinPool会将所有的CPU打满,系统资源是没有浪费的,但是,如果其中还有IO操作或等待操作,这个默认的ForkJoinPool只能消耗一部分CPU,而另外的并行流因为获取不到该ForkJoinPool的使用权,性能将大大降低。可见,默认的ForkJoinPool必须只能处理计算密集型的任务。

那么,对应非计算密集型的任务,改怎么处理呢? 这就需要我们使用自己创建的ForkJoinPool来执行任务,下面给出实例代码:


ForkJoinPool forkJoinPool = new ForkJoinPool(8);
forkJoinPool.submit(()->{
tasks.parallelStream().forEach(t->{
try {
String gdsstatus=transactionService.GetTransInfo(url, t.getTask_id());
checkStatus(t.getTask_id(),t.getTask_status(),gdsstatus);
} catch (Exception e) {
System.out.println("EXCEPTION OCCOR IN TASK:"+t.getTask_id());
e.printStackTrace();
}

System.out.println("NO:"+count.getAndIncrement()+" is done");

});
});