Source code for pykechain.models.expiring_download
import datetime
import os
from typing import Dict, Optional
import requests
from pykechain.exceptions import APIError
from pykechain.models import Base
from pykechain.models.input_checks import check_type
from pykechain.utils import clean_empty_values, empty
[docs]
class ExpiringDownload(Base):
    """
    Expiring Download class.

    Client-side representation of an expiring download, built from the json
    payload handed to the constructor.

    :ivar filename: value of the `content_file_name` key of the json payload
    :ivar expires_in: value of the `expires_in` key of the json payload
    :ivar expires_at: value of the `expires_at` key of the json payload
    :ivar token_hint: value of the `token_hint` key of the json payload
    """

    def __init__(self, json: Dict, **kwargs) -> None:
        """
        Init function.

        :param json: the json response to construct the :class:`ExpiringDownload` from
        :type json: dict
        """
        super().__init__(json, **kwargs)

        # Every field is optional in the payload; a missing key yields None.
        self.filename = json.get("content_file_name")
        self.expires_in = json.get("expires_in")
        self.expires_at = json.get("expires_at")
        self.token_hint = json.get("token_hint")

    def __repr__(self):  # pragma: no cover
        # Only the last 8 characters of the id are shown to keep the repr short.
        return f"<pyke ExpiringDownload id {self.id[-8:]}>"

    def save_as(self, target_dir: Optional[str] = None) -> None:
        """
        Save the Expiring Download content.

        The file is written as `<target_dir>/<filename>`; when no target
        directory is given, the current working directory is used.

        :param target_dir: the target directory where the file will be stored
        :type target_dir: str
        :raises APIError: if the content could not be downloaded.
        """
        destination = os.path.join(target_dir or os.getcwd(), self.filename)

        url = self._client._build_url("expiring_download_download", download_id=self.id)
        response = self._client._request("GET", url)

        if response.status_code != requests.codes.ok:  # pragma: no cover
            raise APIError(
                f"Could not download file from Expiring download {self}",
                response=response,
            )

        # The file is only ever written here, so plain binary write mode is enough.
        with open(destination, "wb") as target_file:
            for chunk in response:
                target_file.write(chunk)

    def delete(self) -> None:
        """Delete this expiring download.

        :raises APIError: if delete was not successful.
        """
        url = self._client._build_url("expiring_download", download_id=self.id)
        response = self._client._request("DELETE", url)

        # A successful delete is signalled by a "no content" status code.
        if response.status_code != requests.codes.no_content:  # pragma: no cover
            raise APIError(
                f"Could not delete Expiring Download {self}", response=response
            )

    def edit(
        self,
        expires_at: Optional[datetime.datetime] = empty,
        expires_in: Optional[int] = empty,
        **kwargs,
    ) -> None:
        """
        Edit Expiring Download details.

        An argument passed as `None` is replaced by the value currently stored
        on this object. An argument left at its `empty` default is handed to
        `clean_empty_values` unchanged, which is expected to strip it from the
        request.

        :param expires_at: The moment at which the ExpiringDownload will expire
        :type expires_at: datetime.datetime
        :param expires_in: The amount of time (in seconds) in which the ExpiringDownload will expire
        :type expires_in: int
        :param kwargs: additional key-value pairs merged into the update payload
        :raises APIError: if the Expiring Download could not be updated.
        """
        expires_at = check_type(expires_at, datetime.datetime, "expires_at")
        expires_in = check_type(expires_in, int, "expires_in")

        if expires_at is None:
            expires_at = self.expires_at
        elif isinstance(expires_at, datetime.datetime):
            # datetime objects are not serialisable by the standard JSON
            # encoder, so the moment is sent as ISO 8601 text instead.
            expires_at = expires_at.isoformat()

        # Compare against None explicitly: a plain `or` fallback would discard
        # a legitimate value of 0 and silently resend the old expiry.
        if expires_in is None:
            expires_in = self.expires_in

        update_dict = {
            "id": self.id,
            "expires_at": expires_at,
            "expires_in": expires_in,
        }

        if kwargs:  # pragma: no cover
            update_dict.update(**kwargs)

        update_dict = clean_empty_values(update_dict=update_dict)

        response = self._client._request(
            "PUT",
            self._client._build_url("expiring_download", download_id=self.id),
            json=update_dict,
        )

        if response.status_code != requests.codes.ok:  # pragma: no cover
            raise APIError(
                f"Could not update Expiring Download {self}", response=response
            )

        # Bring this object in line with what the server returned.
        self.refresh(json=response.json()["results"][0])

    def upload(self, content_path: str) -> None:
        """
        Upload a file to the Expiring Download.

        .. versionadded:: 3.10.0

        :param content_path: path to the file to upload.
        :type content_path: basestring
        :raises APIError: if the file could not be uploaded.
        :raises OSError: if the file could not be located on disk.
        """
        if not os.path.exists(content_path):
            raise OSError(f"Could not locate file to upload in '{content_path}'")

        self._upload(content_path=content_path)

    def _upload(self, content_path: str) -> None:
        """
        Post the file at `content_path` as the attachment of this Expiring Download.

        The file is sent under the form field `attachment`, named after the
        basename of the path; afterwards this object is refreshed from the
        first entry of the `results` list in the response.

        :param content_path: path to the file to upload.
        :type content_path: basestring
        :raises APIError: if the file could not be uploaded.
        """
        url = self._client._build_url("expiring_download_upload", download_id=self.id)

        # Keep the file handle open only for the duration of the request.
        with open(content_path, "rb") as file:
            response = self._client._request(
                "POST",
                url,
                files={"attachment": (os.path.basename(content_path), file)},
            )

        if response.status_code not in (
            requests.codes.accepted,
            requests.codes.ok,
        ):  # pragma: no cover
            raise APIError(
                f"Could not upload file to Expiring Download {self}", response=response
            )

        self.refresh(json=response.json()["results"][0])