Efficient Data Transmission: Easily Connect Kafka Real-time Data to CnosDB

In this article, we introduce how to build a Kafka + Telegraf + CnosDB pipeline for real-time collection and storage of streaming data on Ubuntu 22.04.2 LTS. The versions used are CnosDB 2.3.0, Kafka 2.5.1, and Telegraf 1.27.1.

As more and more application architectures move toward microservices or serverless designs, the number of applications and services grows every day. Users process ever-increasing volumes of time series data, either through real-time aggregation or through computations whose output is measurements or metrics. Faced with the massive amounts of data generated, users need ways to capture and observe changes in the system, and in cloud-native environments the most popular approach is to use events.

Apache Kafka is a durable, high-performance messaging system that is also considered a distributed stream processing platform. It can be applied to many use cases, including messaging, data integration, log aggregation, and metrics. For metrics, however, a message backbone or broker alone is not enough. Although Apache Kafka is durable, it is not designed for running metrics and monitoring queries. This is exactly where CnosDB excels.

Architecture plan

By combining Kafka, Telegraf, and CnosDB, a complete data pipeline can be built:

  1. Data generation: Use sensors, devices, or other data sources to generate data and send it to a Kafka topic.
  2. Kafka message queue: Kafka receives and stores data streams to ensure data security and reliability.
  3. Telegraf consumer: Telegraf, as a consumer of Kafka, subscribes to Kafka topics and obtains data streams.
  4. CnosDB data storage: The preprocessed data is sent to CnosDB by Telegraf for storage of time series data.

The overall application architecture is shown in the figure:

Kafka

Apache Kafka is an open source distributed stream processing platform. It is designed to process real-time data streams, offering high reliability, high throughput, and low latency, and it is widely used in industry. It can be used in a variety of ways, including:

  • Stream processing: It provides an event backbone by storing real-time events for aggregation, enrichment, and processing.
  • Metrics: Apache Kafka becomes the central aggregation point for many distributed components or applications, such as microservices. These applications can send real-time metrics for consumption by other platforms, including CnosDB.
  • Data Integration: Data and event changes can be captured and sent to Apache Kafka where they can be used by any application that needs to act on these changes.
  • Log aggregation: Apache Kafka can act as the message backbone of the log streaming platform, converting log blocks into data streams.

Several core concepts

  1. Instance (Broker): Kafka's Broker is the server node in the Kafka cluster, responsible for storing and forwarding messages, providing high availability, fault tolerance and reliability.
  2. Topic: Topic in Apache Kafka is a logical storage unit, just like a table in a relational database. Topics are distributed through brokers via partitions, providing scalability and resiliency.
  3. Producer: The producer publishes messages to the specified topic in Kafka. Producers can choose to send messages to specific partitions or let Kafka automatically determine the distribution strategy.
  4. Consumer: A consumer reads messages from one or more partitions of a specified topic. Consumers can be organized in different ways, such as unicast, multicast, consumer groups, etc.
  5. Publish-subscribe model: producers publish messages to one or more topics, and consumers subscribe to one or more topics to receive and process the messages from them.

Simply put, when a client sends data to an Apache Kafka cluster, it must send it to a topic.

Additionally, when a client reads data from an Apache Kafka cluster, it must read from a topic. Clients that send data to Apache Kafka are called producers, while clients that read data from the Kafka cluster are called consumers. The data flow diagram is as follows:

Note: More complex concepts, such as topic partitions, offsets, and consumer groups, are not introduced here. Users can refer to the official documentation to learn more:

Kafka: [https://kafka.apache.org/documentation/#gettingStarted]
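To make the producer and consumer roles described above concrete, the following is a minimal sketch using the kafka-python client; the broker address, topic name, and payload are illustrative placeholders:

from kafka import KafkaProducer, KafkaConsumer

# Producer: send one JSON message to the "oceanic" topic
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
producer.send("oceanic", b'{"temperature": 21.5}')
producer.flush()

# Consumer: read messages from the same topic, starting at the earliest offset,
# and stop iterating after 5 seconds without new messages
consumer = KafkaConsumer(
    "oceanic",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for message in consumer:
    print(message.value)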

Deploy Kafka

Download and install Kafka [https://kafka.apache.org/]

1. Prerequisite: You need to ensure that you have a JDK environment and a Zookeeper environment. If not, you can use the following commands to install them:

sudo apt install openjdk-8-jdk
sudo apt install zookeeper

2. Download the Kafka installation package and unzip it

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
tar -zxvf kafka_2.12-2.5.1.tgz

3. Enter the decompressed Kafka directory

cd kafka_2.12-2.5.1

4. Modify the configuration file $KAFKA_HOME/config/server.properties (the port, log path, and other settings can be adjusted as needed; see the example below).
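For reference, a minimal server.properties might contain entries like the following (illustrative values for a single-node local setup; adjust them for your environment):

broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181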

5. Save and close the editor. Run the following command to start Kafka:

bin/kafka-server-start.sh config/server.properties

Kafka will start and listen for connections on the default port 9092 (pass the -daemon option to kafka-server-start.sh to run it in the background).
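To confirm the broker is reachable, you can list the topics it currently serves (assuming the default local address):

bin/kafka-topics.sh --list --bootstrap-server localhost:9092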

Telegraf

Telegraf is an open source server agent used to collect, process, and transmit system and application metrics. Telegraf supports a variety of input and output plugins and can integrate with many different types of systems and services. It can collect metrics from multiple sources such as system statistics, log files, API endpoints, and message queues, and send them to various targets such as CnosDB, Elasticsearch, Kafka, and Prometheus. This makes Telegraf very flexible and adaptable to different monitoring and data processing scenarios.

  • Lightweight: Telegraf is designed as a lightweight agent that consumes relatively few system resources and runs efficiently in a variety of environments.
  • Plugin-driven: Telegraf uses plugins for its input and output functionality and provides a rich plugin ecosystem covering numerous systems and services. Users can choose appropriate plugins to collect and transmit metric data according to their needs.
  • Data processing and conversion: Telegraf has flexible data processing and conversion capabilities. Through its plugin chain it can filter, process, convert, and aggregate the collected metrics, enabling more accurate and advanced data analysis; a small example follows this list.
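As a small illustration of the plugin chain, the snippet below uses Telegraf's standard rename processor to rename a field before the metrics reach the output plugins; the field names are hypothetical:

[[processors.rename]]
  [[processors.rename.replace]]
    field = "temperature"
    dest = "temp_celsius"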

Deploy Telegraf

1. Install Telegraf

sudo apt-get update && sudo apt-get install telegraf

2. Switch to the directory /etc/telegraf where Telegraf’s default configuration file is located.

3. Add the target output plugin to the configuration file telegraf.conf:

[[outputs.http]]
  url = "http://127.0.0.1:8902/api/v1/write?db=telegraf"
  timeout = "5s"
  method = "POST"
  username = "root"
  password = ""
  data_format = "influx"
  use_batch_format = true
  content_encoding = "identity"
  idle_conn_timeout = 10

Parameters to modify as needed:

url: CnosDB address and port
username: username used to connect to CnosDB
password: password for that username

Note: The remaining parameters can remain the same as in the above configuration example

4. Uncomment the following section in the configuration file and modify it as needed:

[[inputs.kafka_consumer]]
brokers = ["127.0.0.1:9092"]
topics = ["oceanic"]
data_format = "json"

Parameters:

brokers: list of Kafka brokers
topics: the Kafka topics to consume from
data_format: format of the incoming data

Note: The remaining parameters can remain the same as in the above configuration example

5. Start Telegraf

telegraf -config /etc/telegraf/telegraf.conf
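Optionally, you can first validate the configuration with the --test flag, which collects once, prints the metrics to stdout, and exits without writing to any output; for service inputs such as kafka_consumer you may need to add --test-wait <seconds> to give the plugin time to receive messages:

telegraf --config /etc/telegraf/telegraf.conf --test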

CnosDB

Deploy CnosDB

For detailed operations, please refer to: CnosDB installation

[https://docs.cnosdb.com/zh/latest/start/install.html]

Integrate

Kafka creates topic

1. Enter the bin folder of Kafka

2. Execute the following command to create the topic:

./kafka-topics.sh --create --topic oceanic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
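Optionally, verify that the topic was created:

./kafka-topics.sh --describe --topic oceanic --bootstrap-server localhost:9092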

Python simulates writing data to Kafka

1. Write the code:

import time
import json
import random

from kafka import KafkaProducer


def random_pressure():
    # Simulated pressure reading
    return round(random.uniform(0, 10), 1)


def random_temperature():
    # Simulated temperature reading
    return round(random.uniform(0, 100), 1)


def random_visibility():
    # Simulated visibility reading
    return round(random.uniform(0, 100), 1)


def get_json_data():
    # Assemble one sensor sample as a JSON string
    data = {}
    data["pressure"] = random_pressure()
    data["temperature"] = random_temperature()
    data["visibility"] = random_visibility()
    return json.dumps(data)


def main():
    # Replace 'ip' with the address of your Kafka broker
    producer = KafkaProducer(bootstrap_servers=['ip:9092'])

    # Send 2000 samples to the "oceanic" topic, one every 5 seconds
    for _ in range(2000):
        json_data = get_json_data()
        producer.send('oceanic', bytes(f'{json_data}', 'UTF-8'))
        print(f"Sensor data is sent: {json_data}")
        time.sleep(5)

    producer.flush()


if __name__ == "__main__":
    main()

2. Run the Python script

python3 test.py

View data in the Kafka topic

1. Execute the following command to view the data in the specified topic:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oceanic --from-beginning
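If the producer script from the previous step is running, you should see the JSON samples echoed one per line, for example (values are illustrative):

{"pressure": 3.2, "temperature": 55.1, "visibility": 78.4}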

View the data synchronized to CnosDB

1. Use the CLI tool to connect to CnosDB

cnosdb-cli

2. Switch to the target database

\c public

3. View the data (with the json data format and no name override, Telegraf names the measurement after the input plugin, so the table is kafka_consumer):

select * from kafka_consumer;
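Once data is flowing, ordinary SQL can be run over the table; for example, the following query (assuming the temperature field produced by the script above) averages the temperature readings:

select avg(temperature) from kafka_consumer;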

Additional reading

1. Use Telegraf to collect data and write it to CnosDB:

https://docs.cnosdb.com/zh/latest/versatility/collect/telegraf.html

2. Python connector:

https://docs.cnosdb.com/zh/latest/reference/connector/python.html

3. CnosDB quick start:

https://docs.cnosdb.com/zh/latest/start/quick_start.html