Apache Kafka
<slideshow style="nobleprog" headingmark="⌘" incmark="…" scaled="false" font="Trebuchet MS" >
- title
  - Apache Kafka
- author
  - Pablo Recabal for NobleProg
</slideshow>
Overview ⌘
What is Kafka?
- Messaging system
- Implements the publish/subscribe pattern
- Distributed
- Scalable
- Robust
- Efficient
 
Use cases ⌘
- Stream processing
- Messaging and notifications
- Commit log
- Log and metrics aggregation
 
Architecture ⌘
Key concepts ⌘
- Producer: application that publishes messages
- Consumer: application that subscribes to messages
- Broker: mediator between producers and consumers
- Zookeeper: stores broker and consumer metadata
- Topic: container for organizing messages
- Partition: unit of parallelism within a topic
- Replication factor: redundancy of a partition
- Offset: position of a single message within a partition of the Kafka log (illustrated below)
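Several of these concepts appear directly in the Java producer client. A minimal sketch, assuming a local broker at localhost:9092 and an existing topic named my-topic (the blocking get() call throws checked exceptions that would need handling): a keyed record is published to a topic, and the broker replies with the partition and offset it was written to.
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
// a keyed record published to a topic; the key determines the partition
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "some-key", "some-value");
// blocking on the returned Future yields the partition and offset assigned by the broker
RecordMetadata metadata = producer.send(record).get();
System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
producer.close();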
 
Key concepts diagram ⌘
Configuration
Zookeeper configuration ⌘
minimum config:
- tickTime=2000
  - the basic time unit used by ZooKeeper, measured in milliseconds
- dataDir=/var/lib/zookeeper
  - the location where snapshots and logs are stored
- clientPort=2181
  - the port to listen on for client connections
additional configs:
- autopurge.snapRetainCount=3
  - the number of snapshots to retain in dataDir
- autopurge.purgeInterval=1
  - purge task interval in hours; set to 0 to disable the auto purge feature
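As a quick sanity check of clientPort, a Java client session can be opened against the configured port. A minimal sketch, assuming ZooKeeper is running locally with the settings above; the 6000 ms session timeout is an assumption (typically a small multiple of tickTime):
import org.apache.zookeeper.ZooKeeper;

// connect to the clientPort configured above; the watcher just logs connection state changes
ZooKeeper zk = new ZooKeeper("localhost:2181", 6000, event -> System.out.println(event.getState()));
Thread.sleep(1000);                          // give the session a moment to establish
System.out.println("session state: " + zk.getState());
zk.close();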
 
Kafka broker configuration
Kafka Server settings ⌘
- broker.id=0
  - unique integer identifier of the broker within a cluster
- port=9092
  - the port to listen on for client connections
- auto.create.topics.enable=false
  - defaults to true, which is undesirable in some cases
 
Kafka Log configurations ⌘
- log.dirs=/tmp/kafka-logs
  - one or more directories in which to store log segments
- log.flush.interval.messages=10000
  - the number of messages to accept before forcing a flush of data to disk
- log.flush.interval.ms=1000
  - the maximum amount of time a message can sit in a log before we force a flush
- log.retention.hours=168
- log.retention.bytes=107374182
- log.retention.check.interval.ms=300000
  - retention policies based on time and/or size, and how often they are checked
- log.segment.bytes=1073741824
  - the maximum size of a log segment file
 
Kafka -> Zookeeper settings ⌘
- zookeeper.connect=localhost:2181
  - location of the ZooKeeper ensemble used for storing broker metadata
- zookeeper.connection.timeout.ms=6000
  - timeout in ms for connecting to ZooKeeper
 
Producer API
Producer API. Configuration ⌘
Java example from the Kafka Java docs:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* how many partition replicas must receive the record before
 * the producer can consider the write successful */
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
/* how long (in milliseconds) to wait for additional messages to be added
 * to the batch before sending it to the brokers */
props.put("linger.ms", 1);
/* buffer size for messages waiting to be sent to the brokers */
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
Producer API. Sending messages ⌘
Delivery strategies:
- fire & forget
- synchronous
- asynchronous
 
Asynchronous example using node.js (ES6) and the kafka-node library:
import kafka from 'kafka-node'

const Producer = kafka.Producer
const client = new kafka.Client('localhost:2181')   // connects via ZooKeeper
const producer = new Producer(client)

// each payload targets a topic, an optional partition, and one or more messages
const payloads = [{topic: 'new_event', messages: ['message body'], partition: 0}]

// wait for the producer to be ready before sending
producer.on('ready', () => {
  producer.send(payloads, (err, data) => console.log(err || data))
})
producer.on('error', (err) => console.error(err))
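For comparison, the three strategies in the Java producer client look roughly like this (a sketch reusing the producer from the configuration example; the blocking get() call throws checked exceptions that would need handling):
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

// fire & forget: send and ignore the outcome
producer.send(record);

// synchronous: block on the returned Future until the broker acknowledges the write
RecordMetadata metadata = producer.send(record).get();

// asynchronous: register a callback invoked when the send completes or fails
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata md, Exception exception) {
        if (exception != null) exception.printStackTrace();
        else System.out.println("offset: " + md.offset());
    }
});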
Producer API. Keys and partitioning ⌘
- null key => random partition
- <key,value> message with default partitioning => Kafka hash based on the key
- <key,value> message with custom partitioning => useful for data skew situations (see the sketch below)
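A custom partitioner sketch for the data skew case, assuming a hypothetical hot key "big-customer" that should get a partition of its own and a topic with at least two partitions; it would be enabled on the producer with props.put("partitioner.class", SkewAwarePartitioner.class.getName()):
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SkewAwarePartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;                                    // non-keyed records: simple fallback (sketch only)
        if ("big-customer".equals(key))
            return numPartitions - 1;                    // isolate the hot key on the last partition
        return Math.abs(key.hashCode()) % (numPartitions - 1);  // hash everything else over the rest
    }

    public void close() {}
}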
 
Consumer API
Consumer Properties ⌘
- Mandatory properties:
  - bootstrap.servers
  - key.deserializer
  - value.deserializer
- (some) Optional properties:
  - group.id
  - enable.auto.commit
  - auto.commit.interval.ms
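Putting these together, a minimal configuration sketch (the broker address and group name are placeholder values):
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
// mandatory
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// optional
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);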
 
Consumer API. Commits and offsets ⌘
- Simple (low-level) vs. high-level API (Kafka version < 0.9.0.0):
  - synchronous or asynchronous committing and offset tracking, or
  - let Kafka manage commits and offsets
- New consumer API (Kafka >= 0.9.0.0) integrates both approaches in a single API:

// properties configured as on the previous slide
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset=" + record.offset() + " value=" + record.value());
    }
}
Automatic committing: with enable.auto.commit=true, a call to poll() will always commit the last offset returned by the previous poll.
Manual committing of offsets ⌘
- consumer.commitSync()
- consumer.commitAsync(callback)
- mixed approach (see the sketch below)
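The mixed approach is commonly sketched as a non-blocking commitAsync() inside the poll loop plus a blocking commitSync() on shutdown, so the final position is committed even if earlier asynchronous commits were lost (process() stands for hypothetical application logic):
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            process(record);                 // hypothetical application logic
        consumer.commitAsync();              // fast, does not retry on failure
    }
} finally {
    try {
        consumer.commitSync();               // blocking, retries until success or a fatal error
    } finally {
        consumer.close();
    }
}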
 
Consumer groups ⌘
- Consumer groups provide scalability at the topic level
- Consumers send heartbeats to a Kafka broker designated as the group coordinator
  - => this is how they maintain membership in a consumer group and ownership of the partitions assigned to them
- A rebalance is when partition ownership is moved from one consumer to another, e.g. when:
  - a new consumer joins the group
  - a consumer crashes or is shut down
  - => an undesirable process that makes the whole group temporarily unavailable
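A consumer can be notified of rebalances (for example, to commit offsets before its partitions are taken away) by passing a ConsumerRebalanceListener when subscribing; a minimal sketch, reusing the consumer from the previous slides:
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before the rebalance takes partitions away from this consumer;
        // a good place to commit the current offsets
        consumer.commitSync();
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after partitions have been reassigned to this consumer
        System.out.println("assigned: " + partitions);
    }
});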
 
Consumer rebalance ⌘
Operations ⌘
- Adding topics
- Removing topics
  - requires delete.topic.enable=true
- Modifying topics
 
Kafka Design ⌘
- Motivation
- Persistence
- Efficiency
- The Producer
- The Consumer
- Message Delivery Semantics
- Replication
- Log Compaction
- Quotas