Skip to content

java中stream的了解

bluecryolite edited this page Sep 19, 2018 · 3 revisions

引言

我之所以比较坚决地想升级到JDK1.8,并不是着眼于什么函数式编程、lanmda、匿名函数、惰性求值(咦,是不是说得太多了),而是以前的编码方式,让我觉得编程思路常常被打断。当然,客观上,升级后也带了更精悍的代码,但是主要还是觉得能把注意力集中在业务而不是语法上。

举个例子。
先做下数据准备:

    private List<ImmutableTriple<Integer, String, String>> personDic;

    public void InitData() {
        personDic = new ArrayList<ImmutableTriple<Integer, String, String>>();
        personDic.add(new ImmutableTriple<Integer, String, String>(1, "Jesse", "Tester"));
        personDic.add(new ImmutableTriple<Integer, String, String>(5, "Tony", "Dever"));
        personDic.add(new ImmutableTriple<Integer, String, String>(2, "Maggie", "Tester"));
        personDic.add(new ImmutableTriple<Integer, String, String>(3, "Bary", "Dever"));
        personDic.add(new ImmutableTriple<Integer, String, String>(4, "Bob", "Dever"));
    }

如果我们要把一个列表转成另一个类型的列表,在JDK1.8之下的版本做开发的时候,我会陡然发现,我的思路从业务中出来了,思路切换到了去创建一个新对象、写一个循环、提醒自己不要忘记写add(著名bug)。如:

    public List<Person> convertOld() {
        List<Person> persons = new ArrayList<Person>();
        for(ImmutableTriple<Integer, String, String> t: personDic) {
            Person person = new Person();
            person.setId(t.getLeft());
            person.setName(t.getMiddle());
            person.setJob(t.getRight());
            persons.add(person);
        }

        return persons;
    }

而升级到jdk1.8之后,只需要埋头写转换,而不用再去考虑其他的语法,这能让我的思路保持连贯。如:

    public List<Person> convert() {
        return convertInter().collect(Collectors.toList());
    }

    private Stream<Person> convertInter() {
        return personDic.stream().map(t -> {
            Person person = new Person();
            person.setId(t.getLeft());
            person.setName(t.getMiddle());
            person.setJob(t.getRight());
            return person;
        });
    }

Stream方法的分类

不知道该怎么写Stream,这大概是因为其实我并不怎么懂的缘故。那么,就只写我知道的咯。

Stream的方法,其中一种分类方法,是可以分成转换类(Transform)方法和行为类(Action)方法。简单识别的方法是:

  • 如果Stream的某个方法执行后,返回值还是Stream,那么这个方法就是转换类方法;
  • 如果Stream的某个方法执行后,返回值不再是Stream,那么这个方法就是行为类方法。

其中,转换类方法是惰性求值的,调用的时候并不执行,直到碰上了一个行为类方法才真正执行。
如前述例子:
ConvertOld方法执行后,一个新的List就已经产生了;
但是ConvertInter执行后,并不会产生新的List,也没有Person的实例产生(也就是其实并没有执行),直到遇到了Convert方法中调用collect,这时才会产生新的List。

转换类方法

这在JAVA 8中又称为中间操作(intermediate operation)
转换类方法大致包括:

  • 类型转换 map
  • 筛选 filter
  • 排序 sorted
  • 去重 distinct
  • 扁平化 flatMap
  • 子集:前半截子集 limit、后半截子集 skip
  • 并行 parallel
  • 执行peek

着重讲下以下几点:

  • flatmap 一般用于把一个集合的集合中的所有元素,转成为一个一维的集合。
    public List<Person> flatMap() {
        //准备数据
        List<Person> sourcePersons = convertOld();
        List<Person> splitPersons1 = sourcePersons.stream().filter(t -> t.getId() < 3).collect(Collectors.toList());
        List<Person> splitPersons2 = sourcePersons.stream().filter(t -> t.getId() >= 3).collect(Collectors.toList());

        //扁平化集合的集合
        Stream<List<Person>> sourceLists = Stream.of(splitPersons1, splitPersons2);
        return sourceLists.flatMap(t -> t.stream()).collect(Collectors.toList());
    }
  • sorted
    /**
     * 排序。
     * 多字段的写法:Comparator.comparing().thenComparing()
     *
     * @return
     */
    public List<Person> sort() {
        return convertInter().sorted(Comparator.comparing(Person::getId))
                .collect(Collectors.toList());
    }

    /**
     * 排序,倒序。
     *
     * @return
     */
    public List<Person> sortDesc() {
        return convertInter().sorted(Comparator.comparing(Person::getId).reversed())
                .collect(Collectors.toList());
    }
  • parallel 顾名思义,可以让后续的操作并行执行了,充分发挥多核的作用。

  • peek,在后面再做介绍

  • 惰性求值。这个主要是要确认真正执行的时间,可能会踩到以为执行了但是其实还没有执行的坑。

行为类方法

这在JAVA 8中被称为终止操作(terminal operation)
行为类的方法,包括:

  • 计算 reduce,由其衍生的最小值 min、最大值 max、计数 count
  • 遍历 forEach
  • 匹配 allMatch anyMatch noneMatch findAny findFirst
  • 最复杂的 collect,用于类型转换、分组、分区、统计、字符串连接。这是目前还没有完全搞懂的。

着重讲以下几点:

  • group + max和min的时候,返回的是第一个找到的最大或者最小的那个实例。
    /**
     * 分组
     * 多字段的写法:.collect(groupingBy(Class:field, groupingBy(Class:field)))
     *
     * @return
     */
    public Map<String, List<Person>> group() {
        return convertInter()
                .collect(groupingBy(Person::getJob));
    }

    /**
     * 分组并统计
     *
     * @return
     */
    public Map<String, Optional<Person>> groupAndStat() {
        return convertInter()
                .collect(groupingBy(Person::getJob, Collectors.maxBy(Comparator.comparing(Person::getId))));
    }
  • 针对一个Stream,其行为类方法只能调用一次
    Stream<Person> persons = convertInter();
    persons.collect(Collectors.toList());
    persons.min();    //抛出异常

如果要对一个Stream做多项统计,可以采用的方法是:
先map为一个统计类,然后在reduce中计算。

    /**
     * 统计多项值
     * 
     * @return
     */
    public Optional<HashMap<String, Integer>> stat() {
        return convertInter()
                .map(t -> {
                    HashMap<String, Integer> result = new HashMap<String, Integer>();
                    result.put("min", t.getId());
                    result.put("max", t.getId());
                    result.put("count", 1);
                    return result;
                })
                .reduce((t, r) -> {
                    if (t.get("min") > r.get("min")) {
                        t.put("min", r.get("min"));
                    }
                    if (t.get("max") < r.get("max")) {
                        t.put("max", r.get("max"));
                    }
                    t.put("count", t.get("count") + 1);
                    return t;
                });
    }
  • peek和forEach的区别
    peek是一个转换类方法,在其中可以针对每一个元素执行某个操作,但是依然返回一个Stream,并且很显然,这 不会立即执行
    forEach是一个行为类方法,针对某个Stream只能执行一次
    看看伪代码:
Stream<T> stream = Strem.of(...)
    .peek(t -> {doSomething(t)})  //比如:把每个元素插入到某张表
    .filter(t -> {doSomeFilter(t)})  //筛选元素
    .peek(t -> {doAnotherThing(t)})  //比如:把筛选后的元素插入到另一张表
    .forEach(t -> {doOtherThing(t)});  //比如:返回每个元素的状态

多线程

先发一个老式写法,这个写法也是打断思路得不行不行的:

    public void parallelByExecutor() {
       ExecutorService multiTaskExecutor = Executors.newFixedThreadPool(4);
       List<Future<ImmutablePair<Integer, String>>> newJobFutures = new ArrayList<Future<ImmutablePair<Integer, String>>>();
       for (Person person: convertOld()) {
           newJobFutures.add(multiTaskExecutor.submit(new NewJobCallable(person.getId(), person.getJob())));
       }
       
       for (Future<ImmutablePair<Integer, String>> item: newJobFutures) {
           try {
               ImmutablePair<Integer, String> response = item.get();
               //doSomething(response);
           } catch(Exception err) {
               err.printStackTrace();
           }
       }
    }

    private class NewJobCallable implements Callable<ImmutablePair<Integer, String>> {
        private final Integer id;
        private final String job;

        private NewJobCallable(Integer id, String job) {
            this.id = id;
            this.job = job;
        }

        @Override
        public ImmutablePair<Integer, String> call() {
            return new ImmutablePair<Integer, String>(id, "new " + job);
        }
    }

用Stream的parallel和peek改造一下

    /**
     * 并行调用数据操作或者接口操作等其它方法
     * 
     */
    public void parallel() {
        convertInter().parallel().peek(t -> t.setJob("new " + t.getJob()))
                .forEach(t -> System.out.println(t.getName() + " " + t.getJob()));
    }

MapReduce

在前述介绍中,出现了Map,出现了Reduce。是的,没错,如果你对Hadoop有所了解的话,这就是Hadoop中介绍的MapReduce的概念。
当然,Hadoop中的MapReduce的编程模型抽象得很底层(这和JDK版本无关),编程也是一件痛苦的事情(参看示例

就用一个MapReduce的HelloWorld作为结束吧,包含了分组、统计、并行、合并诸多操作。当然,更简单的写法是在collect中做gruop和count,不过,既然是讲MapReduce,总不能既没有map,也没有reduce吧。

    /**
     * 使用MapReduce做词频统计
     *
     * @return
     */
    public Optional<HashMap<String, Integer>> helloMapReduce() {
        //数据准备
        String sources = "This is a test file. This file is typed some persons: Tony Wu, Tony Wang, Tony Tang, Leon Liu, Leon Zhu, Tom Tang";
        Pattern pattern = Pattern.compile("[a-zA-Z]+");
        Matcher matcher = pattern.matcher(sources);
        Stream.Builder<String> builder = Stream.builder();
        while (matcher.find()) {
            builder.accept(matcher.group().toLowerCase());
        }

        //词频统计
        return builder.build().parallel().map(t -> {
            HashMap<String, Integer> result = new HashMap<String, Integer>();
            result.put(t, 1);
            return result;
        })
        .reduce((t, v) -> {
            //前面使用了并行,所以这里需要考虑做合并,即:t和v中都可能存在多条数据。
            for (String v_key : v.keySet()) {
                Integer n = t.get(v_key);
                if (n == null) {
                    n = 0;
                }
                t.put(v_key, v.get(v_key) + n);
            }

            return t;
        });
    }

文中的源码地址

Clone this wiki locally