Скачать презентацию
Идет загрузка презентации. Пожалуйста, подождите
Презентация была опубликована 11 лет назад пользователемfiles.nsumedia.ru
1 java.util.concurrent Executor Framework (6) Интерфейс SheduledExecutorService расширяет интерфейс ExecutorService путем добавления методов, управляющих планированием запуска задач через заданный интервал времени и/или через заданные промежутки времени: ScheduledFuture schedule(Callable task, long delay, TimeUnit unit) Создает задачу, представленную task, которая становится активной после заданной задержки и возвращает экземпляр ScheduledFuture (расширенный Future). ScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) Создает задачу, представленную task, которая становится активной после заданной задержки. ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) Создает и выполняет периодическую задачу, которая становится активной сначала после данной начальной задержки с установленным сроком (выполнение начнется после initialDelay в момент initialDelay+period, затем initialDelay + 2 * period, и так далее) ScheduledFuture scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) Создает и выполняет периодическую задачу, которая становится активной сначала после данной начальной задержки и впоследствии с заданной задержкой между завершением одного выполнения и началом следующего.
2 java.util.concurrent Executor Framework (7) Вот простейший пример использования интерфейса ScheduledExecutorService: import java.util.concurrent.*; public class Beep { public static void main( String[ ] args ) { // создать планируемый пул потоков размером 1 поток ScheduledExecutorService ses = Executors.newScheduledThreadPool( 1 ); Runnable beeper = new Runnable( ) {// анонимный внутренний класс public void run( ) { System.out.println("Beep!"); } }; // выполнять beeper каждые 3 секунды после начальной 10-секундной задержки ses.scheduleAtFixedRate( beeper, 10, 3, TimeUnit.SECONDS ); } Есть класс ScheduledThreadPoolExecutor, реализующий данный интерфейс.
3 java.util.concurrent Executor Framework (8) В предыдущих примерах неоднократно упоминался класс Executors, являющийся фабрикой для интерфейсов фреймворка и содержащий группы статических методов: Которые создают и возвращают экземпляры ExecutorService с обычными параметрами конфигурации. Которые создают и возвращают экземпляры ScheduledExecutorService с обычными параметрами конфигурации. Которые создают и возвращают "обернутые" экземпляры ExecutorService, для которых отключено реконфигурирование и становятся недоступными некоторые специфичные методы. Которые создают и возвращают фабрику потоков ThreadFactory, используемую виртуальной машиной по умолчанию для создания новых потоков. Которые создают и возвращают объекты Callable из экземпляров других классов (таких как Runnable). Для эффективного управления множеством потоков во фреймворке реализован механизм пула потоков. С помощью пулов решают и проблемы издержек жизненного цикла потока, и проблемы пробуксовки ресурсов, типичные для серверных приложений: При многократном использовании одних и тех же потоков для решения множественных задач издержки создания потока распространяются на каждую из них. Поскольку поток уже существует в тот момент, когда поступает запрос, задержка на создание потока, устраняется. Таким образом, запрос может быть обработан немедленно, что делает приложение более быстрореагирующим. При этом нужно заботиться о правильной настройке количества потоков в пуле, чтобы предотвратить так называемую пробуксовку ресурсов (простой созданных потоков из-за отсутствия работы).
4 java.util.concurrent Executor Framework (9) Вот некоторые методы класса Executors: static Callable callable( Runnable task )Возвращает Callable-объект, который после выполнения данной задачи возвращает null. static Callable callable( Runnable task, T result ) Возвращает Callable-объект, который после выполнения данной задачи возвращает result static ThreadFactorydefaultThreadFactory( )Возвращает фабрику потоков, по умолчанию используемую виртуальной машиной для создания новых потоков. static ExecutorServicenewCachedThreadPool( )Создает пул потоков, в котором новые потоки создаются при необходимости, а ранее созданные потоки используются, когда они свободны. static ExecutorServicenewCachedThreadPool( ThreadFactory threadFactory ) Создает пул потоков, в котором новые потоки создаются при необходимости с использованием указанной фабрики, а ранее созданные потоки используются, когда они свободны. static ExecutorServicenewFixedThreadPool( int nThreads ) Создает пул потоков фиксированного размера.
5 java.util.concurrent Executor Framework (10) static ExecutorServicenewFixedThreadPool( int nThreads, ThreadFactory threadFactory ) Создает пул потоков фиксированного размера с использованием заданной фабрики потоков. static ScheduledExecutorService newScheduledThreadPool( int corePoolSize ) Создает пул потоков, реализующих интерфейс управляемого запуска задач. static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory ) Создает пул потоков, реализующих интерфейс управляемого запуска задач с использованием заданной фабрики потоков. static ExecutorServicenewSingleThreadExecutor( )Создает исполнителя, который использует единственный рабочий поток. static ExecutorServicenewSingleThreadExecutor( ThreadFactory threadFactory ) Создает исполнителя, который использует единственный рабочий поток, создаваемый с использованием заданной фабрики потоков.
6 java.util.concurrent Executor Framework (11) static ScheduledExecutorService newSingleThreadScheduledExec utor( ) Создает исполнителя, который использует единственный рабочий поток для управляемого запуска задач. static ScheduledExecutorService newSingleThreadScheduledExec utor( ThreadFactory threadFactory ) Создает однопоточного исполнителя для управляемого запуска задач с использованием заданной фабрики потоков. static ThreadFactoryprivilegedThreadFactory( )Возвращает фабрику потоков, у которых есть те же самые полномочия, как у текущего потока. static ExecutorServiceunconfigurableExecutorService( ExecutorService executor ) Возвращает объект, который делегирует данному исполнителю только методы данного ExecutorService. Не могут быть вызваны никакие другие методы. static ScheduledExecutorService unconfigurableScheduledExecut orService( ScheduledExecutorService executor ) Возвращает объект, который делегирует данному исполнителю только методы данного ScheduledExecutorService. Не могут быть вызваны никакие другие методы.
7 java.util.concurrent Расширение фреймворка Executor: ForkJoinPool Это расширение разработано специально для упрощения распараллеливания рекурсивных задач. Краткое описание используемых типов: ForkJoinTask – это абстрактный класс, который является легковесным аналогом потока (Thread). Благодаря методам, реализованным в классе ForkJoinPool, можно в небольшом количестве потоков выполнить существенно большее число задач. Это достигается путём так называемого work-stealing'а, когда поток спящей задачи на самом деле не спит, а выполняет другие задачи. У класса ForkJoinTask есть много интересных методов (аналогичных методам invoke* и submit), здесь рассмотрим только два: fork(), который производит асинхронный запуск задачи join(), который дожидается выполнения задачи и возвращает результат её выполнения. Сам по себе класс ForkJoinTask практически не используется, потому что для подавляющего большинства задач уже есть готовые более конкретные реализации: RecursiveAction, являющийся аналогом Runnable-объектов (на случай, если никакого значения возвращать не нужно, а нужно лишь выполнить некоторое действие), и RecursiveTask, являющийся аналогом Callable-объектов. ForkJoinPool – в этом классе как раз и реализована достаточно хитрая логика по распределению нагрузки между реальными потоками. Снаружи он выглядит практически как самый обычный пул потоков, и особенностей в его использовании по сравнению с базовым ExecutorService нет (он наследует от класса AbstractExecutorService, реализующего этот интерфейс). Приводить детальный обзор методов класса не будем, вместо этого рассмотрим пример.
8 java.util.concurrent Расширение фреймворка Executor: ForkJoinPool. Пример (1) // существо примера – рекурсивный обход дерева. Здесь пока определяется само дерево interface Node { Collection getChildren( ); long getValue( ); } class MyTree implements Node { private Collection childrens = new ArrayList ( );// коллекция дочерних узлов private long value = 0;// этим моделируется содержание узла дерева public MyTree( long value ) {// конструктор this.value = value; } public Collection getChildren( ) {// возврат корней поддеревьев return childrens; } public long getValue( ) { return value; } public void add( MyTree newNode ) {// добавление корня нового поддерева childrens.add( newNode ); }
9 java.util.concurrent Расширение фреймворка Executor: ForkJoinPool. Пример (2) // это класс, обеспечивающий многопоточный рекурсивный обход дерева class ValueSumCounter extends RecursiveTask { private final Node node; public ValueSumCounter( Node node ) {// конструктор this.node = node; protected Long compute( ) {// основной метод класса RecursiveTask long sum = node.getValue( );// для накопления результата List subTasks = new LinkedList( );// коллекция (список) подзадач for( Node child : node.getChildren( ) ) {// обход всех поддеревьев ValueSumCounter task = new ValueSumCounter( child );// для каждого – своя подзадача task.fork( ); // которую запустим асинхронно subTasks.add( task );// и добавим в список для ожидания } for( ValueSumCounter task : subTasks ) {// переберем все подзадачи sum += task.join( ); // дождёмся выполнения подзадачи и прибавим ее результат } return sum;// вернем собственный результат }
10 java.util.concurrent Расширение фреймворка Executor: ForkJoinPool. Пример (3) public class Main { public static void main( String[ ] args ) { MyTree root = new MyTree( 1 );// создадим корень MyTree tmp = new MyTree( 2 );// создадим поддерево root.add( tmp );// добавим его root.add( new MyTree( 3 ) );// добавим второе поддерево tmp.add( new MyTree( 4 ) );// добавим узлы tmp.add( new MyTree( 5 ) );// в первое поддерево // выполним обход всего дерева с использованием рекурсивной задачи: long result = new ForkJoinPool( ).invoke( new ValueSumCounter( root ) ); System.out.println( "Результат обхода: " + result ); } Реализовывать обход дерева таким образом существенно удобнее и интуитивно- понятнее, чем было бы с использованием интерфейса Future и прямым программированием создания потоков. Более того, для выполнения fork-нутой задачи вовсе не обязательно будет использоваться выделенный настоящий поток. Напротив, будут активно использоваться уже существующие потоки, которые в текущий момент находятся в join-е. Это, очевидно, может дать существенный прирост производительности, поскольку создание потоков – это времяемкий процесс Результат обхода: 15
11 java.util.concurrent Еще одно р асширение фреймворка Executor: Класс ThreadPoolExecutor Представляет собой гибко настраиваемый пул потоков, обеспечивающий динамическое управление числом потоков, запуск и управление набором задач, сбор статистики Настраивается максимальное и базовое число потоков в пуле, а также время через которое удаляются простаивающие потоки Настраивается очередь задач, ожидающих выполнения Обработчики, которые вызываются перед запуском или после завершения отдельных задач Для выполнения всевозможных настроек используются внутренние классы: ThreadPoolExecutor.AbortPolicy ThreadPoolExecutor.CallerRunsPolicy ThreadPoolExecutor.DiscardOldestPolicy ThreadPoolExecutor.DiscardPolicy
12 java.util.concurrent.locks Блокировки До версии Java 1.5 мониторы и методы wait, notify, notifyAll были единственным доступным механизмом блокировок. Начиная с этой версии появился новый механизм - блокировки, реализованные в виде набора классов и интерфейсов пакета java.util.concurrent.locks. В отличие от мониторов они не обладают специфичным синтаксисом и не встроены в язык. Их использование осуществляется аналогично любым другим объектам Java. Но они обеспечивают несравненно большую гибкость в отличие от старых методов синхронизации. При этом средства пакета java.util.concurrent.locks обеспечивают ту же семантику операций с памятью, что и мониторы. Базовые операции с блокировками описывает интерфейс java.util.concurrent.locks.Lock. В отличие от мониторов функциональность блокировок из стандартной библиотеки не ограничивается методами безусловного захвата блокировки, которые блокируются на бесконечное время. Существуют методы, обеспечивающие условные попытки захвата и попытки захвата с указанием таймаута.
13 java.util.concurrent.locks Блокировки (1) Типичная схема использования нового механизма блокировок выглядит так: lockObject.lock(); try { // cинхронизируемые действия } finally { lockObject.unlock(); } где lockObject – это объект класса, реализующего интерфейс java.util.concurrent.locks.Lock.
14 java.util.concurrent.locks Блокировки (2) Основной реализацией интерфейса Lock является класс ReentrantLock. Этот класс представляет повторно-входимую блокировку с возможностью выбора следующего владельца. Реентрантность (повторная входимость) означает, что текущий поток- владелец блокировки не будет блокироваться при многократной попытке захвата ресурса. Каждая такая попытка всегда будет удачной, но на каждую операцию захвата ресурса впоследствии нужно будет выполнить операцию его освобождения. Вот методы интерфейса Lock: voidlock( )Получает блокировку. voidlockInterruptibly( )Получает блокировку с возможностью прерывания ожидания (см. следующий слайд). ConditionnewCondition( )Возвращает новый экземпляр Condition, который связан с этим экземпляром объекта Lock. booleantryLock( )Получает блокировку, только если объект свободен в момент вызова. В этом случае возвращается значение true. booleantryLock( long time, TimeUnit unit ) Получает блокировку, если объект оказался свободен в рамках указанного времени ожидания и текущий поток не был прерван. В этом случае возвращается значение true. voidunlock( )Освобождает объект (точнее – один его захват).
15 java.util.concurrent.locks Блокировки (3) Обычная схема использования метода lockInterruptibly: Lock myLockMonitor = new ReentrantLock( );// класс, реализующий интерфейс Lock try { myLockMonitor.lockInterruptibly( ); try { // доступ к защищенному ресурсу … } finally { myLockMonitor.unlock( ); } } catch (InterruptedException e) {// необходимо, потому что блокировка // System.err.println("Interrupted wait");// может быть прервана … } Обычная схема использования метода tryLock: if ( myLockMonitor.tryLock( ) ) { try { // доступ к защищенному ресурсу } finally { myLockMonitor.unlock( ); } } else { // альтернативное действие }
16 java.util.concurrent.locks Блокировки (4) intgetHoldCount( )Возвращает количество захваченных данным потоком блокировок protected ThreadgetOwner( )Возвращает поток, которому в настоящий момент принадлежит этот экземпляр, или null если нет такого потока. protected Collection getQueuedThreads( )Возвращает набор, содержащий потоки, ожидающие получения блокировки этого экземпляра. intgetQueueLength( )Возвращает количество потоков, ожидающих получения блокировки этого экземпляра. protected Collection getWaitingThreads( Condition condition ) Возвращает набор, содержащий потоки, ожидающие освобождения заданного экземпляра интерфейса Condition, связанного с этим экземпляром класса ReentrantLock. intgetWaitQueueLength( Condition condition ) Возвращает количество потоков, ожидающих освобождения заданного экземпляра интерфейса Condition, связанного с этим экземпляром класса ReentrantLock. Методы класса ReentrantLock:
17 java.util.concurrent.locks Блокировки (5) booleanhasQueuedThread( Thread thread ) Возвращает true, если указанный поток ожидает получения этой блокировки. booleanhasQueuedThreads( )Возвращает true, если есть потоки, ожидающие получения этой блокировки. booleanhasWaiters( Condition condition ) Возвращает true, если есть потоки, ожидающие получения блокировки указанного экземпляра Condition. booleanisFair( )Возвращает true если для этого экземпляра блокировки установлена политика справедливости (первый запросивший поток получит блокировку первым). Политика справедливости может быть задана булевым аргументом конструктора экземпляра класса ReentrantLock. booleanisHeldByCurrentThread( )Возвращает true, если эта блокировка захвачена текущим потоком. booleanisLocked( )Возвращает true, если эта блокировка захвачена каким-либо потоком. voidlock( )Поток получает блокировку. Остальные методы класса просто реализуют все методы интерфейса Lock.
18 java.util.concurrent.locks Блокировки (7) И мониторы и класс ReentrantLock обеспечивают так называемый одноранговый доступ к разделяемым данным. Все потоки, пытающиеся войти в защищаемую область, равны и внутри неё может находиться только один поток. Такое поведение далеко не всегда эффективно. Допустим у нас есть несколько потоков изменения данных и несколько потоков чтения с соотношением количества операций изменения/чтения равным 1/10. В случае использования общей блокировки все конкурирующие потоки будут выполняться строго последовательно на участке доступа к разделяемой памяти. Но ведь все потоки чтения вполне могут иметь одновременный доступ к данным, что позволит им выполняться параллельно. При этом поток, изменяющий данные, при получении ресурса должен оказаться единственным владельцем защищаемого участка памяти. Нужное поведение поддерживается классом ReentrantReadWriteLock реализующим интерфейс ReadWriteLock. Для пользователя объект данного класса выглядит как контейнер с двумя методами, возвращающими ссылки на две разные блокировки чтения и записи. Блокировка записи ведёт себя полностью аналогично ReentrantLock, а блокировка чтения отличается и позволяет нескольким потокам становиться её владельцем. Таким образом для потоков допускается одновременное владение несколькими блокировками чтения, но только одной блокировкой записи. Методы класса ReentrantReadWriteLock очень похожи на методы класса ReentrantLock, только захват блокировки lock( ) разделен на readLock( ) и writeLock( ). Классы блокировок, реализующих интерфейс Lock, имеют ещё один метод newCondition, возвращающий объект класса Condition. Применение его полностью аналогично базовому механизму мониторов, методу wait( ) соответствует метод await( ), методу noyify( ) – signal( ), методу notifyAll( ) – signalAll( ).
19 java.util.concurrent Синхронизаторы. Семафоры (1) К синхронизаторам можно отнести различного рода структуры, которые отвечают за координацию работы потоков. Некоторые такие структуры реализованы в пакете java.util.concurrency: Семафоры Барьеры Обменники Защелки Считающим семафором называют целочисленную переменную, выполняющую те же функции, что и флаг блокировки. Однако в отличие от последнего она может принимать кроме 0 и 1 ( true/false) и другие целые положительные значения. Семафоры используются для ограничения числа потоков, которые используют некий ресурс. Максимально возможное значение семафора, понимаемое как количество потоков, которые одновременно могут получить доступ к ресурсу, задается аргументом конструктора. Вторым (необязательным) аргументом может быть булево значение, определяющее "справедливость" захвата ресурса ожидающими потоками. При реализации справедливой политики ресурс будет отдан тому потоку, который первым попытался его захватить. За справедливость приходится платить довольно большими временными затратами, поэтому обычно реализуется несправедливый вариант.
20 java.util.concurrent Синхронизаторы. Семафоры (2) Основные методы класса java.util.concurrent.Semaphore: voidacquire( )Получить доступ к ресурсу. Поток блокируется, пока не доступ не будет получен или пока его не прервут. voidacquire( int permits )Получить permits доступов к ресурсу. Поток блокируется, пока не доступ не будет получен или пока его не прервут. voidacquireUninterruptibly()Получить доступ к ресурсу с запретом прерывания. voidacquireUninterruptibly( int permits ) Получить permits доступов к ресурсу с запретом прерывания. intavailablePermits( )Возвращает количество свободных доступов intdrainPermits( )Получить все свободные доступы protected Collection getQueuedThreads( )Возвращает коллекцию потоков, ожидающих получения доступа intgetQueueLength( )Возвращает количество потоков, ожидающих получения доступа booleanhasQueuedThreads( )Возвращает true, если есть хотя бы один ожидающий поток
21 java.util.concurrent Синхронизаторы. Семафоры (3) booleanisFair( )Возвращает true, если для семафора установлена справедливая политика protected void reducePermits( int reduction )Уменьшить количество доступов на указанную величину voidrelease( )Освободить один доступ voidrelease( int permits )Освободить указанное количество доступов booleantryAcquire( )Попытаться получить один доступ. В случае успеха возвращается true. booleantryAcquire( int permits )Попытаться получить указанное количество доступов. В случае успеха возвращается true. booleantryAcquire( int permits, long timeout, TimeUnit unit ) Попытаться получить указанное количество доступов в течение заданного интервала времени. В случае успеха (и если поток не был прерван) возвращается true. booleantryAcquire( long timeout, TimeUnit unit ) Попытаться получить один доступ в течение заданного интервала времени. В случае успеха (и если поток не был прерван) возвращается true.
22 java.util.concurrent Синхронизаторы. Семафоры (4) Задача "писатель-читатель", в которой для передачи данных используется пул буферов: interface Buffer { void markAsUsed( ); boolean markedAsUsed( ); } … private final Semaphore buffers = new Semaphore( MAX_BUFFERS, true ); … public Buffer getBuffer() throws InterruptedException { buffers.acquire(); Buffer buffer = getNextAvailableBuffer(); buffer.markAsUsed( ); return buffer; } public void retBuffer(Buffer buf) { if (buf.markedAsUsed( ) == false) buffers.release( ); putBufferToQueue( buf ); } …
23 java.util.concurrent Синхронизаторы. Барьеры (1) Барьер – это средство синхронизации, которое используется для того, чтобы некоторое множество потоков ожидало друг друга в некоторой точке программы, называемой обычно точкой синхронизации. В тот момент, когда все потоки достигают точки синхронизации, они разблокируются и могут продолжать выполнение. На практике барьеры обычно используются для сбора результатов выполнения некоторой распараллеленной задачи. В качестве примера можно рассмотреть задачу умножения матриц. При распараллеливании данной задачи каждому потоку будет поручено умножение определенных строк на определенные столбцы. В точке синхронизации все полученные результаты собираются из потоков и в одно из них строится результирующая матрица. В пакете java.util.concurrent для реализации барьерной синхронизации используется класс CyclicBarrier. Конструкторы этого класса получают количество потоков, которые должны достичь точки синхронизации и, опционально, экземпляр интерфейса Runnable, который должен быть исполнен в момент достижения этой точки всеми потоками: CyclicBarrier( int parties ); CyclicBarrier( int parties, Runnable barrierAction );
24 java.util.concurrent Синхронизаторы. Барьеры (2) intawait( )Ожидание, пока заданное в конструкторе количество потоков выполнит вызов этого метода. intawait( long timeout, TimeUnit unit ) Ожидание, пока заданное в конструкторе количество потоков выполнит вызов этого метода или пока не закончится заданный тайм-аут. intgetNumberWaiting( )Возвращает количество ожидающих потоков. intgetParties( )Возвращает количество потоков, требуемых для преодоления этого барьера (установленное при создании экземпляра). booleanisBroken( )Возвращает true, если хотя бы один поток прерван или ушел из ожидания по тайм-ауту. voidreset( )Сброс барьера в исходное состояние. Методы класса java.util.concurrent.CyclicBarrier: Нужно отметить, что в момент срабатывания барьера экземпляр класса восстанавливает начальное состояние и может отрабатывать следующие parties срабатываний потоков.
Еще похожие презентации в нашем архиве:
© 2024 MyShared Inc.
All rights reserved.