Creating Meaningful Documents from Application Logs that Span Multiple Lines – Part 2

Learn how to create documents from application logs that span multiple lines in this handy tutorial, which also includes the complete script and sample Elasticsearch queries.

About Part 1

In part one of this article, I described the script in portions. In this part, I will describe how the documents can be queried in Elasticsearch and also present the complete script.

Retrieving Data from Elasticsearch

As you might be aware, Logstash is popularly used along with two other tools, namely Elasticsearch and Kibana, with the three commonly referred to as the ‘ELK’ stack. In most cases, log information parsed by Logstash is stored as documents in Elasticsearch, which is a high-performance search engine.

In our case too, the ‘output’ section of the script is connected to an Elasticsearch instance, so the documents created while parsing the input are stored there. Once the log information is in Elasticsearch, accessing it becomes easier, as we can use the ‘search’ interface instead of having to trawl through long text files. For example, we can easily generate a report of all the ‘ERROR’ messages from the last month by invoking a search query on the Elasticsearch index, using the ‘logLevel’ document attribute. A sample query is presented in Snippet 12.

{
  "query": {
    "filtered": {
      "filter": {
        "query": {
          "range": {
            "logTime": {
              "gte": "2015-04-09 00:00:00",
              "lte": "2015-04-09 23:59:00"
            }
          }
        }
      },
      "query": {
        "match": {
          "logLevel": "ERROR"
        }
      }
    }
  }
}

Snippet 12: Elasticsearch query
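
To actually run Snippet 12, the query can be submitted to the index’s ‘_search’ endpoint. The command below is a minimal sketch, assuming Elasticsearch 1.x is listening on localhost:9200, the index is named ‘foolproof’ (as configured in the ‘output’ section of the script), and the query body from Snippet 12 has been saved in a hypothetical file named error-query.json.

# submit the query in Snippet 12 against the 'foolproof' index
# (assumes Elasticsearch 1.x on localhost:9200 and the query body
#  saved in a file named error-query.json)
curl -XPOST 'http://localhost:9200/foolproof/_search?pretty' -d @error-query.json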

An important aspect of the cloning process is that, because the new records are created from the cloned record, they carry the same timestamp as the original record. Thus, we can not only search for the original record using its timestamp, but also retrieve all the records created from it using the same timestamp. To help distinguish between the original record and the derived records, we have added a field named ‘recordType’. Its value is ‘master’ for the original record and ‘child’ for the records created from the cloned record. The Elasticsearch query used to extract such records is presented in Snippet 13.

{
  "query": {
    "filtered": {
      "filter": {
        "query": {
          "range": {
            "logTime": {
              "gte": "2015-04-09 00:00:00",
              "lte": "2015-04-09 23:59:00"
            }
          }
        }
      },
      "query": {
        "bool": {
          "must": [
            { "match" : { "logLevel": "ERROR" } },
            { "match" : { "recordType": "child" } }
          ]
        }
      }
    }
  }
}

Snippet 13: Elasticsearch query matching multiple fields
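
As mentioned above, a master record and its child records share the same timestamp, so a range query pinned to a single ‘logTime’ value returns the whole group. The sketch below follows the same query style as Snippets 12 and 13 and assumes Elasticsearch 1.x on localhost:9200, the ‘foolproof’ index from the script’s ‘output’ section, and a purely hypothetical timestamp value.

# retrieve the master record and all child records sharing one timestamp
# (the timestamp value below is hypothetical)
curl -XPOST 'http://localhost:9200/foolproof/_search?pretty' -d '
{
  "query": {
    "filtered": {
      "filter": {
        "query": {
          "range": {
            "logTime": {
              "gte": "2015-04-09 10:15:30",
              "lte": "2015-04-09 10:15:30"
            }
          }
        }
      }
    }
  }
}'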

The Full Script

The full script, presented in Snippet 14, does the following:

  1. Creates a single record from multiple physical lines of the stack trace.
  2. Clones the record for further processing.
  3. Splits the cloned record into multiple records, each containing granular information.

input {
  file {
    path => "/input/foolproof/input2.log"
    type => "foolproof_log"
    sincedb_path => "/input/foolproof/input2.sincedb"
    start_position => "beginning"
  }
  tcp {
    type => "foolproof_log"
    port => 9999
  }
}

filter {
  if [type] == "foolproof_log" {
    if "cloned" not in [tags] {
      if [message] == "" {
        drop { }
      }

      # read 'sync completed' info message
      grok {
        patterns_dir => "./patterns"
        match => { "message" => "%{CUSTOM_LEADER_TEXT_INFO:logLevel}[,]%{SPACE}%{CUSTOM_TIMESTAMP:logTimeString}[,]%{SPACE}%{CUSTOM_LABEL_SYNC_OPERATION_COMPLETED}%{SPACE}%{NUMBER:moduleID}%{GREEDYDATA:extraText}" }
        add_field => { "subType" => "infoSyncCompleted" }
        remove_tag => [ "_grokparsefailure" ]
      }

      if "_grokparsefailure" not in [tags] {
        mutate {
          gsub => [
            "logTimeString", ",", ";"
          ]
        }
        date {
          locale => "en"
          match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
          #timezone => "Asia/Calcutta"
          target => "logTime"
        }
      }

      if "_grokparsefailure" in [tags] {
        multiline {
          patterns_dir => "./patterns"
          pattern => "(%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA})|(%{CUSTOM_ERROR_LABEL_2}%{SPACE}%{TIMESTAMP_ISO8601}%{SPACE}%{GREEDYDATA})|(%{TIMESTAMP_ISO8601}%{SPACE}%{CUSTOM_MAIN_LABEL}%{SPACE}%{GREEDYDATA})"
          negate => true
          what => "previous"
        }

        grok {
          patterns_dir => "./patterns"
          match => [ "message", "%{CUSTOM_ERROR_LABEL_1:logLevel},%{SPACE}%{TIMESTAMP_ISO8601:logTimeString},%{SPACE}%{GREEDYDATA:mergedText}" ]
          remove_tag => [ "_grokparsefailure" ]
          add_field => { "recordType" => "master" }
          add_field => { "inputType" => "foolproof_log" }
        }

        # mergedText can be removed as we will be reading from the 'message' field
        mutate {
          remove_field => [ "mergedText" ]
        }

        mutate {
          gsub => [ "logTimeString", ",", ";" ]
        }
        date {
          locale => "en"
          match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
          #timezone => "Asia/Calcutta"
          target => "logTime"
        }

        ## should we remove \n, \r and \t from the 'message' field as well?
        ## removing them can make searching easier. These characters would need
        ## to be replaced by some other character, like #, that does not occur in the message

        # clone the multiline, so that we can split it into its constituent parts later
        clone {
          clones => [ "cloned" ]
        }
      }
    }
  }

  # split the 'cloned' multiline into separate parts for detailed processing
  if [type] == "cloned" {
    if [message] == "" {
      drop { }
    }

    if [logLevel] == "ERROR" {
      # this pattern matches the pattern of the multiline
      grok {
        patterns_dir => "./patterns"
        match => [ "message", "%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA:mergedMessage}" ]
        remove_tag => [ "_grokparsefailure", "multiline" ]
        remove_field => [ "message", "recordType" ]
      }

      mutate {
        add_field => { "recordType" => "child" }
      }

      ## we remove \n, \r and \t from the 'mergedMessage' field.
      ## We remove these characters by replacing them with a suitable alternative.
      ## We will then use the substituted character for splitting the multiline message
      ## into its constituent messages.
      mutate {
        gsub => [
          "mergedMessage", "\r\n", "#",
          "mergedMessage", "\n\n", "#",
          "mergedMessage", "\r\n\t", "#",
          "mergedMessage", "\n\t", "#",
          "mergedMessage", "\n    ", "#",
          "mergedMessage", "\n   ", "#",
          "mergedMessage", "\n", "#"
        ]
      }

      split {
        field => "mergedMessage"
        target => "splitText"
        terminator => "#"
      }

      # parse business unit id line
      grok {
        patterns_dir => "./patterns"
        match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_OCCURRED_FOR}[:]%{SPACE}%{CUSTOM_TEXT_2:message1}[.][.]%{SPACE}%{CUSTOM_LABEL_BUSINESS_UNIT_ID}%{SPACE}[=]%{SPACE}%{NUMBER:businessUnitID}%{SPACE}%{CUSTOM_LABEL_SDM_SOURCE}%{SPACE}[=]%{SPACE}%{IP:sourceIPAddress}%{GREEDYDATA:errorText}" }
        add_field => { "subType" => "errorOccurredFor" }
        remove_tag => [ "_grokparsefailure" ]
      }
      if "_grokparsefailure" not in [tags] {
        mutate {
          convert => [ "businessUnitID", "integer" ]
        }
      }

      # read first error line
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_ERROR}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "errorReason" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }

      # read error stack details
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_DETAILS_STACK}[:]%{SPACE}%{GREEDYDATA:errorText}" }
          add_field => { "subType" => "stackTraceStart" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
      # read error source details
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_SOURCE}[:]%{SPACE}%{GREEDYDATA:errorSource}" }
          add_field => { "subType" => "errorSource" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
      # read application specific stack line
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_APP}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "stackTraceApp" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }

      # read other stack lines
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_AT}%{SPACE}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "stackTraceOther" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
    }
  }
}

output {
  stdout {
    codec => "rubydebug"
  }

  file {
    codec => "json"
    flush_interval => 0 # 0 - every message; 2 - default
    path => "/output/out-foolproof-json.txt"
  }

  elasticsearch {
    host => "localhost"
    index => "foolproof"
  }
}


Snippet 14: Full script
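
For reference, the script can be run by pointing Logstash at the configuration file. The command below is a minimal sketch, assuming a Logstash 1.4/1.5 installation (which matches the ‘host’ option used on the elasticsearch output), the configuration saved as a hypothetical file named foolproof.conf, and the custom grok patterns available under ./patterns.

# run the full script (the file name 'foolproof.conf' is assumed)
# the custom patterns referenced via patterns_dir must exist in ./patterns
bin/logstash -f foolproof.conf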

Conclusion

If multiple lines of data from a log file are stored as a single block of text, as is done by the ‘multiline’ plugin, extracting information from that block becomes difficult. Any business logic, analytics algorithm, or report that needs the information will have to implement its own text-processing logic, which makes application maintenance fraught with problems.

To overcome this problem, I have presented a method using Logstash constructs that allows us not only to retain the original record created by ‘multiline’ as it is, but also to create more meaningful and granular records from it. Such granular records can be processed easily by analytics algorithms or by reporting tools.

About the Author

Bipin Patwardhan is a Sr. Technical Architect with IGATE Global Solutions. He is part of the Future and Emerging Technologies group. The group explores new and emerging technologies including 3D printing, wearable computing, analytics and augmented reality. The group creates service lines for enterprises using these technologies. He was earlier leading the High Performance Computing Centre of Excellence (CoE) and the Big Data CoE in the group.
