
Commit d8c4de1

Revert "add syntax highlight in /docs/streams (excluding developer-guide)"
This reverts commit 9728c3f.
1 parent 9728c3f commit d8c4de1

3 files changed

Lines changed: 515 additions & 562 deletions

File tree

docs/streams/quickstart.md

Lines changed: 151 additions & 169 deletions
@@ -33,33 +33,32 @@ This tutorial assumes you are starting fresh and have no existing Kafka data. Ho

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.

This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the [WordCountDemo](https://github.com/apache/kafka/blob/4.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java) example code.

```java
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
    "streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde)
);

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
```

It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an **infinite, unbounded stream** of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.
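
The gist above shows only the topology; the linked WordCountDemo class additionally contains the boilerplate that configures, starts, and eventually stops the application. The following is only a rough sketch of that wiring, not the demo's actual code: the class name, application id, and commit interval value are illustrative assumptions (the commit interval is one of the knobs that influences how often the current counts are emitted downstream).

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class WordCountRunner {                                              // hypothetical wrapper class
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");   // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // How often updated counts are flushed downstream is bounded by the commit interval.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);             // illustrative value

        final StreamsBuilder builder = new StreamsBuilder();
        // ... build the topology from the gist above against this builder ...

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();                                                        // runs until closed
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```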

@@ -68,192 +67,175 @@ As the first step, we will start Kafka (unless you already have it started) and

### Step 1: Download the code

[Download](https://www.apache.org/dyn/closer.cgi?path=/kafka/4.3.0/kafka_2.13-4.3.0.tgz "Kafka downloads") the 4.3.0 release and un-tar it. Note that there are multiple downloadable Scala versions and we choose to use the recommended version (2.13) here:

```bash
$ tar -xzf kafka_2.13-4.3.0.tgz
$ cd kafka_2.13-4.3.0
```

### Step 2: Start the Kafka server

Generate a Cluster UUID:

```bash
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
```

Format the log directories:

```bash
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
```

Start the Kafka server:

```bash
$ bin/kafka-server-start.sh config/server.properties
```

### Step 3: Prepare input topic and start Kafka producer

Next, we create the input topic named **streams-plaintext-input** and the output topic named **streams-wordcount-output**:

```bash
$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".
```

Note: we create the output topic with compaction enabled because the output stream is a changelog stream (cf. explanation of application output below).

```bash
$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
```

The created topics can be described with the same **kafka-topics** tool:

```bash
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --exclude-internal
Topic:streams-wordcount-output  PartitionCount:1  ReplicationFactor:1  Configs:cleanup.policy=compact,segment.bytes=1073741824
    Topic: streams-wordcount-output  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
Topic:streams-plaintext-input   PartitionCount:1  ReplicationFactor:1  Configs:segment.bytes=1073741824
    Topic: streams-plaintext-input   Partition: 0  Leader: 0  Replicas: 0  Isr: 0
```

### Step 4: Start the Wordcount Application

The following command starts the WordCount demo application:

```bash
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
```

The demo application will read from the input topic **streams-plaintext-input**, perform the computations of the WordCount algorithm on each of the read messages, and continuously write its current results to the output topic **streams-wordcount-output**. Hence there won't be any STDOUT output except log entries, as the results are written back into Kafka.

Optionally, use `group.protocol=streams` to enable the new rebalancing protocol introduced in KIP-1071. This shifts task assignment logic from the client to the broker, reducing rebalance times and eliminating stop-the-world rebalances.

```bash
$ echo "group.protocol=streams" > streams.properties
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo streams.properties
```

Now we can start the console producer in a separate terminal to write some input data to this topic:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
```

and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```

### Step 5: Process some data

Now let's write a message with the console producer into the input topic **streams-plaintext-input** by entering a single line of text and then hitting <RETURN>. This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
```

This message will be processed by the Wordcount application and the following output data will be written to the **streams-wordcount-output** topic and printed by the console consumer:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all      1
streams  1
lead     1
to       1
kafka    1
```

Here, the first column is the Kafka message key in `java.lang.String` format and represents a word that is being counted, and the second column is the message value in `java.lang.Long` format, representing the word's latest count.

Now let's continue writing one more message with the console producer into the input topic **streams-plaintext-input**. Enter the text line "hello kafka streams" and hit <RETURN>. Your terminal should look as follows:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
```

In your other terminal, where the console consumer is running, you will observe that the WordCount application wrote new output data:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all      1
streams  1
lead     1
to       1
kafka    1
hello    1
kafka    2
streams  2
```

Here the last printed lines **kafka 2** and **streams 2** indicate updates to the keys **kafka** and **streams** whose counts have been incremented from **1** to **2**. Whenever you write further input messages to the input topic, you will observe new messages being added to the **streams-wordcount-output** topic, representing the most recent word counts as computed by the WordCount application. Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic **streams-plaintext-input** before we wrap up this quickstart:

```bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit
```

The **streams-wordcount-output** topic will subsequently show the corresponding updated word counts (see last three lines):

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter-property print.key=true \
    --formatter-property print.value=true \
    --formatter-property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --formatter-property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all      1
streams  1
lead     1
to       1
kafka    1
hello    1
kafka    2
streams  2
join     1
kafka    3
summit   1
```

As one can see, the output of the Wordcount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is the updated count of a single word, i.e. a record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
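
Because the output topic is a changelog, a downstream application does not have to re-aggregate these updates itself. As a minimal sketch (assuming only the public Kafka Streams API; the topic name and serdes are the ones used above, everything else is illustrative), it could read the topic as a table so that, for each word, only the latest count is retained:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

final StreamsBuilder builder = new StreamsBuilder();

// Read the changelog topic as a table: for records sharing a key (e.g. "kafka"),
// each later record overwrites the earlier one, so after the input entered in this
// quickstart the table holds kafka -> 3, streams -> 2, and so on.
final KTable<String, Long> latestCounts = builder.table(
    "streams-wordcount-output",
    Consumed.with(Serdes.String(), Serdes.Long())
);
```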
