>  기사  >  웹 프론트엔드  >  JavaScript의 멀티스레드 런타임 라이브러리 Nexus.js에 대한 자세한 설명(코드 일부 첨부)

JavaScript의 멀티스레드 런타임 라이브러리 Nexus.js에 대한 자세한 설명(코드 일부 첨부)

亚连
亚连원래의
2018-05-18 17:49:141624검색

이 글에서는 주로 JavaScript 멀티 스레드 런타임 라이브러리 Nexus.js의 학습 경험과 코드 공유를 소개합니다. 도움이 필요한 친구들이 참고하여 함께 배울 수 있습니다.

이벤트 루프

이벤트 루프가 없습니다

(잠금 없는) 작업 개체가 있는 스레드 풀이 있습니다

setTimeout 또는 setImmediate가 호출되거나 Promise가 생성될 때마다 작업이 대기열에 추가됩니다. 작업 대기열 시계.

작업이 예약될 때마다 사용 가능한 첫 번째 스레드가 작업을 선택하고 실행합니다.

CPU 코어에서 Promise를 처리합니다. Promise.all()을 호출하면 Promise가 병렬로 해결됩니다.

ES6

은 async/await를 지원하며

await(...) 지원을 사용하는 것이 좋습니다.

구조 해제 지원

async try/catch/finally 지원

module

CommonJS는 지원되지 않습니다. (require(...) 및 module.exports)

모든 모듈은 ES6 가져오기/내보내기 구문을 사용합니다.

import('file-or-packge').then(...)을 통해 동적 가져오기를 지원합니다.

import 를 지원합니다. 메타(예: import.meta.filename 및 import.meta.dirname 등)

추가 기능: 다음과 같은 URL에서 직접 가져오기 지원:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';

EventEmitter

Nexus는 Promise 기반 EventEmitter 클래스를 구현합니다

이벤트 핸들러는 모든 스레드에서 정렬되며 병렬로 실행됩니다.

EventEmitter.emit(...)의 반환 값은 이벤트 핸들러에서 반환 값 배열로 구문 분석될 수 있는 Promise입니다.

예:

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on(&#39;test&#39;, value => { console.log(`fired test ${i}!`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on(&#39;returns-a-value&#39;, v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit(&#39;test&#39;, { payload: &#39;test 1&#39; });
 console.log(&#39;first test done!&#39;);
 await test.emit(&#39;test&#39;, { payload: &#39;test 2&#39; });
 console.log(&#39;second test done!&#39;);
 const values = await test.emit(&#39;returns-a-value&#39;, 10);
 console.log(&#39;third test done, returned values are:&#39;); console.inspect(values);
}
start().catch(console.error);

I/O

모든 입력/출력은 장치, 필터 및 스트림의 세 가지 기본 요소를 통해 수행됩니다.

모든 입력/출력 프리미티브는 EventEmitter 클래스를 구현합니다.

Device를 사용하려면 Device 위에 ReadableStream 또는 WritableStream을 만들어야 합니다.

데이터를 조작하려면 ReadableStream 또는 WritableStream에 필터를 추가할 수 있습니다.

마지막으로 source.pipe(...destinationStreams)를 사용하고 source.resume()이 데이터를 처리할 때까지 기다립니다.

모든 입력/출력 작업은 ArrayBuffer 개체를 사용하여 수행됩니다.

Filter는 데이터를 처리하기 위해 process(buffer) 방식을 시도했습니다.

예: 2개의 별도 출력 파일을 사용하여 UTF-8을 UTF6으로 변환합니다.

const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice(&#39;enwik8&#39;);
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice(&#39;enwik16-&#39; + i)));
  console.log(&#39;piping...&#39;);
  stream.pipe(...wstreams);
  console.log(&#39;streaming...&#39;);
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
 } catch (e) {
  console.error(&#39;An error occurred: &#39;, e);
 }
}
start().catch(console.error);

TCP/UDP

Nexus.js는 IP 주소/포트 바인딩 및 연결 모니터링을 담당하는 Acceptor 클래스를 제공합니다.

연결 요청이 수신될 때마다 연결 이벤트가 트리거되고 제공됩니다. 소켓 장치.

각 소켓 인스턴스는 전이중 I/O 장치입니다.

ReadableStream과 WritableStream을 사용하여 소켓을 작동할 수 있습니다.

가장 기본적인 예: (클라이언트에 "Hello World" 전송)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on(&#39;connection&#39;, (socket, endpoint) => {
 const connId = count++;
 console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = &#39;Hello World!\n&#39;;
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on(&#39;data&#39;, buffer => console.log(`got message: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
 console.log(`sending greeting to #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind(&#39;127.0.0.1&#39;, 10000);
acceptor.listen();
console.log(&#39;server ready&#39;);

Http

Nexus는 기본적으로 TCPAcceptor를 상속하는 Nexus.Net.HTTP.Server 클래스를 제공합니다

기본 인터페이스

When 서버는 들어오는 연결의 기본 HTTP 헤더에 대한 구문 분석/검증을 완료하고 연결 및 동일한 정보를 사용하여 연결 이벤트가 트리거됩니다.

각 연결 인스턴스에는 요청과 응답 개체가 있습니다. 입출력 장치들입니다.

ReadableStream 및 WritableStream을 구성하여 요청/응답을 조작할 수 있습니다.

파이프를 통해 응답 객체에 연결하면 입력 스트림은 청크 인코딩 모드를 사용합니다. 그렇지 않으면 response.write()를 사용하여 일반 문자열을 작성할 수 있습니다.

복잡한 예: (블록 인코딩이 포함된 기본 Http 서버, 자세한 내용은 생략)

....
/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
 if (path.startsWith(&#39;/&#39;)) // If it starts with &#39;/&#39;, omit it.
  path = path.substr(1);
 if (path.startsWith(&#39;.&#39;)) // If it starts with &#39;.&#39;, reject it.
  throw new NotFoundError(path);
 if (path === &#39;/&#39; || !path) // If it&#39;s empty, set to index.html.
  path = &#39;index.html&#39;;
 /**
  * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, &#39;server_root&#39;, path);
 try {
  // Stat the target path.
  const {type} = await Nexus.FileSystem.stat(filePath);
  if (type === Nexus.FileSystem.FileType.Directory) // If it&#39;s a directory, return its &#39;index.html&#39;
   return createInputStream(Nexus.FileSystem.join(filePath, &#39;index.html&#39;));
  else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // If it&#39;s not found, throw NotFound.
   throw new NotFoundError(path);
 } catch(e) {
  if (e.code)
   throw e;
  throw new NotFoundError(path);
 }
 try {
  // First, we create a device.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Then we return a new ReadableStream created using our source device.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Connections counter.
 */
let connections = 0;
/**
 * Create a new HTTP server.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, we display them anyway.
server.on(&#39;error&#39;, e => {
 console.error(FgRed + Bright + &#39;Server Error: &#39; + e.message + &#39;\n&#39; + e.stack, Reset);
});
/**
 * Listen to connections.
 */
server.on(&#39;connection&#39;, async (connection, peer) => {
 // Start with a connection ID of 0, increment with every new connection.
 const connId = connections++;
 // Record the start time for this connection.
 const startTime = Date.now();
 // Destructuring is supported, why not use it?
 const { request, response } = connection;
 // Parse the URL parts.
 const { path } = parseURL(request.url);
 // Here we&#39;ll store any errors that occur during the connection.
 const errors = [];
 // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
 let inStream, outStream;
 try {
  // Log the request.
  console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Set the &#39;Server&#39; header.
  response.set(&#39;Server&#39;, `nexus.js/0.1.1`);
  // Create our input stream.
  inStream = await createInputStream(path);
  // Create our output stream.
  outStream = new Nexus.IO.WritableStream(response);
  // Hook all `error` events, add any errors to our `errors` array.
  inStream.on(&#39;error&#39;, e => { errors.push(e); });
  request.on(&#39;error&#39;, e => { errors.push(e); });
  response.on(&#39;error&#39;, e => { errors.push(e); });
  outStream.on(&#39;error&#39;, e => { errors.push(e); });
  // Set content type and request status.
  response
   .set(&#39;Content-Type&#39;, mimeType(path))
   .status(200);
  // Hook input to output(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
   // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
   await inStream.resume();
  } catch (e) {
   // Capture any errors that happen during the streaming.
   errors.push(e);
  }
  // Disconnect all the callbacks created by `.pipe()`.
  return disconnect();
 } catch(e) {
  // If an error occurred, push it to the array.
  errors.push(e);
  // Set the content type, status, and write a basic message.
  response
   .set(&#39;Content-Type&#39;, &#39;text/plain&#39;)
   .status(e.code || 500)
   .send(e.message || &#39;An error has occurred.&#39;);
 } finally {
  // Close the streams manually. This is important because we may run out of file handles otherwise.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Close the connection, has no real effect with keep-alive connections.
  await connection.close();
  // Grab the response&#39;s status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   &#39;200&#39;: Bright + FgGreen, // Green for 200 (OK),
   &#39;404&#39;: Bright + FgYellow, // Yellow for 404 (Not Found)
   &#39;500&#39;: Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(&#39;, &#39;) + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = &#39;0.0.0.0&#39;, port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);

Benchmarks

지금까지 구현한 모든 내용을 다룬 것 같습니다. 그럼 이제 성능에 대해 이야기해보겠습니다.

다음은 100개의 동시 연결과 총 10000개의 요청을 포함하는 Appeal HTTP 서버의 현재 벤치마크입니다:

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient).....done
Server Software:    nexus.js/0.1.1
Server Hostname:    localhost
Server Port:      3000
Document Path:     /
Document Length:    8673 bytes
Concurrency Level:   100
Time taken for tests:  9.991 seconds
Complete requests:   10000
Failed requests:    0
Total transferred:   87880000 bytes
HTML transferred:    86730000 bytes
Requests per second:  1000.94 [#/sec] (mean)
Time per request:    99.906 [ms] (mean)
Time per request:    0.999 [ms] (mean, across all concurrent requests)
Transfer rate:     8590.14 [Kbytes/sec] received
Connection Times (ms)
       min mean[+/-sd] median  max
Connect:    0  0  0.1   0    1
Processing:   6  99 36.6   84   464
Waiting:    5  99 36.4   84   463
Total:     6 100 36.6   84   464
Percentage of the requests served within a certain time (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (longest request)

초당 요청 1000개. 구형 i7에서는 벤치마크 소프트웨어, 5G 메모리를 차지하는 IDE 및 서버 자체를 실행합니다.

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor  : 0
vendor_id  : GenuineIntel
cpu family : 6
model    : 60
model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping  : 3
microcode  : 0x22
cpu MHz   : 3392.093
cache size : 8192 KB
physical id : 0
siblings  : 8
core id   : 0
cpu cores  : 4
apicid   : 0
initial apicid : 0
fpu   : yes
fpu_exception  : yes
cpuid level : 13
wp   : yes
flags    : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs    :
bogomips  : 6784.18
clflush size  : 64
cache_alignment : 64
address sizes  : 39 bits physical, 48 bits virtual
power management:

1000개의 동시 요청을 시도했지만 열려 있는 소켓이 많아 APacheBench 시간이 초과되었습니다. httperf를 시도했는데 결과는 다음과 같습니다.

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262
Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s
Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000
Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0
Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)
Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0

보시다시피 여전히 작동합니다. 스트레스로 인해 일부 연결 시간이 초과될 수도 있습니다. 우리는 이 문제의 원인을 계속 조사하고 있습니다.

위 내용은 모두를 위해 제가 정리한 내용입니다. 앞으로 모든 사람에게 도움이 되기를 바랍니다.

관련 기사:

JS 작업 요약 DOM 트리 탐색 방법

JS는 배열 중복 제거 알고리즘을 구현합니다.

JS는 최소 공배수와 최대 공약수를 얻습니다.

위 내용은 JavaScript의 멀티스레드 런타임 라이브러리 Nexus.js에 대한 자세한 설명(코드 일부 첨부)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.