Обработка ошибок в потоках node.js

164

Как правильно обрабатывать ошибки в потоках? Я уже знаю, что есть событие «ошибка», которое вы можете прослушать, но я хочу знать некоторые подробности о сколь угодно сложных ситуациях.

Для начала, что вы делаете, когда хотите создать простую цепочку труб:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

И как правильно создать одно из этих преобразований, чтобы ошибки обрабатывались правильно?

Более связанные вопросы:

  • когда происходит ошибка, что происходит с событием 'end'? Разве его никогда не уволят? Это иногда увольняют? Зависит ли это от преобразования / потока? Какие стандарты здесь?
  • Есть ли механизмы для распространения ошибок через трубы?
  • домены решают эту проблему эффективно? Примеры были бы хорошими.
  • ошибки, возникающие в результате событий 'error', имеют следы стека? Иногда? Никогда? Есть ли способ получить один из них?
BT
источник
1
Это не тривиально. Promiseрамки делают это намного проще
slezica
27
К сожалению, обещания / фьючерсы не могут реально помочь вам с потоками ...
BT

Ответы:

222

преобразование

Потоки преобразования являются как читаемыми, так и записываемыми, и поэтому являются действительно хорошими «средними» потоками. По этой причине их иногда называют throughпотоками. В этом смысле они похожи на дуплексный поток, за исключением того, что они предоставляют удобный интерфейс для манипулирования данными, а не просто для их отправки. Цель потока преобразования состоит в том, чтобы манипулировать данными при их передаче по потоку. Вы можете, например, сделать несколько асинхронных вызовов или получить пару полей, переназначить некоторые вещи и т. Д.


Где вы могли бы поместить поток преобразования


Как создать поток преобразования смотрите здесь и здесь . Все, что вам нужно сделать, это:

  1. включить модуль потока
  2. создавать (или наследовать) класс Transform
  3. реализовать _transformметод, который принимает (chunk, encoding, callback).

Кусок - это ваши данные. Большую часть времени вам не нужно беспокоиться о кодировании, если вы работаете в objectMode = true. Обратный вызов вызывается, когда вы закончите обработку чанка. Этот кусок затем передается следующему потоку.

Если вам нужен хороший вспомогательный модуль, который позволит вам действительно легко выполнять потоковую передачу , я предлагаю использовать via2 .

Для обработки ошибок, продолжайте читать.

труба

В цепочке труб обработка ошибок действительно нетривиальна. Согласно этой теме .pipe () не создан для пересылки ошибок. Так что-то вроде ...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

... будет только слушать ошибки в потоке c. Если произошла ошибка a, она не будет передана и, на самом деле, выдастся. Чтобы сделать это правильно:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

Теперь, хотя второй способ более многословен, вы можете, по крайней мере, сохранить контекст, где происходят ваши ошибки. Обычно это хорошая вещь.

Одна библиотека, которую я нахожу полезной, хотя, если у вас есть случай, когда вы хотите захватить только ошибки в месте назначения, и вас не волнует, где это произошло, это поток событий .

конец

Когда происходит событие ошибки, событие завершения не будет запущено (явно). Излучение события ошибки завершит поток.

домены

По моему опыту, домены работают очень хорошо большую часть времени. Если у вас есть необработанное событие ошибки (т.е. выдается ошибка в потоке без прослушивателя), сервер может дать сбой. Теперь, как указано в приведенной выше статье, вы можете заключить поток в домен, который должен правильно отлавливать все ошибки.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });

Прелесть доменов в том, что они сохранят следы стека. Хотя Event-Stream также хорошо справляется с этой задачей.

Для дальнейшего чтения, ознакомьтесь со справочником . Довольно подробно, но очень полезно и дает отличные ссылки на множество полезных модулей.

mshell_lauren
источник
Это действительно отличная информация, спасибо! Не могли бы вы добавить немного о том, почему вы хотите создать поток преобразования и почему это относится к моему вопросу?
BT
Конечно - хотя я понял, что это связано, так как вы спросили об этом; )
mshell_lauren
1
Отправьте сообщение об этом через isaccs в Google Groups- nodejs: groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ (не grokbase)
jpillora
Этот ответ написан отлично. Я собираюсь изучить предложение домена - похоже, это то решение, которое я искал.
точка с запятой
12
Обратите внимание, что вам не нужно оборачивать .on('error')обработчик в анонимную функцию, т. a.on('error', function(e){handleError(e)})a.on('error', handleError)
Е. Это
28

Если вы используете узел> = v10.0.0, вы можете использовать stream.pipeline и stream.finished .

Например:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

Смотрите этот github PR для дальнейшего обсуждения.

shusson
источник
1
Почему вы используете finished, когда pipelineуже есть обратный вызов?
Маркос Перейра
4
Вы можете по-разному обрабатывать ошибки между конвейером и отдельными потоками.
Шуссон
25

домены устарели. они тебе не нужны.

в этом вопросе различия между преобразованием или записью не так важны.

Ответ mshell_lauren великолепен, но в качестве альтернативы вы также можете явно прослушивать событие ошибки в каждом потоке, который, по вашему мнению, может быть ошибочным. и повторно использовать функцию обработчика, если вы предпочитаете.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

это предотвращает печально известное неперехваченное исключение, если один из этих потоков запускает событие ошибки

Бент Кардан
источник
3
LOL весело проводит время, обрабатывая 3 разных события ошибок, и молитесь, чтобы тот, кто написал 3 разных потоковых библиотеки, правильно реализовал обработку ошибок
Александр Миллс
4
@Alex Mills 1) В чем заключается проблема обработки трех событий и почему они «разные», когда их тип одинаков - errorможно также согласиться с тем фактом, что каждое событие отличается; 2) какие потоковые библиотеки написаны выше, кроме родной функциональности Node.js? и 3) почему имеет значение, как они обрабатывают события внутри, когда это, очевидно, позволяет кому-либо присоединять дополнительные обработчики ошибок поверх того, что уже есть?
amn
10

Ошибки всей цепочки можно распространить на самый правый поток с помощью простой функции:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

который можно использовать как:

safePipe(readable, [ transform1, transform2, ... ]);
Глеба
источник
5

.on("error", handler)заботится только об ошибках потока, но если вы используете пользовательские потоки преобразования, .on("error", handler)не улавливайте ошибки, происходящие внутри_transform функции. Таким образом, можно сделать что-то вроде этого для управления потоком приложений:

thisКлючевое слово в _transformфункции относится к Streamсамому себе, который является EventEmitter. Таким образом, вы можете использовать, try catchкак показано ниже, чтобы перехватить ошибки, а затем передать их в пользовательские обработчики событий.

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

Таким образом, вы можете хранить свою логику и обработчики ошибок отдельно. Кроме того, вы можете обрабатывать только некоторые ошибки и игнорировать другие.

ОБНОВЛЕНИЕ
Альтернатива: наблюдаемые RXJS

Викас Гаутам
источник
4

Используйте пакет multiipe для объединения нескольких потоков в один дуплексный поток. И обрабатывать ошибки в одном месте.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
Сергей Савенко
источник
1

Используйте шаблон Node.js, создав механику потока Transform и вызвав его обратный вызов doneс аргументом для распространения ошибки:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });
Дерек
источник
Хм, так вы говорите, что если бы все потоковые процессоры были построены так, ошибки будут распространяться?
BT
-2

Try catch не будет регистрировать ошибки, которые произошли в потоке, потому что они генерируются после того, как вызывающий код уже завершился. Вы можете обратиться к документации:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

Mehran
источник
Спасибо, но это не отвечает на вопрос вообще.
BT
Предоставление мне 40 страниц документа не помогает. Как вы думаете, что я должен упомянуть на этой гигантской странице? Кроме того, вы читали мой вопрос? Мой вопрос не в том, "пытаюсь ли поймать работу с потоками?" Я уже хорошо знаю, что try-catch не будет работать с асинхронными ошибками, например, с конвейерами потоковой обработки.
BT