freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

Java并行流操作parallelStream简谈
2020-07-14 17:04:36

        近日团队成员在使用Java的并行流parallelStream时遇到一些典型的case,在此整理一些干货,供大家探讨。

 

1.并行流的实现原理

多线程
fork/join
ForkJoinPool
WorkQueue ForkJoinTask<?>[] array   最大值1 << 26; // 64M

2.并行流可能引发的问题

2.1 allMatch问题

问题描述:

allMatch进行匹配,当List中所有对象全部符合某个条件返回true,当List为空时,也返回true

示例:


   public static void main(String[] args) {
       List<UserEntity> userList = Lists.newArrayList();
       for (int i = 1; i <= 10; i++) {
           userList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       // 结果1 true
       boolean result1 = userList.parallelStream().allMatch(Objects::nonNull);
       System.out.println(result1);
       // 结果2 false
       result1 = userList.parallelStream().allMatch(Objects::isNull);
       System.out.println(result1);
       userList.clear();
       // 结果3 true
       result1 = userList.parallelStream().allMatch(Objects::nonNull);
       System.out.println(result1);
       // 结果4 true
       result1 = userList.parallelStream().allMatch(Objects::isNull);
       System.out.println(result1);
  }

 

2.2 排序问题

问题描述:

当使用parallelStream.foreach,无序

示例:

    public static void main(String[] args) {
       List<UserEntity> userList = Lists.newArrayList();
       for (int i = 1; i <= 10; i++) {
           userList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       // foreach
       userList.forEach(userEntity -> {
           System.out.print(userEntity.getUserName() + " ");
      });
    // 1 2 3 4 5 6 7 8 9 10
       System.out.println();
       
       // 并行流foreach
       userList.parallelStream().forEachOrdered(userEntity -> {
           // 处理逻辑
           System.out.print(userEntity.getUserName() + " ");
      });
       // 7 6 8 5 4 2 1 3 9 10
       System.out.println();
       
       // 并行流forEachOrdered
       userList.parallelStream().forEachOrdered(userEntity -> {
           // 处理逻辑
           System.out.print(userEntity.getUserName() + " ");
      });
    // 1 2 3 4 5 6 7 8 9 10
       System.out.println();
       
  }

**使用forEachOrdered (ForEachTask、ForEachOrderedTask)

2.3 使用默认线程池问题

问题描述:

由于并行流默认使用ForkJoinCommonPool,可能会由于某个任务阻塞另一个任务

示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
       List<UserEntity> userList = Lists.newArrayList();
       for (int i = 1; i <= 1000; i++) {
           userList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       List<UserEntity> userList2 = Lists.newArrayList(userList);

       // 任务1
       userList.parallelStream().forEach(userEntity -> {
           System.out.println("任务1 Thread " + Thread.currentThread().getName() + " " + userEntity.getUserName());
           try {
               Thread.sleep(10000);
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
      });

       // 任务2
       userList2.parallelStream().forEach(userEntity -> {
           System.out.println("任务2 Thread " + Thread.currentThread().getName() + " " + userEntity.getUserName());
      });
       System.out.println("---------------------------");
  }

**对于一些耗时特别长的任务使用自定义的线程池

    public static void main(String[] args) throws ExecutionException, InterruptedException {
       List<UserEntity> userList = Lists.newArrayList();
       for (int i = 1; i <= 1000; i++) {
           userList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       List<UserEntity> userList2 = Lists.newArrayList(userList);

       // 任务1
       ForkJoinPool customThreadPool = new ForkJoinPool(4);
       ForkJoinTask<?> submit1 = customThreadPool.submit(() -> {
           userList.parallelStream().forEach(userEntity -> {
               System.out.println("任务1 Thread " + Thread.currentThread().getName() + " " + userEntity.getUserName());
               try {
                   Thread.sleep(10000);
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          });
      });

       // 任务2
       ForkJoinPool customThreadPool2 = new ForkJoinPool(4);
       ForkJoinTask<?> submit2 = customThreadPool2.submit(() -> {
           userList2.parallelStream().forEach(userEntity -> {
               System.out.println("任务2 Thread " + Thread.currentThread().getName() + " " + userEntity.getUserName());

               try {
                   Thread.sleep(10000);
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          });
      });

       submit1.get();
       submit2.get();
  }

***注意:使用Java8的并行流时默认会根据机器的CPU核心数开启线程,也可以自定义开启线程的数量(不推荐):System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

2.4 事务问题

问题描述:

事务不生效

示例:

    @GetMapping("/parallel/add")
   public String parallelAddUser() {
       List<UserEntity> userEntityList = Lists.newArrayList();
       for (int i = 1; i <= 10; i++) {
           userEntityList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       userService.parallelAddUser(userEntityList);
       return "success";
  }

   @Transactional(rollbackFor = Exception.class)
   public void parallelAddUser(List<UserEntity> userEntityList) {
       userEntityList.parallelStream().forEach(userEntity -> {
           userMapper.insert(userEntity);
           if("10".equalsIgnoreCase(userEntity.getUserName())){
               try {
                   Thread.sleep(1000);
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
               System.out.println(1/0);
          }
      });
  }

Spring事务上下文通过ThreadLocal维护。parallelStream并发执行,除了主线程,其他线程不在事务上下文中,导致spring事务失效。

2.5 线程安全问题

问题描述:

示例:

    public static void main(String[] args) {
       List<UserEntity> userList = Lists.newArrayList();
       for (int i = 1; i <= 10000; i++) {
           userList.add(UserEntity.builder()
                  .userName(String.valueOf(i))
                  .build());
      }
       List<UserEntity> resultList = Lists.newArrayList();
       for (int i = 1; i <= 10; i++) {
           userList.parallelStream().forEach(resultList::add);
           System.out.println("第" + i + "次List数量为" + resultList.size());
           resultList.clear();
      }
  }

3.使用场景总结

1.当Collection中的元素数量很多时,使用parallelstream会体现出多线程的优势,一般多个线程同时处理元素会比单循环处理快很多(不绝对,取决于循环中的具体操作,如果有运算,一般会体现出优势)

2.Collection中元素不是很多,但是每个循环中的操作都比较耗时,如果不使用多线程,相当于串行处理,处理速度必然要慢很多,这时候应当考虑使用parallelstream来提高处理速度。

3.并行的数据必须是无序并且无状态的,前者会影响速度,后者会影响结果正确性(状态可以理解为 Java 中共享变量)

摘自Java 8实战:

  1. 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。

  2. 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。

  3. 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。

  4. 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。

  5. 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。

  6. 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。

  7. 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。

  8. 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

附:流的数据源和可分解性

# java # 多线程 # 并行流 # parallelStream # 并发安全
本文为 独立观点,未经允许不得转载,授权请联系FreeBuf客服小蜜蜂,微信:freebee2022
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者