Logging (Sentry & Logstash) in Server-Sent Events (Spring Webflux and Kafka)
Log information at every phase in your text/event-stream API
You can see my post to get more information about my implementation of Server-Sent Events with Spring Webflux and Kafka.
Here I will be describing how I implemented logging in my API.
I used MDC (Mapped Diagnostic Context) (which provides a simple key-value map to keep your data) but you should be careful while using it (since the values are used per thread, do not forget to clean up each time before setting -> MDC.clear()).
A Mapped Diagnostic Context, or MDC in short, is an instrument for distinguishing interleaved log output from different sources. Log output is typically interleaved when a server handles multiple clients near-simultaneously. The MDC is managed on a per thread basis.
This is the getter/setter utility file for my MDC properties:
This is my logback.xml file:
This is my sentry.properties file content (for more information, you can visit here):
dsn=http://sentry_token_value@sentry_url:port/no
stacktrace.app.packages=
Please add these dependencies below:
<dependency>
<groupId>io.sentry</groupId>
<artifactId>sentry-logback</artifactId>
<version>1.7.29</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
You can create a config for ELK Stack (because you will be sending your logs to Logstash which can send them to ElasticSearch and you can visualize them after you connect it with Kibana). You can read to get more details here:
I created an interceptor to work on each request:
These are IP and Port header key values in my application.properties:
ip.port.config=remote
header.ip.key=Remote_Addr
header.port.key=Remote_Port
Finally, this is the part where API events are logged:
return kafkaService.getEventPublisher()
.map(stringServerSentEvent -> stringServerSentEvent.data())
.filter(clientEvent -> eventValidationService.isValid(clientEvent, accountMail))
.map(this::clientEventToServerSentEvent)
.mergeWith(heartbeatStream)
.doOnSubscribe(subscription -> log.info("[ON_SUBSCRIBE]"))
.doOnCancel(() -> log.info("[EMAIL-OPERATIONS] [ON_CANCEL]"))
.doOnError(e -> log.error("[ON_ERROR= {}]", CommonUtils.buildStackTraceString(e)))
.doFinally(signalType -> log.info("[FINALLY] [SIGNAL_TYPE= {}]", signalType.name()));
You can get a full list of all possible SignalType values here.
Happy Coding!