
Log Analysis 101 with Apache Flink, Spring Boot and ActiveMQ


In this article, we show you how to get started with log analysis with Apache Flink, Spring Boot, and ActiveMQ.


Log analysis is a valuable practice that lets organizations extract actionable insights from their log files.

In this post (a beginner's guide), I will show how to configure a message queue in Spring Boot and then use it as a stream source in Flink. This pattern yields highly decoupled systems, in which one component asynchronously delegates further processing to another.

All the code in this example is available on GitHub.

The data pipeline used in this example is: fake Apache log generator → Logstash → Spring Boot REST endpoint → ActiveMQ → Flink.

1. Configuring the Apache Log File Simulator

For log files, I used a very handy Fake Apache Log Generator utility. After installing it, execute the following command to start the generator in infinite mode:

Shell

$ python apache-fake-log-gen.py -n 0 -o LOG



2. Logstash Setup

Now that the log files are being populated, we want an agent to read them and forward the entries to our Spring Boot application. Logstash is a purpose-built utility for exactly this use case. Just download and unzip it, then save the following pipeline configuration as $LOGSTASH/config/apache-log.conf.

*<app-hostname> is the IP address or hostname of the machine running the Spring Boot application.

Logstash config

input {
  file {
    path => ["/root/Fake-Apache-Log-Generator/*.log"]
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

output {
  http {
    url => "http://<app-hostname>:8080/logs"
    http_method => "post"
  }
}
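For intuition, the %{COMBINEDAPACHELOG} grok pattern tokenizes each combined-format access-log line into named fields such as clientip and response. A rough plain-Java equivalent, using a deliberately simplified regex that captures only those two fields (illustration only, not the full grok pattern):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CombinedLogParser
{
    // Simplified stand-in for grok's %{COMBINEDAPACHELOG}: captures only
    // the client IP and the three-digit HTTP status code.
    private static final Pattern COMBINED = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[[^\\]]+\\] \"[^\"]*\" (\\d{3}) .*");

    // Returns { clientip, status } or null when the line does not match.
    public static String[] parse(String line)
    {
        Matcher m = COMBINED.matcher(line);
        if (!m.matches())
        {
            return null;
        }
        return new String[] { m.group(1), m.group(2) };
    }

    public static void main(String[] args)
    {
        String line = "83.149.9.216 - - [17/May/2015:10:05:03 +0000] "
            + "\"GET /presentations HTTP/1.1\" 404 7697 \"-\" \"Mozilla/5.0\"";
        String[] fields = parse(line);
        System.out.println(fields[0] + " -> " + fields[1]);   // 83.149.9.216 -> 404
    }
}
```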



Then, execute the following command to start Logstash. The version I tested is 7.3.2.

Shell

$LOGSTASH/bin/logstash -f config/apache-log.conf




3. Spring Boot Application to Receive the Log Data From Logstash

As with other features, setting up JMS in Spring Boot is very easy. I followed a DZone article and the Spring Boot documentation. I am using a default in-memory ActiveMQ message queue, as shown below, and exposing the JMS Session as a bean.

Java

@SpringBootApplication
@EnableJms
@EnableScheduling
public class Application
{
    private static final String LOCAL_ACTIVE_MQ_URL = "vm://localhost?broker.persistent=false";

    public static void main(String[] args)
    {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Session mySession() throws JMSException
    {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(LOCAL_ACTIVE_MQ_URL);
        factory.setTrustAllPackages(true);
        Connection connection = factory.createConnection();
        connection.start();
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}



Next, I expose a REST endpoint, "http://<app-hostname>:8080/logs", to receive log messages from Logstash and send them to the queue. Note that the JSON payload from Logstash is converted to a POJO (ApacheLogMessage.java) before being sent to the queue.

Java

@RestController
@RequestMapping("/logs")
public class Controller
{
    @Autowired
    private JmsTemplate jmsTemplate;

    public static final String QUEUE_NAME = "webserverlog";

    @PostMapping
    public ResponseEntity<?> sendToQueue(@RequestBody ApacheLogMessage message)
    {
        jmsTemplate.convertAndSend(QUEUE_NAME, message);

        return new ResponseEntity<>(HttpStatus.ACCEPTED);
    }
}
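The ApacheLogMessage POJO itself is not listed in this post. A minimal hypothetical sketch, assuming fields matching the grok output and the getters used later in the Flink pipeline (getClientip, getResponse); it must implement Serializable so it can travel as a JMS ObjectMessage:

```java
import java.io.Serializable;

// Hypothetical sketch of ApacheLogMessage. Field names are assumptions
// based on grok's COMBINEDAPACHELOG fields and the getters used downstream.
public class ApacheLogMessage implements Serializable
{
    private static final long serialVersionUID = 1L;

    private String clientip;   // client IP, used as the Flink key
    private int response;      // HTTP status code, e.g. 404
    private String request;    // request line, e.g. "GET /index.html HTTP/1.1"
    private String agent;      // user-agent string

    public String getClientip()            { return clientip; }
    public void setClientip(String ip)     { this.clientip = ip; }
    public int getResponse()               { return response; }
    public void setResponse(int response)  { this.response = response; }
    public String getRequest()             { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getAgent()               { return agent; }
    public void setAgent(String agent)     { this.agent = agent; }
}
```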



4. Configure Flink to Use the Queue as a Source

AMQSource.java, under the org.pd.streaming.application.queue package, extends Flink's RichSourceFunction and acts as a source. It takes a MessageConsumer and listens for messages; before handing each message to Flink, it converts it back to the Apache log POJO.

Java

@Override
public void run(SourceContext<ApacheLogMessage> ctx) throws Exception
{
    while (running)
    {
        Message m = consumer.receive();

        ApacheLogMessage logMessage = (ApacheLogMessage) ((ObjectMessage) m).getObject();

        ctx.collect(logMessage);
    }
}
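Outside of Flink and JMS, this loop is a standard blocking-consume pattern: block until a message arrives, hand it downstream, and repeat while a flag is set. A stdlib sketch of the same shape, with a BlockingQueue standing in for the JMS consumer and a list standing in for ctx.collect (illustration only; the poison-pill "STOP" plays the role that cancel() plays in the real source):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingSourceLoop
{
    // Drains the queue until it sees the poison-pill value "STOP",
    // mirroring the run()/running structure of the Flink source above.
    public static List<String> drain(BlockingQueue<String> queue) throws InterruptedException
    {
        List<String> collected = new ArrayList<>();
        boolean running = true;
        while (running)
        {
            String message = queue.take();   // blocks, like consumer.receive()
            if ("STOP".equals(message))
            {
                running = false;             // what cancel() would trigger
            }
            else
            {
                collected.add(message);      // stands in for ctx.collect(...)
            }
        }
        return collected;
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("log-1");
        queue.put("log-2");
        queue.put("STOP");
        System.out.println(drain(queue));   // [log-1, log-2]
    }
}
```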



Now that the source is done, let us put it all together and start a Flink execution environment. In the StreamProcess.java class, I first create a Flink DataStream from my source.

Java

Destination destination = mySession.createQueue(QUEUE_NAME);
MessageConsumer consumer = mySession.createConsumer(destination);
source = new AMQSource(consumer);
DataStream<ApacheLogMessage> dataStream = env.addSource(source);


 

Then, I build a Flink data pipeline, as shown below:

Java

dataStream
.keyBy((KeySelector<ApacheLogMessage, String>) ApacheLogMessage::getClientip)
.timeWindow( Time.seconds( 10 ) )
.apply( new WindowFunction<ApacheLogMessage, Tuple2<String, Long>, String, TimeWindow>()
{
  @Override
  public void apply( String key, TimeWindow window, Iterable<ApacheLogMessage> input, Collector<Tuple2<String, Long>> out ) throws Exception
  {
    long count = 0;
    for( ApacheLogMessage msg : input )
    {
      if ( HttpStatus.valueOf( msg.getResponse() ).is4xxClientError() )
      {
        count++;
      }
    }
    out.collect( new Tuple2<>(key, count) );
  }
})
.filter( new FilterFunction<Tuple2<String, Long>>()
{
  @Override
  public boolean filter( Tuple2<String, Long> value ) throws Exception
  {
    return value.f1 > 0;
  }
})
.print();



Let me explain in detail how the above code works.

  • Line 2: Partition the log stream using the client IP address as the key.
  • Line 3: Create a tumbling time window of 10 seconds.
  • Lines 4-19: Count the messages in each window for which the server reported a 4xx response code (client-side errors), and emit a Flink Tuple2 containing the client IP and the count.
  • Lines 20-27: Use Flink's filter API to drop the tuples that had no 4xx errors.
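Stripped of the windowing, the apply and filter stages above reduce to: group messages by client IP, count 4xx responses per group, and drop groups with zero errors. A plain-Java sketch of that reduction over one in-memory batch of (clientip, status) pairs (illustration only, no Flink involved):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ClientErrorCount
{
    // Counts 4xx responses per client IP and drops IPs with none,
    // mirroring the window-apply + filter stages of the Flink pipeline.
    public static Map<String, Long> count4xxByIp(String[][] logBatch)
    {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] entry : logBatch)
        {
            String ip = entry[0];
            int status = Integer.parseInt(entry[1]);
            counts.merge(ip, (status >= 400 && status < 500) ? 1L : 0L, Long::sum);
        }
        counts.values().removeIf(c -> c == 0);   // the filter stage: drop zero-error IPs
        return counts;
    }

    public static void main(String[] args)
    {
        String[][] batch = {
            { "10.0.0.1", "200" },
            { "10.0.0.1", "404" },
            { "10.0.0.2", "200" },
            { "10.0.0.1", "403" },
        };
        System.out.println(count4xxByIp(batch));   // {10.0.0.1=2}
    }
}
```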

Finally, print these tuples to the console.


I hope this article helps you get started with log analysis using Flink. In upcoming posts, I will add a database as a sink and share some Grafana charts to better visualize the data.




Opinions expressed by DZone contributors are their own.
