Why Streams?
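Streams in Java 8 enhance the capabilities of Collection objects. They focus on convenient, efficient aggregate operations and bulk data operations on collections. Combined with lambda expressions, which are also new in Java 8, the Stream API greatly improves programming productivity and code readability. It supports both sequential and parallel modes for aggregate operations. The parallel mode takes full advantage of multi-core processors, using the fork/join framework to split tasks and speed up processing. Writing parallel code is usually hard and error-prone. With the Stream API, however, you can write high-performance concurrent code without writing a single line of multithreading code. The java.util.stream package, first introduced in Java 8, is a product of the combined influence of functional languages and the multi-core era.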
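The two modes described above can be sketched as follows. For an associative operation such as a sum, the same pipeline gives the same result whether it starts from stream() (sequential) or parallelStream() (split across the fork/join common pool). The class SequentialVsParallel and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SequentialVsParallel {
    // Builds the list 1..n as boxed Integers (hypothetical helper).
    public static List<Integer> numbers(int n) {
        return IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
    }

    // Sequential aggregation: one thread walks the whole list.
    public static long sequentialSum(List<Integer> xs) {
        return xs.stream().mapToLong(Integer::longValue).sum();
    }

    // Parallel aggregation: same pipeline, but the list is split into chunks that
    // run on the fork/join common pool, and the partial sums are combined.
    public static long parallelSum(List<Integer> xs) {
        return xs.parallelStream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> xs = numbers(1_000_000);
        System.out.println(sequentialSum(xs)); // 500000500000
        System.out.println(parallelSum(xs));   // 500000500000
    }
}
```

Whether the parallel version is actually faster depends on data size, source, and hardware. This sketch only shows that no threading code is needed to switch modes.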
What is an aggregate operation?
(Requirement: find all transactions whose type is grocery, then return their transaction IDs sorted by transaction value in descending order.)
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class Transaction {
private final int id;
private final Integer value;
private final Type type;
public Transaction(int id, Integer value, Type type) {
this.id = id;
this.value = value;
this.type = type;
}
public enum Type {
A, B, C, D, GROCERY
}
public int getId() {return id;}
public Integer getValue() {return value;}
public Type getType() {return type;}
}
Listing 1. Filtering, sorting and extracting values in Java 7
public static void main(String[] args) {
List<Transaction> transactions = new ArrayList<>();
transactions.add(new Transaction(1, 10, Transaction.Type.GROCERY));
transactions.add(new Transaction(3, 30, Transaction.Type.GROCERY));
transactions.add(new Transaction(6, 60, Transaction.Type.GROCERY));
transactions.add(new Transaction(5, 50, Transaction.Type.GROCERY));
transactions.add(new Transaction(2, 20, Transaction.Type.A));
transactions.add(new Transaction(4, 40, Transaction.Type.C));
// JDK 7: find all transactions whose type is GROCERY
List<Transaction> groceryTransactions = new ArrayList<>();
for (Transaction t : transactions) {
if (t.getType() == Transaction.Type.GROCERY) {
groceryTransactions.add(t);
}
}
// Sort by transaction value in descending order
Collections.sort(groceryTransactions, new Comparator<Transaction>() {
@Override
public int compare(Transaction o1, Transaction o2) {
return o2.getValue().compareTo(o1.getValue());
}
});
// Extract the transaction IDs
List<Integer> transactionIds = new ArrayList<>();
for (Transaction t : groceryTransactions) {
transactionIds.add(t.getId());
}
System.out.println(transactionIds);//[6, 5, 3, 1]
}
Listing 2. Filtering, sorting and extracting values in Java 8
// JDK 8: find all GROCERY transactions, then return their IDs sorted by transaction value in descending order
List<Integer> transactionsIds =
transactions.parallelStream().filter(t -> t.getType() == Transaction.Type.GROCERY)
.sorted(Comparator.comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(Collectors.toList());
System.out.println(transactionsIds);//[6, 5, 3, 1]
Stream overview
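There are two types of stream operations:
Intermediate: a stream can be followed by zero or more intermediate operations. Their main purpose is to open the stream, apply some mapping or filtering to the data, and return a new stream for the next operation to use. These operations are lazy: calling them alone does not actually start traversing the stream.
Terminal: a stream can have only one terminal operation. Once it has run, the stream is consumed and cannot be operated on again, so it must be the last operation on the stream. The stream is actually traversed only when the terminal operation executes, and that operation produces a result or a side effect.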
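The laziness and single-use rules above can be observed with a small sketch. A counter records how often the filter predicate runs: it stays at 0 until the terminal collect, and a second terminal operation on the same stream throws IllegalStateException. The class name LazinessDemo is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    // Returns {predicate calls before terminal op, calls after, result size, 1 if reuse failed}.
    public static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        List<String> words = Arrays.asList("a", "bb", "ccc");
        // Only an intermediate operation so far: nothing is evaluated yet.
        Stream<String> pipeline = words.stream().filter(w -> {
            calls.incrementAndGet();
            return w.length() > 1;
        });
        int before = calls.get();                                     // 0
        List<String> result = pipeline.collect(Collectors.toList());  // terminal operation
        int after = calls.get();                                      // 3, once per element
        boolean reuseFailed;
        try {
            pipeline.count(); // second terminal operation on a consumed stream
            reuseFailed = false;
        } catch (IllegalStateException e) {
            reuseFailed = true; // "stream has already been operated upon or closed"
        }
        return new int[] {before, after, result.size(), reuseFailed ? 1 : 0};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [0, 3, 2, 1]
    }
}
```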
Listing 3. An example of stream operations
// JDK 8
import java.util.ArrayList;
import java.util.List;

public class Widget {
private final Color color;
private final int weight;
enum Color {RED, BLACK, BLUE}
public Widget(Color color, int weight) {
this.color = color;
this.weight = weight;
}
public Color getColor() {return color;}
public int getWeight() {return weight;}
public static void main(String[] args) {
List<Widget> widgets = new ArrayList<>();
widgets.add(new Widget(Color.RED, 1));
widgets.add(new Widget(Color.RED, 2));
widgets.add(new Widget(Color.BLACK, 3));
widgets.add(new Widget(Color.BLUE, 4));
// stream() obtains the source; filter and mapToInt are intermediate operations that select and transform the data.
// The final sum is a terminal operation that adds up the weights of all matching widgets.
int sum = widgets.stream()
.filter(w -> w.getColor() == Color.RED)
.mapToInt(w -> w.getWeight())
.sum();
System.out.println(sum);// 3
}
}
Listing 4. Common ways to construct a stream
// JDK 8
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamConstruct {
public static void main(String[] args) {
// 1. Individual values
Stream<String> stream = Stream.of("a1", "b1", "c1");
stream.forEach(System.out::print);// prints a1b1c1
// 2. Arrays
String[] strArray = new String[] {"a2", "b2", "c2"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray); // equivalent to Stream.of(strArray)
System.out.println(stream.collect(Collectors.joining(",")));// prints a2,b2,c2
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();
}
}
Listing 5. Constructing numeric streams (for the primitive numeric types there are currently three specialized Stream types: 1. IntStream 2. LongStream 3. DoubleStream)
// JDK 8
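import java.util.stream.IntStream;

public class BasicStream {
// IntStream, LongStream, DoubleStream. We could also use Stream<Integer>, Stream<Long>, Stream<Double>,
// but boxing and unboxing are costly, so dedicated Streams are provided for these three primitive numeric types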
public static void main(String[] args) {
IntStream.of(new int[] {1, 2, 3}).forEach(System.out::print);// 123
IntStream.range(1, 3).forEach(System.out::print);// [1,3) 12
IntStream.rangeClosed(1, 3).forEach(System.out::print);// [1,3] 123
}
}
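Besides range and rangeClosed, the primitive streams offer numeric terminal operations that work without boxing. The sketch below shows sum, summaryStatistics (count, min, max, sum, and average in one pass), and boxed() for converting back to a Stream<Integer>. The class and method names are hypothetical.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class NumericStreamOps {
    // Sum of 1..n computed directly on primitive ints.
    public static int sumTo(int n) {
        return IntStream.rangeClosed(1, n).sum();
    }

    // mapToInt turns a Stream<String> into an IntStream; summaryStatistics
    // gathers count/min/max/sum/average in a single pass.
    public static IntSummaryStatistics lengths(Stream<String> words) {
        return words.mapToInt(String::length).summaryStatistics();
    }

    // boxed() goes back from IntStream to Stream<Integer>, e.g. to collect into a List.
    public static List<Integer> boxedRange(int from, int toExclusive) {
        return IntStream.range(from, toExclusive).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumTo(100)); // 5050
        IntSummaryStatistics stats = lengths(Stream.of("a", "bb", "ccc"));
        System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getAverage()); // 1 3 2.0
        System.out.println(boxedRange(1, 4)); // [1, 2, 3]
    }
}
```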
Listing 6. Converting a stream into other data structures (a Stream can be consumed only once; reusing it throws an IllegalStateException)
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamExchange {
public static void main(String[] args) {
Stream<String> stream = Stream.of("a1", "b1", "c1");
// 1. Array
String[] strArray1 = stream.toArray(String[]::new);
for (String s : strArray1) { System.out.print(s); } //a1b1c1
// 2. Collection: List
// A consumed stream cannot be reused ("stream has already been operated upon or closed"), so it is re-created each time
stream = Stream.of("a1", "b1", "c1");
List<String> list1 = stream.collect(Collectors.toList());
for (String s : list1) { System.out.print(s); }//a1b1c1
// 2. Collection: ArrayList
stream = Stream.of("a1", "b1", "c1");
List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
for (String s : list2) { System.out.print(s); } //a1b1c1
// 2. Collection: Set (iteration order is not guaranteed)
stream = Stream.of("a1", "b1", "c1");
Set<String> set = stream.collect(Collectors.toSet());
for (String s : set) { System.out.print(s); } //a1c1b1
// 2. Collection: Stack
stream = Stream.of("a1", "b1", "c1");
Stack<String> stack = stream.collect(Collectors.toCollection(Stack::new));
for (String s : stack) { System.out.print(s); } //a1b1c1
// 3. String
stream = Stream.of("a1", "b1", "c1");
String str = stream.collect(Collectors.joining());
System.out.print(str); // a1b1c1
}
}
Stream operations
- Intermediate: map (mapToInt, flatMap, etc.), filter, distinct, sorted, peek, limit, skip, parallel, sequential, unordered
- Terminal: forEach, forEachOrdered, toArray, reduce, collect, min, max, count, anyMatch, allMatch, noneMatch, findFirst, findAny, iterator
- Short-circuiting: anyMatch, allMatch, noneMatch, findFirst, findAny, limit
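Short-circuiting operations are what make it possible to work with infinite streams. Each pipeline below starts from the endless sequence 1, 2, 3, ... produced by Stream.iterate and still terminates, because findFirst, limit, and anyMatch stop pulling elements once they have their answer. The class ShortCircuitDemo is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitDemo {
    // findFirst stops at the first element that passes the filter.
    public static Optional<Integer> firstMultipleOf(int k) {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % k == 0)
                .findFirst();
    }

    // limit stops the pipeline after n elements, so only n squares are ever computed.
    public static List<Integer> firstSquares(int n) {
        return Stream.iterate(1, i -> i + 1)
                .map(i -> i * i)
                .limit(n)
                .collect(Collectors.toList());
    }

    // anyMatch returns true at the first matching element.
    public static boolean hasSquareAbove(int bound) {
        return Stream.iterate(1, i -> i + 1).anyMatch(i -> i * i > bound);
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleOf(7));   // Optional[7]
        System.out.println(firstSquares(5));      // [1, 4, 9, 16, 25]
        System.out.println(hasSquareAbove(1000)); // true
    }
}
```

A non-short-circuiting terminal operation such as count() on the same infinite source would never return.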
Map/flatMap
Listing 7. Converting to uppercase: .map(String::toUpperCase), .map(s -> { return s.toUpperCase(); }) and .map(s -> s.toUpperCase()) are equivalent
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToUpperCase {
public static void main(String[] args) {
Stream<String> stream = Stream.of("hello", "world", "java8", "stream");
List<String> wordList = stream.map(String::toUpperCase).collect(Collectors.toList());
System.out.println(wordList.toString());// [HELLO, WORLD, JAVA8, STREAM]
stream = Stream.of("hello", "world", "java8", "stream");
wordList = stream.map(s -> { return s.toUpperCase(); }).collect(Collectors.toList());
System.out.println(wordList.toString());// [HELLO, WORLD, JAVA8, STREAM]
stream = Stream.of("hello", "world", "java8", "stream");
wordList = stream.map(s -> s.toUpperCase()).collect(Collectors.toList());
System.out.println(wordList.toString());// [HELLO, WORLD, JAVA8, STREAM]
}
}
Listing 8. Squares (map produces a 1:1 mapping: each input element is converted into exactly one other element according to the rule)
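import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToSquare {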
public static void main(String[] args) {
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4).stream();
List<Integer> squareList = stream.map(n -> n * n).collect(Collectors.toList());
System.out.println(squareList.toString());// [1, 4, 9, 16]
}
}
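Listing 9. One-to-many (flatMap flattens the hierarchy of the input stream by pulling the nested elements out and putting them together; the output Stream no longer contains any List, only the numbers themselves)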
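import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ManyToOne {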
public static void main(String[] args) {
Stream<List<Integer>> inputStream =
Stream.of(Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6));
Stream<Integer> outputStream = inputStream.flatMap(childList -> childList.stream());
System.out.print(outputStream.collect(Collectors.toList()).toString());// [1, 2, 3, 4, 5, 6]
}
}
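The difference between map's 1:1 mapping and flatMap's flattening is easiest to see side by side. In this sketch, splitting sentences with map yields one String[] per sentence, while flatMap concatenates the words of every sentence into a single Stream<String>. The class name MapVsFlatMap is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMap {
    // map: exactly one output per input, so the result holds one array per sentence.
    public static List<String[]> withMap(List<String> sentences) {
        return sentences.stream()
                .map(s -> s.split(" "))
                .collect(Collectors.toList());
    }

    // flatMap: each input may produce zero or more outputs; the inner streams
    // are concatenated into one Stream<String>.
    public static List<String> withFlatMap(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("hello world", "java8 stream api");
        System.out.println(withMap(sentences).size()); // 2 (two arrays)
        System.out.println(withFlatMap(sentences));    // [hello, world, java8, stream, api]
    }
}
```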
Filter
Listing 10. Keeping the even numbers
import java.util.Arrays;
import java.util.stream.Stream;

public class KeepEvenNumber {
public static void main(String[] args) {
Integer[] sixNums = {1, 2, 3, 4, 5, 6};
Integer[] evens = Stream.of(sixNums).filter(n -> n % 2 == 0).toArray(Integer[]::new);
System.out.println(Arrays.toString(evens));// [2, 4, 6]
}
}
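Listing 11. Picking out words (first use flatMap to gather the words of every line into a new Stream, then keep those with non-zero length; the result is every word in the whole file)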
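import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PickAllWords {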
public static void main(String[] args) {
Path path = Paths.get(System.getProperty("user.dir")
+ "/src/main/java/com/wdxxl/jdk8/ibm/stream/PickAllWords.java");
// 1. Java 8 Read File + Stream
try (Stream<String> stream = Files.lines(path)) {
List<String> output = stream.flatMap(line -> Stream.of(line.split(" ")))
.filter(word -> word.length() > 0).collect(Collectors.toList());
System.out.println(output);
} catch (IOException e) {
e.printStackTrace();
}
// 2. BufferedReader + Stream
try (BufferedReader br = Files.newBufferedReader(path)) {
List<String> output = br.lines().flatMap(line -> Stream.of(line.split(" ")))
.filter(word -> word.length() > 0).collect(Collectors.toList());
System.out.println(output);
} catch (IOException e) {
e.printStackTrace();
}
}
}
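Listing 11 depends on a source file at a fixed path. The sketch below runs the same flatMap + filter pipeline on lines that are already in memory, which makes the effect of the filter visible: split(" ") yields empty strings wherever spaces repeat or a line is indented. The class PickWordsInMemory and its sample lines are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PickWordsInMemory {
    // Same pipeline as Listing 11, but the lines come from a List instead of a file.
    public static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // one line -> many tokens
                .filter(word -> word.length() > 0)               // drop "" from repeated/leading spaces
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("public  class Foo {", "    int x;");
        System.out.println(words(lines)); // [public, class, Foo, {, int, x;]
    }
}
```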
ForEach
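Listing 12. Printing names (forEach compared with pre-Java 8 code). Note: a lambda passed to forEach cannot reassign local variables of the enclosing method, and you cannot use keywords such as break or return to end the loop early.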
import java.util.ArrayList;
import java.util.List;

public class TestForEach {
public static void main(String[] args) {
List<Person> roster = new ArrayList<>();
roster.add(new Person(Person.Sex.FEMALE, "Lisa"));
roster.add(new Person(Person.Sex.MALE, "King"));
roster.add(new Person(Person.Sex.MALE, "Jake"));
// JDK 8
roster.stream().filter(p -> p.gender == Person.Sex.MALE)
.forEach(p -> System.out.println(p.name));
// JDK 7
for (Person p : roster) {
if(p.gender == Person.Sex.MALE){
System.out.println(p.name);
}
}
}
}
class Person {
Sex gender;
String name;
public enum Sex { MALE, FEMALE }
public Person(Sex gender, String name) {
this.gender = gender;
this.name = name;
}
}
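The two forEach restrictions above can be illustrated with a short sketch. A return inside the lambda only ends the current call, much like continue, so forEach still visits every element. Counting must use a terminal operation such as count() instead of incrementing a captured local. The class ForEachLimits and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ForEachLimits {
    // 'return' exits only this lambda invocation; the remaining elements are still visited.
    public static List<String> skipKing(List<String> names) {
        List<String> seen = new ArrayList<>(); // the reference is effectively final; adding to the list is allowed
        names.forEach(n -> {
            if (n.equals("King")) {
                return;
            }
            seen.add(n);
        });
        return seen;
    }

    // A local counter cannot be incremented inside the lambda:
    //   int count = 0; names.forEach(n -> count++);  // compile error: count is not effectively final
    // Use a terminal operation such as count() instead.
    public static long countNotKing(List<String> names) {
        return names.stream().filter(n -> !n.equals("King")).count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Lisa", "King", "Jake");
        System.out.println(skipKing(names));     // [Lisa, Jake]
        System.out.println(countNotKing(names)); // 2
    }
}
```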
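Listing 13. peek performs an action on each element and returns a new Stream ("peek" means to take a quick look). Note the execution order: each element flows through the whole pipeline before the next element starts.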
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Peek {
public static void main(String[] args) {
Stream.of("one", "two", "three", "four")
.filter(p -> p.length() > 3)
.peek(v -> System.out.println("Filtered Value:" + v))
.map(String::toUpperCase)
.peek(v -> System.out.println("Mapped Value:" + v))
.collect(Collectors.toList());
// 1. Filtered Value:three
// 2. Mapped Value:THREE
// 3. Filtered Value:four
// 4. Mapped Value:FOUR
}
}
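Listing 14. Two use cases of Optional (code that uses Optional is more readable, and because a possibly missing value is expressed in the type, the compiler makes callers deal with it, which can greatly reduce the impact of NPEs on a program)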
import java.util.Optional;

public class OptionalTest {
public static void main(String[] args) {
String strA = " abcd", strB = null;
print(strA);
print(" ");
print(strB);
System.out.println(getLength(strA));
System.out.println(getLength(" "));
System.out.println(getLength(strB));
}
public static void print(String text) {
// JDK 8
Optional.ofNullable(text).ifPresent(System.out::println);
// Pre-JDK 8
if (text != null) { System.out.println(text); }
}
public static int getLength(String text) {
// JDK 8
return Optional.ofNullable(text).map(String::length).orElse(-1);
// Pre-JDK 8
// return (text != null) ? text.length() : -1;
}
}
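Optional is also the return type of several stream terminal operations, such as findFirst, findAny, min, and max, so the same map/orElse style applies to their results. A minimal sketch, using a hypothetical class name and sample words:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class OptionalFromStream {
    // findFirst returns Optional<String>: empty when nothing matches, instead of null.
    public static String firstLongWord(List<String> words, int minLength) {
        Optional<String> first = words.stream()
                .filter(w -> w.length() >= minLength)
                .findFirst();
        return first.map(String::toUpperCase).orElse("<none>");
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("one", "three", "four");
        System.out.println(firstLongWord(words, 4));  // THREE
        System.out.println(firstLongWord(words, 10)); // <none>
    }
}
```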
reduce
Listing 15. reduce examples
import java.util.stream.Stream;

public class ReduceTest {
public static void main(String[] args) {
// 1. Sum = 10
Integer sum = Stream.of(1, 2, 3, 4).reduce(0, (a, b) -> a + b);
sum = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum); // with an identity (starting) value
sum = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get(); // without an identity value: returns an Optional
// 2. Minimum: minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.POSITIVE_INFINITY, Double::min);
minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::min).get();
// 3. Maximum: maxValue = 1.0
// Double.MIN_VALUE is the smallest positive double, not the most negative one,
// so it is not a valid identity for max; NEGATIVE_INFINITY is.
double maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.NEGATIVE_INFINITY, Double::max);
maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::max).get();
// 4. String concatenation: "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 5. Filter and concatenate: "ace" (lowercase letters compare greater than "Z")
concat = Stream.of("a", "B", "c", "D", "e", "F")
.filter(x -> x.compareTo("Z") > 0)
.reduce("", String::concat);
}
}
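The examples above use the one- and two-argument forms of reduce. There is also a three-argument form, reduce(identity, accumulator, combiner), whose result type can differ from the element type. Its combiner merges partial results, which a parallel stream needs. The sketch below, with hypothetical names, sums string lengths and gets the same total in both modes because 0 is a true identity for Integer::sum.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceWithCombiner {
    // The accumulator folds one String into a partial Integer result;
    // the combiner merges two partial results (used when the stream is split).
    public static int totalLength(List<String> words, boolean parallel) {
        return (parallel ? words.parallelStream() : words.stream())
                .reduce(0,
                        (partial, word) -> partial + word.length(),
                        Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc", "dddd");
        System.out.println(totalLength(words, false)); // 10
        System.out.println(totalLength(words, true));  // 10
    }
}
```

In practice, mapToInt(String::length).sum() is simpler for this case; the three-argument form matters when no such shortcut exists.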
limit/skip (limit returns the first n elements of a Stream; skip discards the first n elements. skip was renamed from a method called subStream.)
Listing 16. How limit and skip affect the number of executions
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitSkipTest {
public static void main(String[] args) {
List<LimitSkipTest.User> users = new ArrayList<>();
LimitSkipTest limitSkipTest = new LimitSkipTest();
for (int i = 0; i < 100; i++) {
users.add(limitSkipTest.new User(i, "name_" + i)); // creating an inner-class instance
}
List<String> userList = users.stream()
.map(User::getName) // prints name_0 ... name_9 only: limit(10) stops the pipeline, so getName runs 10 times, not 100
.limit(10)
.skip(3)
.collect(Collectors.toList());
System.out.println(userList);// [name_3, name_4, name_5, name_6, name_7, name_8, name_9]
}
// inner class
class User {
public int no;
private final String name;
public User(int no, String name) { this.no = no; this.name = name; }
public String getName() { System.out.print(name); return name; }
}
}
Listing 17. limit and skip do not reduce the number of executions when placed after sorted
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitSkipTest2 {
public static void main(String[] args) {
List<LimitSkipTest2.User> users = new ArrayList<>();
LimitSkipTest2 limitSkipTest2 = new LimitSkipTest2();
for (int i = 0; i < 5; i++) {
users.add(limitSkipTest2.new User(i, "name_" + i));
}
// A slight modification of the previous listing: first sort a Stream of 5 elements, then apply limit.
// sorted must see every element before it can emit any, so limit(2) cannot cut the sorting work short.
List<String> userList = users.stream()
.sorted((p1, p2) -> p1.getName().compareTo(p2.getName()))
.map(User::getName) // name_1,name_0,name_2,name_1,name_3,name_2,name_4,name_3 (comparator in sorted), then name_0,name_1 (map)
.limit(2)
.collect(Collectors.toList());
System.out.println(userList);// [name_0, name_1]
}
// inner class
class User {
public int no;
private final String name;
public User(int no, String name) { this.no = no; this.name = name; }
public String getName() { System.out.print(name); return name; }
}
}
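The contrast between Listings 16 and 17 can be measured with a counter placed before limit. Without sorting, only n elements are pulled from the source. With sorted in between, every element is pulled, because sorting needs the whole input first. In this sketch, the values are reversed with map before sorting. IntStream.range is already flagged as sorted, so sorted() on it could be skipped and would hide the effect. LimitCallCount is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LimitCallCount {
    // Returns how many elements were pulled from the source to take the first n of size.
    public static int elementsPulled(int size, int n, boolean sortFirst) {
        AtomicInteger pulled = new AtomicInteger();
        IntStream s = IntStream.range(0, size)
                .map(i -> size - 1 - i)               // descending values; map also clears the SORTED flag
                .peek(i -> pulled.incrementAndGet()); // counts elements taken from the source
        if (sortFirst) {
            s = s.sorted(); // must consume the whole input before emitting anything
        }
        s.limit(n).forEach(i -> { });
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(elementsPulled(100, 10, false)); // 10
        System.out.println(elementsPulled(100, 10, true));  // 100
    }
}
```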
sorted
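Listing 18. Applying limit and skip before sorting (this optimization is limited by the business logic: it is only valid when you do not need to take elements after sorting, because limit now runs on the unsorted order)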
List<String> userList = users.stream()
.limit(2)
.sorted((p1, p2) -> p1.getName().compareTo(p2.getName()))
.map(User::getName) // name_1,name_0,name_0,name_1,
.collect(Collectors.toList());
System.out.println(userList);// [name_0, name_1]
min/max/distinct (min and max could also be implemented by sorting the Stream and then calling findFirst, but min/max perform better: they are O(n), while sorted costs O(n log n).)
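The complexity comparison above can be sketched by computing the same answer both ways. max(Comparator) makes a single pass and keeps only the current best element. sorted(...).findFirst() sorts every element just to read one. The class MaxVsSorted and the sample words are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MaxVsSorted {
    // O(n): one pass, keeping the longest word seen so far.
    public static Optional<String> longestByMax(List<String> words) {
        return words.stream().max(Comparator.comparingInt(String::length));
    }

    // O(n log n): sorts all words by descending length, then takes the first.
    public static Optional<String> longestBySort(List<String> words) {
        return words.stream()
                .sorted(Comparator.comparingInt(String::length).reversed())
                .findFirst();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "map", "collectors", "java");
        System.out.println(longestByMax(words).get());  // collectors
        System.out.println(longestBySort(words).get()); // collectors
    }
}
```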
Listing 19. Finding the length of the longest line
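import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FindLongestLine {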
public static void main(String[] args) {
Path path = Paths.get(System.getProperty("user.dir")
+ "/src/main/java/com/wdxxl/jdk8/ibm/stream/FindLongestLine.java");
// 2. BufferedReader + Stream
try (BufferedReader br = Files.newBufferedReader(path)) {
int output = br.lines()
.mapToInt(String::length)
.max()
.getAsInt();
System.out.println(output);// 83
} catch (IOException e) {
e.printStackTrace();
}
}
}
Listing 20. Finding the distinct words in the whole file, converting them to lowercase, and sorting them
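import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperateWords {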
public static void main(String[] args) {
Path path = Paths.get(System.getProperty("user.dir")
+ "/src/main/java/com/wdxxl/jdk8/ibm/stream/OperateWords.java");
// 2. BufferedReader + Stream
try (BufferedReader br = Files.newBufferedReader(path)) {
List<String> output = br.lines()
.flatMap(line -> Stream.of(line.split(" ")))
.map(String::toLowerCase)
.distinct()
.sorted()
.collect(Collectors.toList());
System.out.println(output);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Match
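- allMatch: returns true if every element in the Stream matches the given predicate
- anyMatch: returns true if at least one element in the Stream matches the given predicate
- noneMatch: returns true if no element in the Stream matches the given predicate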
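Two consequences of these definitions are worth checking. On an empty stream, allMatch and noneMatch are vacuously true and anyMatch is false. All three also short-circuit: allMatch stops at the first element that fails. The sketch below uses the same ages as Listing 21 (0, 5, 10, 15, 20). MatchSemantics is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MatchSemantics {
    // On an empty stream: allMatch -> true, anyMatch -> false, noneMatch -> true.
    public static String onEmpty() {
        List<Integer> empty = Collections.emptyList();
        return empty.stream().allMatch(x -> x > 0) + " "
                + empty.stream().anyMatch(x -> x > 0) + " "
                + empty.stream().noneMatch(x -> x > 0);
    }

    // Counts how many times the predicate runs before allMatch returns.
    public static int predicateCallsForAllMatch(List<Integer> xs, int threshold) {
        AtomicInteger calls = new AtomicInteger();
        xs.stream().allMatch(x -> {
            calls.incrementAndGet();
            return x > threshold;
        });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(onEmpty()); // true false true
        System.out.println(predicateCallsForAllMatch(Arrays.asList(0, 5, 10, 15, 20), 18)); // 1
    }
}
```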
Listing 21. Using Match
import java.util.ArrayList;
import java.util.List;

public class MatchTest {
public static void main(String[] args) {
List<MatchTest.User> users = new ArrayList<>();
MatchTest matchTest = new MatchTest();
for (int i = 0; i < 5; i++) {
users.add(matchTest.new User(i, "name_" + i, i * 5));
}
boolean isAllAdult = users.stream().allMatch(p -> {
System.out.println(p.age); // prints only 0: allMatch stops at the first failing element. p.age is accessible although private because User is nested in MatchTest
return p.age > 18;
});
System.out.println("All are adult? " + isAllAdult); // All are adult? false
boolean isAnyChild = users.stream().anyMatch(p -> p.age < 12);
System.out.println("Any Child? " + isAnyChild); // Any Child? true
boolean noneOldThan19 = users.stream().noneMatch(p -> p.age > 19);
System.out.println("none Old Than 19? " + noneOldThan19);// none Old Than 19? false
boolean noneOldThan50 = users.stream().noneMatch(p -> p.age > 50);
System.out.println("none Old Than 50? " + noneOldThan50);// none Old Than 50? true
}
class User {
public int no;
public String name;
private final int age;
public User(int no, String name, int age) { this.no = no; this.name = name; this.age = age; }
}
}
Advanced: generating your own streams
Stream.generate
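By implementing the Supplier interface, you can control how the stream is generated. This is typically used for streams of random numbers or constants, or for streams in which adjacent elements need to maintain some state. A Stream produced by passing a Supplier instance to Stream.generate() is sequential by default (as opposed to parallel) but unordered (as opposed to ordered). Because it is infinite, the pipeline must bound its size with an operation such as limit.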
Listing 22. Generating 10 random integers
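import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class RandomTest {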
public static void main(String[] args) {
Random seed = new Random();
Supplier<Integer> random = seed::nextInt;
Stream.generate(random)
.limit(10)
.forEach(System.out::println);
// Another way
IntStream.generate(() -> (int) (System.nanoTime() % 100))
.limit(10)
.forEach(System.out::println);
}
}
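Listing 23. A custom Supplier (Stream.generate also accepts your own Supplier implementations, for example when building large amounts of test data where each field is assigned by some automatic rule, or where each element of the Stream is computed from a formula. These are all cases that maintain state information.)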
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SupplierTest {
public static void main(String[] args) {
SupplierTest supplierTest = new SupplierTest();
Stream.generate(supplierTest.new UserSupplier()).limit(10)
.forEach(p -> System.out.println(p.name + ":" + p.age));
}
class UserSupplier implements Supplier<User> {
private int index = 0;
private final Random random = new Random();
@Override
public User get() {
int no = index++; // read and advance the counter once so that no and name agree
return new User(no, "name_" + no, random.nextInt(100));
}
}
class User {
public int no;
private final String name;
private final int age;
public User(int no, String name, int age) { this.no = no; this.name = name; this.age = age; }
}
}
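A fully deterministic example of a stateful Supplier is a Fibonacci generator. Each get() call returns the current value and advances the two fields that carry state between calls. This sketch assumes sequential use: the Supplier is not thread-safe, and a fresh instance is created for each stream. FibonacciSupplier is a hypothetical name.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FibonacciSupplier implements Supplier<Long> {
    // State carried between calls: the current and the next Fibonacci number.
    private long current = 0;
    private long next = 1;

    @Override
    public Long get() {
        long result = current;
        long sum = current + next;
        current = next;
        next = sum;
        return result;
    }

    // A new Supplier per stream, because each instance keeps its own state.
    public static List<Long> first(int n) {
        return Stream.generate(new FibonacciSupplier()).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(first(10)); // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
    }
}
```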
Listing 24. Generating an arithmetic sequence
import java.util.stream.Stream;

public class Sequence {
public static void main(String[] args) {
Stream.iterate(0, n -> n + 3)
.limit(10).forEach(x -> System.out.print(x + " "));// 0 3 6 9 12 15 18 21 24 27
Stream.iterate(4, n -> n + 3)
.limit(10).forEach(x -> System.out.print(x + " "));// 4 7 10 13 16 19 22 25 28 31
}
}
Advanced: reduction with Collectors
groupingBy/partitioningBy
Listing 25. groupingBy: grouping users by age
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AdultGroup {
public static void main(String[] args) {
AdultGroup adultGroup = new AdultGroup();
Map<Integer, List<User>> usersByAge = Stream.generate(adultGroup.new UserSupplier())
.limit(100)
.collect(Collectors.groupingBy(User::getAge));
for (Map.Entry<Integer, List<User>> entry : usersByAge.entrySet()) {
System.out.println("Age: " + entry.getKey() + "=" + entry.getValue().size());
}
}
class UserSupplier implements Supplier<User> {
private int index = 0;
private final Random random = new Random();
@Override
public User get() {
int no = index++; // read and advance the counter once so that no and name agree
return new User(no, "name_" + no, random.nextInt(100));
}
}
class User {
public int no;
public String name;
public int age;
public User(int no, String name, int age) { this.no = no; this.name = name; this.age = age; }
public int getAge() { return age; }
}
}
Listing 26. partitioningBy: splitting users into minors and adults
After partitioning on the condition "age under 18", minors under 18 form one group and adults form the other.
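import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AdultPartition {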
public static void main(String[] args) {
AdultPartition adultPartition = new AdultPartition();
Map<Boolean, List<User>> children = Stream.generate(adultPartition.new UserSupplier())
.limit(100)
.collect(Collectors.partitioningBy(p -> p.age >= 18)); // true: adults (18 and over); false: minors (under 18)
System.out.println("Children number:" + children.get(false).size());
System.out.println("Adult number:" + children.get(true).size());
}
class UserSupplier implements Supplier<User> {
private int index = 0;
private final Random random = new Random();
@Override
public User get() {
int no = index++; // read and advance the counter once so that no and name agree
return new User(no, "name_" + no, random.nextInt(100));
}
}
class User {
public int no;
public String name;
public int age;
public User(int no, String name, int age) { this.no = no; this.name = name; this.age = age; }
}
}
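Both collectors also accept a downstream collector, so you can get counts per group instead of full lists. The sketch below uses fixed data instead of random users, so the output is deterministic. It passes TreeMap::new to groupingBy to keep the keys sorted and uses Collectors.counting() as the downstream collector. Note that partitioningBy always has both a true and a false key. GroupingCounting and its sample words are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingCounting {
    // groupingBy(classifier, mapFactory, downstream): Map<length, count>, keys sorted by TreeMap.
    public static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    // partitioningBy(predicate, downstream): Map<Boolean, count>.
    public static Map<Boolean, Long> countLongWords(List<String> words, int minLength) {
        return words.stream()
                .collect(Collectors.partitioningBy(w -> w.length() >= minLength, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "cc", "ddd", "e");
        System.out.println(countByLength(words)); // {1=2, 2=2, 3=1}
        Map<Boolean, Long> parts = countLongWords(words, 2);
        System.out.println(parts.get(true) + " long, " + parts.get(false) + " short"); // 3 long, 2 short
    }
}
```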
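Conclusion
In summary, the characteristics of Stream are:
- It is not a data structure
  - It has no internal storage; it simply fetches data from a source (a data structure, an array, a generator function, an I/O channel) through a pipeline of operations.
  - It never modifies the underlying data structure it wraps. For example, Stream's filter operation produces a new Stream that excludes the filtered-out elements instead of removing them from the source.
- Stream operations take lambda expressions (or method references) as parameters
- No indexed access
  - You can request the first element, but not the second, the third, or the last one. However, see the next item.
  - It is easy to produce an array or a List
- Laziness
  - Many Stream operations are deferred until it is known how much data is ultimately needed.
  - Intermediate operations are always lazy.
- Parallelism
  - Once a Stream is parallel, you do not need to write multithreaded code; all operations on it run in parallel automatically.
- It can be infinite
  - Collections have a fixed size; Streams need not. Short-circuiting operations such as limit(n) and findFirst() can operate on an infinite Stream and finish quickly.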
Original article: IBM, "Java 8 中的 Streams API 详解" (The Streams API in Java 8, explained in detail)