Heim >Java >javaLernprogramm >Detaillierte Beispiele des Netty-Threading-Modells
Das Threading-Modell von Netty basiert hauptsächlich auf React und hat sich aufgrund unterschiedlicher Anwendungsszenarien zu mehreren Versionen entwickelt.
Das heißt, der Empfang von Dienstanfragen und die Ausführung von E/A-Vorgängen werden alle von einem Thread ausgeführt In kleinen Fällen kann der Single-Threaded-Modus auch einige Szenenprobleme lösen.
Wenn die Anzahl der Anforderungen zunimmt, wird der ursprüngliche Thread, der alle E/A-Vorgänge verarbeitet, immer unerträglicher. Entsprechende Leistungsindikatoren, Daher wird das Konzept eines Arbeits-Thread-Pools erwähnt. Zu diesem Zeitpunkt ist der Empfang der Dienstanforderung immer noch ein Thread. Nach dem Empfang der Anforderung wird der Thread, der die Anforderung empfängt, dem nachfolgenden Arbeits-Thread-Pool anvertraut und erhält einen Thread vom Thread Pool zur Ausführung.
Wenn das Anforderungsvolumen weiter zunimmt, kann ein einzelner Thread, der Serviceanfragen empfängt, nicht alle Clientverbindungen verarbeiten Der Thread-Pool, der Serviceanfragen empfängt, wird ebenfalls erweitert, und mehrere Threads sind für den gleichzeitigen Empfang von Clientverbindungen verantwortlich.
Bei den oben genannten handelt es sich um Nettys eigenes Thread-Modell, Optimierungsstrategien, die mit der Zunahme des Anfragevolumens kontinuierlich weiterentwickelt wurden. Bei RPC-Anfragen ist die Verarbeitung der Geschäftslogik das Wichtigste für Anwendungssysteme, und diese Art von Geschäft kann rechenintensiv oder IO-intensiv sein. Beispielsweise werden die meisten Anwendungen von Datenbankoperationen, Redis oder anderen Netzwerkdiensten begleitet , usw. Wenn die Geschäftsanforderung solche zeitaufwändigen E/A-Vorgänge enthält, wird empfohlen, die Aufgabe der Verarbeitung der Geschäftsanforderung einem unabhängigen Thread-Pool zuzuweisen, da sonst die eigenen Threads von netty blockiert werden können.
Arbeitsteilung zwischen dem Empfangsanforderungsthread und dem Arbeitsthread
Der Empfangsanforderungsthread ist Hauptverantwortlich für die Erstellung des Links und anschließendes Delegieren der Anforderung an den Arbeitsthread
Der Arbeitsthread ist für die Kodierung, Dekodierung, das Lesen von E/A und andere Vorgänge verantwortlich
Derzeit I Der implementierte RPC übernimmt den Multi-Receiver- und Multi-Worker-Thread-Modus. Die Ports sind auf der Serverseite wie folgt gebunden:
public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(this.rpcServerInitializer)
.childOption(ChannelOption.SO_KEEPALIVE,true)
;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {throw new RpcException(e);
}
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
boosGroup ist eine Gruppe, die zum Empfangen von Serviceanfragen verwendet wird
workerGroup ist eine Gruppe, die speziell für IO-Operationen verantwortlich ist
Um Geschäftsthreads hinzuzufügen, müssen Sie nur weitermachen Delegieren Sie die Handle-Operationen an den Thread-Pool. Zur Erweiterung muss hier eine Schnittstelle definiert werden:
public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}
Referenzierter Dubbo-Thread-Pool
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //...}
});
}
}
}return executor;
}
}
Zwischenspiel:
Ich erinnere mich, dass einmal ein Freund plötzlich nach der Kerngröße fragte im Java-Thread-Pool gemeint? Ich war plötzlich kurzgeschlossen, weil ich normalerweise kein Multithreading schreibe. Wenn ich an den Datenbank-Thread-Pool denke, den ich normalerweise häufig verwende, bin ich von den darin enthaltenen Parametern ziemlich beeindruckt, kann mich aber einfach nicht erinnern coreSize. Später habe ich mir einige Parameter des Thread-Pools genauer angesehen. Jetzt kann ich die Gelegenheit nutzen, genauer hinzuschauen, um einen erneuten Kurzschluss zu vermeiden.
Wenn mehrere Thread-Pool-Implementierungen vorhanden sind, wird der Thread-Pool dynamisch anhand des Thread-Pool-Namens ausgewählt.
@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
}
}
Verpacken Sie den Methodenkörper in eine Aufgabe und übergeben Sie ihn zur Ausführung an den Thread-Pool.
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
channelHandlerContext.writeAndFlush(response);
}
});
}
Es fehlen derzeit Stresstests, daher gibt es noch keinen eindeutigen Datenvergleich.
Das obige ist der detaillierte Inhalt vonDetaillierte Beispiele des Netty-Threading-Modells. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!