搜尋
首頁web前端html教學使用Protostuff序列化_html/css_WEB-ITnose

rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到protostuff,觉得找到了一直在找的这种序列化方式。

protostuff简介

protobuf的一个缺点是需要数据结构的预编译过程,首先要编写.proto格式的配置文件,再通过protobuf提供的工具生成各种语言响应的代码。由于java具有反射和动态代码生成的能力,这个预编译过程不是必须的,可以在代码执行时来实现。有 protostuff已经实现了这个功能。

protostuff效率

  • Ser Time+Deser Time (ns)

  • Size, Compressed size [light] in bytes

使用

pom依赖

        <dependency>            <groupId>com.dyuproject.protostuff</groupId>            <artifactId>protostuff-core</artifactId>            <version>1.0.8</version>        </dependency>        <dependency>            <groupId>com.dyuproject.protostuff</groupId>            <artifactId>protostuff-runtime</artifactId>            <version>1.0.8</version>        </dependency>

工具类

public class SerializationUtil {    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();    private static Objenesis objenesis = new ObjenesisStd(true);    private static <T> Schema<T> getSchema(Class<T> clazz) {        @SuppressWarnings("unchecked")        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);        if (schema == null) {            schema = RuntimeSchema.getSchema(clazz);            if (schema != null) {                cachedSchema.put(clazz, schema);            }        }        return schema;    }    /**     * 序列化     *     * @param obj     * @return     */    public static <T> byte[] serializer(T obj) {        @SuppressWarnings("unchecked")        Class<T> clazz = (Class<T>) obj.getClass();        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);        try {            Schema<T> schema = getSchema(clazz);            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);        } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);        } finally {            buffer.clear();        }    }    /**     * 反序列化     *     * @param data     * @param clazz     * @return     */    public static <T> T deserializer(byte[] data, Class<T> clazz) {        try {            T obj = objenesis.newInstance(clazz);            Schema<T> schema = getSchema(clazz);            ProtostuffIOUtil.mergeFrom(data, obj, schema);            return obj;        } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);        }    }}

基于netty的rpc

  • NettyServer

public class NettyServer {    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);    private int ioThreadNum;    //内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值    private int backlog;    private int port;    private Channel channel;    private EventLoopGroup bossGroup;    private EventLoopGroup workerGroup;    public NettyServer(int ioThreadNum, int backlog, int port) {        this.ioThreadNum = ioThreadNum;        this.backlog = backlog;        this.port = port;    }    public void start() throws InterruptedException {        bossGroup = new NioEventLoopGroup();        workerGroup = new NioEventLoopGroup(this.ioThreadNum);        final Map<String,Object> demoService = new HashMap<String, Object>();        demoService.put("com.patterncat.service.HelloService", new HelloServiceImpl());        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, backlog)                //注意是childOption                .childOption(ChannelOption.SO_KEEPALIVE, true)                .childOption(ChannelOption.TCP_NODELAY, true)                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        socketChannel.pipeline()                                .addLast(new RpcDecoder(RpcRequest.class))                                .addLast(new RpcEncoder(RpcResponse.class))                                .addLast(new ServerRpcHandler(demoService));                    }                });        channel = serverBootstrap.bind("127.0.0.1",port).sync().channel();        logger.info("NettyRPC server listening on port "+ port + " and ready for connections...");        Runtime.getRuntime().addShutdownHook(new Thread(){            @Override            public void run(){                //do shutdown staff            }        });    }    public void stop() {        if (null == channel) {            throw new ServerStopException();        }        bossGroup.shutdownGracefully();        workerGroup.shutdownGracefully();        channel.closeFuture().syncUninterruptibly();        bossGroup = null;        workerGroup = null;        channel = null;    }}
  • ServerRpcHandler

public class ServerRpcHandler extends SimpleChannelInboundHandler<RpcRequest> {    private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);    private final Map<String, Object> serviceMapping;    public ServerRpcHandler(Map<String, Object> serviceMapping) {        this.serviceMapping = serviceMapping;    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {        RpcResponse response = new RpcResponse();        response.setTraceId(rpcRequest.getTraceId());        try {            logger.info("server handle request:{}",rpcRequest);            Object result = handle(rpcRequest);            response.setResult(result);        } catch (Throwable t) {            response.setError(t);        }        channelHandlerContext.writeAndFlush(response);    }    private Object handle(RpcRequest request) throws Throwable {        String className = request.getClassName();        Object serviceBean = serviceMapping.get(className);        Class<?> serviceClass = serviceBean.getClass();        String methodName = request.getMethodName();        Class<?>[] parameterTypes = request.getParameterTypes();        Object[] parameters = request.getParameters();        FastClass serviceFastClass = FastClass.create(serviceClass);        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);        return serviceFastMethod.invoke(serviceBean, parameters);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        logger.error(cause.getMessage(), cause);        RpcResponse response = new RpcResponse();        if(cause instanceof ServerException){            response.setTraceId(((ServerException) cause).getTraceId());        }        response.setError(cause);        ctx.writeAndFlush(response);    }}
  • NettyClient

public class NettyClient implements IClient {    private EventLoopGroup workerGroup;    private Channel channel;    private int workerGroupThreads;    private ClientRpcHandler clientRpcHandler;    private final Optional<Pair<Long,TimeUnit>> NO_TIMEOUT = Optional.<Pair<Long,TimeUnit>>absent();    public NettyClient(int workerGroupThreads) {        this.workerGroupThreads = workerGroupThreads;    }    public void connect(InetSocketAddress socketAddress) {        workerGroup = new NioEventLoopGroup(workerGroupThreads);        clientRpcHandler = new ClientRpcHandler();        Bootstrap bootstrap = new Bootstrap();        bootstrap                .group(workerGroup)                .channel(NioSocketChannel.class)                .option(ChannelOption.SO_KEEPALIVE, true)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel ch) throws Exception {                        ch.pipeline()                                .addLast(new RpcDecoder(RpcResponse.class))                                .addLast(new RpcEncoder(RpcRequest.class))                                .addLast(clientRpcHandler);                    }                });        channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())                .syncUninterruptibly()                .channel();    }    public RpcResponse syncSend(RpcRequest request) throws InterruptedException {        System.out.println("send request:"+request);        channel.writeAndFlush(request).sync();        return clientRpcHandler.send(request,NO_TIMEOUT);    }    public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {        channel.writeAndFlush(request);        return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));    }    public InetSocketAddress getRemoteAddress() {        SocketAddress remoteAddress = channel.remoteAddress();        if (!(remoteAddress instanceof InetSocketAddress)) {            throw new RuntimeException("Get remote address error, should be InetSocketAddress");        }        return (InetSocketAddress) remoteAddress;    }    public void close() {        if (null == channel) {            throw new ClientCloseException();        }        workerGroup.shutdownGracefully();        channel.closeFuture().syncUninterruptibly();        workerGroup = null;        channel = null;    }}
  • ClientRpcHandler

@ChannelHandler.Sharablepublic class ClientRpcHandler extends SimpleChannelInboundHandler<RpcResponse> {    //用blocking queue主要是用阻塞的功能,省的自己加锁    private final ConcurrentHashMap<String, BlockingQueue<RpcResponse>> responseMap = new ConcurrentHashMap<String, BlockingQueue<RpcResponse>>();    //messageReceived    @Override    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {        System.out.println("receive response:"+rpcResponse);        BlockingQueue<RpcResponse> queue = responseMap.get(rpcResponse.getTraceId());        queue.add(rpcResponse);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        super.exceptionCaught(ctx, cause);        cause.printStackTrace();    }    public RpcResponse send(RpcRequest request,Optional<Pair<Long,TimeUnit>> timeout) throws InterruptedException {        responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue<RpcResponse>(1));        RpcResponse response = null;        try {            BlockingQueue<RpcResponse> queue = responseMap.get(request.getTraceId());            if(timeout == null || !timeout.isPresent()){                response = queue.take();            }else{                response = queue.poll(timeout.get().getKey(),timeout.get().getValue());            }        } finally {            responseMap.remove(request.getTraceId());        }        return response;    }}
  • decoder

public class RpcDecoder extends ByteToMessageDecoder {    private Class<?> genericClass;    public RpcDecoder(Class<?> genericClass) {        this.genericClass = genericClass;    }    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {        if (byteBuf.readableBytes() < 4) {            return;        }        byteBuf.markReaderIndex();        int dataLength = byteBuf.readInt();        if (dataLength < 0) {            channelHandlerContext.close();        }        if (byteBuf.readableBytes() < dataLength) {            byteBuf.resetReaderIndex();        }        byte[] data = new byte[dataLength];        byteBuf.readBytes(data);        Object obj = SerializationUtil.deserializer(data, genericClass);        list.add(obj);    }}
  • encoder

public class RpcEncoder extends MessageToByteEncoder {    private Class<?> genericClass;    public RpcEncoder(Class<?> genericClass) {        this.genericClass = genericClass;    }    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {        if (genericClass.isInstance(obj)) {            byte[] data = SerializationUtil.serializer(obj);            byteBuf.writeInt(data.length);            byteBuf.writeBytes(data);        }    }}

工程源码 protocol-demo

参考

  • jvm-serializers

  • protostuff

  • java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子

  • Protostuff序列化

  • protostuff介绍

  • Protostuff详解

  • 序列化框架 kryo VS hessian VS Protostuff VS java

  • Protostuff序列化和反序列化

  • eishay/jvm-serializers

  • Protostuff 序列化

  • 使用Netty实现多路复用的client

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
超越HTML:網絡開發的基本技術超越HTML:網絡開發的基本技術Apr 26, 2025 am 12:04 AM

要構建一個功能強大且用戶體驗良好的網站,僅靠HTML是不夠的,還需要以下技術:JavaScript賦予網頁動態和交互性,通過操作DOM實現實時變化。 CSS負責網頁的樣式和佈局,提升美觀度和用戶體驗。現代框架和庫如React、Vue.js和Angular,提高開發效率和代碼組織結構。

HTML中的布爾屬性是什麼?舉一些例子。HTML中的布爾屬性是什麼?舉一些例子。Apr 25, 2025 am 12:01 AM

布爾屬性是HTML中的特殊屬性,不需要值即可激活。 1.布爾屬性通過存在與否控制元素行為,如disabled禁用輸入框。 2.它們的工作原理是瀏覽器解析時根據屬性的存在改變元素行為。 3.基本用法是直接添加屬性,高級用法可通過JavaScript動態控制。 4.常見錯誤是誤以為需要設置值,正確寫法應簡潔。 5.最佳實踐是保持代碼簡潔,合理使用布爾屬性以優化網頁性能和用戶體驗。

如何驗證您的HTML代碼?如何驗證您的HTML代碼?Apr 24, 2025 am 12:04 AM

HTML代碼可以通過在線驗證器、集成工具和自動化流程來確保其清潔度。 1)使用W3CMarkupValidationService在線驗證HTML代碼。 2)在VisualStudioCode中安裝並配置HTMLHint擴展進行實時驗證。 3)利用HTMLTidy在構建流程中自動驗證和清理HTML文件。

HTML與CSS和JavaScript:比較Web技術HTML與CSS和JavaScript:比較Web技術Apr 23, 2025 am 12:05 AM

HTML、CSS和JavaScript是構建現代網頁的核心技術:1.HTML定義網頁結構,2.CSS負責網頁外觀,3.JavaScript提供網頁動態和交互性,它們共同作用,打造出用戶體驗良好的網站。

HTML作為標記語言:其功能和目的HTML作為標記語言:其功能和目的Apr 22, 2025 am 12:02 AM

HTML的功能是定義網頁的結構和內容,其目的在於提供一種標準化的方式來展示信息。 1)HTML通過標籤和屬性組織網頁的各個部分,如標題和段落。 2)它支持內容與表現分離,提升維護效率。 3)HTML具有可擴展性,允許自定義標籤增強SEO。

HTML,CSS和JavaScript的未來:網絡開發趨勢HTML,CSS和JavaScript的未來:網絡開發趨勢Apr 19, 2025 am 12:02 AM

HTML的未來趨勢是語義化和Web組件,CSS的未來趨勢是CSS-in-JS和CSSHoudini,JavaScript的未來趨勢是WebAssembly和Serverless。 1.HTML的語義化提高可訪問性和SEO效果,Web組件提升開發效率但需注意瀏覽器兼容性。 2.CSS-in-JS增強樣式管理靈活性但可能增大文件體積,CSSHoudini允許直接操作CSS渲染。 3.WebAssembly優化瀏覽器應用性能但學習曲線陡,Serverless簡化開發但需優化冷啟動問題。

HTML:結構,CSS:樣式,JavaScript:行為HTML:結構,CSS:樣式,JavaScript:行為Apr 18, 2025 am 12:09 AM

HTML、CSS和JavaScript在Web開發中的作用分別是:1.HTML定義網頁結構,2.CSS控製網頁樣式,3.JavaScript添加動態行為。它們共同構建了現代網站的框架、美觀和交互性。

HTML的未來:網絡設計的發展和趨勢HTML的未來:網絡設計的發展和趨勢Apr 17, 2025 am 12:12 AM

HTML的未來充滿了無限可能。 1)新功能和標準將包括更多的語義化標籤和WebComponents的普及。 2)網頁設計趨勢將繼續向響應式和無障礙設計發展。 3)性能優化將通過響應式圖片加載和延遲加載技術提升用戶體驗。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

MinGW - Minimalist GNU for Windows

MinGW - Minimalist GNU for Windows

這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )專業的PHP整合開發工具

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器