我是 protobufs 的新手,目前正在编写一个从 nats 服务器读取数据的客户端。从 nats 服务器发送的数据是 protobuf。
我正在编写的客户端是用 go 编写的。这是我编写的 .proto 文件:
syntax = "proto3"; package execution; option go_package = "./protos/execution"; enum orderstatus { working = 0; rejected = 1; cancelled = 2; completed = 3; } enum ordertype { limit = 0; market = 1; stoplimit = 2; stopmarket = 3; } enum orderside { buy = 0; sell = 1; } enum rejectreason { norejection = 0; instrumentnotfound = 1; ordernotfound = 2; invalidordertype = 3; invalidaccount = 4; invalidside = 5; invalidamount = 6; invalidlimitprice = 7; invalidquotelimit = 8; invalidactivationprice = 9; invalidtimeinforce = 10; markethalted = 11; marketpaused = 12; nocounterorders = 13; missingexpirationtime = 14; incorrectexpirationtime = 15; internalerror = 16; illegalstatusswitch = 17; orderalreadyexists = 18; instrumentnotready = 19; externalsystemerror = 20; } enum reportcause { none = 0; neworder = 1; cancelorder = 2; masscancel = 3; expiration = 4; trigger = 5; marketstatuschange = 6; } enum timeinforce { goodtillcancel = 0; immediateorcancel = 1; fillorkill = 2; } enum cancelreason { notcancelled = 0; cancelledbytrader = 1; cancelledbysystem = 2; selfmatchprevention = 3; ordertimeinforce = 4; liquidation = 100; } message tradedata { int64 tradeid = 1; string amount = 4; string executionprice = 5; orderstatus orderstatus = 7; int64 accountid = 11; string matchedorderexternalid = 14; int64 matchedorderid = 16; string remainingamount = 17; } message execution { string origin = 4; orderside side = 7; string requestedprice = 8; string requestedamount = 9; string remainingamount = 10; int64 executedat = 13; orderstatus orderstatus = 14; repeated tradedata trades = 16; ordertype ordertype = 20; int64 version = 22; int64 accountid = 23; rejectreason rejectreason = 25; reportcause reportcause = 26; string instructionid = 27; string externalorderid = 28; int32 executionenginemarketid = 29; int64 orderid = 30; cancelreason cancelreason = 31; int64 txid = 32; timeinforce timeinforce = 34; string cancelledby = 35; }
发布服务器是用 c# 编写的,其原始消息的代码如下:
[protocontract] public class executionreport : imarketresponse, iinstructionmessage, iordermatcherresponse { [protoignore] feedmessagetype ifeedmessage.type => feedmessagetype.executionreport; // resharper disable fieldcanbemadereadonly.global [protomember(4)] public string origin; [protomember(7)] public orderside side; [protomember(8)] public decimal requestedprice; [protomember(9)] public decimal requestedamount; [protomember(10)] public decimal remainingamount; [protomember(13)] public long executedat; [protomember(14)] public orderstatus orderstatus; [protomember(16)] public list<tradedata> trades = new list<tradedata>(); [protomember(20)] public ordertype ordertype; [protomember(22)] public long version { get; set; } [protomember(23)] public long accountid; [protomember(25)] public rejectreason rejectreason; [protomember(26)] public reportcause reportcause; [protomember(27)] public guid instructionid { get; set; } [protomember(28)] public guid externalorderid; [protomember(29)] public int executionenginemarketid { get; set; } [protomember(30)] public long orderid; [protomember(31)] public cancelreason cancelreason; [protomember(32)] public long txid; [protomember(34)] public timeinforce timeinforce; [protomember(35)] public string cancelledby; } [protocontract] [structlayout(layoutkind.sequential)] public struct tradedata { [protomember(1)] public long tradeid; [protomember(4)] public decimal amount; [protomember(5)] public decimal executionprice; [protomember(7)] public orderstatus orderstatus; [protomember(11)] public long accountid; [protomember(14)] public guid matchedorderexternalid; [protomember(16)] public long matchedorderid; [protomember(17)] public decimal remainingamount; }
在尝试解组数据时出现此错误
proto: cannot parse invalid wire-format data
这就是我解析数据的方式:
_, err = sc.subscribe("exec", func(m *stan.msg) { varr := &protos.execution{} err = proto.unmarshal(m.data, varr) if err != nil { fmt.printf("err unmarshalling!: %v\n\n", err.error()) } else { fmt.printf("received a message: %+v\n", varr) }
我从服务器接收到的示例字节数据:
[5 85 0 0 0 56 1 66 3 8 144 78 74 2 8 1 82 2 8 1 104 197 192 132 194 159 143 219 237 8 176 1 25 184 1 11 208 1 1 218 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 226 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 232 1 1 240 1 25 128 2 25]
添加更多详细信息:
这就是 c# 发送数据的方式:
public async task sendasync(ifeedmessage msg) { var subject = feedsubject.formessage(msg); var data = msg.serializetoarray(); using (_metrics.feedsendlatency.start(new metrictags("subject", subject.value))) { await _connection.publishasync(subject, data); } }
这是feedmessage的结构(executionreport也间接继承它)
public interface ifeedmessage { feedmessagetype type { get; } ifeedmessage clone(); void reset(); }
这就是 serializetoarray()
的工作原理:
public static ArraySegment<byte> SerializeToArray(this IFeedMessage message) { return message.SerializeToMemory(new MemoryStream()); } public static ArraySegment<byte> SerializeToMemory(this IFeedMessage message, MemoryStream stream) { var start = stream.Position; message.Serialize(stream); return new ArraySegment<byte>(stream.GetBuffer(), (int)start, (int)(stream.Position - start)); } public static void Serialize(this IFeedMessage message, Stream stream) { stream.WriteByte((byte)message.Type); RuntimeTypeModel.Default.SerializeWithLengthPrefix(stream, message, message.GetType(), PrefixStyle.Fixed32, 0); }
不知道具体原因是什么。但我写的proto文件似乎是错误的。我浏览了几篇面临相同错误的帖子,但大多数都没有解决相同的问题。如果需要任何其他详细信息,请告诉我。
请帮我解决这个问题。
根据评论中的讨论,我成功地整理了数据。
注释:
decimal
和 guid
数据类型。 (正如 bcl.proto
中所评论的,跨平台代码通常应该完全避免它们)。这是文件夹结构:
├── bcl.proto ├── execution.proto ├── go.mod ├── go.sum ├── main.go └── protos ├── bcl.pb.go └── execution.pb.go
bcl.proto:
此文件是从 github 复制的。 com/protobuf-net/protobuf-net。这是必需的,因为 .net 实现使用此原始文件中的 decimal
和 guid
。
// the types in here indicate how protobuf-net represents certain types when using protobuf-net specific // library features. note that it is not *required* to use any of these types, and cross-platform code // should usually avoid them completely (ideally starting from a .proto schema) // some of these are ugly, sorry. the timespan / datetime dates here pre-date the introduction of timestamp // and duration, and the "well known" types should be preferred when possible. guids are particularly // awkward - it turns out that there are multiple guid representations, and i accidentally used one that // i can only call... "crazy-endian". just make sure you check the order! // it should not be necessary to use bcl.proto from code that uses protobuf-net syntax = "proto3"; option csharp_namespace = "protobuf.bcl"; option go_package = "./protos"; package bcl; message timespan { sint64 value = 1; // the size of the timespan (in units of the selected scale) timespanscale scale = 2; // the scale of the timespan [default = days] enum timespanscale { days = 0; hours = 1; minutes = 2; seconds = 3; milliseconds = 4; ticks = 5; minmax = 15; // dubious } } message datetime { sint64 value = 1; // the offset (in units of the selected scale) from 1970/01/01 timespanscale scale = 2; // the scale of the timespan [default = days] datetimekind kind = 3; // the kind of date/time being represented [default = unspecified] enum timespanscale { days = 0; hours = 1; minutes = 2; seconds = 3; milliseconds = 4; ticks = 5; minmax = 15; // dubious } enum datetimekind { // the time represented is not specified as either local time or coordinated universal time (utc). unspecified = 0; // the time represented is utc. utc = 1; // the time represented is local time. local = 2; } } message netobjectproxy { int32 existingobjectkey = 1; // for a tracked object, the key of the **first** time this object was seen int32 newobjectkey = 2; // for a tracked object, a **new** key, the first time this object is seen int32 existingtypekey = 3; // for dynamic typing, the key of the **first** time this type was seen int32 newtypekey = 4; // for dynamic typing, a **new** key, the first time this type is seen string typename = 8; // for dynamic typing, the name of the type (only present along with newtypekey) bytes payload = 10; // the new string/value (only present along with newobjectkey) } message guid { fixed64 lo = 1; // the first 8 bytes of the guid (note:crazy-endian) fixed64 hi = 2; // the second 8 bytes of the guid (note:crazy-endian) } message decimal { uint64 lo = 1; // the first 64 bits of the underlying value uint32 hi = 2; // the last 32 bis of the underlying value uint32 signscale = 3; // the number of decimal digits (bits 1-16), and the sign (bit 0) }
execution.proto
syntax = "proto3"; package execution; option go_package = "./protos"; import "bcl.proto"; enum orderstatus { working = 0; rejected = 1; cancelled = 2; completed = 3; } enum ordertype { limit = 0; market = 1; stoplimit = 2; stopmarket = 3; } enum orderside { buy = 0; sell = 1; } enum rejectreason { norejection = 0; instrumentnotfound = 1; ordernotfound = 2; invalidordertype = 3; invalidaccount = 4; invalidside = 5; invalidamount = 6; invalidlimitprice = 7; invalidquotelimit = 8; invalidactivationprice = 9; invalidtimeinforce = 10; markethalted = 11; marketpaused = 12; nocounterorders = 13; missingexpirationtime = 14; incorrectexpirationtime = 15; internalerror = 16; illegalstatusswitch = 17; orderalreadyexists = 18; instrumentnotready = 19; externalsystemerror = 20; } enum reportcause { none = 0; neworder = 1; cancelorder = 2; masscancel = 3; expiration = 4; trigger = 5; marketstatuschange = 6; } enum timeinforce { goodtillcancel = 0; immediateorcancel = 1; fillorkill = 2; } enum cancelreason { notcancelled = 0; cancelledbytrader = 1; cancelledbysystem = 2; selfmatchprevention = 3; ordertimeinforce = 4; liquidation = 100; } message tradedata { int64 tradeid = 1; bcl.decimal amount = 4; bcl.decimal executionprice = 5; orderstatus orderstatus = 7; int64 accountid = 11; bcl.guid matchedorderexternalid = 14; int64 matchedorderid = 16; bcl.decimal remainingamount = 17; } message execution { bytes origin = 4; orderside side = 7; bcl.decimal requestedprice = 8; bcl.decimal requestedamount = 9; bcl.decimal remainingamount = 10; int64 executedat = 13; orderstatus orderstatus = 14; repeated tradedata trades = 16; ordertype ordertype = 20; int64 version = 22; int64 accountid = 23; rejectreason rejectreason = 25; reportcause reportcause = 26; bcl.guid instructionid = 27; bcl.guid externalorderid = 28; int32 executionenginemarketid = 29; int64 orderid = 30; cancelreason cancelreason = 31; int64 txid = 32; timeinforce timeinforce = 34; string cancelledby = 35; }
原型/
此文件夹中的文件是使用以下命令从 proto 文件生成的:
protoc --go_out=protos --go_opt=paths=source_relative bcl.proto execution.proto
go.mod
module mymodule.local go 1.20 require google.golang.org/protobuf v1.30.0
main.go
package main import ( "encoding/binary" "log" "google.golang.org/protobuf/proto" "mymodule.local/protos" ) func main() { data := []byte{5, 85, 0, 0, 0, 56, 1, 66, 3, 8, 144, 78, 74, 2, 8, 1, 82, 2, 8, 1, 104, 197, 192, 132, 194, 159, 143, 219, 237, 8, 176, 1, 25, 184, 1, 11, 208, 1, 1, 218, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 226, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 232, 1, 1, 240, 1, 25, 128, 2, 25} if len(data) < 5 { log.fatal("data should contain at least 5 bytes") } messagetype := data[0] length := binary.littleendian.uint32(data[1:5]) data = data[5:] if length != uint32(len(data)) { log.fatalf("invalid data length: %d", length) } execution := &protos.execution{} err := proto.unmarshal(data, execution) if err != nil { log.fatalf("err unmarshalling!: %v", err) } log.printf("message type: %d, message: %+v", messagetype, execution) }
问题中提供的数据的输出:
2023/06/15 17:50:58 message type: 5, message: Side:Sell RequestedPrice:{lo:10000} RequestedAmount:{lo:1} RemainingAmount:{lo:1} ExecutedAt:638223043314917445 Version:25 AccountId:11 ReportCause:NewOrder InstructionId:{lo:5574686611683820165 hi:10500929413443338416} ExternalOrderId:{lo:5574686611683820165 hi:10500929413443338416} ExecutionEngineMarketId:1 OrderId:25 TxId:25
以上是proto:无法解析无效的有线格式数据的详细内容。更多信息请关注PHP中文网其他相关文章!