写点什么

文盘 Rust —— rust 连接 oss | 京东云技术团队

  • 2023-05-10
    北京
  • 本文字数:3128 字

    阅读完需:约 10 分钟

文盘Rust —— rust连接oss | 京东云技术团队

作者:京东科技 贾世闻


对象存储是云的基础组件之一,各大云厂商都有相关产品。这里跟大家介绍一下 rust 与对象存储交到的基本套路和其中的一些技巧。

基本连接

我们以 [S3 sdk](


https://github.com/awslabs/aws-sdk-rust)为例来说说基本的连接与操作,作者验证过 aws、京东云、阿里云。主要的增删改查功能没有什么差别。


  • 建立客户端


let shared_config = SdkConfig::builder()         .credentials_provider(SharedCredentialsProvider::new(Credentials::new(            "LTAI5t7NPuPKsXm6UeSa1",            "DGHuK03ESXQYqQ83buKMHs9NAwz",             None,             None,             "Static",         )))         .endpoint_url("http://oss-cn-beijing.aliyuncs.com")         .region(Region::new("oss-cn-beijing"))         .build();     let s3_config_builder = aws_sdk_s3::config::Builder::from(&shared_config);     let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());
复制代码


建立 Client 所需要的参数主要有你需要访问的 oss 的 AK、SK,endpoint url 以及服务所在的区域。以上信息都可以在服务商的帮助文档查询到。


  • 对象列表


let mut obj_list = client     .list_objects_v2()     .bucket(bucket)     .max_keys(max_keys)     .prefix(prefix_str)     .continuation_token(token_str);
let list = obj_list.send().await.unwrap();println!("{:?}",list.contents());println!("{:?}",list.next_continuation_token());
复制代码


使用 list_objects_v2 函数返回对象列表,相比 list_objects 函数,list_objects_v2 可以通过 continuation_token 和 max_keys 控制返回列表的长度。list.contents()返回对象列表数组,


list.next_continuation_token()返回继续查询的 token。


  • 上传文件


let content = ByteStream::from("content in file".as_bytes()); let exp = aws_smithy_types::DateTime::from_secs(100);let upload = client    .put_object()    .bucket("bucket")    .key("/test/key")    .expires(exp)    .body(content);upload.send().await.unwrap();
复制代码


指定 bucket 及对象路径,body 接受 ByteStream 类型作为文件内容,最后设置过期时间 expires,无过期时间时不指定该配置即可。


  • 下载文件


let key = "/tmp/test/key".to_string();let resp = client    .get_object()    .bucket("bucket")    .key(&key)    .send()    .await.unwrap();let data = resp.body.collect().await.unwrap();let bytes = data.into_bytes();
let path = std::path::Path::new("/tmp/key")if let Some(p) = path.parent() { std::fs::create_dir_all(p).unwrap();}let mut file = OpenOptions::new() .write(true) .truncate(true) .create(true) .open(path).unwrap();let _ = file.write(&*bytes);file.flush().unwrap();
复制代码


通过 get_object()函数获取 GetObjectOutput。返回值的 body 就是文件内容,将 body 转换为 bytes,最后打开文件写入即可。


  • 删除文件


let mut keys = vec![];let key1 = ObjectIdentifier::builder()    .set_key(Some("/tmp/key1".to_string()))    .build();let key2 = ObjectIdentifier::builder()    .set_key(Some("/tmp/key2".to_string()))    .build()keys.push(key1);keys.push(key2)client    .delete_objects()    .bucket(bucket)    .delete(Delete::builder().set_objects(Some(keys)).build())    .send()    .await    .unwrap();
复制代码


delete_objects 批量删除对象。首先构建 keys vector,定义要删除的对象,然后通过 Delete::builder(),构建 Delete model。

大文件上传

let mut file = fs::File::open("/tmp/file_name").unwrap();let chunk_size = 1024*1024;let mut part_number = 0;let mut upload_parts: Vec = Vec::new();
//获取上传idlet multipart_upload_res: CreateMultipartUploadOutput = self .client .create_multipart_upload() .bucket("bucket") .key("/tmp/key") .send() .await.unwrap();let upload_id = match multipart_upload_res.upload_id() { Some(id) => id, None => { return Err(anyhow!("upload id is None")); }};
//分段上传文件并记录completer_partloop { let mut buf = vec![0; chuck_size]; let read_count = file.read(&mut buf)?; part_number += 1;
if read_count == 0 { break; }
let body = &buf[..read_count]; let stream = ByteStream::from(body.to_vec());
let upload_part_res = self .client .upload_part() .key(key) .bucket(bucket) .upload_id(upload_id) .body(stream) .part_number(part_number) .send() .await.unwrap();
let completer_part = CompletedPart::builder() .e_tag(upload_part_res.e_tag.unwrap_or_default()) .part_number(part_number) .build();
upload_parts.push(completer_part);
if read_count != chuck_size { break; }}// 完成上传文件合并let completed_multipart_upload: CompletedMultipartUpload = CompletedMultipartUpload::builder() .set_parts(Some(upload_parts)) .build();
let _complete_multipart_upload_res = self .client .complete_multipart_upload() .bucket("bucket") .key(key) .multipart_upload(completed_multipart_upload) .upload_id(upload_id) .send() .await.unwrap();
复制代码


有时候面对大文件,比如几百兆甚至几个 G 的文件,为了节约带宽和内存,我才采取分段上传的方案,然后在对象存储的服务端做合并。基本流程是:指定 bucket 和 key,获取一个上传 id;按流读取文件,分段上传字节流,并记录 CompletedPart;通知服务器按照 CompletedPart 集合来合并文件。具体过程代码已加注释,这里不再累述。

大文件下载

let mut file = match OpenOptions::new()            .truncate(true)            .create(true)            .write(true)            .open("/tmp/target_file");let key = "/tmp/test/key".to_string();let resp = client    .get_object()    .bucket("bucket")    .key(&key)    .send()    .await.unwrap();
let content_len = resp.content_length();let mut byte_stream_async_reader = resp.body.into_async_read();let mut content_len_usize: usize = content_len.try_into().unwrap();loop { if content_len_usize > chunk_size { let mut buffer = vec![0; chunk_size]; let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap(); file.write_all(&buffer).unwrap(); content_len_usize -= chunk_size; continue; } else { let mut buffer = vec![0; content_len_usize]; let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap(); file.write_all(&buffer).unwrap(); break; }}file.flush().unwrap();
复制代码


在从对象存储服务端下载文件的过程中也会遇到大文件问题。为了节约带宽和内存,我们采取读取字节流的方式分段写入文件。首先 get_object()函数获取 ByteStream,通过 async_reader 流式读取对象字节,分段写入文件。


对象存储的相关话题今天先聊到这儿,下期见。

发布于: 刚刚阅读数: 6
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
文盘Rust —— rust连接oss | 京东云技术团队_rust_京东科技开发者_InfoQ写作社区