Home Reference Source Test

Tutorials

Stream processing with text8 data

Read the raw text8 corpus file as a stream and count the number of occurrences of each token in the corpus.

import * as Preprocessing from 'causal-net.preprocessing';
import * as Log from 'causal-net.log';
import * as Utils from 'causal-net.utils';
import * as Storage from 'causal-net.storage';
import * as fs from 'fs';
// Pull the individual helpers used below out of the imported namespaces.
var indexDBStorage = Storage.indexDBStorage;
var stream = Utils.stream;
var termLogger = Log.termLogger;
var nlpPreprocessing = Preprocessing.nlpPreprocessing;
var tokenizerEN = Preprocessing.tokenizerEN;
// NOTE(review): this string follows other statements, so it is not in
// directive-prologue position and is evaluated as a plain expression.
'use strict';

Create the stream processing pipeline

// State shared by every invocation of the transform step:
//   wordFreqCount - running token-frequency table, replaced on every line by the
//                   value returned from nlpPreprocessing.wordFreqCount
//   lineIndex     - 1-based counter of the lines emitted so far
//   remainingChars - text to be placed in front of the next chunk
// NOTE(review): nothing in this snippet ever reassigns `remainingChars`, so it is
// always ''. A line (or token) cut in two by a chunk boundary is therefore handled
// as two separate pieces. Carrying the tail over would also need a flush step at
// end-of-stream — confirm whether stream.makeTransform supports one.
var remainingChars = '', wordFreqCount = {}, lineIndex = 0;

/**
 * Transform step: splits one chunk into lines, tokenizes each line, updates the
 * global word-frequency table and forwards one `{lineIndex, tokens}` record per line.
 *
 * @param {*} chunkData - raw chunk; it is coerced to a string by concatenation
 *   (presumably a Buffer or string coming from the file reader — verify).
 * @param {*} chunkEncoding - unused.
 * @param {Function} afterTransformFn - error-first callback; called as
 *   `(null, records)` on success or `(err)` when tokenizing/counting throws.
 */
function tranformFn(chunkData, chunkEncoding, afterTransformFn){
    const transformedData = [];
    try {
        // Leftover text from an earlier chunk belongs in front of the new chunk.
        const sampleText = remainingChars + chunkData;
        for (const line of sampleText.split('\n')) {
            const tokens = tokenizerEN.tokenize(line);
            wordFreqCount = nlpPreprocessing.wordFreqCount(tokens, wordFreqCount);
            lineIndex += 1;
            transformedData.push({ lineIndex, tokens });
        }
    } catch (err) {
        // Report the failure through the callback instead of throwing out of the
        // transform, so the caller of the callback can react to it.
        afterTransformFn(err);
        return;
    }
    // Called outside the try block so an exception raised by the callback itself
    // cannot trigger a second callback invocation.
    afterTransformFn(null, transformedData);
}
var transformer = stream.makeTransform(tranformFn);

/**
 * Write step: stores the tokens of every record in one batch under
 * `/corpus/line_<lineIndex>` as a JSON string, one file after another.
 *
 * @param {Array<{lineIndex: number, tokens: *}>} transformedData - records
 *   produced by the transform step for one chunk.
 * @param {*} chunkEncoding - unused.
 * @param {Function} afterWriteFn - completion callback; called with no argument
 *   once every file is written, or with the error when a write fails
 *   (assumes the error-first convention also used by the transform callback).
 */
function writeTokens(transformedData, chunkEncoding, afterWriteFn){
    const writeTokensToFile = async (records)=>{
        // Sequential on purpose: each write is awaited before the next one starts.
        for (const { lineIndex, tokens } of records) {
            await indexDBStorage.writeFile(`/corpus/line_${lineIndex}`, JSON.stringify(tokens));
        }
    };
    // Two-argument then(): a rejected write is forwarded to the callback instead of
    // leaving the stream waiting forever, and an exception thrown by the success
    // callback is not routed back into the failure callback.
    writeTokensToFile(transformedData).then(
        ()=>afterWriteFn(),
        (err)=>afterWriteFn(err)
    );
}
var writer = stream.makeWritable(writeTokens);
// Sum of `data.length` over every value handed to the pipeline callback below
// (presumably the raw chunks read from the corpus file — verify against makePipeline).
var characterCount = 0;
// Run reader -> transformer -> writer and log the outcome together with the timing
// reported by the 'stream performance' log group.
(async ()=>{
    const corpusReader = fs.createReadStream('../datasets/text8/text8.txt');
    const corpusStreamer = stream.makePipeline([corpusReader, transformer, writer], (data)=>{
        characterCount += data.length;
    });
    termLogger.groupBegin('stream performance');
    let result;
    try {
        result = await corpusStreamer;
    } finally {
        // Close the log group even when the pipeline fails.
        termLogger.groupEnd();
    }
    termLogger.log({ result, characterCount });
})().catch((err)=>{
    // Surface pipeline failures instead of leaving an unhandled rejection.
    console.error(err);
});
stream performance: begin at Fri Mar 15 2019 16:42:45 GMT+0700 (Indochina Time)
stream performance: end after 8514 (ms)
{ result: 'Success', characterCount: 100000000 }
// Log the first 100 [token, count] pairs in the order Object.entries enumerates
// them; the sample is not sorted by frequency.
termLogger.log({
    'show 100 items': Object.entries(wordFreqCount).slice(0, 100),
});
{ 'show 100 items':
   [ [ 'anarchism', 303 ],
     [ 'originated', 572 ],
     [ 'as', 131819 ],
     [ 'a', 325895 ],
     [ 'term', 7220 ],
     [ 'of', 593676 ],
     [ 'abuse', 563 ],
     [ 'first', 28809 ],
     [ 'used', 22736 ],
     [ 'against', 8431 ],
     [ 'early', 10172 ],
     [ 'working', 2270 ],
     [ 'class', 3412 ],
     [ 'radicals', 116 ],
     [ 'including', 9630 ],
     [ 'the', 1061363 ],
     [ 'diggers', 25 ],
     [ 'english', 11868 ],
     [ 'revolution', 2029 ],
     [ 'and', 416615 ],
     [ 'sans', 68 ],
     [ 'culottes', 6 ],
     [ 'french', 8736 ],
     [ 'whilst', 481 ],
     [ 'is', 183158 ],
     [ 'still', 7378 ],
     [ 'in', 372203 ],
     [ 'pejorative', 114 ],
     [ 'way', 6432 ],
     [ 'to', 316375 ],
     [ 'describe', 1352 ],
     [ 'any', 11804 ],
     [ 'act', 3502 ],
     [ 'that', 109508 ],
     [ 'violent', 653 ],
     [ 'means', 4165 ],
     [ 'destroy', 466 ],
     [ 'organization', 2374 ],
     [ 'society', 4067 ],
     [ 'it', 73335 ],
     [ 'has', 37865 ],
     [ 'also', 44358 ],
     [ 'been', 25381 ],
     [ 'taken', 3043 ],
     [ 'up', 12446 ],
     [ 'positive', 1254 ],
     [ 'label', 646 ],
     [ 'by', 111829 ],
     [ 'self', 2879 ],
     [ 'defined', 2449 ],
     [ 'anarchists', 203 ],
     [ 'word', 5678 ],
     [ 'derived', 1701 ],
     [ 'from', 72865 ],
     [ 'greek', 4577 ],
     [ 'without', 5660 ],
     [ 'archons', 10 ],
     [ 'ruler', 617 ],
     [ 'chief', 2130 ],
     [ 'king', 7457 ],
     [ 'political', 6967 ],
     [ 'philosophy', 2758 ],
     [ 'belief', 1572 ],
     [ 'rulers', 687 ],
     [ 'are', 76523 ],
     [ 'unnecessary', 146 ],
     [ 'should', 5113 ],
     [ 'be', 61283 ],
     [ 'abolished', 399 ],
     [ 'although', 9286 ],
     [ 'there', 22706 ],
     [ 'differing', 231 ],
     [ 'interpretations', 395 ],
     [ 'what', 8581 ],
     [ 'this', 58827 ],
     [ 'refers', 1570 ],
     [ 'related', 3535 ],
     [ 'social', 4307 ],
     [ 'movements', 1002 ],
     [ 'advocate', 331 ],
     [ 'elimination', 216 ],
     [ 'authoritarian', 185 ],
     [ 'institutions', 1021 ],
     [ 'particularly', 2881 ],
     [ 'state', 12905 ],
     [ 'anarchy', 109 ],
     [ 'most', 25562 ],
     [ 'use', 14011 ],
     [ 'does', 5220 ],
     [ 'not', 44030 ],
     [ 'imply', 257 ],
     [ 'chaos', 331 ],
     [ 'nihilism', 42 ],
     [ 'or', 68948 ],
     [ 'anomie', 7 ],
     [ 'but', 35356 ],
     [ 'rather', 4605 ],
     [ 'harmonious', 28 ],
     [ 'anti', 3103 ],
     [ 'place', 5345 ] ] }

After preprocessing, the tokenized lines are saved as files under the /corpus/ folder.

// List the files written under /corpus/, read the first one back, and log
// [number of files, number of tokens in that file].
(async ()=>{
    // Runs `task` inside a named termLogger group and closes the group even if the task fails.
    const withGroup = async (label, task)=>{
        termLogger.groupBegin(label);
        try {
            return await task();
        } finally {
            termLogger.groupEnd();
        }
    };
    const listFiles = await withGroup(
        'get list of preprocessing files',
        ()=>indexDBStorage.getFileList('/corpus/')
    );
    // Fail with a clear message instead of asking storage to read `undefined`.
    if (listFiles.length === 0) {
        throw new Error('no preprocessed files found under /corpus/');
    }
    const tokens = await withGroup(
        'read one file from indexDB',
        ()=>indexDBStorage.readFile(listFiles[0])
    );
    termLogger.log([ listFiles.length, JSON.parse(tokens).length ]);
})().catch((err)=>{
    // Surface storage failures instead of leaving an unhandled rejection.
    console.error(err);
});
get list of preprocessing files: begin at Fri Mar 15 2019 16:42:56 GMT+0700 (Indochina Time)
get list of preprocessing files: end after 194 (ms)
read one file from indexDB: begin at Fri Mar 15 2019 16:42:56 GMT+0700 (Indochina Time)
read one file from indexDB: end after 0 (ms)
[ 3228, 1293 ]