写点什么

[Pulsar] F# client 的 ProtobufNativeSchema 实现

作者:Zike Yang
  • 2021 年 12 月 06 日
  • 本文字数:2779 字

    阅读完需:约 9 分钟

Pulsar 支持多种 schema,其中包括 protobuf,但是 Pulsar 原先的 protobuf schema 是基于 avro schema 进行实现的,这带来了一些问题,如不能通过 avro 很好地描述 protobuf 的结构、不方便对 schema 进行更新等。后来,Pulsar 添加了对 ProtobufNativeSchema 的支持,底层基于 ProtobufV3。本文将介绍 F# client 中对于 protobufNativeSchema 的实现。


ProtobufNativeSchema

在 F# client 中,所有的 schema 都需要实现 ISchema 接口,接口如下:

[<AbstractClass>]type ISchema<'T>() =    abstract member SchemaInfo: SchemaInfo    abstract member Encode: 'T -> byte[]    abstract member Decode: byte[] -> 'T    abstract member GetSpecificSchema: SchemaInfo * SchemaVersion option -> ISchema<'T>    default this.GetSpecificSchema (_, _) =        this    abstract member SupportSchemaVersioning: bool    default this.SupportSchemaVersioning = false    abstract member Validate: byte[] -> unit    default this.Validate bytes =        this.Decode(bytes) |> ignore
复制代码

其中,最主要的是实现 SchemaInfo、Encode 和 Decode。

对于 Encode 和 Decode 则非常简单,直接调用 protobuf 库的序列化和反序列化方法即可。

override this.Decode bytes =    use stream = new MemoryStream(bytes)       Serializer.Deserialize(stream)    override this.Encode value =                if parameterIsClass && (isNull <| box value) then        raise <| SchemaSerializationException "Need Non-Null content value"    use stream = MemoryStreamManager.GetStream()    Serializer.Serialize(stream, value)    stream.ToArray()  
复制代码

对于 ProtobufNativeSchema,其 SchemaInfo 如下:

override this.SchemaInfo =    {    Name = ""    Type = SchemaType.PROTOBUF_NATIVE    Schema = stringSchema()     Properties = Map.empty}
复制代码

其中,我们需要生成 schema 字符串,这个 schema 将用于 broker 端的依赖检查、兼容性检查等功能,在 F# client 中,我们需要生成这样的 schema 字符串,其中包含各类 protobuf 的定义以及依赖关系和依赖的定义等内容。


虚拟文件系统

首先,我们需要根据用户所指定的类型,生成出用户类型的 proto 文件,代码如下:

let userClassNamespace = typeof<'T>.Namespacelet userClassName = typeof<'T>.Name
let protoForType = Serializer.GetProto<'T> ()let protoFileName = userClassName + ".proto"
复制代码

但这个文件并不是真正的文件,而是存储于内存中的虚拟文件。为了让 protobuf 的库能够读取到这个文件,我们不能使用默认的文件系统去读取磁盘上真正的 proto 文件,而是需要实现一套虚拟文件系统,并能够通过这个虚拟文件系统获取到用户的 proto 文件和其所依赖的其他 proto 文件。

虚拟文件系统实现如下:

type VirtualFile(fileName:string, content:string)=    let protobufReflectionAssembly = Assembly.GetAssembly typeof<IFileSystem>    let embeddedProtoFilesNames = protobufReflectionAssembly.GetManifestResourceNames()    let isUserFile (path: string) =        // we need to subtract the prefix `/` here.        path.Substring 1 = fileName    let getEmbeddedProtoName (path: string) =        "ProtoBuf" + path.Replace('/', '.')    interface IFileSystem with        member this.Exists path =            isUserFile path            || Array.contains (getEmbeddedProtoName path) embeddedProtoFilesNames        member this.OpenText path =            if isUserFile path then                new StringReader(content) :> TextReader            else                let embededResourceStream = path |> getEmbeddedProtoName |> protobufReflectionAssembly.GetManifestResourceStream                new StreamReader(embededResourceStream) :> TextReader
复制代码

主要实现两个方法:Exists 和 OpenText。构造函数传入用户的 proto 文件的文件名和内容,默认的文件名是用户的类名,如对于 SimpleRecord 的类,它的 proto 文件名则是:SimpleRecord.proto。

这里可以看到 isUserFile 方法,因为 protobuf 库在调用这个文件系统时,会传入根路径,如/SimpleRecord.proto,所以我们需要去除根目录前缀再进行比较,判断当前访问的是否是用户的 proto 文件,如果是,则返回用户 proto 文件的内容。

除此之外,我们还需要实现读取一些内置依赖文件的功能,这些文件存储与 protobuf 库中的 resouces 中,因为 prorotbuf 库所调用传入的路径和在 assembly resources 中存储的文件名有差异,所以我们能够 getEmbeddedProtoName 来做一次转换。


生成 string schema

接下来我们需要创建 FileDescriptorSet,在里面设置前面所生成的用户的类的 proto。

let set = FileDescriptorSet( FileSystem = VirtualFile(protoFileName, protoForType))let baseUri =  Uri("file://" + protoFileName, UriKind.Absolute)set.AddImportPath(baseUri.AbsolutePath)set.Add protoFileName |> ignoreset.Process()     
复制代码

我们指定 baseUri 后,在调用 set.Process()的时候,就会从 baseUri 开始,也就是从用户的 proto 开始处理,而这些文件都在 VirtualFile 的 FileSystem 中,在 Process 中,会构建依赖关系,并读取各个依赖的内容等等。

最后我们对 FileDescriptorSet 进行序列化,生成 ProtobufNativeSchemaData。

use stream = MemoryStreamManager.GetStream()Serializer.Serialize(stream, set)                  
ProtobufNativeSchemaData (stream.ToArray (), userClassNamespace + "." + userClassName, protoFileName)
复制代码

ProtobufNativeSchemaData 的定义如下:

type   ProtobufNativeSchemaData(fileDescriptorSet:byte[],                                       rootMessageTypeName: string,                                       rootFileDescriptorName:string) =    member this.fileDescriptorSet = fileDescriptorSet    member  this.rootMessageTypeName =rootMessageTypeName    member  this.rootFileDescriptorName =rootFileDescriptorName
复制代码

这时,我们对它进行 Json 序列化,最终就能够得到 string 类型的 schema 了,如下:

{  "fileDescriptorSet": "Cm4KGVNpbXBsZVByb3RvUmVjb3JkVjEucHJvdG8SDHB1bHNhcl90ZXN0cyI7ChNTaW1wbGVQcm90b1JlY29yZFYxEhIKBE5hbWUYASABKAlSBE5hbWUSEAoDQWdlGAIgASgFUgNBZ2ViBnByb3RvMw==",  "rootMessageTypeName": "pulsar_tests.SimpleProtoRecordV1",  "rootFileDescriptorName": "SimpleProtoRecordV1.proto"}
复制代码


通过 string schema,就可以组成 SchemaInfo 并传给 Broker 进行各类检验等,通过 Encode、Decode 进行序列化和反序列化。

发布于: 3 小时前阅读数: 4
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] F# client的ProtobufNativeSchema实现