o
    "ÕjlO ã                   @   sÂ   d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z
ddlZddlmZ ddlmZ ddlmZ ddlmZ dd	gZeeejd
dZdaG dd„ dƒZG dd	„ d	eƒZdS )zFleet Utils.é    N)Úbase)Ú
get_logger)Ú
HDFSClienté   )ÚutilsÚ	FleetUtilÚ	GPUPSUtilz&%(asctime)s %(levelname)s: %(message)s)Úfmtc                
   @   sÐ  e Zd ZdZd_dd„Zdd„ Zdd„ Zd	d
„ Ze 	¡ e 
¡ dfdd„Ze 	¡ dddfdd„Ze 	¡ ddfdd„Zdd„ Zd`dd„Zd`dd„Zi dfdd„Z		 dad!d"„Zi dd#fd$d%„Z		&dbd'd(„Zd)d*„ Zd+d,„ Zd-d.„ Zd/d0„ Zd1d2„ Zdcd4d5„Zd6d7„ Zd8d9„ Z		:ddd;d<„Z		#	:ded=d>„Z	dfd?d@„Z	dfdAdB„Z 	dfdCdD„Z!dEdF„ Z"e 	¡ dddGdHdIdJdKdLf	dMdN„Z#e 	¡ dddGdHdIdJdKdLdf
dOdP„Z$dQdR„ Z%dSdT„ Z&dUdV„ Z'dWdX„ Z(dYdZ„ Z)d[d\„ Z*d]d^„ Z+d#S )gr   a]  
    FleetUtil provides some common functions for users' convenience.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
            >>> fleet_util = FleetUtil()
            >>> fleet_util.rank0_print("my log")

    Úpslibc                 C   s@   |dkrddl m} |ad S |dkrddlm} |ad S tdƒ‚)Nr
   r   )ÚfleetZ
transpilerz3Please choose one mode from ["pslib", "transpiler"])Z8paddle.incubate.distributed.fleet.parameter_server.pslibr   ZHpaddle.incubate.distributed.fleet.parameter_server.distribute_transpilerÚ
ValueError)ÚselfÚmodeZfleet_pslibZfleet_transpiler© r   úm/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/distributed/fleet/fleet_util.pyÚ__init__9   s   ÿzFleetUtil.__init__c                 C   s&   t  ¡ dkrdS t|ƒ tj ¡  dS )a  
        Worker of rank 0 print some log.

        Args:
            s(str): string to print

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_print("my log")

        r   N)r   Úworker_indexÚprintÚsysÚstdoutÚflush©r   Úsr   r   r   Úrank0_printL   s   zFleetUtil.rank0_printc                 C   ó   t  ¡ dkrdS t |¡ dS )a—  
        Worker of rank 0 print some log info.

        Args:
            s(str): string to log

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_info("my log info")

        r   N)r   r   Ú_loggerÚinfor   r   r   r   Ú
rank0_infoa   ó   zFleetUtil.rank0_infoc                 C   r   )aš  
        Worker of rank 0 print some log error.

        Args:
            s(str): string to log

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_error("my log error")

        r   N)r   r   r   Úerrorr   r   r   r   Úrank0_erroru   r   zFleetUtil.rank0_errorZint64c                 C   s2   |  |¡ ¡ }t | ¡ ¡ |¡}| ||¡ dS )a±  
        Set tensor of a Variable to zero.

        Args:
            var_name(str): name of Variable
            scope(Scope): Scope object, default is base.global_scope()
            place(Place): Place object, default is base.CPUPlace()
            param_type(str): param data type, default is int64

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.set_zero(myvar.name, myscope)

        N)ÚvarÚ
get_tensorÚnpZzerosZ	_get_dimsZastypeÚset)r   Úvar_nameÚscopeZplaceZ
param_typeÚparamZparam_arrayr   r   r   Úset_zero‰   s   zFleetUtil.set_zeroZ_generated_var_2Z_generated_var_3Ú c                 C   s$   |   |||¡}|  |d|  ¡ dS )ay  
        Print global auc of all distributed workers.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos(str): name of auc pos bucket Variable
            stat_neg(str): name of auc neg bucket Variable
            print_prefix(str): prefix of print auc

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.print_global_auc(myscope, stat_pos=stat_pos.name,
                ...                           stat_neg=stat_neg.name)

                >>> # below is part of model
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")
                >>> binary_predict = paddle.concat(input=[
                ...     paddle.subtract(
                ...         paddle.ceil(similarity_norm),
                ...         similarity_norm),
                ...     similarity_norm],
                ...     axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos,
                ...     stat_neg] = paddle.static.auc(input=binary_predict,
                ...                                   label=label,curve='ROC',
                ...                                   num_thresholds=4096)

        z global auc = %sN)Úget_global_aucr   )r   r&   Ústat_posÚstat_negÚprint_prefixÚ	auc_valuer   r   r   Úprint_global_auc§   s   *zFleetUtil.print_global_aucc                 C   sª  |  |¡du s|  |¡du r|  d¡ dS tj ¡  t |  |¡ ¡ ¡}t |j¡}| 	d¡}t 
|¡d }tj ||¡ | 	|¡}t |  |¡ ¡ ¡}t |j¡}| 	d¡}t 
|¡d }	tj ||	¡ |	 	|¡}	t|d ƒ}
d}d}d}d}d}d}t|
ƒD ]8}|
d | }||d |  }||d | 7 }||	d |  }||	d | 7 }||| ||  d 7 }|}|}q€d}|| dksÅ|dkrÈd}n|||  }tj ¡  |S )	an  
        Get global auc of all distributed workers.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos(str): name of auc pos bucket Variable
            stat_neg(str): name of auc neg bucket Variable

        Returns:
            auc_value(float), total_ins_num(int)

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> auc_value, _ = fleet_util.get_global_auc(myscope,
                ...                                          stat_pos=stat_pos,
                ...                                          stat_neg=stat_neg)

        Núnot found auc bucketéÿÿÿÿr   ç        r   é   g      à?)Úfind_varr   r   Ú_role_makerÚ_barrier_workerr#   Úarrayr"   ÚshapeÚreshapeÚcopyÚ_all_reduceÚlenÚrange)r   r&   r+   r,   ÚposÚold_pos_shapeÚ
global_posÚnegÚold_neg_shapeÚ
global_negÚ
num_bucketZareaÚnew_posZnew_negÚtotal_ins_numÚiÚindexr.   r   r   r   r*   Ô   sL   






zFleetUtil.get_global_aucc                 C   s   t  ||¡ dS )a)  
        load pslib model to one table

        Args:
            table_id(int): load model to one table, default is None, which mean
                           load all table.
            path(str): model path

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.load_fleet_model_one_table(1, path="hdfs:/my/model/path")
        N)r   Zload_one_table)r   Útable_idÚpathr   r   r   Úload_fleet_model_one_table$  s   z$FleetUtil.load_fleet_model_one_tabler   c                 C   s   t j||d dS )aP  
        load pslib model

        Args:
            path(str): model path
            mode(str): 0 or 1, which means load checkpoint or delta model,
                       default is 0

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()

                >>> fleet_util.load_fleet_model("hdfs:/my/model/path")

                >>> fleet_util.load_fleet_model("hdfs:/my/model/path", mode=0)

        ©r   N)r   Zinit_server©r   rJ   r   r   r   r   Úload_fleet_model7  s   zFleetUtil.load_fleet_modelc                 C   s   t jd||d dS )aÿ  
        save pslib model

        Args:
            path(str): model path
            mode(str): 0 or 1, which means save checkpoint or delta model,
                       default is 0

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_fleet_model("hdfs:/my/model/path")

        NrL   )r   Úsave_persistablesrM   r   r   r   Úsave_fleet_modelN  s   zFleetUtil.save_fleet_modelÚpatchc	                 C   sl  t  ¡ }	|dkrt|ƒ|	d< n!|dkrttt ¡ ƒƒ|	d< ntd| ƒ d}ttt ¡ ƒƒ|	d< t|ƒ|	d< | d¡s@| d¡rK|| d¡d	 d … }|| d
¡ d |	d< d|	d< d|	d< d|	d< d|	d< ||	d< t	 
d¡ ¡  ¡ }
t	 
d¡ ¡  ¡ }| |
¡}| d¡}|dkr”|dkr”|||… }
|
|	d< d|	d< | d
¡d | d |	d< tt ¡ ƒ|	d < t |	¡S )!Nr   ÚidrQ   ú)warning: unknown mode %s, set it to patchÚkeyúhdfs:úafs:ú:r   ú/ú/000ÚinputÚ111111Úrecord_countÚ2Úpartition_typeÚdefault_job_nameÚjob_nameÚfeasignÚins_tagÚins_pathzecho -n ${JOB_ID}zecho -n ${INSTANCE_ID}z--r1   Újob_idr)   Úmonitor_dataú	/monitor/ú.txtÚmonitor_pathÚmpi_size)ÚcollectionsÚOrderedDictÚstrÚintÚtimer   Ú
startswithÚfindÚrstripÚosÚpopenÚreadÚstripr   Ú
worker_numÚjsonÚdumps)r   Úoutput_pathÚdayÚ
model_pathÚxbox_base_keyÚ	data_pathÚhadoop_fs_namere   r   Ú	xbox_dictZjob_id_with_hostZinstance_idÚ	start_posÚend_posr   r   r   Ú_get_xbox_strb  s:   

ÿ
zFleetUtil._get_xbox_strú$HADOOP_HOMEúdonefile.txtc	              	   C   s(  t |ƒ}t |ƒ}t|ƒ}|dkr!d|› d|› d}	| d¡|	 }
nd| }	| d¡|	 }
t ¡ dkr|d | }d|||
|df }||dœ}t||ƒ}| |¡rÜ| |¡}| d¡}dd	„ |D ƒ}d
d	„ |D ƒ}d}t	t
|ƒƒD ]}t|ƒt|| ƒkr‹t|ƒt|| ƒkr‹d} nqq|sÌt|dƒ}| |d ¡ | |d ¡ W d  ƒ n1 s¬w   Y  | |¡ | ||¡ |  d|› d|› d|› d¡ nA|  d|› d|› d|› d¡ n1t|dƒ}| |d ¡ W d  ƒ n1 sów   Y  | ||¡ |  d|› d|› d|› d¡ tj ¡  dS )a¢  
        write donefile when save model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            xbox_base_key(str|int): xbox base key
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is "donefile.txt"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                ...                                 day=20190723,
                ...                                 pass_id=66,
                ...                                 xbox_base_key=int(time.time()),
                ...                                 hadoop_fs_name="hdfs://xxx",
                ...                                 hadoop_fs_ugi="user,passwd")

        ú-1rX   ú/%s/0/r   ú%s	%lu	%s	%s	%d©zfs.default.namezhadoop.job.ugiÚ
c                 S   ó   g | ]	}|  d ¡d ‘qS ©ú	r   ©Úsplit©Ú.0rG   r   r   r   Ú
<listcomp>Ð  ó    z2FleetUtil.write_model_donefile.<locals>.<listcomp>c                 S   rŠ   ©rŒ   é   r   r   r   r   r   r‘   Ñ  r’   FTÚwNúwrite ú ú succeedú
not write ú	 because ú already exists)rl   rm   rq   r   r   r   Úis_fileÚcatrŽ   r=   r<   ÚopenÚwriteÚdeleteÚuploadr    r5   r6   )r   ry   rz   Úpass_idr|   r~   Úhadoop_fs_ugiÚhadoop_homeÚdonefile_nameÚsuffix_namer{   Údonefile_pathÚcontentÚconfigsÚclientÚpre_contentÚpre_content_listÚday_listÚ	pass_listÚexistrG   Úfr   r   r   Úwrite_model_donefileŽ  sn   &ûþ



ÿ€þ
ÿÿÿÿzFleetUtil.write_model_donefileNc              
   C   sz  t |ƒ}t |ƒ}t|ƒ}d}|dkr+d}d|› d|› d}| d¡| }|
du r*d}
nd}d| }| d¡| }|
du r>d	}
t|tƒrHd
 |¡}t ¡ dkr6|d |
 }| j||||||i |d}||dœ}t	|	|ƒ}| 
|¡r| |¡}t | d¡d ¡}|d  d¡d }|d  d¡d  d¡d }d}t|ƒt|ƒk s²t|ƒt|ƒkr´t|ƒt|ƒkr´d}|sôt|
dƒ}| |d ¡ | |d ¡ W d  ƒ n1 sÔw   Y  | |¡ | |
|¡ |  d|› d|› d|
› d¡ nB|  d|
› d|› d|› d¡ n2t|
dƒ}| |d ¡ W d  ƒ n	1 sw   Y  | |
|¡ |  d|› d|› d|
› d¡ tj ¡  dS )aü  
        write delta donefile or xbox base donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            xbox_base_key(str|int): xbox base key
            data_path(str|list): training data path
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            monitor_data(dict): metrics
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is None"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_xbox_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     xbox_base_key=int(time.time()),
                ...     data_path="hdfs:/my/data/",
                ...     hadoop_fs_name="hdfs://xxx",
                ...     hadoop_fs_ugi="user,passwd",
                ...     monitor_data={})

        Nr…   rQ   rX   ú/delta-úxbox_patch_done.txtr   ú	/%s/base/úxbox_base_done.txtú,r   ©re   r   rˆ   r‰   r1   rZ   éýÿÿÿéþÿÿÿú-FTr•   r–   r—   r˜   r™   rš   r›   )rl   rm   rq   Ú
isinstanceÚlistÚjoinr   r   r‚   r   rœ   r   rw   ÚloadsrŽ   rž   rŸ   r    r¡   r    r5   r6   )r   ry   rz   r¢   r|   r}   r~   r£   re   r¤   r¥   r   r¦   r{   r§   Úxbox_strr©   rª   r«   Ú	last_dictÚlast_dayÚ	last_passr¯   r°   r   r   r   Úwrite_xbox_donefileð  s‚   -€

øþ

þ
ÿÿÿÿzFleetUtil.write_xbox_donefileúsparse_cache.metac	                 K   s  t |ƒ}t |ƒ}t|ƒ}|	 dd¡}
|	 dd¡}|dkr+d|||f }| d¡| }nd||f }| d¡| }t ¡ dkrˆ|d | }||d	œ}t||ƒ}| |¡r[|  d
| ¡ n-d|
|f }t	|dƒ}| 
|¡ W d  ƒ n1 svw   Y  | ||¡ |  d| ¡ tj ¡  dS )aÒ  
        write cache donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            key_num(str|int): save cache return value
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is "sparse_cache.meta"
            kwargs(dict): user defined properties
                          file_num(int): cache file num
                          table_id(int): cache table id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_cache_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     key_num=123456,
                ...     hadoop_fs_name="hdfs://xxx",
                ...     hadoop_fs_ugi="user,passwd")

        Úfile_numé   rI   r   r…   ú/%s/delta-%s/%03d_cacherX   ú/%s/base/%03d_cacherˆ   ú#not write because %s already existsú(file_prefix:part
part_num:%s
key_num:%d
r•   Núwrite %s succeed)rl   rm   Úgetrq   r   r   r   rœ   r    rž   rŸ   r¡   r5   r6   )r   ry   rz   r¢   Úkey_numr~   r£   r¤   r¥   ÚkwargsrÅ   rI   r¦   r{   r§   r©   rª   Úmeta_strr°   r   r   r   Úwrite_cache_donefilef  s<   +þ

ÿþÿzFleetUtil.write_cache_donefilec                 C   óP   t |ƒ}t |ƒ}d|› d|› d}|| }|  d| ¡ |  |¡ |  d¡ dS )að  
        load pslib model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.load_model("hdfs:/my/path", 20190722, 88)

        rX   zgoing to load_model %szload_model doneN)rl   r    rN   )r   ry   rz   r¢   r¦   Z	load_pathr   r   r   Ú
load_model´  ó   
zFleetUtil.load_modelc                 C   rÑ   )að  
        save pslib model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_model("hdfs:/my/path", 20190722, 88)

        rX   úgoing to save_model %szsave_model doneN)rl   r   rP   ©r   ry   rz   r¢   r¦   r{   r   r   r   Ú
save_modelÎ  rÓ   zFleetUtil.save_modelc                 C   sD   t |ƒ}d| }|| }|  d| ¡ tjd|dd |  d¡ dS )aÃ  
        save batch model

        Args:
            output_path(str): output path
            day(str|int): training day

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_batch_model("hdfs:/my/path", 20190722)

        r†   rÔ   Nr”   rL   zsave_batch_model done©rl   r   r   rO   ©r   ry   rz   r¦   r{   r   r   r   Úsave_batch_modelè  ó   zFleetUtil.save_batch_modelc                 C   sV   t |ƒ}t |ƒ}d|› d|› d}|| }|  d| ¡ tjd|dd |  d¡ dS )aö  
        save delta model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_delta_model("hdfs:/my/path", 20190722, 88)

        rX   r²   zgoing to save_delta_model %sNr   rL   zsave_delta_model doner×   rÕ   r   r   r   Úsave_delta_model   s   zFleetUtil.save_delta_modelc                 C   sD   t |ƒ}d| }|| }|  d| ¡ tjd|dd |  d¡ dS )aË  
        save xbox base model

        Args:
            output_path(str): output path
            day(str|int): training day

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722)

        r´   zgoing to save_xbox_base_model Nr3   rL   zsave_xbox_base_model doner×   rØ   r   r   r   Úsave_xbox_base_model  rÚ   zFleetUtil.save_xbox_base_modelr   c           
      K   sp   t |ƒ}t |ƒ}t|ƒ}| dd¡}d|› d|› }| d¡| }|  d| ¡ tjd|||d}	|  d¡ |	S )	aÇ  
        save cache model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            mode(str|int): save mode
            kwargs(dict): user defined properties
                          table_id(int): table id to save cache

        Returns:
            key_num(int): cache key num

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_cache_model("hdfs:/my/path", 20190722, 88)

        rI   r   rX   r²   zgoing to save_cache_model %sN©r   rI   zsave_cache_model done)rl   rm   rÌ   rq   r   r   Úsave_cache_model)
r   ry   rz   r¢   r   rÎ   rI   r¦   r{   rÍ   r   r   r   rÞ   2  s   ÿ
zFleetUtil.save_cache_modelc                 K   sX   t |ƒ}| dd¡}d| }| d¡| }|  d| ¡ tjd|d|d}|  d	¡ |S )
a£  
        save cache model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            kwargs(dict): user defined properties
                          table_id(int): table id to save cache

        Returns:
            key_num(int): cache key num

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_cache_base_model("hdfs:/my/path", 20190722)

        rI   r   z/%s/baserX   z!going to save_cache_base_model %sNr3   rÝ   zsave_cache_base_model done)rl   rÌ   rq   r   r   rÞ   )r   ry   rz   rÎ   rI   r¦   r{   rÍ   r   r   r   Úsave_cache_base_modelW  s   ÿ
zFleetUtil.save_cache_base_modelc                 C   s  t j ¡  t j ¡ rztt|ƒƒ}t jd |  ¡ j}t jd | }i }|D ]}d|vr,q%|| D ]}d|t	|ƒ< q0q%|D ]=}	t	|	j
ƒ|vrFq<g }
tdt|	jƒƒD ]}|	j| }| |¡du rhtd| d d ƒ‚|
 |¡ qPt j |t	|	j
ƒ|
¡ q<t j ¡  dS )	a#  
        pull all dense params in trainer of rank 0

        Args:
            scope(Scope): base Scope
            program(Program): base Program

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.pull_all_dense_params(my_scope, my_program)

        Zprogram_id_to_workerZprogram_configsZdenser   Nzvar z not found in scope zwhen pull dense)r   r5   r6   Zis_first_workerrl   rR   Z	_opt_infoZget_descZdense_tablerm   rI   r=   r<   Zdense_variable_namer4   r   ÚappendZ
_fleet_ptrZ
pull_dense)r   r&   ÚprogramZprog_idZtablesZ	prog_confZprog_tablesrT   rI   ÚtableZvar_name_listrG   r%   r   r   r   Úpull_all_dense_paramsy  sH   

ýÿ
ÿþýÿÿzFleetUtil.pull_all_dense_paramsTc                 C   s  t |ƒ}t |ƒ}d}|  ||¡ t ¡ dkrt |¡' |r-tjjj	||||| 
¡ d ntjjj	||||| 
¡ d W d  ƒ n1 sEw   Y  |	|
dœ}t||ƒ}|dkra|› d|› d}n|› d|› d	|› d
}| |¡sv| |¡ |j||ddd tj ¡  dS )a  
        save paddle inference model, and upload to hdfs dnn_plugin path

        Args:
            executor(Executor): base Executor
            scope(Scope): base Scope
            program(Program): base Program
            feeded_vars(list[Variable]): feed vars
            target_vars(list[variable]): fetch vars
            output_path(str): hdfs/afs output path
            day(str|int): training day
            pass_id(str|int): training pass
            hadoop_fs_name(str): hadoop fs name
            hadoop_fs_ugi(str): hadoop fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            save_combine(bool): whether to save in a file or separate files,
                                default is True

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_paddle_inference_model(exe,
                ...                                        join_scope,
                ...                                        join_program,
                ...                                        feeded_vars,
                ...                                        target_vars,
                ...                                        "hdfs:/my/output/path/",
                ...                                        day=20190727,
                ...                                        pass_id=6,
                ...                                        hadoop_fs_name="xxx",
                ...                                        hadoop_fs_ugi="xxx,xxx")
        Zinference_modelr   ©rá   Nrˆ   r…   rX   ú/base/dnn_plugin/r²   ú/dnn_plugin/é   T©Zmulti_processesÚ	overwrite)rl   rã   r   r   r   Úscope_guardÚpaddleÚstaticÚioZsave_inference_modelÚcloner   Úis_existÚmakedirsr¡   r5   r6   )r   Úexecutorr&   rá   Zfeeded_varsZtarget_varsry   rz   r¢   r~   r£   r¤   Úsave_combineÚ
model_namer©   rª   Údestr   r   r   Úsave_paddle_inference_model­  sD   3ûû€öþ


z%FleetUtil.save_paddle_inference_modelc                    s  t |ƒ}t |ƒ}|  |ˆ ¡ t ¡ dkr‡ fdd„|D ƒ}t |¡" |r2tjjj	|dˆ ||d ntjjj	||ˆ |d W d  ƒ n1 sGw   Y  ||	dœ}t
|
|ƒ}|d	krc|› d
|› d}n|› d
|› d|› d}| |¡sx| |¡ |j||ddd tj ¡  dS )aH  
        save paddle model, and upload to hdfs dnn_plugin path

        Args:
            executor(Executor): base Executor
            scope(Scope): base Scope
            program(Program): base Program
            model_name(str): save model local dir or filename
            output_path(str): hdfs/afs output path
            day(str|int): training day
            pass_id(str|int): training pass
            hadoop_fs_name(str): hadoop fs name
            hadoop_fs_ugi(str): hadoop fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            var_names(list): save persistable var names, default is None
            save_combine(bool): whether to save in a file or separate files,
                                default is True

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_all_var_names)
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.usr.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_user_var_names)
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.item.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_user_item_names)

        r   c                    s   g | ]	}ˆ   ¡  |¡‘qS r   )Úglobal_blockr!   r   rä   r   r   r‘   V  r’   z0FleetUtil.save_paddle_params.<locals>.<listcomp>z./)ÚvarsÚfilename)r÷   Nrˆ   r…   rX   rå   r²   ræ   rç   Trè   )rl   rã   r   r   r   rê   rë   rì   rí   Z	save_varsr   rï   Zmkdirsr¡   r5   r6   )r   rñ   r&   rá   ró   ry   rz   r¢   r~   r£   r¤   Z	var_namesrò   r÷   r©   rª   rô   r   rä   r   Úsave_paddle_params	  s4   H
ÿÿ€úþ


zFleetUtil.save_paddle_paramsc                 C   sž   |d }||dœ}t ||ƒ}| |¡sddtt ¡ ƒgS | |¡}t | d¡d ¡}	t|	d  d¡d ƒ}
d |	d  d¡dd… ¡}t|	d	 ƒ}|
||gS )
a$  
        get last saved base xbox info from xbox_base_done.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox_base("hdfs:/my/path",
                ...                                        hadoop_fs_name="hdfs://xxx",
                ...                                        hadoop_fs_ugi="user,passwd")

        ú/xbox_base_done.txtrˆ   r1   r‰   rZ   rX   r¸   NrT   ©	r   rœ   rm   rn   r   rw   r¾   rŽ   r½   )r   ry   r~   r£   r¤   r§   r©   rª   r«   rÀ   rÁ   Ú	last_pathr|   r   r   r   Úget_last_save_xbox_baseq  s   "þ



z!FleetUtil.get_last_save_xbox_basec                 C   sÂ   |d }||dœ}t ||ƒ}| |¡sdddtt ¡ ƒgS | |¡}t | d¡d ¡}	t|	d  d¡d ƒ}
t|	d  d¡d	  d
¡d ƒ}d |	d  d¡dd… ¡}t|	d ƒ}|
|||gS )ac  
        get last saved xbox info from xbox_patch_done.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox("hdfs:/my/path",
                ...                                   hadoop_fs_name="hdfs://xxx",
                ...                                   hadoop_fs_ugi="user,passwd")

        ú/xbox_patch_done.txtrˆ   r1   r)   r‰   rZ   rX   r¸   r¹   rº   NrT   rû   )r   ry   r~   r£   r¤   r§   r©   rª   r«   rÀ   rÁ   rÂ   rü   r|   r   r   r   Úget_last_save_xbox¢  s   #þ


 zFleetUtil.get_last_save_xboxc                 C   sœ   d}d}d}|d }||dœ}	t ||	ƒ}
|
 |¡s#dddtt ¡ ƒgS |
 |¡}| d¡d  d¡}t|d ƒ}t|d ƒ}|d	 }t|d
 ƒ}||||gS )a`  
        get last saved model info from donefile.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_model("hdfs:/my/path",
                ...                                    hadoop_fs_name="hdfs://xxx",
                ...                                    hadoop_fs_ugi="user,passwd")

        r1   r)   ú/donefile.txtrˆ   r‰   rŒ   r   r”   r3   r   )r   rœ   rm   rn   r   rŽ   )r   ry   r~   r£   r¤   Úlast_save_dayÚlast_save_passrü   r§   r©   rª   r¨   r|   r   r   r   Úget_last_save_modelÕ  s"   #þ


zFleetUtil.get_last_save_modelc                 C   sx  d}t  |t|ƒ¡stdƒ‚t d| ¡ ¡  d¡}t  |t|ƒ¡s&tdƒ‚t d| ¡ ¡  d¡}t|ƒ}t|ƒ}t|d ƒt|d ƒ d d	 | }|| }t|d ƒ}	t|d ƒ}
d}g }t	|ƒD ].}|d	 }|d	 }||	k sv||
kr{||7 }qd|r…| 
d
| ¡ n	| 
d||f ¡ ||7 }qdd}g }t	|ƒD ]}| 
g ¡ t	||| ƒD ]}||  
|| ¡ q©||7 }q›|S )a’  
        get online pass interval

        Args:
            days(str): days to train
            hours(str): hours to train
            split_interval(int|str): split interval
            split_per_pass(int}str): split per pass
            is_data_hourly_placed(bool): is data hourly placed

        Returns:
            online_pass_interval(list)

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> online_pass_interval = fleet_util.get_online_pass_interval(
                ...     days="{20190720..20190729}",
                ...     hours="{0..23}",
                ...     split_interval=5,
                ...     split_per_pass=2,
                ...     is_data_hourly_placed=False)

        z!^\d+|{[0-9]+}|{[0-9]+\.\.[0-9]+}$zdays format is not rightzecho -n r—   zhours format is not rightr1   r   r   é<   z%02dz%02d%02d)ÚreÚ	fullmatchrl   Ú	Exceptionrr   rs   rt   rŽ   rm   r=   rà   )r   ÚdaysÚhoursZsplit_intervalZsplit_per_passZis_data_hourly_placedÚpatternZsplits_per_dayZpass_per_dayZleft_train_hourZright_train_hourÚstartZ
split_pathrG   ÚhÚmZonline_pass_intervalÚjr   r   r   Úget_online_pass_interval  sD   "ÿ


z"FleetUtil.get_online_pass_intervalZsqrerrZabserrZprobÚqr>   Útotalc
           1   
      sÂ  ˆ   |¡du sˆ   |¡du r|  d¡ dgd S ˆ   |¡du r+|  d| ¡ dgd S ˆ   |¡du r>|  d| ¡ dgd S ˆ   |¡du rQ|  d| ¡ dgd S ˆ   |¡du rd|  d| ¡ dgd S ˆ   |¡du rw|  d| ¡ dgd S ˆ   |	¡du rŠ|  d	|	 ¡ dgd S tj ¡  |  ˆ ||¡}
t ˆ   |¡ ¡ ¡}t |j	¡}| 
d
¡}t |¡d }tj ||¡ | 
|¡}t ˆ   |¡ ¡ ¡}t |j	¡}| 
d
¡}t |¡d }tj ||¡ | 
|¡}t|d ƒ}‡ fdd„}||ƒ}||ƒ}||ƒ}||ƒ}||ƒ}||	ƒ}|| }|| }t || ¡}|| }|| }|| }d}t|dkƒr2|| }d} d}!d}"d}#d}$d}%d}&d}'d}(d})d}*d}+d},d}-d}.t|ƒD ]t}/|d |/ }&|d |/ |d |/  }'t|/ƒ| }(t|(|  ƒ|-kr|(} d}!d}"d}#|!|'7 }!|"|(|' 7 }"|#|&7 }#|!dkr”qT|"|! })|)dkrŸqTt d|) |)|!  ¡}*|*|.k rÇ|#|! }+t|+|) d ƒ},|$|,|! 7 }$|%|!7 }%d
} qT|%dkrÒ|$|% nd}0|
|0||||||t|ƒg	S )aú  
        get global metrics, including auc, bucket_error, mae, rmse,
        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos_name(str): name of auc pos bucket Variable
            stat_neg_name(str): name of auc neg bucket Variable
            sqrerr_name(str): name of sqrerr Variable
            abserr_name(str): name of abserr Variable
            prob_name(str): name of prob Variable
            q_name(str): name of q Variable
            pos_ins_num_name(str): name of pos ins num Variable
            total_ins_num_name(str): name of total ins num Variable

        Returns:
            [auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
             mean_predict_qvalue, total_ins_num]

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> metric_list = fleet_util.get_global_metrics(myscope,
                ...                                             stat_pos.name,
                ...                                             stat_neg.name,
                ...                                             local_sqrerr.name,
                ...                                             local_abserr.name,
                ...                                             local_prob.name,
                ...                                             local_q.name,
                ...                                             local_pos_ins.name,
                ...                                             local_total_ins.name)

                >>> # below is part of example model
                >>> label = paddle.static.data(name="click", shape=[-1, 1],\
                ...     dtype="int64", lod_level=0)
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(\
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")\
                >>> binary_predict = paddle.concat(input=[\
                ...     paddle.subtract(\
                ...         paddle.ceil(similarity_norm), similarity_norm),\
                ...     similarity_norm], axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
                ...     stat_neg] = paddle.static.auc(input=binary_predict,\
                ...                                  label=label, curve='ROC',\
                ...                                  num_thresholds=4096)
                >>> local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins,\
                ...     local_total_ins = paddle.static.ctr_metric_bundle(\
                ...         similarity_norm, label)

        Nr0   é	   únot found sqrerr_name=%súnot found abserr_name=%súnot found prob_name=%súnot found q_name=%súnot found pos_ins_num_name=%súnot found total_ins_num_name=%sr1   r   c                    sX   t  ˆ  | ¡ ¡ ¡}t  |j¡}| d¡}t  |¡d }tj 	||¡ | |¡}|d S )Nr1   r   )
r#   r7   r4   r"   r8   r9   r:   r   r5   r;   )ÚnameZmetricZold_metric_shapeZglobal_metric©r&   r   r   Ú
get_metricÈ  s   

z0FleetUtil.get_global_metrics.<locals>.get_metricr2   gíµ ÷Æ°>g      ð¿g{®Gáz„?gš™™™™™©?r   )r4   r   r   r5   r6   r*   r#   r7   r"   r8   r9   r:   r;   r<   ÚmathÚsqrtÚabsr=   Úfloatrm   )1r   r&   Ústat_pos_nameÚstat_neg_nameÚsqrerr_nameÚabserr_nameÚ	prob_nameÚq_nameÚpos_ins_num_nameÚtotal_ins_num_nameÚaucr>   r?   r@   rA   rB   rC   rD   r  Zglobal_sqrerrZglobal_abserrZglobal_probZglobal_q_valueZpos_ins_numrF   Zneg_ins_numÚmaeÚrmseZreturn_actual_ctrÚpredicted_ctrÚmean_predict_qvalueÚcopcZlast_ctrZimpression_sumZctr_sumZ	click_sumZ	error_sumZerror_countZclickÚshowZctrZ
adjust_ctrZrelative_errorÚ
actual_ctrZrelative_ctr_errorZ
k_max_spanZk_relative_error_boundrG   Úbucket_errorr   r  r   Úget_global_metricsQ  sØ   D






ÿ





	

ÿ
€÷zFleetUtil.get_global_metricsc                 C   s<  |  |¡du s|  |¡du r|  d¡ dS |  |¡du r%|  d| ¡ dS |  |¡du r5|  d| ¡ dS |  |¡du rE|  d| ¡ dS |  |¡du rU|  d| ¡ dS |  |¡du re|  d| ¡ dS |  |	¡du ru|  d|	 ¡ dS |  |||||||||	¡	\	}}}}}}}}}|  d	 |
|||||||||¡
¡ dS )
a&  
        print global metrics, including auc, bucket_error, mae, rmse,
        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos_name(str): name of auc pos bucket Variable
            stat_neg_name(str): name of auc neg bucket Variable
            sqrerr_name(str): name of sqrerr Variable
            abserr_name(str): name of abserr Variable
            prob_name(str): name of prob Variable
            q_name(str): name of q Variable
            pos_ins_num_name(str): name of pos ins num Variable
            total_ins_num_name(str): name of total ins num Variable
            print_prefix(str): print prefix

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.print_global_metrics(myscope,
                ...                                 stat_pos.name,
                ...                                 stat_neg.name,
                ...                                 local_sqrerr.name,
                ...                                 local_abserr.name,
                ...                                 local_prob.name,
                ...                                 local_q.name,
                ...                                 local_pos_ins.name,
                ...                                 local_total_ins.name)

                >>> # below is part of model
                >>> label = paddle.static.data(name="click", shape=[-1, 1],\
                ...     dtype="int64", lod_level=0)
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(\
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")\
                >>> binary_predict = paddle.concat(input=[\
                ...     paddle.subtract(\
                ...         paddle.ceil(similarity_norm), similarity_norm),\
                ...     similarity_norm], axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
                ...     stat_neg] = paddle.static.auc(input=binary_predict,\
                ...                                  label=label, curve='ROC',\
                ...                                  num_thresholds=4096)
                >>> local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins, \
                ...     local_total_ins = paddle.static.ctr_metric_bundle(\
                ...         similarity_norm, label)

        Nr0   r  r  r  r  r  r  z•{} global AUC={:.6f} BUCKET_ERROR={:.6f} MAE={:.6f} RMSE={:.6f} Actural_CTR={:.6f} Predicted_CTR={:.6f} COPC={:.6f} MEAN Q_VALUE={:.6f} Ins number={})r4   r   r1  Úformat)r   r&   r   r!  r"  r#  r$  r%  r&  r'  r-   r(  r0  r)  r*  r/  r+  r-  r,  rF   r   r   r   Úprint_global_metrics  sx   B
ÿ÷ööýzFleetUtil.print_global_metricsc                 C   s   t  |||¡S ©N)r   Úprogram_type_trans)r   Zprog_dirZprog_fnÚis_textr   r   r   r5  Ÿ  s   zFleetUtil.program_type_transc                 C   s   t  ||¡S r4  )r   Úload_program)r   Úmodel_filenamer6  r   r   r   r7  ¢  s   zFleetUtil.load_programc                 C   s"   |   ||¡}t | ¡ ||¡ dS )zdraw program from fileN)r7  r   Úgraphvizrö   )r   r8  r6  Ú
output_dirZoutput_filenamerá   r   r   r   Údraw_from_program_file¥  s   z FleetUtil.draw_from_program_filec                 C   s   t  | ¡ ||¡ dS )zdraw ProgramN)r   r9  rö   )r   rá   r:  Zoutput_namer   r   r   Údraw_from_program¬  s   zFleetUtil.draw_from_programc                 C   sp   |   |j|j¡}|   |j|j¡}|jr"tj |j¡}|  	|||j
¡ t ||¡}|r1t d¡ |S t d¡ |S )Nzcheck_programs succeed.zBcheck_programs failed. pruned program and train program not match!)r7  Ztrain_prog_pathZis_text_train_programZpruned_prog_pathZis_text_pruned_programZdrawrr   rJ   Údirnamer<  Zdraw_out_namer   Zcheck_pruned_program_varsr   r   )r   ÚconfigZ
train_progZpruned_progZ
pruned_dirÚresr   r   r   Úcheck_two_programs°  s&   ÿÿÿ
ýÿzFleetUtil.check_two_programsc              	   C   s<   t  d¡ t |j|j|j|j|j|j	|j
¡}t  d¡ |S )Nzstart check_vars_and_dump.zcheck_vars_and_dump succeed.)r   r   r   Zcheck_saved_vars_try_dumpZdump_model_dirZdump_program_filenameZis_text_dump_programZfeed_configZfetch_configZ
batch_sizeZsave_params_filename)r   r>  Úresultsr   r   r   Úcheck_vars_and_dumpÅ  s   
ù
	zFleetUtil.check_vars_and_dumpc                 C   s   |   ||¡}t ||¡ dS )ae  
        Parse program.proto into a more readable format.
        This function will generate three files:
        output_dir/vars_all.log,
        output_dir/vars_persistable.log,
        output_dir/ops.log.

        Args:
            prog_path(str): proto file path to be parsed.
            is_text(bool): proto file is human-readale format or not(binary).
            output_dir(str): output dir.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> program_path = "./program.pbtxt"
                >>> is_text = True
                >>> output_dir = "/tmp/"
                >>> fleet_util.parse_program_proto(program_path, is_text, output_dir)
        N)r7  r   Zparse_program)r   Z	prog_pathr6  r:  rá   r   r   r   Úparse_program_protoÓ  s   zFleetUtil.parse_program_proto)r
   )r   )rƒ   r„   )rƒ   rÄ   )r   )rƒ   T)rƒ   NT)rƒ   ),Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r    r   Zglobal_scopeZCPUPlacer(   r/   r*   rK   rN   rP   r‚   r±   rÃ   rÐ   rÒ   rÖ   rÙ   rÛ   rÜ   rÞ   rß   rã   rõ   rù   rý   rÿ   r  r  r1  r3  r5  r7  r;  r<  r@  rB  rC  r   r   r   r   r   +   s®    

û 
û/
üP


÷4
÷k
õ~
÷N
%"@
óg
óm
û6
û8
û6H
ö O
õ c                       s€   e Zd ZdZd‡ fdd„	Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z		ddd„Z
i ddfdd„Z	ddd„Zi dfdd„Z‡  ZS )r   a\  
    GPUPSUtil provides some common functions for users' convenience.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
            >>> fleet_util = GPUPSUtil()
            >>> fleet_util.rank0_print("my log")
    Nc                    s   t ƒ  d¡ || _d S )Nr
   )Úsuperr   Ú_afs©r   Z	fs_client©Ú	__class__r   r   r   ü  s   
zGPUPSUtil.__init__c                 C   s   | j  ||||¡ dS )a*  
        init for fs config

        Args:
            fs_name(str): fs name
            fs_user(str): fs user
            fs_passwd(str): fs password
            fs_conf(str): fs and afs conf path

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.init(20190722, 88, 88, "./afs.conf")
        N)rI  Úinit)r   Zfs_nameZfs_userZ	fs_passwdZfs_confr   r   r   rM    s   zGPUPSUtil.initc                 C   s
   || _ dS )a:  
        set fs_client for fs config

        Args:
            fs_client(AFSClient): fs_client object

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
        N)rI  rJ  r   r   r   Úset_fsclient  s   
zGPUPSUtil.set_fsclientc           	      C   sÎ   |d }| j  |¡sddtt ¡ ƒgS | j  |d¡ d}tddƒ}| ¡ }W d  ƒ n1 s0w   Y  | ¡ }t 	| 
d¡d ¡}t|d	  
d
¡d ƒ}d
 |d	  
d
¡dd… ¡}t|d ƒ}|||gS )a€  
        get last saved base xbox info from xbox_base_done.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox_base("hdfs:/my/path")

        rú   r1   z./xbox_base_done.txtr)   rµ   ÚrNr‰   rZ   rX   r¸   rT   )rI  rœ   rm   rn   Údownloadrž   rt   ru   rw   r¾   rŽ   r½   )	r   ry   r§   r«   r°   rÀ   rÁ   rü   r|   r   r   r   rý   .  s   
ÿ
z!GPUPSUtil.get_last_save_xbox_basec           
      C   sü   |d }| j  |¡sdddtt ¡ ƒgS | j  |d¡ d}tddƒ}| ¡ }W d  ƒ n1 s1w   Y  | ¡ }t 	| 
d¡d ¡}t|d  
d	¡d
 ƒ}t|d  
d	¡d  
d¡d ƒ}d	 |d  
d	¡dd… ¡}t|d ƒ}	t d¡ ||||	gS )aÉ  
        get last saved xbox info from xbox_patch_done.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox("hdfs:/my/path")

        rþ   r1   r)   r³   rO  Nr‰   rZ   rX   r¸   r¹   rº   rT   )rI  rœ   rm   rn   rP  rž   rt   ru   rw   r¾   rŽ   r½   rr   Úremove)
r   ry   r§   r«   r°   rÀ   rÁ   rÂ   rü   r|   r   r   r   rÿ   X  s    
ÿ 
zGPUPSUtil.get_last_save_xboxc           	      C   sÒ   d}d}d}|d }| j  |¡sdddtt ¡ ƒgS | j  |d¡ d}tddƒ}| ¡ }W d  ƒ n1 s7w   Y  | ¡  d¡d  d	¡}t|d
 ƒ}t|d ƒ}|d }t|d ƒ}t	 
d¡ ||||gS )aÄ  
        get last saved model info from donefile.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_model("hdfs:/my/path")

        r1   r)   r   z./donefile.txtr„   rO  Nr‰   rŒ   r   r”   r3   r   )rI  rœ   rm   rn   rP  rž   rt   ru   rŽ   rr   rQ  )	r   ry   r  r  rü   r§   r¨   r°   r|   r   r   r   r  „  s$   
ÿ
zGPUPSUtil.get_last_save_modelr„   c              	   C   sh  t |ƒ}t |ƒ}t|ƒ}|dkr!d|› d|› d}| d¡| }nd| }| d¡| }t ¡ dkr2|d | }d||||df }	| j |¡rý| j ||¡ d}
t|dƒ}| 	¡ }
W d  ƒ n1 sew   Y  |
 
¡  d	¡}d
d„ |D ƒ}dd„ |D ƒ}t |¡ d}tt|ƒƒD ]}t|ƒt|| ƒkr¦t|ƒt|| ƒkr¦d} nqŒ|sìt|dƒ}| |
 
¡ d	 ¡ | |	d	 ¡ W d  ƒ n1 sÉw   Y  | j |¡ | j ||¡ |  d|› d|› d|› d¡ dS |  d|› d|› d|› d¡ dS t|dƒ}| |	d	 ¡ W d  ƒ n	1 sw   Y  | j ||¡ |  d|› d|› d|› d¡ dS dS )a  
        write donefile when save model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            xbox_base_key(str|int): xbox base key
            donefile_name(str): donefile name, default is "donefile.txt"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                ...                                 day=20190723,
                ...                                 pass_id=66,
                ...                                 xbox_base_key=int(time.time()))

        r…   rX   r†   r   r‡   r)   rO  Nr‰   c                 S   rŠ   r‹   r   r   r   r   r   r‘   ì  r’   z2GPUPSUtil.write_model_donefile.<locals>.<listcomp>c                 S   rŠ   r“   r   r   r   r   r   r‘   í  r’   FTr•   r–   r—   r˜   r™   rš   r›   )rl   rm   rq   r   r   rI  rœ   rP  rž   rt   ru   rŽ   rr   rQ  r=   r<   rŸ   r    r¡   r    )r   ry   rz   r¢   r|   r¥   r¦   r{   r§   r¨   r«   r°   r¬   r­   r®   r¯   rG   r   r   r   r±   ±  sp   !û
ÿ
ÿ€þÿÿÿÿÕzGPUPSUtil.write_model_donefilerƒ   c              
   C   sØ  t |ƒ}t |ƒ}t|ƒ}d}|dkr+d}d|› d|› d}| d¡| }|
du r*d}
nd}d| }| d¡| }|
du r>d	}
t|tƒrHd
 |¡}t ¡ dkrj|d |
 }| j||||||i |d}| j	 
|¡r5|  d| ¡ | j	 ||
¡ d}t|
dƒ}| ¡ }W d  ƒ n1 sw   Y  t | ¡  d¡d ¡}|d  d¡d }|d  d¡d  d¡d }t |
¡ |  d|
 ¡ d}t|ƒt|ƒk sÛt|ƒt|ƒkrÝt|ƒt|ƒkrÝd}|s$t|
dƒ}| | ¡ d ¡ | |d ¡ W d  ƒ n	1 sw   Y  | j	 |¡ | j	 |
|¡ |  d|› d|› d|
› d¡ dS |  d|
› d|› d|› d¡ dS t|
dƒ}| |d ¡ W d  ƒ n	1 sMw   Y  | j	 |
|¡ |  d|› d|› d|
› d¡ dS dS ) aã  
        write delta donefile or xbox base donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            xbox_base_key(str|int): xbox base key
            data_path(str|list): training data path
            monitor_data(dict): metrics
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is None"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_xbox_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     xbox_base_key=int(time.time()),
                ...     data_path="hdfs:/my/data/",
                ...     monitor_data={})

        Nr…   rQ   rX   r²   r³   r   r´   rµ   r¶   r   r·   zexist %s succeedr)   rO  r‰   r1   rZ   r¸   r¹   rº   zremove %s succeedFTr•   r–   r—   r˜   r™   rš   r›   )rl   rm   rq   r»   r¼   r½   r   r   r‚   rI  rï   r   rP  rž   rt   rw   r¾   ru   rŽ   rr   rQ  rŸ   r    r¡   r    )r   ry   rz   r¢   r|   r}   r~   r£   re   r¤   r¥   r   r¦   r{   r§   r¿   r«   r°   rÀ   rÁ   rÂ   r¯   r   r   r   rÃ     sˆ   ,€

ø
ÿ
þÿÿÿÿÎzGPUPSUtil.write_xbox_donefilerÄ   c                 K   s
  t |ƒ}t |ƒ}t|ƒ}| dd¡}| dd¡}|dkr+d|||f }	| d¡|	 }
nd||f }	| d¡|	 }
t ¡ dkrƒ|
d | }| j |¡rS|  d	| ¡ dS d
||f }t	|dƒ}| 
|¡ W d  ƒ n1 snw   Y  | j ||¡ |  d| ¡ dS dS )at  
        write cache donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            key_num(str|int): save cache return value
            donefile_name(str): donefile name, default is "sparse_cache.meta"
            kwargs(dict): user defined properties
                          file_num(int): cache file num
                          table_id(int): cache table id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_cache_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     key_num=123456)

        rÅ   rÆ   rI   r   r…   rÇ   rX   rÈ   rÉ   rÊ   r•   NrË   )rl   rm   rÌ   rq   r   r   rI  rœ   r    rž   rŸ   r¡   )r   ry   rz   r¢   rÍ   r¥   rÎ   rÅ   rI   r¦   r{   r§   rÏ   r°   r   r   r   rÐ     s4   &ÿþÿñzGPUPSUtil.write_cache_donefilerQ   c	           
      C   s"  t  ¡ }	|dkrt|ƒ|	d< n!|dkrttt ¡ ƒƒ|	d< ntd| ƒ d}ttt ¡ ƒƒ|	d< t|ƒ|	d< | d¡s@| d¡rK|| d¡d	 d … }|| d
¡ d |	d< d|	d< d|	d< d|	d< d|	d< ||	d< t	j
 dd¡|	d< d|	d< | d
¡d | d |	d< tt ¡ ƒ|	d< t |	¡S )Nr   rR   rQ   rS   rT   rU   rV   rW   r   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   ZPADDLE_JOB_IDr)   rd   re   rf   rg   rh   ri   )rj   rk   rl   rm   rn   r   ro   rp   rq   rr   ÚenvironrÌ   r   rv   rw   rx   )
r   ry   rz   r{   r|   r}   r~   re   r   r   r   r   r   r‚   Å  s.   ÿ
zGPUPSUtil._get_xbox_strr4  )r„   )rÄ   )rD  rE  rF  rG  r   rM  rN  rý   rÿ   r  r±   rÃ   rÐ   r‚   Ú__classcell__r   r   rK  r   r   ï  s&    *,3
úd
õ{
úL÷)rG  rj   rw   Úloggingr  rr   r  r   rn   Únumpyr#   rë   r   Zpaddle.base.log_helperr   Z!paddle.distributed.fleet.utils.fsr   r)   r   Ú__all__rD  ÚINFOr   r   r   r   r   r   r   r   Ú<module>   sF   ÿ             Q