# Kafka

<https://kafka.apache.org/>

Kafka is a distributed streaming platform. It can be used in three ways:

1. As an enterprise messaging system.
2. As a stream processing platform: Kafka provides the stream, and a processing framework can be plugged in on top of it.
3. As a bridge to other systems: Kafka also provides connectors to import and export bulk data from databases and other systems.

<https://kafka.apache.org/images/kafka_diagram.png>

Install/Unzip Apache Kafka

```
tar -zxvf kafka_2.11-0.10.1.0.tgz
```

Start ZooKeeper, then the Kafka server (each command runs in the foreground, so use a separate terminal for each)

```
bin/zookeeper-server-start.sh config/zookeeper.properties
```

```
bin/kafka-server-start.sh config/server.properties
```

`kafka-topics.sh` is a tool to manage Kafka topics.

Create a Kafka topic (replication factor 2, and 2 partitions). A replication factor of 2 requires at least two running brokers.

```
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TopicName --partitions 2 --replication-factor 2
```
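To illustrate what the partition count means for keyed messages, here is a minimal sketch of how a key can be mapped to one of the topic's partitions. It is not Kafka's implementation: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch substitutes `Arrays.hashCode` as a stand-in, so the partition numbers it prints will differ from what a real producer picks. The class name `PartitionSketch` and the method `partitionFor` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // ASSUMPTION: Arrays.hashCode stands in for Kafka's murmur2 hash.
    // The idea is the same: hash the key bytes, make the result
    // non-negative, then take it modulo the number of partitions.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff; // clear the sign bit
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // matches --partitions 2 in the command above
        for (String key : new String[] {"Key1", "Key2", "Key3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point of the sketch is that the same key always lands on the same partition, which is what preserves per-key ordering.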

Describe the Kafka topic

```
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TopicName
```

* *ISR is the list of in-sync replicas: the replicas that are currently caught up with the partition leader.*

Start Kafka Producer and Consumer

Console producer (each line typed on standard input is sent as one message)

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicName
```

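Console consumer (prints messages as they arrive; add `--from-beginning` to also read messages already in the topic)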

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TopicName
```

Broker Configurations

The three essential configurations are:

```
broker.id
port
log.dirs
```

Other commonly used configurations:

```
zookeeper.connect
delete.topic.enable
auto.create.topics.enable
default.replication.factor
num.partitions
log.retention.ms
log.retention.bytes
```

Complete list: <https://kafka.apache.org/documentation/#brokerconfigs>
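As a rough illustration of how these keys look in practice, the sketch below embeds a sample `server.properties` body and reads it with `java.util.Properties`. All values are assumptions chosen for illustration, not recommended settings, and the class name `BrokerConfigSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class BrokerConfigSketch {

    // ASSUMPTION: illustrative values only, not tuned for any real cluster.
    public static final String SAMPLE = String.join("\n",
        "broker.id=0",
        "port=9092",
        "log.dirs=/tmp/kafka-logs",
        "zookeeper.connect=localhost:2181",
        "delete.topic.enable=true",
        "auto.create.topics.enable=false",
        "default.replication.factor=2",
        "num.partitions=2",
        "log.retention.ms=604800000",
        "log.retention.bytes=1073741824");

    // Parses key=value text in the standard Java properties format.
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        long retentionMs = Long.parseLong(props.getProperty("log.retention.ms"));
        System.out.println("broker.id = " + props.getProperty("broker.id"));
        System.out.println("retention days = " + retentionMs / (24L * 60 * 60 * 1000));
    }
}
```

Each broker in a cluster needs its own `broker.id`, and brokers sharing a machine also need distinct ports and log directories.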

build.sbt

```
name := "KafkaTest"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("org.slf4j", "slf4j-simple")
)
```

Simple Kafka producer

```
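import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        String key = "Key1";
        String value = "Value1";
        String topicName = "KafkaTestTopic";

        // Connection and serialization settings for the producer.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer when the block exits.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            // send() is asynchronous; close() waits for pending sends to complete.
            producer.send(record);
        }
    }
}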
```
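The producer above has a consumer-side counterpart that is configured in much the same way. The sketch below only assembles the consumer properties, so it runs on the JDK alone; the configuration keys are standard consumer settings, while the class name `ConsumerConfigSketch`, the method `consumerProps`, and the group id `test-group` are assumptions made for illustration.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds the settings a String-keyed, String-valued consumer would need.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers sharing a group.id split the topic's partitions among themselves.
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the oldest available message when the group has no committed offset.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // ASSUMPTION: "test-group" is a hypothetical group id.
        Properties props = consumerProps("localhost:9092,localhost:9093", "test-group");
        // With kafka-clients on the classpath, these properties could be passed to
        // a KafkaConsumer<String, String>; that step is omitted to keep this JDK-only.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that the consumer uses deserializers where the producer uses serializers, and that the two must agree on the key and value types.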
