
Web messaging application with Apache Kafka and Elasticsearch

Filip Pandi

SOFTWARE DEVELOPER

Introduction

Concept


Configuring Apache Kafka

Configuring Elasticsearch

Frontend — Angular

 

Initialized setup — Angular side


Configuring proxy


For Angular-to-server communication, a proxy configuration is needed only during the development stage of the project. To set it up, create a "proxy.config.json" file inside the project directory:

{
  "/kep/*": {
    "target": "http://localhost:9090",
    "secure": false,
    "logLevel": "debug",
    "changeOrigin": true
  }
}

Finally, "proxy.config.json" can be referenced from the serve target in "angular.json" like this,

"serve": {
  "builder": "@angular-devkit/build-angular:dev-server",
  "options": {
    "proxyConfig": "proxy.config.json",
    "browserTarget": "KEPClient:build"
  },

so that the "ng serve" command picks up the proxy configuration. Alternatively, run "ng serve --proxy-config proxy.config.json" to start the Angular dev server with the proxy configuration.

Angular dependencies

"dependencies": {
  "@angular/animations": "~9.0.1",
  "@angular/common": "~9.0.1",
  "@angular/compiler": "~9.0.1",
  "@angular/core": "~9.0.1",
  "@angular/forms": "~9.0.1",
  "@angular/localize": "^9.1.12",
  "@angular/platform-browser": "~9.0.1",
  "@angular/platform-browser-dynamic": "~9.0.1",
  "@angular/router": "~9.0.1",
  "@auth0/angular-jwt": "^5.0.1",
  "@ng-select/ng-select": "^4.0.4",
  "bootstrap": "^4.4.1",
  "chart.js": "^2.9.3",
  "chartjs-plugin-datalabels": "^0.7.0",
  "chartjs-plugin-zoom": "^0.7.7",
  "hammerjs": "^2.0.8",
  "ng-multiselect-dropdown": "^0.2.10",
  "ng-select": "1.0.0-rc.5",
  "ng2-charts": "^2.3.0",
  "ngx-spinner": "^9.0.1",
  "rxjs": "~6.5.4",
  "tslib": "^1.10.0",
  "zone.js": "~0.10.2"
}


Components and routing overview

const routes: Routes = [
  { path: '', redirectTo: '/login', pathMatch: 'full' },
  { path: 'login', component: LoginComponent },
  { path: 'register', component: RegisterComponent },
  { path: 'messenger', component: MessengerComponent },
  { path: 'monitoring', component: MonitoringComponent }
];

 

The app component serves as the navigation component, and the landing page is the login component. The login component requires an authentication service and an HTTP interceptor implementation.

Authentication (Angular side)

export class AuthInterceptor implements HttpInterceptor {

  private timeout: number;
  tokenSubscription = new Subscription();

  constructor(private http: HttpClient,
              private router: Router,
              private jwtHelper: JwtHelperService,
              private authService: AuthService) {}

  intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const token = localStorage.getItem('token');

    if (!token) {
      this.router.navigate(['/']);
      return next.handle(req);
    }

    const req1 = req.clone({
      headers: req.headers.set('Authorization', `Bearer ${token}`),
    });

    this.timeout = this.jwtHelper.getTokenExpirationDate(token).valueOf() - new Date().valueOf();
    this.expirationCounter(this.timeout);

    return next.handle(req1);
  }

  expirationCounter(timeout) {
    this.tokenSubscription.unsubscribe();
    this.tokenSubscription = of(null).pipe(delay(timeout)).subscribe((expired) => {
      console.log('EXPIRED!!');
      this.authService.logout();
      this.router.navigate(['/login']);
    });
  }
}


Messenger component

 

public loadMessage(senderId: number, receiverId: number): Observable<KafkaMessageModel[]> {
  return this.http.get<KafkaMessageModel[]>(this.baseUrl + '/messenger/receive/' + senderId + '/' + receiverId);
}

public sendMessage(kafkaMessage: KafkaMessageModel): Observable<KafkaMessageModel> {
  return this.http.post<KafkaMessageModel>(this.baseUrl + '/messenger/send', kafkaMessage);
}

 

All that is left is to implement the user interface for the messenger functionality: the messenger component. The messenger interface lets the logged-in user choose a recipient, provides a field for writing a message to that recipient, and shows an inbox area displaying all outgoing and incoming messages of the conversation between the two users. The next image shows a simple messenger interface with part of a conversation between two users, seen from the perspective of user 'test01'.

[Image: messenger interface showing a conversation from the perspective of user 'test01']

Kafka consumer lag monitor and component

[Image: Kafka consumer lag monitoring view]

Backend — Spring Boot/Apache Kafka/Elasticsearch

Initialized setup — Server-side

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch:2.3.3.RELEASE'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compile 'org.springframework.boot:spring-boot-starter-jdbc'
    compile 'org.springframework.boot:spring-boot-starter-web:2.2.6.RELEASE'
    compile 'org.postgresql:postgresql:42.2.5'
    compile 'io.jsonwebtoken:jjwt:0.9.1'
    runtime 'org.postgresql:postgresql'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.apache.kafka:kafka-streams'
    compile 'org.apache.kafka:kafka-clients:2.6.0'
    compile 'com.google.code.gson:gson:2.8.6'
    compileOnly 'org.projectlombok:lombok'
    compile 'org.apache.logging.log4j:log4j-core:2.8.2'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    compile 'javax.servlet:javax.servlet-api:4.0.1'
    testImplementation 'org.springframework.security:spring-security-test'
    compile 'com.auth0:java-jwt:2.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.11.1'
}

Authentication (Server-side)

@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
        throws ServletException, IOException {

    final String tokenHeader = request.getHeader(AUTHORIZATION);
    String username = null;
    String jwtToken = null;

    if (tokenHeader != null && tokenHeader.startsWith("Bearer ")) {
        jwtToken = tokenHeader.substring(7);
        try {
            username = jwtTokenUtil.getUsernameFromToken(jwtToken);
        } catch (IllegalArgumentException e) {
            logger.debug("Error while getting username from token!", e);
        } catch (ExpiredJwtException e) {
            logger.debug("Token expired!", e);
        }
    } else {
        logger.warn("Jwt token doesn't start with Bearer String...");
    }

    if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
        UserDetails user = this.userManagerImpl.loadUserByUsername(username);
        if (jwtTokenUtil.validateToken(jwtToken, user)) {
            UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken =
                    new UsernamePasswordAuthenticationToken(user, null, user.getAuthorities());
            usernamePasswordAuthenticationToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
            SecurityContextHolder.getContext().setAuthentication(usernamePasswordAuthenticationToken);
        }
    }

    filterChain.doFilter(request, response);
}

In addition, a JWT utility class and a general web security configuration class are necessary. The JWT utility class contains all the methods needed for generating and validating user tokens, while the web security configuration class contains the HTTP security URI matchers as well as the user authority roles and permissions.

 

@Component
public class JwtTokenUtil implements Serializable {

    public static final long JWT_TOKEN_VALIDITY = 5 * 60 * 60;

    @Value("${jwt.secret}")
    private String secret;

    public String getUsernameFromToken(String token) {
        return getClaimFromToken(token, Claims::getSubject);
    }

    public Date getExpirationDateFromToken(String token) {
        return getClaimFromToken(token, Claims::getExpiration);
    }

    public <T> T getClaimFromToken(String token, Function<Claims, T> claimsResolver) {
        final Claims claims = getAllClaimsFromToken(token);
        return claimsResolver.apply(claims);
    }

    private Claims getAllClaimsFromToken(String token) {
        return Jwts.parser().setSigningKey(secret).parseClaimsJws(token).getBody();
    }

    private Boolean isTokenExpired(String token) {
        final Date expiration = getExpirationDateFromToken(token);
        return expiration.before(new Date());
    }

    public String generateToken(UserDetails userDetails) {
        Map<String, Object> claims = new HashMap<>();
        return doGenerateToken(claims, userDetails.getUsername());
    }

    private String doGenerateToken(Map<String, Object> claims, String subject) {
        return Jwts.builder().setClaims(claims).setSubject(subject)
                .setIssuedAt(new Date(System.currentTimeMillis()))
                .setExpiration(new Date(System.currentTimeMillis() + JWT_TOKEN_VALIDITY * 1000))
                .signWith(SignatureAlgorithm.HS512, secret).compact();
    }

    public Boolean validateToken(String token, UserDetails userDetails) {
        final String username = getUsernameFromToken(token);
        return (username.equals(userDetails.getUsername()) && !isTokenExpired(token));
    }
}
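The web security configuration class itself is not listed in this post. The following is only a rough sketch of what such a class could look like with Spring Security, assuming the JWT filter shown above is a bean called JwtRequestFilter and that the public authentication endpoints are "/authenticate" and "/register" (all names and paths here are illustrative, not taken from the project):

@Configuration
@EnableWebSecurity
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    private final JwtRequestFilter jwtRequestFilter; // the filter with doFilterInternal() shown above
    private final UserDetailsService userDetailsService;

    public WebSecurityConfig(JwtRequestFilter jwtRequestFilter, UserDetailsService userDetailsService) {
        this.jwtRequestFilter = jwtRequestFilter;
        this.userDetailsService = userDetailsService;
    }

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        auth.userDetailsService(userDetailsService).passwordEncoder(new BCryptPasswordEncoder());
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable()
                .authorizeRequests()
                // login and registration stay public, everything else requires a valid JWT
                .antMatchers("/authenticate", "/register").permitAll()
                // example of a role-protected URI matcher (role name is illustrative)
                .antMatchers("/monitoring/**").hasRole("ADMIN")
                .anyRequest().authenticated()
                .and()
                // no HTTP session; every request carries its own token
                .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS);

        // run the JWT filter before Spring's username/password authentication filter
        http.addFilterBefore(jwtRequestFilter, UsernamePasswordAuthenticationFilter.class);
    }
}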

Apache Kafka and Elasticsearch

public KafkaProducer createKafkaProducer(String ack,
                                         Class<StringSerializer> keySerializer,
                                         Class<KafkaJsonSerializer> valueSerializer) {

    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.ACKS_CONFIG, ack);
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 50000);
    producerProperties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

    return new KafkaProducer<>(producerProperties);
}



public class KafkaJsonSerializer implements Serializer {

    private Logger logger = LogManager.getLogger(this.getClass());

    @Override
    public void configure(Map configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}
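The matching value deserializer passed to the consumer factory below is a JsonDeserializer whose implementation is not included here; the following is only a minimal Jackson-based sketch of what it could look like, mirroring KafkaJsonSerializer above (the body is an assumption, not the project's exact code):

public class JsonDeserializer<T> implements Deserializer<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        try {
            // turn the JSON bytes written by KafkaJsonSerializer back into an object
            return objectMapper.readValue(data, targetType);
        } catch (Exception e) {
            logger.error("Error deserializing message from topic {}: {}", topic, e.getMessage());
            return null;
        }
    }
}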


public Consumer createKafkaConsumer(String groupId,
                                    StringDeserializer keyDeserializer,
                                    JsonDeserializer valueDeserializer) {

    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

    return new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);
}

public void createTopicIfNotExist(String topicName, Long messageTopicStorageRetentionMS,
                                  String defaultReplicaitonFactor) throws InterruptedException, ExecutionException {
    synchronized (createTopicLock) {
        // only query the broker for topics that have not been handled yet
        if (!existingTopics.contains(topicName)) {
            boolean topicExists = kafkaAdmin.listTopics().names().get().contains(topicName);
            if (!topicExists) {
                Map<String, String> topicConfMap = new HashMap<>();
                topicConfMap.put(TopicConfig.RETENTION_MS_CONFIG, messageTopicStorageRetentionMS.toString());
                topicConfMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

                int messageTopicStorageNumPartitions = 3;
                NewTopic topic = new NewTopic(topicName, messageTopicStorageNumPartitions,
                        Short.parseShort(defaultReplicaitonFactor)).configs(topicConfMap);

                List<NewTopic> resultTopicList = new ArrayList<>();
                resultTopicList.add(topic);
                kafkaAdmin.createTopics(resultTopicList);
            }
            // remember the topic so subsequent calls skip the broker round trip
            existingTopics.add(topicName);
        }
    }
}


@PostConstruct
private void createProducerOnStartUp() throws ExecutionException, InterruptedException {
    kafkaElasticUtils.createTopicIfNotExist(kafkaElasticUtils.messageTopicStorage,
            kafkaElasticUtils.messageTopicStorageRetentionMS, kafkaElasticUtils.defaultReplicaitonFactor);

    try {
        messageProducer = kafkaElasticUtils.createKafkaProducer("1", StringSerializer.class, KafkaJsonSerializer.class);
        logger.debug("Successfully created kafka producer: {}", messageProducer);
    } catch (Exception e) {
        logger.error("Error while creating Kafka producer: {}", e.getMessage());
    }
}


If a producer is successfully created, the application can produce messages to Kafka.

public void startProducing(KafkaMessage kafkaMessage) {
    try {
        messageProducer.send(new ProducerRecord<>(kafkaElasticUtils.messageTopicStorage,
                kafkaMessage.receiverUserId.toString(), kafkaMessage));
        logger.debug("Message successfully sent to topic: {} with receiver id: {}",
                kafkaElasticUtils.messageTopicStorage, kafkaMessage.receiverUserId.toString());
    } catch (Exception e) {
        logger.error("Error sending message with receiver id: {} - to topic: {}",
                kafkaMessage.receiverUserId.toString(), kafkaElasticUtils.messageTopicStorage);
    }
}
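The REST endpoints that the Angular messenger service calls ("/messenger/send" and "/messenger/receive/{senderId}/{receiverId}") are not shown in this post. A minimal controller sketch could look like the following, assuming the producer and consumer logic is wrapped in services and the "/kep" prefix from the proxy configuration is handled by the server context path (the class, service, and method names are illustrative):

@RestController
@RequestMapping("/messenger")
public class MessengerController {

    private final KafkaProducerService kafkaProducerService;   // holds startProducing()
    private final KafkaConsumerService kafkaConsumerService;   // holds loadFromElasticsearch(), shown later in the post

    public MessengerController(KafkaProducerService kafkaProducerService,
                               KafkaConsumerService kafkaConsumerService) {
        this.kafkaProducerService = kafkaProducerService;
        this.kafkaConsumerService = kafkaConsumerService;
    }

    // called by the Angular sendMessage() service method
    @PostMapping("/send")
    public KafkaMessage send(@RequestBody KafkaMessage kafkaMessage) {
        kafkaProducerService.startProducing(kafkaMessage);
        return kafkaMessage;
    }

    // called by the Angular loadMessage() service method
    @GetMapping("/receive/{senderId}/{receiverId}")
    public List<KafkaMessage> receive(@PathVariable Long senderId, @PathVariable Long receiverId) {
        return kafkaConsumerService.loadFromElasticsearch(senderId, receiverId);
    }
}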

The same thing is done for the consumer — it is created on startup after dependency injection is done.

@PostConstruct
private void createKafkaConsumerOnStartup() throws ExecutionException, InterruptedException {
    kafkaElasticUtils.createTopicIfNotExist(kafkaElasticUtils.messageTopicStorage,
            kafkaElasticUtils.messageTopicStorageRetentionMS, kafkaElasticUtils.defaultReplicaitonFactor);

    consumer = kafkaElasticUtils.createKafkaConsumer(GROUP_ID, new StringDeserializer(),
            new JsonDeserializer<>(KafkaMessage.class));
    logger.debug("Consumer {} successfully created!", consumer);

    List<String> topics = new ArrayList<>();
    topics.add(kafkaElasticUtils.messageTopicStorage);
    consumer.subscribe(topics);
    logger.debug("Consumer {} successfully subscribed to topics: {}!", consumer, topics);
}

After the consumer is successfully created, the application can start consuming messages from the Kafka topic and storing them in Elasticsearch. The consumer lag can also be calculated at this point, so it is processed as well.

 

private void saveMessageToElasticAndProcessTopicLag() {
    synchronized (consumer) {
        ConsumerRecords<String, KafkaMessage> consumerRecords = consumer.poll(Duration.ofMillis(5));
        if (consumerRecords.count() > 0) {
            consumerRecords.forEach(crv -> {
                Long topicLag = processTopicLag(crv.offset(), crv.topic());
                kafkaLagProcessor.addKafkaTopicLag(new KafkaMonitorMetrics(DateTime.now().getMillis(), topicLag, crv.topic()));
                logger.info("Topic {} lag: {}", crv.topic(), topicLag);
                try {
                    kafkaElasticsearchManager.saveKafkaMessageToElastic(crv.value());
                } catch (Exception e) {
                    logger.error("Error while saving to Elasticsearch: {}", e.getMessage());
                }
            });
        }
    }
}

    

To calculate the consumer lag, simply subtract the current offset from the end offset of each partition, like this:

 

public long processTopicLag(long offset, String topicName) {
    List<TopicPartition> partitions = consumer.partitionsFor(topicName)
            .stream().map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());

    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

    long topicLag = 0;
    for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
        Long currentOffset = offset;
        long partitionLag = endOffset.getValue() - currentOffset;
        topicLag += partitionLag;
    }
    return topicLag;
}
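The KafkaLagProcessor that collects these lag samples (used in the polling method above) is not shown here. A minimal sketch, assuming it simply keeps the metrics in memory for the monitoring endpoint that feeds the Angular lag chart (field and getter names are assumptions):

@Component
public class KafkaLagProcessor {

    // lag samples added from saveMessageToElasticAndProcessTopicLag()
    private final List<KafkaMonitorMetrics> topicLagMetrics =
            Collections.synchronizedList(new ArrayList<>());

    public void addKafkaTopicLag(KafkaMonitorMetrics metrics) {
        topicLagMetrics.add(metrics);
    }

    // read by the monitoring REST endpoint that feeds the Angular lag chart
    public List<KafkaMonitorMetrics> getKafkaTopicLag() {
        return new ArrayList<>(topicLagMetrics);
    }
}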

To implement the Elasticsearch repository, it is enough to create a repository interface that extends "ElasticsearchRepository"; it can then be used like a normal JPA repository for an entity customized for Elasticsearch. The index name and schema for the entity are created automatically when the first object is sent to Elasticsearch.
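A minimal sketch of such a repository for the KafkaMessage entity shown below (the interface name and the derived query method are assumptions, not the project's exact code):

public interface KafkaMessageRepository extends ElasticsearchRepository<KafkaMessage, String> {

    // derived query: all messages sent by senderUserId to receiverUserId
    List<KafkaMessage> findAllBySenderUserIdAndReceiverUserId(Long senderUserId, Long receiverUserId);
}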

@Document(indexName = "kafka_message")
public class KafkaMessage {

    @Id
    @Field(type = FieldType.Keyword)
    @JsonProperty("id")
    public String id;

    @JsonProperty("message")
    public String message;

    @JsonProperty("senderUsername")
    public String senderUsername;

    @JsonProperty("senderUserId")
    public Long senderUserId;

    @JsonProperty("receiverUserId")
    public Long receiverUserId;
}

To serve message data back to users, the application needs to load messages from Elasticsearch and return them to the appropriate user's conversation. This means that both users need to receive all the messages they sent to and received from each other.

 

public List<KafkaMessage> loadFromElasticsearch(Long senderId, Long receiverId) {
    List<KafkaMessage> conversationMessageList = new ArrayList<>();
    List<KafkaMessage> receiverMessageList;
    List<KafkaMessage> senderMessageList;

    try {
        receiverMessageList = kafkaElasticsearchManager.loadAllMessagesForUser(receiverId, senderId);
        senderMessageList = kafkaElasticsearchManager.loadAllMessagesForUser(senderId, receiverId);
        conversationMessageList.addAll(senderMessageList);
        conversationMessageList.addAll(receiverMessageList);
    } catch (Exception e) {
        logger.error("Error while loading messages out of ES: {}", e.getMessage());
    }

    if (conversationMessageList.size() > 0) {
        logger.debug("Number of records: {} - pulled out of ES index: {}! For sender user with id: {} and receiver user with id: {}",
                conversationMessageList.size(), kafkaElasticUtils.elasticIndex, senderId, receiverId);
        conversationMessageList.sort(Comparator.comparing(kafkaMessage -> kafkaMessage.id, Comparator.reverseOrder()));
    }
    return conversationMessageList;
}
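The kafkaElasticsearchManager methods used above, loadAllMessagesForUser and saveKafkaMessageToElastic, are not shown in this post. A minimal sketch, assuming they simply delegate to the repository sketched earlier (the class name is inferred from the field name and may differ):

@Component
public class KafkaElasticsearchManager {

    private final KafkaMessageRepository kafkaMessageRepository;

    public KafkaElasticsearchManager(KafkaMessageRepository kafkaMessageRepository) {
        this.kafkaMessageRepository = kafkaMessageRepository;
    }

    // one direction of the conversation: everything senderId has sent to receiverId
    public List<KafkaMessage> loadAllMessagesForUser(Long senderId, Long receiverId) {
        return kafkaMessageRepository.findAllBySenderUserIdAndReceiverUserId(senderId, receiverId);
    }

    // used by the consumer after polling a record from the Kafka topic
    public void saveKafkaMessageToElastic(KafkaMessage kafkaMessage) {
        kafkaMessageRepository.save(kafkaMessage);
    }
}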

The application calls this method every 3 seconds; if there are messages in the topic, they are first written from Kafka into Elasticsearch, and the processed messages are then pulled out of Elasticsearch and returned to the intended user.
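The post does not show how this 3-second cycle is triggered; one possible way is Spring's scheduler, sketched below (the method name is an assumption, and the polling method shown earlier would carry the annotation inside the same consumer service):

// enables @Scheduled processing application-wide
@Configuration
@EnableScheduling
public class SchedulingConfig {
}

// inside the service that owns the consumer and saveMessageToElasticAndProcessTopicLag():
// every 3 seconds, drain new records from the topic into Elasticsearch and record the topic lag
@Scheduled(fixedRate = 3000)
public void pollMessages() {
    saveMessageToElasticAndProcessTopicLag();
}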

Conclusion and future milestones
