Home >Web Front-end >JS Tutorial >Detailed explanation of multi-threaded runtime library Nexus.js in JavaScript (part of the code attached)

Detailed explanation of multi-threaded runtime library Nexus.js in JavaScript (part of the code attached)

亚连
亚连Original
2018-05-18 17:49:141662browse

This article mainly introduces the learning experience and code sharing of JavaScript multi-threaded runtime library Nexus.js. Friends in need can refer to it and learn together.

Event loop

There is no event loop

There is a thread pool with a (lock-free) task object

Every time setTimeout or setImmediate is called or a Promise is created, the task is queued to the task queue clock.

Whenever a task is scheduled, the first available thread will select the task and execute it.

Handle Promise on the CPU core. Calls to Promise.all() will resolve Promises in parallel.

ES6

supports async/await, and it is recommended to use

supports for await(...)

Support destructuring

Support async try/catch/finally

Module

CommonJS is not supported. (require(...) and module.exports)

All modules use the ES6 import/export syntax

Support dynamic import through import('file-or-packge').then( ...)

Support import.meta, such as: import.meta.filename and import.meta.dirname, etc.

Additional features: Support importing directly from URL, for example:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';

EventEmitter

Nexus implements the Promise-based EventEmitter class

Event handlers are sequenced on all threads and will be executed in parallel.

The return value of EventEmitter.emit(...) is a Promise, which can be parsed into an array of return values ​​in the event handler.

For example:

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on(&#39;test&#39;, value => { console.log(`fired test ${i}!`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on(&#39;returns-a-value&#39;, v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit(&#39;test&#39;, { payload: &#39;test 1&#39; });
 console.log(&#39;first test done!&#39;);
 await test.emit(&#39;test&#39;, { payload: &#39;test 2&#39; });
 console.log(&#39;second test done!&#39;);
 const values = await test.emit(&#39;returns-a-value&#39;, 10);
 console.log(&#39;third test done, returned values are:&#39;); console.inspect(values);
}
start().catch(console.error);

I/O

All input/output is done through three primitives: Device, Filter and Stream.

All input/output primitives implement the EventEmitter class

To use Device, you need to create a ReadableStream or WritableStream on top of Device

To manipulate data, you can Filters are added to ReadableStream or WritableStream.

Finally, use source.pipe(...destinationStreams), and then wait for source.resume() to process the data.

All input/output operations are completed using ArrayBuffer objects.

Filter tried the process(buffer) method to process the data.

Example: Convert UTF-8 to UTF6 using 2 separate output files.

const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice(&#39;enwik8&#39;);
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice(&#39;enwik16-&#39; + i)));
  console.log(&#39;piping...&#39;);
  stream.pipe(...wstreams);
  console.log(&#39;streaming...&#39;);
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
 } catch (e) {
  console.error(&#39;An error occurred: &#39;, e);
 }
}
start().catch(console.error);

TCP/UDP

##Nexus.js provides an Acceptor class, which is responsible for binding ip addresses/ports and monitoring connections

Every time a connection request is received, the connection event will be triggered and a Socket device will be provided.

Each Socket instance is a full-duplex I/O device.

You can use ReadableStream and WritableStream to operate Socket.

The most basic example: (Send "Hello World" to the client)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on(&#39;connection&#39;, (socket, endpoint) => {
 const connId = count++;
 console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = &#39;Hello World!\n&#39;;
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on(&#39;data&#39;, buffer => console.log(`got message: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
 console.log(`sending greeting to #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind(&#39;127.0.0.1&#39;, 10000);
acceptor.listen();
console.log(&#39;server ready&#39;);

Http

Nexus provides A Nexus.Net.HTTP.Server class, which basically inherits TCPAcceptor

Some basic interfaces

When the server completes the parsing/correction of the basic Http header of the incoming connection When verified, the connection event will be triggered using the connection and the same information

Each connection instance has a request and a response object. These are input/output devices.

You can construct ReadableStream and WritableStream to manipulate request/response.

If you connect to a Response object through a pipe, the input stream will use chunked encoding mode. Otherwise, you can use response.write() to write a regular string.

Complex Example: (Basic Http Server with Block Encoding, details omitted)

....
/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
 if (path.startsWith(&#39;/&#39;)) // If it starts with &#39;/&#39;, omit it.
  path = path.substr(1);
 if (path.startsWith(&#39;.&#39;)) // If it starts with &#39;.&#39;, reject it.
  throw new NotFoundError(path);
 if (path === &#39;/&#39; || !path) // If it&#39;s empty, set to index.html.
  path = &#39;index.html&#39;;
 /**
  * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, &#39;server_root&#39;, path);
 try {
  // Stat the target path.
  const {type} = await Nexus.FileSystem.stat(filePath);
  if (type === Nexus.FileSystem.FileType.Directory) // If it&#39;s a directory, return its &#39;index.html&#39;
   return createInputStream(Nexus.FileSystem.join(filePath, &#39;index.html&#39;));
  else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // If it&#39;s not found, throw NotFound.
   throw new NotFoundError(path);
 } catch(e) {
  if (e.code)
   throw e;
  throw new NotFoundError(path);
 }
 try {
  // First, we create a device.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Then we return a new ReadableStream created using our source device.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Connections counter.
 */
let connections = 0;
/**
 * Create a new HTTP server.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, we display them anyway.
server.on(&#39;error&#39;, e => {
 console.error(FgRed + Bright + &#39;Server Error: &#39; + e.message + &#39;\n&#39; + e.stack, Reset);
});
/**
 * Listen to connections.
 */
server.on(&#39;connection&#39;, async (connection, peer) => {
 // Start with a connection ID of 0, increment with every new connection.
 const connId = connections++;
 // Record the start time for this connection.
 const startTime = Date.now();
 // Destructuring is supported, why not use it?
 const { request, response } = connection;
 // Parse the URL parts.
 const { path } = parseURL(request.url);
 // Here we&#39;ll store any errors that occur during the connection.
 const errors = [];
 // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
 let inStream, outStream;
 try {
  // Log the request.
  console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Set the &#39;Server&#39; header.
  response.set(&#39;Server&#39;, `nexus.js/0.1.1`);
  // Create our input stream.
  inStream = await createInputStream(path);
  // Create our output stream.
  outStream = new Nexus.IO.WritableStream(response);
  // Hook all `error` events, add any errors to our `errors` array.
  inStream.on(&#39;error&#39;, e => { errors.push(e); });
  request.on(&#39;error&#39;, e => { errors.push(e); });
  response.on(&#39;error&#39;, e => { errors.push(e); });
  outStream.on(&#39;error&#39;, e => { errors.push(e); });
  // Set content type and request status.
  response
   .set(&#39;Content-Type&#39;, mimeType(path))
   .status(200);
  // Hook input to output(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
   // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
   await inStream.resume();
  } catch (e) {
   // Capture any errors that happen during the streaming.
   errors.push(e);
  }
  // Disconnect all the callbacks created by `.pipe()`.
  return disconnect();
 } catch(e) {
  // If an error occurred, push it to the array.
  errors.push(e);
  // Set the content type, status, and write a basic message.
  response
   .set(&#39;Content-Type&#39;, &#39;text/plain&#39;)
   .status(e.code || 500)
   .send(e.message || &#39;An error has occurred.&#39;);
 } finally {
  // Close the streams manually. This is important because we may run out of file handles otherwise.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Close the connection, has no real effect with keep-alive connections.
  await connection.close();
  // Grab the response&#39;s status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   &#39;200&#39;: Bright + FgGreen, // Green for 200 (OK),
   &#39;404&#39;: Bright + FgYellow, // Yellow for 404 (Not Found)
   &#39;500&#39;: Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(&#39;, &#39;) + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = &#39;0.0.0.0&#39;, port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);

Benchmark

I think I've covered everything I've implemented so far. So now let's talk about performance.

Here are the current benchmarks for the appeal HTTP server, with 100 concurrent connections and a total of 10,000 requests:

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient).....done
Server Software:    nexus.js/0.1.1
Server Hostname:    localhost
Server Port:      3000
Document Path:     /
Document Length:    8673 bytes
Concurrency Level:   100
Time taken for tests:  9.991 seconds
Complete requests:   10000
Failed requests:    0
Total transferred:   87880000 bytes
HTML transferred:    86730000 bytes
Requests per second:  1000.94 [#/sec] (mean)
Time per request:    99.906 [ms] (mean)
Time per request:    0.999 [ms] (mean, across all concurrent requests)
Transfer rate:     8590.14 [Kbytes/sec] received
Connection Times (ms)
       min mean[+/-sd] median  max
Connect:    0  0  0.1   0    1
Processing:   6  99 36.6   84   464
Waiting:    5  99 36.4   84   463
Total:     6 100 36.6   84   464
Percentage of the requests served within a certain time (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (longest request)

1000 requests per second. On an old i7, it runs the benchmark software, an IDE that takes up 5G of memory, and the server itself.

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor  : 0
vendor_id  : GenuineIntel
cpu family : 6
model    : 60
model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping  : 3
microcode  : 0x22
cpu MHz   : 3392.093
cache size : 8192 KB
physical id : 0
siblings  : 8
core id   : 0
cpu cores  : 4
apicid   : 0
initial apicid : 0
fpu   : yes
fpu_exception  : yes
cpuid level : 13
wp   : yes
flags    : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs    :
bogomips  : 6784.18
clflush size  : 64
cache_alignment : 64
address sizes  : 39 bits physical, 48 bits virtual
power management:

I tried 1000 concurrent requests but APacheBench times out due to many sockets being opened. I tried httperf and here are the results:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262
Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s
Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000
Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0
Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)
Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0

As you can see, it still works. Although some connections will time out due to stress. We are still researching the cause of this issue.

The above is what I compiled for everyone. I hope it will be helpful to everyone in the future.

Related articles:

Summary of JS operation DOM tree traversal method

JS implementation of array deduplication algorithm

JS obtains the least common multiple and greatest common divisor

The above is the detailed content of Detailed explanation of multi-threaded runtime library Nexus.js in JavaScript (part of the code attached). For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn