from __future__ import annotations
from json import JSONDecodeError
import requests
import os
import getpass
import logging
from requests.auth import HTTPBasicAuth
from bailo.core.exceptions import BailoException, ResponseException
logger = logging.getLogger(__name__)
[docs]
class Agent:
"""Base API Agent for talking with Bailo.
Wraps each request in an exception handler that maps API errors to Python Bailo errors, among status codes less than 400.
"""
[docs]
def __init__(
self,
verify: str | bool = True,
):
"""Initiate a standard agent.
:param verify: Path to certificate authority file, or bool for SSL verification.
"""
self.verify = verify
def __request(self, method, *args, **kwargs):
kwargs["verify"] = self.verify
res = requests.request(method, *args, **kwargs)
# Check response for a valid range
if res.status_code < 400:
return res
try:
# Give the error message issued by bailo
raise BailoException(res.json()["error"]["message"])
except JSONDecodeError:
# No response given
raise ResponseException(f"{res.status_code} Cannot {method} to {res.request.url}")
[docs]
def get(self, *args, **kwargs):
return self.__request("GET", *args, **kwargs)
[docs]
def post(self, *args, **kwargs):
return self.__request("POST", *args, **kwargs)
[docs]
def patch(self, *args, **kwargs):
return self.__request("PATCH", *args, **kwargs)
[docs]
def push(self, *args, **kwargs):
return self.__request("PUSH", *args, **kwargs)
[docs]
def delete(self, *args, **kwargs):
return self.__request("DELETE", *args, **kwargs)
[docs]
def put(self, *args, **kwargs):
return self.__request("PUT", *args, **kwargs)
[docs]
class PkiAgent(Agent):
[docs]
def __init__(
self,
cert: str,
key: str,
auth: str,
):
"""Initiate an agent for PKI authentication.
:param cert: Path to cert file
:param key: Path to key file
:param auth: Path to certificate authority file
"""
super().__init__(verify=auth)
self.cert = cert
self.key = key
[docs]
def get(self, *args, **kwargs):
return super().get(*args, cert=(self.cert, self.key), **kwargs)
[docs]
def post(self, *args, **kwargs):
return super().post(*args, cert=(self.cert, self.key), **kwargs)
[docs]
def put(self, *args, **kwargs):
return super().put(*args, cert=(self.cert, self.key), **kwargs)
[docs]
def patch(self, *args, **kwargs):
return super().patch(*args, cert=(self.cert, self.key), **kwargs)
[docs]
def delete(self, *args, **kwargs):
return super().delete(*args, cert=(self.cert, self.key), **kwargs)
[docs]
class TokenAgent(Agent):
[docs]
def __init__(
self,
access_key: str | None = None,
secret_key: str | None = None,
):
"""Initiate an agent for API token authentication.
:param access_key: Access key
:param secret_key: Secret key
"""
super().__init__()
if access_key is None:
logger.info("Access key not provided. Trying other sources...")
try:
access_key = os.environ["BAILO_ACCESS_KEY"]
logger.info("Access key acquired from BAILO_ACCESS_KEY environment variable.")
except KeyError:
logger.info("Access key not found in BAILO_ACCESS_KEY environment variable. Requires user input.")
access_key = getpass.getpass("BAILO ACCESS KEY:")
logger.info("Access key acquired from user input.")
if secret_key is None:
logger.info("Secret key not provided. Trying other sources...")
try:
secret_key = os.environ["BAILO_SECRET_KEY"]
logger.info("Secret key acquired from BAILO_SECRET_KEY environment variable.")
except KeyError:
logger.info("Secret key not found in BAILO_SECRET_KEY environment variable. Requires user input.")
secret_key = getpass.getpass("BAILO SECRET KEY:")
logger.info("Secret key acquired from user input.")
self.access_key = access_key
self.secret_key = secret_key
self.basic = HTTPBasicAuth(access_key, secret_key)
[docs]
def get(self, *args, **kwargs):
return super().get(*args, auth=self.basic, **kwargs)
[docs]
def post(self, *args, **kwargs):
return super().post(*args, auth=self.basic, **kwargs)
[docs]
def put(self, *args, **kwargs):
return super().put(*args, auth=self.basic, **kwargs)
[docs]
def patch(self, *args, **kwargs):
return super().patch(*args, auth=self.basic, **kwargs)
[docs]
def delete(self, *args, **kwargs):
return super().delete(*args, auth=self.basic, **kwargs)