Commit 9728c3f: add syntax highlight in /docs/streams (excluding developer-guide)

File changed: docs/streams/quickstart.md
This tutorial assumes you are starting fresh and have no existing Kafka data.

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.

This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the [`WordCountDemo`](https://github.com/apache/kafka/blob/4.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java) example code.

```java
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
    "streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde)
);

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys.
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
```

It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an **infinite, unbounded stream** of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.

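At its core, the stateful update step is just a per-word running count. As a rough, bounded analogue in plain Java (no Kafka dependencies; the class name `WordCountSketch` is ours, not part of the demo), the logic behind `flatMapValues` + `groupBy` + `count` can be sketched as follows; Kafka Streams applies the same per-record update but keeps the counts in a fault-tolerant state store and emits each change downstream:

```java
import java.util.HashMap;
import java.util.Map;

// A bounded, in-memory analogue of the streaming WordCount logic.
public class WordCountSketch {

    // Mirrors flatMapValues(split("\\W+")) + groupBy + count from the demo.
    public static Map<String, Long> update(Map<String, Long> counts, String line) {
        for (String word : line.toLowerCase().split("\\W+")) {
            counts.merge(word, 1L, Long::sum); // increment the running count
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        update(counts, "all streams lead to kafka");
        update(counts, "hello kafka streams");
        System.out.println(counts); // e.g. kafka=2, streams=2, hello=1, ...
    }
}
```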
As the first step, we will start Kafka (unless you already have it started) and prepare input data for the streaming application.

### Step 1: Download the code

[Download](https://www.apache.org/dyn/closer.cgi?path=/kafka/4.3.0/kafka_2.13-4.3.0.tgz "Kafka downloads") the 4.3.0 release and un-tar it. Note that there are multiple downloadable Scala versions and we choose to use the recommended version (2.13) here:

```bash
$ tar -xzf kafka_2.13-4.3.0.tgz
$ cd kafka_2.13-4.3.0
```

### Step 2: Start the Kafka server

Generate a Cluster UUID:

```bash
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
```

Format the log directories:

```bash
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
```

Start the Kafka server:

```bash
$ bin/kafka-server-start.sh config/server.properties
```

### Step 3: Prepare input topic and start Kafka producer

Next, we create the input topic named **streams-plaintext-input** and the output topic named **streams-wordcount-output**:

```bash
$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".
```

Note: we create the output topic with compaction enabled because the output stream is a changelog stream (cf. explanation of application output below).

```bash
$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
```

The created topics can be described with the same **kafka-topics** tool:

```bash
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --exclude-internal
Topic:streams-wordcount-output  PartitionCount:1  ReplicationFactor:1  Configs:cleanup.policy=compact,segment.bytes=1073741824
    Topic: streams-wordcount-output  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
Topic:streams-plaintext-input  PartitionCount:1  ReplicationFactor:1  Configs:segment.bytes=1073741824
    Topic: streams-plaintext-input  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
```

### Step 4: Start the WordCount application

The following command starts the WordCount demo application:

```bash
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
```

The demo application will read from the input topic **streams-plaintext-input**, perform the computations of the WordCount algorithm on each of the read messages, and continuously write its current results to the output topic **streams-wordcount-output**. Hence there won't be any STDOUT output except log entries, as the results are written back into Kafka.

Optionally, use `group.protocol=streams` to enable the new rebalancing protocol introduced in KIP-1071. This shifts task assignment logic from the client to the broker, reducing rebalance times and eliminating stop-the-world rebalances.

```bash
$ echo "group.protocol=streams" > streams.properties
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo streams.properties
```

Now we can start the console producer in a separate terminal to write some input data to this topic:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
```

and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```

### Step 5: Process some data

Now let's write a message with the console producer into the input topic **streams-plaintext-input** by entering a single line of text and then hitting <RETURN>. This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
```

This message will be processed by the WordCount application and the following output data will be written to the **streams-wordcount-output** topic and printed by the console consumer:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
```

Here, the first column is the Kafka message key in `java.lang.String` format and represents a word that is being counted, and the second column is the message value in `java.lang.Long` format, representing the word's latest count.

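This is also why the consumer above needs the `--formatter-property` deserializer settings: the counts are stored on the topic as raw 8-byte longs, not as text. As a rough sketch of what the built-in long serde does (assuming Kafka's standard big-endian encoding; the class and method names here are ours, for illustration only):

```java
import java.nio.ByteBuffer;

// Sketch: the output topic stores each count as a raw 8-byte (big-endian)
// long, so the console consumer needs LongDeserializer to print it legibly.
public class LongValueSketch {

    public static byte[] encode(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    public static long decode(byte[] value) {
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        byte[] raw = encode(2L);         // what the topic actually stores
        System.out.println(decode(raw)); // prints 2, the readable count
    }
}
```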
Now let's continue writing one more message with the console producer into the input topic **streams-plaintext-input**. Enter the text line "hello kafka streams" and hit <RETURN>. Your terminal should look as follows:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
```

In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
```

Here the last printed lines **kafka 2** and **streams 2** indicate updates to the keys **kafka** and **streams** whose counts have been incremented from **1** to **2**. Whenever you write further input messages to the input topic, you will observe new messages being added to the **streams-wordcount-output** topic, representing the most recent word counts as computed by the WordCount application. Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic **streams-plaintext-input** before we wrap up this quickstart:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit
```

The **streams-wordcount-output** topic will subsequently show the corresponding updated word counts (see the last three lines):

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1
```

As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is an updated count for a single word (the record key, such as "kafka"). For multiple records with the same key, each later record is an update of the previous one.

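These update semantics can be made concrete with a small sketch (plain Java, names ours): treating the consumer output as a changelog and keeping only the latest value per key recovers the current counts. This is also why the output topic was created with `cleanup.policy=compact`; compaction discards superseded records the same way, assuming the console consumer's default tab separator between key and value:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of changelog semantics: each "key<TAB>count" record replaces any
// earlier record with the same key, so the latest value per key is the
// current count. Log compaction keeps the topic bounded the same way.
public class ChangelogReplaySketch {

    public static Map<String, Long> latest(List<String> records) {
        Map<String, Long> current = new LinkedHashMap<>();
        for (String record : records) {
            String[] kv = record.split("\t");          // key<TAB>value, as printed
            current.put(kv[0], Long.parseLong(kv[1])); // later records win
        }
        return current;
    }

    public static void main(String[] args) {
        // The records printed by the console consumer in this quickstart.
        List<String> output = List.of(
            "all\t1", "streams\t1", "lead\t1", "to\t1", "kafka\t1",
            "hello\t1", "kafka\t2", "streams\t2",
            "join\t1", "kafka\t3", "summit\t1");
        System.out.println(latest(output)); // kafka=3, streams=2, ...
    }
}
```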