Apache Kafka uses ZooKeeper to store information about the Kafka cluster and its users; in short, ZooKeeper stores the Kafka cluster's metadata. ZooKeeper is centralized, open-source software for coordinating distributed applications. It provides a basic set of primitives for building higher-level services such as synchronization, configuration management, group membership, and naming.
Kafka can be set up in distributed or standalone mode.
Go to STEP 1 of the Quick start page and click Download. The download page shows the latest available version and a link to download it. Alternatively, use curl as follows:
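For example, assuming version 3.3.1 (the version used later in this guide); the mirror URL below is an assumption, so copy the actual link from the download page:
# Download and extract Kafka
$ curl -O https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1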
To test the Kafka setup, let's publish and consume a "Hello, Kafka" message on a topic named TestTopic. A producer publishes messages to the topic, and a consumer reads them from it.
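The walkthrough assumes ZooKeeper and the Kafka broker are already running, each in its own terminal (the "zookeeper terminal" and "kafka terminal" referenced below). A minimal sketch, run from the Kafka folder:
# In the "zookeeper terminal": start ZooKeeper (listens on localhost:2181 by default)
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# In the "kafka terminal": start the Kafka broker (listens on localhost:9092 by default)
$ bin/kafka-server-start.sh config/server.properties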
To begin, open a new terminal; let's call it the "publisher terminal". Create a topic named TestTopic:
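A minimal invocation (the same command appears in the How-to section below):
$ bin/kafka-topics.sh --create --topic TestTopic --bootstrap-server localhost:9092
Created topic TestTopic.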
Next, from the "publisher terminal", publish the string "Hello, Kafka" to the TestTopic topic:
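The console producer reads lines from stdin and shows a > prompt; type the message and press Enter:
$ bin/kafka-console-producer.sh --topic TestTopic --bootstrap-server localhost:9092
>Hello, Kafka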
Open a second terminal; let's call it the "consumer terminal". Run the consumer to read messages from TestTopic. Note the --from-beginning flag, which allows consuming messages that were published before the consumer was started:
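The consumer keeps running and prints new messages as they arrive:
$ bin/kafka-console-consumer.sh --topic TestTopic --from-beginning --bootstrap-server localhost:9092
Hello, Kafka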
Go back to the "publisher terminal" again, and publish a second message:
Go back to the "consumer terminal", and to see the new published message:
If all tests pass, terminate (CTRL-C) the following:
Go to the "consumer terminal", enter CTRL-C
Go to the "kafka terminal", enter CTRL-C
Finally, go to the "zookeeper terminal", enter CTRL-C
This completes the testing.
Push a log file to a Kafka producer
The Kafka broker should be running; otherwise, see the Setup section above.
Create the LogFileTopic topic:
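The command mirrors the TestTopic creation above:
$ bin/kafka-topics.sh --create --topic LogFileTopic --bootstrap-server localhost:9092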
Start the producer with the log file as its input source:
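A minimal sketch using the console producer with stdin redirected from a log file (the file path is an assumption; substitute your own):
# Each line of the log file becomes one message on LogFileTopic
$ bin/kafka-console-producer.sh --topic LogFileTopic --bootstrap-server localhost:9092 < /var/log/myapp/app.log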
Copy the JAR file (snowflake-kafka-connector-1.5.0.jar) to the kafka-folder/libs folder.
Download the properties file, SF_connect.properties, from GitHub, copy it to the kafka-folder/config folder, and update the following properties (the full file is listed at the end of this document):
Update the plugin.path property in the config/connect-standalone.properties file:
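plugin.path should point at the directory holding the connector JAR; the absolute path below is an assumption:
plugin.path=/home/user/kafka-folder/libs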
Create the private and public keys that will be used for Snowflake authentication. The keys are created with openssl (see Create encryption private/public keys for details).
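A minimal sketch producing the two files named below; note that Snowflake key-pair authentication may require the private key in PKCS#8 format, so follow the linked section for the exact commands:
# Generate a 2048-bit RSA private key, then derive the public key from it
$ openssl genrsa -out rsa_key.pem 2048
$ openssl rsa -in rsa_key.pem -pubout -out rsa_pub.key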
The private key (rsa_key.pem) is entered into the snowflake.private.key property of the SF_connect.properties file in the kafka-folder/config folder. Ignore the first and last lines (the PEM header and footer). Copy the lines in between and add a continuation character (a backslash, \) at the end of each line except the last.
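The resulting property looks like this (the key material is a placeholder):
snowflake.private.key=<line 2 of rsa_key.pem> \
<line 3 of rsa_key.pem> \
... \
<second-to-last line of rsa_key.pem, no trailing backslash>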
The public key (rsa_pub.key) is entered into the "RSA_PUBLIC_KEY" field of the Snowflake user from the Snowflake web console. The command is "alter user <username> set RSA_PUBLIC_KEY='<public_key>'". As with the private key, copy all lines except the first and last.
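In the Snowflake console, the statement looks like this (the user name and key material are placeholders):
alter user kafka_user set RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG...';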
How-to
Delete a topic; Note: not allowed when topic deletion is disabled
Kafka will not let you delete a topic when the broker setting delete.topic.enable is false (the default in older Kafka versions; since Kafka 1.0 it defaults to true). To change this behavior, edit the configuration file, config/server.properties:
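# Enable topic deletion (add or uncomment this line, then restart the broker as shown below)
delete.topic.enable=true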
# Start the Kafka broker service; CTRL-C to end
$ cd kafka_2.13-3.3.1
$ bin/kafka-server-start.sh config/server.properties
[2022-10-16 20:40:26,554] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[...]
[2022-10-16 20:40:27,007] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-10-16 20:40:27,085] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[...]
[2022-10-16 20:40:29,166] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,167] INFO Kafka startTimeMs: 1665978029155 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-16 20:40:29,186] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-10-16 20:40:29,410] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker DESKTOP-U4G084O.localdomain:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
$ cd kafka_2.13-3.3.1
# Create a topic
$ bin/kafka-topics.sh --create --topic TestTopic --bootstrap-server localhost:9092
Created topic TestTopic.
$
# Show details of the topic
$ bin/kafka-topics.sh --describe --topic TestTopic --bootstrap-server localhost:9092
Topic: TestTopic TopicId: 8X9r29saQ2eo71UfaYahRQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: TestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
$
# List all the topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TestTopic
$
# Delete a topic
$ bin/kafka-topics.sh --delete --topic TestTopic --bootstrap-server localhost:9092
$
# SF_connect.properties (referenced in the Snowflake connector setup above)
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.topic2table.map=sales-data:sales_data
# from snowflake console
snowflake.url.name=az28777.us-west-2.snowflakecomputing.com
# production user for kafka
snowflake.user.name=siddharth
snowflake.private.key=<see the private key discussion above>
# make sure the database exists
snowflake.database.name=ecommerce_db
# create this schema in snowflake
snowflake.schema.name=kafka_live_schema
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=kafka_live_stream_data
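With the properties in place, the connector can be launched in standalone mode (assuming SF_connect.properties was copied to the config folder as described above):
# Start the Snowflake sink connector; CTRL-C to end
$ bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties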