You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
|
|
import requests
|
|
|
|
|
|
def _query_params_to_string(params):
    """Serialize *params* into a URL query string, including the leading ``?``.

    Args:
        params: Mapping of query parameter names to values, or ``None``.

    Returns:
        A string of the form ``"?k1=v1&k2=v2"`` with keys and values
        percent-encoded, or ``""`` when *params* is ``None`` or empty, so the
        result can always be appended to a URL as-is.
    """
    # Imported locally so the helper does not rely on the file's import block.
    from urllib.parse import urlencode

    if not params:
        return ""
    # urlencode joins pairs with "&" (no stray leading separator) and escapes
    # reserved characters such as "&", "=" and spaces in keys and values.
    return "?" + urlencode(params)
|
|
|
|
|
|
class PortainerClient:
    """Thin ``requests`` wrapper for an HTTP API served under ``{base_url}/api/``.

    Call :meth:`login` first: it stores the returned JWT and the matching
    ``Authorization`` header, which every other method sends along.
    """

    def __init__(self, base_url, endpoint):
        """Store connection settings; no request is made here.

        Args:
            base_url: Root URL of the server; ``/api/<path>`` is appended to
                it verbatim.
            endpoint: Kept on the instance as ``self.endpoint``; the client
                itself does not read it.
        """
        self.endpoint = endpoint
        self.base_url = base_url
        self.token = ""
        self.headers = {}

    def _url(self, path, query_params=None):
        """Return the absolute API URL for *path* plus an optional query string."""
        url = f"{self.base_url}/api/{path}"
        # Only build a query string when there is something to send; this also
        # keeps ``None`` away from the serializer.
        if query_params:
            url = url + _query_params_to_string(query_params)
        return url

    def login(self, username, password):
        """Authenticate against ``/api/auth`` and remember the bearer token.

        Args:
            username: Sent as the ``Username`` field of the JSON payload.
            password: Sent as the ``Password`` field of the JSON payload.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        payload = {
            "Username": username,
            "Password": password,
        }
        auth_url = f"{self.base_url}/api/auth"
        resp = requests.post(auth_url, json=payload)
        resp.raise_for_status()
        self.token = resp.json()["jwt"]
        self.headers = {
            "Authorization": f"Bearer {self.token}"
        }

    def get(self, get_endpoint, query_params=None):
        """GET ``/api/<get_endpoint>`` and return the decoded JSON body.

        Args:
            get_endpoint: Path below ``/api/``.
            query_params: Optional mapping appended as a query string.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        res = requests.get(self._url(get_endpoint, query_params), headers=self.headers)
        res.raise_for_status()
        return res.json()

    def delete(self, endpoint):
        """DELETE ``/api/<endpoint>`` on a best-effort basis.

        Request failures are logged and otherwise ignored, so callers always
        receive an empty dict.

        Args:
            endpoint: Path below ``/api/``.

        Returns:
            Always ``{}``.
        """
        # Imported locally so this method does not rely on the file's import block.
        import logging

        url = self._url(endpoint)
        try:
            # TODO: deletion works, but the request fails?
            res = requests.delete(url, headers=self.headers)
            res.raise_for_status()
        except requests.RequestException as exc:
            # Stay best-effort, but leave a trace instead of hiding the failure;
            # unrelated programming errors are no longer swallowed.
            logging.getLogger(__name__).warning("DELETE %s failed: %s", url, exc)
        return {}

    def put(self, endpoint, body):
        """PUT *body* as JSON to ``/api/<endpoint>`` and return the decoded JSON reply.

        Args:
            endpoint: Path below ``/api/``.
            body: JSON-serializable payload.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        res = requests.put(self._url(endpoint), json=body, headers=self.headers)
        res.raise_for_status()
        return res.json()

    def post(self, endpoint, body, query_params=None):
        """POST *body* as JSON to ``/api/<endpoint>`` and return the decoded JSON reply.

        Args:
            endpoint: Path below ``/api/``.
            body: JSON-serializable payload.
            query_params: Optional mapping appended as a query string; may be
                omitted.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        res = requests.post(
            self._url(endpoint, query_params), json=body, headers=self.headers
        )
        res.raise_for_status()
        return res.json()
|