o
    "j^                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 ddl
mZ g ZG dd deZG dd deZG d	d
 d
eZG dd deZG dd deZG dd dZG dd deZdddZG dd deZG dd deZdS )    N)core   )loggerc                   @      e Zd ZdS )ExecuteErrorN__name__
__module____qualname__ r   r   b/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/utils/fs.pyr           r   c                   @   r   )FSFileExistsErrorNr   r   r   r   r   r   $   r   r   c                   @   r   )FSFileNotExistsErrorNr   r   r   r   r   r   (   r   r   c                   @   r   )	FSTimeOutNr   r   r   r   r   r   ,   r   r   c                   @   r   )FSShellCmdAbortedNr   r   r   r   r   r   0   r   r   c                   @   s   e Zd Zejdd Zejdd Zejdd Zejdd Zejd	d
 Z	ejdd Z
ejdd Zejdd Zejdd Zejdd Zejd"ddZejdd Zejdd Zejd#ddZejd$d d!ZdS )%FSc                 C      t NNotImplementedErrorselffs_pathr   r   r   ls_dir5      z	FS.ls_dirc                 C   r   r   r   r   r   r   r   is_file9   r   z
FS.is_filec                 C   r   r   r   r   r   r   r   is_dir=   r   z	FS.is_dirc                 C   r   r   r   r   r   r   r   is_existA   r   zFS.is_existc                 C   r   r   r   )r   
local_pathr   r   r   r   uploadE   r   z	FS.uploadc                 C   r   r   r   )r   r   r   r   r   r   downloadI   r   zFS.downloadc                 C   r   r   r   r   r   r   r   mkdirsM   r   z	FS.mkdirsc                 C   r   r   r   r   r   r   r   deleteQ   r   z	FS.deletec                 C   r   r   r   r   r   r   r   need_upload_downloadU   r   zFS.need_upload_downloadc                 C   r   r   r   r   fs_src_pathfs_dst_pathr   r   r   renameY   r   z	FS.renameFc                 C   r   r   r   r   r'   r(   	overwritetest_existsr   r   r   mv]   r   zFS.mvc                 C   r   r   r   )r   	local_dirdest_dirr   r   r   
upload_dira   r   zFS.upload_dirc                 C   r   r   r   r   r   r   r   	list_dirse   r   zFS.list_dirsTc                 C   r   r   r   r   r   exist_okr   r   r   touchi   r   zFS.touchNc                 C   r   r   r   r   r   r   r   catm   r   zFS.catFFTr   )r   r	   r
   abcabstractmethodr   r   r   r   r    r!   r"   r#   r%   r)   r-   r0   r1   r4   r5   r   r   r   r   r   4   s>    











r   c                   @   s|   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd ZdddZd ddZdd ZdS )!LocalFSa(  
    A tool of local file system.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import LocalFS

            >>> client = LocalFS()
            >>> subdirs, files = client.ls_dir("./")

    c                 C   s\   |  |s	g g fS g }g }t|D ]}tj|d | r$|| q|| q||fS )az  
        List directorys and files under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            Tuple: Return a 2-tuple, the first is a list of all its subdirectories,
            and the second is a list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs, files = client.ls_dir("./")

        /)r   oslistdirpathisdirappend)r   r   dirsfilesfr   r   r   r      s   
zLocalFS.ls_dirc                 C   s,   t j|rJ | dt j|dd dS )a  
        Create a local directory.

        Args:
            fs_path(str): The local directory path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_mkdirs")
                >>> client.delete("test_mkdirs")

        z is already a fileT)r3   N)r<   r>   isfilemakedirsr   r   r   r   r"      s   zLocalFS.mkdirsc                 C   s   t || dS )aI  
        Rename the file.

        Args:
            fs_src_path(str): The actual name of the file or directory
            fs_dst_path(str): The new name of the file or directory.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_rename_src")
                >>> print(client.is_exist("test_rename_src"))
                True
                >>> client.rename("test_rename_src", "test_rename_dst")
                >>> print(client.is_exist("test_rename_src"))
                False
                >>> print(client.is_exist("test_rename_dst"))
                True
                >>> client.delete("test_rename_dst")

        N)r<   r)   r&   r   r   r   r)      s   zLocalFS.renamec                 C      t | d S r   )shutilrmtreer   r   r   r   _rmr      zLocalFS._rmrc                 C   rF   r   )r<   remover   r   r   r   _rm   rJ   zLocalFS._rmc                 C   s.   |  |sdS tj|r| |S | |S )a  
        Delete the local file path, whether it's a file or directory.

        Args:
            fs_path(str): The local file path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_localFS_mkdirs")
                >>> client.delete("test_localFS_mkdirs")

        N)r   r<   r>   rD   rL   rI   r   r   r   r   r#      s
   


zLocalFS.deletec                 C      dS )NFr   r$   r   r   r   r%         zLocalFS.need_upload_downloadc                 C      t j|S )au  
        Whether the local file path is a file.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_is_file")
                >>> print(client.is_file("test_is_file"))
                True
                >>> client.delete("test_is_file")

        )r<   r>   rD   r   r   r   r   r         zLocalFS.is_filec                 C   rO   )a|  
        Whether the local file path is a directory.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_is_dir")
                >>> print(client.is_dir("test_is_dir"))
                True
                >>> client.delete("test_is_dir")

        r<   r>   r?   r   r   r   r   r     rP   zLocalFS.is_dirc                 C   rO   )a  
        Whether the local file path exists.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Wheter it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> local_fs = LocalFS()
                >>> ret = local_fs.is_exist("test_is_exist")

        )r<   r>   existsr   r   r   r   r   )  s   zLocalFS.is_existTc                 C   sF   |  |r|r	dS tt|d	 W d   dS 1 sw   Y  dS )a1  
        Create a local file.

        Args:
            fs_path(str): The local file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_touch")
                >>> client.delete("test_touch")

        Na)r   r   openr2   r   r   r   r4   @  s   
"zLocalFS.touchFc                 C   s@   |  |st|r|  |r| | |  |rt| ||S )a  
        Move a local file or directory from `src_path` to `dst_path` .

        Args:
            src_path(str):  Name of the file or directory, that's needed to be moved.
            dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `dst_path` if that exists. Default is False.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_mv_src")
                >>> client.mv("test_mv_src", "test_mv_dst")
                >>> client.delete("test_mv_dst")

        )r   r   r#   r   r)   )r   Zsrc_pathZdst_pathr+   r,   r   r   r   r-   [  s   


z
LocalFS.mvc                    s*   |   sg S  fddt D }|S )a  
        Only list directorys under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs = client.list_dirs("./")

        c                    s$   g | ]}t j d  | r|qS r;   rQ   ).0rC   r   r   r   
<listcomp>  s
    z%LocalFS.list_dirs.<locals>.<listcomp>)r   r<   r=   )r   r   rA   r   rW   r   r1   {  s   

zLocalFS.list_dirsNr7   r6   )r   r	   r
   __doc__r   r"   r)   rI   rL   r#   r%   r   r   r   r4   r-   r1   r   r   r   r   r:   r   s    "

 r:   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc               
      s   | d }}|d u rt |jd }n|d }t |jd }t }|}	 z | i |W S  tyY } z!t | |krJtd|  dt |  t| W Y d }~nd }~ww t | dkrrtd| t |  t }q$)Nr   g     @@Tzargs:z	 timeout:   z*hadoop operator timeout:args:{} timeout:{})	float	_time_out_sleep_intertimer   r   sleepprintformat)argskwargsotime_outinterstartZlast_print_timee)rC   max_time_outr   r   handler  s6   z2_handle_errors.<locals>.decorator.<locals>.handler)	functoolswraps)rC   rj   ri   )rC   r   	decorator  s   z!_handle_errors.<locals>.decoratorr   )ri   rn   r   rm   r   _handle_errors  s   #ro   c                   @   sP  e Zd ZdZ		dBddZdCdd	ZdCd
dZe dd Ze dd Z	dd Z
dd Ze dd Zdd Zdd Ze dd ZdDddZdEddZe d d! ZdEd"d#Ze d$d% Ze d&d' ZdFd)d*Ze d+d, Zd-d. Zd/d0 Ze d1d2 ZdGd3d4Ze d5d6 Zd7d8 ZdHd:d;Ze d<d= Zd>d? Z d@dA Z!d9S )I
HDFSClienta  
    A tool of HDFS.

    Args:
        hadoop_home(str): Hadoop home.
        configs(dict): Hadoop config. It is a dictionary and needs to contain the
            keys: "fs.default.name" and "hadoop.job.ugi".

    Examples:

        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import HDFSClient
            >>> hadoop_home = "/home/client/hadoop-client/hadoop/"

            >>> configs = {
            ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
            ...     "hadoop.job.ugi": "hello,hello123"
            ... }

            >>> client = HDFSClient(hadoop_home, configs)
            >>> client.ls_dir("hdfs:/test_hdfs_client")
            ([], [])

       c           
      C   s   g | _ d| }| j | d}| j | |r.| D ]\}}d| d| }	| j |	 q|| _|| _d| j | _td| _	d S )Nz%s/bin/hadoopfsz-D= z8\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:)
Zpre_commandsr@   itemsr\   r]   join	_base_cmdrecompile
_bd_err_re)
r   Zhadoop_homeZconfigsre   sleep_interZ
hadoop_bindfskvZconfig_commandr   r   r   __init__  s   
zHDFSClient.__init__F   c           	      C   s|   | j  d| }d}d }d}t|d D ]}t|dd|\}}t|}|dkr* nt| q|dkr8t||| fS )Nz -r      r      )	rx   ranger   Zshell_execute_cmdintr^   r_   r   
splitlines)	r   cmdredirect_stderrretry_timesexe_cmdretoutputretry_sleep_secondxr   r   r   _run_cmd  s   zHDFSClient._run_cmdc                 C   s   | j g|  }d}d}d}t|d D ]I}ztj|dtj|r"tjntjdd}	|	j}W  n0 tjyK }
 z|
j	}|
j
}t| W Y d }
~
qd }
~
w ty] }
 zW Y d }
~
 nd }
~
ww |dkrft|d S )Nr    r   r   T)checkstdoutstderrtextr   )rx   splitr   
subprocessrunPIPESTDOUTr   CalledProcessError
returncoder   r^   r_   	Exceptionr   )r   r   r   r   r   r   r   r   r   processrh   r   r   r   _run_safe_cmd  s:   	zHDFSClient._run_safe_cmdc                 C       |  |sg S | |\}}|S )a)  
        Only list directorys under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r   _ls_dirr   r   rA   rB   r   r   r   r1   !  s   
zHDFSClient.list_dirsc                 C      |  |s	g g fS | |S )a  
        List directorys and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r   r   r   r   r   C  s   

zHDFSClient.ls_dirc           
      C   s   d|g}|  |\}}|dkrt|g }g }|D ](}| }t|dkr&qtj|d }	|d d dkr<||	 q||	 q||fS )Nz-lsr         d)r   r   r   lenr<   r>   basenamer@   )
r   r   r   r   linesrA   rB   linearrpr   r   r   r   e  s   zHDFSClient._ls_dirc                 C   s*   |D ]}| j |}|d ur|  S qd S r   )r{   match)r   r   lmr   r   r   _test_match{  s   zHDFSClient._test_matchc                 C      |  |sdS | |S )a.  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   _is_dirr   r   r   r   r     s   

zHDFSClient.is_dirc                 C   sR   d| }| j |ddd\}}|r'| |r%td td| t|dS dS )Nztest -d Tr   r   r   zraise exception: 
F)r   r   r`   rw   r   )r   r   r   r   r   r   r   r   r     s   

zHDFSClient._is_dirc                 C      |  |sdS | | S )a$  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r   r   r   r   r     s   
zHDFSClient.is_filec                 C   s0   d| d}| j |ddd\}}|dkrdS dS )aB  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DITSTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        ztest -e ru   Tr   r   r   F)r   )r   r   r   r   outr   r   r   r     s
   zHDFSClient.is_existc                 C   sl   | d}| d}tj|}| |d | r$|r$| |d |  | |s.| | | || dS z
        upload dir to hdfs
        Args:
            local_dir(str): local dir
            dest_dir(str): hdfs dest dir
            overwrite(bool): is overwrite
        Returns:
            return code
        r;   N)rstripr<   r>   r   r   r#   r"   _try_uploadr   r.   r/   r+   Zlocal_basenamer   r   r   r0     s   




zHDFSClient.upload_dirc                    s    fdd}dd }t  }||st| d||}|s%td dS  |r6|r6 |  | g }	t|D ]}
 ||
|}tj	|||fd}|	
| |  q<|	D ]}|  qZdS )	a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        c                    s   |D ]}  ||  qd S r   )r   )Zhdfs_path_singledatasdatar$   r   r   Z__subprocess_upload&  s   z.HDFSClient.upload.<locals>.__subprocess_uploadc                 S   sZ   g }t j| s
|S t j| r&t | D ]}t j| |}|| q|S ||  |S )z
            get local files
            Args:
                path(str): local path
            Returns:
                list of local files
            )r<   r>   rR   r?   r=   rw   r@   )r>   Zrlistfiletr   r   r   get_local_files*  s   
z*HDFSClient.upload.<locals>.get_local_files not existsz/there are nothing need to upload, function exitNtargetrb   )r:   r   r   r`   r#   r"   r   _split_filesmultiprocessingProcessr@   rg   rw   )r   r   r   multi_processesr+   Z_HDFSClient__subprocess_uploadr   local	all_filesprocsiprocess_datasr   procr   r$   r   r    
  s.   





zHDFSClient.uploadc              
   C   s`   d| d| }d}z|  |\}}|dkrt|W d S  ty/ } z| | |d }~ww )Nzput ru   r   )r   r   r   r#   )r   r   r   r   r   _rh   r   r   r   r   Y  s   
zHDFSClient._try_uploadc                    s   fdd}  st  d r |S  \}} fdd|D }| fdd|D  g }	t|D ]}
||
|}tj	|||fd}|	
| |  q?|	D ]}|  q]dS )	al  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.download("hdfs:/test_hdfs_client", "./")

        c                    s   |D ]}  ||  qdS z
            download file from HDFS
            Args:
                local_path(str): the local file path
                datas(str): the hdfs file path list
            N)_try_downloadr   r   r   r$   r   r   __subprocess_download  s   z2HDFSClient.download.<locals>.__subprocess_download
 not exitsc                       g | ]} d  | qS rU   r   rV   r   rW   r   r   rX         z'HDFSClient.download.<locals>.<listcomp>c                    r   rU   r   r   rW   r   r   rX     r   r   N)r   r   r   r   r   extendr   r   r   r   r@   rg   rw   )r   r   r   r   r+   Z _HDFSClient__subprocess_downloadrA   all_filenamesr   r   r   r   r   r   r   r   r   r   r!   f  s&   





zHDFSClient.downloadc              
   C   sf   d| d| }d}z|  |\}}|dkrt|W d S  ty2 } z
t }|| |d }~ww )Nzget ru   r   )r   r   r   r:   r#   )r   r   r   r   r   r   rh   Zlocal_fsr   r   r   r     s   
zHDFSClient._try_downloadc                 C   s   |  |rdS d}d| d}| j|dd\}}|dkr/|D ]
}d|v r(d} nq|s/t||rJ|  |sLd	| }| |\}}|dkrNt|dS dS dS )
a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        NFzmkdir ru   T)r   r   zNo such file or directoryz	mkdir -p )r   r   r   )r   r   Zout_hdfsr   r   r   r   r   r   r   r   r"     s(   

zHDFSClient.mkdirsTc                 C   sX   |r|  |r| | |r&|  |st| d|  |r&t| d| ||S )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Excetption.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

         is not exists exists already)r   r#   r   r   _try_mvr*   r   r   r   r-     s   


zHDFSClient.mvc              
   C   s|   d| d| }d}z| j |dd\}}|dkrt|W d S  ty= } z| |s7| |r7W Y d }~d S |d }~ww )Nzmv ru   r   r   r   )r   r   r   r   )r   r'   r(   r   r   r   rh   r   r   r   r     s   zHDFSClient._try_mvc                 C   ,   d| }|  |\}}|dkrt|d S )Nzrmr r   r   r   r   r   r   r   r   r   r   r   rI     
   
zHDFSClient._rmrc                 C   r   )Nzrm r   r   r   r   r   r   rL     r   zHDFSClient._rmc                 C   s0   |  |sdS | |}|r| |S | |S )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r   r   rI   rL   )r   r   r   r   r   r   r#     s   



zHDFSClient.deletec                 C   s    |  |r|r	dS t| |S )a6  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r   r   _touchzr2   r   r   r   r4   ?  s
   

zHDFSClient.touchc                 C   r   )Nztouchz r   r   r   r   r   r   r   `  s
   
zHDFSClient._touchzc                 C   rM   NTr   r$   r   r   r   r%   g  rN   zHDFSClient.need_upload_downloadNc                 C   s"   |  |r| |}d|S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.cat("hdfs:/test_hdfs_client")
                ''

        r   r   )r   _try_catrw   )r   r   r   r   r   r   r5   j  s   


zHDFSClient.catc                 C   s0   d| }| j |dd\}}|dkrt||S )Nzcat r   r   r   r   )r   r   r   r   r   r   r   r   r     s
   
zHDFSClient._try_catc           
      C      t || }t || }|g| }t|D ]
}||  d7  < qg g| }d}	t|D ]}||	|	||   ||< |	|| 7 }	q+|| S z
        split file list
        Args:
            files(list): file list
            trainer_id(int): trainer mpi rank id
            trainers(int): all trainers num
        Returns:
            fileist(list): file list of current trainer
        r   r   r   r   
r   rB   Z
trainer_idZtrainers	remainder	blocksizeblocksr   Ztrainer_filesbeginr   r   r   r        


zHDFSClient._split_filesc                 C   s   t |dkrg S g }d}|D ]}||d 7 }qd| d }| |\}}t |dkr3td|  g S |D ] }|d}	t |	dk rCq5|	d }
t|	d }||
|d	 q5|S )
z
        list_files return file path and size
        Args:
            path_list(list): file list
        Returns:
            fileist(list): file list with file path and size
        r   r   ru   zls z) | awk '{if ($8 != "") {print $5" "$8 }}'zlist_files empty, path[%s]   r   )r>   size)r   r   r   warningr   r   r@   )r   	path_list	file_listZ
str_concatr>   r   r   r   r   r   	file_path	file_sizer   r   r   list_files_info  s(   

zHDFSClient.list_files_inforq   rr   )Fr   F)r   FFTr7   r   )"r   r	   r
   rY   r   r   r   ro   r1   r   r   r   r   r   r   r   r0   r    r   r!   r   r"   r-   r   rI   rL   r#   r4   r   r%   r5   r   r   r   r   r   r   r   rp     sX    



!
!
  

#
O

<


-'


!!

"
rp   c                   @   s   e Zd ZdZd.ddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zd/ddZd0ddZd0ddZdd Zd1d!d"Zd#d$ Zd2d%d&Zd'd( Zd3d*d+Zd,d- Zd)S )4	AFSClienta  
    A tool of AFS. Use AfsWrapper.

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('depend on WITH_PSLIB')
            >>> from paddle.distributed.fleet.utils.fs import AFSClient

            >>> client = AFSClient()
            >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
            >>> client.ls_dir("hdfs:/test_hdfs_client")

    rq   rr   c                 C   s   t  | _|| _d S r   )r   Z
AfsWrapper_fsr\   )r   re   r|   r   r   r   r     s   

zAFSClient.__init__c                 C   s   | j |||| d S r   )r   init)r   Zfs_nameZfs_userZ	fs_passwdZfs_confr   r   r   r     s   zAFSClient.initc                 C   r   )aw  
        Only list directorys under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r   r   r   r   r   r1     s   
zAFSClient.list_dirsc                 C   r   )a	  
        List directorys and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r   r   r   r   r     s   

zAFSClient.ls_dirc                 C   s   | j |}|g}||fS r   )r   list)r   r   rB   rA   r   r   r   r      s   zAFSClient._ls_dirc                 C   r   )a{  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_dir("hdfs:/test_hdfs_client")

        Fr   r   r   r   r   r   %  s   

zAFSClient.is_dirc                 C   s    | j |}t|dkrdS dS )Nr   TF)r   r   r   )r   r   	list_pathr   r   r   r   @  s   zAFSClient._is_dirc                 C   r   )ar  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r   r   r   r   r   G  s   
zAFSClient.is_filec                 C   s   | j |S )a  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        )r   existr   r   r   r   r   b  rP   zAFSClient.is_existFc                 C   sn   | d}| d}tj|}| |d | r$|r$| |d |  | |s.| | | j|| dS r   )	r   r<   r>   r   r   r#   r"   r   r    r   r   r   r   r0   {  s   




zAFSClient.upload_dirr   c                 C   s0   t  }||st| d| j|| dS )a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        r   N)r:   r   r   r   r    )r   r   r   r   r+   r   r   r   r   r      s   
zAFSClient.uploadc                    s   fdd}  st  d rj| S  \}} fdd|D }g }	t|D ]}
||
|}tj	|||fd}|	
| |  q4|	D ]}|  qRdS )a  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.download("hdfs:/test_hdfs_client", "./")

        c                    s   |D ]	} j | | qdS r   )r   r!   r   r$   r   r   r     s   z1AFSClient.download.<locals>.__subprocess_downloadr   c                    s   g | ]} | qS r   r   r   rW   r   r   rX     s    z&AFSClient.download.<locals>.<listcomp>r   N)r   r   r   r   r!   r   r   r   r   r   r@   rg   rw   )r   r   r   r   r+   Z_AFSClient__subprocess_downloadr   r   r   r   r   r   r   r   r   r   r   r!     s$   





zAFSClient.downloadc                 C   s   |  |rdS | j| dS )a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        N)r   r   mkdirr   r   r   r   r"     s   
zAFSClient.mkdirsTc                 C   s^   |r|  |r| | |r&|  |st| d|  |r&t| d| j|| dS )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Excetption.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

        r   r   N)r   r#   r   r   r   r-   r*   r   r   r   r-     s   


zAFSClient.mvc                 C   s   |  |sdS | j| dS )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python


                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r   r   rK   r   r   r   r   r#     s   
zAFSClient.deletec                 C   s"   |  |r|r	dS t| j|S )a~  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r   r   r   Ztouchzr2   r   r   r   r4   3  s
   
zAFSClient.touchc                 C   rM   r   r   r$   r   r   r   r%   O  rN   zAFSClient.need_upload_downloadNc                 C   s   |  |r| j|S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on WITH_PSLIB')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.cat("hdfs:/test_hdfs_client")

        r   )r   r   r5   r   r   r   r   r5   R  s   
zAFSClient.catc           
      C   r   r   r   r   r   r   r   r   m  r   zAFSClient._split_filesr   r   )r   Fr   r7   r   )r   r	   r
   rY   r   r   r1   r   r   r   r   r   r   r0   r    r!   r"   r-   r#   r4   r%   r5   r   r   r   r   r   r     s*    



6
"

r   r   )r8   rk   r   r<   ry   rG   r   r^   Zpaddle.baser   Zlog_utilr   __all__r   r   r   r   r   r   r   r:   ro   rp   r   r   r   r   r   <module>   s:   >  
)'      