java.util.concurrent Синхронизаторы. Барьеры (1) Барьер – это средство синхронизации, которое используется для того, чтобы некоторое множество потоков ожидало друг друга в некоторой точке программы, называемой обычно точкой синхронизации. В тот момент, когда все потоки достигают точки синхронизации, они разблокируются и могут продолжать выполнение. На практике барьеры обычно используются для сбора результатов выполнения некоторой распараллеленной задачи. В качестве примера можно рассмотреть задачу умножения матриц. При распараллеливании данной задачи каждому потоку будет поручено умножение определенных строк на определенные столбцы. В точке синхронизации все полученные результаты собираются из потоков и в одно из них строится результирующая матрица. В пакете java.util.concurrent для реализации барьерной синхронизации используется класс CyclicBarrier. Конструкторы этого класса получают количество потоков, которые должны достичь точки синхронизации и, опционально, экземпляр интерфейса Runnable, который должен быть исполнен в момент достижения этой точки всеми потоками: CyclicBarrier( int parties ); CyclicBarrier( int parties, Runnable barrierAction );
java.util.concurrent Синхронизаторы. Барьеры (2) intawait( )Ожидание, пока заданное в конструкторе количество потоков выполнит вызов этого метода. intawait( long timeout, TimeUnit unit ) Ожидание, пока заданное в конструкторе количество потоков выполнит вызов этого метода или пока не закончится заданный тайм-аут. intgetNumberWaiting( )Возвращает количество ожидающих потоков. intgetParties( )Возвращает количество потоков, требуемых для преодоления этого барьера (установленное при создании экземпляра). booleanisBroken( )Возвращает true, если хотя бы один поток прерван или ушел из ожидания по тайм-ауту. voidreset( )Сброс барьера в исходное состояние. Методы класса java.util.concurrent.CyclicBarrier: Нужно отметить, что в момент срабатывания барьера экземпляр класса восстанавливает начальное состояние и может отрабатывать следующие parties срабатываний потоков.
java.util.concurrent Синхронизаторы. Барьеры (3) В этой программе параллельно вычисляются суммы элементов строк матрицы (каждой строке сопоставляется свой поток): import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class Main { private static int matrix[ ][ ] = { … };// инициализация или чтение матрицы private static int results[ ]; static Boolean flag = false; private static class Summator extends Thread { int row;// индекс строки для экземпляра сумматора CyclicBarrier barrier;// барьерный синхронизатор Summator( CyclicBarrier barrier, int row ) { // конструктор сумматора this.barrier = barrier; this.row = row; }
java.util.concurrent Синхронизаторы. Барьеры (4) public void run( ) {// метод интерфейса Runnable int columns = matrix[ row ].length; int sum = 0; for ( int i = 0; i < columns; i++ ) {// собственно суммирование sum += matrix[ row ][ i ]; } results[ row ] = sum; System.out.println( "Сумма элементов строки " + row + " равна: " + sum ); try {// ожидание всех остальных barrier.await( ); } catch ( InterruptedException ex ) { ex.printStackTrace( ); } catch ( BrokenBarrierException ex ) { ex.printStackTrace( ); }
java.util.concurrent Синхронизаторы. Барьеры (5) public static void main( String args[ ] ) { // здесь объявления final int rows = matrix.length; results = new int[ rows ]; Runnable merger = new Runnable( ) {// внутренний анонимный класс public void run( ) {// сложение сумм по строкам int sum = 0; for ( int i = 0; i < rows; i++ ) sum += results[ i ]; synchronized( flag ) {// извещение основного потока flag.notifyAll( ); flag = true; } System.out.println( "Сумма элементов матрицы равна: " + sum ); } };
java.util.concurrent Синхронизаторы. Барьеры (6) CyclicBarrier barrier = new CyclicBarrier( rows, merger ); for ( int i = 0; i < rows; i++ ) {// запуск всех сумматоров new Summator( barrier, i ).start( ); } System.out.println( "Ожидание..." ); synchronized( flag ) { while( !flag ){ try { flag.wait( ); } catch( InterruptedException e ) { return; } System.out.println( "Вычисления закончены" ); } В скобках показаны идентификаторы потоков. Сумма элементов строки 1 равна: 4 (9) Сумма элементов строки 2 равна: 9 (10) Сумма элементов строки 0 равна: 102 (8) Сумма элементов строки 4 равна: 25 (12) Ожидание (1) Сумма элементов строки 3 равна: 16 (11) Сумма элементов строки 5 равна: 612 (13) Сумма элементов матрицы равна: 768 (13) Вычисления закончены (1)
java.util.concurrent Синхронизаторы. Обменники Обменник – это параметризованный класс Exchanger пакета java.util.concurrent, с помощью которого можно осуществлять передачу данных между потоками, не заботясь о синхронизации (она реализуется внутренними средствами класса). В двух потоках, которым нужно обменяться данными, вызывается метод exchange() экземпляра класса Exchanger, доступного каждому из них. В качестве аргумента метода указывается значение (его тип должен быть приводим к параметру используемого экземпляра обменника), которое должно быть отдано другому потоку: V exchange( V sendingValue ) Поток, вызвавший этот метод, засыпает до тех пор, пока какой-либо другой поток не вызовет этот же метод этого же экземпляра обменника. Существует возможность указать продолжительность тайм-аута ожидания, после истечения которого проснувшийся поток получит для обработки исключение TimeoutException: V exchange(V sendingValue, long timeout, TimeUnit tUnit) Обменники можно использовать и как средство синхронизации. Часто это оказывается значительно более удобным, чем другие средства. Перепишем только что рассматривавшийся пример.
java.util.concurrent Синхронизаторы. Барьеры и обменники (1) import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Main { private static int matrix[ ][ ] = { … };// инициализация или чтение матрицы private static int results[ ]; static Boolean flag = false; private static class Summator extends Thread { int row; // индекс строки для экземпляра сумматора CyclicBarrier barrier;// барьерный синхронизатор Summator( CyclicBarrier barrier, int row ) { // конструктор сумматора this.barrier = barrier; this.row = row; }
java.util.concurrent Синхронизаторы. Барьеры и обменники (2) public void run( ) {// метод интерфейса Runnable int columns = matrix[ row ].length; int sum = 0; for ( int i = 0; i < columns; i++ ) {// собственно суммирование sum += matrix[ row ][ i ]; } results[ row ] = sum; System.out.println( "Сумма элементов строки " + row + " равна: " + sum ); try {// ожидание остальных barrier.await( ); } catch ( InterruptedException ex ) { ex.printStackTrace( ); } catch ( BrokenBarrierException ex ) { ex.printStackTrace( ); }
java.util.concurrent Синхронизаторы. Барьеры и обменники (3) public static void main( String args[ ] ) { // здесь объявления final int rows = matrix.length; results = new int[ rows ]; final Exchanger myExchanger = new Exchanger ( ); Runnable merger = new Runnable( ) {// внутренний анонимный класс public void run( ) {// сложение сумм по строкам int sum = 0; for ( int i = 0; i < rows; i++ ) sum += results[ i ]; try {// обмен данными и синхронизация с основным потоком myExchanger.exchange( Integer.valueOf( sum ) ); } catch( InterruptedException ex ) { ex.printStackTrace( ); } System.out.println( "Сумма элементов матрицы равна: " + sum ); } };
java.util.concurrent Синхронизаторы. Барьеры и обменники (4) CyclicBarrier barrier = new CyclicBarrier( rows, merger ); for ( int i = 0; i < rows; i++ ) {// запуск всех сумматоров new Summator( barrier, i ).start( ); } System.out.println( "Ожидание..." ); Integer result = myExchanger.exchange( 0 );// получение результатов System.out.println( "Вычисления закончены. " + result ); } Сумма элементов строки 0 равна: 15 (8) Сумма элементов строки 2 равна: 77 (10) Сумма элементов строки 4 равна: 100 (12) Ожидание... (1) Сумма элементов строки 1 равна: 13 (9) Сумма элементов строки 3 равна: 48 (11) Сумма элементов строки 5 равна: 23 (13) Сумма элементов матрицы равна: 276 (13) Вычисления закончены.276 (1)
java.util.concurrent Синхронизаторы. Защелка (1) Защелка (или щеколда) – средство синхронизации, которое используется для того, чтобы один или несколько потоков могли дождаться выполнения заданного количества операций в других потоках. Класс CountDownLatch пакета java.util.concurrent является соответствующим средством синхронизации. Он работает по принципу таймера, только отсчеты вырабатываются программно, а не кварцевым генератором. При создании экземпляра класса его внутренний счетчик инициализируется начальным значением, передаваемым конструктору. Значение счетчика может быть уменьшено на 1 путем вызова метода countDown(). При вызове метода await() каким-либо потоком, он переходит в состояние ожидания момента достижения счетчиком значения 0. Есть перегруженный метод await(long timeout, TimeUnit unit), позволяющий организовать ожидание не дольше, чем в течение заданного интервала. Есть также метод getCount(), позволяющий получить текущее значение счетчика. В отличие от класса CyclicBarrier экземпляры этого класса одноразовые. На практике данный класс удобно использовать для координации момента начала и окончания определенного числа потоков: можно сделать так, чтобы исполнение заданного числа потоков начиналось в один и тот же момент времени; можно отследить момент окончания заданного числа потоков.
java.util.concurrent Синхронизаторы. Защелка (2) В этом примере защелка используется для синхронизации действий одного мастера и нескольких рабочих: class Master { //... int threadsCount = … ;// определение количества потоков Worker void masterMain( ) throws InterruptedException { CountDownLatch startLatch = new CountDownLatch( 1 ); CountDownLatch readyLatch = new CountDownLatch( threadsCount ); for( int i = 0; i < threadsCount; ++i ) // создание и активация потоков new Thread( new Worker( startLatch, readyLatch ) ).start( ); doSomething( ); // делается что-то startLatch.countDown( ); // фактический запуск doSomethingYet( ); // делается что-то еще readyLatch.await( );// ожидание момента завершения всех потоков } // … }
java.util.concurrent Синхронизаторы. Защелка (3) class Worker implements Runnable { private final CountDownLatch starting; private final CountDownLatch finishing; Worker( CountDownLatch starting, CountDownLatch finishing ) {// конструктор this.starting = starting; this.finishing = finishing; } public void run( ) { try { starting.await();// ожидание разрешения работать doWork();// выполнение работы finishing.countDown();// отметка, что поток завершился } catch ( InterruptedException ex ) { } return; } void doWork() {... } }
java.util.concurrent.atomic Классы с атомарными операциями Подпакет java.util.concurrent.atomic содержит несколько классов, предоставляющих атомарные операции: AtomicBoolean AtomicInteger AtomicIntegerArray AtomicIntegerFieldUpdater AtomicLong AtomicLongArray AtomicLongFieldUpdater AtomicMarkableReference AtomicReference AtomicReferenceArray AtomicReferenceFieldUpdater AtomicStampedReference
java.util.concurrent.atomic Основные методы атомарных классов (1) У всех (кроме *Reference): booleancompareAndSet( * expect, * update ) сравнить текущее значение с expect, если совпадают – атомарно заменить текущее на update и вернуть true, иначе – вернуть false *get() вернуть текущее значение *getAndSet( * newValue) атомарно заменить значение на newValue, вернуть предыдущее значение voidlazySet( * newValue ) На процессорах x86 практически ничем не отличается от просто set, но использовать можно, только тщательно разобравшись в отличии от метода set voidset( * newValue ) атомарно заменить значение на newValue booleanweakCompareAndSet ( * expect, * update ) На процессорах x86 практически ничем не отличается от просто compareAndSet, но использовать можно, только тщательно разобравшись в особенностях
java.util.concurrent.atomic Основные методы атомарных классов (2) У всех численных( Integer, Long, IntegerArray, LongArray; для *Array все эти методы имеют дополнительный первый аргумент int i): *addAndGet(* delta) Атомарно добавляет к текущему значению аргумент delta и возвращает полученное значение *decrementAndGet() Атомарно уменьшает на единицу текущее значение и возвращает результат (аналог префиксной операции – –) *getAndAdd(* delta) Атомарно добавляет к текущему значению аргумент delta и возвращает предыдущее значение *getAndDecrement() Атомарно уменьшает на единицу текущее значение и возвращает предыдущее значение (аналог постфиксной операции – –) *getAndIncrement() Атомарно уменьшает на единицу текущее значение и возвращает предыдущее значение (аналог постфиксной операции + +) *incrementAndGet() Атомарно увеличивает на единицу текущее значение и возвращает результат (аналог префиксной операции + +)
java.util.concurrent.atomic Классы с атомарными операциями. Пример. public final class Counter { private int value = 0; public synchronized int getValue( ) { return value; } public synchronized int increment( ) { return ++value; } public class NonblockingCounter { private AtomicInteger value = new AtomicInteger( 0 ); public int getValue( ) { return value.get( ); } public int increment( ) { int v; do { v = value.get( ); while ( !value.compareAndSet( v, v + 1 ) ); return v + 1; }
Коллекции Java (Java Collections Framework) Параллельные коллекции пакета java.util.concurrent базируются на средствах, реализованных ранее в пакете java.util, но отличаются возможностью безопасного применения в многопоточных приложениях при относительно небольших затратах. Коллекции или контейнеры это классы позволяющие хранить и производить операции над множеством объектов. Коллекции используются для сохранения, получения, манипулирования данными и обеспечивают агрегацию одних объектов другими. Во многих языках программирования (java, C, C++, Pascal, …) единственным встроенным в язык средством хранения наборов объектов являются массивы. Однако, массивы обладают значительными недостатками. Одним из них является фиксированный размер массива, порождающий необходимость постоянно следить за значениями индексов. Другим индексная адресация, что не всегда удобно, т.к. ограничивает возможности добавления и удаления объектов. Чтобы избавиться от этих недостатков, уже несколько десятилетий программисты используют рекурсивные типы данных, такие как списки, деревья и множества. Стандартный набор коллекций Java, реализованный в пакете java.util, служит для избавления программиста от необходимости самостоятельно реализовывать эти типы данных и снабжает его дополнительными возможностями по сравнению с массивами.
Пакет java.util. Коллекции Взаимосвязь основных классов и интерфейсов (1)
Пакет java.util. Коллекции Взаимосвязь основных классов и интерфейсов (2) S Object Dictionary
Пакет java.util. Коллекции Интерфейсы Интерфейс Collection Является корнем всей иерархии классов-коллекций. Он определяет базовую функциональность любой коллекции – набор методов, которые позволяют добавлять, удалять, выбирать элементы коллекции. Классы, которые реализуют интерфейс Collection, могут содержать дубликаты и пустые (null) значения. Класс AbstractCollection, как абстрактный класс, служит основой для создания конкретных классов коллекций и содержит реализацию некоторых методов, определенных в интерфейсе Collection. При наследовании от этого класса все реализованные методы можно заместить.
Пакет java.util. Коллекции Методы интерфейса Collection (1) booleanadd( E e )Если коллекция изменилась в результате вызова этого метода, то возвращается true, иначе (например, если это Set и добавляемый элемент уже в нем содержится) возвращается false. booleanaddAll( Collection c )Если коллекция изменилась в результате вызова этого метода, то возвращается true, иначе возвращается false. voidclear( )Удаляются все элементы из коллекции. booleancontains( Object o )Если коллекция содержит указанный элемент, то возвращается true, иначе – false. booleancontainsAll( Collection c )Если коллекция содержит все элементы из коллекции, заданной аргументом, то возвращается true, иначе – false. booleanisEmpty( )Если коллекция пуста (не содержит ни одного элемента, то возвращается true, иначе – false.
Пакет java.util. Коллекции Методы интерфейса Collection (2) Iterator iterator( )Возвращается объект, реализующий интерфейс java.util.Iterator. booleanremove( Object o )Удаляется одно (или ни одного) вхождение указанного элемента из коллекции. Если коллекция изменилась, то возвращается true, иначе возвращается false. booleanremoveAll( Collection c)Удаляются все вхождения элементов заданной коллекции. Если коллекция изменилась, то возвращается true, иначе возвращается false. booleanretainAll( Collection c )Удаляются все элементы, не входящие в указанную коллекцию. Если коллекция изменилась, то возвращается true, иначе – false. intsize( )Возвращается количество элементов. Object[ ]toArray( )Возвращается массив, содержащий все элементы коллекции. T[ ]toArray( T[ ] a )Возвращается массив, содержащий все элементы коллекции (либо a, либо создается новый, если длина указанного массива меньше количества элементов).
Пакет java.util. Коллекции Интерфейсы *Set Интерфейс Set Классы, которые реализуют этот интерфейс, не допускают наличия дубликатов в составе коллекции. В коллекции этого типа разрешено наличие только одной ссылки типа null. Интерфейс Set расширяет интерфейс Collection (не добавляя методов), таким образом, любой класс, реализующий интерфейс Set, имеет все методы, определенные в Collection. Любой объект, добавляемый в Set, должен реализовать метод equals, чтобы его можно было сравнить с другими элементами. AbstractSet, являясь абстрактным классом, представляет собой основу для реализации различных вариантов интерфейса Set. Интерфейс SortedSet Этот интерфейс расширяет Set, требуя, чтобы содержимое набора было упорядочено. Такие коллекции могут содержать объекты, которые реализуют интерфейс Comparable, либо могут сравниваться с использованием внешнего Comparator. Интерфейс NavigableSet - расширение SortedSet с приблизительным доступом к элементу. Например, метод higher( ) позволяет получить наименьшей элемент в множестве, который больше указанного.