Create input and output topics
Kafka Streams applications consume data from one or more input topics, perform transformations, and write the results to one or more output topics.
In this step, you will create two topics: streams-input-topic and streams-output-topic.
Step 1: Create the input topic
Execute the following command in the terminal to create the input topic streams-input-topic on the broker running locally at localhost:9092:
# Create the input topic with 3 partitions and replication factor 1
kafka-topics --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --topic streams-input-topic
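Each record you later produce with a key is stored in one of these 3 partitions, chosen from a hash of the key. The Java sketch below shows the idea. `KeyPartitionSketch` is a hypothetical class with no Kafka dependency. It reimplements murmur2 hashing on the assumption that this mirrors what the Kafka producer's default partitioner does for non-null keys. The point is that the same key always maps to the same partition.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: assumes Kafka's default partitioner hashes non-null keys with murmur2.
final class KeyPartitionSketch {

    // 32-bit murmur2 hash with the seed and mixing constants assumed to match Kafka's.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through between cases).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit, then take the remainder modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "apple"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the result depends on the partition count, adding partitions to a topic later can send an existing key to a different partition.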
Step 2: Create the output topic
Next, create the output topic streams-output-topic where your transformed results will be published:
# Create the output topic with 3 partitions and replication factor 1
kafka-topics --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --topic streams-output-topic
Step 3: Verify that the topics exist
Confirm both topics exist on the broker:
# List all active topics
kafka-topics --list --bootstrap-server localhost:9092
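If you script this check, you can compare the listing against the two required names. The sketch below is a hypothetical helper. It assumes `kafka-topics --list` prints one topic name per line. The sample listing in `main` is illustrative and was not captured from a real broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: reports which required topics are absent from a topic listing.
final class TopicListCheck {

    static List<String> missingTopics(String listOutput, List<String> required) {
        // Split on any line break, trim each name, and ignore blank lines.
        Set<String> present = Arrays.stream(listOutput.split("\\R"))
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toSet());
        // Keep the required names that were not found, in their original order.
        return required.stream()
                .filter(topic -> !present.contains(topic))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Illustrative listing text, not real broker output.
        String sample = "__consumer_offsets\nstreams-input-topic\nstreams-output-topic\n";
        List<String> missing = missingTopics(sample,
                List.of("streams-input-topic", "streams-output-topic"));
        System.out.println(missing.isEmpty() ? "Both topics exist" : "Missing: " + missing);
    }
}
```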
Once both topics appear in the list, click Verify step below to move on to implementing your Kafka Streams processor!
Hint
Create both `streams-input-topic` and `streams-output-topic` using the `kafka-topics` utility with 3 partitions each.
Implement the Streams Processor
Now, you will write a Kafka Streams application in Java. This application will process a stream of key-value records in real time.
Your application must perform two stateless transformations:
- Filter: Drop any record whose key does not start with the character "a".
- Map: Uppercase the value of all remaining records.
The result must be written to streams-output-topic.
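Before writing the Kafka Streams code, it can help to see the required logic on its own. This plain-Java model applies the same filter and value mapping to an in-memory list of key-value pairs. The class and helper names are hypothetical, and it uses `java.util.stream` rather than the Kafka Streams API. It uppercases with `Locale.ROOT` so the result does not depend on the machine's default locale. That choice is an assumption of this sketch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of the required filter + map-values logic (no Kafka dependency).
final class FilterUpperModel {

    // Hypothetical helper that builds one key-value record.
    static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    static List<Map.Entry<String, String>> process(List<Map.Entry<String, String>> input) {
        return input.stream()
                // Filter: keep only records whose (non-null) key starts with "a".
                .filter(r -> r.getKey() != null && r.getKey().startsWith("a"))
                // Map: uppercase the value and leave the key untouched.
                .map(r -> kv(r.getKey(),
                        r.getValue() == null ? null : r.getValue().toUpperCase(Locale.ROOT)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
                List.of(kv("a", "apple"), kv("b", "banana"), kv("a", "apricot"));
        // prints a:APPLE, then a:APRICOT
        for (Map.Entry<String, String> r : process(input)) {
            System.out.println(r.getKey() + ":" + r.getValue());
        }
    }
}
```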
Step 1: Open StreamsMain in the Editor
Locate and open /home/sandbox/kafka-streams/src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java in the Editor tab on the right side of the screen.
Step 2: Implement the Streams Topology
Replace the placeholder code in StreamsMain.java with the following complete implementation:
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsMain {

    public static void main(String[] args) {
        // 1. Set up configuration properties
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-filter-upper");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Set the record cache size to 0 (caching applies to stateful operations; none are used here)
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Keys and values are plain strings
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final StreamsBuilder builder = new StreamsBuilder();

        // 2. Consume from streams-input-topic
        final KStream<String, String> source = builder.stream("streams-input-topic");

        // 3. Chain transformations: keep keys starting with "a", then uppercase the values
        final KStream<String, String> transformed = source
                .filter((key, value) -> key != null && key.startsWith("a"))
                // Pass null values through unchanged instead of throwing a NullPointerException
                .mapValues(value -> value == null ? null : value.toUpperCase());

        // 4. Write the results to streams-output-topic
        transformed.to("streams-output-topic");

        // 5. Build the topology, print its description, and start the application
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        System.out.println(topology.describe());

        // Keep main blocked until Ctrl+C; the shutdown hook closes the application cleanly
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println("Error running Streams app: " + e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }
}
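`mapValues` hands the mapper every record's value. A record whose value is null (for example a tombstone) would therefore make an unguarded `value.toUpperCase()` throw a `NullPointerException` inside the stream thread. The sketch below checks a null-safe mapper in isolation. It uses `java.util.function.Function` as a stand-in for Kafka Streams' `ValueMapper`, which has the same one-argument shape. The class name is hypothetical.

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical stand-alone check of value mappers shaped like the ones passed to mapValues.
final class NullSafeUpper {

    // Null-safe: a null value passes through as null.
    static final Function<String, String> UPPER =
            value -> value == null ? null : value.toUpperCase(Locale.ROOT);

    // Unguarded: dereferences the value directly, so a null value throws.
    static final Function<String, String> UNGUARDED = value -> value.toUpperCase(Locale.ROOT);

    // Returns true if applying the mapper to a null value throws a NullPointerException.
    static boolean throwsOnNull(Function<String, String> mapper) {
        try {
            mapper.apply(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(UPPER.apply("apple"));
        System.out.println("guarded throws: " + throwsOnNull(UPPER));
        System.out.println("unguarded throws: " + throwsOnNull(UNGUARDED));
    }
}
```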
Make sure to save the file in the editor after editing.
Click Verify step below to check that your Java source compiles and sets the required Streams properties.
Hint
Open `/home/sandbox/kafka-streams/src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java` and implement the stateless filter and map operations.
Verify Stream Processing in Real-Time
Now you will run your Kafka Streams application. Because the application runs continuously until you stop it, it will occupy your first terminal tab. You will then open a second terminal tab to publish input messages and verify the output.
Step 1: Run the Kafka Streams Application
In your current terminal tab, navigate to the streams project directory and start the application:
# Navigate to the project folder
cd /home/sandbox/kafka-streams
# Compile and run the Streams application
gradle run
This compiles the project and starts the processor. The application prints the topology description and then keeps running in this tab until you stop it with Ctrl+C.
Step 2: Open a New Terminal Tab
Click the + (plus) button in the terminal tab strip on the right side of the screen to open a second shell tab (Terminal 2).
Step 3: Produce Test Messages (Terminal 2)
In the new terminal tab, run kafka-console-producer to publish keyed messages to the input topic. The --property parse.key=true and --property key.separator=: options tell the producer to split each line you type into a key and a value at the : character.
# Start a console producer expecting Key:Value format
kafka-console-producer --bootstrap-server localhost:9092 --topic streams-input-topic --property parse.key=true --property key.separator=:
Paste or type the following lines into the terminal:
a:apple
b:banana
a:apricot
After entering all three lines, press Ctrl+C to exit the producer.
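With `parse.key=true`, each line you type becomes one record. The sketch below is a hypothetical `KeyValueLineParser` that splits a line at the first occurrence of the separator, so a value may itself contain `:`. It assumes the console producer also splits at the first occurrence. The sketch rejects lines that contain no separator, and the tool's behavior in your Kafka version may differ.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical sketch of splitting one console input line into a key and a value.
final class KeyValueLineParser {

    static Map.Entry<String, String> parse(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            // This sketch's choice: refuse lines without a separator.
            throw new IllegalArgumentException("No key separator in line: " + line);
        }
        // Everything before the first separator is the key; everything after it is the value.
        String key = line.substring(0, idx);
        String value = line.substring(idx + separator.length());
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        for (String line : new String[] {"a:apple", "b:banana", "a:apricot"}) {
            Map.Entry<String, String> kv = parse(line, ":");
            System.out.println("key=" + kv.getKey() + " value=" + kv.getValue());
        }
    }
}
```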
Step 4: Verify Processed Output (Terminal 2)
Verify that the Streams application dropped records whose keys do not start with "a" and uppercased the values of the records that passed:
# Consume from streams-output-topic, printing each record as key:value
kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --from-beginning --property print.key=true --property key.separator=: --max-messages 2
You should see:
a:APPLE
a:APRICOT
Notice that b:banana is completely missing because it was successfully filtered out by your topology!
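With `print.key=true`, the console consumer prints the key, then the value of its `key.separator` property, then the record value. That separator defaults to a tab, so the `a:APPLE` form shown above assumes the consumer runs with `--property key.separator=:`. The hypothetical class below models only that formatting rule.

```java
// Hypothetical model of how one console consumer output line is assembled when print.key=true.
final class ConsoleLineFormat {

    // Assumed default key.separator of the console consumer's formatter: a tab.
    static final String DEFAULT_SEPARATOR = "\t";

    static String format(String key, String value, String separator) {
        return key + separator + value;
    }

    public static void main(String[] args) {
        System.out.println(format("a", "APPLE", DEFAULT_SEPARATOR));
        System.out.println(format("a", "APPLE", ":"));
    }
}
```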
Click Verify step below to complete the Kafka Streams scenario!
Hint
Run `gradle run` in `/home/sandbox/kafka-streams` to start your streams application, produce test key-value records, and check that only records whose keys start with 'a' reach `streams-output-topic`, with their values uppercased.