Heim >Java >javaLernprogramm >Detaillierte Beispiele des Netty-Threading-Modells

Detaillierte Beispiele des Netty-Threading-Modells

零下一度
零下一度Original
2017-07-03 11:33:003730Durchsuche

Netty-Threading-Modell

Das Threading-Modell von Netty basiert hauptsächlich auf React und hat sich aufgrund unterschiedlicher Anwendungsszenarien zu mehreren Versionen entwickelt.

Single-Threaded-Modus

Das heißt, der Empfang von Dienstanfragen und die Ausführung von E/A-Vorgängen werden alle von einem Thread ausgeführt In kleinen Fällen kann der Single-Threaded-Modus auch einige Szenenprobleme lösen.

Einzelner empfangender Multi-Worker-Thread-Modus

Wenn die Anzahl der Anforderungen zunimmt, wird der ursprüngliche Thread, der alle E/A-Vorgänge verarbeitet, immer unerträglicher. Entsprechende Leistungsindikatoren, Daher wird das Konzept eines Arbeits-Thread-Pools erwähnt. Zu diesem Zeitpunkt ist der Empfang der Dienstanforderung immer noch ein Thread. Nach dem Empfang der Anforderung wird der Thread, der die Anforderung empfängt, dem nachfolgenden Arbeits-Thread-Pool anvertraut und erhält einen Thread vom Thread Pool zur Ausführung.

Multiple-Empfangs- und Multi-Worker-Thread-Modus

Wenn das Anforderungsvolumen weiter zunimmt, kann ein einzelner Thread, der Serviceanfragen empfängt, nicht alle Clientverbindungen verarbeiten Der Thread-Pool, der Serviceanfragen empfängt, wird ebenfalls erweitert, und mehrere Threads sind für den gleichzeitigen Empfang von Clientverbindungen verantwortlich.

RPC-Geschäftsthread

Bei den oben genannten handelt es sich um Nettys eigenes Thread-Modell, Optimierungsstrategien, die mit der Zunahme des Anfragevolumens kontinuierlich weiterentwickelt wurden. Bei RPC-Anfragen ist die Verarbeitung der Geschäftslogik das Wichtigste für Anwendungssysteme, und diese Art von Geschäft kann rechenintensiv oder IO-intensiv sein. Beispielsweise werden die meisten Anwendungen von Datenbankoperationen, Redis oder anderen Netzwerkdiensten begleitet , usw. Wenn die Geschäftsanforderung solche zeitaufwändigen E/A-Vorgänge enthält, wird empfohlen, die Aufgabe der Verarbeitung der Geschäftsanforderung einem unabhängigen Thread-Pool zuzuweisen, da sonst die eigenen Threads von netty blockiert werden können.

Arbeitsteilung zwischen dem Empfangsanforderungsthread und dem Arbeitsthread

  • Der Empfangsanforderungsthread ist Hauptverantwortlich für die Erstellung des Links und anschließendes Delegieren der Anforderung an den Arbeitsthread

  • Der Arbeitsthread ist für die Kodierung, Dekodierung, das Lesen von E/A und andere Vorgänge verantwortlich

Lösungsimplementierung

Derzeit I Der implementierte RPC übernimmt den Multi-Receiver- und Multi-Worker-Thread-Modus. Die Ports sind auf der Serverseite wie folgt gebunden:

public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {throw new RpcException(e);
            }
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

boosGroup ist eine Gruppe, die zum Empfangen von Serviceanfragen verwendet wird
workerGroup ist eine Gruppe, die speziell für IO-Operationen verantwortlich ist

Um Geschäftsthreads hinzuzufügen, müssen Sie nur weitermachen Delegieren Sie die Handle-Operationen an den Thread-Pool. Zur Erweiterung muss hier eine Schnittstelle definiert werden:

Thread-Pool-Schnittstelle definieren

public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}

Thread-Pool mit fester Größe implementieren

Referenzierter Dubbo-Thread-Pool

@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                            });
                }
            }
        }return executor;
    }
}

Zwischenspiel:
Ich erinnere mich, dass einmal ein Freund plötzlich nach der Kerngröße fragte im Java-Thread-Pool gemeint? Ich war plötzlich kurzgeschlossen, weil ich normalerweise kein Multithreading schreibe. Wenn ich an den Datenbank-Thread-Pool denke, den ich normalerweise häufig verwende, bin ich von den darin enthaltenen Parametern ziemlich beeindruckt, kann mich aber einfach nicht erinnern coreSize. Später habe ich mir einige Parameter des Thread-Pools genauer angesehen. Jetzt kann ich die Gelegenheit nutzen, genauer hinzuschauen, um einen erneuten Kurzschluss zu vermeiden.

Thread-Pool-Factory

Wenn mehrere Thread-Pool-Implementierungen vorhanden sind, wird der Thread-Pool dynamisch anhand des Thread-Pool-Namens ausgewählt.

@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
    }
}

Ändern Sie die Methode „channelRead0“ von ChannelHandle.

Verpacken Sie den Methodenkörper in eine Aufgabe und übergeben Sie ihn zur Ausführung an den Thread-Pool.

@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });
}

Problem

Es fehlen derzeit Stresstests, daher gibt es noch keinen eindeutigen Datenvergleich.

Das obige ist der detaillierte Inhalt vonDetaillierte Beispiele des Netty-Threading-Modells. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn