深入浅析Java7中的新特性forkjoin?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
创新互联,为您提供重庆网站建设、重庆网站制作、网站营销推广、网站开发设计,对服务资质代办等多个行业拥有丰富的网站建设及推广经验。创新互联网站建设公司成立于2013年,提供专业网站制作报价服务,我们深知市场的竞争激烈,认真对待每位客户,为客户提供赏心悦目的作品。 与客户共同发展进步,是我们永远的责任!
Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。
有两个核心类ForkJoinPool和ForkJoinTask。
ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:
|   | 
首先,用户需要创建一个自己的ForkJoinTask。代码如下:
public class MyForkJoinTask extends ForkJoinTask {
 
 /**
  *
  */
 private static final long serialVersionUID = 1L;
 private V value;
 private boolean success = false;
 @Override
 public V getRawResult() {
  return value;
 }
 @Override
 protected void setRawResult(V value) {
  this.value = value;
 }
 @Override
 protected boolean exec() {
  System.out.println("exec");
  return this.success;
 }
 public boolean isSuccess() {
  return success;
 }
 public void setSuccess(boolean isSuccess) {
  this.success = isSuccess;
 }
}测试ForkJoinPool.invoke(…):
 @Test
 public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true);
  task.setRawResult("test");
  String invokeResult = forkJoinPool.invoke(task);
  assertEquals(invokeResult, "test");
 }
 @Test
 public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  String result = forkJoinPool.invoke(task);
  System.out.println(result);
 }
 @Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }测试ForkJoinPool.submit(…):
@Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }
 @Test
 public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.submit(task);
  Thread.sleep(1000);
 }
 @Test
 public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  ForkJoinTask result = forkJoinPool.submit(task);
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  result.get();
  Thread.sleep(1000);
 }测试ForkJoinPool.execute(…):
 @Test
 public void testForkJoinExecute() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
 }在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。
分支/合并的完整过程如下:

下面列举一个分治算法的实例。
import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class MaximumFinder extends RecursiveTask{ private static final int SEQUENTIAL_THRESHOLD = 5; private final int[] data; private final int start; private final int end; public MaximumFinder(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public MaximumFinder(int[] data) { this(data, 0, data.length); } @Override protected Integer compute() { final int length = end - start; if (length < SEQUENTIAL_THRESHOLD) { return computeDirectly(); } final int split = length / 2; final MaximumFinder left = new MaximumFinder(data, start, start + split); left.fork(); final MaximumFinder right = new MaximumFinder(data, start + split, end); return Math.max(right.compute(), left.join()); } private Integer computeDirectly() { System.out.println(Thread.currentThread() + ' computing: ' + start + ' to ' + end); int max = Integer.MIN_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) { max = data[i]; } } return max; } public static void main(String[] args) { // create a random data set final int[] data = new int[1000]; final Random random = new Random(); for (int i = 0; i < data.length; i++) { data[i] = random.nextInt(100); } // submit the task to the pool final ForkJoinPool pool = new ForkJoinPool(4); final MaximumFinder finder = new MaximumFinder(data); System.out.println(pool.invoke(finder)); } } 
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。
网站标题:深入浅析Java7中的新特性forkjoin
本文路径:http://www.cqwzjz.cn/article/jogdpc.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 