Building BurpSuite Extensions

Developing an extension with Extender API: automatic session management part 1

Medium
30 min

Plugin registration

In this module, we will solve a real-world problem by automatically handling situations where the application being tested uses very short login times. This typically becomes a problem when trying to use any kind of automation, perform scans, or use tools that take longer to execute than the time the user stays logged in to the application.

from burp import IBurpExtender
from burp import IHttpListener

class BurpExtender(IBurpExtender, IHttpListener):
  def registerExtenderCallbacks(self, callbacks):
    self.callbacks = callbacks
    self.helpers = callbacks.getHelpers()
    callbacks.setExtensionName("Handle Authentication Plugin")
    callbacks.registerHttpListener(self)
    self.session_token = None

Let's start with the familiar plugin registration. Here we're just adding a new object called self.session_token. This will be used later, but in practice is just a storage location for the current session cookie.

Login programming

Next, we need a function that can programmatically perform the login present in the application and update the new session cookie obtained from it to the previously defined session_token variable.

```python
def makeAuth(self):
    try:
        headers = [
            "POST /login HTTP/1.1",
            "Host: 127.0.0.1:5000",
            "Content-Length: 17",
            "Content-Type: application/x-www-form-urlencoded"
        ]
        body = "username=username"
        
        auth_message = self.helpers.buildHttpMessage(headers, body)
        host = "127.0.0.1"
        port = 5000
        use_https = False
        
        resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
        resp_info = self.helpers.analyzeResponse(resp)
        cookie = resp_info.getHeaders()[7]
        self.session_token = cookie.split("session=")[-1].split(";")[0]
    except Exception as e:
        print(e)
```

In the makeAuth function above, we practically construct an HTTP request to log in to the service, then send this HTTP request using the makeHttpRequest function. We read the incoming response and extract a new login cookie that the service created for us as a result of the login.

The makeHttpRequest function takes the following parameters:

  • host - the domain name to which the request is sent
  • port - the port to which the request is sent
  • use_https - using an HTTPS connection
  • auth_message - the HTTP request that was just constructed

Updating cookies in headers

Next, we create another function that updates the current login cookie in the given header list.

```python
def updateCookies(self, headers):
    new_headers = []
    cookie_seen = False
    for header in headers:
        if "Cookie:" in header:
            new_headers.append("Cookie: session={}".format(self.session_token))
            cookie_seen = True
        else:
            new_headers.append(header)
    if not cookie_seen:
        new_headers.append("Cookie: session={}".format(self.session_token))
    return new_headers
```

This can, of course, be done within the processHttpMessage function, but in this example, we have separated it into its own function. The function returns an updated list of headers with the current cookie.

Building the processing of HTTP request and response

Finally, we write the necessary logic for processing HTTP messages in the processHttpMessage function:

```python
def processHttpMessage(self, tool_flag, is_request, message_info):
    if is_request:
        request = message_info.getRequest()
        request_info = self.helpers.analyzeRequest(request)
        # If the target is correct
        if "127.0.0.1:5000" in request_info.getHeaders()[1]:
            # if the session cookie is saved
            if self.session_token:
                # update the current session cookie in the request
                new_headers = self.updateCookies(request_info.getHeaders())
                body = request[request_info.getBodyOffset():]
                new_message = self.helpers.buildHttpMessage(new_headers, body)
                message_info.setRequest(new_message)
```

for HTTP responses:

```python
else:
    response = message_info.getResponse()
    response_info = self.helpers.analyzeResponse(response)
    # get the status code and if it is 302, and
    # the redirect is happening to path /, we know
    # that the session has expired because the response takes us
    # to the login page
    
    if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
        # Update the login cookie by performing
        # the login again
        self.makeAuth()
        
        # Repeat the previously failed HTTP request, where
        # the server requested a login again
        req = message_info.getRequest()
        req_i = self.helpers.analyzeRequest(req)
        new_headers = self.updateCookies(req_i.getHeaders())
        body = req[req_i.getBodyOffset():]
        new_message = self.helpers.buildHttpMessage(new_headers, body)
        host = "127.0.0.1"
        port = 5000
        use_https = False
        success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
        # Set the response of the repeated HTTP request
        # in place of the response requiring re-login
        message_info.setResponse(success_resp)
    return
```

So in practice, our code reacts to situations where the server responds in a certain way that indicates the login has expired. We then programmatically perform the login ourselves, save the new cookie obtained from it, and use this in future calls. The only complicating factor is that when we notice that the login has expired, we have to repeat the original HTTP request to which the server initially responded so that the session is no longer valid. Once we have completed this with the new cookie, we then set this response in place of the response requiring re-login. This way, no tool is aware that a re-login was performed on the fly during execution, and thus nothing breaks.

Final code

The final code will look something like this:

```python
from burp import IBurpExtender
from burp import IHttpListener

class BurpExtender(IBurpExtender, IHttpListener):
    def registerExtenderCallbacks(self, callbacks):
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        callbacks.setExtensionName("Handle Authentication Plugin")
        callbacks.registerHttpListener(self)
        self.session_token = None

    def makeAuth(self):
        try:
            headers = [
                "POST /login HTTP/1.1",
                "Host: 127.0.0.1:5000",
                "Content-Length: 17",
                "Content-Type: application/x-www-form-urlencoded"
            ]
            body = "username=username"

            auth_message = self.helpers.buildHttpMessage(headers, body)
            host = "127.0.0.1"
            port = 5000
            use_https = False

            resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
            resp_info = self.helpers.analyzeResponse(resp)
            cookie = resp_info.getHeaders()[7]
            self.session_token = cookie.split("session=")[-1].split(";")[0]
        except Exception as e:
            print(e)

    def updateCookies(self, headers):
        new_headers = []
        cookie_seen = False
        for header in headers:
            if "Cookie:" in header:
                new_headers.append("Cookie: session={}".format(self.session_token))
                cookie_seen = True
            else:
                new_headers.append(header)
        if not cookie_seen:
            new_headers.append("Cookie: session={}".format(self.session_token))
        return new_headers

    def processHttpMessage(self, tool_flag, is_request, message_info):
        if is_request:
            request = message_info.getRequest()
            request_info = self.helpers.analyzeRequest(request)

            if "127.0.0.1:5000" in request_info.getHeaders()[1]:
                if self.session_token:
                    new_headers = self.updateCookies(request_info.getHeaders())
                    body = request[request_info.getBodyOffset():]
                    new_message = self.helpers.buildHttpMessage(new_headers, body)
                    message_info.setRequest(new_message)

        else:
            response = message_info.getResponse()
            response_info = self.helpers.analyzeResponse(response)
            if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
                # Session has expired
                self.makeAuth()  # Update session token

                # Replay the failed request with the new session token
                req = message_info.getRequest()
                req_i = self.helpers.analyzeRequest(req)
                new_headers = self.updateCookies(req_i.getHeaders())
                body = req[req_i.getBodyOffset():]
                new_message = self.helpers.buildHttpMessage(new_headers, body)
                host = "127.0.0.1"
                port = 5000
                use_https = False
                success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
                message_info.setResponse(success_resp)
        return
```

Exercises

Flag

Find the flag from the lab environment and enter it below.

hakatemia pro

Ready to become an ethical hacker?
Start today.

As a member of Hakatemia you get unlimited access to Hakatemia modules, exercises and tools, and you get access to the Hakatemia Discord channel where you can ask for help from both instructors and other Hakatemia members.