/**
* Add custom functionality to the default BroadcastChannel to be similiar with web socket API.
* So that the transform between communication channel is not too much different
* @class
*/
class BC extends BroadcastChannel {
constructor(channel) {
super(channel)
this.id = channel;
}
}
BC.prototype.send = function(msg, callback) {
console.log("Sending message", msg);
this.postMessage({
type:"message",
content:msg
});
if (typeof callback == "function") callback.call(this);
}
BC.prototype.ping = function() {
this.postMessage({
type:"ping"
})
}
BC.prototype.pong = function() {
this.postMessage({
type:"ping"
})
}
BC.prototype.terminate = function() {
this.close()
}
/**
* Main thread handler for each child threads
* This class will be called from Main Thread
* @class
*/
class Thread {
constructor(options) {
this.options = options || {};
this.id = this.options.id || "ch"+common.getAutoIncrement("thread");
Thread.list[this.id] = this;
this.status = "closed"
this.lastActivity = Date.now();
this.debugLevel = common.debugLevel() || 0;
this.$elm = $("<s></s>");
}
}
Thread.list = [];
Thread.sessions = {};
Thread.saveSession = async function(sessionName, content) {
await common.mkDir(nwPath.join(__dirname, "data/session"));
await common.filePutContents(nwPath.join(__dirname, "data/session", sessionName+".ses"), JSON.stringify(content), "utf8");
}
Thread.loadSession = async function(sessionName, force) {
if (!force) {
if (Thread.sessions[sessionName]) return Thread.sessions[sessionName];
}
try {
Thread.sessions[sessionName] = JSON.parse(await common.fileGetContents(nwPath.join(__dirname, "data/session", sessionName+".ses"), "utf8"));
return Thread.sessions[sessionName];
} catch (e) {
return "";
}
}
Thread.removeSession = async function(sessionName) {
delete Thread.sessions[sessionName];
await common.unlink(nwPath.join(__dirname, "data/session", sessionName+".ses"));
}
Thread.prototype.bump = function() {
this.lastActivity = Date.now();
return this.lastActivity;
}
Thread.prototype.close = async function(noConfirm) {
try {
if (!noConfirm) await threadServer.send(this.ws, "close");
} catch (e) {
// do nothing
}
this.ws.close();
delete Thread.list[this.id];
}
Thread.prototype.on = function(evt, fn) {
this.$elm.on(evt, fn)
}
Thread.prototype.off = function(evt, fn) {
this.$elm.off(evt, fn)
}
Thread.prototype.one = function(evt, fn) {
this.$elm.one(evt, fn)
}
Thread.prototype.trigger = function(evt, param) {
this.$elm.trigger(evt, param)
}
/**
* Execute immidiately or wait until the thread is ready
* todo : should wait until all libraries are loaded
* @param {} fn
*/
Thread.prototype.onReady = async function(fn) {
fn = fn || async function(){};
if (this.isReady()) {
return await fn()
} else {
return new Promise(async (resolve, reject) => {
this.one("ready", async function(e, param) {
resolve(await fn());
});
});
}
}
Thread.prototype.openThreadWindow = async function(options={}) {
if (this.isWindowOpened) return console.warn("Window with ID:"+this.id+" is already opened!");
this.on("statusChange", (e, status) => {
console.log("Status of the client is:", status);
});
this.on("close", (e) => {
console.log("Client closed");
this.close(true);
});
this.on("activity", (e) => {
this.bump();
});
this.on("log", (e, data) => {
data.msg[0] = `<span class='threadId'>${this.id} ></span> `+data.msg[0]
ui.log.apply(this, data.msg)
});
var params = new URLSearchParams({
id : this.id,
port : threadServer.port,
addons : options.addons || [],
});
return new Promise((resolve, reject) => {
//var showWindow = false;
//if (this.debugLevel) showWindow = true;
var options = {
new_instance : true,
show : false,
show_in_taskbar : false
}
if (this.debugLevel) {
options.show = true;
options.show_in_taskbar = true;
}
nw.Window.open(`www/thread.html?${params.toString()}`, options, (win) => {
console.log("Thread window is opened!");
this.isWindowOpened = true;
});
this.one("ready", function(e, param) {
console.log("client is connected:", param);
resolve();
});
});
}
Thread.prototype.isReady = function() {
if (this.status == "ready") return true;
return false;
}
Thread.prototype.setStatus = function(status) {
this.status = status;
return status;
}
Thread.prototype.sendWhenReady = async function(command, msg, options) {
return new Promise(async (resolve, reject) => {
var processResult = async (resultBody) => {
console.log("Processed result:", resultBody);
var result = resultBody.msg;
if (resultBody.type == "object") {
if (common.isJSON(result)) result = JSON.parse(result);
}
resolve(result);
}
if (this.isReady()) {
await threadServer.send(this.ws, command, msg, options);
this.one("result", async (e, resultBody) => {
await processResult(resultBody);
})
} else {
this.one("ready", async (e) => {
await threadServer.send(this.ws, command, msg, options);
this.one("result", async (e, resultBody) => {
await processResult(resultBody);
})
});
}
})
}
/**
* Handle socket as a server
* @class
*/
var ThreadServer = function() {
this.threadList = Thread.list;
this.$elm = $("<s></s>");
this.queue = [];
}
ThreadServer.getNumberOfCPU = function() {
var os = require('os');
return os.cpus().length;
}
ThreadServer.getRecomendedMaxThread = function() {
var os = require('os');
return os.cpus().length - 1;
}
ThreadServer.prototype.on = function(evt, fn) {
this.$elm.on(evt, fn)
}
ThreadServer.prototype.off = function(evt, fn) {
this.$elm.off(evt, fn)
}
ThreadServer.prototype.one = function(evt, fn) {
this.$elm.one(evt, fn)
}
ThreadServer.prototype.trigger = function(evt, param) {
this.$elm.trigger(evt, param)
}
ThreadServer.prototype.distributeTask = async function(tasks=[]) {
if (Array.isArray(tasks) == false) tasks = [tasks]
}
ThreadServer.shuffle = function(array) {
for (var i = array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
/**
* Get random iddle task
*/
ThreadServer.prototype.getIddleThread = function() {
var keys = Object.keys(this.threadList);
ThreadServer.shuffle(keys);
for (var i in keys) {
if (this.threadList[keys[i]].isReady()) return this.threadList[keys[i]];
}
}
ThreadServer.prototype.waitForIddleThread = async function() {
var iddleThread = this.getIddleThread();
if (iddleThread) return iddleThread;
return new Promise((resolve, reject)=> {
this.one("threadReady", (evt, thread) => {
if (thread.isReady()) {
resolve(thread);
} else {
resolve(this.waitForIddleThread());
}
})
})
}
ThreadServer.prototype.appendQueue = async function(queue) {
if (!Array.isArray(queue)) queue = [queue];
this.queue = this.queue.concat(queue);
}
ThreadServer.prototype.processQueue = async function(queue) {
queue = queue || this.queue
if (!Array.isArray(queue)) queue = [queue];
var results = [];
var promises = [];
while (queue.length > 0) {
var thread = await this.waitForIddleThread();
var thisProcess = queue.shift();
promises.push(
new Promise((resolve, reject) => {
thread.sendWhenReady("run", thisProcess).then((result)=>{
resolve(result);
results.push(result)
})
})
)
}
return new Promise((resolve, reject) => {
Promise.all(promises).then(()=> {
resolve(results);
})
})
}
/**
* Add task to the iddle process.
* If none is iddle, will wait until there is iddle process
*/
ThreadServer.prototype.addTask = async function() {
}
ThreadServer.prototype.getThreadById = function(id) {
if (!this.threadList[id]) console.warn("No thread with id", id);
return this.threadList[id];
}
ThreadServer.actions = {
echo: async function(data, ws, req) {
console.log("response back:", data.msg);
ws.send(data.msg);
},
close: async function(data, ws, req) {
this.threadList[ws.id].status = "closing";
await this.threadList[ws.id].close();
},
setStatus: function(data, ws, req) {
this.threadList[ws.id].status = data.msg;
this.threadList[ws.id].trigger("statusChange", data.msg);
if (data.msg == "ready") {
this.threadList[ws.id].trigger("ready", data.msg);
this.trigger("threadReady", this.threadList[ws.id]);
} else if (data.msg == "close") {
this.threadList[ws.id].trigger("close", data.msg);
this.trigger("threadClose", this.threadList[ws.id]);
}
},
result: function(data, ws, req) {
this.threadList[ws.id].trigger("result", data);
},
log: function(data, ws, req) {
this.threadList[ws.id].trigger("log", data);
}
}
ThreadServer.prototype.action = async function(action, data, ws, req) {
if (!ThreadServer.actions[action]) return;
return await ThreadServer.actions[action].call(this, data, ws, req)
}
ThreadServer.prototype.send = async function(ws, command, msg, options) {
var thread = this.getThreadById(ws.id);
thread.setStatus("processing");
options = options || {};
options.command = command || options.command;
options.msg = msg || options.msg || "";
options.type = typeof options.msg;
options.time = options.time || Date.now();
if (options.type == "function") {
options.msg = options.msg.toString();
}
return new Promise((resolve, reject) => {
//ws.send(JSON.stringify(options), ()=> {
ws.send(options, ()=> {
resolve();
});
})
}
ThreadServer.prototype.closeThreads = async function() {
ui.log("Closing threads");
for (var i in this.threadList) {
await this.threadList[i].close();
}
}
ThreadServer.prototype.close = async function() {
await this.closeThreads();
return new Promise((resolve, reject) => {
this.wss.close(()=> {
resolve();
})
})
}
/**
* Open n number of thread
* If some thread are already opened, will make sure that the opened thread not less then the number
* @param {} num=1
* @param {Object} options
* @param {Boolean} [options.force=false]
* @param {String[]} [options.addons]
*/
ThreadServer.prototype.startThread = async function(num=1, options={}) {
ui.log(`Starting ${num} thread(s)`);
if (!num) return;
var num = num || 1;
var promises = [];
if (!options.force) num = num - this.threadList.length;
for (var i=0; i<num; i++) {
var thisThread = new Thread();
this.createChannel(thisThread.id);
promises.push(thisThread.openThreadWindow(options));
}
return Promise.all(promises);
}
ThreadServer.prototype.processMessage = async function(message, ws, req) {
try {
if (!this.threadList[ws.id]) return;
console.log(`received from ${ws.id}: `, message);
var data = message;
if (common.isJSON(message)) data = JSON.parse(message);
data = data || {};
if (data.command) await this.action(data.command, data, ws, req);
} catch (e) {
console.warn("Error processing message ", e);
console.log("ProcessMessage", message);
}
}
ThreadServer.prototype.onServerStart = function() {
console.log("Thread server started on port "+this.port);
$(document).off("close.ts");
$(document).on("close.ts", ()=>{
this.close();
});
this.isReady = true;
}
/**
* Return true if the server is ready
*/
ThreadServer.prototype.getReadyState = async function() {
if (this.isReady) return "ready";
}
ThreadServer.prototype.createChannel = function(id) {
var bc = new BC(id);
this.threadList[id] = this.threadList[id] || {};
this.threadList[id].ws = bc;
bc.onmessage = async (event)=> {
console.log(event);
if (typeof event.data !== "object") return;
if (this.threadList[id]) this.threadList[id].trigger("activity");
if (event.data.type == "message") {
// message is buffer
//console.log("Message received from ", thisId, message, arg);
if (!this.threadList[id]) return console.warn("No thread with id ", id, "when trying to process message :", event.data.content);
await this.processMessage(event.data.content, bc, {});
} else if (event.data.type == "ping") {
bc.pong();
} else if (event.data.type == "pong") {
bc.ping();
}
};
}
ThreadServer.prototype.start = function() {
/*
var { WebSocketServer } = require('ws');
this.port = common.rand(40000, 45000);
this.wss = new WebSocketServer({ port: this.port });
this.wss.on('error', (err) => {
console.warn("Error occured", err);
})
this.wss.on('listening', async () => {
this.onServerStart();
});
this.wss.on('connection', (ws, req) => {
var thisId = nwPath.basename(req.url);
this.threadList[thisId] = this.threadList[thisId] || {};
this.threadList[thisId].ws = ws;
this.threadList[thisId].req = req;
ws.id = thisId;
console.log(thisId+" is connected:", ws, req);
ws.on('message', async (message, arg) => {
// message is buffer
//console.log("Message received from ", thisId, message, arg);
if (!this.threadList[thisId]) return console.warn("No thread with id ", thisId, "when trying to process message :", message);
this.threadList[thisId].trigger("activity");
await this.processMessage(message, ws, req);
});
ws.on('ping', async (message) => {
//console.log("ping from "+thisId, message);
this.threadList[thisId].trigger("activity");
ws.pong();
});
ws.on('pong', async (message) => {
//console.log("pong from "+thisId, message);
this.threadList[thisId].trigger("activity");
});
ws.send('Welcome to the Translator++ server');
});
*/
}
/**
* Host/Thread child side
* @class
*/
class ThreadHost {
constructor(options) {
this.$elm = $("<s></s>");
this.init()
}
}
ThreadHost.prototype.on = function(evt, fn) {
this.$elm.on(evt, fn)
}
ThreadHost.prototype.off = function(evt, fn) {
this.$elm.off(evt, fn)
}
ThreadHost.prototype.one = function(evt, fn) {
this.$elm.one(evt, fn)
}
ThreadHost.prototype.trigger = function(evt, param) {
this.$elm.trigger(evt, param)
}
ThreadHost.prototype.setStatus = async function(status) {
this.status = status;
await this.send("setStatus", status);
return this.status;
}
ThreadHost.prototype.getStatus = async function() {
return this.status;
}
ThreadHost.actions = {
echo: async function(data) {
console.log("response back:", data.msg);
this.ws.send(data.msg);
},
close: async function(data) {
await this.setStatus("closed");
await this.close();
},
run: async function(data) {
console.log("Setting status: busy");
this.displayStatus("busy");
await this.setStatus("busy");
try {
console.log("Executing function");
//if (data.type == "function") data.msg = `return (${data.msg})()`
data.msg = `return (${data.msg})()`
let AsyncFunction = Object.getPrototypeOf(async function(){}).constructor;
var thisVal = new AsyncFunction(data.msg);
console.log("Running command", thisVal.toString());
var result = await thisVal() || ""
await this.send("result", result);
} catch (e) {
console.warn(e);
await this.send("result", "Error executing "+data.msg+"\n"+e.toString(), {error:true});
console.error(`Error executing\n`+ `return (${data.msg})()\n`+e.toString());
}
this.displayStatus("ready");
await this.setStatus("ready");
}
}
ThreadHost.prototype.action = async function(action, data) {
console.log("Running action", arguments);
if (!ThreadHost.actions[action]) return;
return await ThreadHost.actions[action].call(this, data)
}
ThreadHost.prototype.send = async function(command, msg, options) {
options = options || {};
options.command = command || options.command;
options.msg = msg || options.msg;
options.type = typeof options.msg;
options.time = options.time || Date.now();
if (options.type == "function") {
options.msg = options.msg.toString();
}
return new Promise((resolve, reject) => {
//this.ws.send(JSON.stringify(options), ()=> {
this.ws.send(options, ()=> {
console.log("resolve")
resolve();
});
})
}
ThreadHost.prototype.processMessage = async function(message) {
var str = message.toString();
console.log('received: ', str);
var data = message;
if (common.isJSON(message)) data = JSON.parse(message);
data = data || {};
if (data.command) await this.action(data.command, data);
}
ThreadHost.prototype.ping = async function(timeout) {
timeout = timeout || 30000;
return new Promise((resolve, reject) => {
var tout = setTimeout(()=>{
reject(`Timeout after no reply for ${timeout} ms`);
}, timeout);
this.one("pong", ()=>{
console.log("Server is active");
clearTimeout(tout);
resolve();
})
this.ws.ping();
})
}
ThreadHost.prototype.pingWithInterval = function() {
var timeOutLooper = () => {
var pingInterval = setTimeout(async ()=>{
try {
await this.ping();
timeOutLooper();
} catch (e) {
clearTimeout(pingInterval);
this.close();
}
}, 15000)
}
timeOutLooper();
}
ThreadHost.prototype.connect = async function(channelId) {
this.bc = new BC(this.id);
this.bc.onmessage = async (event) => {
console.log(event);
if (typeof event.data !== "object") return;
if (event.data.type == "message") {
await this.processMessage(event.data.content);
} else if (event.data.type == "ping") {
this.bc.pong();
} else if (event.data.type == "pong") {
this.trigger("pong");
}
};
this.ws = this.bc;
this.displayStatus(`Thread ID ${this.id} is Connected to server!`);
this.setStatus('ready');
//this.pingWithInterval();
/*
return new Promise((resolve, reject) => {
var WS = require("ws");
var socket = new WS(`ws://127.0.0.1:${port}/thread/`+this.id);
//var socket = new WebSocket(`ws://127.0.0.1:${port}/thread/`+this.id);
this.ws = socket;
// Listen for messages
socket.addEventListener('message', async (event) => {
await this.processMessage(event.data);
});
// error
socket.addEventListener('error', (event) => {
console.log("error", event);
});
// close
socket.addEventListener('close', (event) => {
this.displayStatus("Disconnected!");
});
socket.addEventListener('ping', (event) => {
//console.log("received ping", event);
socket.pong();
});
socket.on('pong', (msg) => {
this.trigger("pong");
//console.log("received pong", msg);
});
// Connection opened
socket.addEventListener('open', (event) => {
this.displayStatus(`Thread ID ${this.id} is Connected to server!`);
this.setStatus('ready');
this.pingWithInterval();
resolve();
});
})
*/
}
ThreadHost.prototype.init = async function() {
this.query = window.location.search.substr(1)
var searchparam = new URLSearchParams(this.query);
this.id = searchparam.get("id");
this.port = searchparam.get("port");
// wait until loadMonitor is loaded;
//await LoadMonitor.waitUntilLoaded();
await addonLoader.untilCompleted();
await this.connect(this.id);
// replace the original behavior of ui.log
ui.log = function() {
threadHost.send("log", Array.from(arguments))
}
}
ThreadHost.prototype.close = async function() {
try {
await this.send("close");
} catch (e) {
}
this.ws.terminate();
win.close(true);
}
ThreadHost.prototype.displayStatus = function(str) {
$('[data-window="thread"] .status').html(str);
console.log(str);
}
$(document).ready(function() {
if ($('body').is('[data-window="thread"]')) {
window.threadHost = new ThreadHost();
var win = nw.Window.get();
win.on('close', function () {
window.threadHost.close();
this.hide(); // Pretend to be closed already
console.log("We're closing...");
this.close(true); // then close it forcefully
});
} else if ($('body').is('[data-window="trans"]')) {
window.threadServer = new ThreadServer();
//threadServer.start();
}
})