<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-GB">
	<id>https://training-course-material.com/index.php?action=history&amp;feed=atom&amp;title=Apache_Kafka</id>
	<title>Apache Kafka - Revision history</title>
	<link rel="self" type="application/atom+xml" href="https://training-course-material.com/index.php?action=history&amp;feed=atom&amp;title=Apache_Kafka"/>
	<link rel="alternate" type="text/html" href="https://training-course-material.com/index.php?title=Apache_Kafka&amp;action=history"/>
	<updated>2026-05-13T09:38:56Z</updated>
	<subtitle>Revision history for this page on the wiki</subtitle>
	<generator>MediaWiki 1.45.1</generator>
	<entry>
		<id>https://training-course-material.com/index.php?title=Apache_Kafka&amp;diff=45688&amp;oldid=prev</id>
		<title>Pablo Recabal at 22:04, 28 October 2016</title>
		<link rel="alternate" type="text/html" href="https://training-course-material.com/index.php?title=Apache_Kafka&amp;diff=45688&amp;oldid=prev"/>
		<updated>2016-10-28T22:04:42Z</updated>

		<summary type="html">&lt;p&gt;&lt;/p&gt;
&lt;p&gt;&lt;b&gt;New page&lt;/b&gt;&lt;/p&gt;&lt;div&gt;{{Cat|Apache|02}}&lt;br /&gt;
&lt;br /&gt;
&amp;lt;slideshow style=&amp;quot;nobleprog&amp;quot; headingmark=&amp;quot;⌘&amp;quot; incmark=&amp;quot;…&amp;quot; scaled=&amp;quot;false&amp;quot; font=&amp;quot;Trebuchet MS&amp;quot; &amp;gt;&lt;br /&gt;
;title: Apache Kafka&lt;br /&gt;
;author: Pablo Recabal for NobleProg&lt;br /&gt;
&amp;lt;/slideshow&amp;gt;&lt;br /&gt;
&lt;br /&gt;
== Overview ⌘==&lt;br /&gt;
&lt;br /&gt;
What is Kafka?&lt;br /&gt;
&lt;br /&gt;
* Messaging system&lt;br /&gt;
* Implements publish/subscribe pattern&lt;br /&gt;
* Distributed&lt;br /&gt;
** Scalable&lt;br /&gt;
** Robust&lt;br /&gt;
* Efficient&lt;br /&gt;
&lt;br /&gt;
== Use cases ⌘==&lt;br /&gt;
* Stream processing&lt;br /&gt;
* Messaging and notifications&lt;br /&gt;
* Commit Log&lt;br /&gt;
* Log and metrics aggregation&lt;br /&gt;
&lt;br /&gt;
== Architecture ⌘==&lt;br /&gt;
[[File:Kafka system.png|640px]]&lt;br /&gt;
&lt;br /&gt;
== Key concepts ⌘==&lt;br /&gt;
* Producer: application that publishes messages&lt;br /&gt;
* Consumer: application that subscribes to messages&lt;br /&gt;
* Broker: mediator between producers and consumers&lt;br /&gt;
* Zookeeper: stores broker and consumer metadata&lt;br /&gt;
* Topic: container for organizing messages&lt;br /&gt;
* Partition: unit of parallelism within a topic&lt;br /&gt;
* Replication factor: redundancy of a partition&lt;br /&gt;
* Offset: position of a single message within a partition&lt;br /&gt;
&lt;br /&gt;
== Key concepts diagram ⌘==&lt;br /&gt;
[[File:key_concepts.png|640px]]&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
== Configuration ==&lt;br /&gt;
=== Zookeeper configuration ⌘===&lt;br /&gt;
&lt;br /&gt;
&amp;#039;&amp;#039;&amp;#039;minimum config:&amp;#039;&amp;#039;&amp;#039; &lt;br /&gt;
&lt;br /&gt;
* tickTime=2000 &lt;br /&gt;
:&amp;#039;&amp;#039;basic time unit used by ZooKeeper, measured in milliseconds&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* dataDir=/var/lib/zookeeper&lt;br /&gt;
:&amp;#039;&amp;#039;location to store snapshots and logs&amp;#039;&amp;#039;&lt;br /&gt;
* clientPort=2181&lt;br /&gt;
:&amp;#039;&amp;#039;the port to listen for client connections&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
&amp;#039;&amp;#039;&amp;#039;additional configs:&amp;#039;&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* autopurge.snapRetainCount=3&lt;br /&gt;
:&amp;#039;&amp;#039;The number of snapshots to retain in dataDir&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* autopurge.purgeInterval=1&lt;br /&gt;
:&amp;#039;&amp;#039;Purge task interval in hours. Set to &amp;quot;0&amp;quot; to disable the auto purge feature&amp;#039;&amp;#039;&lt;br /&gt;
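&lt;br /&gt;
&amp;#039;&amp;#039;Putting the slides above together, a minimal zoo.cfg could look like this (values are the illustrative ones listed above):&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;ini&amp;quot;&amp;gt;&lt;br /&gt;
# basic time unit, in milliseconds&lt;br /&gt;
tickTime=2000&lt;br /&gt;
# where snapshots and transaction logs are stored&lt;br /&gt;
dataDir=/var/lib/zookeeper&lt;br /&gt;
# port to listen on for client connections&lt;br /&gt;
clientPort=2181&lt;br /&gt;
# keep the 3 most recent snapshots, purging every hour&lt;br /&gt;
autopurge.snapRetainCount=3&lt;br /&gt;
autopurge.purgeInterval=1&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;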
&lt;br /&gt;
=== Kafka broker configuration ===&lt;br /&gt;
&lt;br /&gt;
==== Kafka Server settings ⌘====&lt;br /&gt;
* broker.id=0&lt;br /&gt;
:&amp;#039;&amp;#039;unique integer identifier within a cluster&amp;#039;&amp;#039;&lt;br /&gt;
* port=9092 &lt;br /&gt;
:&amp;#039;&amp;#039;the port to listen for client connections&amp;#039;&amp;#039;&lt;br /&gt;
* auto.create.topics.enable=false&lt;br /&gt;
:&amp;#039;&amp;#039;defaults to true, which in some cases is undesirable&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
==== Kafka Log configurations ⌘====&lt;br /&gt;
* log.dirs=/tmp/kafka-logs&lt;br /&gt;
:&amp;#039;&amp;#039;one or more directories where to store log segments&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* log.flush.interval.messages=10000&lt;br /&gt;
:&amp;#039;&amp;#039;the number of messages to accept before forcing a flush of data to disk&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* log.flush.interval.ms=1000&lt;br /&gt;
:&amp;#039;&amp;#039;the maximum amount of time a message can sit in a log before we force a flush&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* log.retention.hours=168&lt;br /&gt;
* log.retention.bytes=1073741824&lt;br /&gt;
* log.retention.check.interval.ms=300000&lt;br /&gt;
:&amp;#039;&amp;#039;retention policies based on time and/or size&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* log.segment.bytes=1073741824&lt;br /&gt;
:&amp;#039;&amp;#039;maximum size of a log segment file&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
==== Kafka -&amp;gt; Zookeeper settings ⌘====&lt;br /&gt;
* zookeeper.connect=localhost:2181&lt;br /&gt;
:&amp;#039;&amp;#039;location of the ZooKeeper ensemble used for storing broker metadata&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
* zookeeper.connection.timeout.ms=6000&lt;br /&gt;
:&amp;#039;&amp;#039;timeout in ms for connecting to zookeeper&amp;#039;&amp;#039;&lt;br /&gt;
&lt;br /&gt;
== Producer API ==&lt;br /&gt;
=== Producer API. Configuration ⌘ ===&lt;br /&gt;
&lt;br /&gt;
&amp;#039;&amp;#039;Java example from the Kafka Javadocs:&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
Properties props = new Properties();&lt;br /&gt;
props.put(&amp;quot;bootstrap.servers&amp;quot;, &amp;quot;localhost:9092&amp;quot;);&lt;br /&gt;
&lt;br /&gt;
/* how many partition replicas must receive the record before&lt;br /&gt;
   the producer can consider the write successful */&lt;br /&gt;
props.put(&amp;quot;acks&amp;quot;, &amp;quot;all&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;retries&amp;quot;, 0);&lt;br /&gt;
props.put(&amp;quot;batch.size&amp;quot;, 16384);&lt;br /&gt;
/*wait in milliseconds for additional messages to get added to &lt;br /&gt;
   the batch before sending it to the brokers */&lt;br /&gt;
props.put(&amp;quot;linger.ms&amp;quot;, 1);&lt;br /&gt;
/*buffer size of messages waiting to be sent to brokers*/&lt;br /&gt;
props.put(&amp;quot;buffer.memory&amp;quot;, 33554432);&lt;br /&gt;
props.put(&amp;quot;key.serializer&amp;quot;, &amp;quot;org.apache.kafka.common.serialization.StringSerializer&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;value.serializer&amp;quot;, &amp;quot;org.apache.kafka.common.serialization.StringSerializer&amp;quot;);&lt;br /&gt;
      &lt;br /&gt;
Producer&amp;lt;String, String&amp;gt; producer = new KafkaProducer&amp;lt;&amp;gt;(props);&lt;br /&gt;
for(int i = 0; i &amp;lt; 100; i++)&lt;br /&gt;
      producer.send(new ProducerRecord&amp;lt;String, String&amp;gt;(&amp;quot;my-topic&amp;quot;, Integer.toString(i), Integer.toString(i)));&lt;br /&gt;
&lt;br /&gt;
producer.close();&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;
&lt;br /&gt;
=== Producer API. Sending messages ⌘ ===&lt;br /&gt;
&lt;br /&gt;
Delivery strategies:&lt;br /&gt;
:* fire &amp;amp; forget&lt;br /&gt;
:* synchronous&lt;br /&gt;
:* asynchronous&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&amp;#039;&amp;#039;asynchronous example using Node.js (ES6) and the kafka-node client:&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;javascript&amp;quot;&amp;gt;&lt;br /&gt;
import kafka from &amp;#039;kafka-node&amp;#039;&lt;br /&gt;
let Producer = kafka.Producer&lt;br /&gt;
let client = new kafka.Client(&amp;#039;localhost:2181&amp;#039;)&lt;br /&gt;
let producer = new Producer(client)&lt;br /&gt;
&lt;br /&gt;
let events = [{topic: &amp;#039;new_event&amp;#039;, messages: [&amp;#039;test message&amp;#039;], partition: 0}]&lt;br /&gt;
if (producer.ready) {&lt;br /&gt;
      producer.send(events, (err, data) =&amp;gt; {console.log(data)})&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;
&lt;br /&gt;
=== Producer API. Keys and partitioning ⌘===&lt;br /&gt;
&lt;br /&gt;
* null key =&amp;gt; messages spread round-robin across available partitions&lt;br /&gt;
* &amp;lt;key,value&amp;gt; message with default partitioning =&amp;gt; Kafka hashes the key to pick a partition&lt;br /&gt;
* &amp;lt;key,value&amp;gt; message with custom partitioning =&amp;gt; useful for data-skew situations&lt;br /&gt;
&lt;br /&gt;
== Consumer API ==&lt;br /&gt;
&lt;br /&gt;
=== Consumer Properties ⌘===&lt;br /&gt;
* Mandatory properties&lt;br /&gt;
:* bootstrap.servers&lt;br /&gt;
:* key.deserializer&lt;br /&gt;
:* value.deserializer&lt;br /&gt;
&lt;br /&gt;
* (some) Optional properties:&lt;br /&gt;
:* group.id&lt;br /&gt;
:* enable.auto.commit&lt;br /&gt;
:* auto.commit.interval.ms&lt;br /&gt;
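&lt;br /&gt;
&amp;#039;&amp;#039;A consumer configuration sketch mirroring the producer example (group name and interval are illustrative):&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
Properties props = new Properties();&lt;br /&gt;
props.put(&amp;quot;bootstrap.servers&amp;quot;, &amp;quot;localhost:9092&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;key.deserializer&amp;quot;, &amp;quot;org.apache.kafka.common.serialization.StringDeserializer&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;value.deserializer&amp;quot;, &amp;quot;org.apache.kafka.common.serialization.StringDeserializer&amp;quot;);&lt;br /&gt;
// optional: join a group and let Kafka commit offsets automatically&lt;br /&gt;
props.put(&amp;quot;group.id&amp;quot;, &amp;quot;my-group&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;enable.auto.commit&amp;quot;, &amp;quot;true&amp;quot;);&lt;br /&gt;
props.put(&amp;quot;auto.commit.interval.ms&amp;quot;, &amp;quot;1000&amp;quot;);&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;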
&lt;br /&gt;
=== Consumer API. Commits and offsets ⌘===&lt;br /&gt;
* Simple (low-level) vs High-Level API (Kafka version &amp;lt; 0.9.0.0):&lt;br /&gt;
::: - Synchronous or asynchronous committing and offset tracking, or&lt;br /&gt;
::: - Let Kafka manage commits and offsets&lt;br /&gt;
&lt;br /&gt;
* New consumer API (Kafka &amp;gt;= 0.9.0.0) integrates both approaches in a single API:&lt;br /&gt;
&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
KafkaConsumer&amp;lt;Type, Type&amp;gt; consumer = new KafkaConsumer&amp;lt;Type, Type&amp;gt;(properties);&lt;br /&gt;
consumer.subscribe(&amp;lt;List of topics&amp;gt;);&lt;br /&gt;
while (true) {&lt;br /&gt;
      ConsumerRecords&amp;lt;Type, Type&amp;gt; records = consumer.poll(100);&lt;br /&gt;
      for (ConsumerRecord&amp;lt;Type, Type&amp;gt; record : records)&lt;br /&gt;
            System.out.println(record.offset() + &amp;quot;: &amp;quot; + record.value());&lt;br /&gt;
}&lt;br /&gt;
&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
[Automatic committing] With enable.auto.commit=true, a call to poll() commits the last offset returned by the previous poll, once auto.commit.interval.ms has elapsed.&lt;br /&gt;
&lt;br /&gt;
=== Manual committing of offsets ⌘===&lt;br /&gt;
* consumer.commitSync()&lt;br /&gt;
* consumer.commitAsync( callback() )&lt;br /&gt;
* mixed approach&lt;br /&gt;
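&lt;br /&gt;
&amp;#039;&amp;#039;A mixed-approach sketch: commit asynchronously inside the loop, synchronously on shutdown (consumer, running and process() are assumed to exist):&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
try {&lt;br /&gt;
      while (running) {&lt;br /&gt;
            ConsumerRecords&amp;lt;String, String&amp;gt; records = consumer.poll(100);&lt;br /&gt;
            process(records); // application logic (assumed)&lt;br /&gt;
            consumer.commitAsync(); // fast, but does not retry on failure&lt;br /&gt;
      }&lt;br /&gt;
} finally {&lt;br /&gt;
      try {&lt;br /&gt;
            consumer.commitSync(); // retries until success or unrecoverable error&lt;br /&gt;
      } finally {&lt;br /&gt;
            consumer.close();&lt;br /&gt;
      }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;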
&lt;br /&gt;
=== Consumer groups ⌘===&lt;br /&gt;
* Consumer groups provide scalability at the topic level&lt;br /&gt;
&lt;br /&gt;
:* consumers send &amp;#039;&amp;#039;&amp;#039;heartbeats&amp;#039;&amp;#039;&amp;#039; to a Kafka broker designated as the Group Coordinator&lt;br /&gt;
::=&amp;gt; used to maintain membership in a consumer group and ownership of the partitions assigned to them&lt;br /&gt;
&lt;br /&gt;
:* a &amp;#039;&amp;#039;&amp;#039;rebalance&amp;#039;&amp;#039;&amp;#039; occurs when partition ownership moves from one consumer to another:&lt;br /&gt;
::* a new consumer joins the group&lt;br /&gt;
::* a consumer crashes or is shut down&lt;br /&gt;
::=&amp;gt; an undesirable process that makes the whole group temporarily unavailable&lt;br /&gt;
&lt;br /&gt;
=== Consumer rebalance ⌘===&lt;br /&gt;
[[File:rebalance.png|640px]]&lt;br /&gt;
&lt;br /&gt;
=== Operations ⌘===&lt;br /&gt;
* Adding topics&lt;br /&gt;
* Removing topics &lt;br /&gt;
::(delete.topic.enable=true)&lt;br /&gt;
* Modifying topics&lt;br /&gt;
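&lt;br /&gt;
&amp;#039;&amp;#039;With the stock command-line tools (flags as in the 0.10-era distribution; topic name and counts are illustrative):&amp;#039;&amp;#039;&lt;br /&gt;
&amp;lt;small&amp;gt;&amp;lt;small&amp;gt;&lt;br /&gt;
&amp;lt;source lang=&amp;quot;bash&amp;quot;&amp;gt;&lt;br /&gt;
# add a topic&lt;br /&gt;
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic&lt;br /&gt;
&lt;br /&gt;
# remove a topic (requires delete.topic.enable=true on the brokers)&lt;br /&gt;
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic my-topic&lt;br /&gt;
&lt;br /&gt;
# modify a topic, e.g. grow it to 6 partitions&lt;br /&gt;
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic my-topic --partitions 6&lt;br /&gt;
&amp;lt;/source&amp;gt;&lt;br /&gt;
&amp;lt;/small&amp;gt;&amp;lt;/small&amp;gt;&lt;br /&gt;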
&lt;br /&gt;
=== Kafka Design ⌘===&lt;br /&gt;
* Motivation&lt;br /&gt;
* Persistence&lt;br /&gt;
* Efficiency&lt;br /&gt;
* The Producer&lt;br /&gt;
* The Consumer&lt;br /&gt;
* Message Delivery Semantics&lt;br /&gt;
* Replication&lt;br /&gt;
* Log Compaction&lt;br /&gt;
* Quotas&lt;/div&gt;</summary>
		<author><name>Pablo Recabal</name></author>
	</entry>
</feed>