как слушать N каналов? (динамический оператор выбора)

116

чтобы запустить бесконечный цикл выполнения двух горутин, я могу использовать приведенный ниже код:

после получения сообщения он запустит новую горутину и будет работать вечно.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Теперь я хотел бы иметь такое же поведение для N горутин, но как в этом случае будет выглядеть оператор select?

Это бит кода, с которого я начал, но я не понимаю, как кодировать оператор select

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
Джон Смит
источник
4
Я думаю, что вам нужно мультиплексирование каналов. golang.org/doc/effective_go.html#chan_of_chan По сути, у вас есть один единственный канал, который вы слушаете, а затем несколько дочерних каналов, которые переходят в основной канал. Связанный вопрос SO: stackoverflow.com/questions/10979608/…
Brenden

Ответы:

152

Вы можете сделать это с помощью Selectфункции из пакета отражения :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select выполняет операцию выбора, описанную списком случаев. Как и оператор выбора Go, он блокируется до тех пор, пока не будет продолжен хотя бы один из вариантов, делает единый псевдослучайный выбор, а затем выполняет этот случай. Он возвращает индекс выбранного случая и, если этот случай был операцией приема, полученное значение и логическое значение, указывающее, соответствует ли значение отправке по каналу (в отличие от нулевого значения, полученного из-за закрытия канала).

Вы передаете массив SelectCaseструктур, которые идентифицируют канал для выбора, направление операции и значение для отправки в случае операции отправки.

Итак, вы можете сделать что-то вроде этого:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Вы можете поэкспериментировать с более конкретным примером здесь: http://play.golang.org/p/8zwvSk4kjx

Джеймс Хенстридж
источник
4
Есть ли практическое ограничение на количество дел в таком отборе? Тот, что если выйти за рамки, то производительность сильно пострадает?
Максим Владимирский
4
Может быть, это моя некомпетентность, но я обнаружил, что с этим шаблоном действительно сложно работать, когда вы отправляете и получаете сложные структуры через канал. Как сказал Тим Олклер, в моем случае было намного проще передать общий «совокупный» канал.
Bora M. Alper
91

Этого можно добиться, заключив каждый канал в горутину, которая «пересылает» сообщения на общий «агрегированный» канал. Например:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Если вам нужно знать, из какого канала было отправлено сообщение, вы можете обернуть его в структуру с любой дополнительной информацией, прежде чем пересылать в агрегированный канал.

В моем (ограниченном) тестировании этот метод значительно уступает по производительности при использовании пакета отражения:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Код теста здесь

Тим Оллклер
источник
2
Ваш тестовый код неверен, вам нужно пройти циклb.N в тесте. В противном случае результаты (которые делятся на b.N, 1 и 2000000000 в вашем выводе) будут совершенно бессмысленными.
Dave C
2
@DaveC Спасибо! Вывод не меняется, но результаты гораздо более разумные.
Тим Олклер 02
1
Действительно, я быстро взломал ваш тестовый код, чтобы получить некоторые реальные цифры . Вполне возможно, что в этом тесте что-то все еще отсутствует / неправильное, но единственное, что имеет более сложный код отражения, это то, что установка выполняется быстрее (с GOMAXPROCS = 1), поскольку для него не требуется куча горутин. Во всех остальных случаях простой канал слияния горутин сдувает отраженное решение (на ~ 2 порядка).
Dave C
2
Одним из важных недостатков (по сравнению с reflect.Selectподходом) является то, что горутины делают буфер слияния как минимум одним значением на каждом сливаемом канале. Обычно это не будет проблемой, но в некоторых конкретных приложениях это может стать преградой :(.
Дэйв Си
1
буферный канал слияния усугубляет проблему. Проблема в том, что только решение для отражения может иметь полностью небуферизованную семантику. Я пошел дальше и опубликовал тестовый код, с которым экспериментировал, как отдельный ответ, чтобы (надеюсь) прояснить то, что я пытался сказать.
Dave C
22

Чтобы расширить некоторые комментарии к предыдущим ответам и обеспечить более четкое сравнение, вот пример обоих подходов, представленных до сих пор с одним и тем же входом, срез каналов для чтения и функции для вызова для каждого значения, которое также должно знать, какое канал, из которого получено значение.

Есть три основных различия между подходами:

  • Сложность. Хотя отчасти это может быть предпочтением читателя, я считаю, что канальный подход более идиоматичен, прямолинейен и удобочитаем.

  • Производительность. В моей системе Xeon amd64 goroutines + channels out выполняет решение для отражения примерно на два порядка (в целом отражение в Go часто медленнее и должно использоваться только в случае крайней необходимости). Конечно, если есть какая-либо значительная задержка либо в функции, обрабатывающей результаты, либо в записи значений во входные каналы, эта разница в производительности может легко стать незначительной.

  • Семантика блокировки / буферизации. Важность этого зависит от варианта использования. Чаще всего это либо не имеет значения, либо небольшая дополнительная буферизация в решении для слияния горутин может быть полезна для повышения пропускной способности. Однако, если желательно иметь семантику, согласно которой разблокируется только один модуль записи, а его значение полностью обрабатывается до того, как любой другой модуль записи будет разблокирован, то этого можно достичь только с помощью решения отражения.

Обратите внимание, что оба подхода можно упростить, если либо «id» отправляющего канала не требуется, либо если исходные каналы никогда не будут закрыты.

Канал слияния горутин:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Выбор отражения:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Полный код на игровой площадке Go .]

Дэйв С
источник
1
Также стоит отметить, что решение goroutines + channels не может делать все selectили reflect.Selectделает. Горутины будут продолжать вращаться, пока не поглотят все из каналов, поэтому нет четкого способа Process1выйти из игры раньше времени. Также есть вероятность проблем, если у вас несколько читателей, поскольку горутины буферизуют один элемент из каждого из каналов, чего не произойдет с select.
Джеймс Хенстридж
@JamesHenstridge, твое первое замечание об остановке неверно. Вы должны принять меры для остановки Process1 точно так же, как и для остановки Process2; например, путем добавления "стоп-канала", который закрывается, когда горутины должны остановиться. Process1 потребуется два случая selectв forцикле вместо более простого for rangeцикла, который сейчас используется. Process2 потребуется вставить другой регистр casesи специальный обработчик этого значения i.
Dave C
Это по-прежнему не решает проблему считывания значений из каналов, которые не будут использоваться в случае ранней остановки.
Джеймс Хенстридж
0

Почему этот подход не работает, если кто-то отправляет события?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
источник
8
Это спин-петля. При ожидании значения входного канала потребляется весь доступный ЦП. Весь смысл в selectнескольких каналах (без defaultпредложения) заключается в том, что он эффективно ждет, пока хотя бы один будет готов, без вращения.
Dave C
0

Возможно более простой вариант:

Вместо того чтобы иметь массив каналов, почему бы не передать только один канал в качестве параметра функциям, выполняемым в отдельных горутинах, а затем прослушать канал в горутине потребителя?

Это позволяет вам выбирать только один канал в вашем слушателе, делая простой выбор и избегая создания новых горутин для агрегирования сообщений из нескольких каналов?

Фернандо Санчес
источник