Параллельные коллекции и свертка

Начиная с версии 2.9, в Scala была добавлена поддержка параллельных коллекций. Сама по себе эта тема довольно большая, но для простоты можно сказать, что некоторые операции подобных коллекций выполняются в параллель. Разбиение на пакеты и иерархия классов параллельных коллекций (scala.collection.parallel) в общем и целом напоминает организацию непараллельного scala.collection с разбиением на мутабельные и иммутабельные типы. При этом они не являются подтипами нормальных коллекций, а вместо этого у нормальных и параллельных коллекций есть общие трейты-родители, названия которых, как правило, начинаются с префикса Gen. Названия параллельных коллекций начинаются с префикса Par.

Преобразование уже существующей коллекции в параллельую и назад делается соотвественно вызовами методов par и seq из трейта Parallelizable.

scala> 1 to 10 par
res0: scala.collection.parallel.immutable.ParRange = ParRange(1, 2, 3, 4, 5, 6, 7,
8, 9, 10)
scala> Array(1,2,3,4) par
res1: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, 4)
scala> val parSeq = List(1,2,3,4).par
parSeq: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4)
scala> parSeq.seq
res0: scala.collection.immutable.Seq[Int] = Vector(1, 2, 3, 4)

Параллельные коллекции имеют те же методы, что и нормальные коллекции, но уже с другими реализациями.

Например, для того, чтобы вычислить факториал в несколько потоков, достаточно написать:

def parFact(n:BigInt) = (BigInt(1) to n).par.product

Для сравнения, если бы мы задались целью написать свою многопоточную версию версию вычисления факториала, то получилось бы что-то вроде:

object ParallelFactorial {

  val numThreads:Int = 10
  import ExecutionContext.Implicits.global

  def parallelFactorial(n: Int): BigInt = {
    val blocks = splitIntoBlocks[BigInt]((BigInt(1) to BigInt(n)).toList, numThreads)
    val f = Future.sequence(blocks.map {b => Future (b.product) })
    Await.result(f.map(results => results.product), Duration(120, TimeUnit.SECONDS))
  }

  private def splitIntoBlocks[A](xs: Seq[A], n: Int) = {
    val m = xs.length
    val targets = (0 to n).map{x => math.round((x.toDouble*m)/n).toInt}
    def snip(xs: Seq[A], ns: Seq[Int], got: Vector[Seq[A]]): Vector[Seq[A]] = {
      if (ns.length<2) got
      else {
        val (i,j) = (ns.head, ns.tail.head)
        snip(xs.drop(j-i), ns.tail, got :+ xs.take(j-i))
      }
    }
    snip(xs, targets, Vector.empty)
  }
}

Очевидно, что версия с использованием параллельной коллекции выглядит тривиальной, с точки зрения клиента. Это и являлось целью создания фреймворка параллельных коллекций - сделать многопоточное програмиирование более доступным, потому что очевидно, что при многопоточном программировании используются несколько другие алгоритмы и структуры данных. Но некоторые вещи, тем не менее, можно обобщить - например, явно присутствет фазы разделения входной коллекции на части, обработка частей в отдельных потоках, а затем сборка финального результата. За реализацию этих фаз во фреймворке параллельных коллекций отвечают:

  • Splitter -который отвечает за разбивку коллекции; яляется также итератором.

  • Combiner - который соотвествует Builder у в последовательных коллекциях, и имеет метод combine, который принимает на вход другой Combiner.

и возвращает новый, содержащий объединение элементов обоих (более подробно см. здесь, здесь) и здесь.

Параллелизм в параллельных коллекциях реализован в стиле “divide and conquer”, т.е. исходная коллекция разбивается на меньшие части, и уже эти части обрабатываются последовательно.

Параллельные операции оформляются как задачи (см. scala.collection.parallel.Task). Tasks передаются на выполнение объекту scala.collection.parallel.TaskSupport, который может быть сконфигурирован для коллекции. Примером конкретной реализации TaskSupport явлеятся ForkJoinTaskSupport.

Конечно, нужно при этом помнить, что любой дополнительный абстрактный слой имеет определенную цену, да и сама многопточность имеет некоторые накладные расходы, поэтому выбор - использовать параллельную версию или нет, должен основываться на замерах производительности.

Теперь о свертке. Как ясно из названия, методы foldLeft, foldRight, reduceLeft, и reduceRight - связаны с последовательной обработкой; фактически они делегируют вызовы в соотвествующую последовательную реализацию, т.е. параллельными не являются. Паралелльными являются методы метод fold, reduce и aggregate.

scala> val sum = Array(1,2,3).par.reduce(_ + _)
sum: Int = 6

Результат этой операции будет совпадать с результатом вызова reduceLeft. Вообще, нужно заметить, что секции, на которые будет разбита коллекция, будут в конце концоы смерджены в результате таким образом, что будет выглядеть так, что вроде бы порядок обхода был сохранен, хотя эти секции реально выполнялись и не по порядку - и это верно для всех трех операций - fold/aggregate/reduce. Трюк состоит в том, что операция, которая используется в свертке (_ + _ в данном случае) должна быть ассоциативной, т.е. позволять произвольные разбиения и перегруппировки (т.е. как произойдет разбиение на группы, заранее неизвестно), но с сохранением порядка - например, 1, 2, 3, может быть сведен в (1 + 2) + 3 или 1 + (2 + 3), но не в (2 + 1) + 3. При этом операция не должна быть коммутативной. Пример такой операции - конкатенация строки:

"acbdefgh".par.map(_.toString).reduce(_+_)

И fold, и aggregate могут выполнять работу в параллель: каждый из них обходит элементы в разных группах последовательно, и затем группы мерджаться так, что первоначальный порядок сохранен (хотя то, как происходит разбиение на группы, мы не знаем). Например, возьмем список List("a","b","c","d") - сначала он может быть разбит на группы "a"+"b" и "c"+"d", которые будут орбработаны параллельно, причем "c" + "d" а затем "a"+"b", а затем будет выполено слияние "ab"+"cd", причем в результате порядок сохранен.

Но в то время как метод reduce является полной заменой для reduceLeft или reduceRight в ситуациях, когда для нас не важен порядрк обработки, и то с как мы уже видели в 1-й части, ситуация с fold несколько сложнее - достаточно сравнить сигнатуры fold and foldLeft из трейта ParIterable[T]:

def fold [U >: T] (z: U)(op: (U, U) U): U
def foldLeft [S] (z: S)(op: (S, T) S): S

Метод fold более ограничен в отношении типов, которые он может использовать. В то время как в foldLeft типы элементов, с одной стороны, и аккумулятора и результата, с другой совершенно разные и никак не связаны между собой, в fold тип результата должен быть предком типа элементов.

Причина ограничения именно в многопоточном варианте - в однопоточном этой проблемы нет. В то время как один поток работает над одной секцией, другой поток - над другой, и не понятно, какой из них закончит раньше, то операция op должна быть коммутативной, т.е. возможна как ситуация op(a,b), так и op(b,a) - в отличие от ситуации с foldLeft. Это можно сделать двумя путями - либо наложить ограничение на типы (как это сделано в fold), либо предоставить дополнительную операцию для комбинирования - как это сделано в aggregate:

def aggregate [B] (z: B)(seqop: (B, A)  B, combop: (B, B)  B): B

Здесь A - тип элементов коллекции, B - тип аккумулятора и результата. Допустим, у нас есть четыре элемента. Тогда возможен следующий сценарий работы aggregate:

LiJdm4J.png

Например, путь есть набор слов GenSeq("I", "have", "a", "dream"), и мы хотим узнать, сколько букв в этих словах в общей сложности.

import scala.collection.GenSeq
val seq = GenSeq("I", "have", "a", "dream")
val chars = seq.aggregate(0)(_ + _.length, _ + _)

Сначала, в первый проход, мы, например, получаем:

0 + "I".length       // 1
0 + "have".length    // 4
0 + "a".length       // 1
0 + "dream".length   // 5

Затем мы, возможно, получим:

4 + 2 // 5
2 + 7 // 6

И как последний шаг

5 + 6 // 11

и мы получили результат. Т.е. мало того, что у нас получилось 3 прохода (благодаря параллелизму) вместо 7 в случае последовательной реализации, в случае aggregate мы, предоставив combop, получаем гибкость в использовании типов.

Option.fold

Операция fold есть также в Optionе, но работает она немного неожиданным образом.

@inline final def fold[B](ifEmpty: => B)(f: A => B): B

Option.fold делает одно из двух: или вызывает функцию f со значением Optionа - если оно есть, или возвращает другое значение ifEmpty, если в нем содержится None. Т.е. фактически это комбинация map и getOrElse:

val x: R = option map f getOrElse ifEmpty

С Option.fold же это выглядит так:

val x: R = option.fold(ifEmpty)(f)

В данном случае аналогия с fold, который есть у коллекций, несколько натянутая, даже если представить, что Option является коллекцией с количеством элементов более от 0 до 1. Для данного метода бы лучше подошло бы название mapOrElse.