о параллелизме в Java: знакомство с пакетом util.concurrent

Как и многие другие сервисы инфраструктуры приложений, вспомогательные классы для поддержания параллелизма, такие как рабочие очереди и пулы потоков часто без необходимости переписываются с нуля для каждого проекта. В этом материале Брайан Гетц познакомит вас с пакетом util.concurrent Дага Ли - высококачественным, широко используемым пакетом утилит с открытым кодом для поддержки параллелизма.

Многие из нас никогда не задумывались о написании своего собственного XML-парсера, механизма поиска и индексации текста, компилятора регулярных выражений, XSL-процессора или генератора PDF как части проекта, который нуждается в одной из этих утилит. Когда нам нужно одно из этих средств, мы используем либо коммерческую, либо бесплатную готовую реализацию для того, чтобы выполнить эти задачи, и по известным причинам существующие реализации хорошо справляются со своей задачей, легко доступны, в то время как написание своей собственной утилиты отняло бы много времени и дало бы очень незначительный (или вообще нулевой) результат.

еще раз изобретаем колесо

Когда дело касается низкоуровневых сервисов среды приложений, таких как журнализация, пулинг соединений с базой данных, кэширование и планирование задач, которых требует практически любое серверное приложение, мы видим, что эти базовые сервисы инфраструктуры переписываются вновь и вновь. Почему это происходит? Совсем не обязательно потому, что существующие опции были неадекватны или что собственные версии лучше или же больше подходят для приложения, которое находится в работе. Фактически, пользовательские версии зачастую не больше подходят для приложения, для которого они разработаны, чем широкодоступные многоцелевые реализации, а могут быть даже хуже по качеству. Например, хотя вам может и не нравиться log4j, но он свою работу выполняет. И хотя доморощенные системы журнализации могут иметь специфические возможности, которых нет у log4j, для большинства приложений вам придется доказывать, что действительно стоит с нуля писать комплексный пользовательский пакет регистрирования, а не использовать существующую общедоступную реализацию. И все же многие команды проектировщиков заканчивают тем, что пишут свои собственные пакеты журнализации, пулинга соединений или планирования потоков снова и снова.

обманчивая простота

Одно из соображений, которое мы не учитываем при написании, например, собственного XSL-процессора - то, какой это будет огромный труд. Но эти низкоуровневые сервисы инфраструктуры обманчиво просты и потому написание собственного сервиса не кажется таким сложным. Однако корректно написать их гораздо сложнее, чем кажется на первый взгляд. Основная причина, по которой снова и снова продолжают изобретать колесо, заключается в том, что потребность в этих средствах в конкретном приложении начинается с малого и постепенно возрастает по мере того, как вы наталкиваетесь на те же самые проблемы, с которыми столкнулось несчетное количество других проектов. Аргумент обычно звучит так: "Нам не нужен полномасштабный пакет журнализации/планирования/кэширования, нам нужно что-то простое, поэтому мы напишем что-нибудь специально для этого проекта, и оно будет приспособлено под наши специфические потребности." Но зачастую вы быстро перерастаете то простое средство, которое написали, и вас тянет добавить еще несколько возможностей, и еще несколько, до тех пор, пока не напишете полномасштабный сервис инфраструктуры. В этот момент вы обычно полностью преданы тому, что сами написали, лучше ли это или хуже, чем то, что уже существует. Вы уже полностью заплатили за создание своего собственного сервиса, поэтому в дополнение к фактическим затратам на переход к универсальной реализации, вам придется также преодолеть барьер "безвозвратных затрат".

клад, спрятанный в строительных блоках для параллелизма

Планирование и классы инфраструктуры параллелизма гораздо труднее в написании, чем это кажется. Язык Java предоставляет набор полезных низкоуровневых примитивов для синхронизации - wait(), notify() и synchronized - но в использовании этих примитивов много тонкостей и проблем, связанных с производительностью, взаимной блокировкой, доступностью, управлением ресурсами и безопасностью потоков, которых необходимо избежать. Параллельный код сложно писать и еще сложнее тестировать, и даже экспертам это не всегда удается с первого раза. Даг Ли (Doug Lea) написал отличный пакет бесплатных утилит для реализации параллелизма, включая блокировки, взаимные исключения, очереди, пулы потоков, облегченные задачи, эффективные параллельные коллекции, атомарные арифметические операции и другие базовые строительные блоки параллельных приложений. Этот пакет, который обычно называют util.concurrent (так как настоящее имя пакета чересчур длинное), образует основу пакета java.util.concurrent в JDK 1.5, стандартизуемый в рамках Java Community Process JSR 166. В настоящее время util.concurrent хорошо оттестирован и используется во многих серверных приложениях, включая сервер приложений JBoss J2EE.

заполнение пустоты

Отсутствие полезного набора высокоуровневых инструментов синхронизации, таких как взаимные исключения, семафоры и блокировки, а также потокобезопасные классы коллекций было явным упущением для библиотек основных классов Java. Примитивы параллелизма, имеющиеся в языке Java, - synchronization, wait() и notify() - слишком низкоуровневые для нужд большинства серверных приложений. Что происходит, если вам необходимо попытаться получить блокировку и прекратить попытки, если не удастся получить ее за определенное время. Завершить попытку получения блокировки, если поток прерывается? Создать блокировку, которую смогут держать минимум N потоков? Поддерживать многорежимные блокировки, такие как одновременное чтение с исключительным правом записи? Или же устанавливать блокировку в одном методе, а снимать ее в другом? Встроенный механизм блокировок не поддерживает напрямую ничего из этого списка, но все их можно построить на базовых примитивах параллелизма, которые предоставляет язык Java. Но это трудно выполнить и легко сделать ошибку.

Разработчики серверных приложений нуждаются в простых средствах, чтобы заставить сделать взаимное исключение, синхронизировать реакцию на события, обмениваться данными во время действий и асинхронно планировать задания. Низкоуровневые примитивы, которые язык Java предоставляет для этого, сложны в использовании и подвержены ошибкам. Пакет util.concurrent имеет целью заполнить этот вакуум, предоставляя набор классов для установки блокировки, блокирования очередей и планирования задач, который дает возможность иметь дело с общими случаями ошибок или привязывать ресурсы, потребляемые очередями заданий и производственным циклом.

планирование выполнения асинхронных задач

Наиболее широко используемые классы в util.concurrent - те, что касаются планирования асинхронных событий. Очень заманчиво запустить фоновый поток, выполняющий задачу, простым созданием для задачи нового потока:

new Thread(new Runnable() { ... } ).start();

Хотя данная запись является красивой и компактной, у нее есть два значительных недостатка. Во-первых, за создание нового потока приходится платить дополнительными ресурсами, и потому порождение множества потоков, каждый из которых будет выполнять маленькую задачу и затем завершаться, означает, что Java Virtual Machine может выполнить гораздо большую работу и потреблять больше ресурсов, создавая и уничтожая потоки, чем при выполнении полезной работы. Даже если бы избыточность от создания и удаления была нулевой, остается второй, более тонкий недостаток у этой модели выполнения - как вы ограничиваете ресурсы, используемые для выполнения определенного типа задач? Что может не позволить вам запустить одновременно тысячу потоков, если волна запросов набежит совершенно внезапно? Реальные серверные приложения нуждаются в более тщательном управлении своими ресурсами, чем это. Вам необходимо ограничить количество асинхронных задач, выполняющихся одновременно. Пулы потоков решают обе эти проблемы - они предлагают преимущества, связанные с улучшенной эффективностью планирования и ограниченным потреблением ресурсов, одновременно. Хотя можно легко написать очередь работ и пул потоков, исполняющий Runnable-работы в потоках пула, написание эффективного планировщика задач дает больше, чем просто синхронизацию доступа к общей очереди. Настоящий планировщик задач должен иметь дело с потоками, которые умирают, уничтожать лишние потоки пула, чтобы они не потребляли ресурсы без необходимости, динамически управлять размером пула на основе уровня нагрузки и ограничивать количество стоящих в очереди задач. Последний момент - ограничение количества задач в очереди - важен для предотвращения разрушения серверных приложений из-за ошибок, вызванных исчерпанием памяти, когда они становятся перегруженными.

Ограничение очереди задач требует решения на основе политик - если очередь работ переполнится, что вам делать с переполнением? Отбросить самый новый элемент? Отбросить самый старый? Заблокировать поток, ставящий в очередь, до освобождения в ней места? Выполнять новый элемент в потоке, ставящем в очередь? Существует множество различных жизнеспособных политик управления переполнением, каждая из которых подходит в одних ситуациях, но не годится в других.

обработчик (Executor)

Util.concurrent определяет интерфейс Executor, для асинхронной обработки Runnable-работ, а также определяет несколько реализаций обработчика Executor, которые предлагают различные характеристики планирования. Постановка задачи в очередь к обработчику делается довольно просто:

Executor executor = new QueuedExecutor();
...
Runnable runnable = ... ;
executor.execute(runnable);


Простейшая реализация, ThreadedExecutor, создает новый поток для каждого Runnable и не предоставляет никакого управления ресурсами - почти также как идиома new Thread(new Runnable() {}).start(). Однако ThreadedExecutor имеет одно существенное преимущество: изменяя только конструкцию вашего обработчика, вы можете переходить к другой модели обработки без необходимости полного пересмотра всех исходных кодов приложения в поисках мест, где вы создаете новые потоки. QueuedExecutor использует один фоновый поток для обработки всех задач, подобно потоку событий в AWT и Swing. QueuedExecutor имеет одно приятное свойство, что задачи исполняются в порядке их постановки в очередь, а в силу того, что все они выполняются в одном потоке, задачи не обязательно требуют синхронизации при каждом обращении к разделяемым данным.

PooledExecutor - это сложная реализация пула потоков, которая не только предоставляет планирование задач в пуле рабочих потоков, но также обеспечивает гибкую настройку размеров пула и управление жизненным циклом потоков, может ограничивать количество элементов в рабочей очереди, чтобы не дать заданиям в очереди потребить всю доступную память, и предлагает множество возможных политик на случай остановки и перегрузки (блокировать, исключить, бросить, исключить самое старое, запустить в вызывающем потоке и так далее). Все реализации Executor управляют созданием и уничтожением потоков для вас, включая остановку всех потоков, когда останавливается обработчик. Также они обеспечивают ловушки к процессу создания потоков, так что ваше приложение при необходимости может управлять созданием экземпляров потоков. Это позволяет вам, например, поместить все рабочие потоки в определенную группу ThreadGroup или дать им значащее имя.

FutureResult (будущий результат)

Иногда вы хотите запустить процесс асинхронно, в надежде, что результаты этого процесса будут доступны позже, когда они вам понадобятся. Вспомогательный класс FutureResult упрощает эту задачу. FutureResult изображает задачу, которой может потребоваться некоторое время на выполнение, и которая может выполняться в другом потоке, а объект FutureResult служит хендлом к этому процессу выполнения. Через него вы можете выяснить, завершилась ли задача, подождать ее завершения и получить ее результат. FutureResult может быть объединен с Executor; вы можете создать FutureResult и поставить его в очередь к обработчику, сохраняя ссылку на FutureResult. Листинг 1 показывает простой пример одновременно с FutureResult и Executor, который начинает формирование образа асинхронно и продолжает другую обработку:

Листинг 1. FutureResult и Executor в действии.

Executor executor = ...
ImageRenderer renderer = ...

FutureResult futureImage = new FutureResult();
Runnable command = futureImage.setter(new Callable() {
public Object call() { return renderer.render(rawImage); }
});

// start the rendering process
executor.execute(command);

// do other things while executing
drawBorders();
drawCaption();

// retrieve the future result, blocking if necessary
drawImage((Image)(futureImage.get())); // use future


FutureResult и кэширование

Вы также можете использовать FutureResult чтобы улучшить параллелизм в кэшах загрузки по запросу. Помещая в кэш FutureResult вместо результата самого вычисления, вы можете сократить время блокировки записи в кэш. И хотя это не ускорит процесс помещения первым потоком элемента в кэш, это сократит время, в течение которого первый поток блокирует другие потоки от обращений к кэшу. Также это позволит раньше сделать результат доступным для других потоков, так как они смогут извлечь FutureTask из кэша. Листинг 2 - это пример использования FutureResult для кэширования:

Листинг 2. Использование FutureResult для улучшения кэширования.

public class FileCache {
private Map cache = new HashMap();
private Executor executor = new PooledExecutor();

public void get(final String name) {
FutureResult result;

synchronized(cache) {
result = cache.get(name);
if (result == null) {
result = new FutureResult();
executor.execute(result.setter(new Callable() {
public Object call() { return loadFile(name); }
}));
cache.put(result);
}
}
return result.get();
}
}


Данный подход позволяет первому потоку быстро входить и выходить из синхронизированного блока, и позволяет другим потокам получить результат вычисления от первого потока, как только тот его создаст, не позволяя двум потокам вычислять один и тот же объект.

резюме

Пакет util.concurrent содержит много полезных классов, некоторые из которых вам покажутся улучшенными версиями тех классов, которые вы уже написали, возможно даже не один раз. Они представляют проверенные в бою высокопроизводительные реализации многих из базовых строительных блоков для многопоточных приложений. util.concurrent явился отправной точкой для JSR 166, который будет производить множество утилит для параллелизма, которые станут пакетом java.util.concurrent в JDK 1.5.



Брайан Гетц, главный консультант, Quiotix


Сетевые решения. Статья была опубликована в номере 02 за 2007 год в рубрике программирование

©1999-2025 Сетевые решения