class LexyClient:
    """Top-level client for the Lexy API.

    Owns one synchronous and one asynchronous ``httpx`` client and one
    sub-client per resource type. The sub-clients' methods are also
    re-exported directly on this object (e.g. ``client.create_index``) so
    callers do not have to go through ``client.index``.

    The instance can be used as a context manager: ``with`` closes the
    synchronous HTTP client on exit, ``async with`` closes the asynchronous
    one. Neither form closes the other client.

    Attributes:
        base_url (str): Base URL for the Lexy API.
        aclient (httpx.AsyncClient): Asynchronous API client.
        client (httpx.Client): Synchronous API client.
        binding (BindingClient): Client for interacting with the Bindings API.
        collection (CollectionClient): Client for interacting with the
            Collections API.
        document (DocumentClient): Client for interacting with the
            Documents API.
        index (IndexClient): Client for interacting with the Indexes API.
        transformer (TransformerClient): Client for interacting with the
            Transformers API.
    """

    base_url: str
    aclient: httpx.AsyncClient
    client: httpx.Client
    binding: BindingClient
    collection: CollectionClient
    document: DocumentClient
    index: IndexClient
    transformer: TransformerClient

    def __init__(
        self,
        base_url: str = DEFAULT_BASE_URL,
        api_timeout: int = API_TIMEOUT,
        client_kwargs: dict = None,
        aclient_kwargs: dict = None,
    ) -> None:
        """Initialize a LexyClient instance.

        Args:
            base_url (str, optional): Base URL for the Lexy API. Defaults to
                DEFAULT_BASE_URL.
            api_timeout (int, optional): Timeout in seconds for API requests.
                Defaults to API_TIMEOUT.
            client_kwargs (dict, optional): Keyword args for the synchronous
                API client.
            aclient_kwargs (dict, optional): Keyword args for the
                asynchronous API client.
        """
        self.base_url = base_url
        self.api_timeout = api_timeout

        # The HTTP clients are created before the sub-clients because each
        # sub-client receives this (partially built) instance.
        client_kwargs = client_kwargs or {}
        self._client = httpx.Client(
            base_url=self.base_url, timeout=self.api_timeout, **client_kwargs
        )
        aclient_kwargs = aclient_kwargs or {}
        self._aclient = httpx.AsyncClient(
            base_url=self.base_url, timeout=self.api_timeout, **aclient_kwargs
        )

        # binding
        self.binding = BindingClient(self)
        self.create_binding = self.binding.create_binding
        self.delete_binding = self.binding.delete_binding
        self.get_binding = self.binding.get_binding
        self.list_bindings = self.binding.list_bindings
        self.update_binding = self.binding.update_binding

        # collection
        self.collection = CollectionClient(self)
        self.create_collection = self.collection.create_collection
        self.delete_collection = self.collection.delete_collection
        self.get_collection = self.collection.get_collection
        self.list_collections = self.collection.list_collections
        self.update_collection = self.collection.update_collection

        # document
        self.document = DocumentClient(self)
        self.add_documents = self.document.add_documents
        self.bulk_delete_documents = self.document.bulk_delete_documents
        self.delete_document = self.document.delete_document
        self.get_document = self.document.get_document
        self.list_documents = self.document.list_documents
        self.update_document = self.document.update_document
        self.upload_documents = self.document.upload_documents

        # index
        self.index = IndexClient(self)
        self.create_index = self.index.create_index
        self.delete_index = self.index.delete_index
        self.get_index = self.index.get_index
        self.list_indexes = self.index.list_indexes
        self.query_index = self.index.query_index
        self.update_index = self.index.update_index

        # transformer
        self.transformer = TransformerClient(self)
        self.create_transformer = self.transformer.create_transformer
        self.delete_transformer = self.transformer.delete_transformer
        self.get_transformer = self.transformer.get_transformer
        self.list_transformers = self.transformer.list_transformers
        self.update_transformer = self.transformer.update_transformer
        self.transform_document = self.transformer.transform_document

    @property
    def client(self) -> httpx.Client:
        """Synchronous HTTP client.

        Raises:
            LexyClientError: If the client has been set to None.
        """
        if self._client is None:
            raise LexyClientError("Client is not set.")
        return self._client

    @client.setter
    def client(self, value) -> None:
        self._client = value

    @property
    def aclient(self) -> httpx.AsyncClient:
        """Asynchronous HTTP client.

        Raises:
            LexyClientError: If the client has been set to None.
        """
        if self._aclient is None:
            raise LexyClientError("AsyncClient is not set.")
        return self._aclient

    @aclient.setter
    def aclient(self, value) -> None:
        self._aclient = value

    async def __aenter__(self) -> "LexyClient":
        """Async context manager entry point."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Async context manager exit point; closes only the async client."""
        await self.aclient.aclose()

    def __enter__(self) -> "LexyClient":
        """Synchronous context manager entry point."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Synchronous context manager exit point; closes only the sync client."""
        self.client.close()

    def get(self, url: str, **kwargs) -> httpx.Response:
        """Synchronous GET request."""
        return self.client.get(url, **kwargs)

    async def aget(self, url: str, **kwargs) -> httpx.Response:
        """Async GET request."""
        return await self.aclient.get(url, **kwargs)

    def post(self, url: str, **kwargs) -> httpx.Response:
        """Synchronous POST request."""
        return self.client.post(url, **kwargs)

    async def apost(self, url: str, **kwargs) -> httpx.Response:
        """Async POST request."""
        return await self.aclient.post(url, **kwargs)

    def patch(self, url: str, **kwargs) -> httpx.Response:
        """Synchronous PATCH request."""
        return self.client.patch(url, **kwargs)

    async def apatch(self, url: str, **kwargs) -> httpx.Response:
        """Async PATCH request."""
        return await self.aclient.patch(url, **kwargs)

    def delete(self, url: str, **kwargs) -> httpx.Response:
        """Synchronous DELETE request."""
        return self.client.delete(url, **kwargs)

    async def adelete(self, url: str, **kwargs) -> httpx.Response:
        """Async DELETE request."""
        return await self.aclient.delete(url, **kwargs)

    @property
    def bindings(self):
        """Get a list of all bindings."""
        return self.binding.list_bindings()

    @property
    def collections(self):
        """Get a list of all collections."""
        return self.collection.list_collections()

    @property
    def indexes(self):
        """Get a list of all indexes."""
        return self.index.list_indexes()

    @property
    def transformers(self):
        """Get a list of all transformers."""
        return self.transformer.list_transformers()

    def info(self):
        """Print info about the Lexy server.

        Prints the base URL followed by one section per resource type
        (collections, indexes, transformers, bindings), each with an item
        count and one ``repr`` line per item.

        Every read of ``self.collections`` and its siblings calls the
        corresponding ``list_*`` method (presumably a server round trip), so
        each list is read exactly once and reused for both the count and the
        listing. This keeps the two consistent with each other.
        """
        sections = (
            ("Collections", self.collections),
            ("Indexes", self.indexes),
            ("Transformers", self.transformers),
            ("Bindings", self.bindings),
        )
        parts = [f"Lexy server <{self.base_url}>\n\n"]
        for title, items in sections:
            listing = "\n".join(f"\t- {item!r}" for item in items)
            parts.append(f"{len(items)} {title}\n{listing}\n")
        print("".join(parts))
def __init__(
    self,
    base_url: str = DEFAULT_BASE_URL,
    api_timeout: int = API_TIMEOUT,
    client_kwargs: dict = None,
    aclient_kwargs: dict = None,
) -> None:
    """Build the HTTP clients and the per-resource sub-clients.

    Args:
        base_url (str, optional): Root URL of the Lexy API. Defaults to
            DEFAULT_BASE_URL.
        api_timeout (int, optional): Request timeout in seconds, applied to
            both HTTP clients. Defaults to API_TIMEOUT.
        client_kwargs (dict, optional): Extra keyword arguments forwarded to
            ``httpx.Client``.
        aclient_kwargs (dict, optional): Extra keyword arguments forwarded to
            ``httpx.AsyncClient``.
    """
    self.base_url = base_url
    self.api_timeout = api_timeout

    # HTTP transports come first: every sub-client below is handed this
    # instance while it is still being built.
    sync_options = client_kwargs if client_kwargs else {}
    self._client = httpx.Client(
        base_url=self.base_url,
        timeout=self.api_timeout,
        **sync_options,
    )
    async_options = aclient_kwargs if aclient_kwargs else {}
    self._aclient = httpx.AsyncClient(
        base_url=self.base_url,
        timeout=self.api_timeout,
        **async_options,
    )

    # Bindings API and its shortcuts.
    binding_api = BindingClient(self)
    self.binding = binding_api
    self.create_binding = binding_api.create_binding
    self.delete_binding = binding_api.delete_binding
    self.get_binding = binding_api.get_binding
    self.list_bindings = binding_api.list_bindings
    self.update_binding = binding_api.update_binding

    # Collections API and its shortcuts.
    collection_api = CollectionClient(self)
    self.collection = collection_api
    self.create_collection = collection_api.create_collection
    self.delete_collection = collection_api.delete_collection
    self.get_collection = collection_api.get_collection
    self.list_collections = collection_api.list_collections
    self.update_collection = collection_api.update_collection

    # Documents API and its shortcuts.
    document_api = DocumentClient(self)
    self.document = document_api
    self.add_documents = document_api.add_documents
    self.bulk_delete_documents = document_api.bulk_delete_documents
    self.delete_document = document_api.delete_document
    self.get_document = document_api.get_document
    self.list_documents = document_api.list_documents
    self.update_document = document_api.update_document
    self.upload_documents = document_api.upload_documents

    # Indexes API and its shortcuts.
    index_api = IndexClient(self)
    self.index = index_api
    self.create_index = index_api.create_index
    self.delete_index = index_api.delete_index
    self.get_index = index_api.get_index
    self.list_indexes = index_api.list_indexes
    self.query_index = index_api.query_index
    self.update_index = index_api.update_index

    # Transformers API and its shortcuts.
    transformer_api = TransformerClient(self)
    self.transformer = transformer_api
    self.create_transformer = transformer_api.create_transformer
    self.delete_transformer = transformer_api.delete_transformer
    self.get_transformer = transformer_api.get_transformer
    self.list_transformers = transformer_api.list_transformers
    self.update_transformer = transformer_api.update_transformer
    self.transform_document = transformer_api.transform_document
def info(self):
    """Print info about the Lexy server.

    Prints the base URL followed by one section per resource type
    (collections, indexes, transformers, bindings), each with an item count
    and one ``repr`` line per item.

    Each of ``self.collections``, ``self.indexes``, ``self.transformers`` and
    ``self.bindings`` is read exactly once and reused for both the count and
    the listing. Reading them again would repeat the underlying list call
    (presumably a server round trip) and could make the count disagree with
    the listing.
    """
    sections = (
        ("Collections", self.collections),
        ("Indexes", self.indexes),
        ("Transformers", self.transformers),
        ("Bindings", self.bindings),
    )
    parts = [f"Lexy server <{self.base_url}>\n\n"]
    for title, items in sections:
        listing = "\n".join(f"\t- {item!r}" for item in items)
        parts.append(f"{len(items)} {title}\n{listing}\n")
    print("".join(parts))