Apache Kafka
Overview ⌘
What is Kafka?
- Messaging system
- Implements publish/subscribe pattern
- Distributed
- Scalable
- Robust
- Efficient
Use cases ⌘
- Stream processing
- Messaging and notifications
- Commit Log
- Log and metrics aggregation
Architecture ⌘
Key concepts ⌘
- Producer: application that publishes messages
- Consumer: application that subscribes to messages
- Broker: mediator between producers and consumers
- Zookeeper: stores broker and consumer metadata
- Topics: container for organizing messages
- Partitions: unit of parallelism within a topic
- Replication factor: redundancy of a partition
- Offset: position of a single message within a partition's log
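A minimal Java sketch to make partitions and offsets concrete (it assumes a broker on localhost:9092 and an existing topic named "my-topic"; both are placeholders):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges
RecordMetadata meta = producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
// the record lands in exactly one partition, at a monotonically increasing offset
System.out.printf("partition=%d offset=%d%n", meta.partition(), meta.offset());
producer.close();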
Key concepts diagram ⌘
Configuration
Zookeeper configuration ⌘
minimum config:
- tickTime=2000
- basic time unit used by ZooKeeper, measured in milliseconds
- dataDir=/var/lib/zookeeper
- location to store snapshots and logs
- clientPort=2181
- the port to listen for client connections
additional configs:
- autopurge.snapRetainCount=3
- The number of snapshots to retain in dataDir
- autopurge.purgeInterval=1
- Purge task interval in hours. Set to "0" to disable auto purge feature
Kafka broker configuration
Kafka Server settings ⌘
- broker.id=0
- unique integer identifier within a cluster
- port=9092
- the port to listen for client connections
- auto.create.topics.enable=false
- defaults to true; automatic topic creation is often undesirable in production
Kafka Log configurations ⌘
- log.dirs=/tmp/kafka-logs
- one or more directories in which to store log segments
- log.flush.interval.messages=10000
- the number of messages to accept before forcing a flush of data to disk
- log.flush.interval.ms=1000
- the maximum amount of time a message can sit in a log before we force a flush
- log.retention.hours=168
- log.retention.bytes=1073741824
- log.retention.check.interval.ms=300000
- retention policies based on time and/or size
- log.segment.bytes=1073741824
- maximum size of a log segment file
Kafka -> Zookeeper settings ⌘
- zookeeper.connect=localhost:2181
- ZooKeeper connection string used for storing broker metadata
- zookeeper.connection.timeout.ms=6000
- timeout in ms for connecting to zookeeper
Producer API
Producer API. Configuration ⌘
java example from Kafka Java docs:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* how many partition replicas must receive the record before
 * the producer can consider the write successful */
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
/* wait in milliseconds for additional messages to get added to
 * the batch before sending it to the brokers */
props.put("linger.ms", 1);
/* buffer size of messages waiting to be sent to brokers */
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
Producer API. Sending messages ⌘
Delivery strategy:
- fire & forget,
- synchronously,
- asynchronously
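The three strategies sketched in Java (assuming a producer configured as on the previous slide and a placeholder topic "my-topic"):
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

// fire & forget: send and ignore the result (messages can be lost silently)
producer.send(record);

// synchronous: block on the returned Future until the broker acknowledges
RecordMetadata meta = producer.send(record).get();

// asynchronous: a callback is invoked once the broker responds
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();                       // delivery failed
    } else {
        System.out.println("offset " + metadata.offset()); // delivery confirmed
    }
});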
asynchronous example using node.js (ES6) and the kafka-node client:
import kafka from 'kafka-node'

let Producer = kafka.Producer
// kafka-node's older Client connects through ZooKeeper
let client = new kafka.Client('localhost:2181')
let producer = new Producer(client)
// one payload entry per topic/partition, each carrying an array of messages
let payloads = [{topic: 'new_event', messages: ['message body'], partition: 0}]

// wait for the producer to be ready, then send asynchronously
producer.on('ready', () => {
  producer.send(payloads, (err, data) => console.log(err || data))
})
Producer API. Keys and partitioning ⌘
- null key => random partition
- <key,value> message with default partitioning => Kafka hash based on key
- <key,value> message with custom partitioning => Useful for data skew situations
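A custom partitioner is a class implementing the org.apache.kafka.clients.producer.Partitioner interface, registered via the partitioner.class producer property. A minimal sketch for the data-skew case; the hot key "skewed-key" and the class name are hypothetical, and the topic is assumed to have at least two partitions:
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SkewAwarePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // isolate the known hot key on the last partition, hash every other key
        if ("skewed-key".equals(key)) {
            return numPartitions - 1;
        }
        return Math.abs(Arrays.hashCode(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
It would be enabled on the producer with props.put("partitioner.class", SkewAwarePartitioner.class.getName()).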
Consumer API
Consumer Properties ⌘
- Mandatory properties
- bootstrap.servers
- key.deserializer
- value.deserializer
- (some) Optional properties:
- group.id
- enable.auto.commit
- auto.commit.interval.ms
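A sketch of a consumer configuration built from these properties (the group id is a placeholder):
Properties properties = new Properties();
// mandatory
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// optional: consumer group membership and automatic offset committing
properties.put("group.id", "my-consumer-group");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");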
Consumer API. Commits and offsets ⌘
- Simple (low-level) vs High-Level API (Kafka version < 0.9.0.0):
- - Synchronous or asynchronous committing and offset tracking, or
- - Let Kafka manage commits and offsets
- New consumer API (Kafka >= 0.9.0.0) integrates both approaches in a single API:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // process records here
}
[Automatic committing] With enable.auto.commit=true, a call to poll() commits the last offset returned by the previous poll, once auto.commit.interval.ms has elapsed.
Manual committing of offsets ⌘
- consumer.commitSync()
- consumer.commitAsync( callback() )
- mixed approach
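A sketch of the mixed approach: non-blocking commitAsync() inside the poll loop, and a final blocking commitSync() before closing (assumes enable.auto.commit=false and the consumer from the previous slide):
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // process each record here
        }
        // fast, non-blocking commit; failures are only reported to the callback
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("commit failed for " + offsets);
            }
        });
    }
} finally {
    // blocking commit: retries until it succeeds or hits a non-recoverable error
    consumer.commitSync();
    consumer.close();
}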
Consumer groups ⌘
- Consumer groups provide scalability at topic level
- consumers send heartbeats to a Kafka broker designated as the Group Coordinator
- => maintain membership in a consumer group and ownership of the partitions assigned to them
- rebalance is when partition ownership is moved from one consumer to another:
- a new consumer enters a group
- a consumer crashes or is shut down
- => an undesirable process: the whole group is unavailable for the duration of the rebalance
Consumer rebalance ⌘
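A consumer can react to a rebalance by passing a ConsumerRebalanceListener to subscribe(); a minimal sketch (the topic name is a placeholder) that commits offsets before partition ownership is taken away:
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before ownership is lost: last chance to commit offsets for these partitions
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after the rebalance, once this consumer owns the listed partitions
        System.out.println("assigned: " + partitions);
    }
});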
Operations ⌘
- Adding topics
- Removing topics
- (delete.topic.enable=true)
- Modifying topics
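Topics can be managed with the kafka-topics.sh tool or programmatically; a sketch using the Java AdminClient (available in newer client releases; broker address and topic name are placeholders):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // adding a topic: 3 partitions, replication factor 1
    admin.createTopics(Collections.singletonList(new NewTopic("my-topic", 3, (short) 1))).all().get();
    // removing a topic: the broker must run with delete.topic.enable=true
    admin.deleteTopics(Collections.singletonList("my-topic")).all().get();
}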
Kafka Design ⌘
- Motivation
- Persistence
- Efficiency
- The Producer
- The Consumer
- Message Delivery Semantics
- Replication
- Log Compaction
- Quotas