Maison >Java >javaDidacticiel >Exemples détaillés du modèle de thread Netty

Exemples détaillés du modèle de thread Netty

零下一度
零下一度original
2017-07-03 11:33:003688parcourir

Modèle de thread Netty

Le modèle de thread Netty est principalement basé sur React et a évolué vers plusieurs versions en raison de différents scénarios d'application.

Mode monothread

C'est-à-dire que la réception des demandes de service et l'exécution des opérations d'E/S sont toutes effectuées par un seul thread, puisque des opérations d'E/S non bloquantes telles que le multiplexage d'E/S sont utilisées, le nombre de demandes. augmente Dans de petits cas, le mode monothread peut également résoudre certains problèmes de scène.

Mode thread multi-travailleur à réception unique

Lorsque le nombre de requêtes augmente, le thread d'origine traitant toutes les opérations d'E/S devient de plus en plus insupportable Indicateurs de performance correspondants, Ainsi, le concept de pool de threads de travail est mentionné.À ce stade, la réception de la demande de service est toujours un thread.Après avoir reçu la demande, le thread recevant la demande sera confié au pool de threads de travail suivant et obtiendra un thread du thread. pool pour exécution.

Mode de thread de réception multiple et multi-travailleur

Lorsque le volume de requêtes augmente encore, un seul thread qui reçoit les demandes de service ne peut pas gérer toutes les connexions client, donc Le pool de threads qui reçoit les demandes de service est également étendu et plusieurs threads sont chargés de recevoir les connexions client en même temps.

Fil d'affaires RPC

Les éléments mentionnés ci-dessus sont le propre modèle de fil de Netty, des stratégies d'optimisation qui ont été continuellement développées avec l'augmentation du volume de demandes. Pour les requêtes RPC, la chose la plus importante pour les systèmes d'application est le traitement de la logique métier, et ce type d'activité peut nécessiter beaucoup de calculs ou d'E/S. Par exemple, la plupart des applications sont accompagnées d'opérations de base de données, de redis ou d'autres services réseau. , etc. S'il y a de telles opérations d'E/S fastidieuses dans la demande commerciale, il est recommandé d'attribuer la tâche de traitement de la demande commerciale à un pool de threads indépendant, sinon les propres threads de netty pourraient être bloqués.

Répartition du travail entre le thread de requête de réception et le thread de travail

  • Le thread de requête de réception est principalement responsable de la création du lien, puis déléguer la demande au thread de travail

  • Le thread de travail est responsable de l'encodage, du décodage, de la lecture des IO et d'autres opérations

Implémentation de la solution

Actuellement I Le RPC implémenté adopte le mode multi-récepteur et multi-worker thread Les ports sont liés côté serveur comme ceci :

public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {throw new RpcException(e);
            }
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

boosGroup est un groupe utilisé pour recevoir les demandes de service
workerGroup est un groupe spécifiquement responsable des opérations IO

Pour ajouter des fils de discussion, il vous suffit d'approfondir. déléguez les opérations de handle au pool de threads. Pour l'expansion, une interface doit être définie ici :

Définir l'interface du pool de threads

public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}

Implémenter un pool de threads de taille fixe.

Pool de threads Dubbo référencé

@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                            });
                }
            }
        }return executor;
    }
}

Interlude :
Je me souviens d'une fois, un ami m'a soudainement demandé quelle était la taille du noyau. dans le pool de threads Java, cela signifie-t-il ? J'ai été soudainement court-circuité, car je n'écris généralement pas en multithread. Quand je pense au pool de threads de base de données que j'utilise habituellement, je suis assez impressionné par les paramètres qu'il contient, mais je ne m'en souviens tout simplement pas. coreSize. Plus tard, j'ai examiné de plus près certains paramètres du pool de threads. Je peux désormais en profiter pour y regarder de plus près afin d'éviter à nouveau un court-circuit.

Thread Pool Factory

Lorsqu'il existe plusieurs implémentations de pool de threads, le pool de threads est sélectionné dynamiquement par le nom du pool de threads.

@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
    }
}

Modifiez la méthode channelRead0 de ChannelHandle

Enveloppez le corps de la méthode dans une tâche et remettez-le au pool de threads pour exécution.

@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });
}

Problème

Il y a actuellement un manque de tests de résistance, il n'y a donc pas encore de comparaison claire des données.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn