from typing import List, Optional
from mi.framework.http import HTTPSession
from mi.framework.router import Route
from mi.wrapper.models import RawFile
# Public names exported by a star import of this module.
__all__ = (
    'MiFile',
    'check_upload',
    'get_file_ids',
)
class MiFile:
    """Description of a single file: either a local file or one on the drive.

    An instance is a plain value holder. It stores the arguments it was
    created with and performs no validation or I/O itself.
    """

    # Fixed attribute set; instances carry no per-instance ``__dict__``.
    __slots__ = (
        'path',
        'file_id',
        'name',
        'folder_id',
        'comment',
        'is_sensitive',
        'force'
    )

    def __init__(self, path: Optional[str] = None,
                 file_id: Optional[str] = None,
                 name: Optional[str] = None,
                 folder_id: Optional[str] = None,
                 comment: Optional[str] = None,
                 is_sensitive: bool = False,
                 force: bool = False
                 ) -> None:
        """
        Parameters
        ----------
        path : Optional[str], default=None
            Path to a local file
        file_id : Optional[str], default=None
            ID of the file that exists on the drive
        name : Optional[str], default=None
            File name
        folder_id : Optional[str], default=None
            Folder ID
        comment : Optional[str], default=None
            Comments on files
        is_sensitive : bool, default=False
            Whether this item is sensitive
        force : bool, default=False
            Whether to force overwriting even if it already exists on the drive
        """
        self.path = path
        self.file_id = file_id
        self.name = name
        self.folder_id = folder_id
        self.comment = comment
        self.is_sensitive = is_sensitive
        self.force = force
async def check_upload(files: List["MiFile"]) -> list:
    """Return one drive file ID per entry of *files*, uploading where needed.

    Parameters
    ----------
    files : List[MiFile]
        Files to resolve. An entry with a truthy ``path`` is uploaded via
        ``POST /api/drive/files/create``; any other entry contributes its
        ``file_id`` unchanged (which may be ``None``).

    Returns
    -------
    list
        IDs in the same order as *files*.
    """
    file_ids = []
    for file in files:
        if not file.path:
            # Nothing to upload: use the ID the caller supplied.
            file_ids.append(file.file_id)
            continue

        endpoint = Route('POST', '/api/drive/files/create')
        # Keep the handle open only for the duration of the request so it is
        # closed even if the request raises.
        with open(file.path, 'rb') as fp:
            data = {'file': fp,
                    'name': file.name,
                    'folderId': file.folder_id,
                    'isSensitive': file.is_sensitive,
                    'comment': file.comment,
                    'force': file.force}
            response = await HTTPSession.request(endpoint, auth=True, data=data, lower=True)
        file_ids.append(RawFile(response).id)
    return file_ids
async def get_file_ids(files: List[MiFile]):
    """Resolve *files* to file IDs by delegating to ``check_upload``.

    Parameters
    ----------
    files : List[MiFile]
        Files to resolve; passed through to ``check_upload`` unchanged.

    Returns
    -------
    Whatever ``check_upload`` returns for *files*.
    """
    file_ids = await check_upload(files)
    return file_ids