Source code for pykechain.models.service

import os
from datetime import datetime
from typing import Dict, Optional, Union

import requests

from pykechain.enums import (
    ServiceEnvironmentVersion,
    ServiceExecutionStatus,
    ServiceScriptUser,
    ServiceType,
)
from pykechain.exceptions import APIError
from pykechain.models.base import Base, BaseInScope
from pykechain.models.input_checks import check_enum, check_text, check_type
from pykechain.utils import Empty, clean_empty_values, empty, parse_datetime


[docs] class Service(BaseInScope): """ A virtual object representing a KE-chain Service. .. versionadded:: 1.13 :ivar id: id of the service :type id: uuid :ivar name: name of the service :type name: str :ivar description: description of the service :type description: str :ivar version: version number of the service, as provided by uploaded :type version: str :ivar type: type of the service. One of the :class:`ServiceType` :type type: str :ivar filename: filename of the service :type filename: str :ivar environment: environment in which the service will execute. One of :class:`ServiceEnvironmentVersion` :type environment: str :ivar updated_at: datetime in UTC timezone when the Service was last updated :type updated_at: datetime .. versionadded:: 3.0 :ivar trusted: Trusted flag. If the kecpkg is trusted. :ivar run_as: User to run the script as. One of :class:`ServiceScriptUser`. :ivar verified_on: Date when the kecpkg was verified by KE-chain (if verification pipeline is enabled) :ivar verification_results: Results of the verification (if verification pipeline is enabled) """ def __init__(self, json, **kwargs): """Construct a service from provided json data.""" super().__init__(json, **kwargs) del self.created_at self.description = json.get("description", "") self.version = json.get("script_version", "") self.filename = json.get("script_file_name") self.type = json.get("script_type") self.environment = json.get("env_version") # for SIM3 version self.trusted: bool = json.get("trusted") self.run_as: str = json.get("run_as") self.verified_on: Optional[datetime] = parse_datetime(json.get("verified_on")) self.verification_results: Dict = json.get("verification_results") def __repr__(self): # pragma: no cover return f"<pyke Service '{self.name}' id {self.id[-8:]}>"
[docs] def execute( self, interactive: Optional[bool] = False, **kwargs ) -> "ServiceExecution": """ Execute the service. For interactive (notebook) service execution, set interactive to True, defaults to False. .. versionadded:: 1.13 :param interactive: (optional) True if the notebook service should execute in interactive mode. :type interactive: bool or None :return: ServiceExecution when successful. :raises APIError: when unable to execute """ request_params = dict( interactive=check_type(interactive, bool, "interactive"), format="json", **kwargs, ) url = self._client._build_url("service_execute", service_id=self.id) response = self._client._request("GET", url, params=request_params) if response.status_code == requests.codes.conflict: # pragma: no cover raise APIError( f"Conflict: Could not execute Service {self} as it is already running.", response=response, ) elif response.status_code != requests.codes.accepted: # pragma: no cover raise APIError(f"Could not execute Service {self}", response=response) data = response.json() return ServiceExecution(json=data.get("results")[0], client=self._client)
[docs] def edit( self, name: Optional[Union[str, Empty]] = empty, description: Optional[Union[str, Empty]] = empty, version: Optional[Union[str, Empty]] = empty, type: Optional[Union[ServiceType, Empty]] = empty, environment_version: Optional[Union[ServiceEnvironmentVersion, Empty]] = empty, run_as: Optional[Union[ServiceScriptUser, Empty]] = empty, trusted: Optional[Union[bool, Empty]] = empty, **kwargs, ) -> None: """ Edit Service details. Setting an input to None will clear out the value (exception being name). .. versionadded:: 1.13 :param name: (optional) name of the service to change. Cannot be cleared. :type name: basestring or None or Empty :param description: (optional) description of the service. Can be cleared. :type description: basestring or None or Empty :param version: (optional) version number of the service. Can be cleared. :type version: basestring or None or Empty :param type: (optional) script type (Python or Notebook). Cannot be cleared. :type type: ServiceType or None or Empty :param environment_version: (optional) environment version of the service. Cannot be cleared. :type environment_version: ServiceEnvironmentVersion or None or Empty :param run_as: (optional) user to run the service as. Defaults to kenode user (bound to scope). Cannot be cleared. :type run_as: ServiceScriptUser or None or Empty :param trusted: (optional) flag whether the service is trusted, default if False. Cannot be cleared. :type trusted: bool or None or Empty :raises IllegalArgumentError: when you provide an illegal argument. :raises APIError: if the service could not be updated. Example ------- >>> service.edit(name='Car service',version='203') Not mentioning an input parameter in the function will leave it unchanged. Setting a parameter as None will clear its value (where that is possible). The example below will clear the description and edit the name. >>> service.edit(name="Plane service",description=None) """ update_dict = { "id": self.id, "name": check_text(name, "name") or self.name, "description": check_text(description, "description") or "", "trusted": check_type(trusted, bool, "trusted") or self.trusted, "script_type": check_enum(type, ServiceType, "type") or self.type, "env_version": check_enum( environment_version, ServiceEnvironmentVersion, "environment version" ) or self.environment, "run_as": check_enum(run_as, ServiceScriptUser, "run_as") or self.run_as, "script_version": check_text(version, "version") or "", } if kwargs: # pragma: no cover update_dict.update(**kwargs) update_dict = clean_empty_values(update_dict=update_dict) response = self._client._request( "PUT", self._client._build_url("service", service_id=self.id), json=update_dict, ) if response.status_code != requests.codes.ok: # pragma: no cover raise APIError(f"Could not update Service {self}", response=response) self.refresh(json=response.json()["results"][0])
[docs] def delete(self) -> None: """Delete this service. :raises APIError: if delete was not successful. """ response = self._client._request( "DELETE", self._client._build_url("service", service_id=self.id) ) if response.status_code != requests.codes.no_content: # pragma: no cover raise APIError(f"Could not delete Service {self}", response=response)
[docs] def upload(self, pkg_path): """ Upload a python script (or kecpkg) to the service. .. versionadded:: 1.13 :param pkg_path: path to the python script or kecpkg to upload. :type pkg_path: basestring :raises APIError: if the python package could not be uploaded. :raises OSError: if the python package could not be located on disk. """ if os.path.exists(pkg_path): self._upload(pkg_path=pkg_path) else: raise OSError(f"Could not locate python package to upload in '{pkg_path}'")
def _upload(self, pkg_path): url = self._client._build_url("service_upload", service_id=self.id) with open(pkg_path, "rb") as pkg: response = self._client._request( "POST", url, files={"attachment": (os.path.basename(pkg_path), pkg)} ) if response.status_code != requests.codes.accepted: # pragma: no cover raise APIError( f"Could not upload script file (or kecpkg) to Service {self}", response=response, ) self.refresh(json=response.json()["results"][0])
[docs] def save_as(self, target_dir=None): """ Save the kecpkg service script to an (optional) target dir. Retains the filename of the service as known in KE-chain. .. versionadded:: 1.13 :param target_dir: (optional) target dir. If not provided will save to current working directory. :type target_dir: basestring or None :raises APIError: if unable to download the service. :raises OSError: if unable to save the service kecpkg file to disk. """ full_path = os.path.join(target_dir or os.getcwd(), self.filename) url = self._client._build_url("service_download", service_id=self.id) response = self._client._request("GET", url) if response.status_code != requests.codes.ok: # pragma: no cover raise APIError( f"Could not download script file from Service {self}", response=response ) with open(full_path, "w+b") as f: for chunk in response: f.write(chunk)
[docs] def get_executions(self, **kwargs): """ Retrieve the executions related to the current service. .. versionadded:: 1.13 :param kwargs: (optional) additional search keyword arguments to limit the search even further. :type kwargs: dict :return: list of ServiceExecutions associated to the current service. """ return self._client.service_executions( service=self.id, scope=self.scope_id, **kwargs )
[docs] class ServiceExecution(Base): """ A virtual object representing a KE-chain Service Execution. .. versionadded:: 1.13 :ivar id: id of the service execution :type id: uuid :ivar name: name of the service to which the execution is associated :type name: str :ivar status: status of the service. One of :class:`ServiceExecutionStatus` :type status: str :ivar service: the :class:`Service` object associated to this service execution :type service: :class:`Service` :ivar service_id: the uuid of the associated Service object :type service_id: uuid :ivar user: (optional) username of the user that executed the service :type user: str or None :ivar activity_id: (optional) the uuid of the activity where the service was executed from :type activity_id: uuid or None """ def __init__(self, json, **kwargs): """Construct a scope from provided json data.""" super().__init__(json, **kwargs) del self.created_at del self.updated_at self.name = json.get("service_name") self.service_id = json.get("service") self.status: ServiceExecutionStatus = json.get("status", "") self.user = json.get("username") if json.get("activity") is not None: self.activity_id: Optional[str] = json["activity"].get("id") else: self.activity_id = None self.started_at: Optional[datetime] = parse_datetime(json.get("started_at")) self.finished_at: Optional[datetime] = parse_datetime(json.get("finished_at")) self._service: Optional[Service] = None def __repr__(self): # pragma: no cover return f"<pyke ServiceExecution '{self.name}' id {self.id[-8:]}>" @property def service(self) -> Service: """Retrieve the `Service` object to which this execution is associated.""" if not self._service: self._service = self._client.service(id=self.service_id) return self._service
[docs] def terminate(self): """ Terminate the Service execution. .. versionadded:: 1.13 :return: None if the termination request was successful :raises APIError: When the service execution could not be terminated. """ url = self._client._build_url( "service_execution_terminate", service_execution_id=self.id ) response = self._client._request("GET", url, params=dict(format="json")) if response.status_code != requests.codes.accepted: # pragma: no cover raise APIError(f"Could not terminate Service {self}", response=response)
[docs] def get_log(self, target_dir=None, log_filename="log.txt"): """ Retrieve the log of the service execution. .. versionadded:: 1.13 :param target_dir: (optional) directory path name where the store the log.txt to. :type target_dir: basestring or None :param log_filename: (optional) log filename to write the log to, defaults to `log.txt`. :type log_filename: basestring or None :raises APIError: if the logfile could not be found. :raises OSError: if the file could not be written. """ full_path = os.path.join(target_dir or os.getcwd(), log_filename) url = self._client._build_url( "service_execution_log", service_execution_id=self.id ) response = self._client._request("GET", url) if response.status_code != requests.codes.ok: # pragma: no cover raise APIError( f"Could not download execution log of Service {self}", response=response ) with open(full_path, "w+b") as f: for chunk in response: f.write(chunk)
[docs] def get_notebook_url(self): """ Get the url of the notebook, if the notebook is executed in interactive mode. .. versionadded:: 1.13 :return: full url to the interactive running notebook as `basestring` :raises APIError: when the url cannot be retrieved. """ url = self._client._build_url( "service_execution_notebook_url", service_execution_id=self.id ) response = self._client._request("GET", url, params=dict(format="json")) if response.status_code != requests.codes.ok: raise APIError( f"Could not retrieve notebook url of Service {self}", response=response ) data = response.json() url = data.get("results")[0].get("url") return url