
Assembling multi-part messages using KafkaStreams API

Igor Buzatović

SOFTWARE DEVELOPER

What and why?

And this is how it works…

Assembling final messages

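import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.Serialized

// Streams application configuration
val config = Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-incoming-message-parts-processor")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde::class.java)
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MessagePartSerde::class.java)
// Exactly-once is enabled through the processing guarantee setting
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
val streamConfig = StreamsConfig(config)

// Topology: group parts by sender and message id, aggregate them, emit complete messages
val builder = StreamsBuilder()
val inputStream = builder.stream<Long, MessagePart>("customer-incoming-message-parts")
inputStream
        .groupBy({ _, value ->
            "${value.senderId}-${value.senderMsgId}"
        }, Serialized.with(Serdes.String(), MessagePartSerde()))
        .aggregate(
                WholeMessageInitializer(),
                MessagePartAggregator(),
                Materialized.with(Serdes.StringSerde(), WholeMessageSerde())
        )
        .toStream()
        .filter { _, value -> value.parts.size == value.totalPartsCount }
        .map { _, value -> KeyValue(null as Long?, value) }
        .to(
                "customer-incoming-messages",
                Produced.with(Serdes.Long(), WholeMessageSerde())
        )

streams = KafkaStreams(builder.build(), streamConfig)
streams.start()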
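
To make the grouping step easier to picture, here is a minimal sketch in plain Kotlin, without Kafka, of how the composite key brings together parts that arrive interleaved from different senders. The `MessagePart` data class and its `text` field are assumptions made for illustration (the real class is not shown in this post), and `groupingKey` is a hypothetical helper that only repeats the key expression used in `groupBy`.

```kotlin
// Hypothetical stand-in for MessagePart; the `text` field is an assumption.
data class MessagePart(
    val senderId: Long,
    val senderMsgId: String,
    val partsCount: Int,
    val text: String
)

// Same composite key expression as in the groupBy step of the topology.
fun groupingKey(part: MessagePart): String = "${part.senderId}-${part.senderMsgId}"

fun main() {
    // Parts of two different messages arriving interleaved.
    val incoming = listOf(
        MessagePart(1L, "a", 2, "Hel"),
        MessagePart(2L, "a", 1, "Hi"),
        MessagePart(1L, "a", 2, "lo")
    )
    // Group by the composite key; insertion order of keys is preserved.
    val grouped = incoming.groupBy(::groupingKey)
    grouped.forEach { (key, parts) -> println("$key -> ${parts.map { it.text }}") }
    // prints:
    // 1-a -> [Hel, lo]
    // 2-a -> [Hi]
}
```

Combining both ids matters because `senderMsgId` alone is only unique per sender: in the sketch, both senders use the message id "a".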

Configuring streams application

Defining topology of streams application

inner class WholeMessageInitializer : Initializer<WholeMessage> {
    override fun apply(): WholeMessage {
        return WholeMessage(0, "", 0)
    }
}

And here is our aggregator. Every time a new part arrives, the aggregator's apply method is called with that part and the current aggregation result as arguments. The aggregator must return a new aggregation result based on those arguments. We simply create another instance of “WholeMessage”, add all parts already present in the current aggregation, and then add the new part.

 

inner class MessagePartAggregator : Aggregator<String, MessagePart, WholeMessage> {
    override fun apply(msgId: String, part: MessagePart, message: WholeMessage): WholeMessage? {
        // Build a fresh aggregate that carries the sender data from the incoming part
        val updatedMessage = WholeMessage(part.senderId, part.senderMsgId, part.partsCount)
        // Copy the parts collected so far, then append the new one
        updatedMessage.parts.addAll(message.parts)
        updatedMessage.parts.add(part)
        return updatedMessage
    }
}
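
The initializer and aggregator can be tried out without a running Kafka cluster. The following is a rough sketch in plain Kotlin under stated assumptions: `MessagePart` and `WholeMessage` are simplified stand-ins (their real definitions are not shown in this post, and the `text` field is invented), and the `aggregate` function is a hypothetical helper that mirrors the body of `MessagePartAggregator.apply`.

```kotlin
// Simplified stand-ins; field names follow the snippets above, `text` is an assumption.
data class MessagePart(
    val senderId: Long,
    val senderMsgId: String,
    val partsCount: Int,
    val text: String
)

class WholeMessage(val senderId: Long, val senderMsgId: String, val totalPartsCount: Int) {
    val parts: MutableList<MessagePart> = mutableListOf()
}

// Mirrors MessagePartAggregator.apply: returns a new aggregate, leaves the old one untouched.
fun aggregate(part: MessagePart, message: WholeMessage): WholeMessage {
    val updated = WholeMessage(part.senderId, part.senderMsgId, part.partsCount)
    updated.parts.addAll(message.parts)
    updated.parts.add(part)
    return updated
}

fun main() {
    val parts = listOf(
        MessagePart(7L, "m1", 3, "A"),
        MessagePart(7L, "m1", 3, "B"),
        MessagePart(7L, "m1", 3, "C")
    )
    // Start from the empty aggregate, as WholeMessageInitializer does.
    var current = WholeMessage(0L, "", 0)
    for (part in parts) {
        current = aggregate(part, current)
        println("${current.parts.size}/${current.totalPartsCount} parts collected")
    }
    // prints:
    // 1/3 parts collected
    // 2/3 parts collected
    // 3/3 parts collected
}
```

Note that the empty aggregate produced by the initializer has a total part count of 0; the real count is only known once the first part has been aggregated.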

This solves the aggregation. The result of the aggregate operation is a KTable with a String key and a WholeMessage value. Now we can convert it to a stream, filter it to keep only fully assembled WholeMessage records, and write those records to the output topic. Before writing, the map step replaces the String grouping key with a null Long key.

 

        .toStream()
        .filter { _, value -> value.parts.size == value.totalPartsCount }
        .map { _, value -> KeyValue(null as Long?, value) }
        .to(
                "customer-incoming-messages",
                Produced.with(Serdes.Long(), WholeMessageSerde())
        )
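
As an illustration of why the filter is needed, here is a small plain-Kotlin sketch of the update stream that the KTable produces. It assumes that every intermediate aggregation result is forwarded downstream (with record caching enabled, Kafka Streams may merge some consecutive updates for the same key). The `WholeMessage` class here is a reduced, hypothetical version that stores parts as plain strings, and `isComplete` is a helper name invented for this sketch.

```kotlin
// Reduced, hypothetical WholeMessage: parts are plain strings for brevity.
data class WholeMessage(
    val senderMsgId: String,
    val totalPartsCount: Int,
    val parts: List<String>
)

// Same predicate as in the filter step of the topology.
fun isComplete(message: WholeMessage): Boolean =
    message.parts.size == message.totalPartsCount

fun main() {
    // Updates for two messages, in the order they would leave toStream().
    val updates = listOf(
        WholeMessage("m1", 2, listOf("A")),
        WholeMessage("m2", 3, listOf("X")),
        WholeMessage("m1", 2, listOf("A", "B")),
        WholeMessage("m2", 3, listOf("X", "Y"))
    )
    // Only aggregates that contain all of their parts pass the filter.
    val emitted = updates.filter(::isComplete)
    println(emitted.map { it.senderMsgId }) // prints [m1]
}
```

Message "m2" never reaches the output in this run because its third part has not arrived, so no partially assembled message is written to the output topic.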