Продолжаем цикл статей, посвященный обработке больших объемов данных в параллель в Java 8.

В предыдущей статье мы познакомились и интересным инструментарием Fork/Join Framework, позволяющим разбить обработку на несколько частей и запустить параллельно выполнение отдельных задач. Что нового в этой статье? Отвечу – более содержательные примеры и новые механизмы для качественной обработки информации. Параллельно я расскажу о ресурсных и прочих особенностях работы в этом режиме.

Всех заинтересованных приглашаю под кат.

Начало

Все, что хорошо делится — делим. Примерно так я писал в предыдущей статье, предлагая разделить обработку на части и максимально загрузить процессоры.

Уже давно даже домашние компьютеры имеют многоядерные процессоры. Здесь и кроется первая особенность работы в этом режиме. Нужно соблюдать паритет между количеством подзадач и количеством ядер. По многочисленным тестам формула запуска приблизительно следующая, количество подзадач должно быть: (Количество ядер +0) или (Количество ядер +1). Эти варианты тестировался на нескольких серьезных серверах и нескольких обычных машинах.

Механизмы ограничения

Под механизмами ограничения я понимаю, всевозможные механизмы («отсечки») максимально быстрой и удобной обработкой и отладкой ошибок. В своих проектах стараюсь создать максимальное количество способов отладки кода, например:

  1. Постарайтесь реализовать механизмы однозадачных и многозадачных режимов для ваших вычислений. Зачем? Сейчас объясню. Допустим, что у вас есть успешно переданный проект, и даже, возможно протестированный. В случае нестандартно ситуации, первое, что можно сделать – это для быстрого понимания и исправления ошибки переключиться в однозадачный режим (принудительно) и сразу получает ошибку на экране сервера приложений (если такое возможно и есть доступ).Удобнее выявлять недочеты в однозадачном режиме — если у вас включен параллельный режим, то при остановке первой подзадачи, вы увидите часть выполнения второй (это не всегда удобно для просмотра).
  2. Продумывайте механизмы доступа и загрузки постоянно требующихся данных.Расскажу подробнее. Например, вы хотите перед началом обработки перевести около 100 огромных таблиц в Map<K,V>. Да, это быстро удобно, но есть несколько неприятных моментов.Допустим, что вы начинаете тестировать большие данные. Присутствует проблема по трем договорам, блокам, клиентам, без разницы, по трем позициям (давайте будем называть «позиции»). Вы разобрались, в чем ошибка, исправили, перезаписали jar, перезапустили и… Ничего! Снова сидим, ждет несколько минут. Ждем расчет.В этой ситуации нам бы помогли механизмы выборочной загрузки (загрузок) данных.Например, далее не самый лучший вариант. Фактически, построение Map по всем данным. Хотя иногда и он применяется.

    Следующий вариант предпочтительнее (имхо). В нем мы ограничиваемся входящими данными, так как получаем только то, что нам действительно нужно.
  3. Сразу создавайте механизмы записи и вывода ошибок в таблицы (файлы) и другие источники. Если ваши алгоритмы четко выстроены, то ничто вам не мешает создать класс, который будет отрабатывать по ключу. Например, перед загрузкой есть «флаг», который позволяет записывать данные в таблицу с ошибками, тем самым вы точно знаете область оной.

Быстрее…

Немного поговорив про возможные особенности/ошибки, перейдем к нашей непосредственной цели, а именно к обработке в параллельном режиме больших объемов информации и немного расширим существующие пример (нагло заберем их из предыдущей статьи).

В ней создано несколько классов, отвечающих за входящие данные и обработку. Тогда мы основывались на классе RecursiveAction. Напомню, еще раз, что было сделано в примере. В классе StreamSettings мы разбиваем полученные данные на части, пока не достигнем порогового значения, выставленного в countValue = 500. Как я объяснял ранее, механизм ограничения вы можете создать любой. Например, вариант (valueId.size() / Runtime.getRuntime().availableProcessors()*2) тоже работоспособен и может применяться для нахождения некого оптимального значения.

 

Продолжим наши изыскания. Попробуем посмотреть новые варианты обработки, остановимся на классе RecursiveTask. Основное отличие будет в том, что метод compute() будет возвращать результат (а это требуется ну уж очень часто). Фактически мы можем дождаться выполнения нескольких подзадач и произвести вычисления. Далее показаны примеры, на которых мы остановимся подробнее.
Класс Stream отвечает за разбиение на подзадачи. В примере находим среднее значение и создаем экземпляр класса (Stream goVar1 = new Stream(forSplit,start, middle)) от 0 до «середины» и в (Stream goVar2 = new Stream(forSplit,middle,end)) передаем от «середины» до конечного элемента.

Отличие от предыдущего варианта, класса StreamSettings, не используется invokeAll, а будут вызываться методы fork() и join() соответственно.

 

В результате корректного выстраивания запусков и ожиданий запусков, вы можете создать удобную систему асинхронного выполнения вычислений.

Особенности обработки информации с помощью Fork/Join Java 8

Затронутая тема обширна и позволяет производить существенные усовершенствования. Выигрыш по времени составляет приблизительно в 1,7 раза по сравнению с последовательным запуском. Вы можете использовать доступные ресурсы эффективнее и перевести множество расчетов в параллельный режим.

Если остались вопросы, обращайтесь на почту javarules@mail.ru

Java в параллель. Учимся создавать подзадачи и контролировать их выполнение с помощью Fork/Join Framework: 3 комментария

  1. Иван on 24.09.2016 at 20:27 пишет:

    Добрый день.
    Вы можете подробнее обьяснить механизм fork/join и показать пример без дополнительных обработок и наворотов?

    • Роман on 04.10.2016 at 11:17 пишет:

      Добрый день.
      Да, в ближайшие дни я смогу подготовить примеры и подробно рассказать каждую строчку.

    • Роман on 11.10.2016 at 23:27 пишет:

      Есть прекрасный пример на сайте Oracle по ссылке https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
      В примере заложена простая истина, если моя работа мала, то выполняем, иначе разделим её на 2 части. У меня аналогично в примере за исключением проверки на процессоры.
      В обязательном методе compute() реализована проверка на количество 100000. Если это так, то отработает метод computeDirectly(). Если значение будет превышать, то через invokeAll будут созданы 2 экземпляра класса и переданы значения от начала до середины и от середины до конца. Далее произойдут вычисления.

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Навигация по записям