Хранение данных (1) Для того, чтобы выяснить, откуда берутся гонки данных, нужно разобраться в том, как обеспечивается хранение данных приложения и его потоков. Для любого приложения виртуальная машина поддерживает основное хранилище данных (main storage), в котором сохраняются значения всех переменных и которое используется всеми потоками. Под переменными здесь понимаются поля объектов и классов, а также элементы массивов. Локальные переменные и параметры методов не могут быть доступны другим потокам, поэтому они не представляют интереса. Для каждого потока создается его собственная рабочая память (working memory), в которую перед использованием копируются значения всех переменных. Основные операции, реализуемые виртуальной машиной при работе с памятью: use – чтение значения переменной из рабочей памяти потока; assign – запись значения переменной в рабочую память потока; read – получение значения переменной из основного хранилища; load – сохранение значения переменной, прочитанного из основного хранилища, в рабочей памяти; store – передача значения переменной из рабочей памяти в основное хранилище для дальнейшего хранения; write – сохраняет в основном хранилище значение переменной, переданной командой store. Эти операции являются командами байт-кода, формируемыми компилятором, и в явном виде недоступны программисту.
Хранение данных (2) Поток, работая с любой своей переменной, регулярно применяет команды use и assign для использования ее текущего значения и присвоения нового. Кроме того, должны осуществляться действия по передаче значений в основное хранилище и из него. Они выполняются в два этапа. При получении данных сначала из основного хранилища считывается значение командой read, а затем оно сохраняется в рабочей памяти потока командой load. Эта пара команд всегда выполняется вместе именно в таком порядке, т.е. нельзя выполнить одну, не выполнив другую: { read load } use … use При отправлении данных сначала поток считывает значение из своей рабочей памяти командой store, а затем основное хранилище сохраняет его командой write. Эта пара команд также всегда выполняется вместе именно в таком порядке, т.е. нельзя выполнить одну, не выполнив другую. assign … assign { store write } Набор этих правил составлялся так, чтобы операции с памятью не допускали возможности их неверного использования, а с другой стороны, правила должны оставлять достаточное пространство для различных технологий оптимизации (регистры, очереди, кэши и т.д.).
Хранение данных (3) Исполнение последовательности команд байт-кода виртуальной машиной всегда подчиняется следующим правилам: 1. Все действия, выполняемые одним потоком, строго упорядочены, т.е. выполняются одно за другим в соответствии с порядком извлечения команд байт- кода; 2. Все действия, выполняемые с каждой переменной в основном хранилище памяти, тоже строго упорядочены, т.е. следуют одно за другим. За исключением некоторых дополнительных очевидных правил, больше никаких ограничений нет. Например, если поток изменил значение сначала одной, а затем другой переменной, то измененные данные могут быть переданы в основное хранилище в обратном порядке. В 32-битных системах действия по передаче значений между основным хранилищем и рабочей памятью потока над половинками 64-битных примитивных типов float и double тоже могут выполняться в произвольном порядке. Оба этих фактора в принципе могут создавать проблемы типа гонок данных. Поток создается с чистой рабочей памятью и должен перед использованием загрузить все необходимые переменные из основного хранилища. Любая переменная сначала создается в основном хранилище и лишь затем копируется в рабочую память потоков, которые будут ее применять. Таким образом, потоки никогда не взаимодействуют друг с другом напрямую, только через главное хранилище. Однако нет 100-процентной гарантии, что изменение значения переменной одним потоком обязательно будет "увидено" другим потоком.
Взаимодействие потоков, пример: public class Main implements Runnable { private boolean flag = true; private long count = 0; public void run( ) { // перед этой строкой должна быть while ( flag ) { count += 1; } System.out.println("Finished " + count); } public static void main( String[] args ) throws InterruptedException { Main newInstance = new Main( ); ( new Thread( newInstance ) ).start( ); Thread.sleep( 1000 ); newInstance.flag = false; System.out.println("Main thread finished "); } Согласно официальным спецификациям виртуальной машины Java это приложение может никогда не завершиться. Поток, созданный вызовом метода start(), может "забрать" переменную flag в свою рабочую память и "не видеть" измененного другим потоком значения этой переменной. В HotSpot JVM это приложение благополучно завершается.
Класс java.lang.ThreadLocal (1) Существует множество способов и приемов, позволяющих избежать ситуации "гонки данных". В первую очередь к ним относятся различные средства синхронизации потоков при работе с общими данными, позволяющие организовать правильную последовательность действий. Во вторую очередь – ликвидация общих (разделяемых) переменных. Если каждый поток работает только со своими собственными данными, то ситуация гонки возникнуть не может. Однако реализовать такую модель на 100 процентов невозможно, если многопоточное приложение решает единую задачу. Кроме того, бывают случаи, когда разные потоки должны работать в статическом контексте и со статическими переменными, по определению являющимися общими (разделяемыми). В некоторых случаях может помочь использование класса ThreadLocal. Для каждого потока экземпляр этого класса содержит локальную переменную заданного объектного типа (ссылку), инициализировать которую нужно именно в данном потоке. Раздельными являются именно ссылки, но если в нескольких потоках они указывают на один и тот же объект, то сам объект все равно будет общим. Именно поэтому инициализировать каждый экземпляр нужно после запуска потока.
Класс java.lang.ThreadLocal (2) Пример public static class SomeWorkingThread extends Thread { private ThreadLocal counter = new ThreadLocal ( ); public void run( ) { // перед этой строкой должна быть counter.set( Integer.valueOf( 0 ) ); for ( int i = 0; i < Math.random() * 10; i++ ) { System.out.println( "Поток " + getName( ) + ": исполнение." );// имитация деятельности counter.set( counter.get( ) + 1 ); try{ sleep( ( long )( Math.random( ) ) * 10 ); } catch ( InterruptedException e ) { e.printStackTrace( ); } } System.out.println( "Поток " + getName() + " отработал " + counter.get() + " раз"); } public static void main(String[] args) throws InterruptedException { Thread thread1 = new SomeWorkingThread(); Thread thread2 = new SomeWorkingThread(); thread1.start(); thread2.start(); thread1.join();thread2.join(); } Поток Thread_8: исполнение Поток Thread_9: исполнение … Поток Thread_9: отработал 4 раз Поток Thread_8: исполнение Поток Thread_8: отработал 7 раз
Класс java.lang.ThreadLocal (3) Tget()Возвращает значение текущей локальной копии этой thread-local переменной. protected TinitialValue()Вызывается из метода get в том случае, если thread-local переменная не инициализирована. Базовый метод возвращает значение null. Замещающий метод должен возвратить начальное значение текущей локальной копии этой thread-local переменной. Если это связано с изменением состояния каких-либо других объектов приложения, то замещающий метод нужно синхронизировать. voidremove()Удаляет текущее значение локальной копии этой thread- local переменной. voidset(T value)Устанавливает новое значение. Методы класса ThreadLocal
Базовые средства Java для синхронизации потоков (1) В многопоточной среде необходима синхронизация потоков при обращении к совместно используемым ресурсам. Средства синхронизации должны обеспечивать корректную последовательность операций над такими ресурсами. Пусть есть некий класс, выполняющий в одном потоке подготовку данных, а в другом потоке – их обработку (типичная задача "писатель - читатель"). Вот простое, но неверное решение: public class DataManager { private static boolean ready = false; … // поток обработки public void processingData( ) { while ( !ready ) { // цикл ожидания, занимающий процессор System.out.println("Waiting for data..."); } ready = false; System.out.println("Processing data..."); } … // поток подготовки public void prepareData() { System.out.println("Data prepared"); ready = true; } }
Базовые средства Java для синхронизации потоков (2) Ожидание с использованием блокировки synchronized и методов wait() и notifyAll() класса java.lang.Object (почти правильное решение): public class DataManager { private static final Object monitor = new Object( ); … // поток обработки public void processingData( ) { synchronized ( monitor ) { System.out.println( "Waiting for data..." ); try { monitor.wait(); // не безопасно: поток может "проснуться" без причины } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Processing data..."); } } … // поток подготовки public void prepareData( ) { synchronized ( monitor ) { System.out.println( "Data prepared" ); monitor.notifyAll( ); } } } Вызовы методов wait(), notify() и notifyAll() должны обязательно находиться внутри блока synchronized, либо внутри synchronized-метода, иначе в runtime будет выброшено Exception. Как только поток достигает метода wait(), блокировка по synchronized снимается, а поток уходит в сон.
Базовые средства Java для синхронизации потоков (3) … private static boolean ready = false; private static final Object monitor = new Object( ); … // поток обработки public void processingData( ) { synchronized ( monitor ) { System.out.println( "Waiting for data..." ); while ( !ready ) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } ready = false; System.out.println("Processing data..."); } } … // поток подготовки public void prepareData( ) { synchronized ( monitor ) { System.out.println( "Data prepared" ); ready = true; monitor.notifyAll( ); } }
Базовые средства Java для синхронизации потоков (4) Категорически рекомендуется всегда вызывать метод wait() внутри цикла, проверяющего флаг готовности данных, доступ к которым синхронизируется. Это гарантирует: 1. Живучесть системы потоков. Если по каким-либо причинам метод notify() (или notifyAll()) будет вызван другим потоком раньше, чем ожидающим был вызван метод wait(), то это поток может никогда не проснуться, несмотря на то, что данные уже готовы к обработке. 2. Безопасность системы потоков. Во многих виртуальных машинах Java реализуется такая стратегия управления потоками, при которой не исключены ложные пробуждения (выход из состояния wait без notify). Наличие цикла проверки флага готовности, обрамляющего вызов метода wait() и предшествующего операторам обработки данных гарантирует, что в случае ложного пробуждения поток не займется обработкой несуществующих или некорректных данных. При выборе между методами notify() и notifyAll() следует помнить, что второй вариант может значительно ухудшить производительность приложения, если в состоянии ожидания одного монитора может находиться большое количество потоков. В то же время выбор метода notify() для извещения всегда опасен тем, что виртуальная машина может разбудить поток, не имеющий никакого отношения к желаемой обработке данных. Тогда нужный поток опять же может никогда не проснуться. При использовании любых средств синхронизации всегда нужно тщательно следить за тем, чтобы не попасть в ситуацию deadlock (смертельное объятие), возникающую так: поток A захватывает ресурс X и ждет освобождения ресурса Y поток B захватывает ресурс Y и ждет освобождения ресурса X
Базовые средства Java для синхронизации потоков (5) Синхронизация, основанная на базовом synchronized имеет очень существенные недостатки: Не существует способа отказаться от попытки захватить какой-либо объект, если он занят. Отсутствует возможность отказаться от попытки захвата объекта через какой-то интервал времени. Если бы эти возможности были, то проблема появления deadlock при синхронизации потоков была бы не так страшна. Не существует способа осуществлять отдельные блокировки для чтения или записи, что иногда было бы весьма полезно. При освобождении некоторого захваченного ресурса (того, который выступает параметром у вызова synchronized блока) нет возможности дать доступ к этому блоку самому первому потоку, который раньше других начал попытался этот ресурс захватить. Если существует несколько вложенных synchronized блоков, то освобождены ресурсы должны быть строго в обратном порядке по сравнению с тем, в котором они были захвачены. Иногда бывает не просто обеспечить именно такой порядок освобождения. Есть типичные ошибки и при запуске потоков. Если вместо метода start( ) вызвать перегруженный метод run( ), то программа без всяких сообщений или исключений отработает, но не в многопоточном, а в однопоточном режиме. Еще одна неприятность заключается в том, что если написать метод run() с другой сигнатурой (например public int run( )), то, опять-таки без сообщений и исключений, при вызове метода start( ) будет запущен на исполнение неперегруженный метод run( ) базового класса Thread, который просто вообще ничего не делает.
Средства пакета java.util.concurrent В Java версии 1.5 был добавлен новый пакет, содержащий много полезных возможностей, касающихся синхронизации и параллелизма: java.util.concurrent. Возможности этого пакета были развиты и дополнены в версиях 1.6 и 1.7. Синхронизация – это обеспечение правильной последовательности операций по обмену данными между некоторым набором потоков. Параллелизм – это искусство эффективно выполнять приложением несколько задач одновременно, избегая всяческих конфликтов данных. Новые возможности в реализации синхронизации и параллелизма можно разделить на следующие группы: Вспомогательные классы и интерфейсы Executor Framework Lock объекты Синхронизаторы Классы, реализующие атомарные операции Параллельные коллекции
java.util.concurrent Интерфейс Callable Интерфейс Callable гораздо лучше подходит для создания задач, предназначенных для параллельного выполнения, чем интерфейс Runnable или тем более класс Thread: import java.util.concurrent.Callable; public class CallableSample implements Callable { public String call( ) throws Exception { … if( какое-то условие ) { throw new Exception("ошибка при выполнении потока"); } System.out.println("задача выполнена"); return "result"; } При этом стоит отметить, что возможность добавить подобный интерфейс появилась только начиная с версии Java 1.5, так как ключевая особенность интерфейса Callable – это использование параметризованных типов (generics) для определения типа возвращаемого результата. В изучаемом дальше фреймворке этот интерфейс реализуется еще более удобным классом FutureTask, обеспечивающим возможность выполнения асинхронных вычислений в пуле потоков. Еще одна важная деталь интерфейса Callable: возможность из потока выбрасывать исключения, не влияющие на другие задачи. Такие исключения должны перехватываться и обрабатываться порождающим потоком.
java.util.concurrent Executor Framework (1) Пакет java.util.concurrent содержит три Executor-интерфейса: Executor ExecutorService ScheduledExecutorService и набор классов, реализующих эти интерфейсы. Интерфейс Executor был введен для отделения процесса создания и отправки задачи на выполнение от механизма исполнения каждой конкретной задачи. Классы, реализующие данный интерфейс, могут оперировать так называемыми пулами потоков, распределяемыми динамически между задачами. Интерфейс Executor имеет единственный метод execute(Runnable task). Он позволяет абстрагироваться от того, как конкретно исполняется задача (этим занимается экземпляр класса, реализующего интерфейс): Executor executor = ; executor.execute( new Runnable myTask() ); В пакете java.util.concurrent интерфейс Executor и его расширения реализуют классы ForkJojnPool, ThreadPoolExecutor и SheduledThreadPoolExecutor. Значительно большие возможности предоставляет интерфейс ExecutorService.
java.util.concurrent Executor Framework (2) Интерфейс ExecutorService является расширением интерфейса Executor и добавляет следующие полезные возможности: Возможность выполнения не только Runnable объектов, но и объектов, реализующих новый интерфейс java.util.concurrent.Callable. Основное отличие от Runnable объектов – возможность возвращать значение из потока. Возможность возвращать вызывавшему потоку объект класса java.util.concurrent.Future, который содержит среди прочего и возвращаемое значение. Возможность остановить выполняемый поток. Для запуска на исполнение одной задачи имеется группа перегруженных методов: Future submit( Runnable task ) T Future submit( Runnable task, T result ) T Future submit( Callable task ) Здесь под T понимается объектный (ссылочный) тип, значение которого будет прямо возвращать задача, если она реализует интерфейс Callable и которое может быть получено от задачи косвенными путями, если она реализует интерфейс Runnable. Интерфейс Future определяет методы доступа к состоянию и результату задачи: boolean cancel( boolean mayInterruptIfRunning ) Пытается остановить выполнение задачи. V get( ) Ожидает завершения задачи и возвращает ее результат. V get( long timeout, TimeUnit unit ) Ожидает завершения задачи в течение заданного интервала времени и возвращает ее результат, если он доступен (иначе – null). boolean isCancelled( ) возвращает истину, если задача была завершена принудительно. boolean isDone( ) возвращает истину, если задача завершилась нормально.
java.util.concurrent Executor Framework (3) Вот простой пример: public class CallableImpl implements Callable { public Integer call( ) { //… выполнение задачи return new Integer( someValue ); } //… Callable callable = new CallableImpl( ); ExecutorService executor = Executors.newCachedThreadPool( ); Future future = executor.submit( callable ); try { System.out.println( "Future value: " + future.get( ) ); } catch ( Exception e ) { e.printStackTrace(); } Здесь создается пул потоков не фиксированного размера.
java.util.concurrent Executor Framework (4) Интерфейс ExecutorService имеет еще две группы методов запуска задач, позволяющих оперировать с целыми группами (коллекциями): List > invokeAll( Collection > tasks ) Выполняет все заданные задачи, содержащиеся в коллекции tasks, возвращая список экземпляров Future, и заканчивается, когда все задачи завершаются. List > invokeAll( Collection > tasks, long timeout, TimeUnit unit ) Выполняет все заданные задачи и заканчивается, когда все завершаются, или истекает тайм-аут заданный в единицах времени unit. T invokeAny( Collection > tasks ) Выполняет все заданные задачи, возвращая результат какой-либо из тех, которые завершились успешно (то есть, не выбросили исключение). T invokeAny( Collection > tasks, long timeout, TimeUnit unit ) Выполняет все заданные задачи, возвращая результат какой-либо из тех, которые завершились успешно в течение заданного интервала времени. Перечисление TimeUnit определяет константы DAYS, HOURS, MINUTES, MICROSECONDS, MILLISECONDS, NANOSECONDS, SECONDS и набор полезных методов для работы с временными интервалами.
java.util.concurrent Executor Framework (5) Интерфейс ExecutorService кроме того содержит методы, позволяющие проверять состояние задач, инициировать их завершение и ожидать этого завершения: boolean awaitTermination( long timeout, TimeUnit unit ) Блокирует текущий поток до завершения всех задач, или истечения заданного тайм-аута, или прерывания текущего потока – что произойдет раньше. boolean isShutdown( ) возвращает true, если этот исполнитель задач остановлен. boolean isTerminated( ) возвращает true, если все задачи завершились. void shutdown( ) Инициирует аккуратное завершение работы. При этом все ранее запущенные задачи выполняются до завершения, но никакие новые (методы submit, invoke*) задачи экземпляром класса, реализующего ExecuteService, не будут приняты. List shutdownNow( ) Пытается остановить все активно выполняющиеся задачи, предотвращает обработку ожидающих задач, возвращает список задач, которые ждали выполнения.