Source code for bailo.core.client

from __future__ import annotations

from io import BytesIO
from typing import Any

from bailo.core.agent import Agent, TokenAgent
from bailo.core.enums import CollaboratorEntry, EntryKind, ModelVisibility, SchemaKind
from bailo.core.utils import filter_none


class Client:
    """Create a Client object that can be used to talk to the website.

    Every method builds a URL under ``<url>/api`` and delegates the HTTP call to
    ``self.agent``.

    :param url: Url of bailo website
    :param agent: An agent object to handle requests
    """

    def __init__(self, url: str, agent: Agent | None = None):
        """Initialise a Client.

        :param url: URL of the Bailo instance website. Trailing slashes are stripped before ``/api`` is appended.
        :param agent: An agent object to handle requests, defaults to a new ``Agent()``.
        """
        self.url = url.rstrip("/") + "/api"
        # The default agent is built here rather than in the signature: a call in a
        # default argument is evaluated once at definition time, so every client
        # created without an explicit agent would share that single instance.
        self.agent = Agent() if agent is None else agent

    def post_model(
        self,
        name: str,
        kind: EntryKind,
        description: str,
        sourceModelId: str | None = None,
        visibility: ModelVisibility | None = None,
        organisation: str | None = None,
        state: str | None = None,
        tags: list[str] | None = None,
        collaborators: list[CollaboratorEntry] | None = None,
    ):
        """Create a model.

        :param name: Name of the model
        :param kind: Either a Model, Mirrored Model or a Datacard
        :param description: Description of the model
        :param sourceModelId: Used for syncing a mirrored model to its source model
        :param visibility: Enum to define model visibility (e.g public or private), defaults to None
            (sent as ``"public"`` when omitted)
        :param organisation: Organisation responsible for the model, defaults to None
        :param state: Development readiness of the model, defaults to None
        :param tags: Tags to assign to the model, defaults to None
        :param collaborators: List of CollaboratorEntry to define who the model's collaborators
            (a.k.a. model access) are, defaults to None
        :raises ValueError: If ``sourceModelId`` is given for a non-mirrored kind, or is missing
            for a Mirrored Model
        :return: JSON response object
        """
        _visibility: str = "public"
        if visibility is not None:
            _visibility = str(visibility)

        # `sourceModelId` and the Mirrored Model kind must be supplied together.
        if sourceModelId is not None and kind != EntryKind.MIRRORED_MODEL:
            raise ValueError("Only Mirrored Models may pass a `sourceModelId` argument.")
        if sourceModelId is None and kind == EntryKind.MIRRORED_MODEL:
            raise ValueError("Mirrored Models must specify a `sourceModelId` argument.")

        filtered_json = filter_none(
            {
                "name": name,
                "kind": kind,
                "description": description,
                "settings": {
                    "mirror": {
                        "sourceModelId": sourceModelId,
                    },
                },
                "visibility": _visibility,
                "organisation": organisation,
                "state": state,
                "tags": tags,
                "collaborators": collaborators,
            }
        )

        return self.agent.post(
            f"{self.url}/v2/models",
            json=filtered_json,
        ).json()

    def get_models(
        self,
        task: str | None = None,
        libraries: list[str] | None = None,
        filters: list[str] | None = None,
        search: str = "",
        kind: EntryKind | None = None,
        organisations: list[str] | None = None,
        states: list[str] | None = None,
        allow_templating: bool | None = None,
        schema_id: str | None = None,
        admin_access: bool | None = None,
        peers: list[str] | None = None,
        title_only: bool | None = None,
    ):
        """Search for models using a combination of structured filters and free-text search.

        Calls `/api/v2/models/search` and returns a list of entry summaries visible to the current user.
        Results may include both local models and, if requested, models returned from configured peers.
        Any peer or local search errors are included alongside results.

        :param task: Entry task (e.g. image classification), defaults to None
        :param libraries: Entry library (e.g. TensorFlow), defaults to None
        :param filters: List of collaborator role filters. Special value `"mine"` restricts results to
            models where the current user is a collaborator. Otherwise, values are treated as
            collaborator roles, defaults to None
        :param search: Free-text search string. Always performs a partial, case-insensitive match against
            the entry name. If `title_only` is False, a full-text search across entry content is also
            performed, defaults to ""
        :param kind: Entry kind to filter by (e.g. `EntryKind.MODEL`), defaults to None
        :param organisations: List of organisation identifiers to restrict results, defaults to None
        :param states: List of entry lifecycle states to restrict results, defaults to None
        :param allow_templating: If True, restricts results to models with templating enabled,
            defaults to None
        :param schema_id: Schema ID to restrict results to models using that schema, defaults to None
        :param admin_access: If True, returns models requiring admin access. The caller must have the
            Admin role or the request will be rejected by the backend, defaults to None
        :param peers: List of peer identifiers to include remote search results from, defaults to None
        :param title_only: If True, limits searching to entry titles only and disables full-text search,
            defaults to None
        :return: JSON response object
        """
        filtered_params = filter_none(
            {
                "kind": kind,
                "task": task,
                "libraries": libraries,
                "organisations": organisations,
                "states": states,
                "filters": filters,
                "search": search,
                "allowTemplating": allow_templating,
                "schemaId": schema_id,
                "adminAccess": admin_access,
                "peers": peers,
                "titleOnly": title_only,
            }
        )

        return self.agent.get(
            f"{self.url}/v2/models/search",
            params=filtered_params,
        ).json()

    def get_model(
        self,
        model_id: str,
    ):
        """Retrieve a specific model using its unique ID.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}",
        ).json()

    def patch_model(
        self,
        model_id: str,
        name: str | None = None,
        kind: str | None = None,
        description: str | None = None,
        visibility: str | None = None,
        organisation: str | None = None,
        state: str | None = None,
        tags: list[str] | None = None,
        collaborators: list[CollaboratorEntry] | None = None,
    ):
        """Update a specific model using its unique ID.

        Only the arguments that are not None are included in the request body.

        :param model_id: Unique model ID
        :param name: Name of the model, defaults to None
        :param kind: Either a Model, Mirrored Model or a Datacard, defaults to None
        :param description: Description of the model, defaults to None
        :param visibility: Enum to define model visibility (e.g public or private), defaults to None
        :param organisation: Organisation responsible for the model, defaults to None
        :param state: Development readiness of the model, defaults to None
        :param tags: Tags to assign to the model, defaults to None
        :param collaborators: List of CollaboratorEntry to define who the model's collaborators
            (a.k.a. model access) are, defaults to None
        :return: JSON response object
        """
        filtered_json = filter_none(
            {
                "name": name,
                "organisation": organisation,
                "state": state,
                "kind": kind,
                "description": description,
                "visibility": visibility,
                "collaborators": collaborators,
                "tags": tags,
            }
        )

        return self.agent.patch(f"{self.url}/v2/model/{model_id}", json=filtered_json).json()

    def delete_model(
        self,
        model_id: str,
    ):
        """Delete a specific model and all associated artefacts.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.delete(
            f"{self.url}/v2/model/{model_id}",
        ).json()

    def get_model_card(
        self,
        model_id: str,
        version: str,
        mirrored: bool = False,
    ):
        """Retrieve a specific model card, using the unique model ID and version.

        :param model_id: Unique model ID
        :param version: Model card version
        :param mirrored: Whether to get the read only model card, defaults to False
        :return: JSON response object
        """
        # The query key must be the literal string "mirrored"; using the variable as
        # the key would send `{False: False}` / `{True: True}` instead.
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/model-card/{version}",
            params={"mirrored": mirrored},
        ).json()

    def put_model_card(
        self,
        model_id: str,
        metadata: Any,
    ):
        """Update the latest model card, using the unique model ID.

        :param model_id: Unique model ID
        :param metadata: Metadata object, defined by model card schema
        :return: JSON response object
        """
        return self.agent.put(
            f"{self.url}/v2/model/{model_id}/model-cards",
            json={
                "metadata": metadata,
            },
        ).json()

    def model_card_from_schema(
        self,
        model_id: str,
        schema_id: str,
    ):
        """Create a model card using a given schema ID.

        :param model_id: Unique model ID
        :param schema_id: Unique model card schema ID
        :return: JSON response object
        """
        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/setup/from-schema",
            json={
                "schemaId": schema_id,
            },
        ).json()

    def model_card_from_template(self, model_id: str, template_id: str | None):
        """Create a model card using a given template ID (previously created models, model ID).

        :param model_id: Unique model ID
        :param template_id: Previous model's unique ID to be used as template for new model card
        :return: JSON response object
        """
        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/setup/from-template",
            json={"templateId": template_id},
        ).json()

    def post_release(
        self,
        model_id: str,
        release_version: str,
        notes: str,
        file_ids: list[str],
        images: list[str],
        model_card_version: int | None = None,
        minor: bool | None = False,
        draft: bool | None = False,
    ):
        """Create a new model release.

        :param model_id: Unique model ID
        :param release_version: Release version
        :param notes: Notes on release
        :param file_ids: Files for release
        :param images: Images for release
        :param model_card_version: Model card version, defaults to None
        :param minor: Signifies a minor release, defaults to False
        :param draft: Signifies a draft release, defaults to False
        :return: JSON response object
        """
        filtered_json = filter_none(
            {
                "modelCardVersion": model_card_version,
                "semver": release_version,
                "notes": notes,
                "minor": minor,
                "draft": draft,
                "fileIds": file_ids,
                "images": images,
            }
        )

        return self.agent.post(f"{self.url}/v2/model/{model_id}/releases", json=filtered_json).json()

    def put_release(
        self,
        model_id: str,
        model_card_version: int,
        release_version: str,
        notes: str,
        draft: bool,
        file_ids: list[str],
        images: list[str],
    ):
        """Update an existing model release, addressed by its release version.

        :param model_id: Unique model ID
        :param model_card_version: Model card version
        :param release_version: Release version (used in the URL to select the release)
        :param notes: Notes on release
        :param draft: Signifies a draft release
        :param file_ids: Files for release
        :param images: Images for release
        :return: JSON response object
        """
        return self.agent.put(
            f"{self.url}/v2/model/{model_id}/release/{release_version}",
            json={
                "notes": notes,
                "draft": draft,
                "fileIds": file_ids,
                "images": images,
                "modelCardVersion": model_card_version,
            },
        ).json()

    def get_all_releases(
        self,
        model_id: str,
    ):
        """Get all releases for a model.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/releases",
        ).json()

    def get_release(self, model_id: str, release_version: str):
        """Get a specific model release.

        :param model_id: Unique model ID
        :param release_version: Release version
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/release/{release_version}",
        ).json()

    def delete_release(
        self,
        model_id: str,
        release_version: str,
    ):
        """Delete a specific model release.

        :param model_id: Unique model ID
        :param release_version: Release version
        :return: JSON response object
        """
        return self.agent.delete(
            f"{self.url}/v2/model/{model_id}/release/{release_version}",
        ).json()

    def get_files(
        self,
        model_id: str,
    ):
        """Get files for a model.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/files",
        ).json()

    def get_download_file(
        self,
        model_id: str,
        file_id: str,
    ):
        """Download a specific file by its id.

        A ``TokenAgent`` is routed to the ``/v2/token/...`` variant of the endpoint.

        :param model_id: Unique model ID
        :param file_id: Unique file ID
        :return: Response object from the download endpoint (requested with ``stream=True``,
            not decoded as JSON)
        """
        if isinstance(self.agent, TokenAgent):
            return self.agent.get(
                f"{self.url}/v2/token/model/{model_id}/file/{file_id}/download",
                stream=True,
                timeout=10_000,
            )
        else:
            return self.agent.get(
                f"{self.url}/v2/model/{model_id}/file/{file_id}/download",
                stream=True,
                timeout=10_000,
            )

    def get_download_by_filename(
        self,
        model_id: str,
        semver: str,
        filename: str,
    ):
        """Download a specific file, addressed by release and filename.

        A ``TokenAgent`` is routed to the ``/v2/token/...`` variant of the endpoint.

        :param model_id: Unique model ID
        :param semver: Semver of the release
        :param filename: The filename trying to download from
        :return: Response object from the download endpoint (requested with ``stream=True``,
            not decoded as JSON)
        """
        if isinstance(self.agent, TokenAgent):
            return self.agent.get(
                f"{self.url}/v2/token/model/{model_id}/release/{semver}/file/{filename}/download",
                stream=True,
                timeout=10_000,
            )
        else:
            return self.agent.get(
                f"{self.url}/v2/model/{model_id}/release/{semver}/file/{filename}/download",
                stream=True,
                timeout=10_000,
            )

    def simple_upload(self, model_id: str, name: str, buffer: BytesIO):
        """Upload a file associated with a model.

        :param model_id: Unique model ID.
        :param name: Name of the file to upload.
        :param buffer: File-like BytesIO object containing the data to upload.
        :return: Response object from the upload endpoint.
        """
        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/files/upload/simple",
            params={"name": name},
            data=buffer,
            stream=True,
            timeout=10_000,
        )

    # def start_multi_upload(): TBC
    # def finish_multi_upload(): TBC

    def delete_file(
        self,
        model_id: str,
        file_id: str,
    ):
        """Delete a specific file associated with a model.

        :param model_id: Unique model ID
        :param file_id: Unique file ID
        :return: JSON response object
        """
        return self.agent.delete(
            f"{self.url}/v2/model/{model_id}/file/{file_id}",
        ).json()

    def get_all_images(
        self,
        model_id: str,
    ):
        """Get all images.

        :param model_id: A unique model ID
        :return: JSON response object
        """
        return self.agent.get(f"{self.url}/v2/model/{model_id}/images").json()

    def get_all_schemas(
        self,
        kind: SchemaKind | None = None,
    ):
        """Get all schemas.

        :param kind: Enum to define schema kind (e.g. Model or AccessRequest), defaults to None
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/schemas",
            params={"kind": kind},
        ).json()

    def get_schema(
        self,
        schema_id: str,
    ):
        """Retrieve a specific schema using its unique ID.

        :param schema_id: Unique schema ID
        :return: JSON response object.
        """
        return self.agent.get(
            f"{self.url}/v2/schema/{schema_id}",
        ).json()

    def post_schema(
        self,
        schema_id: str,
        name: str,
        description: str,
        kind: SchemaKind,
        json_schema: dict[str, Any],
        review_roles: list[str],
    ):
        """Create a schema.

        :param schema_id: Unique schema ID
        :param name: Name of the schema
        :param description: Description for the schema
        :param kind: Enum to define schema kind (e.g. Model or AccessRequest)
        :param json_schema: JSON schema
        :param review_roles: List made up of the "shortName" property from a Review Role object
        :return: JSON response object
        """
        return self.agent.post(
            f"{self.url}/v2/schemas",
            json={
                "id": schema_id,
                "name": name,
                "description": description,
                "kind": str(kind),
                "jsonSchema": json_schema,
                "reviewRoles": review_roles,
            },
        ).json()

    def get_reviews(
        self,
        active: bool,
        model_id: str | None = None,
        version: str | None = None,
    ):
        """Get all reviews within given parameters.

        :param active: Boolean representing status of review
        :param model_id: Unique model ID, defaults to None
        :param version: Model version, defaults to None
        :return: JSON response object.
        """
        # Sent as the lowercase text "true"/"false" rather than Python's "True"/"False".
        _active = str(active).lower()

        return self.agent.get(
            f"{self.url}/v2/reviews",
            params={
                "active": _active,
                "modelId": model_id,
                "semver": version,
            },
        ).json()

    def post_release_review(
        self,
        model_id: str,
        version: str,
        role: str,
        decision: str,
        comment: str | None = None,
    ):
        """Create a review for a release.

        :param model_id: A unique model ID
        :param version: Semver of the release
        :param role: The role of the user making the review
        :param decision: Either approve or request changes
        :param comment: A comment to go with the review, defaults to None
        :return: JSON response object
        """
        filtered_json = filter_none({"role": role, "decision": decision, "comment": comment})

        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/release/{version}/review",
            json=filtered_json,
        ).json()

    def get_model_roles(
        self,
        model_id: str,
    ):
        """Get roles for a model.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/roles",
        ).json()

    def get_access_request(self, model_id: str, access_request_id: str):
        """Retrieve a specific access request given its unique ID.

        :param model_id: Unique model ID
        :param access_request_id: Unique access request ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/access-request/{access_request_id}",
        ).json()

    def get_access_requests(
        self,
        model_id: str,
    ):
        """Retrieve all access requests given a specific model.

        :param model_id: Unique model ID
        :return: JSON response object
        """
        return self.agent.get(
            f"{self.url}/v2/model/{model_id}/access-requests",
        ).json()

    def post_access_request(self, model_id: str, metadata: Any, schema_id: str):
        """Create an access request given a model ID.

        :param model_id: Unique model ID
        :param metadata: Metadata object, defined by access request schema
        :param schema_id: Unique schema ID
        :return: JSON response object
        """
        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/access-requests",
            json={"schemaId": schema_id, "metadata": metadata},
        ).json()

    def delete_access_request(self, model_id: str, access_request_id: str):
        """Delete a specific access request associated with a model.

        :param model_id: Unique model ID
        :param access_request_id: Unique access request ID
        :return: JSON response object
        """
        return self.agent.delete(
            f"{self.url}/v2/model/{model_id}/access-request/{access_request_id}",
        ).json()

    def patch_access_request(
        self,
        model_id: str,
        access_request_id: str,
        metadata: Any,
        schema_id: str | None = None,
    ):
        """Update an access request given its unique ID.

        :param model_id: Unique model ID
        :param access_request_id: Unique access request ID
        :param metadata: Metadata object, defined by access request schemas
        :param schema_id: Unique schema ID, defaults to None
        :return: JSON response object
        """
        filtered_json = filter_none({"schemaId": schema_id, "metadata": metadata})

        return self.agent.patch(
            f"{self.url}/v2/model/{model_id}/access-request/{access_request_id}",
            json=filtered_json,
        ).json()

    def put_file_scan(
        self,
        model_id: str,
        file_id: str,
    ):
        """Manually re-request a new antivirus scan for a file.

        :param model_id: Unique model ID
        :param file_id: Unique file ID
        :return: JSON response object
        """
        return self.agent.put(f"{self.url}/v2/filescanning/model/{model_id}/file/{file_id}/scan", json={}).json()

    def post_access_request_review(
        self,
        model_id: str,
        access_request_id: str,
        role: str,
        decision: str,
        comment: str | None = None,
    ):
        """Create a review for an access request.

        :param model_id: A unique model ID
        :param access_request_id: Unique access request ID
        :param role: The role of the user making the review
        :param decision: Either approve or request changes
        :param comment: A comment to go with the review, defaults to None
        :return: JSON response object
        """
        filtered_json = filter_none({"role": role, "decision": decision, "comment": comment})

        return self.agent.post(
            f"{self.url}/v2/model/{model_id}/access-request/{access_request_id}/review",
            json=filtered_json,
        ).json()