https://kafka.apache.org/
A distributed streaming platform.
can be used as an enterprise messaging system
can be used it as a stream processing platform. Kafka gives a stream, and we can plug in a processing framework.
Also provides connectors to export and import bulk data from databases and other systems.
https://kafka.apache.org/images/kafka_diagram.png
Install/Unzip Apache Kafka
tar -zxvf kafka_2.11-0.10.1.0.tgz
Start Kafka Server
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
kafka-topics.sh is a tool to manage a Kafka
Create Kafka Topic (replication factor 3, and 2 partitions)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TopicName --partitions 2 --replication-factor 2
describe Kafka Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TopicName
ISR is a list of In Sync Replicas
Start Kafka Producer and Consumer
console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicName
console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TopicName
Broker Configurations
// 3 essential configurations are the following:
broker.id
port
log.dirs
zookeeper.connect
delete.topic.enable
auto.create.topics.enable
default.replication.factor
num.partitions
log.retention.ms
log.retention.bytes
complete list : https://kafka.apache.org/documentation/#brokerconfigs
Build.sbt
name := "KafkaTest"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple")
)
Simple Kafka producer
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class TestProducer {
public static void main(String[] args) throws Exception{
String key = "Key1";
String value = "Value1";
String topicName = "KafkaTestTopic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
ProducerRecord record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
}
}