recherche

Maison  >  Questions et réponses  >  le corps du texte

java - netty如何实现向客户端主动发送消息

需求场景:
智能家居网关(以下简称gateway),需要和netty服务器通讯(以下简称netty),netty和gateway之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)

碰到的问题:
netty作为服务器端如何主动的向gateway发送消息,我尝试当每个gateway连接到netty(TCP/IP)时使用一个map把该channelSocket的id和该channelSocket绑定在一起

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String uuid = ctx.channel().id().asLongText();
        GatewayService.addGatewayChannel(uuid, (SocketChannel)ctx.channel());
        System.out.println("a new connect come in: " + uuid);
    }

GatewayService其实就是一个ConcurrentHashMap

public class GatewayService {
    
    private static Map<String, SocketChannel> map = new ConcurrentHashMap<>();
    
    public static void addGatewayChannel(String id, SocketChannel gateway_channel){
        map.put(id, gateway_channel);
    }
    
    public static Map<String, SocketChannel> getChannels(){
        return map;
    }

    public static SocketChannel getGatewayChannel(String id){
        return map.get(id);
    }
    
    public static void removeGatewayChannel(String id){
        map.remove(id);
    }
}

我在服务器端尝试每间隔一段时间loop这个ConcurrentHashMap如果里面已经有绑定的channelSocket,就使用write方法向客户端发送消息

Runnable sendTask = new Runnable() {
            @Override
            public void run() {
                sendTaskLoop:
                    for(;;){
                        System.out.println("task is beginning...");
                        try{
                            Map<String, SocketChannel> map = GatewayService.getChannels();
                            Iterator<String> it = map.keySet().iterator();
                            while (it.hasNext()) {
                                String key = it.next();
                                SocketChannel obj = map.get(key);
                                System.out.println("channel id is: " + key);
                                System.out.println("channel: " + obj.isActive());
                                obj.writeAndFlush("hello, it is Server test header ping");
                            }
                        }catch(Exception e){break sendTaskLoop;}
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }
        };
        new Thread(sendTask).start();

理论上客户端应该是可以接受到我发送的消息,但是我观察了一下源代码,发现writeAndFlush这个方法最终会被handler触发,于是我又在handler中覆写了write方法

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write handler");
        ctx.writeAndFlush(msg);
    }

可是最终结果客户端并没有收到任何消息,请问netty如何主动向客户端发送消息?

ringa_leeringa_lee2818 Il y a quelques jours890

répondre à tous(4)je répondrai

  • 阿神

    阿神2017-04-18 09:05:28

    On peut à peu près deviner que le serveur n'a pas d'encodeur, il ne peut donc pas être envoyé du tout
    D'après le code publié, vous pouvez voir qu'il existe deux façons d'envoyer des messages au client :

    obj.writeAndFlush("hello, it is Server test header ping");

    et

    ctx.writeAndFlush(msg);//Object

    Dans netty, toutes les entrées et sorties sont ByteBuf L'auteur doit confirmer si le serveur dispose d'un encodeur correspondant et convertir la chaîne en ByteBuf.

    répondre
    0
  • 巴扎黑

    巴扎黑2017-04-18 09:05:28

    GatewayService n'a pas besoin d'être forcé à se convertir en type SocketChannel lors de l'enregistrement du canal
    Il n'y a aucun problème pour obtenir le canal et exécuter writeAndFlush.
    Afin de résoudre davantage le problème, vous pouvez procéder comme suit :
    1. Surveiller le résultat de la promesse de ctx.writeAndFlush(msg);, par exemple

    .
    ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // do sth
            } else {
                // do sth
            }
        }
    });

    2. Faites attention à ce que l'encodeur et le décodeur des deux côtés du client et du serveur soient identiques

    répondre
    0
  • 天蓬老师

    天蓬老师2017-04-18 09:05:28

    Il est recommandé d'utiliser ChannelGroup pour gérer les clients. La raison pour laquelle le serveur leader ne peut pas transmettre de données peut être due au fait qu'il n'y a pas d'encodeur. ctx.writeAndFlush ne peut pas écrire directement les types de chaîne.

    répondre
    0
  • PHPz

    PHPz2017-04-18 09:05:28

    Bonjour, votre code peut être modifié comme ceci :
    Le code du handler est modifié comme suit :
    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String uuid = ctx.channel().id().asLongText();
        GatewayService.addGatewayChannel(uuid, ctx);
        logger.info("a new connect come in: " + uuid);
        logger.info("a new connect come in remote ip: " + ctx.channel().remoteAddress());
    }
    
    

    GatewayService :

    classe publique GatewayService {

    private static Map<String, Object> map = new ConcurrentHashMap();
    public static void addGatewayChannel(String id, ChannelHandlerContext gateway_ctx){
        map.put(id, gateway_ctx);
    }
    public static Map<String, Object> getChannels(){
        return map;
    }
    
    public static Object getGatewayChannel(String id){
        return map.get(id);
    }
    
    public static void removeGatewayChannel(String id){
        map.remove(id);
    }

    }

    Le code pour parcourir le client est le suivant :
    la classe publique TimeTask implémente Runnable{

    private static final Log logger = LogFactory.getLog(TcpCMDHandler.class);
    public static final String CHARSET = "UTF-8";
    @Override
    public void run() {
        sendTaskLoop:
        for(;;){
            System.out.println("task is beginning...");
            try{
                Map<String, Object> map = GatewayService.getChannels();
                Iterator<String> it = map.keySet().iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    ChannelHandlerContext ctx = (ChannelHandlerContext)map.get(key);
                    logger.info("channel id is: " + key);
                    logger.info("channel: " + ctx.channel().isActive());
    
                    String bak = "hello, every one!";
                    byte[] responB = bak.getBytes(CHARSET);
        //            ByteBuf response = Unpooled.buffer(responB.length);
    
                    final ByteBuf time = ctx.alloc().buffer(responB.length);
                    time.writeBytes(responB);
    
                    ctx.writeAndFlush(time); // (3)
    

    // obj.writeAndFlush("bonjour, c'est le ping de l'en-tête de test du serveur");

                }
            }catch(Exception e){
                break
                sendTaskLoop;
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    }

    C'est tout. Je l'ai personnellement testé et ça marche

    répondre
    0
  • Annulerrépondre