src/stream/nodejs.js

Maintainability

80.20

Lines of code

202

Created with Raphaël 2.1.002550751002019-3-122018-12-32018-8-22018-5-92018-4-32018-2-282017-12-212017-11-12017-6-72017-4-29

2019-10-30
Maintainability: 80.2

Created with Raphaël 2.1.00751502253002019-3-122018-12-32018-8-22018-5-92018-4-32018-2-282017-12-212017-11-12017-6-72017-4-29

2019-10-30
Lines of Code: 202

Difficulty

20.21

Estimated Errors

0.56

Function weight

By Complexity

Created with Raphaël 2.1.0<anonymous>3

By SLOC

Created with Raphaël 2.1.0<anonymous>19
1
/**
2
 * @file NodeJS specific stream implementation
3
 * @since 0.1.9
4
 */
5
/*#ifndef(UMD)*/
6
"use strict";
7
/*global _gpfDefine*/ // Shortcut for gpf.define
8
/*global _gpfEmptyFunc*/ // An empty function
9
/*global _gpfStreamSecureRead*/ // Generate a wrapper to secure multiple calls to stream#read
10
/*global _gpfStreamSecureWrite*/ // Generates a wrapper to secure multiple calls to stream#write
11
/*exported _GpfNodeBaseStream*/ // gpf.node.BaseStream
12
/*exported _GpfNodeReadableStream*/ // gpf.node.ReadableStream
13
/*exported _GpfNodeWritableStream*/ // gpf.node.WritableStream
14
/*#endif*/
15
 
16
var
17
    _GpfNodeBaseStream = _gpfDefine({
18
        $class: "gpf.node.BaseStream",
19
 
20
        /**
21
         * Base class wrapping NodeJS streams
22
         *
23
         * @param {Object} stream NodeJS stream object
24
         * @param {Function} [close] Close handler
25
         *
26
         * @constructor gpf.node.BaseStream
27
         * @since 0.1.9
28
         */
29
        constructor: function (stream, close) {
30
            this._stream = stream;
31
            if (typeof close === "function") {
32
                this._close = close;
33
            }
34
            stream.on("error", this._onError.bind(this));
35
        },
36
 
37
        /**
38
         * Function to be called when the stream is closed
39
         * @type {Function}
40
         * @since 0.1.9
41
         */
42
        _close: _gpfEmptyFunc,
43
 
44
        /**
45
         * Close the stream
46
         *
47
         * @return {Promise} Resolved when closed
48
         * @since 0.1.9
49
         */
50
        close: function () {
51
            return this._close();
52
        },
53
 
54
        //region Error handling
55
 
56
        /**
57
         * NodeJS stream object
58
         * @since 0.1.9
59
         */
60
        _stream: null,
61
 
62
        /**
63
         * The stream has an invalid state and can't be used anymore
64
         * @since 0.1.9
65
         */
66
        _invalid: false,
67
 
68
        /**
69
         * Current promise rejection callback
70
         * @type {Function}
71
         * @since 0.1.9
72
         */
73
        _reject: gpf.Error.invalidStreamState,
74
 
75
        /**
76
         * If the stream has an invalid state, the exception {@see gpf.Error.InvalidStreamState} is thrown
77
         *
78
         * @throws {gpf.Error.InvalidStreamState}
79
         * @since 0.1.9
80
         */
81
        _checkIfValid: function () {
82
            if (this._invalid) {
83
                gpf.Error.invalidStreamState();
84
            }
85
        },
86
 
87
        /**
88
         * Bound to the error event of the stream, reject the current promise if it occurs.
89
         *
90
         * @param {*} error Stream error
91
         * @since 0.1.9
92
         */
93
        _onError: function (error) {
94
            this._invalid = true;
95
            this._reject(error);
96
        }
97
 
98
        //endregion
99
 
100
    }),
101
 
102
    /**
103
     * Wraps a readable stream from NodeJS into a IReadableStream
104
     *
105
     * @param {Object} stream NodeJS stream object
106
     * @param {Function} [close] Close handler
107
     *
108
     * @class gpf.node.ReadableStream
109
     * @extends gpf.node.BaseStream
110
     * @implements {gpf.interfaces.IReadableStream}
111
     * @since 0.1.9
112
     */
113
    _GpfNodeReadableStream = _gpfDefine({
114
        $class: "gpf.node.ReadableStream",
115
        $extend: "gpf.node.BaseStream",
116
 
117
        //region gpf.interfaces.IReadableStream
118
 
119
        /**
120
         * @gpf:sameas gpf.interfaces.IReadableStream#read
121
         * @since 0.1.9
122
         */
123
        read: _gpfStreamSecureRead(function (output) {
124
            var me = this, //eslint-disable-line no-invalid-this
125
                stream = me._stream;
126
            return new Promise(function (resolve, reject) {
127
                me._reject = reject;
128
                me._checkIfValid();
129
                stream
130
                    .on("data", me._onData.bind(me, output))
131
                    .on("end", function () {
132
                        me._invalid = true;
133
                        resolve();
134
                    });
135
            });
136
        }),
137
 
138
        //endregion
139
 
140
        /**
141
         * Stream 'data' event handler
142
         *
143
         * @param {gpf.interfaces.IWritableStream} output Output stream
144
         * @param {Object} chunk Buffer
145
         * @since 0.1.9
146
         */
147
        _onData: function (output, chunk) {
148
            var me = this,
149
                stream = me._stream;
150
            stream.pause();
151
            output.write(chunk)
152
                .then(function () {
153
                    stream.resume();
154
                }, me._reject);
155
        }
156
 
157
    }),
158
 
159
    /**
160
     * Wraps a writable stream from NodeJS into a IWritableStream
161
     *
162
     * @param {Object} stream NodeJS stream object
163
     * @param {Function} [close] Close handler
164
     *
165
     * @class gpf.node.WritableStream
166
     * @extends gpf.node.BaseStream
167
     * @implements {gpf.interfaces.IWritableStream}
168
     * @since 0.1.9
169
     */
170
    _GpfNodeWritableStream = _gpfDefine({
171
        $class: "gpf.node.WritableStream",
172
        $extend: "gpf.node.BaseStream",
173
 
174
        //region gpf.interfaces.IWritableStream
175
 
176
        /**
177
         * @gpf:sameas gpf.interfaces.IWritableStream#write
178
         * @since 0.1.9
179
         */
180
        write: _gpfStreamSecureWrite(function (buffer) {
181
            var me = this, //eslint-disable-line no-invalid-this
182
                stream = me._stream;
183
            return new Promise(function (resolve, reject) {
184
                var noDrain;
185
                me._reject = reject;
186
                me._checkIfValid();
187
                noDrain = stream.write(buffer, function (error) {
188
                    if (!error && noDrain) {
189
                        resolve();
190
                    }
191
                });
192
                if (!noDrain) {
193
                    stream.once("drain", function () {
194
                        resolve();
195
                    });
196
                }
197
            });
198
        })
199
 
200
        //endregion
201
 
202
    });