Better Threat Detection with CloudWatch Logs and Generative AI

In the previous parts of this series, I set up a base architecture for AWS Threat Detection and Management, deployed AWS WAF, AWS Shield Advanced, GuardDuty, Inspector, and AWS Config. I also briefly touched upon Amazon EventBridge. As we wrap up this series, I want to emphasize the importance of CloudWatch Logs and how Generative AI can be leveraged to enhance our threat detection and management capabilities.

CloudWatch

CloudWatch is a fully managed service that enables you to centralize, store, and analyze log data from various AWS resources, applications, and services. In our case, we are sending logs from AWS WAF, which allows us to monitor and analyze traffic patterns, detect potential threats, and take appropriate actions.

Generative AI for Threat Detection

While CloudWatch Logs provide invaluable data, manually analyzing and interpreting logs can be a daunting and time-consuming task, especially when dealing with large volumes of log data. This is where Generative AI comes into play. By leveraging the power of Large Language Models (LLMs), I can streamline the analysis process and gain valuable insights from our log data.

Here’s the updated section of the blog post with the latest code:

Sample Application with Streamlit and Python

To demonstrate the integration of CloudWatch Logs and Generative AI, I have developed a sample application using Streamlit and Python. Let’s break down the code and understand how it works:

import streamlit as st
import boto3
import json
from botocore.exceptions import ClientError

# Initialize Streamlit app
st.title("CloudWatch Log Analyzer")

# Create a session client for CloudWatch Logs
session = boto3.Session()

The application starts by importing the necessary libraries: streamlit for creating the user interface, boto3 for interacting with AWS services, json for handling JSON data, and botocore.exceptions for catching AWS-related exceptions. The title “CloudWatch Log Analyzer” is set for the Streamlit application, and a session client for CloudWatch Logs is created.

# Get the list of available regions with CloudWatch Logs access
available_regions = []
logs_client = session.client('logs')
try:
    response = logs_client.describe_log_groups()
    available_regions = [region for region in session.get_available_regions('logs')]
except ClientError as e:
    st.error(f"Error retrieving available regions: {e}")

# Set the default region to us-east-1 if available, otherwise use the first available region
default_region = 'us-east-1' if 'us-east-1' in available_regions else available_regions[0] if available_regions else None

# Create a dropdown menu for selecting the region
if available_regions:
    selected_region = st.selectbox("Select a region", available_regions, index=available_regions.index(default_region) if default_region else 0)

    # Create a client for CloudWatch Logs in the selected region
    logs_client = session.client('logs', region_name=selected_region)

The application retrieves the list of available regions where the current AWS credentials have access to CloudWatch Logs. If there’s an error retrieving the available regions, an error message is displayed. The default region is set to us-east-1 if available, or the first available region if us-east-1 is not accessible. A dropdown menu is created for selecting the region, with the default region preselected if available.

# Get a list of log groups in the selected region
try:
    log_groups = logs_client.describe_log_groups()['logGroups']
except ClientError as e:
    st.error(f"Error retrieving log groups in {selected_region}: {e}")
    log_groups = []

# Check if log groups are available
if not log_groups:
    st.warning(f"No logs available in {selected_region}, please select another region.")
else:
    # Create a dropdown menu for selecting the log group
    selected_log_group = st.selectbox("Select a log group", [log_group['logGroupName'] for log_group in log_groups])

    # Get a list of log streams in the selected log group
    try:
        log_streams = logs_client.describe_log_streams(logGroupName=selected_log_group)['logStreams']
    except ClientError as e:
        st.error(f"Error retrieving log streams in {selected_region}/{selected_log_group}: {e}")
        log_streams = []

    # Check if log streams are available
    if not log_streams:
        st.warning(f"No log streams available in {selected_region}/{selected_log_group}, please select another log group.")
    else:
        # Create a dropdown menu for selecting the log stream
        selected_log_stream = st.selectbox("Select a log stream", [log_stream['logStreamName'] for log_stream in log_streams])

The application retrieves the list of log groups in the selected region, handling any errors that may occur due to insufficient permissions. If log groups are available, a dropdown menu is created for selecting the log group. Similarly, the application retrieves the list of log streams in the selected log group, handling any errors that may occur. If log streams are available, a dropdown menu is created for selecting the log stream.

# Get the contents of the selected log stream
try:
    log_stream_data = logs_client.get_log_events(
        logGroupName=selected_log_group,
        logStreamName=selected_log_stream
    )
except ClientError as e:
    st.error(f"Error retrieving log events in {selected_region}/{selected_log_group}/{selected_log_stream}: {e}")
else:
    # Concatenate the log event messages into a single string
    prompt_data = ''.join([event['message'] + '\n' for event in log_stream_data['events']])

    # Create a Bedrock Runtime client
    bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-west-2")

    def summarize_article(prompt_data):
        try:
            prompt_config = {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": f"You are a threat detection expert working with the AWS Cloud. Review the log data I am providing for AWS WAF and explain in detail what you see, and if you have any recommendations on rules that could be configured based on what you see in the log data, please tell me.\n\n {prompt_data}"
                            }
                        ]
                    }
                ]
            }
            body = json.dumps(prompt_config)
            modelId = "anthropic.claude-3-sonnet-20240229-v1:0"
            contentType = "application/json"
            accept = "application/json"
            response = bedrock.invoke_model(
                modelId=modelId,
                contentType=contentType,
                accept=accept,
                body=body
            )
            response_body = json.loads(response.get("body").read())
            summary = response_body.get("content")[0]["text"]
            return summary
        except Exception as e:
            st.error(f"An error occurred with the Bedrock Runtime API: {e}")
            return None

    # Display the log data and LLM response on the Streamlit page
    st.markdown("**Log Data:**")
    st.code(prompt_data, language="text")
    response = summarize_article(prompt_data)
    if response:
        st.markdown("**Recommendations:**")
        st.write(response)
else:
    st.warning("No regions available with CloudWatch Logs access.")

The application retrieves the log events from the selected log stream, handling any errors that may occur. If log events are retrieved successfully, the log event messages are concatenated into a single string called prompt_data. A client for the Bedrock Runtime service is created to interact with the LLM.

The summarize_article function takes the prompt_data as input and constructs a prompt configuration object with instructions for the LLM. The log data is included in the prompt, and the LLM is tasked with analyzing the data and providing recommendations for AWS WAF rules. The invoke_model method is used to send the prompt configuration to the Bedrock Runtime API. The LLM’s response is received and parsed to extract the summary.

Finally, the application displays the log data and the LLM’s recommendations on the Streamlit page. If there are no regions available with CloudWatch Logs access, a warning message is displayed.

With this application, you can select a region, log group, and log stream, and the application will retrieve the log data, send it to the LLM for analysis, and display the LLM’s recommendations on the Streamlit page.

Demonstration

To illustrate the application’s capabilities, let’s refer to the provided images:

Figure 1 shows the initial screen of the application, where the AWS region “us-east-1” has been selected.

Figure 1

In Figure 2 you can see the details of the log data being analyzed by the LLM.

Figure 2

Figure 3 is a continuation of Figure 2, that shows the LLM’s summary of what happened. In this case, it indicates that the request was blocked due to the URI /.env being identified as a potential exploitation attempt by the ExploitablePaths_URIPATH rule in the AWS-AWSManagedRulesKnownBadInputsRuleSet rule group.

Figure 3

The final figure, Figure 4, displays the LLM’s recommendations based on the log data analysis.

Figure 4

Improving Threat Detection

This application demonstrates how CloudWatch Logs can be effectively utilized in conjunction with Generative AI to enhance threat detection capabilities. By leveraging the LLM’s natural language understanding and analysis capabilities, we can quickly interpret and gain valuable insights from log data. The LLM can identify potential threats, provide explanations, and recommend appropriate actions or rule configurations based on the log data.

Instead of manually sifting through vast amounts of log data, security analysts can rely on the LLM to streamline the process and highlight potential issues or areas of concern. This approach not only saves time but also reduces the risk of overlooking critical information hidden within the logs.

Conclusion

By integrating CloudWatch Logs and Generative AI, I can significantly improve our threat detection and management capabilities. The sample application showcases how this integration can be achieved using Streamlit, Python, and the Bedrock Runtime LLM. The ability to quickly analyze log data and receive actionable recommendations from the LLM empoIrs security teams to respond promptly and effectively to potential threats.

I encourage you to explore this application and experiment with it using your own log data. Feel free to modify and enhance the code to suit your specific requirements. Happy Labbing!

Leave a Reply