Я пытаюсь понять, как реорганизовать программу, которую я раньше написал бы как последовательность переходов состояний:
У меня есть бизнес-логика:
type In = Long
type Count = Int
type Out = Count
type S = Map[Int, Count]
val inputToIn: String => Option[In]
= s => try Some(s.toLong) catch { case _ : Throwable => None }
def transition(in: In): S => (S, Out)
= s => { val n = s.getOrElse(in, 0); (s + (in -> n+1), n+1) }
val ZeroOut: Out = 0
val InitialState: S = Map.empty
С их помощью я хочу создать программу для передачи некоторого начального состояния (пустая карта), чтения ввода из stdin , преобразования его In
, запуска перехода между состояниями и вывода текущего состояния S
и вывода Out
на стандартный вывод .
Раньше я бы сделал что-то вроде этого:
val runOnce = StateT[IO, S, Out](s => IO.readLn.map(inputToIn) flatMap {
case None => IO((s, ZeroOut))
case Some(in) => val (t, o) = transition(in)(s)
IO.putStrLn(t.toString) |+| IO.putStrLn(o.toString) >| IO((t, o))
})
Stream.continually(runOnce).sequenceU.eval(InitialState)
Однако мне действительно сложно понять, как связать этот подход (поток переходов состояний) с scalaz-stream . Я начал с этого:
type Transition = S => (S, Out)
val NoTransition: Transition = s => (s, 0)
io.stdInLines.map(inputToIn).map(_.fold(NoTransition)(transition))
Это типа: Process[Task, Transition]
. Я действительно не знаю, что делать дальше.
- Как мне "передать" мою
InitialState
и запустить программу,S
распределяя вывод на каждом шаге в качестве вводаS
для следующего? - Как мне получить значения
S
иOut
на каждом этапе и распечатать их в стандартный вывод (при условии, что я могу преобразовать их в строки)?
Пытаясь использовать единственное for-понимание, я тоже застреваю:
for {
i <- Process.eval(Task.now(InitialState))
l <- io.stdInLines.map(inputToIn)
...
Любая помощь приветствуется!
У меня теперь немного больше.
type In_ = (S, Option[In])
type Out_ = (S, Out)
val input: Process[Task, In_]
= for {
i <- Process.emit(InitialState)
o <- io.stdInLines.map(inputToIn)
} yield (i, o)
val prog =
input.pipe(process1.collect[In_, Out_]) {
case (s, Some(in)) => transition(in)(s)
}).to(io.stdOutLines.contramap[Out_](_.toString))
потом
prog.run.run
Это не работает: похоже, что состояние не передается через поток. Скорее, на каждом этапе передается начальное состояние.
Пол Кьюзано предложил использовать подход process1.scan
. Итак, теперь я делаю это:
type In_ = In
type Out_ = (S, Out)
val InitialOut_ = (InitialState, ZeroOut)
val program =
io.stdInLines.collect(Function.unlift(inputToIn)).pipe(
process1.scan[In_, Out_](InitialOut_) {
case ((s, _), in) => transition(in)(s)
}).to(io.stdOutLines.contramap[Out_](_.shows))
Здесь есть проблема: в этом конкретном примере мой Out
тип - моноид , поэтому мое начальное состояние может быть создано с использованием его идентификатора, но обычно это может быть не так. Что бы я тогда делал? (Думаю, я мог бы использовать, Option
но это кажется ненужным.)
источник
StateT
конструкцию, что и монада-носитель для своего потока?type Carrier[A] = StateT[Task, S, A]; val input: Process[Carrier, Option[In]] = ...; prog.run.run(initialValue).run // prog.run is a Carrier[Unit] i.e. StateT
Ответы:
import io.FilePath import scalaz.stream._ import Process._ import scalaz.concurrent.Task import Task._ import scalaz.{Show, Reducer, Monoid} import scalaz.std.list._ import scalaz.syntax.foldable._ import scalaz.syntax.bind._ import scalaz.stream._ import io._ import scalaz.stream.text._ import Processes._ import process1.lift import control.Functions._ /** * A Fold[T] can be used to pass over a Process[Task, T]. * * It has: * * - accumulation, with an initial state, of type S, a fold action and an action to perform with the last state * * - side-effects with a Sink[Task, (T, S)] to write to a file for example, using the current element in the Process * and the current accumulated state * * This covers many of the needs of iterating over a Scalaz stream and is composable because there is a Monoid * instance for Folds * */ trait Fold[T] { type S def prepare: Task[Unit] def sink: Sink[Task, (T, S)] def fold: (T, S) => S def init: S def last(s: S): Task[Unit] /** create a Process1 returning the state values */ def foldState1: Process1[T, S] = Processes.foldState1(fold)(init) /** create a Process1 returning the folded elements and the state values */ def zipWithState1: Process1[T, (T, S)] = Processes.zipWithState1(fold)(init) } /** * Fold functions and typeclasses */ object Fold { /** * Create a Fold from a Sink with no accumulation */ def fromSink[T](aSink: Sink[Task, T]) = new Fold[T] { type S = Unit lazy val sink: Sink[Task, (T, S)] = aSink.extend[S] def prepare = Task.now(()) def fold = (t: T, u: Unit) => u def init = () def last(u: Unit) = Task.now(u) } /** * Transform a simple sink where the written value doesn't depend on the * current state into a sink where the current state is passed all the time * (and actually ignored) * Create a Fold a State function */ def fromState[T, S1](state: (T, S1) => S1)(initial: S1) = new Fold[T] { type S = S1 lazy val sink: Sink[Task, (T, S)] = unitSink[T, S] def prepare = Task.now(()) def fold = state def init = initial def last(s: S) = Task.now(()) } /** * Create a Fold from a side-effecting function */ def fromFunction[T](f: T => Task[Unit]): Fold[T] = fromSink(Process.constant(f)) /** * Create a Fold from a Reducer */ def fromReducer[T, S1](reducer: Reducer[T, S1]): Fold[T] = new Fold[T] { type S = S1 lazy val sink: Sink[Task, (T, S)] = unitSink[T, S] def prepare = Task.now(()) def fold = reducer.cons def init = reducer.monoid.zero def last(s: S) = Task.now(()) } /** * Create a Fold from a Reducer and a last action */ def fromReducerAndLast[T, S1](reducer: Reducer[T, S1], lastTask: S1 => Task[Unit]): Fold[T] = new Fold[T] { type S = S1 lazy val sink: Sink[Task, (T, S)] = unitSink[T, S] def prepare = Task.now(()) def fold = reducer.cons def init = reducer.monoid.zero def last(s: S) = lastTask(s) } /** * This Sink doesn't do anything * It can be used to build a Fold that does accumulation only */ def unitSink[T, S]: Sink[Task, (T, S)] = channel((tu: (T, S)) => Task.now(())) /** * Unit Fold with no side-effect or accumulation */ def unit[T] = fromSink(channel((t: T) => Task.now(()))) /** * Unit fold function */ def unitFoldFunction[T]: (T, Unit) => Unit = (t: T, u: Unit) => u /** create a fold sink to output lines to a file */ def showToFilePath[T : Show, S](path: FilePath): Sink[Task, (T, S)] = io.fileChunkW(path.path).pipeIn(lift(Show[T].shows) |> utf8Encode).extend[S] implicit class FoldOps[T](val fold: Fold[T]) { } /** * Monoid for Folds, where effects are sequenced */ implicit def foldMonoid[T]: Monoid[Fold[T]] = new Monoid[Fold[T]] { def append(f1: Fold[T], f2: =>Fold[T]): Fold[T] = f1 >> f2 lazy val zero = Fold.unit[T] } /** * create a new Fold sequencing the effects of 2 Folds */ implicit class sequenceFolds[T](val fold1: Fold[T]) { def >>(fold2: Fold[T]) = new Fold[T] { type S = (fold1.S, fold2.S) def prepare = fold1.prepare >> fold2.prepare def sink = fold1.sink.zipWith(fold2.sink) { (f1: ((T, fold1.S)) => Task[Unit], f2: ((T, fold2.S)) => Task[Unit]) => (ts: (T, S)) => { val (t, (s1, s2)) = ts (f1((t, s1)) |@| f2((t, s2)))((_,_)) } } def fold = (t : T, s12: (fold1.S, fold2.S)) => (fold1.fold(t, s12._1), fold2.fold(t, s12._2)) def last(s12: (fold1.S, fold2.S)) = (fold1.last(s12._1) |@| fold2.last(s12._2))((_,_)) def init = (fold1.init, fold2.init) } } /** * Run a fold an return the last value */ def runFoldLast[T](process: Process[Task, T], fold: Fold[T]): Task[fold.S] = fold.prepare >> logged(process |> fold.zipWithState1).drainW(fold.sink).map(_._2).runLastOr(fold.init) /** * Run a Fold an let it perform a last action with the accumulated state */ def runFold[T](process: Process[Task, T], fold: Fold[T]): Task[Unit] = runFoldLast(process, fold).flatMap(fold.last) /** * Run a list of Folds, sequenced with the Fold Monoid */ def runFolds[T](process: Process[Task, T], folds: List[Fold[T]]): Task[Unit] = runFold(process, folds.suml) }
источник