首頁  >  文章  >  web前端  >  JS的多執行緒運行庫Nexus.js使用詳解

JS的多執行緒運行庫Nexus.js使用詳解

php中世界最好的语言
php中世界最好的语言原創
2018-04-14 09:48:461801瀏覽

這次帶給大家JS的多執行緒運行庫Nexus.js使用詳解,使用JS多執行緒運行庫Nexus.js的注意事項有哪些,以下是實戰案例,一起來看一下。

首先,如果你不熟悉這個項目,建議先閱讀之前寫的一系列文章。如果你不想閱讀這些,不用擔心。這裡面也會牽扯到那些內容。

現在,讓我們開始吧。

去年,我開始實作Nexus.js,這是一個基於Webkit/JavaScript核心的多執行緒服務端JavaScript運行程式庫。有一段時間我放棄了做這件事,由於一些我無法控制的原因,我不打算在這裡討論,主要是:我無法讓自己長時間工作。

所以,讓我們從討論Nexus的架構開始,以及它是如何運作的。

事件循環

沒有事件循環

有一個帶有(無鎖)任務物件的線程池

每次呼叫setTimeout或setImmediate或建立一個Promise時,任務就會排隊到任務佇列鐘。

每當計劃任務時,第一個可用的執行緒將選擇任務並執行它。

在CPU核心上處理Promise。對Promise.all()的呼叫將並行的解決Promise。

ES6

支援async/await,並建議使用

支持for await(...)

# 支持解構

支援async try/catch/finally

模組

不支援CommonJS。 (require(...)和module.exports)

所有模組使用ES6的import/export語法

支援動態導入透過import('file-or-packge').then(...)

支援import.meta,例如:import.meta.filename以及import.meta.dirname等等

附加功能:支援直接從網址匯入,例如:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';

EventEmitter

## Nexus實作了基於Promise的EventEmitter類別

事件處理

程式在所有執行緒上排序,並將並行處理執行。

EventEmitter.emit(...)的傳回值是一個Promise,它可以被解析為在事件處理器中傳回值所構成的陣列。 例如:

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on(&#39;test&#39;, value => { console.log(`fired test ${i}!`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on(&#39;returns-a-value&#39;, v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit('test', { payload: 'test 1' });
 console.log('first test done!');
 await test.emit('test', { payload: 'test 2' });
 console.log('second test done!');
 const values = await test.emit('returns-a-value', 10);
 console.log('third test done, returned values are:'); console.inspect(values);
}
start().catch(console.error);

I/O

所有輸入/輸出都透過三個原語完成​​:Device,Filter和Stream。

所有輸入/輸出原語都實作了EventEmitter類別

要使用Device,你需要在Device之上建立一個ReadableStream或WritableStream

# 若要操作數據,可以將Filters新增至ReadableStream或WritableStream。 最後,使用source.pipe(...destinationStreams),然後等待source.resume()來處理資料。

所有的輸入/輸出操作都是使用ArrayBuffer物件完成的。

Filter試了process(buffer)方法來處理資料。

例如:使用2個獨立的輸出檔將UTF-8轉換為UTF6。 ###
const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice('enwik8');
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));
  console.log('piping...');
  stream.pipe(...wstreams);
  console.log('streaming...');
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
 } catch (e) {
  console.error('An error occurred: ', e);
 }
}
start().catch(console.error);
### ######TCP/UDP############# Nexus.js提供了一個Acceptor類,負責綁定ip位址/連接埠和監聽連接####### 每次收到一個連線請求,connection事件就會被觸發,並且提供一個Socket設備。 ###### 每一個Socket實例是全雙工的I/O設備。 ###### 你可以使用ReadableStream和WritableStream來操作Socket。 ###### 最基礎的範例:(傳送「Hello World」給客戶端)###
const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on('connection', (socket, endpoint) => {
 const connId = count++;
 console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = 'Hello World!\n';
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on(&#39;data&#39;, buffer => console.log(`got message: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
 console.log(`sending greeting to #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind('127.0.0.1', 10000);
acceptor.listen();
console.log('server ready');
### ######Http############ Nexus提供了一個Nexus.Net.HTTP.Server類,基本上繼承了TCPAcceptor###### 一些基礎介面###

当服务器端完成了对传入连接的基本的Http头的解析/校验时,将使用连接和同样的信息触发connection事件

每一个连接实例都又一个request和一个response对象。这些是输入/输出设备。

你可以构造ReadableStream和WritableStream来操纵request/response。

如果你通过管道连接到一个Response对象,输入的流将会使用分块编码的模式。否者,你可以使用response.write()来写入一个常规的字符串

复杂例子:(基本的Http服务器与块编码,细节省略)

....
/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
 if (path.startsWith('/')) // If it starts with '/', omit it.
  path = path.substr(1);
 if (path.startsWith('.')) // If it starts with '.', reject it.
  throw new NotFoundError(path);
 if (path === '/' || !path) // If it's empty, set to index.html.
  path = 'index.html';
 /**
  * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `dirname` and `filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path);
 try {
  // Stat the target path.
  const {type} = await Nexus.FileSystem.stat(filePath);
  if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html'
   return createInputStream(Nexus.FileSystem.join(filePath, 'index.html'));
  else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // If it's not found, throw NotFound.
   throw new NotFoundError(path);
 } catch(e) {
  if (e.code)
   throw e;
  throw new NotFoundError(path);
 }
 try {
  // First, we create a device.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Then we return a new ReadableStream created using our source device.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Connections counter.
 */
let connections = 0;
/**
 * Create a new HTTP server.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, we display them anyway.
server.on('error', e => {
 console.error(FgRed + Bright + 'Server Error: ' + e.message + '\n' + e.stack, Reset);
});
/**
 * Listen to connections.
 */
server.on('connection', async (connection, peer) => {
 // Start with a connection ID of 0, increment with every new connection.
 const connId = connections++;
 // Record the start time for this connection.
 const startTime = Date.now();
 // Destructuring is supported, why not use it?
 const { request, response } = connection;
 // Parse the URL parts.
 const { path } = parseURL(request.url);
 // Here we'll store any errors that occur during the connection.
 const errors = [];
 // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
 let inStream, outStream;
 try {
  // Log the request.
  console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Set the 'Server' header.
  response.set('Server', `nexus.js/0.1.1`);
  // Create our input stream.
  inStream = await createInputStream(path);
  // Create our output stream.
  outStream = new Nexus.IO.WritableStream(response);
  // Hook all `error` events, add any errors to our `errors` array.
  inStream.on('error', e => { errors.push(e); });
  request.on('error', e => { errors.push(e); });
  response.on('error', e => { errors.push(e); });
  outStream.on('error', e => { errors.push(e); });
  // Set content type and request status.
  response
   .set('Content-Type', mimeType(path))
   .status(200);
  // Hook input to output(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
   // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
   await inStream.resume();
  } catch (e) {
   // Capture any errors that happen during the streaming.
   errors.push(e);
  }
  // Disconnect all the callbacks created by `.pipe()`.
  return disconnect();
 } catch(e) {
  // If an error occurred, push it to the array.
  errors.push(e);
  // Set the content type, status, and write a basic message.
  response
   .set('Content-Type', 'text/plain')
   .status(e.code || 500)
   .send(e.message || 'An error has occurred.');
 } finally {
  // Close the streams manually. This is important because we may run out of file handles otherwise.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Close the connection, has no real effect with keep-alive connections.
  await connection.close();
  // Grab the response's status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   '200': Bright + FgGreen, // Green for 200 (OK),
   '404': Bright + FgYellow, // Yellow for 404 (Not Found)
   '500': Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = '0.0.0.0', port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);

基准

我想我已经涵盖了到目前为止所实现的一切。那么现在我们来谈谈性能。

这里是上诉Http服务器的当前基准,有100个并发连接和总共10000个请求:

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient).....done
Server Software:    nexus.js/0.1.1
Server Hostname:    localhost
Server Port:      3000
Document Path:     /
Document Length:    8673 bytes
Concurrency Level:   100
Time taken for tests:  9.991 seconds
Complete requests:   10000
Failed requests:    0
Total transferred:   87880000 bytes
HTML transferred:    86730000 bytes
Requests per second:  1000.94 [#/sec] (mean)
Time per request:    99.906 [ms] (mean)
Time per request:    0.999 [ms] (mean, across all concurrent requests)
Transfer rate:     8590.14 [Kbytes/sec] received
Connection Times (ms)
       min mean[+/-sd] median  max
Connect:    0  0  0.1   0    1
Processing:   6  99 36.6   84   464
Waiting:    5  99 36.4   84   463
Total:     6 100 36.6   84   464
Percentage of the requests served within a certain time (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (longest request)

每秒1000个请求。在一个老的i7上,上面运行了包括这个基准测试软件,一个占用了5G内存的IDE,以及服务器本身。

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor  : 0
vendor_id  : GenuineIntel
cpu family : 6
model    : 60
model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping  : 3
microcode  : 0x22
cpu MHz   : 3392.093
cache size : 8192 KB
physical id : 0
siblings  : 8
core id   : 0
cpu cores  : 4
apicid   : 0
initial apicid : 0
fpu   : yes
fpu_exception  : yes
cpuid level : 13
wp   : yes
flags    : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs    :
bogomips  : 6784.18
clflush size  : 64
cache_alignment : 64
address sizes  : 39 bits physical, 48 bits virtual
power management:

我尝试了1000个并发请求,但是APacheBench由于许多套接字被打开而超时。我尝试了httperf,这里是结果:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262
Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s
Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000
Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0
Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)
Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0

正如你看到的,它任然能工作。尽管由于压力,有些连接会超时。我仍在研究导致这个问题的原因。

相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!

推荐阅读:

JS做出移动端触摸轮播效果

JS怎样将json格式数组下载到excel表格里

以上是JS的多執行緒運行庫Nexus.js使用詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn