Newer
Older

sparonuz
committed
SUBROUTINE lbc_lnk_neicoll_/**/PRECISION( cdname, ptab, cd_nat, psgn, kfld, kfillmode, pfillval, khls, lsend, lrecv, ld4only )
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
CHARACTER(len=*) , INTENT(in ) :: cdname ! name of the calling subroutine
TYPE(PTR_4d_/**/PRECISION), DIMENSION(:), INTENT(inout) :: ptab ! pointer of arrays on which apply the b.c.
CHARACTER(len=1), DIMENSION(:), INTENT(in ) :: cd_nat ! nature of array grid-points
REAL(PRECISION), DIMENSION(:), INTENT(in ) :: psgn ! sign used across the north fold boundary
INTEGER , INTENT(in ) :: kfld ! number of pt3d arrays
INTEGER , OPTIONAL, INTENT(in ) :: kfillmode ! filling method for halo over land (default = constant)
REAL(PRECISION), OPTIONAL, INTENT(in ) :: pfillval ! background value (used at closed boundaries)
INTEGER , OPTIONAL, INTENT(in ) :: khls ! halo size, default = nn_hls
LOGICAL, DIMENSION(8),OPTIONAL, INTENT(in ) :: lsend, lrecv ! communication with other 4 proc
LOGICAL, OPTIONAL, INTENT(in ) :: ld4only ! if .T., do only 4-neighbour comm (ignore corners)
!
INTEGER :: ji, jj, jk , jl, jf, jn ! dummy loop indices
INTEGER :: ipi, ipj, ipk, ipl, ipf ! dimension of the input array
INTEGER :: ip0i, ip1i, im0i, im1i
INTEGER :: ip0j, ip1j, im0j, im1j
INTEGER :: ishti, ishtj, ishti2, ishtj2
INTEGER :: iszS, iszR
INTEGER :: ierr
INTEGER :: ihls, idx
INTEGER :: impi_nc
INTEGER :: ifill_nfd
INTEGER, DIMENSION(4) :: iwewe, issnn
INTEGER, DIMENSION(8) :: isizei, ishtSi, ishtRi, ishtPi
INTEGER, DIMENSION(8) :: isizej, ishtSj, ishtRj, ishtPj
INTEGER, DIMENSION(8) :: ifill, iszall
INTEGER, DIMENSION(8) :: jnf
INTEGER, DIMENSION(:), ALLOCATABLE :: iScnt, iRcnt ! number of elements to be sent/received
INTEGER, DIMENSION(:), ALLOCATABLE :: iSdpl, iRdpl ! displacement in halos arrays
LOGICAL, DIMENSION(8) :: llsend, llrecv
REAL(PRECISION) :: zland
LOGICAL :: ll4only ! default: 8 neighbourgs
!!----------------------------------------------------------------------
!
! ----------------------------------------- !
! 1. local variables initialization !
! ----------------------------------------- !
!
ipi = SIZE(ptab(1)%pt4d,1)
ipj = SIZE(ptab(1)%pt4d,2)
ipk = SIZE(ptab(1)%pt4d,3)
ipl = SIZE(ptab(1)%pt4d,4)
ipf = kfld
!
IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ipk, ipl, ipf, ld_lbc = .TRUE. )
!
! take care of optional parameters
!
ihls = nn_hls ! default definition
IF( PRESENT( khls ) ) ihls = khls
IF( ihls > n_hlsmax ) THEN
WRITE(ctmp1,*) TRIM(cdname), ' is calling lbc_lnk with khls > n_hlsmax : ', khls, '>', n_hlsmax
CALL ctl_stop( 'STOP', ctmp1 )
ENDIF
IF( ipi /= Ni_0+2*ihls ) THEN
WRITE(ctmp1,*) TRIM(cdname), ' is calling lbc_lnk with an input array which does not match ihls along i: ', ipi, ihls, Ni_0
CALL ctl_stop( 'STOP', ctmp1 )
ENDIF
IF( ipj /= Nj_0+2*ihls ) THEN
WRITE(ctmp1,*) TRIM(cdname), ' is calling lbc_lnk with an input array which does not match ihls along j:', ipj, ihls , Nj_0
CALL ctl_stop( 'STOP', ctmp1 )
ENDIF
!
ll4only = .FALSE. ! default definition
IF( PRESENT(ld4only) ) ll4only = ld4only
!
impi_nc = mpi_nc_com8(ihls) ! default
IF( ll4only ) impi_nc = mpi_nc_com4(ihls)
!
zland = 0._wp ! land filling value: zero by default
IF( PRESENT( pfillval ) ) zland = pfillval ! set land value
!
! define llsend and llrecv: logicals which say if mpi-neibourgs for send or receive exist or not.
IF ( PRESENT(lsend) .AND. PRESENT(lrecv) ) THEN ! localy defined neighbourgs
CALL ctl_stop( 'STOP', 'mpp_nc_generic+lsend and lrecv not yet implemented')
ELSE IF( PRESENT(lsend) .OR. PRESENT(lrecv) ) THEN
WRITE(ctmp1,*) TRIM(cdname), ' is calling lbc_lnk with only one of the two arguments lsend or lrecv'
CALL ctl_stop( 'STOP', ctmp1 )
ELSE ! default neighbours
llsend(:) = mpiSnei(ihls,:) >= 0
IF( ll4only ) llsend(5:8) = .FALSE. ! exclude corners
llrecv(:) = mpiRnei(ihls,:) >= 0
IF( ll4only ) llrecv(5:8) = .FALSE. ! exclude corners
ENDIF
!
! define ifill: which method should be used to fill each parts (sides+corners) of the halos
! default definition
DO jn = 1, 8
IF( llrecv(jn) ) THEN ; ifill(jn) = jpfillmpi ! with an mpi communication
ELSEIF( l_SelfPerio(jn) ) THEN ; ifill(jn) = jpfillperio ! with self-periodicity
ELSEIF( PRESENT(kfillmode) ) THEN ; ifill(jn) = kfillmode ! localy defined
ELSE ; ifill(jn) = jpfillcst ! constant value (zland)
ENDIF
END DO
! take care of "indirect self-periodicity" for the corners
DO jn = 5, 8
IF(.NOT.l_SelfPerio(jn) .AND. l_SelfPerio(jpwe)) ifill(jn) = jpfillnothing ! no bi-perio but ew-perio: do corners later
IF(.NOT.l_SelfPerio(jn) .AND. l_SelfPerio(jpso)) ifill(jn) = jpfillnothing ! no bi-perio but ns-perio: do corners later
END DO
! north fold treatment
IF( l_IdoNFold ) THEN
ifill_nfd = ifill(jpno) ! if we are here, this means llrecv(jpno) = .false. and l_SelfPerio(jpno) = .false.
ifill( (/jpno/) ) = jpfillnothing ! we do north fold -> do nothing for northern halo
ENDIF
! We first define the localization and size of the parts of the array that will be sent (s), received (r)
! or used for periodocity (p). The localization is defined as "the bottom left corner - 1" in i and j directions.
! This is a shift that will be applied later in the do loops to pick-up the appropriate part of the array
!
! all definitions bellow do not refer to N[ij][se]0 so we can use it with any local value of ihls
! ! ________________________
ip0i = 0 ! im0j = inner |__|________________|__|
ip1i = ihls ! im1j = inner - halo | |__|__________|__| |
im1i = ipi-2*ihls ! | | | | | |
im0i = ipi - ihls ! | | | | | |
ip0j = 0 ! | | | | | |
ip1j = ihls ! | |__|__________|__| |
im1j = ipj-2*ihls ! ip1j = halo |__|__|__________|__|__|
im0j = ipj - ihls ! ip0j = 0 |__|________________|__|
! ! ip0i ip1i im1i im0i
!
iwewe(:) = (/ jpwe,jpea,jpwe,jpea /) ; issnn(:) = (/ jpso,jpso,jpno,jpno /)
! sides: west east south north ; corners: so-we, so-ea, no-we, no-ea
isizei(1:4) = (/ ihls, ihls, Ni_0, Ni_0 /) ; isizei(5:8) = ihls ! i- count
isizej(1:4) = (/ Nj_0, Nj_0, ihls, ihls /) ; isizej(5:8) = ihls ! j- count
ishtSi(1:4) = (/ ip1i, im1i, ip1i, ip1i /) ; ishtSi(5:8) = ishtSi( iwewe ) ! i- shift send data
ishtSj(1:4) = (/ ip1j, ip1j, ip1j, im1j /) ; ishtSj(5:8) = ishtSj( issnn ) ! j- shift send data
ishtRi(1:4) = (/ ip0i, im0i, ip1i, ip1i /) ; ishtRi(5:8) = ishtRi( iwewe ) ! i- shift received data location
ishtRj(1:4) = (/ ip1j, ip1j, ip0j, im0j /) ; ishtRj(5:8) = ishtRj( issnn ) ! j- shift received data location
ishtPi(1:4) = (/ im1i, ip1i, ip1i, ip1i /) ; ishtPi(5:8) = ishtPi( iwewe ) ! i- shift data used for periodicity
ishtPj(1:4) = (/ ip1j, ip1j, im1j, ip1j /) ; ishtPj(5:8) = ishtPj( issnn ) ! j- shift data used for periodicity
!
! -------------------------------- !
! 2. Prepare MPI exchanges !
! -------------------------------- !
!
! Allocate local temporary arrays to be sent/received.
iszS = COUNT( llsend )
iszR = COUNT( llrecv )
ALLOCATE( iScnt(iszS), iRcnt(iszR), iSdpl(iszS), iRdpl(iszR) ) ! ok if iszS = 0 or iszR = 0
iszall(:) = isizei(:) * isizej(:) * ipk * ipl * ipf
iScnt(:) = PACK( iszall, mask = llsend ) ! ok if mask = .false.
iRcnt(:) = PACK( iszall, mask = llrecv )
IF( iszS > 0 ) iSdpl(1) = 0
DO jn = 2,iszS
iSdpl(jn) = iSdpl(jn-1) + iScnt(jn-1) ! with _alltoallv: in units of sendtype
END DO
IF( iszR > 0 ) iRdpl(1) = 0
DO jn = 2,iszR
iRdpl(jn) = iRdpl(jn-1) + iRcnt(jn-1) ! with _alltoallv: in units of sendtype
END DO
! Allocate buffer arrays to be sent/received if needed
iszS = SUM(iszall, mask = llsend) ! send buffer size
IF( ALLOCATED(BUFFSND) ) THEN
IF( SIZE(BUFFSND) < iszS ) DEALLOCATE(BUFFSND) ! send buffer is too small
ENDIF
IF( .NOT. ALLOCATED(BUFFSND) ) ALLOCATE( BUFFSND(iszS) )
iszR = SUM(iszall, mask = llrecv) ! recv buffer size
IF( ALLOCATED(BUFFRCV) ) THEN
IF( SIZE(BUFFRCV) < iszR ) DEALLOCATE(BUFFRCV) ! recv buffer is too small
ENDIF
IF( .NOT. ALLOCATED(BUFFRCV) ) ALLOCATE( BUFFRCV(iszR) )
! fill sending buffer with ptab(jf)%pt4d
idx = 1
DO jn = 1, 8
IF( llsend(jn) ) THEN
ishti = ishtSi(jn)
ishtj = ishtSj(jn)
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jn) ; DO ji = 1,isizei(jn)
BUFFSND(idx) = ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl)
idx = idx + 1
END DO ; END DO ; END DO ; END DO ; END DO
ENDIF
END DO
!
! ------------------------------------------------ !
! 3. Do all MPI exchanges in 1 unique call !
! ------------------------------------------------ !
!
IF( ln_timing ) CALL tic_tac(.TRUE.)
CALL mpi_neighbor_alltoallv (BUFFSND, iScnt, iSdpl, MPI_TYPE, BUFFRCV, iRcnt, iRdpl, MPI_TYPE, impi_nc, ierr)
IF( ln_timing ) CALL tic_tac(.FALSE.)
!
! ------------------------- !
! 4. Fill all halos !
! ------------------------- !
!
idx = 1
! MPI3 bug fix when domain decomposition has 2 columns/rows
IF (jpni .eq. 2) THEN
IF (jpnj .eq. 2) THEN
jnf(1:8) = (/ 2, 1, 4, 3, 8, 7, 6, 5 /)
ELSE
jnf(1:8) = (/ 2, 1, 3, 4, 6, 5, 8, 7 /)
ENDIF
ELSE
IF (jpnj .eq. 2) THEN
jnf(1:8) = (/ 1, 2, 4, 3, 7, 8, 5, 6 /)
ELSE
jnf(1:8) = (/ 1, 2, 3, 4, 5, 6, 7, 8 /)
ENDIF
ENDIF
DO jn = 1, 8
ishti = ishtRi(jnf(jn))
ishtj = ishtRj(jnf(jn))
SELECT CASE ( ifill(jnf(jn)) )
CASE ( jpfillnothing ) ! no filling
CASE ( jpfillmpi ) ! fill with data received by MPI
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jnf(jn)) ; DO ji = 1,isizei(jnf(jn))
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = BUFFRCV(idx)
idx = idx + 1
END DO ; END DO ; END DO ; END DO ; END DO
CASE ( jpfillperio ) ! use periodicity
ishti2 = ishtPi(jnf(jn))
ishtj2 = ishtPj(jnf(jn))
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jnf(jn)) ; DO ji = 1,isizei(jnf(jn))
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = ptab(jf)%pt4d(ishti2+ji,ishtj2+jj,jk,jl)
END DO ; END DO ; END DO ; END DO ; END DO
CASE ( jpfillcopy ) ! filling with inner domain values
ishti2 = ishtSi(jnf(jn))
ishtj2 = ishtSj(jnf(jn))
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jnf(jn)) ; DO ji = 1,isizei(jnf(jn))
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = ptab(jf)%pt4d(ishti2+ji,ishtj2+jj,jk,jl)
END DO ; END DO ; END DO ; END DO ; END DO
CASE ( jpfillcst ) ! filling with constant value
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jnf(jn)) ; DO ji = 1,isizei(jnf(jn))
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = zland
END DO ; END DO ; END DO ; END DO ; END DO
END SELECT
END DO
DEALLOCATE( iScnt, iRcnt, iSdpl, iRdpl )
IF( iszS > jpi*jpj ) DEALLOCATE(BUFFSND) ! blocking Send -> can directly deallocate
IF( iszR > jpi*jpj ) DEALLOCATE(BUFFRCV) ! blocking Recv -> can directly deallocate
! potential "indirect self-periodicity" for the corners
DO jn = 5, 8
IF( .NOT. l_SelfPerio(jn) .AND. l_SelfPerio(jpwe) ) THEN ! no bi-perio but ew-perio: corners indirect definition
ishti = ishtRi(jn)
ishtj = ishtRj(jn)
ishti2 = ishtPi(jn) ! use i- shift periodicity
ishtj2 = ishtRj(jn) ! use j- shift recv location: use ew-perio -> ok as filling of the south and north halos now done
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jn) ; DO ji = 1,isizei(jn)
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = ptab(jf)%pt4d(ishti2+ji,ishtj2+jj,jk,jl)
END DO ; END DO ; END DO ; END DO ; END DO
ENDIF
IF( .NOT. l_SelfPerio(jn) .AND. l_SelfPerio(jpso) ) THEN ! no bi-perio but ns-perio: corners indirect definition
ishti = ishtRi(jn)
ishtj = ishtRj(jn)
ishti2 = ishtRi(jn) ! use i- shift recv location: use ns-perio -> ok as filling of the west and east halos now done
ishtj2 = ishtPj(jn) ! use j- shift periodicity
DO jf = 1, ipf ; DO jl = 1, ipl ; DO jk = 1, ipk ; DO jj = 1,isizej(jn) ; DO ji = 1,isizei(jn)
ptab(jf)%pt4d(ishti+ji,ishtj+jj,jk,jl) = ptab(jf)%pt4d(ishti2+ji,ishtj2+jj,jk,jl)
END DO ; END DO ; END DO ; END DO ; END DO
ENDIF
END DO
!
! ------------------------------- !
! 5. north fold treatment !
! ------------------------------- !
!
IF( l_IdoNFold ) THEN
IF( jpni == 1 ) THEN ; CALL lbc_nfd( ptab, cd_nat, psgn , ihls, ipf ) ! self NFold
ELSE ; CALL mpp_nfd( ptab, cd_nat, psgn, ifill_nfd, zland, ihls, ipf ) ! mpi NFold
ENDIF
ENDIF
!
END SUBROUTINE lbc_lnk_neicoll_/**/PRECISION