o
    "j|>                     @   s   d Z ddlZddlmZ ddlmZ ddlmZmZ ddl	m
Z
mZ ddlmZmZ ddlmZ dd	lmZmZmZ d
dlmZmZmZ ej  ZZejjjZG dd deZ dS )a  
Steps to transpile trainer:
1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
2. create delta variable in global scope which used to send
3. add send op to send sparse ids to communicator

Steps to transpile pserver:
1. create new program for parameter server.
2. create params variables that assigned to current server instance.
3. create a sub-block in the server side program
4. append sum ops that should run on current server instance.
5. add listen_and_serv op
    N)	framework)find_distributed_lookup_table)VarsDistributedwait_server_ready)Programcore)PSDispatcher
RoundRobin)DistributedMode)	Parameterdefault_main_programdefault_startup_program   )DistributeTranspilerDistributeTranspilerConfigslice_variablec                   @   sV   e Zd ZdddZ						dddZd	d
 ZdddZdd Zdd Zdd Z	dS )GeoSgdTranspilerNc                 C   s^   |d ur|| _ nt | _ |   | j jd u rt| j _| j jdks"J | j jjd tks-J d S )Ni    r   )configr   Z_set_server_configsplit_methodr	   min_block_size	__bases__r   )selfr    r   q/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/transpiler/geo_sgd_transpiler.py__init__@   s   zGeoSgdTranspiler.__init__127.0.0.1:6174r   Fc                 C   s  |d u rt  }|d u rt }|| _|| _| j | _|| _d| _|| _|	d}|| _
t | _|  \| _| _| j| j
}	i | _i | _| jD ]\}
}|j| j|
j< |
j| j|j< qGt| j| _| jd u| _| jrl| jnd | j_d| j_| j
| j_|| j_| jdk| j_t | _t | _ g | _!g | _"g | _#| $  g }|	%  t&| j'( }|D ]\}}t)|D ]	\}}|*| qq|}|	%  |	+|}t)|D ].\}}| j,| d *||  | j-|| j}||_.| j || j }| j| d *| q| j| j_/g | _0g | _1i }| j2 j3D ]R}d|4 v r_|j5dkr#|6d	d t7|8d
|8dD ]0\}}|| j"v r]||v rF|| |krFq.|2 9|}| j0*| | j1*| |||< q.q|2 j:t;< d}|2 j=dd| j0id|id| j1id | j>||d| _?| j!D ]}| j?2 j:|j|j@|jA|j5|jBd q| j?2 j:t;< d}| j?2 j:dd}| j?2 j=dd|gid|id|jgid d S )NF,Tr   paramsepmap	is_sparseZlookup_tableZremote_prefetchZIdsW)namesendXOutZsend_varnamestypeinputsoutputsattrs)	recv_varseplistr!   persistabledtyper&   shape
param_init)Cr   r   origin_programstartup_programcloneZorigin_startup_programtrainer_num	sync_mode
trainer_idsplitpserver_endpointsr   vars_overviewZ_get_optimize_passZoptimize_opsparams_gradsr   r   Zparam_name_to_grad_nameZgrad_name_to_param_namer!   r   Z
table_nameZhas_distributed_lookup_tableZ_distributed_lookup_tableZ_is_distributedZ
_endpointsZ_ps_endpointZ	_is_chiefcollectionsOrderedDict	vars_infosplit_to_origin_mappingdelta_vars_listsparse_var_listsparse_var_splited_list_init_splited_varsresetlistparam_var_mappingitems	enumerateappenddispatchparam_opt_ep_mappingZget_distributed_var_by_sliceendpointZ_parameters_on_pserversZ
sparse_varZsparse_tablesglobal_blockopsZ	all_attrsr&   Z	_set_attrzipinputvar
create_varr   Zgenerate_control_dev_var_name	append_opZ_get_trainer_startup_programZtrainer_startup_programr-   r.   r/   )r   r6   programZpserversZtrainersr5   r2   Zcurrent_endpointr8   Zps_dispatcherZ	param_varZgrad_varZ	send_varsZparam_var_mapping_items_splited_varsrP   r*   r+   iepZdistributed_varorigin_nameZunique_sparse_varopZinput_var_nameZsparse_var_nameZ	input_varZdummy_output	delta_varr0   r   r   r   	transpileM   s   















zGeoSgdTranspiler.transpilec                 C   s   | j S N)r=   r   r   r   r   _get_vars_info   s   zGeoSgdTranspiler._get_vars_infoTc                 C   s   |rt | j | jS r\   )r   r8   r1   )r   Z	wait_portr   r   r   get_trainer_program   s   
z$GeoSgdTranspiler.get_trainer_programc                 C   s(   |  |}| j| _| j||d}||fS )N)pserver_program)get_pserver_programrJ   Zparam_grad_ep_mappingZget_startup_program)r   rK   Zpserver_progZpserver_startupr   r   r   get_pserver_programs   s   
z%GeoSgdTranspiler.get_pserver_programsc              
   C   sz  t  }| jj|_|| j g }| j| d D ]
}| | | qg }g }g }|jd }| j| d D ]^}	||}
|	|
 |	j
}|
j }|j| }d|j
 }|	j
| jv ritjjj}|	d||j
g n|j}|j|d||j|jd}|
jdd||gid	|id
 |	|d t|
j  q4||| jtj||| jj| jj| jj d	}| jdd|ii |d |!  || _"|S )Nr   r   z%s.delta:F)r!   r-   r&   r.   r/   sumr#   r$   )r&   r'   r(   )	Zoptimize_blocksrK   ZFaninZdistributed_modeZgrad_to_block_idsparse_grad_to_paramZrpc_get_thread_numZrpc_send_thread_numZrpc_prefetch_thread_numZlisten_and_servr%   )#r   r1   Zrandom_seedZ_copy_dist_param_info_fromrJ   Z
_clone_varrL   Z
num_blocksZ_create_blockrH   r!   rS   varsrA   r   VarDescVarTypeSELECTED_ROWSjoinr&   rQ   r.   r/   rR   stridxr4   r
   ZGEOZserver_configZ_rpc_get_thread_numZ_rpc_send_thread_numZ_rpc_prefetch_thread_numZ_sync_with_cppr`   )r   rK   r`   Zrecv_inputsvZoptimize_blockZparam_to_block_idre   Zpre_block_idxrP   Zper_opt_blockvar_nameZpserver_blockparamZdelta_var_name
delta_typerZ   r)   r   r   r   ra      sr   








z$GeoSgdTranspiler.get_pserver_programc              
      sx  g }g }t  } jD ]?\}}t|tkr|jdu rq
|j|vr*|| ||j |j|vr:|| ||j |jtj	j
jkrI j|j q
t|t j jj}  j| _t  _ fdd jD   j D ]\}} j |}	t  j|< g  j| d<  |}
dd |
D  j| d< g  j| d< g  j| d< | jv rtj	j
j} j| d d	 n|	j} j| d d
  j jd|dgd|	j||	j d} j!| |D ]P} "|\}}} j#j$|	||||dd | j%|j< | jv r j&|j  j| d |j t|dkr8 j jd|jdgd|j||j d qqqd S )NFc                    s    g | ]} j |d g iiqS )r   )rJ   update).0rW   r]   r   r   
<listcomp>S  s    z7GeoSgdTranspiler._init_splited_vars.<locals>.<listcomp>Z	var_namesc                 S   s   g | ]}t |qS r   )rk   )rr   rV   r   r   r   rs   d  s    sectionsr   r   TrueFalse.deltar,   Param)
origin_varZ	slice_varblock_idoffsetis_sliceZvtyper   )'setr:   r&   r   Z	trainabler!   rH   addr   rg   rh   ri   r@   r   lenr8   r   r   Z_create_vars_from_blocklistr1   rE   r;   r<   rJ   rF   rL   rP   r=   Z_get_splited_var_sectionsrQ   rj   r.   r/   r?   Z_get_slice_var_infor9   Zadd_distributed_varr>   rA   )r   Z
param_listZ	grad_listZparam_grad_setpgZparam_blocksrX   rU   rz   Zvars_sectionrp   rZ   Zsplited_varr}   r{   r|   r   r]   r   rB   2  s   











z#GeoSgdTranspiler._init_splited_varsr\   )Nr   r   FNr   )T)
__name__
__module____qualname__r   r[   r^   r_   rb   ra   rB   r   r   r   r   r   ?   s    

 
Jr   )!__doc__r;   Zpaddler   Z*paddle.distributed.distribute_lookup_tabler   Z%paddle.distributed.transpiler.detailsr   r   Zpaddle.frameworkr   r   ZCpaddle.incubate.distributed.fleet.parameter_server.ir.ps_dispatcherr   r	   Z7paddle.incubate.distributed.fleet.parameter_server.moder
   Zpaddle.staticr   r   r   Zdistribute_transpilerr   r   r   Zop_proto_and_checker_makerZkOpRoleAttrNameZRPC_OP_ROLE_ATTR_NAMEZop_role_attr_nameZOpRoleZRPCZRPC_OP_ROLE_ATTR_VALUEr   r   r   r   r   <module>   s   
