火山引擎的 AI 语音技术
作者:淼.
- 2023-12-08 山东
本文字数:4787 字
阅读完需:约 16 分钟
火山引擎实名认证
说到火山引擎,我们就不得不提到字节跳动,说到字节跳动我们又不得不说说抖音这款近几年爆火的短视频 app。
火山引擎和抖音都属于字节跳动的产品,所以我们在火山引擎创建账号的时候,会提示我们在抖音 app 进行扫码实名认证,通过认证之后我们才能使用火山引擎提供的产品。
语音技术
在火山引擎的首页搜索“语音”,通过模糊搜索就能找到我们想要的语音技术。
进入语音技术的首页之后我们先在我们的账号上创建一个语音技术的应用,点击“立即使用”之后进入选择应用技术页面
创建好了之后我们再来看它的开发文档,我们直接来到火山引擎的开发文档的调用参数说明
json 格式的请求
{
"app": {
"appid": "appid123",
"token": "access_token",
"cluster": "volcano_tts"
},
"user": {
"uid": "uid123"
},
"audio": {
"voice_type": "BV700_streaming",
"encoding": "mp3",
"compression_rate": 1,
"rate": 24000,
"bits": 16,
"channel": 1,
"speed_ratio": 1.0,
"volume_ratio": 1.0,
"pitch_ratio": 1.0,
"emotion": "happy",
"language": "cn"
},
"request": {
"reqid": "uuid",
"text": "语音合成",
"text_type": "plain",
"operation": "query",
"silence_duration": "125",
"with_frontend": "1",
"frontend_type": "unitTson",
"pure_english_opt": "1"
}
}
复制代码
json 响应返回参数
{
"reqid": "reqid",
"code": 3000,
"operation": "query",
"message": "Success",
"sequence": -1,
"data": "base64 encoded binary data",
"addition": {
"description": "...",
"duration": "1960",
"frontend": "{
"words": [{
"word": "字",
"start_time": 0.025,
"end_time": 0.185
},
...
{
"word": "。",
"start_time": 1.85,
"end_time": 1.955
}],
"phonemes": [{
"phone": "C0z",
"start_time": 0.025,
"end_time": 0.105
},
...
{
"phone": "。",
"start_time": 1.85,
"end_time": 1.955
}]
}"
}
}
复制代码
基于 python 来调用语音 api 开发
#coding=utf-8
'''
requires Python 3.6 or later
asyncio is part of the standard library (no pip install needed)
pip install websockets
'''
import asyncio
import websockets
import uuid
import json
import gzip
import copy
# Lookup tables that turn the 4-bit fields of a server frame header into
# human-readable labels for printing.
MESSAGE_TYPES = {
    11: "audio-only server response",
    12: "frontend server response",
    15: "error message from server",
}
MESSAGE_TYPE_SPECIFIC_FLAGS = {
    0: "no sequence number",
    1: "sequence number > 0",
    2: "last message from server (seq < 0)",
    3: "sequence number < 0",
}
MESSAGE_SERIALIZATION_METHODS = {
    0: "no serialization",
    1: "JSON",
    15: "custom type",
}
MESSAGE_COMPRESSIONS = {
    0: "no compression",
    1: "gzip",
    15: "custom compression method",
}

# Account-specific placeholders: replace these with your own values.
appid = "自己创建的appid"
token = "自己的token"
cluster = "自己的cluster"
voice_type = "自己的格式"

host = "openspeech.bytedance.com"
api_url = f"wss://{host}/api/v1/tts/ws_binary"

# Fixed 4-byte header placed in front of every client request:
#   byte 0: version b0001 (4 bits) | header size b0001 (4 bits)
#   byte 1: message type b0001, full client request (4 bits)
#           | message type specific flags b0000, none (4 bits)
#   byte 2: serialization method b0001, JSON (4 bits)
#           | compression b0001, gzip (4 bits)
#   byte 3: reserved data, 0x00
default_header = bytearray([0x11, 0x10, 0x11, 0x00])

# Request template. The "xxx" fields (voice_type, reqid, operation) are
# placeholders that each test function overwrites on its own deep copy.
request_json = {
    "app": {
        "appid": appid,
        "token": "access_token",
        "cluster": cluster,
    },
    "user": {
        "uid": "388808087185088",
    },
    "audio": {
        "voice_type": "xxx",
        "encoding": "mp3",
        "speed_ratio": 1.0,
        "volume_ratio": 1.0,
        "pitch_ratio": 1.0,
    },
    "request": {
        "reqid": "xxx",
        "text": "语音合成。",
        "text_type": "plain",
        "operation": "xxx",
    },
}
async def test_submit():
    """Run one "submit" synthesis request and save the audio to test_submit.mp3.

    A deep copy of the module-level ``request_json`` template is filled in
    with the configured ``voice_type``, a fresh UUID as ``reqid`` and the
    ``"submit"`` operation. The request is sent as one binary frame:
    ``default_header`` + 4-byte big-endian payload size + gzip-compressed
    JSON. Received frames are passed to ``parse_response`` until it returns
    a truthy value.
    """
    submit_request_json = copy.deepcopy(request_json)
    submit_request_json["audio"]["voice_type"] = voice_type
    submit_request_json["request"]["reqid"] = str(uuid.uuid4())
    submit_request_json["request"]["operation"] = "submit"

    payload_bytes = json.dumps(submit_request_json).encode("utf-8")
    payload_bytes = gzip.compress(payload_bytes)  # if no compression, comment this line

    full_client_request = bytearray(default_header)
    full_client_request.extend(len(payload_bytes).to_bytes(4, 'big'))  # payload size(4 bytes)
    full_client_request.extend(payload_bytes)  # payload

    print("\n------------------------ test 'submit' -------------------------")
    print("request json: ", submit_request_json)
    print("\nrequest bytes: ", full_client_request)

    # NOTE(review): the "Bearer; " prefix (with the semicolon) is kept exactly
    # as written in the original demo - confirm against the service docs.
    header = {"Authorization": f"Bearer; {token}"}

    # The context manager guarantees the output file is closed even when the
    # connection, a receive, or response parsing raises.
    with open("test_submit.mp3", "wb") as file_to_save:
        # NOTE(review): recent `websockets` releases may expect
        # `additional_headers` instead of `extra_headers` - confirm against
        # the installed version.
        async with websockets.connect(api_url, extra_headers=header, ping_interval=None) as ws:
            await ws.send(full_client_request)
            while True:
                res = await ws.recv()
                if parse_response(res, file_to_save):
                    break
            print("\nclosing the connection...")
async def test_query():
    """Run one "query" synthesis request and save the audio to test_query.mp3.

    A deep copy of the module-level ``request_json`` template is filled in
    with the configured ``voice_type``, a fresh UUID as ``reqid`` and the
    ``"query"`` operation. The request is sent as one binary frame:
    ``default_header`` + 4-byte big-endian payload size + gzip-compressed
    JSON. Exactly one frame is received and passed to ``parse_response``.
    """
    query_request_json = copy.deepcopy(request_json)
    query_request_json["audio"]["voice_type"] = voice_type
    query_request_json["request"]["reqid"] = str(uuid.uuid4())
    query_request_json["request"]["operation"] = "query"

    payload_bytes = json.dumps(query_request_json).encode("utf-8")
    payload_bytes = gzip.compress(payload_bytes)  # if no compression, comment this line

    full_client_request = bytearray(default_header)
    full_client_request.extend(len(payload_bytes).to_bytes(4, 'big'))  # payload size(4 bytes)
    full_client_request.extend(payload_bytes)  # payload

    print("\n------------------------ test 'query' -------------------------")
    print("request json: ", query_request_json)
    print("\nrequest bytes: ", full_client_request)

    # NOTE(review): the "Bearer; " prefix (with the semicolon) is kept exactly
    # as written in the original demo - confirm against the service docs.
    header = {"Authorization": f"Bearer; {token}"}

    # The context manager guarantees the output file is closed even when the
    # connection, the receive, or response parsing raises.
    with open("test_query.mp3", "wb") as file_to_save:
        # NOTE(review): recent `websockets` releases may expect
        # `additional_headers` instead of `extra_headers` - confirm against
        # the installed version.
        async with websockets.connect(api_url, extra_headers=header, ping_interval=None) as ws:
            await ws.send(full_client_request)
            res = await ws.recv()
            parse_response(res, file_to_save)
            print("\nclosing the connection...")
def parse_response(res, file):
    """Decode one binary server frame, print its fields and store any audio.

    Frame layout as read here: byte 0 holds the protocol version (high
    nibble) and the header size in 4-byte units (low nibble); byte 1 the
    message type and its type-specific flags; byte 2 the serialization
    method and compression; byte 3 is reserved. Everything after
    ``header_size * 4`` bytes is the payload.

    Args:
        res: Raw bytes of a single frame received from the websocket.
        file: Writable binary file object; audio payloads are written to it.

    Returns:
        True when the caller should stop reading: an audio frame with a
        negative sequence number, an error message, or an unrecognized
        message type. False for an ACK, an audio frame with a non-negative
        sequence number, or a frontend message.
    """
    print("--------------------------- response ---------------------------")
    # print(f"response raw bytes: {res}")
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]

    # Use .get() so that a value missing from the lookup tables is reported
    # as "unknown" instead of raising KeyError before the branches below
    # (including the "undefined message type!" one) get a chance to run.
    message_type_label = MESSAGE_TYPES.get(message_type, "unknown")
    flags_label = MESSAGE_TYPE_SPECIFIC_FLAGS.get(message_type_specific_flags, "unknown")
    serialization_label = MESSAGE_SERIALIZATION_METHODS.get(serialization_method, "unknown")
    compression_label = MESSAGE_COMPRESSIONS.get(message_compression, "unknown")

    print(f" Protocol version: {protocol_version:#x} - version {protocol_version}")
    print(f" Header size: {header_size:#x} - {header_size * 4} bytes ")
    print(f" Message type: {message_type:#x} - {message_type_label}")
    print(f" Message type specific flags: {message_type_specific_flags:#x} - {flags_label}")
    print(f"Message serialization method: {serialization_method:#x} - {serialization_label}")
    print(f" Message compression: {message_compression:#x} - {compression_label}")
    print(f" Reserved: {reserved:#04x}")
    if header_size != 1:
        print(f" Header extensions: {header_extensions}")

    if message_type == 0xb:  # audio-only server response
        if message_type_specific_flags == 0:  # no sequence number as ACK
            print(" Payload size: 0")
            return False
        # Payload = 4-byte signed sequence number, 4-byte size, audio bytes.
        sequence_number = int.from_bytes(payload[:4], "big", signed=True)
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload = payload[8:]
        print(f" Sequence number: {sequence_number}")
        print(f" Payload size: {payload_size} bytes")
        file.write(payload)
        # A negative sequence number marks the last audio frame.
        return sequence_number < 0

    if message_type == 0xf:  # error message from server
        # Payload = 4-byte error code, 4-byte message size, message bytes.
        code = int.from_bytes(payload[:4], "big", signed=False)
        msg_size = int.from_bytes(payload[4:8], "big", signed=False)
        error_msg = payload[8:]
        if message_compression == 1:
            error_msg = gzip.decompress(error_msg)
        error_msg = str(error_msg, "utf-8")
        print(f" Error message code: {code}")
        print(f" Error message size: {msg_size} bytes")
        print(f" Error message: {error_msg}")
        return True

    if message_type == 0xc:  # frontend server response
        # Skip the 4-byte size prefix; the rest is the (optionally gzipped)
        # frontend message.
        payload = payload[4:]
        if message_compression == 1:
            payload = gzip.decompress(payload)
        print(f" Frontend message: {payload}")
        # Explicit falsy result (previously an implicit None): keep reading.
        return False

    print("undefined message type!")
    return True
if __name__ == '__main__':
    # Create the event loop explicitly: calling asyncio.get_event_loop() when
    # no loop is set is deprecated and fails on recent Python versions.
    # new_event_loop() is also available on the older versions this script
    # targets, and the loop is closed once both demos have run.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(test_submit())
        loop.run_until_complete(test_query())
    finally:
        loop.close()
复制代码
设置合成的发音音色
音色切换选择:
def select_vcn(self, *arg):
    """Set ``self.vcn`` from the voice name currently selected in ``self.cb``.

    ``self.cb.get()`` is read once and looked up in the table below. When the
    selection is not in the table, ``self.vcn`` is left unchanged. The
    current ``self.vcn`` is printed in either case.

    Args:
        *arg: Ignored; presumably present so the method can be bound as a
            GUI event callback - TODO confirm against the caller.
    """
    # NOTE(review): these identifiers do not follow the "BV..._streaming"
    # pattern used for voice_type in this article's request sample - verify
    # them against the service's voice list before use.
    voice_ids = {
        '灿灿 2.0': "xiaoyan",
        '炀炀': "aisjiuxu",
        '通用女声': "aisxping",
        '通用男声': "aisbabyxu",
        '超自然音色-燃燃': "aisjinger",
    }
    selected = self.cb.get()
    if selected in voice_ids:
        self.vcn = voice_ids[selected]
    print(self.vcn)
复制代码
结合 python+火山引擎 AI 语音合成技术效果:
划线
评论
复制
发布于: 刚刚阅读数: 9
版权声明: 本文为 InfoQ 作者【淼.】的原创文章。
原文链接:【http://xie.infoq.cn/article/fe258d0527e0aaced5a265aee】。文章转载请联系作者。
淼.
关注
还未添加个人签名 2022-10-24 加入
还未添加个人简介
评论