Apache Kafka

Apache Kafka uses ZooKeeper to store information about the Kafka cluster and its users; in short, ZooKeeper stores the cluster's metadata. ZooKeeper is a centralized, open-source service for managing distributed applications. It provides a basic set of primitives for implementing higher-level synchronization, configuration management, group membership, and naming services.
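
Once the services from the Setup section below are running, this metadata can be inspected directly with the bundled ZooKeeper shell. For example, the following lists the IDs of the registered brokers (assuming ZooKeeper's default client port, 2181):

$ cd kafka_2.13-3.3.1
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
[...]
[0]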

Kafka can be set up in distributed (multi-broker) or standalone (single-broker) mode; this page uses standalone mode.

Setup (Debian)

  • Go to STEP 1 of the Quick start page and click Download. The download page shows the latest available version and a link to download it. Alternatively, use curl as follows:

$ curl "https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz" -o ~/kafka_2.13-3.3.1.tgz
  • Extract the archive and go to the extracted folder:

$ cd ~
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
  • Install the prerequisite, Java 8 or later (see SETUP > Dev Environment > Java), and verify the installation:

$ java --version
openjdk 11.0.16 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
  • Start Kafka with ZooKeeper. Note: starting the services in the correct order (ZooKeeper before Kafka) is important.

From one terminal, let's call it "zookeeper terminal":

# Start the ZooKeeper service; CTRL-C to end
$ cd kafka_2.13-3.3.1
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2022-10-16 20:37:32,995] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-10-16 20:37:32,997] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-10-16 20:37:33,003] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[...]
[2022-10-16 20:37:33,055] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,055] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,055] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,055] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,055] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,056] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,056] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-10-16 20:37:33,056] INFO                                               |_|                      (org.apache.zookeeper.server.ZooKeeperServer)
[...]
[2022-10-16 20:37:33,224] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)

From another terminal, let's call it "kafka terminal":

# Start the Kafka broker service; CTRL-C to end
$ cd kafka_2.13-3.3.1
$ bin/kafka-server-start.sh config/server.properties
[2022-10-16 20:40:26,554] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[...]
[2022-10-16 20:40:27,007] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-10-16 20:40:27,085] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[...]
[2022-10-16 20:40:29,166] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka startTimeMs: 1665978029155 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,186] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
  • Now the Kafka service is running and ready.

[Optional] Set up Kafka as a Linux service

Source: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04
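
Following that tutorial, both services can be managed by systemd. Below is a minimal sketch of the two unit files, assuming Kafka is installed under /home/kafka/kafka_2.13-3.3.1 and runs as a dedicated kafka user (both assumptions; adjust paths and user to your install):

# /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-start.sh /home/kafka/kafka_2.13-3.3.1/config/zookeeper.properties
ExecStop=/home/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
# The sh -c wrapper is needed because systemd does not interpret shell redirection
ExecStart=/bin/sh -c '/home/kafka/kafka_2.13-3.3.1/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.3.1/config/server.properties > /home/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka_2.13-3.3.1/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Enable and start both at boot with "sudo systemctl enable --now zookeeper kafka"; the Requires=zookeeper.service line ensures ZooKeeper is started before the broker.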

Test the Setup

To test the Kafka setup, let's publish and consume a "Hello, Kafka" message on a topic named TestTopic. A producer publishes messages to the topic, and a consumer reads them from it.

  • To begin, open a new terminal and let's call it "publisher terminal". Create a topic named TestTopic:

$ cd kafka_2.13-3.3.1
# Create a topic
$ bin/kafka-topics.sh --create --topic TestTopic --bootstrap-server localhost:9092
Created topic TestTopic.
$
# Show details of the topic
$ bin/kafka-topics.sh --describe --topic TestTopic --bootstrap-server localhost:9092
Topic: TestTopic        TopicId: 8X9r29saQ2eo71UfaYahRQ PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: TestTopic        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
$
# List all the topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TestTopic
$
# Delete a topic (shown for reference; if you run it here, re-create TestTopic before continuing)
$ bin/kafka-topics.sh --delete --topic TestTopic --bootstrap-server localhost:9092
$
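
The create command above relies on the broker defaults: one partition and replication factor 1, as the --describe output shows. Both can be set explicitly at creation time; a sketch, with values chosen only for illustration (on a single-broker setup the replication factor cannot exceed 1):

$ bin/kafka-topics.sh --create --topic TestTopic --partitions 3 --replication-factor 1 \
    --bootstrap-server localhost:9092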
  • Next, from the "publisher terminal", publish the string "Hello, Kafka" to the TestTopic topic:

$ cd kafka_2.13-3.3.1
$ echo "Hello, Kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
$
  • Open a new terminal and let's call it "consumer terminal". Run the consumer to read messages from TestTopic. Note the use of the --from-beginning flag, which allows the consumption of messages that were published before the consumer was started:

$ cd kafka_2.13-3.3.1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, Kafka
  • Go back to the "publisher terminal" again, and publish a second message:

$ cd kafka_2.13-3.3.1
$ echo "Hello again, Kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
$
  • Go back to the "consumer terminal", and to see the new published message:

$ cd kafka_2.13-3.3.1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, Kafka
Hello again, Kafka
  • If all tests pass, terminate (CTRL-C) the following:

    • Go to the "consumer terminal", enter CTRL-C

    • Go to the "kafka terminal", enter CTRL-C

    • Last, go to the "zookeeper terminal", enter CTRL-C

  • This is the end of the testing.

Push a log file to Kafka

  • The Kafka broker should be running; otherwise, see the Setup section above.

  • Create the LogFileTopic topic

$ cd kafka_2.13-3.3.1
$ bin/kafka-topics.sh --create --topic LogFileTopic --bootstrap-server localhost:9092
Created topic LogFileTopic.
  • Start the producer, reading its input from a log file:

# /var/log/syslog is an example; substitute any readable log file
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic LogFileTopic < /var/log/syslog
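
To stream the file continuously instead of doing a one-shot import, the producer's stdin can be fed by tail; a sketch, again using /var/log/syslog as a stand-in for your log file:

$ tail -n0 -F /var/log/syslog | bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 --topic LogFileTopic

-F keeps following the file across log rotation, and -n0 skips lines written before the producer started. Verify with a console consumer on LogFileTopic in another terminal.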

Snowflake-Kafka connector

Source: Snowflake - Build and Architect Data Pipelines Using AWS by Siddharth Raghunath, published by Packt Publishing.
GitHub: https://github.com/PacktPublishing/Snowflake---Build-and-Architect-Data-Pipelines-using-AWS/tree/main/Section%2011%20-kafka-streaming-snowflake
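
The connector is configured through a Kafka Connect properties file (referred to later as SF_connect.properties, placed in the config folder); the values below are from the course example and must be replaced with your own account, user, database, and schema details: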

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.topic2table.map=sales-data:sales_data
# from snowflake console
snowflake.url.name=az28777.us-west-2.snowflakecomputing.com
# production user for kafka
snowflake.user.name=siddharth
snowflake.private.key=<discussed below>
# make sure the database exists
snowflake.database.name=ecommerce_db
# create this schema in snowflake
snowflake.schema.name=kafka_live_schema
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=kafka_live_stream_data
  • Update the plugin.path in the config/connect-standalone.properties file:

plugin.path=/home/<username>/kafka_2.13-3.3.1/libs/
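
The Snowflake sink connector jar itself must also be on that path; it is published to Maven Central under com.snowflake:snowflake-kafka-connector. A sketch (the <version> is a placeholder; check Maven Central for the current release):

$ curl -o ~/kafka_2.13-3.3.1/libs/snowflake-kafka-connector-<version>.jar \
    "https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/<version>/snowflake-kafka-connector-<version>.jar"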
  • Create the private & public keys that will be used for Snowflake authentication. The keys are created using openssl (see Create encryption private/public keys for details, and the sketch after this list).

    • The private key (rsa_key.pem) is entered into the snowflake.private.key property of the properties file, SF_connect.properties, in the kafka-folder/config folder. Ignore the first and last lines (the PEM header and footer); copy the lines in between, adding a continuation character (a backslash, \) at the end of each line except the last.

    • The public key (rsa_pub.key) is entered into the Snowflake user's RSA_PUBLIC_KEY property from the Snowflake web console. The command is "alter user <username> set RSA_PUBLIC_KEY='<public_key>';". Again, copy all lines except the first and last.
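
A minimal sketch of the key generation, matching the file names used above (Snowflake expects the private key in PKCS#8 format; an unencrypted key is assumed here for simplicity):

# Generate a 2048-bit RSA private key, converted to unencrypted PKCS#8 PEM
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out rsa_key.pem
# Derive the matching public key
$ openssl rsa -in rsa_key.pem -pubout -out rsa_pub.key

With the keys in place and both properties files updated, the connector can be launched in Kafka Connect standalone mode (assuming SF_connect.properties sits in the config folder):

$ cd kafka_2.13-3.3.1
$ bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties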

How-to

Delete a topic; Note: may be disabled by the broker configuration

Older Kafka versions (before 1.0.0) did not allow topic deletion by default; in recent versions, delete.topic.enable defaults to true. If topic deletion is disabled on your cluster, edit the configuration file, config/server.properties, and restart the broker:

[...]
delete.topic.enable=true
