Create a Logstash config to feed data to ElasticSearch from Kafka

Nil Seri
2 min readMay 8, 2021

--

Photo by Danny de Groot on Unsplash

You can feed Kafka messages to ElasticSearch via Logstash. You may use these values to search for messages between given dates (like a diff service) and also for logging purposes (Kibana). Different requirements, such as masking or hiding a field for security reasons in logging, may appear.

Here, we will mask the first 15 characters of “subject” field and also remove “fragment” field. If we do this directly on the “zimbra-events” logstash config log type, the fields required for search responses will be modified.

If we search through the internet, we come accross the usage of “clone” as recommended here:

https://alexmarquardt.com/2018/08/31/using-logstash-to-drive-filtered-data-from-a-single-source-into-multiple-output-destinations/

https://stackoverflow.com/questions/60735821/how-to-remove-field-in-logstash-output

Kafka input:

{“inboxUnreadCount”:18,”currentEpochTime”:1597729872,”isDraft”:false,”folderName”:”Inbox”,”isFromMe”:false,”itemId”:1470,”accountMail”:”xxx@yyy.com”,”mailboxId”:33,”sender”:”Miro <notification@miro.com>”,”inSpam”:false,”@timestamp”:”2020–08–18T05:51:12.946Z”,”isLowPriority”:false,”fragment”:”Title Go to Miro AUGUST 09–15 Here is how your team did last week Retrospective TOP 5 editors and 5 comments last week! Here is how your team did …”,”@version”:”1",”type”:”zimbra-events”,”timestamp”:0,”isHighPriority”:false,”subject”:”Postac\u0131 team top board last week: Retrospective”,”conversationId”:-1470,”itemType”:”MESSAGE”,”eventDescription”:”MESSAGE-ADD”,”senderName”:”Miro”,”accountDisplayName”:”Nil Seri”,”eventType”:”ADD”,”folderId”:2}

File output:

{“inSpam”:false,”isFromMe”:false,”itemType”:”MESSAGE”,”inboxUnreadCount”:18,”type”:”logvision-zimbra-events”,”folderName”:”Inbox”,”subject”:”Postacı team to***”,”timestamp”:0,”itemId”:1470,”@timestamp”:”2020–08–18T05:51:12.946Z”,”isHighPriority”:false,”isDraft”:false,”folderId”:2,”sender”:”Miro <notification@miro.com>”,”accountMail”:”xxx@yyy.com”,”eventType”:”ADD”,”accountDisplayName”:”Nil Seri”,”@version”:”1",”senderName”:”Miro”,”eventDescription”:”MESSAGE-ADD”,”conversationId”:-1470,”currentEpochTime”:1597729872,”mailboxId”:33,”isLowPriority”:false}

Logstash “pipeline.conf” file:

Here, “itemType” is a field in the message and depending on its value, it will be directed to another index type (if its value is “CALENDAR” or not) in ElasticSearch.

“mutate” is used to clone and modify values. A different type value is assigned to the mutated filter. For the output, this type value is used to direct the output to a local file here.

Happy Coding!

--

--

Nil Seri
Nil Seri

Written by Nil Seri

I would love to change the world, but they won’t give me the source code | coding 👩🏻‍💻 | coffee ☕️ | jazz 🎷 | anime 🐲 | books 📚 | drawing 🎨

No responses yet