Log Analysis 101 with Apache Flink, Spring Boot and ActiveMQ

By Preetdeep Kumar · Jan. 20, 20 · Tutorial

Log analysis lets organizations extract insights from their log files, for example by spotting which clients trigger the most errors.

In this post (which is a beginner's guide), I will show how to configure a message queue in Spring Boot and then use it as a stream source in Flink. This pattern allows for highly decoupled systems, where one component delegates further processing to another component asynchronously.

All the code in this example is available on GitHub.

The data pipeline used in this example is: fake Apache log generator -> Logstash -> Spring Boot REST endpoint -> ActiveMQ queue -> Flink.

1. Configuring Apache Log File Simulator 

For log files, I used a very handy fake Apache log generator utility. After installing it, execute the following command to start the generator in infinite mode:

Shell

$ python apache-fake-log-gen.py -n 0 -o LOG



2. Logstash Setup

Now that the log files are being populated, we want an agent to read these log lines and send them to our Spring Boot application. Logstash is a purpose-made utility for such use cases. Download and unzip it, then save the following config text as $LOGSTASH/config/apache-log.yml.

*<app-hostname> is the IP or hostname where you are running the Spring Boot application.

YAML

input {
  file {
    path => ["/root/Fake-Apache-Log-Generator/*.log"]
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

output {
  http {
    url => "http://<app-hostname>:8080/logs"
    http_method => "post"
  }
}
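
For intuition about what the grok filter does to each line before it is posted to the application, here is a rough plain-Java sketch that splits one combined-format log line into named fields. The regular expression is a simplified approximation, not the actual COMBINEDAPACHELOG pattern. The field names clientip and response match the getters used later in this article (getClientip, getResponse); the class name, the remaining field names, and the sample line are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a simplified stand-in for grok's COMBINEDAPACHELOG pattern.
class CombinedLogParser {

    // clientip ident auth [timestamp] "verb request HTTP/version" response bytes "referrer" "agent"
    private static final Pattern COMBINED = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) HTTP/(\\S+)\" "
            + "(\\d{3}) (\\d+|-) \"([^\"]*)\" \"([^\"]*)\"$");

    // Assumed field names, one per capture group, in order.
    private static final String[] FIELDS = {
        "clientip", "ident", "auth", "timestamp", "verb", "request",
        "httpversion", "response", "bytes", "referrer", "agent"
    };

    /** Returns field name -> value, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = COMBINED.matcher(line);
        if (m.matches()) {
            for (int i = 0; i < FIELDS.length; i++) {
                fields.put(FIELDS[i], m.group(i + 1));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up sample line in combined log format.
        String line = "192.168.1.10 - - [20/Jan/2020:10:15:32 +0000] "
            + "\"GET /index.html HTTP/1.1\" 404 512 \"-\" \"Mozilla/5.0\"";
        Map<String, String> fields = parse(line);
        System.out.println(fields.get("clientip") + " -> " + fields.get("response"));
    }
}
```

Logstash sends the parsed fields as a JSON document, which is why the Spring Boot side can bind them directly to a POJO.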



Then, execute the following command to start Logstash. The version I tested is 7.3.2.

Shell

$LOGSTASH/bin/logstash -f config/apache-log.yml


3. Spring Boot Application to Receive the Log Data From Logstash

As with other features, setting up JMS is very easy in Spring Boot. I followed this DZone article and the Spring Boot documentation. I am using a default in-memory ActiveMQ message queue, as shown below, and exposing a JMS session on it as a bean.

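Java

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJms
@EnableScheduling
public class Application 
{
    // In-memory (vm transport) broker; messages are not persisted to disk
    private static final String LOCAL_ACTIVE_MQ_URL = "vm://localhost?broker.persistent=false";

    public static void main(String[] args) 
    {
        SpringApplication.run(Application.class, args);
    }
    
    // JMS session shared with the Flink source to create a consumer on the queue
    @Bean
    public Session mySession() throws JMSException
    {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(LOCAL_ACTIVE_MQ_URL);
        // Allow any class to be deserialized from ObjectMessage payloads (fine for a demo only)
        factory.setTrustAllPackages( true );
        Connection connection = factory.createConnection();
        connection.start();
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}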



Next, I expose a REST endpoint "http://hostname:8080/logs" to receive log messages from Logstash and send them to the queue. Note that the JSON payload from Logstash is converted to a POJO (ApacheLogMessage.java) before being sent to the queue.

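Java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/logs")
public class Controller
{
    @Autowired 
    private JmsTemplate jmsTemplate;
    
    public static final String QUEUE_NAME = "webserverlog";
    
    // Logstash POSTs one JSON document per log line; Spring binds it to the POJO
    @PostMapping
    public ResponseEntity<?> sendToQueue(@RequestBody ApacheLogMessage message) 
    {
        // Hand the message to the queue and return immediately
        jmsTemplate.convertAndSend(QUEUE_NAME, message );
        
        return new ResponseEntity<>(HttpStatus.ACCEPTED);            
    }
}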
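
ApacheLogMessage.java is not listed in this article. Because the Flink source in the next section casts each message to an ObjectMessage, the POJO has to be Serializable. Below is a minimal sketch of what such a class might look like, limited to the two fields the later code reads (clientip and response). The field types and the round-trip helper are assumptions for illustration; the real class in the repository has more fields. The helper serializes and deserializes the object with plain Java serialization, which is roughly what happens to the payload of an ObjectMessage.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class: shows that the POJO survives Java serialization.
class SerializationRoundTrip {

    static ApacheLogMessage roundTrip(ApacheLogMessage in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ApacheLogMessage) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ApacheLogMessage msg = new ApacheLogMessage();
        msg.setClientip("192.168.1.10");
        msg.setResponse(404);
        ApacheLogMessage copy = roundTrip(msg);
        System.out.println(copy.getClientip() + " " + copy.getResponse());
    }
}

// Hypothetical sketch of the POJO; only the fields used later in the article.
class ApacheLogMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private String clientip; // assumed: the client IP field from Logstash
    private int response;    // assumed: the HTTP status code

    public ApacheLogMessage() { } // no-arg constructor so a JSON mapper can create it

    public String getClientip() { return clientip; }
    public void setClientip(String clientip) { this.clientip = clientip; }
    public int getResponse() { return response; }
    public void setResponse(int response) { this.response = response; }
}
```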



4. Configure Flink to Use the Queue as a Source

AMQSource.java, under the org.pd.streaming.application.queue package, extends Flink's RichSourceFunction and acts as a source. It takes a consumer object of type MessageConsumer and listens for messages. Before handing each message to Flink, I convert it back to my POJO for the Apache log message.

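Java

    @Override
    public void run( SourceContext<ApacheLogMessage> ctx ) throws Exception
    {
        while( running )
        {
            // Blocks until a message arrives; returns null if the consumer has been closed
            Message m = consumer.receive();
            if( m == null )
            {
                break;
            }
            
            // The payload is the serialized POJO, carried as a JMS ObjectMessage
            ApacheLogMessage logMessage = (ApacheLogMessage)((ObjectMessage)m).getObject();
            
            // Hand the record over to Flink
            ctx.collect( logMessage );
        }
    }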
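
One thing to watch in a loop like this: a receive() call without a timeout only re-checks the running flag when a message arrives, so cancelling the source can hang while the queue is idle. JMS also offers MessageConsumer.receive(long timeout), which returns null when the timeout expires. The plain-Java sketch below shows the same idea with a BlockingQueue standing in for the JMS consumer. It is not Flink or JMS code, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a queue-backed source with a cancellable loop.
class PollingSource<T> {
    private final BlockingQueue<T> queue;
    private volatile boolean running = true; // volatile: cancel() runs on another thread

    PollingSource(BlockingQueue<T> queue) { this.queue = queue; }

    // Plays the role of run(SourceContext): forward each message until cancelled.
    void run(Consumer<T> collector) {
        try {
            while (running) {
                // Bounded wait, so the flag is re-checked even when the queue is idle.
                T message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    collector.accept(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and exit
        }
    }

    void cancel() { running = false; }

    // Demo helper: feed the inputs, wait until all were forwarded, then cancel.
    static List<String> runOnce(List<String> inputs) {
        PollingSource<String> source = new PollingSource<>(new LinkedBlockingQueue<>(inputs));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(inputs.size());
        Thread worker = new Thread(() -> source.run(m -> { seen.add(m); done.countDown(); }));
        worker.start();
        try {
            done.await(5, TimeUnit.SECONDS);
            source.cancel();   // the loop notices within one poll timeout
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(Arrays.asList("a", "b", "c")));
    }
}
```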



Now that the source is done, let us put everything together and start a Flink execution environment. In the StreamProcess.java class, I first create a Flink DataStream from my source.

Java

Destination destination = mySession.createQueue(QUEUE_NAME);
MessageConsumer consumer = mySession.createConsumer( destination );
source = new AMQSource(consumer);
DataStream<ApacheLogMessage> dataStream = env.addSource( source );


 

Then, I build a Flink data pipeline, as shown below:

Java

dataStream
.keyBy((KeySelector<ApacheLogMessage, String>) ApacheLogMessage::getClientip)
.timeWindow( Time.seconds( 10 ) )
.apply( new WindowFunction<ApacheLogMessage,Tuple2<String, Long>, String,TimeWindow>()
{
  @Override
  public void apply( String key, TimeWindow window,Iterable<ApacheLogMessage> input, Collector<Tuple2<String,Long>> out ) throws Exception
  {
    long count = 0;
    for( ApacheLogMessage msg : input)
    {
      if ( HttpStatus.valueOf( msg.getResponse() ).is4xxClientError() )
      {
        count++;
      }
    }
    out.collect( new Tuple2<>(key, count) );
  }
})
.filter( new FilterFunction<Tuple2<String,Long>>()
{
  @Override
  public boolean filter( Tuple2<String,Long> value ) throws Exception
  {
    return value.f1 > 0;
  }
})
.print();



Let me explain in detail how the above code works.

  • Line 2 (keyBy): Partition the log stream using the client IP address as the key.
  • Line 3 (timeWindow): Create a tumbling time window of 10 seconds.
  • Lines 4-19 (apply): For each key and window, count the messages where the server reported a 4xx response code (client-side errors), then emit a Flink Tuple2 containing the client IP and the count.
  • Lines 20-27 (filter): Drop the tuples whose count is zero, that is, clients with no 4xx errors in that window.
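
To make these steps concrete without a Flink runtime, the sketch below reproduces the same logic in plain Java over an in-memory list: assign each event to a 10-second tumbling window, group by client IP, count the 4xx responses, and leave out zero counts. It is an illustration only, not Flink code. The LogEvent class, the explicit timestamps, and the sample values are made up, and the explicit timestamps stand in for the window boundaries that Flink manages in the real pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the pipeline's logic in plain Java.
class WindowedErrorCount {

    static final long WINDOW_MILLIS = 10_000L; // 10-second tumbling window

    // Minimal made-up event: timestamp, client IP, HTTP status code.
    static final class LogEvent {
        final long timestampMillis;
        final String clientIp;
        final int status;

        LogEvent(long timestampMillis, String clientIp, int status) {
            this.timestampMillis = timestampMillis;
            this.clientIp = clientIp;
            this.status = status;
        }
    }

    /** Returns "windowStart|clientIp" -> number of 4xx responses, omitting zero counts. */
    static Map<String, Long> count4xx(List<LogEvent> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (LogEvent e : events) {
            // Tumbling windows: every timestamp falls into exactly one window.
            long windowStart = (e.timestampMillis / WINDOW_MILLIS) * WINDOW_MILLIS;
            String key = windowStart + "|" + e.clientIp;   // keyBy + window
            if (e.status >= 400 && e.status < 500) {       // same range as is4xxClientError()
                counts.merge(key, 1L, Long::sum);
            }
        }
        // Keys without any 4xx response never get an entry, which mirrors the filter step.
        return counts;
    }

    public static void main(String[] args) {
        List<LogEvent> events = Arrays.asList(
            new LogEvent(1_000, "10.0.0.1", 404),
            new LogEvent(2_000, "10.0.0.1", 200),
            new LogEvent(9_999, "10.0.0.1", 403),
            new LogEvent(10_000, "10.0.0.1", 404),  // falls into the next window
            new LogEvent(3_000, "10.0.0.2", 200));  // no 4xx, so no output for this client
        count4xx(events).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

In Flink the grouping and the window bookkeeping are handled by keyBy and timeWindow, so the WindowFunction only has to do the counting.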

Finally, print these tuples (sample from the Eclipse console).


I hope this article helps you in getting started with log analysis using Flink. In upcoming posts, I will be adding a database as a sink and sharing some Grafana charts to better visualize our data.


Further Reading

  • Apache Flink Basic Transformation Example.