CnosDB 2.0 Arrow Flight SQL Usage Guide
- 2023-04-01, Inner Mongolia
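With the latest release, attentive readers will have noticed that CnosDB 2.0 now fully supports Arrow Flight SQL. It is easy to use and efficient, and it makes data access painless: Arrow Flight SQL makes it possible for CnosDB 2.0 to answer queries over a billion rows within seconds. This article introduces Arrow Flight SQL, explains its advantages, and shows how to use it from several languages so that you can get started quickly.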
Arrow Flight SQL
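Arrow Flight SQL is a protocol for interacting with SQL databases using the Arrow in-memory format and the Flight RPC framework. By combining Arrow's in-memory columnar format with Flight RPC, it speeds up SQL database operations. With Arrow Flight SQL, users can access data with standard SQL syntax while greatly improving data-access performance, making it possible to answer queries over a billion rows within seconds.
Arrow Flight SQL clients are currently supported in the following environments: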
• C++
• Go
• Java
• Rust
• JDBC (built on Arrow Flight SQL)
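Advantages of Arrow Flight SQL
Arrow Flight SQL offers the following advantages:
1. Feature-rich: its functionality is similar to APIs such as JDBC and ODBC, including executing queries and creating prepared statements.
2. Secure: because it is built on Flight, features such as encryption and authentication are supported out of the box.
3. Fast: a client and a server that both implement Arrow Flight exchange data without any conversion, and further optimizations such as parallel data access are possible, which greatly improves data-access performance.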
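How Arrow Flight compares with JDBC/ODBC in performance:
1. Arrow Flight transfers data between client and server without conversion, whereas an ODBC implementation typically defines its own binary wire protocol, into and out of which the data must be converted.
2. Arrow Flight can transfer data in parallel: the client first obtains a plan describing where the data lives; because the data may be spread across different servers, the client can pull from those servers in parallel.
3. Arrow Flight uses the Arrow Columnar Format, in which accessing any value takes O(1) time and which is well suited to vectorized computation.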
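To make point 3 concrete, here is a minimal Go sketch (Go being one of the client languages covered below) of how an Arrow-style nullable fixed-width column can be laid out: one contiguous values buffer plus a validity bitmap holding one bit per slot, least-significant bit first. The type `Int64Column` and its methods are hypothetical illustrations, not the Arrow Go API, and the sketch ignores details such as buffer padding and the offsets buffer used for variable-length types. Locating slot i is pure arithmetic, which is where the O(1) access comes from.

```go
package main

import "fmt"

// Int64Column is a hypothetical, simplified model of an Arrow nullable
// fixed-width column: a contiguous values buffer plus a validity bitmap
// (one bit per slot, least-significant bit first). Padding is ignored.
type Int64Column struct {
	values   []int64
	validity []byte
}

// NewInt64Column builds a column from pointers; a nil pointer marks a null slot.
func NewInt64Column(in []*int64) Int64Column {
	c := Int64Column{
		values:   make([]int64, len(in)),
		validity: make([]byte, (len(in)+7)/8),
	}
	for i, v := range in {
		if v != nil {
			c.values[i] = *v
			c.validity[i/8] |= 1 << (i % 8) // set bit i
		}
	}
	return c
}

// IsValid reports whether slot i is non-null: one byte index plus one bit test, O(1).
func (c Int64Column) IsValid(i int) bool {
	return c.validity[i/8]&(1<<(i%8)) != 0
}

// Value returns slot i by direct indexing, O(1); the result is meaningless for null slots.
func (c Int64Column) Value(i int) int64 {
	return c.values[i]
}

func main() {
	a, b := int64(56), int64(77)
	col := NewInt64Column([]*int64{&a, nil, &b})
	for i := 0; i < 3; i++ {
		if col.IsValid(i) {
			fmt.Println(i, col.Value(i))
		} else {
			fmt.Println(i, "null")
		}
	}
}
```

A row-oriented wire format, by contrast, has to be decoded record by record before a single column can be scanned.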
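Although Arrow Flight SQL can be used directly for database access, it is not a drop-in replacement for JDBC/ODBC. It can, however, serve as the concrete wire protocol underneath JDBC/ODBC drivers, reducing the implementation burden on databases.
A client that uses Arrow Flight SQL to connect to a database, execute SQL, and fetch the results goes through roughly these steps:
1. Create a Flight SQL client
2. Authenticate with a username and password
3. Execute the SQL and obtain a FlightInfo structure
4. Use the FlightEndpoints in the FlightInfo to fetch the FlightData stream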
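FlightInfo describes where the data is located, so the client can fetch it from the appropriate server. The server information is encoded as a list of FlightEndpoint messages in the FlightInfo; each endpoint represents a location that holds a subset of the response data.
A FlightEndpoint contains a list of server locations and a Ticket, an opaque binary token that the server uses to identify the requested data. No order is defined across FlightEndpoints, so if the result set is ordered, all of its data is returned in a single FlightEndpoint.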
The overall flow is shown in the flowchart below:
C++
1. Install Apache Arrow. Detailed installation instructions are available in the official documentation (https://arrow.apache.org/install/). On macOS, it can be installed easily with brew:
brew install apache-arrow
brew install apache-arrow-glib
2. Configure CMakeLists.txt
cmake_minimum_required(VERSION 3.24)
project(arrow_flight_cpp)
set(CMAKE_CXX_STANDARD 20)
find_package(Arrow REQUIRED)
find_package(ArrowFlight REQUIRED)
find_package(ArrowFlightSql REQUIRED)
include_directories(${ARROW_INCLUDE_DIR})
add_executable(arrow_flight_cpp main.cpp)
target_link_libraries(arrow_flight_cpp PRIVATE Arrow::arrow_shared)
target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlight::arrow_flight_shared)
target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlightSql::arrow_flight_sql_shared)
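3. Using the Arrow C++ library. Most Arrow functions return arrow::Result<T>, so the calling code is placed inside a function that itself returns arrow::Result<T> (or arrow::Status), for example:
arrow::Result<std::unique_ptr<FlightClient>> get_client() {
  ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
  ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
  return client;
}
The ARROW_ASSIGN_OR_RAISE macro first evaluates the right-hand expression, which returns arrow::Result<T>. If that result holds an error, the enclosing function returns early with the corresponding Status; otherwise the contained value is assigned to the left-hand variable.
For convenience, all the example code is written inside a lambda that returns arrow::Status:
int main() {
  auto fun = []() -> arrow::Status {
    // code
    return arrow::Status::OK();
  };
  auto status = fun();
  std::cout << status.ToString() << std::endl;
  return 0;
}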
4. Authenticate and create a FlightSqlClient
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
auto user = "root";
auto password = "";
// HTTP Basic authentication: the credentials are Base64-encoded, not encrypted.
// Authenticate before `client` is moved into the FlightSqlClient.
auto auth = client->AuthenticateBasicToken({}, user, password);
ARROW_RETURN_NOT_OK(auth); // return early if authentication failed
auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));
FlightCallOptions call_options;
call_options.headers.push_back(auth.ValueOrDie()); // attach the returned auth header to later calls
5. Execute SQL and obtain the FlightInfo
ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
const auto endpoints = info->endpoints();
6. Fetch the data through each FlightEndpoint
for (const auto &endpoint : endpoints) {
  // The stream carries this endpoint's data
  ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, endpoint.ticket));
  // Get the schema of the data
  ARROW_ASSIGN_OR_RAISE(auto schema, stream->GetSchema());
  std::cout << "Schema:" << schema->ToString() << std::endl;
  // Read and print record batches until the stream is exhausted
  while (true) {
    ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
    if (chunk.data == nullptr) {
      break;
    }
    std::cout << chunk.data->ToString();
  }
}
7. Complete code
#include <iostream>
#include <arrow/flight/api.h>
#include <arrow/flight/sql/api.h>

using namespace arrow::flight;
using namespace arrow::flight::sql;
using namespace arrow;

int main() {
  auto fun = []() -> Status {
    ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
    ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
    std::cout << "location client" << std::endl;
    auto user = "root";
    auto password = "";
    // Authenticate while `client` is still owned here; it is moved into sql_client below.
    auto auth = client->AuthenticateBasicToken({}, user, password);
    ARROW_RETURN_NOT_OK(auth);
    auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));
    FlightCallOptions call_options;
    call_options.headers.push_back(auth.ValueOrDie());
    ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
    const auto endpoints = info->endpoints();
    for (const auto &endpoint : endpoints) {
      ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, endpoint.ticket));
      ARROW_ASSIGN_OR_RAISE(auto schema, stream->GetSchema());
      std::cout << "Schema:" << schema->ToString() << std::endl;
      while (true) {
        ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
        if (chunk.data == nullptr) {
          break;
        }
        std::cout << chunk.data->ToString();
      }
    }
    return Status::OK();
  };
  auto status = fun();
  std::cout << status.ToString() << std::endl;
  return 0;
}
Go
1. Add the dependencies
Add the following to go.mod:
require (
github.com/apache/arrow/go/v10 v10.0.1
google.golang.org/grpc v1.51.0
)
2. Create the Flight SQL client
var dialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cl, err := flightsql.NewClient("localhost:31004", nil, nil, dialOpts...)
if err != nil {
fmt.Print(err)
return
}
3. Set the credentials and obtain an authenticated context
ctx, err := cl.Client.AuthenticateBasicToken(context.Background(), "root", "")
if err != nil {
fmt.Print(err)
return
}
4. Execute SQL in the authenticated context and obtain the FlightInfo
info, err := cl.Execute(ctx, "SELECT now();")
if err != nil {
fmt.Print(err)
return
}
fmt.Println(info.Schema)
5. Obtain a data reader from the FlightInfo
// CnosDB currently returns only one endpoint
rdr, err := cl.DoGet(ctx, info.GetEndpoint()[0].Ticket)
if err != nil {
fmt.Print(err)
return
}
defer rdr.Release()
6. Read records from the reader and print the data
n := 0
for rdr.Next() {
record := rdr.Record()
for i, col := range record.Columns() {
fmt.Printf("rec[%d][%q]: %v\n", n, record.ColumnName(i), col)
}
n++
}
Java
1. Add the dependencies
• If you build the project with Maven, add the following dependencies to pom.xml:
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-flight -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-flight</artifactId>
<version>10.0.1</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>10.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>10.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-core -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-core</artifactId>
<version>10.0.1</version>
</dependency>
</dependencies>
• Then add the os-maven-plugin build extension:
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
</build>
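• Add the following JVM option, either through the _JAVA_OPTIONS environment variable or on the java command line (Arrow's memory module needs reflective access to java.nio):
export _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED"
# or pass it directly
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
# or
env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...
# If you use Maven
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"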
2. Create the FlightSqlClient
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);
FlightClient client = FlightClient.builder(allocator, clientLocation).build();
FlightSqlClient sqlClient = new FlightSqlClient(client);
3. Configure authentication and the tenant header
Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
final CallHeaders headers = new FlightCallHeaders();
headers.insert("tenant", "cnosdb");
Set<CallOption> options = new HashSet<>();
credentialCallOption.ifPresent(options::add);
options.add(new HeaderCallOption(headers));
CallOption[] callOptions = options.toArray(new CallOption[0]);
4. Execute SQL and obtain the FlightInfo
try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
final FlightInfo info = preparedStatement.execute(callOptions);
System.out.println(info.getSchema());
// The remaining code is shown in the next step
}
5. Fetch the data (pass the same call options so that the auth and tenant headers are sent)
final Ticket ticket = info.getEndpoints().get(0).getTicket();
try (FlightStream stream = sqlClient.getStream(ticket, callOptions)) {
int n = 0;
while (stream.next()) {
List<FieldVector> vectors = stream.getRoot().getFieldVectors();
for (int i = 0; i < vectors.size(); i++) {
System.out.printf("%d %d %s%n", n, i, vectors.get(i));
}
n++;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
6. Complete code
package org.example;
import org.apache.arrow.flight.*;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class Main {
public static void main(String[] args) {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);
FlightClient client = FlightClient.builder(allocator, clientLocation).build();
FlightSqlClient sqlClient = new FlightSqlClient(client);
Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
final CallHeaders headers = new FlightCallHeaders();
headers.insert("tenant", "cnosdb");
Set<CallOption> options = new HashSet<>();
credentialCallOption.ifPresent(options::add);
options.add(new HeaderCallOption(headers));
CallOption[] callOptions = options.toArray(new CallOption[0]);
try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
final FlightInfo info = preparedStatement.execute(callOptions);
System.out.println(info.getSchema());
final Ticket ticket = info.getEndpoints().get(0).getTicket();
try (FlightStream stream = sqlClient.getStream(ticket, callOptions)) {
int n = 0;
while (stream.next()) {
List<FieldVector> vectors = stream.getRoot().getFieldVectors();
for (int i = 0; i < vectors.size(); i++) {
System.out.printf("%d %d %s%n", n, i, vectors.get(i));
}
n++;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
JDBC
1. Add the dependencies
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-jdbc</artifactId>
<version>10.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql-jdbc-driver -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-driver</artifactId>
<version>10.0.1</version>
</dependency>
</dependencies>
Add the JVM option (the same one as in the Java section):
export _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED"
# or pass it directly
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
# or
env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...
# If you use Maven
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"
2. Set the connection properties and run a query
package org.example;
import java.sql.*;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.put("user", "root"); // username
properties.put("password", ""); // password
properties.put("tenant", "cnosdb"); // tenant
properties.put("useEncryption", false);
try (
Connection connection = DriverManager.getConnection(
"jdbc:arrow-flight-sql://localhost:31004", properties
);
Statement statement = connection.createStatement())
{
ResultSet resultSet = statement.executeQuery("SELECT 1, 2, 3;");
while (resultSet.next()) {
int column1 = resultSet.getInt(1);
int column2 = resultSet.getInt(2);
int column3 = resultSet.getInt(3);
System.out.printf("%d %d %d%n", column1, column2, column3);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
3. Set the connection properties and execute SQL statements
package org.example;
import java.sql.*;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "");
properties.put("tenant", "cnosdb");
properties.put("useEncryption", false);
try (
Connection connection = DriverManager.getConnection(
"jdbc:arrow-flight-sql://localhost:31004", properties
);
Statement statement = connection.createStatement())
{
statement.execute("CREATE TABLE IF NOT EXISTS air\n" +
"(\n" +
" visibility DOUBLE,\n" +
" temperature DOUBLE,\n" +
" pressure DOUBLE,\n" +
" TAGS(station)\n" +
");");
statement.executeUpdate("INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n" +
" (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);");
ResultSet resultSet = statement.executeQuery("select * from air limit 1;");
while (resultSet.next()) {
Timestamp column1 = resultSet.getTimestamp(1);
String column2 = resultSet.getString(2);
Double column3 = resultSet.getDouble(3);
Double column4 = resultSet.getDouble(4);
Double column5 = resultSet.getDouble(5);
System.out.printf("%s %s %f %f %f%n", column1, column2, column3, column4, column5);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Rust
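The code runs in an asynchronous (tokio) runtime.
1. Add the dependencies to Cargo.toml
[dependencies]
arrow = {version = "28.0.0", features = ["prettyprint"] }
arrow-flight = {version = "28.0.0", features = ["flight-sql-experimental"]}
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }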
futures = "0.3.25"
prost-types = "0.11.2"
tonic = "0.8.3"
prost = "0.11.3"
http-auth-basic = "0.3.3"
base64 = "0.13.1"
2. Create the FlightServiceClient
let mut client = FlightServiceClient::connect("http://localhost:31004")
.await
.expect("connect failed");
3. Authenticate with a handshake
let mut req = Request::new(futures::stream::iter(iter::once(
HandshakeRequest::default(),
)));
req.metadata_mut().insert(
AUTHORIZATION.as_str(),
AsciiMetadataValue::try_from(format!(
"Basic {}",
base64::encode(format!("{}:{}", "root", ""))
))
.expect("metadata construct fail"),
);
let resp = client.handshake(req).await.expect("handshake");
println!("handshake resp: {:?}", resp.metadata());
4. Execute SQL
let cmd = CommandStatementQuery {
    query: "select now();".to_string(),
};
let pack = prost_types::Any::pack(&cmd).expect("pack");
let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());
let mut req = Request::new(fd);
req.metadata_mut().insert(
    AUTHORIZATION.as_str(),
    resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
);
let resp = client.get_flight_info(req).await.expect("get_flight_info");
let flight_info = resp.into_inner();
let schema_ref =
    Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
println!("{}", schema_ref);
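`FlightDescriptor::new_cmd` above receives a `CommandStatementQuery` message wrapped in a protobuf `google.protobuf.Any`. To show what those command bytes contain, this Go sketch hand-encodes the same message with the standard library only: `Any` carries `type_url` (field 1) and `value` (field 2), and `CommandStatementQuery` carries the SQL text in field 1. The helper names `appendBytesField` and `packStatementQuery` are hypothetical; a real client would use generated protobuf code rather than manual encoding. `binary.AppendUvarint` requires Go 1.19 or later.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendBytesField appends one length-delimited protobuf field (wire type 2):
// a varint tag (field number << 3 | 2), a varint length, then the raw bytes.
func appendBytesField(buf []byte, field uint64, data []byte) []byte {
	buf = binary.AppendUvarint(buf, field<<3|2)
	buf = binary.AppendUvarint(buf, uint64(len(data)))
	return append(buf, data...)
}

// packStatementQuery hand-encodes Any{type_url, value = CommandStatementQuery{query}},
// i.e. the bytes a client places in FlightDescriptor.cmd.
func packStatementQuery(query string) []byte {
	const typeURL = "type.googleapis.com/arrow.flight.protocol.sql.CommandStatementQuery"
	inner := appendBytesField(nil, 1, []byte(query))  // CommandStatementQuery.query = field 1
	cmd := appendBytesField(nil, 1, []byte(typeURL)) // Any.type_url = field 1
	return appendBytesField(cmd, 2, inner)           // Any.value = field 2
}

func main() {
	cmd := packStatementQuery("select now();")
	fmt.Printf("%d bytes, starting with % x\n", len(cmd), cmd[:4])
}
```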
5. Fetch and print the data
for ep in flight_info.endpoint {
    if let Some(ticket) = ep.ticket {
        let resp = client.do_get(ticket).await.expect("do_get");
        let mut stream = resp.into_inner();
        let mut dictionaries_by_id = HashMap::new();
        let mut record_batches = Vec::new();
        while let Some(Ok(flight_data)) = stream.next().await {
            let message =
                root_as_message(&flight_data.data_header[..]).expect("root as message");
            match message.header_type() {
                ipc::MessageHeader::Schema => {
                    println!("a schema when messages are read",);
                }
                ipc::MessageHeader::RecordBatch => {
                    let record_batch = flight_data_to_arrow_batch(
                        &flight_data,
                        schema_ref.clone(),
                        &dictionaries_by_id,
                    )
                    .expect("record_batch_from_message");
                    record_batches.push(record_batch);
                }
                ipc::MessageHeader::DictionaryBatch => {
                    let ipc_batch = message.header_as_dictionary_batch().unwrap();
                    reader::read_dictionary(
                        &Buffer::from(flight_data.data_body),
                        ipc_batch,
                        &schema_ref,
                        &mut dictionaries_by_id,
                        &message.version(),
                    )
                    .unwrap();
                }
                _ => {
                    panic!("Reading types other than record batches not yet supported");
                }
            }
        }
        println!(
            "{}",
            arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
        );
    }
}
6. Complete code
use std::collections::HashMap;
use std::iter;
use std::sync::Arc;
use arrow::buffer::Buffer;
use arrow::datatypes::Schema;
use arrow::ipc;
use arrow::ipc::{reader, root_as_message};
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::sql::{CommandStatementQuery, ProstAnyExt};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, HandshakeRequest, IpcMessage};
use futures::StreamExt;
use prost::Message;
use tonic::codegen::http::header::AUTHORIZATION;
use tonic::metadata::AsciiMetadataValue;
use tonic::Request;
#[tokio::main]
async fn main() {
let mut client = FlightServiceClient::connect("http://localhost:31004")
.await
.expect("connect");
let mut req = Request::new(futures::stream::iter(iter::once(
HandshakeRequest::default(),
)));
req.metadata_mut().insert(
AUTHORIZATION.as_str(),
AsciiMetadataValue::try_from(format!(
"Basic {}",
base64::encode(format!("{}:{}", "root", ""))
))
.expect("metadata construct fail"),
);
let resp = client.handshake(req).await.expect("handshake");
println!("handshake resp: {:?}", resp.metadata());
let cmd = CommandStatementQuery {
query: "select now();".to_string(),
};
let pack = prost_types::Any::pack(&cmd).expect("pack");
let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());
let mut req = Request::new(fd);
req.metadata_mut().insert(
AUTHORIZATION.as_str(),
resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
);
let resp = client.get_flight_info(req).await.expect("get_flight_info");
let flight_info = resp.into_inner();
let schema_ref =
Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
println!("{}", schema_ref);
for ep in flight_info.endpoint {
if let Some(ticket) = ep.ticket {
let resp = client.do_get(ticket).await.expect("do_get");
let mut stream = resp.into_inner();
let mut dictionaries_by_id = HashMap::new();
let mut record_batches = Vec::new();
while let Some(Ok(flight_data)) = stream.next().await {
let message =
root_as_message(&flight_data.data_header[..]).expect("root as message");
match message.header_type() {
ipc::MessageHeader::Schema => {
println!("a schema when messages are read",);
}
ipc::MessageHeader::RecordBatch => {
let record_batch = flight_data_to_arrow_batch(
&flight_data,
schema_ref.clone(),
&dictionaries_by_id,
)
.expect("record_batch_from_message");
record_batches.push(record_batch);
}
ipc::MessageHeader::DictionaryBatch => {
let ipc_batch = message.header_as_dictionary_batch().unwrap();
reader::read_dictionary(
&Buffer::from(flight_data.data_body),
ipc_batch,
&schema_ref,
&mut dictionaries_by_id,
&message.version(),
)
.unwrap();
}
_ => {
panic!("Reading types other than record batches not yet supported");
}
}
}
println!(
"{}",
arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
);
}
}
}
ODBC
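Currently only x86_64 systems are supported, and on Linux only CentOS and Red Hat family distributions.
For more about the Arrow Flight SQL ODBC driver, see the Dremio documentation (https://docs.dremio.com/software/drivers/arrow-flight-sql-odbc-driver/).
The following steps are based on CentOS 7.
Install the ODBC driver manager
On Linux, install unixODBC:
yum install unixODBC-devel
1. Install the arrow-flight-odbc driver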
wget https://download.dremio.com/arrow-flight-sql-odbc-driver/arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm
yum localinstall arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm
2. Edit the configuration file at /etc/odbc.ini:
[ODBC Data Sources]
CNOSDB=Arrow Flight SQL ODBC Driver
[CNOSDB]
Description=ODBC Driver DSN for Arrow Flight SQL developed by Dremio
Driver=Arrow Flight SQL ODBC Driver
Host=localhost
Port=31004
UID=root
PWD=
Database=public
Tenant=cnosdb
useEncryption=false
TrustedCerts=/opt/arrow-flight-sql-odbc-driver/lib64/cacerts.pem
UseSystemTrustStore=true
Here UID is the username and PWD is the password.
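The C program later in this section connects through `SQLDriverConnect` with the connection string `DSN=CNOSDB;UID=root;PWD=`, which names the `[CNOSDB]` data source above and supplies UID and PWD. As a sketch of how such a string is assembled from semicolon-separated keyword/value pairs, here is a small Go helper; `Attr` and `buildConnString` are hypothetical names. To stay on safe ground it rejects values containing `;`, `{` or `}` instead of implementing ODBC's brace-quoting rules.

```go
package main

import (
	"fmt"
	"strings"
)

// Attr is one KEY=value pair of an ODBC connection string (hypothetical helper type).
type Attr struct {
	Key, Value string
}

// buildConnString joins attributes as "KEY=value;KEY=value".
// Values containing ';', '{' or '}' would need ODBC brace quoting, which this
// sketch deliberately does not implement; such input is rejected instead.
func buildConnString(attrs []Attr) (string, error) {
	parts := make([]string, 0, len(attrs))
	for _, a := range attrs {
		if a.Key == "" || strings.ContainsAny(a.Key, ";{}=") || strings.ContainsAny(a.Value, ";{}") {
			return "", fmt.Errorf("unsupported attribute %q", a.Key)
		}
		parts = append(parts, a.Key+"="+a.Value)
	}
	return strings.Join(parts, ";"), nil
}

func main() {
	s, err := buildConnString([]Attr{{"DSN", "CNOSDB"}, {"UID", "root"}, {"PWD", ""}})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // DSN=CNOSDB;UID=root;PWD=
}
```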
Test the connection:
isql -v CNOSDB
If the following output appears, the connection succeeded:
+---------------------------------------+
| Connected! |
| |
| sql-statement |
| help [tablename] |
| quit |
| |
+---------------------------------------+
SQL>
Next, test the connection from code.
1. Write CMakeLists.txt
cmake_minimum_required(VERSION 3.24)
project(arrow_flight_odbc C)
set(CMAKE_C_STANDARD 11)
find_package(ODBC)
include_directories(${ODBC_INCLUDE_DIR})
link_directories(/opt/arrow-flight-sql-odbc-driver/lib64)
add_executable(arrow_flight_odbc main.c)
target_link_libraries(arrow_flight_odbc ${ODBC_LIBRARY})
2. Write the C code in main.c
#include <stdio.h>
#include <sql.h>
#include <sqlext.h>

int main() {
    SQLHENV henv;
    SQLHDBC hdbc;
    SQLHSTMT hsmt;
    SQLRETURN ret;
    // Allocate the environment handle
    ret = SQLAllocEnv(&henv);
    if (ret != SQL_SUCCESS) {
        fprintf(stderr, "Unable to allocate an environment handle\n");
        return -1;
    }
    // Set environment attributes: request ODBC 3 behavior
    ret = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
    if (ret != SQL_SUCCESS) {
        fprintf(stderr, "Unable to set env attr\n");
        return -1;
    }
    // Allocate the connection handle
    ret = SQLAllocConnect(henv, &hdbc);
    if (ret != SQL_SUCCESS) {
        fprintf(stderr, "Unable to allocate connection\n");
        return -1;
    }
    // Connect to the driver through the CNOSDB DSN defined in /etc/odbc.ini
    ret = SQLDriverConnect(hdbc, NULL, (SQLCHAR *) "DSN=CNOSDB;UID=root;PWD=", SQL_NTS,
                           NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
    // SQL_SUCCEEDED also accepts SQL_SUCCESS_WITH_INFO, which is not an error
    if (!SQL_SUCCEEDED(ret)) {
        fprintf(stderr, "connect fail\n");
        return -1;
    }
    // Allocate the statement handle
    SQLAllocStmt(hdbc, &hsmt);
    SQLCHAR *sql = (SQLCHAR *) "CREATE TABLE IF NOT EXISTS air (\n"
                               "    visibility DOUBLE,\n"
                               "    temperature DOUBLE,\n"
                               "    pressure DOUBLE,\n"
                               "    TAGS(station));";
    // Execute CREATE TABLE
    ret = SQLExecDirect(hsmt, sql, SQL_NTS);
    if (!SQL_SUCCEEDED(ret)) {
        fprintf(stderr, "Execute create fail\n");
    }
    sql = (SQLCHAR *) "INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n"
                      "    (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);";
    // Execute INSERT
    ret = SQLExecDirect(hsmt, sql, SQL_NTS);
    if (!SQL_SUCCEEDED(ret)) {
        fprintf(stderr, "Execute insert fail\n");
    }
    sql = (SQLCHAR *) "SELECT * FROM air LIMIT 1";
    // Execute the query
    ret = SQLExecDirect(hsmt, sql, SQL_NTS);
    if (!SQL_SUCCEEDED(ret)) {
        fprintf(stderr, "Execute query fail\n");
    }
    SQL_TIMESTAMP_STRUCT time;
    SQLCHAR station[50];
    SQLDOUBLE visibility, temperature, pressure;
    SQLLEN station_len;
    // Fetch the result set row by row
    while (1) {
        ret = SQLFetch(hsmt);
        if (ret == SQL_NO_DATA) {
            break; // no more rows
        }
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "error SQLFetch\n");
            break;
        }
        // Read each column of the current row
        SQLGetData(hsmt, 1, SQL_C_TIMESTAMP, &time, 0, NULL);
        SQLGetData(hsmt, 2, SQL_C_CHAR, station, sizeof(station), &station_len);
        SQLGetData(hsmt, 3, SQL_C_DOUBLE, &visibility, 0, NULL);
        SQLGetData(hsmt, 4, SQL_C_DOUBLE, &temperature, 0, NULL);
        SQLGetData(hsmt, 5, SQL_C_DOUBLE, &pressure, 0, NULL);
        printf("%d-%02d-%02dT%02d:%02d:%02d, %s, %.2lf, %.2lf, %.2lf\n", time.year, time.month, time.day,
               time.hour, time.minute, time.second, station, visibility, temperature, pressure);
    }
    // Release the handles
    SQLFreeHandle(SQL_HANDLE_STMT, hsmt);
    SQLDisconnect(hdbc);
    SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
    SQLFreeHandle(SQL_HANDLE_ENV, henv);
    return 0;
}
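Summary
Built natively on Arrow, CnosDB 2.0 provides an Arrow Flight SQL interface. Through Arrow Flight SQL you can connect to the CnosDB 2.0 time-series database from many languages, write and query data efficiently, and get responses within seconds even for queries over a billion rows.
For more details, see the usage instructions in the CnosDB 2.0 manual | Connectors (https://docs.cnosdb.com/zh/guide/reference/connector.html). If you have requests or suggestions, please open an issue on GitHub (https://github.com/cnosdb/cnosdb).
About CnosDB
CnosDB is a high-performance, easy-to-use, open-source distributed time-series database. It has been officially released and is fully open source.
Visit our community website: https://www.cnosdb.com
Copyright notice: this is an original article by InfoQ author [CnosDB].
Original link: http://xie.infoq.cn/article/44cee1c940c4e733d27ead36d. Please contact the author before republishing.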