首页 >后端开发 >Golang >proto:无法解析无效的有线格式数据

proto:无法解析无效的有线格式数据

WBOY
转载
2024-02-06 10:57:04 1092浏览

proto:无法解析无效的有线格式数据

问题内容

我是 protobufs 的新手,目前正在编写一个从 nats 服务器读取数据的客户端。从 nats 服务器发送的数据是 protobuf。

我正在编写的客户端是用 go 编写的。这是我编写的 .proto 文件:

syntax = "proto3";

package execution;

option go_package = "./protos/execution";

enum orderstatus {
  working = 0;
  rejected = 1;
  cancelled = 2;
  completed = 3;
}

enum ordertype {
  limit = 0;
  market = 1;
  stoplimit = 2;
  stopmarket = 3;
}

enum orderside {
  buy = 0;
  sell = 1;
}

enum rejectreason {
  norejection = 0;
  instrumentnotfound = 1;
  ordernotfound = 2;
  invalidordertype = 3;
  invalidaccount = 4;
  invalidside = 5;
  invalidamount = 6;
  invalidlimitprice = 7;
  invalidquotelimit = 8;
  invalidactivationprice = 9;
  invalidtimeinforce = 10;
  markethalted = 11;
  marketpaused = 12;
  nocounterorders = 13;
  missingexpirationtime = 14;
  incorrectexpirationtime = 15;
  internalerror = 16;
  illegalstatusswitch = 17;
  orderalreadyexists = 18;
  instrumentnotready = 19;
  externalsystemerror = 20;
}

enum reportcause {
  none = 0;
  neworder = 1;
  cancelorder = 2;
  masscancel = 3;
  expiration = 4;
  trigger = 5;
  marketstatuschange = 6;
}

enum timeinforce {
  goodtillcancel = 0;
  immediateorcancel = 1;
  fillorkill = 2;
}

enum cancelreason {
  notcancelled = 0;
  cancelledbytrader = 1;
  cancelledbysystem = 2;
  selfmatchprevention = 3;
  ordertimeinforce = 4;
  liquidation = 100;
}


// Asker's first attempt at mirroring the publisher's C# `tradedata` struct.
// NOTE(review): amount, executionprice and remainingamount are C# `decimal`, and
// matchedorderexternalid is a C# `guid` on the publisher; the accepted answer
// declares these as bcl.decimal / bcl.guid messages instead of `string`.
message tradedata {
  int64 tradeid = 1;
  string amount = 4;
  string executionprice = 5;
  orderstatus orderstatus = 7;
  int64 accountid = 11;
  string matchedorderexternalid = 14;
  int64 matchedorderid = 16;
  string remainingamount = 17;
}

// Asker's first attempt at mirroring the publisher's C# `executionreport` class.
// Field numbers follow the [protomember(N)] attributes on the C# side.
// NOTE(review): requestedprice, requestedamount and remainingamount are C# `decimal`,
// and instructionid / externalorderid are C# `guid` on the publisher; the accepted
// answer declares these as bcl.decimal / bcl.guid messages instead of `string`.
message execution {
  string origin = 4;
  orderside side = 7;
  string requestedprice = 8;
  string requestedamount = 9;
  string remainingamount = 10;
  int64 executedat = 13;
  orderstatus orderstatus = 14;
  repeated tradedata trades = 16;
  ordertype ordertype = 20;
  int64 version = 22;
  int64 accountid = 23;
  rejectreason rejectreason = 25;
  reportcause reportcause = 26;
  string instructionid = 27;
  string externalorderid = 28;
  int32 executionenginemarketid = 29;
  int64 orderid = 30;
  cancelreason cancelreason = 31;
  int64 txid = 32;
  timeinforce timeinforce = 34;
  string cancelledby = 35;
}

发布服务器是用 C# 编写的,其 proto 消息的代码如下:

/// <summary>
/// Execution report published on the feed. Serialized with protobuf-net; the
/// number in each <c>[ProtoMember(N)]</c> is the protobuf field number.
/// </summary>
/// <remarks>
/// <c>decimal</c> and <c>Guid</c> members are written by protobuf-net as nested
/// messages (see <c>decimal</c> / <c>guid</c> in bcl.proto), not as strings.
/// </remarks>
[ProtoContract]
public class ExecutionReport : IMarketResponse, IInstructionMessage, IOrderMatcherResponse
{
    // Not part of the protobuf payload: excluded via [ProtoIgnore].
    [ProtoIgnore]
    FeedMessageType IFeedMessage.Type => FeedMessageType.ExecutionReport;

    // ReSharper disable FieldCanBeMadeReadOnly.Global
    [ProtoMember(4)] public string Origin;
    [ProtoMember(7)] public OrderSide Side;
    [ProtoMember(8)] public decimal RequestedPrice;
    [ProtoMember(9)] public decimal RequestedAmount;
    [ProtoMember(10)] public decimal RemainingAmount;
    [ProtoMember(13)] public long ExecutedAt;
    [ProtoMember(14)] public OrderStatus OrderStatus;
    [ProtoMember(16)] public List<TradeData> Trades = new List<TradeData>();
    [ProtoMember(20)] public OrderType OrderType;
    [ProtoMember(22)] public long Version { get; set; }
    [ProtoMember(23)] public long AccountId;
    [ProtoMember(25)] public RejectReason RejectReason;
    [ProtoMember(26)] public ReportCause ReportCause;
    [ProtoMember(27)] public Guid InstructionId { get; set; }
    [ProtoMember(28)] public Guid ExternalOrderId;
    [ProtoMember(29)] public int ExecutionEngineMarketId { get; set; }
    [ProtoMember(30)] public long OrderId;
    [ProtoMember(31)] public CancelReason CancelReason;
    [ProtoMember(32)] public long TxId;
    [ProtoMember(34)] public TimeInForce TimeInForce;
    [ProtoMember(35)] public string CancelledBy;
}

/// <summary>
/// A single trade carried in <see cref="ExecutionReport.Trades"/>.
/// As above, <c>decimal</c> and <c>Guid</c> members are serialized as nested messages.
/// </summary>
[ProtoContract]
[StructLayout(LayoutKind.Sequential)]
public struct TradeData
{
    [ProtoMember(1)] public long TradeId;
    [ProtoMember(4)] public decimal Amount;
    [ProtoMember(5)] public decimal ExecutionPrice;
    [ProtoMember(7)] public OrderStatus OrderStatus;
    [ProtoMember(11)] public long AccountId;
    [ProtoMember(14)] public Guid MatchedOrderExternalId;
    [ProtoMember(16)] public long MatchedOrderId;
    [ProtoMember(17)] public decimal RemainingAmount;
}

在尝试解组数据时出现此错误

proto: cannot parse invalid wire-format data

这就是我解析数据的方式:

// Subscribe to the "exec" subject and try to decode every message as an Execution.
_, err = sc.Subscribe("exec", func(m *stan.Msg) {
    varr := &protos.Execution{}
    // The raw payload is handed to proto.Unmarshal unchanged; this is the call
    // that reports "cannot parse invalid wire-format data".
    // Keep the error local to the callback instead of overwriting the outer err.
    if err := proto.Unmarshal(m.Data, varr); err != nil {
        fmt.Printf("err unmarshalling!: %v\n\n", err.Error())
    } else {
        fmt.Printf("received a message: %+v\n", varr)
    }
})

我从服务器接收到的示例字节数据:

[5 85 0 0 0 56 1 66 3 8 144 78 74 2 8 1 82 2 8 1 104 197 192 132 194 159 143 219 237 8 176 1 25 184 1 11 208 1 1 218 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 226 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 232 1 1 240 1 25 128 2 25]

添加更多详细信息:

这就是 c# 发送数据的方式:

/// <summary>
/// Serializes <paramref name="msg"/> with <c>SerializeToArray</c> and publishes the
/// resulting bytes on the subject derived from the message.
/// </summary>
/// <param name="msg">Feed message to publish.</param>
public async Task SendAsync(IFeedMessage msg)
{
    var subject = FeedSubject.ForMessage(msg);
    // The published bytes are exactly what SerializeToArray produces; nothing is added here.
    var data = msg.SerializeToArray();
    // The metrics scope wraps only the publish call.
    using (_metrics.FeedSendLatency.Start(new MetricTags("subject", subject.Value)))
    {
        await _connection.PublishAsync(subject, data);
    }
}

这是feedmessage的结构(executionreport也间接继承它)

/// <summary>
/// Contract shared by all messages sent on the feed.
/// </summary>
public interface IFeedMessage
{
    /// <summary>Message type tag; <c>Serialize</c> writes it as the first byte of the payload.</summary>
    FeedMessageType Type { get; }

    /// <summary>Returns a copy of this message.</summary>
    IFeedMessage Clone();

    /// <summary>Resets this message.</summary>
    void Reset();
}

这就是 serializetoarray() 的工作原理:

/// <summary>
/// Serializes <paramref name="message"/> into a newly created <see cref="MemoryStream"/>
/// and returns the segment of its buffer that holds the written bytes.
/// </summary>
public static ArraySegment<byte> SerializeToArray(this IFeedMessage message)
{
    var buffer = new MemoryStream();
    return message.SerializeToMemory(buffer);
}

/// <summary>
/// Appends the serialized <paramref name="message"/> to <paramref name="stream"/> and
/// returns a segment over the stream's underlying buffer covering only the bytes
/// written by this call.
/// </summary>
public static ArraySegment<byte> SerializeToMemory(this IFeedMessage message, MemoryStream stream)
{
    // Remember where this message begins so earlier stream content is excluded.
    long begin = stream.Position;

    message.Serialize(stream);

    long written = stream.Position - begin;
    byte[] backing = stream.GetBuffer();
    return new ArraySegment<byte>(backing, (int)begin, (int)written);
}

/// <summary>
/// Writes one framed message to <paramref name="stream"/>: a single byte holding
/// <c>message.Type</c>, followed by the protobuf-net payload written with a
/// <c>PrefixStyle.Fixed32</c> length prefix.
/// </summary>
/// <remarks>
/// The leading type byte and the 4-byte length are not part of the protobuf message
/// itself; a consumer has to strip these 5 bytes before unmarshalling the rest.
/// </remarks>
public static void Serialize(this IFeedMessage message, Stream stream)
{
    // Byte 0: message type tag.
    stream.WriteByte((byte)message.Type);
    // Bytes 1..4: fixed 32-bit length, then the protobuf-encoded body.
    RuntimeTypeModel.Default.SerializeWithLengthPrefix(stream, message, message.GetType(), PrefixStyle.Fixed32, 0);
}

不知道具体原因是什么。但我写的proto文件似乎是错误的。我浏览了几篇面临相同错误的帖子,但大多数都没有解决相同的问题。如果需要任何其他详细信息,请告诉我。

请帮我解决这个问题。


正确答案


根据评论中的讨论,我成功地解组(unmarshal)了数据。

说明

  1. 数据以 5 个字节为前缀(这是完全没有必要的):
    • 消息类型 1 个字节
    • 数据长度为 4 个字节
  2. C# 实现使用了特定于 C# 的 decimal 和 guid 数据类型。(正如 bcl.proto 中的注释所说,跨平台代码通常应该完全避免它们)。

这是文件夹结构:

├── bcl.proto
├── execution.proto
├── go.mod
├── go.sum
├── main.go
└── protos
    ├── bcl.pb.go
    └── execution.pb.go

bcl.proto

此文件是从 github.com/protobuf-net/protobuf-net 复制的。这是必需的,因为 .NET 实现使用了此 proto 文件中的 decimal 和 guid。

// the types in here indicate how protobuf-net represents certain types when using protobuf-net specific
// library features. note that it is not *required* to use any of these types, and cross-platform code
// should usually avoid them completely (ideally starting from a .proto schema)

// some of these are ugly, sorry. the timespan / datetime dates here pre-date the introduction of timestamp
// and duration, and the "well known" types should be preferred when possible. guids are particularly
// awkward - it turns out that there are multiple guid representations, and i accidentally used one that
// i can only call... "crazy-endian". just make sure you check the order!

// it should not be necessary to use bcl.proto from code that uses protobuf-net

syntax = "proto3";

option csharp_namespace = "protobuf.bcl";
option go_package = "./protos";

package bcl;

// protobuf-net's representation of a time span: a signed count of `scale` units.
message timespan {
  sint64 value = 1; // the size of the timespan (in units of the selected scale)
  timespanscale scale = 2; // the scale of the timespan [default = days]
  // unit in which `value` is expressed
  enum timespanscale {
    days = 0;
    hours = 1;
    minutes = 2;
    seconds = 3;
    milliseconds = 4;
    ticks = 5;

    minmax = 15; // dubious
  }
}

// protobuf-net's representation of a date/time: a signed offset from 1970/01/01
// expressed in `scale` units, plus the kind of date/time it represents.
message datetime {
  sint64 value = 1; // the offset (in units of the selected scale) from 1970/01/01
  timespanscale scale = 2; // the scale of the timespan [default = days]
  datetimekind kind = 3; // the kind of date/time being represented [default = unspecified]
  // unit in which `value` is expressed (same values as timespan.timespanscale)
  enum timespanscale {
    days = 0;
    hours = 1;
    minutes = 2;
    seconds = 3;
    milliseconds = 4;
    ticks = 5;

    minmax = 15; // dubious
  }
  enum datetimekind
  {
     // the time represented is not specified as either local time or coordinated universal time (utc).
     unspecified = 0;
     // the time represented is utc.
     utc = 1;
     // the time represented is local time.
     local = 2;
   }
}

// protobuf-net's envelope for tracked objects and dynamically typed values.
// Not referenced by execution.proto; it is only here because bcl.proto was copied as a whole.
message netobjectproxy {
  int32 existingobjectkey = 1; // for a tracked object, the key of the **first** time this object was seen
  int32 newobjectkey = 2; // for a tracked object, a **new** key, the first time this object is seen
  int32 existingtypekey = 3; // for dynamic typing, the key of the **first** time this type was seen
  int32 newtypekey = 4; // for dynamic typing, a **new** key, the first time this type is seen
  string typename = 8; // for dynamic typing, the name of the type (only present along with newtypekey)
  bytes payload = 10; // the new string/value (only present along with newobjectkey)
}

// protobuf-net's representation of a guid: two fixed-width 64-bit halves.
// Used by execution.proto for instructionid, externalorderid and matchedorderexternalid.
// The byte order inside each half is non-standard ("crazy-endian", see the file header),
// so the halves cannot simply be concatenated to rebuild the textual guid.
message guid {
  fixed64 lo = 1; // the first 8 bytes of the guid (note:crazy-endian)
  fixed64 hi = 2; // the second 8 bytes of the guid (note:crazy-endian)
}

// protobuf-net's representation of a decimal: a 96-bit unsigned value split into
// lo (64 bits) and hi (32 bits), plus a combined sign/scale field.
// Used by execution.proto for the price and amount fields.
message decimal {
  uint64 lo = 1; // the first 64 bits of the underlying value
  uint32 hi = 2; // the last 32 bits of the underlying value
  uint32 signscale = 3; // the number of decimal digits (bits 1-16), and the sign (bit 0)
}

execution.proto

syntax = "proto3";

package execution;

option go_package = "./protos";

import "bcl.proto";

enum orderstatus {
  working = 0;
  rejected = 1;
  cancelled = 2;
  completed = 3;
}

enum ordertype {
  limit = 0;
  market = 1;
  stoplimit = 2;
  stopmarket = 3;
}

enum orderside {
  buy = 0;
  sell = 1;
}

enum rejectreason {
  norejection = 0;
  instrumentnotfound = 1;
  ordernotfound = 2;
  invalidordertype = 3;
  invalidaccount = 4;
  invalidside = 5;
  invalidamount = 6;
  invalidlimitprice = 7;
  invalidquotelimit = 8;
  invalidactivationprice = 9;
  invalidtimeinforce = 10;
  markethalted = 11;
  marketpaused = 12;
  nocounterorders = 13;
  missingexpirationtime = 14;
  incorrectexpirationtime = 15;
  internalerror = 16;
  illegalstatusswitch = 17;
  orderalreadyexists = 18;
  instrumentnotready = 19;
  externalsystemerror = 20;
}

enum reportcause {
  none = 0;
  neworder = 1;
  cancelorder = 2;
  masscancel = 3;
  expiration = 4;
  trigger = 5;
  marketstatuschange = 6;
}

enum timeinforce {
  goodtillcancel = 0;
  immediateorcancel = 1;
  fillorkill = 2;
}

enum cancelreason {
  notcancelled = 0;
  cancelledbytrader = 1;
  cancelledbysystem = 2;
  selfmatchprevention = 3;
  ordertimeinforce = 4;
  liquidation = 100;
}


// Mirrors the publisher's C# `tradedata` struct; field numbers match its [protomember(N)] values.
// C# `decimal` and `guid` members are declared with the bcl.proto message types.
message tradedata {
  int64 tradeid = 1;
  bcl.decimal amount = 4; // C# decimal
  bcl.decimal executionprice = 5; // C# decimal
  orderstatus orderstatus = 7;
  int64 accountid = 11;
  bcl.guid matchedorderexternalid = 14; // C# guid
  int64 matchedorderid = 16;
  bcl.decimal remainingamount = 17; // C# decimal
}

// Mirrors the publisher's C# `executionreport` class; field numbers match its [protomember(N)] values.
// C# `decimal` and `guid` members are declared with the bcl.proto message types.
message execution {
  // NOTE(review): declared `bytes` although the C# member is a `string`, while
  // cancelledby (also a C# `string`) stays `string` — confirm this is intentional.
  bytes origin = 4;
  orderside side = 7;
  bcl.decimal requestedprice = 8; // C# decimal
  bcl.decimal requestedamount = 9; // C# decimal
  bcl.decimal remainingamount = 10; // C# decimal
  int64 executedat = 13;
  orderstatus orderstatus = 14;
  repeated tradedata trades = 16;
  ordertype ordertype = 20;
  int64 version = 22;
  int64 accountid = 23;
  rejectreason rejectreason = 25;
  reportcause reportcause = 26;
  bcl.guid instructionid = 27; // C# guid
  bcl.guid externalorderid = 28; // C# guid
  int32 executionenginemarketid = 29;
  int64 orderid = 30;
  cancelreason cancelreason = 31;
  int64 txid = 32;
  timeinforce timeinforce = 34;
  string cancelledby = 35;
}

protos/

此文件夹中的文件是使用以下命令从 proto 文件生成的:

protoc --go_out=protos --go_opt=paths=source_relative bcl.proto execution.proto

go.mod

module mymodule.local

go 1.20

require google.golang.org/protobuf v1.30.0

main.go

package main

import (
    "encoding/binary"
    "log"

    "google.golang.org/protobuf/proto"

    "mymodule.local/protos"
)

func main() {
    data := []byte{5, 85, 0, 0, 0, 56, 1, 66, 3, 8, 144, 78, 74, 2, 8, 1, 82, 2, 8, 1, 104, 197, 192, 132, 194, 159, 143, 219, 237, 8, 176, 1, 25, 184, 1, 11, 208, 1, 1, 218, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 226, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 232, 1, 1, 240, 1, 25, 128, 2, 25}
    if len(data) < 5 {
        log.fatal("data should contain at least 5 bytes")
    }
    messagetype := data[0]
    length := binary.littleendian.uint32(data[1:5])
    data = data[5:]
    if length != uint32(len(data)) {
        log.fatalf("invalid data length: %d", length)
    }
    execution := &protos.execution{}

    err := proto.unmarshal(data, execution)
    if err != nil {
        log.fatalf("err unmarshalling!: %v", err)
    }

    log.printf("message type: %d, message: %+v", messagetype, execution)
}

问题中提供的数据的输出:

2023/06/15 17:50:58 message type: 5, message: Side:Sell  RequestedPrice:{lo:10000}  RequestedAmount:{lo:1}  RemainingAmount:{lo:1}  ExecutedAt:638223043314917445  Version:25  AccountId:11  ReportCause:NewOrder  InstructionId:{lo:5574686611683820165  hi:10500929413443338416}  ExternalOrderId:{lo:5574686611683820165  hi:10500929413443338416}  ExecutionEngineMarketId:1  OrderId:25  TxId:25

以上是proto:无法解析无效的有线格式数据的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文转载于:stackoverflow.com。如有侵权,请联系admin@php.cn删除