Apache Kafka
Apache Kafka uses ZooKeeper to store information about the Kafka cluster and its users; in short, ZooKeeper stores metadata about the Kafka cluster. ZooKeeper is a centralized, open-source service for managing distributed applications. It provides a basic set of primitives for implementing higher-level synchronization, configuration management, group membership, and naming services.
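For a quick look at this metadata once the stack below is running, Kafka ships a zookeeper-shell.sh utility; a small sketch (/brokers/ids and /controller are standard Kafka znodes):
$ cd kafka_2.13-3.3.1
# List the ids of the registered brokers (a single-broker setup shows [0])
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
# Show which broker currently acts as the controller
$ bin/zookeeper-shell.sh localhost:2181 get /controller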
Kafka can be set up in distributed or standalone mode.
Setup (Debian)
Go to STEP 1 of the Quick start page and click Download. The download page shows the latest available version and a link to download it. Alternatively, use curl as follows:
$ curl "https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz" -o ~/kafka_2.13-3.3.1.tgz
Extract the archive and go to the extracted folder:
$ cd ~
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
Install the prerequisite Java 8+ (see SETUP > Dev Environment > Java) and verify the version:
$ java --version
openjdk 11.0.16 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
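If Java is missing, a typical Debian/Ubuntu install looks like this (the openjdk-11-jdk package name is an assumption and may vary by release):
$ sudo apt update
$ sudo apt install -y openjdk-11-jdk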
Start Kafka with ZooKeeper. Note: it is important to start the services in the correct order, ZooKeeper first and then the Kafka broker.
From one terminal, let's call it "zookeeper terminal":
# Start the ZooKeeper service; CTRL-C to end
$ cd kafka_2.13-3.3.1
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2022-10-16 20:37:32,995] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-10-16 20:37:32,997] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-10-16 20:37:33,003] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[...]
[2022-10-16 20:37:33,224] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
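Optionally, confirm ZooKeeper is answering on port 2181 with the srvr four-letter command (a quick check; assumes nc is installed, and srvr is in ZooKeeper's default four-letter-word whitelist):
$ echo srvr | nc localhost 2181
# Prints the ZooKeeper version, mode (standalone), and connection stats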
From another terminal, let's call it "kafka terminal":
# Start the Kafka broker service; CTRL-C to end
$ cd kafka_2.13-3.3.1
$ bin/kafka-server-start.sh config/server.properties
[2022-10-16 20:40:26,554] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[...]
[2022-10-16 20:40:27,007] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-10-16 20:40:27,085] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[...]
[2022-10-16 20:40:29,166] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka startTimeMs: 1665978029155 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,186] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Now the Kafka service is running and ready.
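To confirm the broker is reachable, query its supported API versions (a sketch using the kafka-broker-api-versions.sh tool bundled with Kafka):
$ cd kafka_2.13-3.3.1
$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# Prints one line per API with the version range the broker supports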
[Optional] Set up Kafka as a Linux service
Source: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04
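A minimal sketch of the systemd units from the tutorial above; the install path and the kafka service account are assumptions to adapt:
# /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-start.sh /home/kafka/kafka_2.13-3.3.1/config/zookeeper.properties
ExecStop=/home/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka_2.13-3.3.1/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.3.1/config/server.properties > /home/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka_2.13-3.3.1/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Enable and start both (kafka.service pulls in zookeeper.service via Requires):
$ sudo systemctl daemon-reload
$ sudo systemctl enable --now kafka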
Test the Setup
To test the Kafka setup, let's publish and consume a "Hello, Kafka" message on a topic named "TestTopic". A producer publishes a message to the topic, and a consumer reads messages from the topic.
To begin, open a new terminal and let's call it "publisher terminal". Create a topic named TestTopic:
$ cd kafka_2.13-3.3.1
# Create a topic
$ bin/kafka-topics.sh --create --topic TestTopic --bootstrap-server localhost:9092
Created topic TestTopic.
$
# Show details of the topic
$ bin/kafka-topics.sh --describe --topic TestTopic --bootstrap-server localhost:9092
Topic: TestTopic TopicId: 8X9r29saQ2eo71UfaYahRQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: TestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
$
# List all the topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TestTopic
$
# Delete a topic (for reference only; if you run it, re-create TestTopic before continuing)
$ bin/kafka-topics.sh --delete --topic TestTopic --bootstrap-server localhost:9092
$
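As the --describe output above shows, the topic was created with the broker defaults (one partition, replication factor 1). Both can be set explicitly at creation time; a sketch for this single-broker setup:
# Create a topic with 3 partitions (replication factor must stay 1 with a single broker)
$ bin/kafka-topics.sh --create --topic TestTopic --partitions 3 \
    --replication-factor 1 --bootstrap-server localhost:9092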
Next, from the "publisher terminal", publish the string
"Hello, Kafka"
to theTestTopic
topic:
$ cd kafka_2.13-3.3.1
$ echo "Hello, Kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
$
Open a new terminal and let's call it "consumer terminal". Run the consumer to read messages from TestTopic. Note the use of the --from-beginning flag, which allows the consumption of messages that were published before the consumer was started:
$ cd kafka_2.13-3.3.1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, Kafka
Go back to the "publisher terminal" again, and publish a second message:
$ cd kafka_2.13-3.3.1
$ echo "Hello again, Kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
$
Go back to the "consumer terminal", and to see the new published message:
$ cd kafka_2.13-3.3.1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, Kafka
Hello again, Kafka
If all tests are good, terminate the services (CTRL-C) in the following order:
Go to the "consumer terminal", enter CTRL-C
Go to the "kafka terminal", enter CTRL-C
Last, go to the "zookeeper terminal", enter CTRL-C
This is the end of the testing.
Push a log file to a Kafka producer
The Kafka broker should be running; otherwise, see the Setup section above.
Create the LogFileTopic topic:
$ cd kafka_2.13-3.3.1
$ bin/kafka-topics.sh --create --topic LogFileTopic --bootstrap-server localhost:9092
Created topic LogFileTopic.
Start the producer with a log file as the input source, for example /var/log/syslog:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic LogFileTopic < /var/log/syslog
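To stream the file continuously as new lines arrive (rather than a one-shot redirect), tail can feed the producer; a sketch, again using /var/log/syslog as an assumed example (tail -F follows the file across log rotation):
$ tail -F /var/log/syslog | bin/kafka-console-producer.ssh \
    --bootstrap-server localhost:9092 --topic LogFileTopic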
Snowflake-Kafka connector
Source: Snowflake - Build and Architect Data Pipelines Using AWS by Siddharth Raghunath, published by Packt Publishing. GitHub: https://github.com/PacktPublishing/Snowflake---Build-and-Architect-Data-Pipelines-using-AWS/tree/main/Section%2011%20-kafka-streaming-snowflake
Download the connector module from Maven (Kafka - Snowflake connector, version 1.5.0): https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector
Copy the jar file (snowflake-kafka-connector-1.5.0.jar) to the kafka-folder/libs folder.
Download the properties file, SF_connect.properties, from GitHub, copy it to the kafka-folder/config folder, and update the following properties:
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.topic2table.map=sales-data:sales_data
# from snowflake console
snowflake.url.name=az28777.us-west-2.snowflakecomputing.com
# production user for kafka
snowflake.user.name=siddharth
snowflake.private.key=<discussed below>
# make sure the database exists
snowflake.database.name=ecommerce_db
# create this schema in snowflake
snowflake.schema.name=kafka_live_schema
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=kafka_live_stream_data
Update the plugin.path in the config/connect-standalone.properties file:
plugin.path=/home/<username>/kafka_2.13-3.3.1/libs/
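With the jar, the connector properties, and plugin.path in place, the connector can be launched in standalone mode (a sketch; connect-standalone.sh takes the worker properties file followed by the connector properties file):
$ cd kafka_2.13-3.3.1
$ bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties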
Create the private and public keys that will be used for Snowflake authentication. The keys are created using openssl (see Create encryption private/public keys for details).
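A minimal sketch of the key generation, assuming an unencrypted PKCS#8 private key (the format Snowflake key-pair authentication expects) and the file names used below:
# Generate a 2048-bit private key in PKCS#8 PEM format, unencrypted
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.pem -nocrypt
# Derive the matching public key
$ openssl rsa -in rsa_key.pem -pubout -out rsa_pub.key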
The private key (rsa_key.pem) goes into the snowflake.private.key property of the SF_connect.properties file in the kafka-folder/config folder. Ignore the first and last lines of the key file; copy the lines in between, adding a continuation character (\, a backslash) at the end of each line except the last.
The public key (rsa_pub.key) is entered into the "RSA_PUBLIC_KEY" field of the Snowflake user, from the Snowflake web console. The command is "alter user <username> set RSA_PUBLIC_KEY='<public_key>'". Copy the lines of the key file except the first and last.