Средства распараллеливания в Java 1.7 (jsr166y...) Михаил Пономаренко, Tech Lead компании Sigma Ukraine
немного истории до java 1.5 были –wait –notify –synchonized в jsr166 –java.util.concurrent –Future, ThreadExecutor –ConcurrentMap - асинхронные итераторы
немного истории в jsr166x – BlockingDeque –ConcurrentNavigableMap, NavigableMap в jsr166y –ForkJoinPool –Phaser –ParallelArray (jsr166y extra)
java.util.concurrent.Phaser
void startTasks(List tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
Fork Join – рекурсивная декомпозиция если задача маленькая - посчитать если большая разбить и посчитать рекурсивно дробить Возможно дробить считать все вместе
JDK7 дает возможность дробить мелко Минимум взаимодействия Fork Join – рекурсивная декомпозиция
ForkJoin задача ответ 4 4
Детали реализации дополнительной сихронизации не требуется старые разбиения "больше Поэтому: 1.У каждого потока свой дек задачь 2.поток выполнения берет задачу из дека 3.задача добавляет подзадачи в дэк или производит вычисления 4.если задачи кончилисть - "украсть" задачу у другого потока
Детали реализации готово 100 готово Готово 50
ForkJoinTask protected abstract booleanexec() ForkJoinTask fork() –Не ждет public final V join() –То же но без исключений public static void invokeAll(ForkJoinTask... tasks)) Invoke = fork(); join();
RecursiveAction extends ForkJoinTask protected void compute() –Посчитать –Поделить –Вызвать invokeAll –Сделать join Нужно что то сделать, но нет возвращаемого значения RecursiveTask – есть возаращаемое значение – его вернет join
Примеры Doug Lea ПримерОписание FibФибоначчи - подсчет числа фибоначчи для 47. Меньше 13 считается влоб IntegrateИнтегрирование - суммирование численного интеграла от -47 до 48 для (2i-1)x (2i-1) MicroПоиск лучшего хода в настольной игре. Прогноз на 4 хода вперед SortСортировка слиянием 100 миллионов чисел ММУмножение двух матриц 2048х2048. double LUlu-разложеные невыраженной матрицы 4096х4096. double JacobiИтерационная релаксация сетки с ограничениями. Усреднение по 100 соседям. матрица 4096х4096
Результаты Doug Lea Быстрее раз процессоров
Пример 2^28 произвольных чисел double (2 гб) ForkJoinPool pool = new ForkJoinPool(p); SinCosHuge task = new SinCosHuge(randomData); pool.execute(task); Double rz = task.join();
Пример import java.util.concurrent.RecursiveTask; public class SinCosHuge extends RecursiveTask { … protected Double compute() { if (to - from < threshold) { double rz = 0; for (int i = from; i < to; i++) { rz += Math.sin(data[i]) + Math.atan(data[i]); } return rz; } else { int i = (from + to) / 2; SinCosHuge right = new SinCosHuge(data, from, i); SinCosHuge left = new SinCosHuge(data, i, to); invokeAll(left,right); return right.join() + left.join(); } …
Мои измерения Загрузка процессора 100% температура
Мои измерения, время выполнения. потоков время
потоков Во сколько раз медленнее минус один
Мои измерения – относительно самого быстрого потоков Относительная разница в скорости. %
А если по старому? ThreadExecutorPool Result –add(double) –waitDone –fork ThreadPoolExecutor tpe = new ThreadPoolExecutor(p, p, 10,SECONDS, workQueue); Result rz = new Result(); tpe.submit(new SinCosHugePool(randomData, tpe, rz)); rz.waitDone();
Runnable.run if (to - from < treshold) { double rz = 0; for (int i = from; i < to; i++) { rz += Math.sin(data[i]) + Math.atan(data[i]); } result.add(rz); } else { int i = (from + to) / 2; SinCosHugePool right = new SinCosHugePool(data, from, i, executor,result); SinCosHugePool left = new SinCosHugePool(data, i, to, executor,result); result.fork(); executor.execute(left); executor.execute(right); }
А если по старому? потоков Во сколько раз медленнее минус один
А если по старому Относительная разница в скорости. % Ниже = лучше потоков
ParallelArray Судя по всему НЕ попадет в JDK 1.7, но исходники доступны MapReduce в пределах одной машины
Пример IBM ParallelArray students = new ParallelArray (fjPool, data); double bestGpa = students.withFilter(isSenior).withMapping(selectGpa).max(); public class Student { String name; int graduationYear; double gpa; } static final Ops.Predicate isSenior = new Ops.Predicate () { public boolean op(Student s) { return s.graduationYear == Student.THIS_YEAR; } }; static final Ops.ObjectToDouble selectGpa = new Ops. ObjectToDouble () { public double op(Student student) { return student.gpa; } };
Мой пример new long[16384 * 16384/8] - 1 гб рабочей памяти, double[16384 * 16384/2] – 6гб рабочей import jsr166y.ForkJoinPool; import extra166y.ParallelLongArray; ….. long[] randomData = new long[16384 * 16384/8]; ForkJoinPool pool = new ForkJoinPool(p); ParallelLongArray arr = ParallelLongArray.createUsingHandoff(randomData, pool); int uniqueCount = arr.allUniqueElements().size(); ParallelLongArray.SummaryStatistics summary = arr.summary();
Результаты потоков Во сколько раз медленнее
Откуда начать Concurrency JSR-166 Interest Site: