Pulsar 支持多种 schema,其中包括 protobuf,但是 Pulsar 原先的 protobuf schema 是基于 avro schema 进行实现的,这带来了一些问题,如不能通过 avro 很好地描述 protobuf 的结构、不方便对 schema 进行更新等。后来,Pulsar 添加了对 ProtobufNativeSchema 的支持,底层基于 ProtobufV3。本文将介绍 F# client 中对于 protobufNativeSchema 的实现。
ProtobufNativeSchema
在 F# client 中,所有的 schema 都需要实现 ISchema 接口,接口如下:
[<AbstractClass>]type ISchema<'T>() = abstract member SchemaInfo: SchemaInfo abstract member Encode: 'T -> byte[] abstract member Decode: byte[] -> 'T abstract member GetSpecificSchema: SchemaInfo * SchemaVersion option -> ISchema<'T> default this.GetSpecificSchema (_, _) = this abstract member SupportSchemaVersioning: bool default this.SupportSchemaVersioning = false abstract member Validate: byte[] -> unit default this.Validate bytes = this.Decode(bytes) |> ignore
复制代码
其中,最主要的是实现 SchemaInfo、Encode 和 Decode。
对于 Encode 和 Decode 则非常简单,直接调用 protobuf 库的序列化和反序列化方法即可。
override this.Decode bytes = use stream = new MemoryStream(bytes) Serializer.Deserialize(stream) override this.Encode value = if parameterIsClass && (isNull <| box value) then raise <| SchemaSerializationException "Need Non-Null content value" use stream = MemoryStreamManager.GetStream() Serializer.Serialize(stream, value) stream.ToArray()
复制代码
对于 ProtobufNativeSchema,其 SchemaInfo 如下:
override this.SchemaInfo = { Name = "" Type = SchemaType.PROTOBUF_NATIVE Schema = stringSchema() Properties = Map.empty}
复制代码
其中,我们需要生成 schema 字符串,这个 schema 将用于 broker 端的依赖检查、兼容性检查等功能,在 F# client 中,我们需要生成这样的 schema 字符串,其中包含各类 protobuf 的定义以及依赖关系和依赖的定义等内容。
虚拟文件系统
首先,我们需要根据用户所指定的类型,生成出用户类型的 proto 文件,代码如下:
let userClassNamespace = typeof<'T>.Namespacelet userClassName = typeof<'T>.Name
let protoForType = Serializer.GetProto<'T> ()let protoFileName = userClassName + ".proto"
复制代码
但这个文件并不是真正的文件,而是存储于内存中的虚拟文件。为了让 protobuf 的库能够读取到这个文件,我们不能使用默认的文件系统去读取磁盘上真正的 proto 文件,而是需要实现一套虚拟文件系统,并能够通过这个虚拟文件系统获取到用户的 proto 文件和其所依赖的其他 proto 文件。
虚拟文件系统实现如下:
type VirtualFile(fileName:string, content:string)= let protobufReflectionAssembly = Assembly.GetAssembly typeof<IFileSystem> let embeddedProtoFilesNames = protobufReflectionAssembly.GetManifestResourceNames() let isUserFile (path: string) = // we need to subtract the prefix `/` here. path.Substring 1 = fileName let getEmbeddedProtoName (path: string) = "ProtoBuf" + path.Replace('/', '.') interface IFileSystem with member this.Exists path = isUserFile path || Array.contains (getEmbeddedProtoName path) embeddedProtoFilesNames member this.OpenText path = if isUserFile path then new StringReader(content) :> TextReader else let embededResourceStream = path |> getEmbeddedProtoName |> protobufReflectionAssembly.GetManifestResourceStream new StreamReader(embededResourceStream) :> TextReader
复制代码
主要实现两个方法:Exists 和 OpenText。构造函数传入用户的 proto 文件的文件名和内容,默认的文件名是用户的类名,如对于 SimpleRecord 的类,它的 proto 文件名则是:SimpleRecord.proto。
这里可以看到 isUserFile 方法,因为 protobuf 库在调用这个文件系统时,会传入根路径,如/SimpleRecord.proto,所以我们需要去除根目录前缀再进行比较,判断当前访问的是否是用户的 proto 文件,如果是,则返回用户 proto 文件的内容。
除此之外,我们还需要实现读取一些内置依赖文件的功能,这些文件存储与 protobuf 库中的 resouces 中,因为 prorotbuf 库所调用传入的路径和在 assembly resources 中存储的文件名有差异,所以我们能够 getEmbeddedProtoName 来做一次转换。
生成 string schema
接下来我们需要创建 FileDescriptorSet,在里面设置前面所生成的用户的类的 proto。
let set = FileDescriptorSet( FileSystem = VirtualFile(protoFileName, protoForType))let baseUri = Uri("file://" + protoFileName, UriKind.Absolute)set.AddImportPath(baseUri.AbsolutePath)set.Add protoFileName |> ignoreset.Process()
复制代码
我们指定 baseUri 后,在调用 set.Process()的时候,就会从 baseUri 开始,也就是从用户的 proto 开始处理,而这些文件都在 VirtualFile 的 FileSystem 中,在 Process 中,会构建依赖关系,并读取各个依赖的内容等等。
最后我们对 FileDescriptorSet 进行序列化,生成 ProtobufNativeSchemaData。
use stream = MemoryStreamManager.GetStream()Serializer.Serialize(stream, set)
ProtobufNativeSchemaData (stream.ToArray (), userClassNamespace + "." + userClassName, protoFileName)
复制代码
ProtobufNativeSchemaData 的定义如下:
type ProtobufNativeSchemaData(fileDescriptorSet:byte[], rootMessageTypeName: string, rootFileDescriptorName:string) = member this.fileDescriptorSet = fileDescriptorSet member this.rootMessageTypeName =rootMessageTypeName member this.rootFileDescriptorName =rootFileDescriptorName
复制代码
这时,我们对它进行 Json 序列化,最终就能够得到 string 类型的 schema 了,如下:
{ "fileDescriptorSet": "Cm4KGVNpbXBsZVByb3RvUmVjb3JkVjEucHJvdG8SDHB1bHNhcl90ZXN0cyI7ChNTaW1wbGVQcm90b1JlY29yZFYxEhIKBE5hbWUYASABKAlSBE5hbWUSEAoDQWdlGAIgASgFUgNBZ2ViBnByb3RvMw==", "rootMessageTypeName": "pulsar_tests.SimpleProtoRecordV1", "rootFileDescriptorName": "SimpleProtoRecordV1.proto"}
复制代码
通过 string schema,就可以组成 SchemaInfo 并传给 Broker 进行各类检验等,通过 Encode、Decode 进行序列化和反序列化。
评论