KafkaAssignment
This project implements an end-to-end Kafka-based data pipeline using Spring Boot. It produces, consumes, transforms, and validates customer account messages, stores them in PostgreSQL, archives raw messages, and exposes a metrics API.
Set up Infrastructure
cd infra
docker-compose up -d
Build and Run Spring Boot App
cd app
mvn clean install
mvn spring-boot:run
Produce Messages
The producer sends 5 throttled customer account messages to the Kafka topic. Trigger it with:
curl -X POST http://localhost:8080/produce
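The throttling described above can be sketched roughly as follows. This is a minimal illustration, not the project's producer: `ThrottledSender`, its `sink` callback (standing in for whatever call actually publishes to Kafka), and the pause duration are all hypothetical names and values chosen for the example.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: ThrottledSender is a hypothetical name, not a class from this project.
final class ThrottledSender {
    private final Consumer<String> sink; // stands in for the real Kafka send call
    private final Duration pause;        // assumed delay between consecutive sends

    ThrottledSender(Consumer<String> sink, Duration pause) {
        this.sink = sink;
        this.pause = pause;
    }

    /** Sends payloads in order, pausing between sends; returns how many were sent. */
    int sendAll(List<String> payloads) {
        int sent = 0;
        for (String payload : payloads) {
            if (sent > 0) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop sending early.
                    Thread.currentThread().interrupt();
                    return sent;
                }
            }
            sink.accept(payload);
            sent++;
        }
        return sent;
    }
}
```

Passing the send operation in as a `Consumer<String>` keeps the pacing logic testable without a running broker.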
Consume and Transform
The consumer reads messages from the topic, transforms them, and inserts the results into the customer_account_profile table.
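A transformation step of this kind might look like the sketch below. The message schema is not given here, so every field name (`customerId`, `dateOfBirth`, `employee`), both record types, and the age threshold of 18 are assumptions made purely for illustration; only the idea of deriving minor and employee flags comes from the validation list in this README.

```java
import java.time.LocalDate;
import java.time.Period;

// Sketch only: all type and field names here are hypothetical.
final class AccountTransformSketch {
    // Assumed shape of an incoming customer account message.
    record AccountEvent(String customerId, LocalDate dateOfBirth, boolean employee) {}

    // Assumed shape of a row destined for customer_account_profile.
    record ProfileRow(String customerId, boolean minor, boolean employee) {}

    static final int ADULT_AGE = 18; // assumption: the real threshold may differ

    /** Derives the minor flag from the date of birth as of the given date. */
    static ProfileRow toProfileRow(AccountEvent event, LocalDate asOf) {
        int age = Period.between(event.dateOfBirth(), asOf).getYears();
        return new ProfileRow(event.customerId(), age < ADULT_AGE, event.employee());
    }
}
```

Taking the reference date as a parameter, rather than calling `LocalDate.now()` inside the method, makes the flag reproducible in tests.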
Metrics API
Endpoint
GET /metrics/messages?from=<ISO8601>&to=<ISO8601>
curl "http://localhost:8080/metrics/messages?from=2025-10-15T13:00:00Z&to=2025-10-15T14:00:00Z"
Sample response:
{
"topic": "customer.account.events.v1",
"from": "2025-10-15T13:00:00Z",
"to": "2025-10-15T14:00:00Z",
"producedCount": 5,
"firstMessageTimestamp": "2025-10-15T13:00:05Z",
"lastMessageTimestamp": "2025-10-15T13:09:55Z"
}
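The three computed fields in the response (producedCount, firstMessageTimestamp, lastMessageTimestamp) could be derived from stored message timestamps along these lines. This is a sketch under assumptions: `MessageMetricsSketch` and `Summary` are hypothetical names, the window is treated as inclusive at both ends, and the timestamps are passed in as a plain list rather than read from the project's actual storage.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Sketch only: class and record names are hypothetical.
final class MessageMetricsSketch {
    record Summary(long producedCount, Optional<Instant> first, Optional<Instant> last) {}

    /** Counts timestamps within [from, to] (inclusive, by assumption) and finds the earliest and latest. */
    static Summary summarize(List<Instant> timestamps, Instant from, Instant to) {
        List<Instant> inWindow = timestamps.stream()
                .filter(t -> !t.isBefore(from) && !t.isAfter(to))
                .sorted()
                .toList();
        if (inWindow.isEmpty()) {
            // No messages in the window: there is no first or last timestamp.
            return new Summary(0, Optional.empty(), Optional.empty());
        }
        return new Summary(
                inWindow.size(),
                Optional.of(inWindow.get(0)),
                Optional.of(inWindow.get(inWindow.size() - 1)));
    }
}
```

`Instant.parse` accepts the same ISO 8601 form used in the query parameters, for example `Instant.parse("2025-10-15T13:00:00Z")`.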
Run tests
Output
- Extent HTML report: tests/reports/customer-account-validation-report.html
- Validates:
  - JSON schema compliance
  - DB inserts
  - Minor and employee flags
Postman Collection
Create a Postman collection with:
- GET /metrics/messages with sample ISO8601 timestamps
- POST /produce to trigger message production
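For anyone who prefers code to Postman, the same two requests can be built with the JDK's built-in `java.net.http` client. This is only a sketch: `PipelineRequestsSketch` is a hypothetical helper, and the base URL is taken from the curl examples in this README.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Instant;

// Sketch only: PipelineRequestsSketch is a hypothetical helper, not part of the project.
final class PipelineRequestsSketch {
    static final String BASE_URL = "http://localhost:8080"; // from the curl examples

    /** Builds the POST /produce request with an empty body. */
    static HttpRequest produceRequest() {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/produce"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Builds GET /metrics/messages with ISO 8601 from/to query parameters. */
    static HttpRequest metricsRequest(Instant from, Instant to) {
        // Instant.toString() yields ISO 8601 in UTC, e.g. 2025-10-15T13:00:00Z.
        String query = "from=" + from + "&to=" + to;
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/metrics/messages?" + query))
                .GET()
                .build();
    }
}
```

Either request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` once the application is running.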