Maison  >  Article  >  interface Web  >  Comment implémenter la communication deno (avec code)

Comment implémenter la communication deno (avec code)

不言
不言avant
2019-03-11 17:02:272526parcourir

Le contenu de cet article concerne la méthode de mise en œuvre de la communication deno (avec code). Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.

Méthode de communication

Le code d'exécution de Deno est similaire au nœud, y compris les méthodes synchrones et asynchrones. La méthode asynchrone est implémentée via Promise.then.

Typescript/Javascript appelle rust

Comme mentionné dans la section précédente, deno initialisera l'instance d'isolat v8 lors de son démarrage. Pendant le processus d'initialisation, les fonctions c++ sont. liées aux instances de v8 isolate Lorsque la v8 exécute du code Javascript, ces fonctions liées peuvent être appelées comme des fonctions Javascript. La liaison spécifique est implémentée comme suit :

void InitializeContext(v8::Isolate* isolate, v8::Local<v8::Context> context) {
  v8::HandleScope handle_scope(isolate);
  v8::Context::Scope context_scope(context);

  auto global = context->Global();

  auto deno_val = v8::Object::New(isolate);
  CHECK(global->Set(context, deno::v8_str("libdeno"), deno_val).FromJust());

  auto print_tmpl = v8::FunctionTemplate::New(isolate, Print);
  auto print_val = print_tmpl->GetFunction(context).ToLocalChecked();
  CHECK(deno_val->Set(context, deno::v8_str("print"), print_val).FromJust());

  auto recv_tmpl = v8::FunctionTemplate::New(isolate, Recv);
  auto recv_val = recv_tmpl->GetFunction(context).ToLocalChecked();
  CHECK(deno_val->Set(context, deno::v8_str("recv"), recv_val).FromJust());

  auto send_tmpl = v8::FunctionTemplate::New(isolate, Send);
  auto send_val = send_tmpl->GetFunction(context).ToLocalChecked();
  CHECK(deno_val->Set(context, deno::v8_str("send"), send_val).FromJust());

  auto eval_context_tmpl = v8::FunctionTemplate::New(isolate, EvalContext);
  auto eval_context_val =
      eval_context_tmpl->GetFunction(context).ToLocalChecked();
  CHECK(deno_val->Set(context, deno::v8_str("evalContext"), eval_context_val)
            .FromJust());

  auto error_to_json_tmpl = v8::FunctionTemplate::New(isolate, ErrorToJSON);
  auto error_to_json_val =
      error_to_json_tmpl->GetFunction(context).ToLocalChecked();
  CHECK(deno_val->Set(context, deno::v8_str("errorToJSON"), error_to_json_val)
            .FromJust());

  CHECK(deno_val->SetAccessor(context, deno::v8_str("shared"), Shared)
            .FromJust());
}

Une fois la liaison terminée, le mappage des méthodes C++ et des méthodes Typescript peut être implémenté dans Typescript via le code suivant

libdeno.ts
interface Libdeno {
  recv(cb: MessageCallback): void;

  send(control: ArrayBufferView, data?: ArrayBufferView): null | Uint8Array;

  print(x: string, isErr?: boolean): void;

  shared: ArrayBuffer;

  /** Evaluate provided code in the current context.
   * It differs from eval(...) in that it does not create a new context.
   * Returns an array: [output, errInfo].
   * If an error occurs, `output` becomes null and `errInfo` is non-null.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  evalContext(code: string): [any, EvalErrorInfo | null];

  errorToJSON: (e: Error) => string;
}

export const libdeno = window.libdeno as Libdeno;

Lors de l'exécution du code Typescript, il vous suffit d'introduire libdeno et d'appeler directement la méthode c++, par exemple :

import { libdeno } from "./libdeno";
function sendInternal(
  builder: flatbuffers.Builder,
  innerType: msg.Any,
  inner: flatbuffers.Offset,
  data: undefined | ArrayBufferView,
  sync = true
): [number, null | Uint8Array] {
  const cmdId = nextCmdId++;
  msg.Base.startBase(builder);
  msg.Base.addInner(builder, inner);
  msg.Base.addInnerType(builder, innerType);
  msg.Base.addSync(builder, sync);
  msg.Base.addCmdId(builder, cmdId);
  builder.finish(msg.Base.endBase(builder));
  const res = libdeno.send(builder.asUint8Array(), data);
  builder.inUse = false;
  return [cmdId, res];
}

Appelez la méthode libdeno.send pour transmettre les données en c++, puis appelez-le via C++ Le code Rust implémente des opérations d'ingénierie spécifiques.

Synchronisation des couches Typescript et implémentation asynchrone

Synchronisation

Dans Typescript, il vous suffit de définir le paramètre sync de la méthode sendInternal sur true, il sera jugé. en fonction du paramètre sync. Exécutez des opérations synchrones ou asynchrones. Si sync est vrai, la méthode libdeono.send renverra le résultat de l'exécution. Pour transférer des données entre rust et typescript, les données doivent être sérialisées. opération de sérialisation ici.

const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, true);
Implémentation asynchrone

De même, pour implémenter la méthode asynchrone, il vous suffit de définir le paramètre sync sur false. Cependant, par rapport à la synchronisation, les opérations asynchrones ont plus de méthodes de repli. exécution Lors d'une communication asynchrone, la méthode libdeno.send renverra un cmdId unique pour identifier cette opération d'appel. Dans le même temps, une fois la communication asynchrone terminée, un objet de promesse sera créé, utilisant cmdId comme clé et promesse comme valeur, et ajouté à la carte. Le code est le suivant :

const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false);
  util.assert(resBuf == null);
  const promise = util.createResolvable<msg.Base>();
  promiseTable.set(cmdId, promise);
  return promise;

rust implémente le synchrone et l'asynchrone

Lorsque la méthode libdeno.send est appelée dans Typescript, la méthode Send dans le fichier C++ liaison.cc est appelée, qui est dans deno est lié à l'isolement v8 une fois initialisé. Dans la méthode Send, la méthode de répartition dans le fichier ops.rs est appelée, qui implémente le mappage des messages aux fonctions. Chaque type de message correspond à une fonction. Par exemple, un message de lecture de fichier correspond à une fonction de lecture de fichier.

pub fn dispatch(
  isolate: &Isolate,
  control: libdeno::deno_buf,
  data: libdeno::deno_buf,
) -> (bool, Box<Op>) {
  let base = msg::get_root_as_base(&control);
  let is_sync = base.sync();
  let inner_type = base.inner_type();
  let cmd_id = base.cmd_id();

  let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
    // SetTimeout is an exceptional op: the global timeout field is part of the
    // Isolate state (not the IsolateState state) and it must be updated on the
    // main thread.
    assert_eq!(is_sync, true);
    op_set_timeout(isolate, &base, data)
  } else {
    // Handle regular ops.
    let op_creator: OpCreator = match inner_type {
      msg::Any::Accept => op_accept,
      msg::Any::Chdir => op_chdir,
      msg::Any::Chmod => op_chmod,
      msg::Any::Close => op_close,
      msg::Any::FetchModuleMetaData => op_fetch_module_meta_data,
      msg::Any::CopyFile => op_copy_file,
      msg::Any::Cwd => op_cwd,
      msg::Any::Dial => op_dial,
      msg::Any::Environ => op_env,
      msg::Any::Exit => op_exit,
      msg::Any::Fetch => op_fetch,
      msg::Any::FormatError => op_format_error,
      msg::Any::Listen => op_listen,
      msg::Any::MakeTempDir => op_make_temp_dir,
      msg::Any::Metrics => op_metrics,
      msg::Any::Mkdir => op_mkdir,
      msg::Any::Open => op_open,
      msg::Any::ReadDir => op_read_dir,
      msg::Any::ReadFile => op_read_file,
      msg::Any::Readlink => op_read_link,
      msg::Any::Read => op_read,
      msg::Any::Remove => op_remove,
      msg::Any::Rename => op_rename,
      msg::Any::ReplReadline => op_repl_readline,
      msg::Any::ReplStart => op_repl_start,
      msg::Any::Resources => op_resources,
      msg::Any::Run => op_run,
      msg::Any::RunStatus => op_run_status,
      msg::Any::SetEnv => op_set_env,
      msg::Any::Shutdown => op_shutdown,
      msg::Any::Start => op_start,
      msg::Any::Stat => op_stat,
      msg::Any::Symlink => op_symlink,
      msg::Any::Truncate => op_truncate,
      msg::Any::WorkerGetMessage => op_worker_get_message,
      msg::Any::WorkerPostMessage => op_worker_post_message,
      msg::Any::Write => op_write,
      msg::Any::WriteFile => op_write_file,
      msg::Any::Now => op_now,
      msg::Any::IsTTY => op_is_tty,
      msg::Any::Seek => op_seek,
      msg::Any::Permissions => op_permissions,
      msg::Any::PermissionRevoke => op_revoke_permission,
      _ => panic!(format!(
        "Unhandled message {}",
        msg::enum_name_any(inner_type)
      )),
    };
    op_creator(&isolate, &base, data)
  };

  // ...省略多余的代码
}

Dans chaque type de fonction, l'exécution synchrone ou asynchrone sera déterminée en fonction de la valeur du paramètre de synchronisation transmise lors de l'appel de la méthode libdeo.send dans Typescript.

let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);
Exécution synchrone

Après l'exécution de la méthode de répartition, la variable is_sync sera renvoyée Si is_sync est vrai, cela signifie que la méthode est exécutée de manière synchrone et op signifie le résultat renvoyé. . Le code rust appellera la méthode deno_respond dans le fichier c++ api.cc pour synchroniser les résultats de l'exécution. La méthode deno_respond détermine s'il s'agit d'un message de synchronisation en fonction de la valeur de current_args_. Si current_args_ a une valeur, le résultat est renvoyé directement. .

Exécution asynchrone

Dans deno, l'exécution d'opérations asynchrones est implémentée via le module Tokio de rust Après avoir appelé la méthode de dispatch, s'il s'agit d'une opération asynchrone, la valeur de is_sync est fausse. , et l'opération ne le fait pas. Ensuite, il y a le résultat de l'exécution, mais une fonction d'exécution. Utilisez le module tokio pour dériver un thread pour exécuter la fonction de manière asynchrone.

    let task = op
      .and_then(move |buf| {
        let sender = tx; // tx is moved to new thread
        sender.send((zero_copy_id, buf)).expect("tx.send error");
        Ok(())
      }).map_err(|_| ());
    tokio::spawn(task);

Lors de l'initialisation deno, un pipeline sera créé. Le code est le suivant :

let (tx, rx) = mpsc::channel::<(usize, Buf)>();

Le pipeline peut réaliser la communication entre différents threads puisque l'opération asynchrone crée une nouvelle exécution de Threads. , les sous-threads ne peuvent donc pas communiquer directement avec le thread principal et doivent être implémentés via le mécanisme de pipeline. Une fois le code asynchrone exécuté, appelez la méthode tx.send pour ajouter les résultats de l'exécution au pipeline. La boucle d'événements lira les résultats du pipeline et les renverra à chaque fois.

Boucle d'événements

Étant donné que les opérations asynchrones reposent sur des boucles d'événements, expliquons d'abord la boucle d'événements dans deno. En fait, la boucle d'événements est très simple. dans une boucle. Lorsque les conditions sont remplies, l'événement La boucle terminera l'exécution. Le code de la boucle d'événement principal dans deno est implémenté comme suit :

pub fn event_loop(&self) -> Result<(), JSError> {
    // Main thread event loop.
    while !self.is_idle() {
      match recv_deadline(&self.rx, self.get_timeout_due()) {
        Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
        Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
        Err(e) => panic!("recv_deadline() failed: {:?}", e),
      }
      self.check_promise_errors();
      if let Some(err) = self.last_exception() {
        return Err(err);
      }
    }
    // Check on done
    self.check_promise_errors();
    if let Some(err) = self.last_exception() {
      return Err(err);
    }
    Ok(())
  }

La méthode self.is_idle est utilisée pour déterminer si tout le monde est actif. les opérations asynchrones sont terminées. Lorsque toutes les opérations asynchrones sont terminées, pour arrêter la boucle d'événements, le code de la méthode is_idle est le suivant :

fn is_idle(&self) -> bool {
    self.ntasks.get() == 0 && self.get_timeout_due().is_none()
  }

Lorsqu'un appel de méthode asynchrone est généré, la méthode suivante sera appelé pour augmenter la valeur interne des ntasks de 1,

fn ntasks_increment(&self) {
    assert!(self.ntasks.get() >= 0);
    self.ntasks.set(self.ntasks.get() + 1);
  }

dans Dans la boucle d'événements, chaque fois qu'une valeur est obtenue du pipeline, la boucle d'événements agit comme un consommateur, et le sous-thread qui exécute la méthode asynchrone et agit en tant que producteur. Si un résultat d'exécution est obtenu dans une boucle d'événement, la méthode ntasks_decrement sera appelée pour décrémenter la valeur interne de ntasks de 1. Lorsque la valeur de ntasks est 0, la boucle d'événement quittera l'exécution. Dans chaque boucle, la valeur obtenue dans le pipeline est utilisée comme paramètre, la méthode complete_op est appelée et le résultat est renvoyé.

rust中将异步操作结果返回回去

在初始化v8实例时,绑定的c++方法中有一个Recv方法,该方法的作用时暴露一个Typescript的函数给rust,在deno的io.ts文件的start方法中执行libdeno.recv(handleAsyncMsgFromRust),将handleAsyncMsgFromRust函数通过c++方法暴露给rust。具体实现如下:

export function start(source?: string): msg.StartRes {
  libdeno.recv(handleAsyncMsgFromRust);

  // First we send an empty `Start` message to let the privileged side know we
  // are ready. The response should be a `StartRes` message containing the CLI
  // args and other info.
  const startResMsg = sendStart();

  util.setLogDebug(startResMsg.debugFlag(), source);

  setGlobals(startResMsg.pid(), startResMsg.noColor(), startResMsg.execPath()!);

  return startResMsg;
}

当异步操作执行完成后,可以在rust中直接调用handleAsyncMsgFromRust方法,将结果返回给Typescript。先看一下handleAsyncMsgFromRust方法的实现细节:

export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
  // If a the buffer is empty, recv() on the native side timed out and we
  // did not receive a message.
  if (ui8 && ui8.length) {
    const bb = new flatbuffers.ByteBuffer(ui8);
    const base = msg.Base.getRootAsBase(bb);
    const cmdId = base.cmdId();
    const promise = promiseTable.get(cmdId);
    util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
    promiseTable.delete(cmdId);
    const err = errors.maybeError(base);
    if (err != null) {
      promise!.reject(err);
    } else {
      promise!.resolve(base);
    }
  }
  // Fire timers that have become runnable.
  fireTimers();
}

从代码handleAsyncMsgFromRust方法的实现中可以知道,首先通过flatbuffer反序列化返回的结果,然后获取返回结果的cmdId,根据cmdId获取之前创建的promise对象,然后调用promise.resolve方法触发promise.then中的代码执行。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer