CnosDB 2.0 Arrow Flight SQL User Guide

With the release of the new version, attentive users have probably noticed that CnosDB 2.0 now fully supports Arrow Flight SQL. It is user-friendly and efficient, and it makes data access much easier. With Arrow Flight SQL, CnosDB 2.0 can achieve sub-second response times for queries on billions of rows of data. This article introduces Arrow Flight SQL, its advantages, and how to use it from several languages, so that you can get started quickly.

Arrow Flight SQL

Arrow Flight SQL is a protocol that allows interaction between SQL databases and Arrow memory format using the Flight RPC framework. It combines the columnar format in Arrow memory and the Flight RPC framework to accelerate SQL database operations. By using Arrow Flight SQL, users can not only utilize the standard syntax of native SQL but also significantly improve data access performance, enabling sub-second response times for queries on billions of rows of data.
Currently, we support the following environments for Arrow Flight SQL clients:
• C++
• Go
• Java
• Rust
• JDBC based on Arrow Flight SQL

Advantages of Arrow Flight SQL 

1. Powerful functionality: It provides functionality similar to the JDBC and ODBC APIs, including query execution and prepared statement creation.
2. Security: Flight supports encryption and authentication out of the box, ensuring secure communication.
3. Performance: Because the client and server both speak Arrow Flight, no data conversion is needed. This also allows further optimizations such as parallel data access, resulting in a significant improvement in data access performance.

Comparison of Arrow Flight with JDBC/ODBC performance:
1. When transferring data between the client and server, Arrow Flight does not require data conversion, while ODBC implementations often rely on custom binary protocols whose data must be converted on the way in and out.
2. Arrow Flight enables parallel data transmission by obtaining the data access plan first. The data can be distributed across different servers, allowing the client to fetch data in parallel from multiple servers.
3. Arrow Flight uses the Arrow columnar format, in which reading a value by its index is O(1) and which is friendly to vectorized computation.

While Arrow Flight can be used for direct database access, it cannot directly replace JDBC/ODBC. However, Arrow Flight SQL can be used as a specific wire protocol/driver implementation that supports JDBC/ODBC drivers and reduces the implementation burden on the database.
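
To make the point about the columnar format concrete, here is a minimal sketch in plain Go. It does not use the Arrow library; the AirColumns type, its methods, and the sample values are hypothetical and only illustrate the idea of keeping each field in its own contiguous array.

```go
package main

import "fmt"

// AirColumns is a hypothetical column-oriented table: every field lives in
// its own contiguous slice, which is the idea behind a columnar format.
type AirColumns struct {
	Station     []string
	Temperature []float64
}

// TemperatureAt reads one value by row index. Indexing a slice is O(1).
func (c AirColumns) TemperatureAt(row int) float64 {
	return c.Temperature[row]
}

// MeanTemperature scans a single contiguous column. The other columns are
// never touched, which keeps the loop simple and easy to vectorize.
func (c AirColumns) MeanTemperature() float64 {
	if len(c.Temperature) == 0 {
		return 0
	}
	sum := 0.0
	for _, t := range c.Temperature {
		sum += t
	}
	return sum / float64(len(c.Temperature))
}

func main() {
	// Made-up sample data, for illustration only.
	cols := AirColumns{
		Station:     []string{"station-a", "station-b", "station-a"},
		Temperature: []float64{69, 72, 75},
	}
	fmt.Println(cols.TemperatureAt(1))
	fmt.Println(cols.MeanTemperature())
}
```

In a row-oriented layout the same aggregate would have to step over every other field of every row, which is the kind of overhead the columnar layout avoids.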

Image
The process of using Arrow Flight SQL on the client side to connect to the database, query data, and execute SQL can be outlined as follows:
1. Create the Flight SQL client.
2. Validate the username and password.
3. Execute the SQL query and retrieve the FlightInfo structure.
4. Obtain the FlightData stream from the FlightEndpoint within the FlightInfo structure.

The FlightInfo contains detailed information about the location of the data, allowing the client to fetch data from the appropriate server. The server information is encoded as a series of FlightEndpoint messages within the FlightInfo. Each endpoint represents a location that contains a subset of the response data.

A FlightEndpoint consists of a list of server addresses and a ticket, which is a binary token used by the server to identify the requested data. FlightEndpoints do not have a defined order, so if the dataset is sorted, the data is returned within a single FlightEndpoint.

The process flowchart is as follows:

Image
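
The endpoint mechanism described above can be sketched in a few lines of plain Go. This is only an illustration under stated assumptions: the Endpoint and FlightInfo types, the fetchFunc callback, and the server addresses are hypothetical stand-ins, not the Arrow Flight types, and the fetch function fakes the server instead of issuing a real DoGet call.

```go
package main

import (
	"fmt"
	"sync"
)

// Endpoint and FlightInfo are simplified, hypothetical stand-ins for the
// FlightEndpoint and FlightInfo messages. They are not the Arrow types.
type Endpoint struct {
	Locations []string // server addresses that can serve this part
	Ticket    string   // opaque token identifying the requested data
}

type FlightInfo struct {
	Endpoints []Endpoint
}

// fetchFunc stands in for the DoGet call: given a ticket it returns rows.
type fetchFunc func(ticket string) ([]string, error)

// FetchAll requests every endpoint concurrently and returns the rows grouped
// by endpoint index, so the caller decides how to combine the parts.
func FetchAll(info FlightInfo, fetch fetchFunc) ([][]string, error) {
	parts := make([][]string, len(info.Endpoints))
	errs := make([]error, len(info.Endpoints))
	var wg sync.WaitGroup
	for i, ep := range info.Endpoints {
		wg.Add(1)
		// Each goroutine writes only to its own slot, so no lock is needed.
		go func(i int, ep Endpoint) {
			defer wg.Done()
			parts[i], errs[i] = fetch(ep.Ticket)
		}(i, ep)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return parts, nil
}

func main() {
	info := FlightInfo{Endpoints: []Endpoint{
		{Locations: []string{"grpc://node-a:31004"}, Ticket: "part-0"},
		{Locations: []string{"grpc://node-b:31004"}, Ticket: "part-1"},
	}}
	// A fake fetch that echoes the ticket, in place of a real server.
	fake := func(ticket string) ([]string, error) {
		return []string{"row from " + ticket}, nil
	}
	parts, err := FetchAll(info, fake)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(parts)
}
```

Because the endpoints carry no defined order, the sketch keeps the parts separate rather than concatenating them. When the server returns a single endpoint, the same code simply runs one fetch.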

C++

1. Install Apache Arrow. Detailed installation instructions are available in the official documentation (https://arrow.apache.org/install/). On macOS, it can be installed easily with brew:
  • brew install apache-arrow
  • brew install apache-arrow-glib
2. Configure CMakeLists.txt

cmake_minimum_required(VERSION 3.24)
project(arrow_flight_cpp)

set(CMAKE_CXX_STANDARD 20)

find_package(Arrow REQUIRED)
find_package(ArrowFlight REQUIRED)
find_package(ArrowFlightSql REQUIRED)

include_directories(${ARROW_INCLUDE_DIR})
add_executable(arrow_flight_cpp main.cpp)
target_link_libraries(arrow_flight_cpp PRIVATE Arrow::arrow_shared)
target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlight::arrow_flight_shared)
target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlightSql::arrow_flight_sql_shared)

3. Using the Arrow C++ library. Most Arrow functions return the arrow::Result<T> type, so the calling code should itself live in a function that returns arrow::Result<T> (or arrow::Status), as follows:

arrow::Result<std::unique_ptr<FlightClient>> get_client() {
  ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
  ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
  return std::move(client);
}

The ARROW_ASSIGN_OR_RAISE macro first evaluates the expression on the right, which returns a value of type arrow::Result<T>. If the result holds an error, the enclosing function returns early with the corresponding Status; otherwise the value is assigned to the variable on the left. For convenience, the sample code is written inside a lambda function:

int main() {
  auto fun = []() {
    // code
    return Status::OK();
  };
  auto status = fun();
  return 0;
}
4. Perform authentication and create a FlightSqlClient

ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
auto user = "root";
auto password = "";
// Authenticate first: client is empty once it has been moved into the FlightSqlClient
auto auth = client->AuthenticateBasicToken({}, user, password);
ARROW_RETURN_NOT_OK(auth); // If authentication failed, return the error directly
auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));

FlightCallOptions call_options;
call_options.headers.push_back(auth.ValueOrDie()); // Put the authentication header into the call options
5. Execute SQL to obtain FlightInfo

ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
const auto endpoints = info->endpoints();
6. Retrieve data through FlightEndPoint

for (const auto &endpoint : endpoints) {
  const auto &ticket = endpoint.ticket;
  // The stream carries the data
  ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, ticket));
  // Get the schema of the data
  auto schema = stream->GetSchema();
  ARROW_RETURN_NOT_OK(schema);
  std::cout << "Schema:" << schema->get()->ToString() << std::endl;
  // Read and print the data chunk by chunk
  while (true) {
    ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
    if (chunk.data == nullptr) {
      break;
    }
    std::cout << chunk.data->ToString();
  }
}
7. Overall Code
#include <iostream>
#include <memory>
#include <arrow/flight/api.h>
#include <arrow/flight/sql/api.h>

using namespace arrow::flight;
using namespace arrow::flight::sql;
using namespace arrow;

int main() {
  auto fun = []() {
    // Connect to the Flight SQL port
    ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
    ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
    std::cout << "location client" << std::endl;

    // Authenticate before handing the client over: std::move leaves client empty
    auto user = "root";
    auto password = "";
    auto auth = client->AuthenticateBasicToken({}, user, password);
    ARROW_RETURN_NOT_OK(auth);
    FlightCallOptions call_options;
    call_options.headers.push_back(auth.ValueOrDie());

    auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));

    // Execute the SQL and get the FlightInfo
    ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
    const auto &endpoints = info->endpoints();
    for (const auto &endpoint : endpoints) {
      const auto &ticket = endpoint.ticket;

      ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, ticket));

      auto schema = stream->GetSchema();
      ARROW_RETURN_NOT_OK(schema);

      std::cout << "Schema:" << schema->get()->ToString() << std::endl;
      // Read and print the data chunk by chunk
      while (true) {
        ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
        if (chunk.data == nullptr) {
          break;
        }
        std::cout << chunk.data->ToString();
      }
    }
    return Status::OK();
  };

  auto status = fun();
  std::cout << status.ToString() << std::endl;

  return 0;
}

Go

1. Add Dependency
Add the dependencies to go.mod:
require (
 github.com/apache/arrow/go/v10 v10.0.1
 google.golang.org/grpc v1.51.0
)

2. Create Flight SQL client

var dialOpts = []grpc.DialOption{
 grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cl, err := flightsql.NewClient("localhost:31004", nil, nil, dialOpts...)
if err != nil {
 fmt.Print(err)
 return
}

3. Set connection credentials and obtain verified context

ctx, err := cl.Client.AuthenticateBasicToken(context.Background(), "root", "")
if err != nil {
 fmt.Print(err)
 return
}
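
The Rust section of this guide performs the same handshake by hand, sending an authorization header of the form "Basic " followed by the base64 encoding of "user:password". As a minimal sketch using only the Go standard library, the header value can be built as shown below. BasicAuthHeader is a hypothetical helper written for illustration, and that AuthenticateBasicToken sends exactly this value is an assumption based on the Rust example, not something this snippet verifies.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// BasicAuthHeader builds the value of a Basic authorization header:
// the word "Basic", a space, then base64("user:password").
// It is a hypothetical helper, not part of the Arrow Go API.
func BasicAuthHeader(user, password string) string {
	credentials := user + ":" + password
	return "Basic " + base64.StdEncoding.EncodeToString([]byte(credentials))
}

func main() {
	// The credentials used throughout this guide: user "root", empty password.
	fmt.Println(BasicAuthHeader("root", "")) // prints "Basic cm9vdDo="
}
```

The context returned by the authentication call then carries the credentials the server handed back, which is why every later call in this section receives ctx.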
4. Execute SQL in the verified context to obtain FlightInfo

info, err := cl.Execute(ctx, "SELECT now();")
if err != nil {
 fmt.Print(err)
 return
}
fmt.Println(info.Schema)

5. Obtaining Data Reader Based on FlightInfo

// CnosDB currently implements only one endpoint
rdr, err := cl.DoGet(ctx, info.GetEndpoint()[0].Ticket)
if err != nil {
 fmt.Print(err)
 return
}
defer rdr.Release()

6. Operate Reader to print data

n := 0
for rdr.Next() {
 record := rdr.Record()
 for i, col := range record.Columns() {
   fmt.Printf("rec[%d][%q]: %v\n", n, record.ColumnName(i), col)
 }
 n++
}
// Next returns false both at the end of the stream and on error
if err := rdr.Err(); err != nil {
 fmt.Print(err)
}

Java

1. Add Dependency
If you use Maven to build a Java project, add the dependencies to pom.xml:
<dependencies>
 <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-flight -->
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>arrow-flight</artifactId>
   <version>10.0.1</version>
   <type>pom</type>
 </dependency>
​
 <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql -->
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>flight-sql</artifactId>
   <version>10.0.1</version>
 </dependency>
​
 <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
 <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-api</artifactId>
   <version>2.0.5</version>
 </dependency>
​
 <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty -->
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>arrow-memory-netty</artifactId>
   <version>10.0.1</version>
 </dependency>
​
 <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-core -->
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>flight-core</artifactId>
   <version>10.0.1</version>
 </dependency>
</dependencies>

• Then add the os-maven-plugin build extension

<build>
 <extensions>
   <extension>
     <groupId>kr.motd.maven</groupId>
     <artifactId>os-maven-plugin</artifactId>
     <version>1.7.1</version>
   </extension>
 </extensions>
</build>
• Add environment variables

_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED"
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
# or
env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...

# if using Maven
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"
2. Create FlightSqlClient

BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);

FlightClient client = FlightClient.builder(allocator, clientLocation).build();
FlightSqlClient sqlClient = new FlightSqlClient(client);

3. Set authentication

Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
final CallHeaders headers = new FlightCallHeaders();
headers.insert("tenant", "cnosdb");
Set<CallOption> options = new HashSet<>();

credentialCallOption.ifPresent(options::add);
options.add(new HeaderCallOption(headers));
CallOption[] callOptions = options.toArray(new CallOption[0]);

4. Execute SQL and obtain FlightInfo

try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
  final FlightInfo info = preparedStatement.execute();
  System.out.println(info.getSchema());
  // The code from step 5 goes here, while info is still in scope
}

5. Get data

This snippet uses info, so it belongs inside the try block from step 4, as in the overall code.

final Ticket ticket = info.getEndpoints().get(0).getTicket();
try (FlightStream stream = sqlClient.getStream(ticket)) {
  int n = 0;
  while (stream.next()) {
    List<FieldVector> vectors = stream.getRoot().getFieldVectors();
    for (int i = 0; i < vectors.size(); i++) {
      System.out.printf("%d %d %s%n", n, i, vectors.get(i));
    }
    n++;
  }
} catch (Exception e) {
  throw new RuntimeException(e);
}

6. Overall Code

package org.example;

import org.apache.arrow.flight.*;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class Main {
  public static void main(String[] args) {
    // Create the Flight SQL client
    BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
    final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);

    FlightClient client = FlightClient.builder(allocator, clientLocation).build();
    FlightSqlClient sqlClient = new FlightSqlClient(client);

    // Authenticate and collect the call options (credentials plus tenant header)
    Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
    final CallHeaders headers = new FlightCallHeaders();
    headers.insert("tenant", "cnosdb");
    Set<CallOption> options = new HashSet<>();

    credentialCallOption.ifPresent(options::add);
    options.add(new HeaderCallOption(headers));
    CallOption[] callOptions = options.toArray(new CallOption[0]);

    // Execute the SQL, then read the data of the first endpoint
    try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
      final FlightInfo info = preparedStatement.execute();
      System.out.println(info.getSchema());
      final Ticket ticket = info.getEndpoints().get(0).getTicket();
      try (FlightStream stream = sqlClient.getStream(ticket)) {
        int n = 0;
        while (stream.next()) {
          List<FieldVector> vectors = stream.getRoot().getFieldVectors();
          for (int i = 0; i < vectors.size(); i++) {
            System.out.printf("%d %d %s%n", n, i, vectors.get(i));
          }
          n++;
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
}

JDBC

1. Add Dependency
<dependencies>
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>arrow-jdbc</artifactId>
   <version>10.0.1</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql-jdbc-driver -->
 <dependency>
   <groupId>org.apache.arrow</groupId>
   <artifactId>flight-sql-jdbc-driver</artifactId>
   <version>10.0.1</version>
 </dependency>
</dependencies>
Add Environment Variables
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED"
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
# or
env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...


# if using Maven
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"

2. Set Properties and Query

package org.example;

import java.sql.*;
import java.util.Properties;

public class Main {
  public static void main(String[] args) {
    final Properties properties = new Properties();
    properties.put("user", "root");      // username
    properties.put("password", "");      // password
    properties.put("tenant", "cnosdb");  // tenant
    properties.put("useEncryption", false);
    try (
      Connection connection = DriverManager.getConnection(
        "jdbc:arrow-flight-sql://localhost:31004", properties
      );
      Statement statement = connection.createStatement())
    {
      ResultSet resultSet = statement.executeQuery("SELECT 1, 2, 3;");

      while (resultSet.next()) {
        int column1 = resultSet.getInt(1);
        int column2 = resultSet.getInt(2);
        int column3 = resultSet.getInt(3);
        System.out.printf("%d %d %d%n", column1, column2, column3);
      }
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }
}

3. Set properties and execute SQL

package org.example;
​
import java.sql.*;
import java.util.Properties;
​
public class Main {
 public static void main(String[] args) {
   final Properties properties = new Properties();
   properties.put("user", "root");
   properties.put("password", "");
   properties.put("tenant", "cnosdb");
   properties.put("useEncryption", false);
   try (
     Connection connection = DriverManager.getConnection(
       "jdbc:arrow-flight-sql://localhost:31004", properties
    );
     Statement statement = connection.createStatement())
  {
     statement.execute("CREATE TABLE IF NOT EXISTS air\n" +
                       "(\n" +
                       "   visibility DOUBLE,\n" +
                       "   temperature DOUBLE,\n" +
                       "   pressure   DOUBLE,\n" +
                       "   TAGS(station)\n" +
                       ");");
     statement.executeUpdate("INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n" +
                             "   (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);");
     ResultSet resultSet = statement.executeQuery("select * from air limit 1;");
​
     while (resultSet.next()) {
       Timestamp column1 = resultSet.getTimestamp(1);
       String column2 = resultSet.getString(2);
       Double column3 = resultSet.getDouble(3);
       Double column4 = resultSet.getDouble(4);
       Double column5 = resultSet.getDouble(5);
​
       System.out.printf("%s %s %f %f %f", column1, column2, column3, column4, column5);
    }
  } catch (SQLException e) {
     throw new RuntimeException(e);
  }
}
}
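
The TIME value in the INSERT statement above is a plain integer. Assuming it is a Unix timestamp in nanoseconds (an assumption made here for illustration; the statement itself does not state the unit), the following sketch shows which instant it denotes. NanosToUTC and FormatNanos are hypothetical helper names, and only the Go standard library is used.

```go
package main

import (
	"fmt"
	"time"
)

// NanosToUTC interprets ns as nanoseconds since the Unix epoch.
// The nanosecond unit is an assumption, not something the driver reports.
func NanosToUTC(ns int64) time.Time {
	return time.Unix(0, ns).UTC()
}

// FormatNanos renders the instant in RFC 3339 form with fractional seconds.
func FormatNanos(ns int64) string {
	return NanosToUTC(ns).Format(time.RFC3339Nano)
}

func main() {
	// The literal used in the INSERT statement of this section.
	fmt.Println(FormatNanos(1666165200290401000)) // prints 2022-10-19T07:40:00.290401Z
}
```

Under that assumption, the Timestamp read back through resultSet.getTimestamp(1) should correspond to this instant, possibly displayed in the local time zone of the JVM.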

 

Rust

The code runs in an asynchronous environment (the tokio runtime).

1. Add Dependency

arrow = {version = "28.0.0", features = ["prettyprint"] }
arrow-flight = {version = "28.0.0", features = ["flight-sql-experimental"]}
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }
futures = "0.3.25"
prost-types = "0.11.2"
tonic = "0.8.3"
prost = "0.11.3"
http-auth-basic = "0.3.3"
base64 = "0.13.1"

2. Create FlightServiceClient

let mut client = FlightServiceClient::connect("http://localhost:31004")
    .await
    .expect("connect failed");
3. Authenticate

let mut req = Request::new(futures::stream::iter(iter::once(
    HandshakeRequest::default(),
)));

req.metadata_mut().insert(
    AUTHORIZATION.as_str(),
    AsciiMetadataValue::try_from(format!(
        "Basic {}",
        base64::encode(format!("{}:{}", "root", ""))
    ))
    .expect("metadata construct fail"),
);

let resp = client.handshake(req).await.expect("handshake");

println!("handshake resp: {:?}", resp.metadata());

4. Execute SQL

let cmd = CommandStatementQuery {
    query: "select now();".to_string(),
};
let pack = prost_types::Any::pack(&cmd).expect("pack");
let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());

let mut req = Request::new(fd);
req.metadata_mut().insert(
    AUTHORIZATION.as_str(),
    resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
);
let resp = client.get_flight_info(req).await.expect("get_flight_info");

let flight_info = resp.into_inner();
let schema_ref =
    Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
println!("{}", schema_ref);

5. Get data and print

for ep in flight_info.endpoint {
    if let Some(ticket) = ep.ticket {
        let resp = client.do_get(ticket).await.expect("do_get");
        let mut stream = resp.into_inner();
        let mut dictionaries_by_id = HashMap::new();

        let mut record_batches = Vec::new();
        while let Some(Ok(flight_data)) = stream.next().await {
            let message =
                root_as_message(&flight_data.data_header[..]).expect("root as message");
            match message.header_type() {
                ipc::MessageHeader::Schema => {
                    println!("a schema when messages are read",);
                }
                ipc::MessageHeader::RecordBatch => {
                    let record_batch = flight_data_to_arrow_batch(
                        &flight_data,
                        schema_ref.clone(),
                        &dictionaries_by_id,
                    )
                    .expect("record_batch_from_message");
                    record_batches.push(record_batch);
                }
                ipc::MessageHeader::DictionaryBatch => {
                    let ipc_batch = message.header_as_dictionary_batch().unwrap();

                    reader::read_dictionary(
                        &Buffer::from(flight_data.data_body),
                        ipc_batch,
                        &schema_ref,
                        &mut dictionaries_by_id,
                        &message.version(),
                    )
                    .unwrap();
                }
                _ => {
                    panic!("Reading types other than record batches not yet supported");
                }
            }
        }

        println!(
            "{}",
            arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
        );
    }
}

6. Overall Code

use std::collections::HashMap;
use std::iter;
use std::sync::Arc;

use arrow::buffer::Buffer;
use arrow::datatypes::Schema;
use arrow::ipc;
use arrow::ipc::{reader, root_as_message};
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::sql::{CommandStatementQuery, ProstAnyExt};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, HandshakeRequest, IpcMessage};
use futures::StreamExt;

use prost::Message;
use tonic::codegen::http::header::AUTHORIZATION;
use tonic::metadata::AsciiMetadataValue;
use tonic::Request;

#[tokio::main]
async fn main() {

    let mut client = FlightServiceClient::connect("http://localhost:31004")
        .await
        .expect("connect");

    let mut req = Request::new(futures::stream::iter(iter::once(
        HandshakeRequest::default(),
    )));

    req.metadata_mut().insert(
        AUTHORIZATION.as_str(),
        AsciiMetadataValue::try_from(format!(
            "Basic {}",
            base64::encode(format!("{}:{}", "root", ""))
        ))
        .expect("metadata construct fail"),
    );

    let resp = client.handshake(req).await.expect("handshake");

    println!("handshake resp: {:?}", resp.metadata());

    let cmd = CommandStatementQuery {
        query: "select now();".to_string(),
    };
    let pack = prost_types::Any::pack(&cmd).expect("pack");
    let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());

    let mut req = Request::new(fd);
    req.metadata_mut().insert(
        AUTHORIZATION.as_str(),
        resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
    );
    let resp = client.get_flight_info(req).await.expect("get_flight_info");

    let flight_info = resp.into_inner();
    let schema_ref =
        Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
    println!("{}", schema_ref);

    for ep in flight_info.endpoint {
        if let Some(ticket) = ep.ticket {
            let resp = client.do_get(ticket).await.expect("do_get");
            let mut stream = resp.into_inner();
            let mut dictionaries_by_id = HashMap::new();

            let mut record_batches = Vec::new();
            while let Some(Ok(flight_data)) = stream.next().await {
                let message =
                    root_as_message(&flight_data.data_header[..]).expect("root as message");
                match message.header_type() {
                    ipc::MessageHeader::Schema => {
                        println!("a schema when messages are read",);
                    }

                    ipc::MessageHeader::RecordBatch => {
                        let record_batch = flight_data_to_arrow_batch(
                            &flight_data,
                            schema_ref.clone(),
                            &dictionaries_by_id,
                        )
                        .expect("record_batch_from_message");
                        record_batches.push(record_batch);
                    }
                    ipc::MessageHeader::DictionaryBatch => {
                        let ipc_batch = message.header_as_dictionary_batch().unwrap();

                        reader::read_dictionary(
                            &Buffer::from(flight_data.data_body),
                            ipc_batch,
                            &schema_ref,
                            &mut dictionaries_by_id,
                            &message.version(),
                        )
                        .unwrap();
                    }
                    _ => {
                        panic!("Reading types other than record batches not yet supported");
                    }
                }
            }

            println!(
                "{}",
                arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
            );
        }
    }
}

ODBC

Currently, only x86_64 systems are supported. On Linux, only CentOS and RedHat family distributions are supported.
For more information on Arrow Flight SQL ODBC, please refer to the Dremio documentation.
The following steps are based on CentOS 7.

Install the ODBC manager. On Linux, install unixODBC:

yum install unixODBC-devel

1. Install the arrow-flight-odbc driver

wget https://download.dremio.com/arrow-flight-sql-odbc-driver/arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm
yum localinstall arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm

2. Modify the configuration file located at /etc/odbc.ini

[ODBC Data Sources]
CNOSDB=Arrow Flight SQL ODBC Driver

[CNOSDB]
Description=ODBC Driver DSN for Arrow Flight SQL developed by Dremio
Driver=Arrow Flight SQL ODBC Driver
Host=localhost
Port=31004  
UID=root
PWD=
Database=public
Tenant=cnosdb
useEncryption=false
TrustedCerts=/opt/arrow-flight-sql-odbc-driver/lib64/cacerts.pem
UseSystemTrustStore=true
Here UID is the username and PWD is the password.

Test whether the connection works:

isql -v CNOSDB

The connection succeeded if the following is displayed:

+---------------------------------------+
| Connected!                            |
|                                       |
| sql-statement                         |
| help [tablename]                      |
| quit                                  |
|                                       |
+---------------------------------------+
SQL>

Next, test the connection from code.

1. Write CMakeLists.txt

cmake_minimum_required(VERSION 3.24)
project(arrow_flight_odbc C)
​
set(CMAKE_C_STANDARD 11)
find_package(ODBC)
include_directories(${ODBC_INCLUDE_DIR})
link_directories(/opt/arrow-flight-sql-odbc-driver/lib64)
add_executable(arrow_flight_odbc main.c)
target_link_libraries(arrow_flight_odbc ${ODBC_LIBRARY})
2. Write the C code in main.c

#include <stdio.h>
#include <sql.h>
#include <sqlext.h>

int main() {
  SQLHENV henv;
  SQLHDBC hdbc;
  SQLHSTMT hsmt;
  SQLRETURN ret;
  
  
  // Allocate environment memory
  ret = SQLAllocEnv(&henv);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Unable to allocate an environment handle");
    return -1;
  }
  // Setting Environmental Properties
  ret = SQLSetEnvAttr(henv,  SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Unable to set env attr");
    return -1;
  }
  // Allocate connection memory
  ret = SQLAllocConnect(henv, &hdbc);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Unable to allocate connection");
    return -1;
  }
  // Connect to the driver
  ret = SQLDriverConnect(hdbc, NULL, (SQLCHAR*) "DSN=CNOSDB;UID=root;PWD=", SQL_NTS,
                         NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
  if (!SQL_SUCCEEDED(ret)) {
    fprintf(stderr, "connect fail");
    return -1;
  }
  // Allocate statement space
  SQLAllocStmt(hdbc, &hsmt);

  SQLCHAR *sql = "CREATE TABLE IF NOT EXISTS air (\n"
    " visibility  DOUBLE,\n"
    " temperature DOUBLE,\n"
    " pressure    DOUBLE,\n"
    " TAGS(station));";
  // Execute CREATE TABLE
  ret = SQLExecDirect(hsmt, sql, SQL_NTS);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Execute create fail");
  }

 
  sql = "INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n"
    "    (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);";
  // Execute INSERT
  ret = SQLExecDirect(hsmt, sql, SQL_NTS);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Execute insert fail");
  }

  sql = "SELECT * FROM air LIMIT 1";
  //Execute query
  ret = SQLExecDirect(hsmt, sql ,SQL_NTS);
  if (ret != SQL_SUCCESS) {
    fprintf(stderr, "Execute query fail");
  }
  SQL_TIMESTAMP_STRUCT time;
  SQLCHAR station[50];
  SQLDOUBLE visibility, temperature, pressure;
  SQLLEN station_len;

  // Fetch the result set row by row
  while (1) {
    ret = SQLFetch(hsmt);
    if (ret == SQL_ERROR) {
      fprintf(stderr, "error SQLFetch\n");
      break;
    }
    // Read the column data; SQL_NO_DATA or any other code ends the loop
    if (ret == SQL_SUCCESS || ret == SQL_SUCCESS_WITH_INFO) {
      SQLGetData(hsmt, 1, SQL_C_TIMESTAMP, &time, 0, NULL);
      SQLGetData(hsmt, 2, SQL_C_CHAR, station, 50, &station_len);
      SQLGetData(hsmt, 3, SQL_C_DOUBLE, &visibility, 0, NULL);
      SQLGetData(hsmt, 4, SQL_C_DOUBLE, &temperature, 0, NULL);
      SQLGetData(hsmt, 5, SQL_C_DOUBLE, &pressure, 0, NULL);
      printf("%d-%02d-%02dT%02d:%02d:%02d, %s, %.2lf, %.2lf, %.2lf\n", time.year, time.month, time.day, time.hour, time.minute, time.second, station, visibility, temperature, pressure);
    } else {
      break;
    }
  }

  return 0;
}

Conclusion

The native Arrow architecture of CnosDB 2.0 provides an Arrow Flight SQL interface. With Arrow Flight SQL, the CnosDB 2.0 time-series database can be accessed from multiple languages to write and query data efficiently, with the fast responses on queries over billions of rows described at the start of this article.

For more details, see the Connector section of the CnosDB 2.0 User Manual. If you have any requirements or suggestions, please open an issue on GitHub.

CnosDB is an open-source time-series database with high performance, a high compression ratio, and high ease of use. You are welcome to join our developer community on Discord: https://discord.gg/TJtFXcp8WW