src/stream/bufferedread.js

Maintainability

81.69

Lines of code

247

Created with Raphaël 2.1.002550751002019-3-122018-12-32018-8-22018-5-92018-4-32018-2-282017-12-21

2019-10-30
Maintainability: 81.69

Created with Raphaël 2.1.00751502253002019-3-122018-12-32018-8-22018-5-92018-4-32018-2-282017-12-21

2019-10-30
Lines of Code: 247

Difficulty

19.53

Estimated Errors

0.62

Function weight

By Complexity

Created with Raphaël 2.1.0<anonymous>2

By SLOC

Created with Raphaël 2.1.0<anonymous>14
1
/**
2
 * @file Buffered read stream base
3
 * @since 0.2.3
4
 */
5
/*#ifndef(UMD)*/
6
"use strict";
7
/*global _gpfArraySlice*/ // [].slice.call
8
/*global _gpfDefine*/ // Shortcut for gpf.define
9
/*global _gpfEmptyFunc*/ // An empty function
10
/*global _gpfIgnore*/ // Helper to remove unused parameter warning
11
/*global _gpfStreamSecureInstallProgressFlag*/ // Install the progress flag used by _gpfStreamSecureRead and Write
12
/*global _gpfStreamSecureRead*/ // Generate a wrapper to secure multiple calls to stream#read
13
/*exported _GpfStreamBufferedRead*/ // gpf.stream.BufferedRead
14
/*#endif*/
15
 
16
// NOTE: to avoid naming collisions with subclasses, all private members are prefixed with _read
17
 
18
/**
 * Token being used to control data flow: tokens are queued in the read buffer like data
 * and are recognized with instanceof when the buffer is consumed
 *
 * @constructor
 * @since 0.2.3
 */
function _GpfStreamBufferedReadToken () {
}

_GpfStreamBufferedReadToken.prototype = {

    /**
     * Execute the action associated to the token.
     * The default implementation does nothing.
     *
     * @method _GpfStreamBufferedReadToken#execute
     * @param {gpf.stream.BufferedRead} bufferedRead Instance of gpf.stream.BufferedRead
     * @since 0.2.3
     */
    execute: _gpfEmptyFunc

};
39
 
40
var
    /**
     * Unique token to signal an error: it ensures the error is triggered at the right time.
     * The item that follows the token in the read buffer is used as the rejection reason.
     *
     * @type {_GpfStreamBufferedReadToken}
     * @since 0.2.3
     */
    _gpfStreamBufferedReadError = new _GpfStreamBufferedReadToken(),

    /**
     * Unique token to signal end of read: it ensures the read is resolved at the right time
     *
     * @type {_GpfStreamBufferedReadToken}
     * @since 0.2.3
     */
    _gpfStreamBufferedReadEnd = new _GpfStreamBufferedReadToken();

/**
 * Rejects the pending read, the reason is the next item removed from the read buffer
 *
 * @param {gpf.stream.BufferedRead} bufferedRead Instance of gpf.stream.BufferedRead
 * @since 0.2.3
 */
_gpfStreamBufferedReadError.execute = function (bufferedRead) {
    bufferedRead._readReject(bufferedRead._readBuffer.shift());
};

/**
 * Resolves the pending read (no value is transmitted)
 *
 * @param {gpf.stream.BufferedRead} bufferedRead Instance of gpf.stream.BufferedRead
 * @since 0.2.3
 */
_gpfStreamBufferedReadEnd.execute = function (bufferedRead) {
    bufferedRead._readResolve();
};
64
 
65
var
    _GpfStreamBufferedRead = _gpfDefine({
        $class: "gpf.stream.BufferedRead",

        /**
         * Implements IReadableStream by offering methods manipulating a buffer:
         * - {@link gpf.stream.BufferedRead#_appendToReadBuffer}
         * - {@link gpf.stream.BufferedRead#_completeReadBuffer}
         * - {@link gpf.stream.BufferedRead#_setReadError}
         *
         * Make sure to implement the {@link gpf.interfaces.IFlushableStream} interface
         * to complete the buffer.
         *
         * @constructor gpf.stream.BufferedRead
         * @implements {gpf.interfaces.IReadableStream}
         * @since 0.2.3
         */
        constructor: function () {
            // Each instance owns its buffer
            this._readBuffer = [];
        },

        /**
         * Read buffer, contains the data to output as well as the tokens signaling the end of the read
         * ({@see _GpfStreamBufferedReadToken}). It is consumed from the head.
         *
         * @type {Array}
         * @since 0.2.3
         */
        _readBuffer: [],

        /**
         * Stream to write to, null when no read is in progress
         *
         * @type {gpf.interfaces.IWritableStream}
         * @since 0.2.3
         */
        _readWriteToStream: null,

        /**
         * Read Promise resolve function
         *
         * @type {Function}
         * @since 0.2.3
         */
        _readResolve: null,

        /**
         * Read Promise reject function
         *
         * @type {Function}
         * @since 0.2.3
         */
        _readReject: null,

        //region Secured writing

        /**
         * Check if the data is a token and, if so, execute it
         *
         * @param {*} data Item extracted from the read buffer
         * @return {Boolean} true if the data was a token (it has been executed)
         * @since 0.2.3
         */
        _readDataIsToken: function (data) {
            if (data instanceof _GpfStreamBufferedReadToken) {
                data.execute(this);
                return true;
            }
            return false;
        },

        /**
         * Forget the output stream and leave the critical section: called when the read is over
         * (token executed or write failure) so that a later read can consume the buffer again
         */
        _readDetachOutput: function () {
            this._readWriteToStream = null;
            this._readNotWriting = true;
        },

        /**
         * Write the head of the read buffer to the output stream then, once written, process the
         * remaining items one by one. The sequence stops when the buffer is empty or when a token is found.
         *
         * @return {Promise} Resolved when the sequence stops, rejected if the output stream fails to write
         * @since 0.2.3
         */
        _readWriteToOutput: function () {
            var me = this,
                data = me._readBuffer.shift();
            if (me._readDataIsToken(data)) {
                // The token settled the read promise: the read is over
                me._readDetachOutput();
                return Promise.resolve();
            }
            return me._readWriteToStream.write(data)
                .then(function () {
                    if (me._readBuffer.length) {
                        return me._readWriteToOutput();
                    }
                    me._readNotWriting = true;
                });
        },

        /**
         * Critical section to avoid writing while writing
         *
         * @type {Boolean}
         * @since 0.2.3
         */
        _readNotWriting: true,

        /**
         * Triggers write only if no write is in progress.
         * If writing fails, the read is rejected with the failure reason.
         *
         * @since 0.2.3
         */
        _readSafeWrite: function () {
            var me = this;
            if (me._readNotWriting) {
                me._readNotWriting = false;
                me._readWriteToOutput()
                    .then(undefined, function (reason) {
                        // The read is over: do not keep the critical section locked
                        me._readDetachOutput();
                        me._readReject(reason);
                    });
            }
        },

        /**
         * Check if data exists and trigger write consequently
         *
         * @since 0.2.3
         */
        _readCheckIfData: function () {
            if (this._readBuffer.length) {
                this._readSafeWrite();
            }
        },

        /**
         * Check if a read is in progress and trigger write consequently
         *
         * @since 0.2.3
         */
        _readCheckIfOutput: function () {
            if (this._readWriteToStream) {
                this._readCheckIfData();
            }
        },

        //endregion

        //region Protected interface for sub classes

        /**
         * Adds data to the read buffer, each parameter being queued as one item
         *
         * @param {...*} data Data to write
         * @gpf:chainable
         * @protected
         * @since 0.2.3
         */
        _appendToReadBuffer: function (data) {
            _gpfIgnore(data);
            this._readBuffer = this._readBuffer.concat(_gpfArraySlice(arguments));
            this._readCheckIfOutput();
            return this;
        },

        /**
         * Ends the read without any error (once the data already buffered is written)
         *
         * @protected
         * @since 0.2.3
         */
        _completeReadBuffer: function () {
            this._readBuffer.push(_gpfStreamBufferedReadEnd);
            this._readCheckIfOutput();
        },

        /**
         * Ends the read with an error (once the data already buffered is written)
         *
         * @param {*} reason Rejection reason
         * @protected
         * @since 0.2.3
         */
        _setReadError: function (reason) {
            // The reason is queued right after the token, which consumes it when executed
            this._readBuffer.push(_gpfStreamBufferedReadError, reason);
            this._readCheckIfOutput();
        },

        //endregion

        //region gpf.interfaces.IReadableStream

        /**
         * @gpf:sameas gpf.interfaces.IReadableStream#read
         * @since 0.2.3
         */
        read: _gpfStreamSecureRead(function (output) {
            var me = this, //eslint-disable-line no-invalid-this
                // The executor runs synchronously: resolve / reject must be known *before* consuming the
                // buffer because a token found at the head of the buffer is executed immediately
                promise = new Promise(function (resolve, reject) {
                    me._readResolve = resolve;
                    me._readReject = reject;
                });
            me._readWriteToStream = output;
            me._readCheckIfData();
            return promise;
        })

        //endregion

    });
246
 
247
// Install on the class the progress flag used by _gpfStreamSecureRead (which wraps the read method)
_gpfStreamSecureInstallProgressFlag(_GpfStreamBufferedRead);