Как создать потоки из строки в Node.Js?

177

Я использую библиотеку ya-csv , которая ожидает либо файл, либо поток в качестве входных данных, но у меня есть строка.

Как мне преобразовать эту строку в поток в Node?

pathikrit
источник

Ответы:

27

Начиная с узла 10.17, stream.Readable имеет fromметод, позволяющий легко создавать потоки из любой итерируемой (которая включает литералы массива):

const { Readable } = require("stream")

const readable = Readable.from(["input string"])

readable.on("data", (chunk) => {
  console.log(chunk) // will be called once with `"input string"`
})

Обратите внимание, что, по крайней мере, между 10.17 и 12.3 строка сама по себе является итеративной, поэтому Readable.from("input string")будет работать, но генерировать одно событие на символ. Readable.from(["input string"])будет генерировать одно событие на элемент в массиве (в данном случае один элемент).

Также обратите внимание, что в более поздних узлах (вероятно, 12.3, так как в документации говорится, что функция была изменена тогда), больше нет необходимости заключать строку в массив.

https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options

Fizker
источник
Согласно stream.Readable.from , Calling Readable.from (строка) или Readable.from (буфер) не будет иметь строк и буферов, которые будут повторяться для соответствия семантике других потоков по соображениям производительности.
сокр
Виноват. Функция была добавлена ​​в 10.7 и вела себя так, как я описал. Некоторое время назад строки больше не нужно заключать в массивы (начиная с 12.3, он больше не выполняет итерацию каждого символа отдельно).
Физкер
186

Как @substack исправил меня в #node , новый API потоков в Node v10 делает это проще:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

... после чего вы можете свободно трубы он или иным образом передать его предполагаемого потребителя.

Это не так чисто, как возобновить однострочно, но это позволяет избежать дополнительной зависимости.

( Обновление: в версиях от 0.10.26 до v9.2.1, вызов pushнапрямую из приглашения REPL завершится с not implementedисключением, если вы не установили _read. Он не будет зависать внутри функции или скрипта. Если несогласованность делает вас нервный, включи noop.)

Гарт Кидд
источник
6
Из документов (ссылка) : «Все реализации потока с возможностью считывания должны предоставлять _readметод для извлечения данных из базового ресурса».
Феликс Рабе
2
@eye_mew сначала нужно указать ('stream')
Джим Джонс
8
Почему вы nullвставляете в буфер потока?
Допатраман
5
@dopatraman nullсообщает потоку, что он завершил чтение всех данных и закрывает поток
chrishiestand
2
Похоже, вы не должны делать это таким образом. Цитируя документы : « readable.push()Метод предназначен для вызова только читаемыми реализаторами и только из readable._read()метода».
Аксель Раушмайер
127

Не используйте ответ Джо Лисса. В большинстве случаев это будет работать, но в моем случае я потерял 4 или 5 часов поиска ошибок. Для этого не нужны сторонние модули.

НОВЫЙ ОТВЕТ :

var Readable = require('stream').Readable

var s = new Readable()
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

Это должен быть полностью совместимый поток для чтения. Смотрите здесь для получения дополнительной информации о том, как правильно использовать потоки.

СТАРЫЙ ОТВЕТ : Просто используйте собственный поток PassThrough:

var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()

a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
   // using the 'data' event works too
   console.log('data '+x)
})*/
/*setTimeout(function() {
   // you can even pipe after the scheduler has had time to do other things
   a.pipe(process.stdout) 
},100)*/

a.on('end', function() {
    console.log('ended') // the end event will be called properly
})

Обратите внимание, что событие 'close' не генерируется (что не требуется для потоковых интерфейсов).

BT
источник
2
@Finn Вам не нужны парены в javascript, если нет никаких аргументов
BT
не используйте "var" в 2018 году! но const
stackdave
30

Просто создайте новый экземпляр streamмодуля и настройте его в соответствии с вашими потребностями:

var Stream = require('stream');
var stream = new Stream();

stream.pipe = function(dest) {
  dest.write('your string');
  return dest;
};

stream.pipe(process.stdout); // in this case the terminal, change to ya-csv

или

var Stream = require('stream');
var stream = new Stream();

stream.on('data', function(data) {
  process.stdout.write(data); // change process.stdout to ya-csv
});

stream.emit('data', 'this is my string');
zemirco
источник
13
Этот код нарушает соглашения о потоках. pipe()должен возвращать целевой поток, по крайней мере.
Грейм
2
Событие окончания не вызывается, если вы используете этот код. Это не хороший способ создать поток, который можно использовать в общем.
BT
12

Изменить: ответ Гарт, вероятно, лучше.

Мой старый текст ответа сохраняется ниже.


Для того, чтобы преобразовать строку в поток, вы можете использовать паузу через поток:

through().pause().queue('your string').end()

Пример:

var through = require('through')

// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()

// Pass stream around:
callback(null, stream)

// Now that a consumer has attached, remember to resume the stream:
stream.resume()
Джо Лисс
источник
Я не мог заставить решение ZeMirco работать для моего случая использования, но resumerработало довольно хорошо. Спасибо!
mpen
Предложение @substack возобновилось для меня очень хорошо. Спасибо!
Гарт Кидд
2
Resumer великолепен, но «автоматическое возобновление потока на nextTick» может принести сюрпризы, если вы ожидаете, что сможете передать поток неизвестным потребителям! У меня был некоторый код, который передавал поток контента в файл, если удалось сохранить метаданные в БД. Это была таинственная ошибка, и она удалась, когда запись в БД немедленно вернулась! Позже я реорганизовал вещи, чтобы быть внутри асинхронного блока, и бум, поток никогда не читался. Урок: если вы не знаете, кто будет использовать ваш поток, придерживайтесь метода through (). Pause (). Queue ('string'). End ().
Веселый Роджер
1
Я потратил около 5 часов на отладку своего кода, потому что использовал ответную часть этого ответа. Было бы здорово, если бы вы могли ... удалить это
BT
10

Для этого есть модуль: https://www.npmjs.com/package/string-to-stream

var str = require('string-to-stream')
str('hi there').pipe(process.stdout) // => 'hi there' 
Lori
источник
1
Это игра слов "есть приложение для этого"? ;)
masterxilo
1
Полезная ссылка в комментарии: npmjs.com/package/string-to-stream
Дем Пилафян
К вашему сведению, я пытался использовать эту библиотеку для записи JSON на Google Drive, но у меня это не сработало. Написал статью об этом здесь: medium.com/@dupski/… . Также добавлен в качестве ответа ниже
Рассел Бриггс
6

в кофе-скрипте:

class StringStream extends Readable
  constructor: (@str) ->
    super()

  _read: (size) ->
    @push @str
    @push null

используй это:

new StringStream('text here').pipe(stream1).pipe(stream2)
xinthink
источник
6

Другое решение - передать функцию чтения в конструктор Readable (см. Параметры чтения потока cf doc )

var s = new Readable({read(size) {
    this.push("your string here")
    this.push(null)
  }});

Вы можете после использования s.pipe для примера

Филипп Т.
источник
Какова цель возвращения в конце?
Кирилл Резников
«всегда возвращать что-то (или ничего)», это пример из документации.
Филипп Т.
В JS, если функция не имеет возврата, это эквивалент вашего пустого возврата. Не могли бы вы предоставить ссылку, где вы ее нашли?
Кирилл Резников
ты должен прав. Я сказал, что больше для лучшей практики. Я ничего не хочу вернуть, это не ошибка. Поэтому я удаляю строку.
Филипп Т.
5

Я устал от переучивания этого каждые шесть месяцев, поэтому я просто опубликовал модуль npm, чтобы абстрагировать детали реализации:

https://www.npmjs.com/package/streamify-string

Это ядро ​​модуля:

const Readable = require('stream').Readable;
const util     = require('util');

function Streamify(str, options) {

  if (! (this instanceof Streamify)) {
    return new Streamify(str, options);
  }

  Readable.call(this, options);
  this.str = str;
}

util.inherits(Streamify, Readable);

Streamify.prototype._read = function (size) {

  var chunk = this.str.slice(0, size);

  if (chunk) {
    this.str = this.str.slice(size);
    this.push(chunk);
  }

  else {
    this.push(null);
  }

};

module.exports = Streamify;

strэто то, stringчто должно быть передано конструктору при вызове и будет выведено потоком в виде данных. optionsТипичные параметры, которые могут быть переданы в поток, согласно документации .

Согласно Travis CI, он должен быть совместим с большинством версий узла.

Крис Аллен Лейн
источник
2
Когда я опубликовал это изначально, я не включил соответствующий код, который, как мне сказали, осуждается.
Крис Аллен Лейн
2

Вот простое решение в TypeScript:

import { Readable } from 'stream'

class ReadableString extends Readable {
    private sent = false

    constructor(
        private str: string
    ) {
        super();
    }

    _read() {
        if (!this.sent) {
            this.push(Buffer.from(this.str));
            this.sent = true
        }
        else {
            this.push(null)
        }
    }
}

const stringStream = new ReadableString('string to be streamed...')
Рассел Бриггс
источник
1

JavaScript является утилитарным, поэтому, если вы просто скопируете API читаемого потока , он будет работать просто отлично. Фактически, вы, вероятно, не можете реализовать большинство этих методов или просто оставить их как заглушки; все, что вам нужно реализовать - это то, что использует библиотека. Вы также можете использовать предварительно созданный EventEmitterкласс Node для работы с событиями, так что вам не придется реализовывать addListenerи тому подобное самостоятельно.

Вот как вы можете реализовать это в CoffeeScript:

class StringStream extends require('events').EventEmitter
  constructor: (@string) -> super()

  readable: true
  writable: false

  setEncoding: -> throw 'not implemented'
  pause: ->    # nothing to do
  resume: ->   # nothing to do
  destroy: ->  # nothing to do
  pipe: -> throw 'not implemented'

  send: ->
    @emit 'data', @string
    @emit 'end'

Тогда вы можете использовать это так:

stream = new StringStream someString
doSomethingWith stream
stream.send()
icktoofay
источник
Я получаю это: TypeError: string is not a function at String.CALL_NON_FUNCTION (native) когда я использую это какnew StringStream(str).send()
Pathikrit
То, что JavaScript использует утку, не означает, что вы должны изобретать велосипед. Узел уже обеспечивает реализацию для потоков. Просто создайте новый экземпляр, stream.Readableподобный предложенному @Garth Kidd.
Суким
4
@Sukima: stream.Readable не существовало, когда я написал этот ответ.
icktoofay