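Scenario:
A smart-home gateway (hereafter "gateway") needs to communicate with a Netty server (hereafter "netty"). The two must keep a long-lived connection; in other words, either side may send a message to the other at any time.
Problem:
How can netty, as the server, proactively send a message to a gateway? My attempt: whenever a gateway connects to netty (TCP/IP), I store its channel in a map, keyed by the channel's id.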
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String uuid = ctx.channel().id().asLongText();
    GatewayService.addGatewayChannel(uuid, (SocketChannel)ctx.channel());
    System.out.println("a new connect come in: " + uuid);
}
GatewayService is really just a wrapper around a ConcurrentHashMap:
public class GatewayService {
    private static Map<String, SocketChannel> map = new ConcurrentHashMap<>();
    public static void addGatewayChannel(String id, SocketChannel gateway_channel){
        map.put(id, gateway_channel);
    }
    public static Map<String, SocketChannel> getChannels(){
        return map;
    }
    public static SocketChannel getGatewayChannel(String id){
        return map.get(id);
    }
    public static void removeGatewayChannel(String id){
        map.remove(id);
    }
}
On the server I loop over this ConcurrentHashMap at a fixed interval, and if it already contains registered channels, I call write on each one to send a message to the client:
Runnable sendTask = new Runnable() {
    @Override
    public void run() {
        sendTaskLoop:
        for(;;){
            System.out.println("task is beginning...");
            try{
                Map<String, SocketChannel> map = GatewayService.getChannels();
                Iterator<String> it = map.keySet().iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    SocketChannel obj = map.get(key);
                    System.out.println("channel id is: " + key);
                    System.out.println("channel: " + obj.isActive());
                    obj.writeAndFlush("hello, it is Server test header ping");
                }
            }catch(Exception e){break sendTaskLoop;}
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};
new Thread(sendTask).start();
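In theory the client should receive the message I send. Looking at the source code, though, I noticed that writeAndFlush eventually goes through the handler, so I also overrode the write method in my handler: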
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    System.out.println("write handler");
    ctx.writeAndFlush(msg);
}
In the end, however, the client still receives nothing. How can netty proactively send a message to a client?
阿神 2017-04-18 09:05:28
My guess is that the server pipeline has no encoder, so the message is never actually sent.
The posted code sends messages to the client in two ways:
obj.writeAndFlush("hello, it is Server test header ping");
and
ctx.writeAndFlush(msg); // Object
In Netty, everything that goes in or out over the wire is a ByteBuf, so the original poster should check whether the server has an encoder that converts the String into a ByteBuf.
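As a rough illustration of the missing encoder, the sketch below adds Netty's StringEncoder and StringDecoder to a pipeline. The class name GatewayChannelInitializer and the handler names are hypothetical, and UTF-8 plain text without any framing is an assumption about what the gateway expects, not something the question states.

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical initializer; an instance would be passed to ServerBootstrap.childHandler(...).
final class GatewayChannelInitializer extends ChannelInitializer<SocketChannel> {

    // Kept static so the same setup can be applied to any pipeline.
    static void configure(ChannelPipeline pipeline) {
        // Outbound: turns a String passed to writeAndFlush into a ByteBuf.
        pipeline.addLast("stringEncoder", new StringEncoder(StandardCharsets.UTF_8));
        // Inbound: turns received bytes into a String (assumes UTF-8 text, no framing).
        pipeline.addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
        // The business handler (the one overriding channelActive) would be added after these.
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        configure(ch.pipeline());
    }
}
```

The order matters if the handler later calls ctx.writeAndFlush: a write issued from a ChannelHandlerContext only passes through outbound handlers that were added before that handler, so the encoder has to come first.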
巴扎黑 2017-04-18 09:05:28
GatewayService does not need to cast the channel to SocketChannel when storing it.
Fetching the channel and calling writeAndFlush on it is fine.
To troubleshoot further, you can do the following:
1. Listen for the result of the future returned by ctx.writeAndFlush(msg), for example:
ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            // the message was written out successfully
        } else {
            // the write failed; the cause says why
            future.cause().printStackTrace();
        }
    }
});
2. Check that the encoders and decoders on the client and on the server match.
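A small helper along these lines makes failed writes visible instead of silent. The names LoggedWrites and writeAndLog are hypothetical, and printing to the console stands in for whatever logging the project uses. On an NIO socket channel, writing a String with no encoder in the pipeline typically shows up here as a failed future whose cause reports an unsupported message type.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

// Hypothetical helper: writes a message and reports the outcome of the write.
final class LoggedWrites {

    static ChannelFuture writeAndLog(Channel channel, Object msg) {
        return channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
            String id = future.channel().id().asShortText();
            if (future.isSuccess()) {
                System.out.println("sent to " + id);
            } else {
                // The cause explains why nothing reached the client.
                System.err.println("send to " + id + " failed: " + future.cause());
            }
        });
    }
}
```

It accepts a plain Channel, which also reflects the first point above: nothing in this path needs the SocketChannel type.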
天蓬老师 2017-04-18 09:05:28
I recommend using a ChannelGroup to manage the clients. The original poster's server probably cannot push data because there is no encoder: without one, ctx.writeAndFlush cannot write a String directly.
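The ChannelGroup suggestion could look roughly like the sketch below. GatewayGroup, GatewayRegistrationHandler and the group name "gateways" are hypothetical names; the broadcast sends a ByteBuf so that it does not depend on an encoder being installed.

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.nio.charset.StandardCharsets;

// Hypothetical holder for all connected gateways.
final class GatewayGroup {
    static final ChannelGroup GATEWAYS =
            new DefaultChannelGroup("gateways", GlobalEventExecutor.INSTANCE);

    // Sends the same bytes to every channel currently in the group.
    static ChannelGroupFuture broadcast(String text) {
        return GATEWAYS.writeAndFlush(Unpooled.copiedBuffer(text, StandardCharsets.UTF_8));
    }
}

// Hypothetical handler; use a new instance per channel (it is not marked @Sharable).
final class GatewayRegistrationHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        GatewayGroup.GATEWAYS.add(ctx.channel());
        super.channelActive(ctx);
    }
}
```

Compared with a hand-written map, a ChannelGroup removes a channel automatically once it is closed, so no explicit cleanup in channelInactive is needed.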
PHPz 2017-04-18 09:05:28
Hello, your code can be changed as follows.
The handler code becomes:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String uuid = ctx.channel().id().asLongText();
    GatewayService.addGatewayChannel(uuid, ctx);
    logger.info("a new connect come in: " + uuid);
    logger.info("a new connect come in remote ip: " + ctx.channel().remoteAddress());
}
GatewayService:
public class GatewayService {
    // Values are stored as Object; callers cast them back to ChannelHandlerContext
    private static Map<String, Object> map = new ConcurrentHashMap<>();
    public static void addGatewayChannel(String id, ChannelHandlerContext gateway_ctx){
        map.put(id, gateway_ctx);
    }
    public static Map<String, Object> getChannels(){
        return map;
    }
    public static Object getGatewayChannel(String id){
        return map.get(id);
    }
    public static void removeGatewayChannel(String id){
        map.remove(id);
    }
}
The code that iterates over the clients is as follows:
public class TimeTask implements Runnable{
    private static final Log logger = LogFactory.getLog(TimeTask.class);
    public static final String CHARSET = "UTF-8";
    @Override
    public void run() {
        sendTaskLoop:
        for(;;){
            System.out.println("task is beginning...");
            try{
                Map<String, Object> map = GatewayService.getChannels();
                Iterator<String> it = map.keySet().iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    ChannelHandlerContext ctx = (ChannelHandlerContext)map.get(key);
                    logger.info("channel id is: " + key);
                    logger.info("channel: " + ctx.channel().isActive());
                    String bak = "hello, every one!";
                    byte[] responB = bak.getBytes(CHARSET);
                    // ByteBuf response = Unpooled.buffer(responB.length);
                    // Netty writes ByteBuf, so copy the bytes into a buffer from the channel's allocator
                    final ByteBuf time = ctx.alloc().buffer(responB.length);
                    time.writeBytes(responB);
                    ctx.writeAndFlush(time); // (3)
                    // obj.writeAndFlush("hello, it is Server test header ping");
                }
            }catch(Exception e){
                // log the failure before leaving the loop instead of swallowing it
                logger.error("sending failed, stopping the task", e);
                break sendTaskLoop;
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
That's it. I have tested this myself and it works.
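One possible refinement of the polling loop, offered as a sketch rather than as part of the tested answer: the for(;;) loop with Thread.sleep stops for good on the first exception, which silently ends all further sends. A ScheduledExecutorService can run the same work on a fixed period and keep going after a failure. PeriodicBroadcast and its parameters are hypothetical names; the Runnable stands in for the code that walks GatewayService.getChannels() and writes to each client.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that replaces the hand-written sleep loop.
final class PeriodicBroadcast {

    // Runs `broadcast` immediately and then every `periodMillis` milliseconds.
    static ScheduledFuture<?> start(ScheduledExecutorService executor,
                                    Runnable broadcast,
                                    long periodMillis) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                broadcast.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all future runs,
                // so report it here and let the next run happen as scheduled.
                System.err.println("broadcast failed: " + e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

With the 5-second interval from the post, this would be started once at server startup with a period of 5000, and the returned ScheduledFuture can be cancelled on shutdown.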