Developing an extension with Extender API: automatic session management part 1
Plugin registration
In this module, we solve a real-world problem: automatically handling applications under test that use very short session lifetimes. Short sessions typically become a problem when you run automation, perform scans, or use any tool that takes longer to finish than the user stays logged in to the application.
```python
from burp import IBurpExtender
from burp import IHttpListener


class BurpExtender(IBurpExtender, IHttpListener):
    def registerExtenderCallbacks(self, callbacks):
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        callbacks.setExtensionName("Handle Authentication Plugin")
        callbacks.registerHttpListener(self)
        # Holds the current session cookie value; empty until the first login
        self.session_token = None
```
Let's start with the familiar plugin registration. The only new addition is an attribute called self.session_token. It is used later; in practice it is just a storage location for the current session cookie.
Programmatic login
Next, we need a function that performs the application's login programmatically and stores the session cookie it obtains in the previously defined session_token attribute.
```python
    def makeAuth(self):
        try:
            headers = [
                "POST /login HTTP/1.1",
                "Host: 127.0.0.1:5000",
                "Content-Length: 17",
                "Content-Type: application/x-www-form-urlencoded"
            ]
            body = "username=username"
            auth_message = self.helpers.buildHttpMessage(headers, body)
            host = "127.0.0.1"
            port = 5000
            use_https = False
            resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
            resp_info = self.helpers.analyzeResponse(resp)
            # Find the Set-Cookie header by name instead of relying on a fixed position
            for header in resp_info.getHeaders():
                if header.lower().startswith("set-cookie:") and "session=" in header:
                    self.session_token = header.split("session=")[-1].split(";")[0]
                    break
        except Exception as e:
            print(e)
```
In the makeAuth function above, we construct an HTTP login request and send it with the makeHttpRequest function. We then read the response and extract the new session cookie that the service issued as a result of the login.
The makeHttpRequest function takes the following parameters:
- host - the hostname or IP address to which the request is sent
- port - the port to which the request is sent
- use_https - whether to use an HTTPS connection (True or False)
- auth_message - the HTTP request that was just constructed
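The cookie extraction step can be tried outside Burp. The following is a minimal sketch, assuming the session cookie is named `session` as in the example application. `extract_session_token` is a hypothetical helper, not part of the Burp API; it works on a plain list of header strings such as the one `getHeaders()` returns.

```python
def extract_session_token(headers, cookie_name="session"):
    """Return the value of cookie_name from the first matching Set-Cookie header, or None."""
    prefix = cookie_name + "="
    for header in headers:
        name, sep, value = header.partition(":")
        # Skip the status line and every header that is not Set-Cookie
        if not sep or name.strip().lower() != "set-cookie":
            continue
        # The first attribute of a Set-Cookie value is the name=value pair
        pair = value.split(";")[0].strip()
        if pair.startswith(prefix):
            return pair[len(prefix):]
    return None


# Hypothetical response headers, for illustration only
example_headers = [
    "HTTP/1.1 302 FOUND",
    "Content-Type: text/html; charset=utf-8",
    "Location: /",
    "Set-Cookie: session=abc123; HttpOnly; Path=/",
]
print(extract_session_token(example_headers))
```

Matching the header by name rather than by position keeps the extraction working if the server adds or reorders response headers.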
Updating cookies in headers
Next, we create another function that updates the current login cookie in the given header list.
```python
    def updateCookies(self, headers):
        new_headers = []
        cookie_seen = False
        for header in headers:
            if "Cookie:" in header:
                new_headers.append("Cookie: session={}".format(self.session_token))
                cookie_seen = True
            else:
                new_headers.append(header)
        if not cookie_seen:
            new_headers.append("Cookie: session={}".format(self.session_token))
        return new_headers
```
This could, of course, be done directly in the processHttpMessage function, but in this example we have separated it into its own function. It returns a new list of headers that contains the current session cookie.
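Note that updateCookies replaces the whole Cookie header, so any other cookies in the request are dropped. If the application under test depends on additional cookies, one possible variant is sketched below. `replace_session_cookie` and its parameters are hypothetical names for illustration; the function takes a plain list of header strings and can be tested without Burp.

```python
def replace_session_cookie(headers, token, cookie_name="session"):
    """Return a new header list whose Cookie header carries cookie_name=token."""
    new_headers = []
    cookie_seen = False
    for header in headers:
        if header.lower().startswith("cookie:"):
            cookie_seen = True
            raw_pairs = header.split(":", 1)[1].split(";")
            pairs = [p.strip() for p in raw_pairs if p.strip()]
            # Drop any stale copy of the session cookie, keep everything else
            kept = [p for p in pairs if p.split("=", 1)[0] != cookie_name]
            kept.append("{}={}".format(cookie_name, token))
            new_headers.append("Cookie: " + "; ".join(kept))
        else:
            new_headers.append(header)
    if not cookie_seen:
        # No Cookie header in the request: add one with just the session cookie
        new_headers.append("Cookie: {}={}".format(cookie_name, token))
    return new_headers


# Hypothetical request headers, for illustration only
request_headers = [
    "GET / HTTP/1.1",
    "Host: 127.0.0.1:5000",
    "Cookie: theme=dark; session=old",
]
print(replace_session_cookie(request_headers, "new"))
```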
Processing HTTP requests and responses
Finally, we write the logic for processing HTTP messages in the processHttpMessage function. First, the branch for HTTP requests:
```python
    def processHttpMessage(self, tool_flag, is_request, message_info):
        if is_request:
            request = message_info.getRequest()
            request_info = self.helpers.analyzeRequest(request)
            # If the target is correct
            if "127.0.0.1:5000" in request_info.getHeaders()[1]:
                # if the session cookie is saved
                if self.session_token:
                    # update the current session cookie in the request
                    new_headers = self.updateCookies(request_info.getHeaders())
                    body = request[request_info.getBodyOffset():]
                    new_message = self.helpers.buildHttpMessage(new_headers, body)
                    message_info.setRequest(new_message)
```
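The target check above reads the second entry of the header list and so assumes that the Host header comes directly after the request line. A slightly more tolerant check could look up the Host header by name. The following is a minimal sketch; `targets_host` is a hypothetical helper that operates on a plain list of header strings.

```python
def targets_host(headers, expected_host):
    """Return True if the first Host header in the request equals expected_host."""
    for header in headers:
        name, sep, value = header.partition(":")
        if sep and name.strip().lower() == "host":
            # Host names are case-insensitive, so compare in lowercase
            return value.strip().lower() == expected_host.lower()
    return False


# Hypothetical request in which Host is not the second line
reordered_headers = [
    "GET /profile HTTP/1.1",
    "User-Agent: example",
    "Host: 127.0.0.1:5000",
]
print(targets_host(reordered_headers, "127.0.0.1:5000"))
```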
The same function continues with the branch for HTTP responses:
```python
        else:
            response = message_info.getResponse()
            response_info = self.helpers.analyzeResponse(response)
            # get the status code and if it is 302, and
            # the redirect is happening to path /, we know
            # that the session has expired because the response takes us
            # to the login page
            if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
                # Update the login cookie by performing
                # the login again
                self.makeAuth()
                # Repeat the previously failed HTTP request, where
                # the server requested a login again
                req = message_info.getRequest()
                req_i = self.helpers.analyzeRequest(req)
                new_headers = self.updateCookies(req_i.getHeaders())
                body = req[req_i.getBodyOffset():]
                new_message = self.helpers.buildHttpMessage(new_headers, body)
                host = "127.0.0.1"
                port = 5000
                use_https = False
                success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
                # Set the response of the repeated HTTP request
                # in place of the response requiring re-login
                message_info.setResponse(success_resp)
                return
```
In practice, our code reacts when the server responds in a way that indicates the session has expired. We then perform the login programmatically, save the new cookie, and use it in subsequent requests. The only complicating factor is that, once we notice the session has expired, we have to repeat the original HTTP request, the one the server answered with a redirect to the login page. After replaying it with the new cookie, we set the new response in place of the one that demanded a re-login. This way, no tool notices that a re-login happened on the fly, and nothing breaks.
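The detect, re-login, and replay flow can also be illustrated without Burp. The following is a toy simulation under stated assumptions: `FakeApp` and `Client` are hypothetical stand-ins for the target application and for the extension logic, and the fake application answers an invalid session with a 302 redirect to `/`, as the tutorial's target does.

```python
class FakeApp(object):
    """Stand-in for the target: accepts only the most recently issued token."""

    def __init__(self):
        self.valid_token = None
        self.counter = 0

    def login(self):
        self.counter += 1
        self.valid_token = "token-{}".format(self.counter)
        return self.valid_token

    def expire(self):
        self.valid_token = None

    def get(self, path, token):
        # An invalid or missing session yields a 302 redirect to /
        if token is None or token != self.valid_token:
            return 302, ["Location: /"], ""
        return 200, [], "content of {}".format(path)


def is_session_expired(status, headers):
    return status == 302 and "Location: /" in headers


class Client(object):
    """Stand-in for the extension: re-logs in and replays on expiry."""

    def __init__(self, app):
        self.app = app
        self.token = None
        self.logins = 0

    def request(self, path):
        response = self.app.get(path, self.token)
        if is_session_expired(response[0], response[1]):
            # Log in again, then replay the same request with the fresh token
            self.token = self.app.login()
            self.logins += 1
            response = self.app.get(path, self.token)
        # The caller only ever sees the final response
        return response


app = FakeApp()
client = Client(app)
first = client.request("/a")   # no session yet: triggers a login and a replay
app.expire()                   # simulate the short session lifetime
second = client.request("/b")  # expired: triggers another login and a replay
print(first[0], second[0], client.logins)
```

The caller of `request` never receives the 302 response, which mirrors how the extension swaps the replayed response in with `setResponse`.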
Final code
The final code will look something like this:
```python
from burp import IBurpExtender
from burp import IHttpListener


class BurpExtender(IBurpExtender, IHttpListener):
    def registerExtenderCallbacks(self, callbacks):
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        callbacks.setExtensionName("Handle Authentication Plugin")
        callbacks.registerHttpListener(self)
        self.session_token = None

    def makeAuth(self):
        try:
            headers = [
                "POST /login HTTP/1.1",
                "Host: 127.0.0.1:5000",
                "Content-Length: 17",
                "Content-Type: application/x-www-form-urlencoded"
            ]
            body = "username=username"
            auth_message = self.helpers.buildHttpMessage(headers, body)
            host = "127.0.0.1"
            port = 5000
            use_https = False
            resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
            resp_info = self.helpers.analyzeResponse(resp)
            # Find the Set-Cookie header by name instead of relying on a fixed position
            for header in resp_info.getHeaders():
                if header.lower().startswith("set-cookie:") and "session=" in header:
                    self.session_token = header.split("session=")[-1].split(";")[0]
                    break
        except Exception as e:
            print(e)

    def updateCookies(self, headers):
        new_headers = []
        cookie_seen = False
        for header in headers:
            if "Cookie:" in header:
                new_headers.append("Cookie: session={}".format(self.session_token))
                cookie_seen = True
            else:
                new_headers.append(header)
        if not cookie_seen:
            new_headers.append("Cookie: session={}".format(self.session_token))
        return new_headers

    def processHttpMessage(self, tool_flag, is_request, message_info):
        if is_request:
            request = message_info.getRequest()
            request_info = self.helpers.analyzeRequest(request)
            if "127.0.0.1:5000" in request_info.getHeaders()[1]:
                if self.session_token:
                    new_headers = self.updateCookies(request_info.getHeaders())
                    body = request[request_info.getBodyOffset():]
                    new_message = self.helpers.buildHttpMessage(new_headers, body)
                    message_info.setRequest(new_message)
        else:
            response = message_info.getResponse()
            response_info = self.helpers.analyzeResponse(response)
            if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
                # Session has expired
                self.makeAuth()  # Update session token
                # Replay the failed request with the new session token
                req = message_info.getRequest()
                req_i = self.helpers.analyzeRequest(req)
                new_headers = self.updateCookies(req_i.getHeaders())
                body = req[req_i.getBodyOffset():]
                new_message = self.helpers.buildHttpMessage(new_headers, body)
                host = "127.0.0.1"
                port = 5000
                use_https = False
                success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
                message_info.setResponse(success_resp)
                return
```