Skip to content

Commit

Permalink
Refactor Kafka consumer code and add actuator endpoint
Browse files Browse the repository at this point in the history
This commit refactors the code in the Kafka consumer module and adds an actuator endpoint. The changes include:
- Importing the required dependencies for the actuator and express
- Subscribing to the Kafka topic "delete-child" from the beginning
- Parsing and processing the received Kafka messages based on the resource type
- Adding a new actuator endpoint at the root ("/") to welcome users
  • Loading branch information
vignesh-gupta committed Sep 13, 2024
1 parent 3f986eb commit 6b26ecd
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions apps/kafka-consumer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const { Kafka, logLevel } = require("kafkajs");
const { postDeleteProject } = require("./service/delete-child");
const actuator = require("express-actuator");
const express = require("express");

import { KafkaMessage } from "@repo/backend/lib/types";
import { Response } from "express";
const actuator = require('express-actuator');
const express = require("express");

const kafka = new Kafka({
brokers: [process.env.KAFKA_BROKER!],
Expand All @@ -23,7 +24,10 @@ const run = async () => {
await consumer.connect().then(() => console.log("Connected"));

await consumer
.subscribe({ topic: process.env.KAFKA_TOPIC || "delete-child", fromBeginning: true })
.subscribe({
topic: process.env.KAFKA_TOPIC || "delete-child",
fromBeginning: true,
})
.then(() => console.log("Subscribed to topic"));

await consumer.run({
Expand All @@ -38,12 +42,13 @@ const run = async () => {
if (!message.value?.keys) return;

const messageValue: KafkaMessage = JSON.parse(message.value.toString());
const { id, resource } = messageValue;

console.log(messageValue.id);
if (!id || !resource) return;

switch (messageValue.resource) {
switch (resource) {
case "project":
postDeleteProject(messageValue.id);
postDeleteProject(id);
break;

default:
Expand All @@ -60,7 +65,7 @@ const port = process.env.PORT || 8080;

app.use(actuator());

app.get("/", (_, res:Response) => {
app.get("/", (_: any, res: Response) => {
res.send("Welcome to the Projectify Kaka consumer!");
});

Expand Down

0 comments on commit 6b26ecd

Please sign in to comment.