Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
private static abstract class Box<U> {
U state;
Box() {} // Avoid creation of special accessor
public U get() {
return state;
}
}
Box 是一个结果值的持有者; ReducingSink 用begin, accept, combine 三个方法定义了要进行的计算;ReducingSink是有状态的流数据消费的计算抽象,阅读Sink接口文档可知。ReduceOps.makeRef(collector) 返回了一个封装了Reduce操作的ReduceOps对象。注意到,这里都是声明要执行的计算,而不涉及计算的实际过程。展示了表达与执行分离的思想。真正的计算过程启动在 ReferencePipeline.eva luate 方法里:
final <R> R eva luate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.eva luateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.eva luateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
使用 IDE 的 go to implementations 功能, 跟进去,可以发现,最终在 AbstractPipeLine 中定义了:
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Spliterator 用来对流中的元素进行分区和遍历以及施加Sink指定操作,可以用于并发计算。Spliterator的具体实现类定义在 Spliterators 的静态类和静态方法中。其中有:
数组Spliterator:
static final class ArraySpliterator<T> implements Spliterator<T>
static final class IntArraySpliterator implements Spliterator.OfInt
static final class LongArraySpliterator implements Spliterator.OfLong
static final class DoubleArraySpliterator implements Spliterator.OfDouble
迭代Spliterator:
static class IteratorSpliterator<T> implements Spliterator<T>
static final class IntIteratorSpliterator implements Spliterator.OfInt
static final class LongIteratorSpliterator implements Spliterator.OfLong
static final class DoubleIteratorSpliterator implements Spliterator.OfDouble
抽象Spliterator:
public static abstract class AbstractSpliterator<T> implements Spliterator<T>
private static abstract class EmptySpliterator<T, S extends Spliterator<T>, C>
public static abstract class AbstractIntSpliterator implements Spliterator.OfInt
public static abstract class AbstractLongSpliterator implements Spliterator.OfLong
public static abstract class AbstractDoubleSpliterator implements Spliterator.OfDouble
每个具体类都实现了trySplit,forEachRemaining,tryAdvance,estimateSize,characteristics, getComparator。 trySplit 用于拆分流,提供并发能力;forEachRemaining,tryAdvance 用于遍历