Предотвращение утечек памяти с помощью перечислителей Scalaz 7 zipWithIndex / group

106

Задний план

Как отмечено в этом вопросе , я использую итерации Scalaz 7 для обработки большого (т. Е. Неограниченного) потока данных в постоянном пространстве кучи.

Мой код выглядит так:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Эта проблема

Кажется, я столкнулся с утечкой памяти, но я недостаточно знаком с Scalaz / FP, чтобы знать, в Scalaz или в моем коде ошибка. Интуитивно я ожидаю, что этот код потребует только (порядка) P раз больше Chunkместа.

Примечание. Я нашел аналогичный вопрос, в котором OutOfMemoryErrorвстречался, но мой код не использует consume.

Тестирование

Я провел несколько тестов, чтобы попытаться выявить проблему. Подводя итог, можно сказать, что утечка возникает только тогда, когда используются оба zipWithIndexи group.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Код для тестов:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Вопросы

  • Ошибка в моем коде?
  • Как я могу заставить эту работу работать в постоянном пространстве кучи?
Аарон Новструп
источник
6
В итоге я сообщил об этом как о проблеме в Scalaz .
Аарон Новструп, 03
1
Это будет неинтересно, но вы можете попробовать -XX:+HeapDumpOnOutOfMemoryErrorпроанализировать дамп с помощью eclipse MAT eclipse.org/mat, чтобы увидеть, какая строка кода удерживает массивы.
huynhjl
10
@huynhjl FWIW, я попытался проанализировать кучу как с помощью JProfiler, так и с помощью MAT, но не смог пройти через все ссылки на классы анонимных функций и т. д. Scala действительно нуждается в специальных инструментах для такого рода вещей.
Аарон Новструп
Что, если утечки нет, а просто то, что вы делаете, требует стремительно увеличивающегося объема памяти? Вы можете легко реплицировать zipWithIndex без этой конкретной конструкции FP, просто поддерживая varсчетчик по мере продвижения .
Иезекииль Виктор
@EzekielVictor Я не уверен, что понимаю комментарий. Вы предлагаете, чтобы добавление одного Longиндекса для каждого фрагмента изменило алгоритм с постоянного на непостоянное пространство кучи? Версия без архивирования явно использует постоянное пространство кучи, потому что она может «обрабатывать» столько блоков, сколько вы готовы подождать.
Аарон Новструп,

Ответы:

4

Это будет небольшим утешением для тех, кто застрял со старым iterateeAPI, но я недавно подтвердил, что эквивалентный тест проходит против API scalaz-stream . Это более новый API обработки потоков, предназначенный для замены iteratee.

Вот тестовый код для полноты картины:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Это должно работать с любым значением nпараметра (при условии, что вы готовы подождать достаточно долго) - я тестировал 2 ^ 14 массивов по 32 МБ (то есть всего половину ТиБ памяти, выделенной с течением времени).

Аарон Новструп
источник