Прежде чем говорить о том, почему потоки – мой любимый инструмент в мире, давайте посмотрим на пару сценариев.
Сценарий
Проблема : Помощь! Мне нужно загрузить процесс файла 300 ГБ, затем вставьте каждый элемент в SQL!
Решение : Ручьи
Сценарий 2.
Проблема : Я сопоставил массив около 300 раз и должен вернуть окончательный результат
Решение : Ручьи
Сценарий 3.
Проблема : Я хочу повторить буфер, для которого я не знаю, когда я получу следующий предмет. И я не особенно хочу создавать сумасшедшее логическое дерево, чтобы ждать его
Решение : Ручьи
Сценарий 4.
Проблема : У меня есть ряд отображения и записи функций, которые мне нужно запустить на бесконечной функции генератора, массив и событий
Решение : Ручьи
Я думаю, что вы получите мою точку зрения. Во всех сценариях я описал, ручьи могут быть решением.
Потоки Является ли одна из вещей, без которых я не могу жить. Метод трубы является идеальным примером того, как поток делает все правильно.
//streams at their prime var rawData = createRawStream(); var digestRaw = createRawToJSValues(); var handleData = createJSONMapper(); var prepareForOutput = createOutputPreparation(); var output = createWriteStream(); rawData.pipe(digestRaw).pipe(handleData).pipe(prepareForOutput).pipe(output);
Немного истории
Здесь Страница Википедии на поток обработки парадигмы. На странице Wiki авторы упомянули GPU и другую высокую процедуру интенсивности. Хотя это понятно причину того, почему люди могут использовать потоки, трудно отслеживать, почему Node.js решил реализовать свои реализации FS в качестве потоков. Я подозреваю, что причина в значительной степени зависит от Unix трубы Также он использует синтаксис труб.
Синтаксис труб может быть весело играть с иногда, но в основном совершенно бесполезно, если вы не просто экспериментируете с ним. Для браузеров можно предположить, что ближайшая реализация была Mediastreams Отказ Однако они не были стандартизированы. Веб-аудио API , не используя Node.js Stream API, безусловно, реализует аналогичный. Их веб-API использует Аудиосорченод
и AudiodeStinationnoDe
вместо Readstreams
и WriteStreams
Отказ Надеюсь, у нас будет стандартизирован Потоки Доступно в браузере.
Тонкие аспекты
Толчок и тянуть источники потоков
Существует две основные формы источников потоков. Источник потягивания позволяет потоковым потребителям (запись или преобразование) для извлечения предметов, поскольку они им нужны. Нажимной поток отправляет данные по трубе без ухода. Несмотря на то, что в большинстве случаев это не будет важно для вас, если вы начнете попасть в ситуации, когда рекваром является проблемой, или когда «живая» важнее, чем обеспечение «успеха», он скоро станет очевидным, что эти два для.
Обработка, как вам это нужно
FskcreateSeadstream
Это отличный пример для обработки предметов, сколько вам нужно. Здесь мы можем напрямую указать, сколько байтов должно быть пропущено и как использовать документированные Читать метод взять куски за раз. Важно отметить, что чтение не рекомендуется. Однако такая обработка может быть сделана альтернативными способами. Если у вас есть источник, который позволяет вам читать куски одновременно через метод, вот способ реализации его.
const Readable = require('stream').Readable; class OnDemandMinimalChunks extends Readable { constructor(chunkSize, readFn, initialOffset, limit) { super({ allowHalfOpen: true }); this.chunkSize = chunkSize; this.offset = initialOffset; this.limit = limit; this.readFn = readDn; this.isReading = false; } _read(size) { if(this.isReading){ return false; } this.isReading = true; readFn( this.offset, Math.min(this.offset + this.chunkSize, this.limit) ).then((value) =>{ this.push(value); this.offset += this.chunkSize; if(this.offset >= this.limit){ this.push(null); } this.isReading = false; }); } }
Объединение нескольких источников
Это может звучать как милый трюк, но у него очень реальные возможности. Объединенный поток Может выступать в качестве коллективного регистратора, как Emitter Image Emagiter или событие присоединяйтесь к диффатам буферизованной строки из нескольких источников. Заманчиво списать объединенный поток как смешно, но я уверен, что вы поймете, когда вам это нужно. Хотя объединение всего сразу может быть в порядке, вы также можете быть заинтересованы в Заказанные слияния Отказ
Обеспечение данных не теряется
Поскольку мы не знаем, когда начнется чтение, если вы не используете его синкравно, вообще это будет хорошая идея, чтобы трусить его на Passthrough Особенно, если вы создаете потребителя асинронно. Пока Readstream делает буфер , вам может потребоваться приостановить его, чтобы предотвратить любую потерю. Я лично столкнулся с проблемами, где были потеряны начальные данные.
Вот пример того, что я реализовал в прошлом
const Duplex = require('stream').Duplex; class StreamBuffer extends Duplex { constructor() { super({ allowHalfOpen: true }); this.bufferedData = Buffer.alloc(0); this.waiting = false; } _write(chunk, encoding, callback) { var leftover = false; if(this.waiting !== false){ if(this.waiting > chunk.size()){ this.push(Buffer.from(chunk, encoding)); } else { var temp = Buffer.from(chunk, encoding); this.push(temp.slice(0, this.waiting)) leftover = temp.slice(this.waiting); } } else { leftover = Buffer.from(chunk, encoding) } if(leftover !== false) { this.bufferedData = Buffer.concat( [this.buffer, leftover], [this.buffer.length + leftover.length] ); } callback(); } _flush(cb){ this.hasEnded = true; if(this.waiting !== false){ return this.push(null); } cb(); } _read(size) { if(this.bufferedData.length === 0){ if(this.hasEnded){ return this.push(null); } this.waiting += size return false; } if(size >= this.bufferedData.length){ this.waiting = size - this.bufferedData.length; this.push(this.bufferedData); this.bufferedData = Buffer.alloc(0); return false; } this.push(this.bufferedData.slice(0, size)); this.bufferedData = this.bufferedData.slice(size); } }
Универсальность дуплекса
Дуплекс – это как aDestream, так и поток записи. Конечно, это один звучит потрясающе и денди, но возможности не ограничиваются «преобразовывать потоки».
Только обработка “последних”
Во-первых, вам нужно «использовать» свои потоки, чтобы вы могли знать, как поздно слишком поздно. Но преобразование данных не означает, что отображает его или фильтруют его. Это также может означать контроль потока. Это включает в себя Lifo, а также только обработку последних найденных. Вы можете адаптировать его, чтобы стать несколько
const Duplex = require('stream').Duplex; class OnlyLatestBuffer extends Duplex { constructor(limit, fastForward) { super({ allowHalfOpen: true }); limit = limit || 1; this.limit = limit; this.bufferedItems = []; this.waiting = false; } _write(chunk, encoding, callback) { if(this.waiting !== false){ this.waiting = false; this.push(chunk); }else{ if(this.bufferedItems.length === this.limit){ this.bufferedItems.shift(); } this.bufferedItems.push(chunk); } callback(); } _flush(cb){ this.hasEnded = true; if(this.waiting !== false){ this.push(null); } cb(); } _read(size) { if(this.bufferedItems.length === 0){ if(this.hasEnded){ return this.push(null); } this.waiting = true; return false; } this.push(this.bufferedItems.shift()); } }
Доверие
Часто дуплекс будет действовать как IO к внешнему источнику – отличный пример подключения TCP – это отличный. Моя любимая библиотека Websocket – Web-Driver-Node Просто потому, что это не сервер Blackbox, и вам не нужно просто обернуть его вокруг тела и розетки в конструкторе. Вместо этого вы используете все данные TCP на драйвер Websocket, который преобразует и испускает данные в качестве событий сообщений. Он также преобразует и отправляет все данные по сокете, вы направляете его, чтобы отправить. Я особенно ценю чистоту в реализации, даже если это не самый быстрый.
var http = require('http'), websocket = require('websocket-driver'); var server = http.createServer(); server.on('upgrade', function(request, socket, body) { if (!websocket.isWebSocket(request)) return; var driver = websocket.http(request); driver.io.write(body); socket.pipe(driver.io).pipe(socket); driver.messages.on('data', function(message) { console.log('Got a message', message); }); driver.start(); });
Трансформировать ручку
Хотя большинство преобразований реализуются с ожиданием, что они произойдут в контексте, который они были созданы и запускаются, можно на самом деле позволить преобразованию обрабатывать другие API или даже конфигурации. Простой пример – Преобразовать API Отказ Здесь вы можете позвонить this.push (товар)
и/или обратный вызов в любое время. Вы также можете запустить функцию Offsite через вызов AJAX.
new Transform({ transform(chunk, encoding, callback){ fetch("domain.com/some/api", { method: post, body: chunk }) .then(function(resp){ if(resp.status !== 200){ return Promise.resolve(resp.string()).then(function(str){ throw str; }) } return Promise.resolve(resp.string()) }).then(callback.bind(void 0, void 0), callback); } })
Это также может быть повторно поощно, чтобы нацелиться на серию работников или даже преобразование трубопроводов по требованию, то есть Узел красный Отказ
Алиас репипгинг
Еще одна аккуратная вещь, которую вы можете сделать с трансляциями, состоит в том, что они могут вытащить результаты возвращаться к себе, пока не будут результаты, которые не могут быть отменены. Реализация для этого будет система псевдонимов. В системе псевдонима «Некоторое имя»
может быть псевдоним для «ближе к реальному»
, что может быть псевдоним для "/Некоторые/разрядно/путь"
Отказ Тем не менее, если вы просто трубы его два или три раза, вам пропустили все возможные функциональные возможности.
var alias_map = {}; new Transform({ readableObjectMode: true, writableObjectMode: true, transform(chunk, encoding, callback){ if(chunk in alias_map){ this.write(chunk); } else{ this.push(chunk) } callback(); } })
Пишет
Обеспение финишной линии
В то время как поток к обещанию обрабатывает оба потока записи и чтение потоков, вы можете заинтересовать только в том, как пишет поток обрабатывает его (если вам вообще не любит потоки … Но я надеюсь, что вы влюбитесь в это! ). Это позволяет для обещаний и ручьев к сосуществу.
getReadableStream().then(function(readable){ readable.pipe(transform).pipe(transform).pipe(writable) return streamToPromise(writable); }).then(function(){ console.log("done"); })
Поток Ui
Реагировать, безусловно, замечательная система. Однако для того, чтобы использовать потоки с реагированием, ваша лучшая ставка – использовать Redux Отказ В противном случае вы также можете включить письменный поток в обновленное списку, который излучает событие, которое каждый из них записи и записи и заканчивается на компоненте.
// How it should be class ComponentWritable extends Writable { constructor(component, key){ super({ objectMode: true }); this.component = component; this.key = key; this.list = []; component.on("willUnmount", ()=>{ this.end(); }); component.on("willMount", ()=>{ this.component.setState(this.createState()) }); } _write(chunk, encoding, callback){ this.list = this.list.concat([ chunk ]); this.component.on("didUpdate", function(){ setTimeout(callback, 0); }); this.component.setState(this.createState()); } createState(){ return { [this.key]: this.list } } } // how it be function mountWritable(component, stateKey){ if(!component.writableStreams){ component.writableStreams = {}; } component.writableStreams[stateKey] = new Writable({ objectMode: true, write: (chunk, encoding, callback)=>{ this.setState({ [stateKey]: component.writableStreams[stateKey].val.concat([ chunk ]) }); component.writableStreams[stateKey].cb = callback; } ); component.writableStreams[stateKey].val = []; component.writableStreams[stateKey].cb = false; } function handleUnmount(component){ component.writableStreams && Object.keys(component.writableStreams).forEach(function(key){ var stream = component.writableStreams[key] var cb = stream.cb; stream.cb = false; cb && cb(); }); } function handleUpdate(component){ component.writableStreams && Object.keys(component.writableStreams).forEach(function(key){ var stream = component.writableStreams[key] stream.end(); delete component.writableStreams[key]; }); } class WritableComponent extends Component { componentWillMount(){ var stateKey = "some-key"; mountWritable(this, stateKey); } componentWillUnmount(){ handleUnmount(this); } componentDidUpdate(){ handleUpdate(this) } }
Держать его простой
RXJS.
Это кажется довольно стандартным для угловых. Пока я не смотрел на это слишком глубоко, это в моем списке «Стандарты, чтобы учиться».
Highland Js.
Highland был в развитии в течение длительного времени; Однако я не решил укусить пулю и использовать его для замены потоков.
Через2
При создании потоков это весело и все, удерживая вещи чистыми без создания классов часто трудно. Через2 позволяет делать вещи быстро и легко.
События Stream.
Это библиотека, которая преобразует что-то WG EventDispatcher
в читаемые потоки.