Apache Kafka

Title: Apache Kafka
Author: Pablo Recabal for NobleProg

Overview ⌘

What is Kafka?

  • Messaging system
  • Implements publish/subscribe pattern
  • Distributed
    • Scalable
    • Robust
  • Efficient

Use cases ⌘

  • Stream processing
  • Messaging and notifications
  • Commit Log
  • Log and metrics aggregation

Architecture ⌘

[Figure: Kafka system architecture]

Key concepts ⌘

  • Producer: application that publishes messages
  • Consumer: application that subscribes to messages
  • Broker: mediator between producers and consumers
  • Zookeeper: stores broker and consumer metadata
  • Topic: container for organizing messages
  • Partition: unit of parallelism within a topic
  • Replication factor: redundancy of a partition
  • Offset: position of a single message in the Kafka log

Key concepts diagram ⌘

[Figure: Key concepts diagram]


Configuration

Zookeeper configuration ⌘

Minimum config:

  • tickTime=2000
basic time unit used by ZooKeeper, measured in milliseconds
  • dataDir=/var/lib/zookeeper
location to store snapshots and logs
  • clientPort=2181
the port to listen for client connections

Additional configs:

  • autopurge.snapRetainCount=3
The number of snapshots to retain in dataDir
  • autopurge.purgeInterval=1
Purge task interval in hours. Set to "0" to disable the auto-purge feature
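
Assembled into a single file, the settings above give a minimal standalone zoo.cfg:

# basic time unit in milliseconds
tickTime=2000
# where snapshots and transaction logs are stored
dataDir=/var/lib/zookeeper
# port for client connections
clientPort=2181
# keep three snapshots, purging every hour
autopurge.snapRetainCount=3
autopurge.purgeInterval=1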

Kafka broker configuration

Kafka Server settings ⌘

  • broker.id=0
unique integer identifier within a cluster
  • port=9092
the port to listen for client connections
  • auto.create.topics.enable=false
defaults to true; automatic topic creation is undesirable in some cases

Kafka Log configurations ⌘

  • log.dirs=/tmp/kafka-logs
one or more directories where to store log segments
  • log.flush.interval.messages=10000
the number of messages to accept before forcing a flush of data to disk
  • log.flush.interval.ms=1000
the maximum amount of time a message can sit in a log before we force a flush
  • log.retention.hours=168
  • log.retention.bytes=1073741824
  • log.retention.check.interval.ms=300000
retention policies based on time and/or size
  • log.segment.bytes=1073741824
maximum size of a log segment file

Kafka -> Zookeeper settings ⌘

  • zookeeper.connect=localhost:2181
location of the ZooKeeper ensemble used for storing broker metadata
  • zookeeper.connection.timeout.ms=6000
timeout in ms for connecting to zookeeper
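
Collected in one place, the broker settings above form a minimal server.properties:

broker.id=0
port=9092
auto.create.topics.enable=false

log.dirs=/tmp/kafka-logs
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000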

Producer API

Producer API. Configuration ⌘

Java example, adapted from the Kafka Java docs:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

/* how many partition replicas must receive the record before
 * the producer can consider the write successful */
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
/* how long (in milliseconds) to wait for additional messages to be
 * added to the batch before sending it to the brokers */
props.put("linger.ms", 1);
/* total memory available for buffering messages waiting to be sent */
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

Producer API. Sending messages ⌘

Delivery strategies:

  • fire-and-forget
  • synchronous
  • asynchronous
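
In the Java producer API the three strategies differ only in how the Future returned by send() is handled; a minimal sketch, reusing the producer from the previous slide:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");

// fire-and-forget: ignore the Future returned by send()
producer.send(record);

// synchronous: block on the Future until the broker acknowledges
try {
    RecordMetadata meta = producer.send(record).get();
    System.out.println("written at offset " + meta.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

// asynchronous: pass a callback invoked when the send completes
producer.send(record, (metadata, exception) -> {
    if (exception != null)
        exception.printStackTrace();
    else
        System.out.println("written at offset " + metadata.offset());
});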


asynchronous example using node.js (ES6):

import kafka from 'kafka-node'

const Producer = kafka.Producer
const client = new kafka.Client('localhost:2181')
const producer = new Producer(client)

const events = [{topic: 'new_event', messages: ['message body'], partition: 0}]

// send once the producer has connected and is ready
producer.on('ready', () => {
  producer.send(events, (err, data) => console.log(data))
})

Producer API. Keys and partitioning ⌘

  • null key => random partition
  • <key, value> message with default partitioning => partition chosen by hashing the key
  • <key, value> message with custom partitioning => useful for data-skew situations (see the sketch below)
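
A custom partitioner implements the org.apache.kafka.clients.producer.Partitioner interface and is registered through the partitioner.class producer property. A minimal sketch for the skew case, assuming a hypothetical hot key "BIG_CUSTOMER" that deserves its own partition, a topic with at least two partitions, and keyed messages throughout:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SkewAwarePartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // the hot key gets the last partition all to itself
        if ("BIG_CUSTOMER".equals(key))
            return numPartitions - 1;
        // all other keys are hashed over the remaining partitions
        // (mask keeps the hash non-negative)
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {}
}

It would be enabled with props.put("partitioner.class", "com.example.SkewAwarePartitioner") (class and package names are placeholders).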

Consumer API

Consumer Properties ⌘

  • Mandatory properties:
    • bootstrap.servers
    • key.deserializer
    • value.deserializer
  • (some) Optional properties:
    • group.id
    • enable.auto.commit
    • auto.commit.interval.ms
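
In Java these are set on a Properties object just as for the producer; a minimal sketch with an assumed group name:

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");   // assumed group name
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");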

Consumer API. Commits and offsets ⌘

  • Simple (low-level) vs. high-level API (Kafka < 0.9.0.0):
    • synchronous or asynchronous committing and offset tracking, or
    • let Kafka manage commits and offsets
  • The new consumer API (Kafka >= 0.9.0.0) integrates both approaches in a single API:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    // poll() waits up to 100 ms for new records
    ConsumerRecords<String, String> records = consumer.poll(100);
}


[Automatic committing] With enable.auto.commit=true, a call to poll() commits the last offset returned by the previous poll once auto.commit.interval.ms has elapsed.

Manual committing of offsets ⌘

  • consumer.commitSync()
  • consumer.commitAsync(callback)
  • mixed approach (asynchronous in the poll loop, synchronous on shutdown), sketched below
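
A common shape for the mixed approach, continuing the consumer from the previous slide and assuming enable.auto.commit=false, a running flag controlling the loop, and a hypothetical process() handler. commitAsync() keeps the poll loop responsive, while the final commitSync() retries until the last offsets are safely committed before closing:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            process(record);                    // hypothetical per-record handler
        // non-blocking commit inside the hot loop
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null)
                System.err.println("commit failed: " + exception);
        });
    }
} finally {
    try {
        consumer.commitSync();                  // blocking commit; retried until it succeeds
    } finally {
        consumer.close();
    }
}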

Consumer groups ⌘

  • Consumer groups provide scalability at the topic level
  • consumers send heartbeats to a Kafka broker designated as the group coordinator
=> this is how consumers maintain membership in the group and ownership of the partitions assigned to them
  • a rebalance moves partition ownership from one consumer to another; it is triggered when:
    • a new consumer joins the group
    • a consumer crashes or is shut down
=> an undesirable process that makes the whole group briefly unavailable

Consumer rebalance ⌘

[Figure: Consumer rebalance]

Operations ⌘

  • Adding topics
  • Removing topics (requires delete.topic.enable=true)
  • Modifying topics
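
Each operation maps to a kafka-topics.sh invocation; a sketch against a local ZooKeeper, with topic name, partition count, and replication factor assumed:

# add a topic with 3 partitions and replication factor 2
bin/kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic my-topic --partitions 3 --replication-factor 2

# remove a topic (only honored when delete.topic.enable=true)
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic

# modify a topic: the partition count can only be increased
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 6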

Kafka Design ⌘

  • Motivation
  • Persistence
  • Efficiency
  • The Producer
  • The Consumer
  • Message Delivery Semantics
  • Replication
  • Log Compaction
  • Quotas