Pulsar 支持多种 schema,其中包括 protobuf,但是 Pulsar 原先的 protobuf schema 是基于 avro schema 进行实现的,这带来了一些问题,如不能通过 avro 很好地描述 protobuf 的结构、不方便对 schema 进行更新等。后来,Pulsar 添加了对 ProtobufNativeSchema 的支持,底层基于 ProtobufV3。本文将介绍 F# client 中对于 protobufNativeSchema 的实现。
ProtobufNativeSchema
在 F# client 中,所有的 schema 都需要实现 ISchema 接口,接口如下:
[<AbstractClass>]
type ISchema<'T>() =
abstract member SchemaInfo: SchemaInfo
abstract member Encode: 'T -> byte[]
abstract member Decode: byte[] -> 'T
abstract member GetSpecificSchema: SchemaInfo * SchemaVersion option -> ISchema<'T>
default this.GetSpecificSchema (_, _) =
this
abstract member SupportSchemaVersioning: bool
default this.SupportSchemaVersioning = false
abstract member Validate: byte[] -> unit
default this.Validate bytes =
this.Decode(bytes) |> ignore
复制代码
其中,最主要的是实现 SchemaInfo、Encode 和 Decode。
对于 Encode 和 Decode 则非常简单,直接调用 protobuf 库的序列化和反序列化方法即可。
override this.Decode bytes =
use stream = new MemoryStream(bytes)
Serializer.Deserialize(stream)
override this.Encode value =
if parameterIsClass && (isNull <| box value) then
raise <| SchemaSerializationException "Need Non-Null content value"
use stream = MemoryStreamManager.GetStream()
Serializer.Serialize(stream, value)
stream.ToArray()
复制代码
对于 ProtobufNativeSchema,其 SchemaInfo 如下:
override this.SchemaInfo =
{
Name = ""
Type = SchemaType.PROTOBUF_NATIVE
Schema = stringSchema()
Properties = Map.empty
}
复制代码
其中,我们需要生成 schema 字符串,这个 schema 将用于 broker 端的依赖检查、兼容性检查等功能,在 F# client 中,我们需要生成这样的 schema 字符串,其中包含各类 protobuf 的定义以及依赖关系和依赖的定义等内容。
虚拟文件系统
首先,我们需要根据用户所指定的类型,生成出用户类型的 proto 文件,代码如下:
let userClassNamespace = typeof<'T>.Namespace
let userClassName = typeof<'T>.Name
let protoForType = Serializer.GetProto<'T> ()
let protoFileName = userClassName + ".proto"
复制代码
但这个文件并不是真正的文件,而是存储于内存中的虚拟文件。为了让 protobuf 的库能够读取到这个文件,我们不能使用默认的文件系统去读取磁盘上真正的 proto 文件,而是需要实现一套虚拟文件系统,并能够通过这个虚拟文件系统获取到用户的 proto 文件和其所依赖的其他 proto 文件。
虚拟文件系统实现如下:
type VirtualFile(fileName:string, content:string)=
let protobufReflectionAssembly = Assembly.GetAssembly typeof<IFileSystem>
let embeddedProtoFilesNames = protobufReflectionAssembly.GetManifestResourceNames()
let isUserFile (path: string) =
// we need to subtract the prefix `/` here.
path.Substring 1 = fileName
let getEmbeddedProtoName (path: string) =
"ProtoBuf" + path.Replace('/', '.')
interface IFileSystem with
member this.Exists path =
isUserFile path
|| Array.contains (getEmbeddedProtoName path) embeddedProtoFilesNames
member this.OpenText path =
if isUserFile path then
new StringReader(content) :> TextReader
else
let embededResourceStream = path |> getEmbeddedProtoName |> protobufReflectionAssembly.GetManifestResourceStream
new StreamReader(embededResourceStream) :> TextReader
复制代码
主要实现两个方法:Exists 和 OpenText。构造函数传入用户的 proto 文件的文件名和内容,默认的文件名是用户的类名,如对于 SimpleRecord 的类,它的 proto 文件名则是:SimpleRecord.proto。
这里可以看到 isUserFile 方法,因为 protobuf 库在调用这个文件系统时,会传入根路径,如/SimpleRecord.proto,所以我们需要去除根目录前缀再进行比较,判断当前访问的是否是用户的 proto 文件,如果是,则返回用户 proto 文件的内容。
除此之外,我们还需要实现读取一些内置依赖文件的功能,这些文件存储与 protobuf 库中的 resouces 中,因为 prorotbuf 库所调用传入的路径和在 assembly resources 中存储的文件名有差异,所以我们能够 getEmbeddedProtoName 来做一次转换。
生成 string schema
接下来我们需要创建 FileDescriptorSet,在里面设置前面所生成的用户的类的 proto。
let set = FileDescriptorSet( FileSystem = VirtualFile(protoFileName, protoForType))
let baseUri = Uri("file://" + protoFileName, UriKind.Absolute)
set.AddImportPath(baseUri.AbsolutePath)
set.Add protoFileName |> ignore
set.Process()
复制代码
我们指定 baseUri 后,在调用 set.Process()的时候,就会从 baseUri 开始,也就是从用户的 proto 开始处理,而这些文件都在 VirtualFile 的 FileSystem 中,在 Process 中,会构建依赖关系,并读取各个依赖的内容等等。
最后我们对 FileDescriptorSet 进行序列化,生成 ProtobufNativeSchemaData。
use stream = MemoryStreamManager.GetStream()
Serializer.Serialize(stream, set)
ProtobufNativeSchemaData (stream.ToArray (), userClassNamespace + "." + userClassName, protoFileName)
复制代码
ProtobufNativeSchemaData 的定义如下:
type ProtobufNativeSchemaData(fileDescriptorSet:byte[],
rootMessageTypeName: string,
rootFileDescriptorName:string) =
member this.fileDescriptorSet = fileDescriptorSet
member this.rootMessageTypeName =rootMessageTypeName
member this.rootFileDescriptorName =rootFileDescriptorName
复制代码
这时,我们对它进行 Json 序列化,最终就能够得到 string 类型的 schema 了,如下:
{
"fileDescriptorSet": "Cm4KGVNpbXBsZVByb3RvUmVjb3JkVjEucHJvdG8SDHB1bHNhcl90ZXN0cyI7ChNTaW1wbGVQcm90b1JlY29yZFYxEhIKBE5hbWUYASABKAlSBE5hbWUSEAoDQWdlGAIgASgFUgNBZ2ViBnByb3RvMw==",
"rootMessageTypeName": "pulsar_tests.SimpleProtoRecordV1",
"rootFileDescriptorName": "SimpleProtoRecordV1.proto"
}
复制代码
通过 string schema,就可以组成 SchemaInfo 并传给 Broker 进行各类检验等,通过 Encode、Decode 进行序列化和反序列化。
评论