Вперед: 5.2. Библиотека подпрограмм Aztec
Назад: 5.1.3. Использование библиотеки ScaLAPACK
К содержанию: Оглавление


5.1.4. Примеры использования пакета ScaLAPACK

В качестве первого примера использования пакета ScaLAPACK рассмотрим уже знакомую нам задачу перемножения матриц. Это позволит сопоставить получаемые результаты и производительность двух программ, созданных с использованием непосредственно среды параллельного программирования MPI (см. раздел 4.8.1) и с помощью пакета ScaLAPACK.

Пример 1. Перемножение матриц

Используется подпрограмма PDGEMM из PBLAS, которая выполняет матричную операцию C = αA*B + βC, где A, В и C - матрицы, α и β - константы. В нашем случае мы полагаем α = 1, β = 0.

Программа abcsl.f

program abcsl
   include 'mpif.h'
   integer N,NB,nxn
   REAL(KIND=8), DIMENSION(:), allocatable :: a,b,c,mem
   double precision time(6),ops,total,t1
   integer NIN
   PARAMETER ( NOUT = 6, NIN = 10 )
   DOUBLE PRECISION  ONE,ZERO
   PARAMETER  ( ONE = 1.0D+0, ZERO = 0.0D+0 )
   INTEGER  DESCA(9), DESCB(9), DESCC(9)

c Инициализация BLACS
   CALL BLACS_PINFO( IAM, NPROCS )
c Вычисление формата сетки процессоров,
c наиболее приближенного к квадратному
   NPROW = INT(SQRT(REAL(NPROCS)))
   NPCOL = NPROCS/NPROW
 2  IF (NPROW*NPCOL.NE. NPROCS) THEN
   NPROW = NPROW - 1
   NPCOL = NPROCS/NPROW
   GOTO 2
   END IF
c Считывание параметров решаемой задачи 
с ( N - размер матриц; NB - размер блоков ) 
   IF( IAM.EQ.0 ) THEN
     OPEN( NIN, FILE='abc.dat', STATUS='OLD' )
     READ( NIN, FMT = * ) N
     READ( NIN, FMT = * ) NB
     CLOSE( NIN )
     WRITE( NOUT, FMT = * )
     WRITE( NOUT, FMT = 9999 )
   $   'The following parameter values will be used:'
     WRITE( NOUT, FMT = 9998 ) 'N  ', N
     WRITE( NOUT, FMT = 9998 ) 'NB  ', NB
     WRITE( NOUT, FMT = 9998 ) 'P  ', NPROW
     WRITE( NOUT, FMT = 9998 ) 'Q  ', NPCOL
     WRITE( NOUT, FMT = * )
   END IF
c Рассылка считанной информации всем процессорам
   call MPI_BCAST(N,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)
   call MPI_BCAST(NB,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)   
   ops = (2.0d0*dfloat(n)-1)*dfloat(n)*dfloat(n)
c Инициализация сетки процессоров
   CALL BLACS_GET( -1, 0, ICTXT )
   CALL BLACS_GRIDINIT( ICTXT, 'Row-major', NPROW, NPCOL )
   CALL BLACS_GRIDINFO( ICTXT, NPROW, NPCOL, MYROW, MYCOL )

с Вычисление размеров локальных матриц на процессорах
   NP  = NUMROC( N, NB, MYROW, 0, NPROW )
   NQ  = NUMROC( N, NB, MYCOL, 0, NPCOL )
   NROW = N/NPROW + NB
   NCOL = N/NPCOL + NB
   nxn = NROW*NCOL
   NS = NROW
   
с  Аллокатирование массивов нужной размерности   
   allocate(A(nxn) )
   allocate(B(nxn) )
   allocate(C(nxn) )
   allocate(mem(NCOL) ) 

c Инициализация дескрипторов для 3-х матриц

   CALL DESCINIT( DESCA,N,N,NB,NB,0,0,ICTXT,MAX(1,NP ),INFO )
   CALL DESCINIT( DESCB,N,N,NB,NB,0,0,ICTXT,MAX(1,NP ),INFO )
   CALL DESCINIT( DESCC,N,N,NB,NB,0,0,ICTXT,MAX(1,NP ),INFO )

   lda = DESCA(9)
с Вызов процедуры генерации матриц A и B
   call pmatgen(a,DESCA,np,nq,b,DESCB,nprow,npcol,myrow,mycol)

   t1 = MPI_Wtime()

с Вызов процедуры перемножения матриц
    CALL PDGEMM('N','N',N,N,N,ONE,A,1,1,DESCA,B,1,1,DESCB,
   & ZERO,C,1,1,DESCC)
*
   time(2) = MPI_Wtime() - t1
с Печать контрольных элементов результирующей матрицы С
   if (IAM.EQ.0) write(*,*) 'Matrix C...'
   CALL PDLAPRNT( 1, 1, C, 1, 1, DESCC, 0, 0,'C', 6, MEM )
   CALL PDLAPRNT( 1, 1, C, 1, N, DESCC, 0, 0,'C', 6, MEM )
   CALL PDLAPRNT( 1, 1, C, N, 1, DESCC, 0, 0,'C', 6, MEM )
   CALL PDLAPRNT( 1, 1, C, N, N, DESCC, 0, 0,'C', 6, MEM ) 

     total = time(2)
     time(4) = ops/(1000000.0*total)
   if(IAM.EQ.0) then 
     write(6,80) lda
  80  format(' times for array with leading dimension of',i5)
     write(6,110) time(2), time(4)
 110  format(2x,'Time calculation: ',f12.4, ' sec.',
   & ' Mflops = ',f12.4)
     end if
   CALL BLACS_GRIDEXIT( ICTXT )
   CALL BLACS_EXIT(0)   

 9998 FORMAT( 2X, A5, '  :    ', I6 )
 9999 FORMAT(2X, 60A )
   stop
   end

subroutine pmatgen(a,DESCA,np,nq,b,DESCB,nprow,npcol,
     & myrow,mycol)
   
integer i,j,DESCA(*),DESCB(*),nprow,npcol,myrow,mycol
   double precision a(*),b(*)

   nb = DESCA(5)  
c Заполнение локальных частей матриц A и B,
с матрица A формируется по алгоритму A(I,J) = I, a
c матрица B(I,J) = 1./J

   k = 0
   do 250 j = 1,nq
    jc = (j-1)/nb
    jb = mod(j-1,nb)
     jg = mycol*nb + (jc)*npcol*nb + jb +1       
    do 240 i = 1,np
    ic = (i-1)/nb
    ib = mod(i-1,nb)
     ig = myrow*nb + (ic)*nprow*nb + ib +1 
      k = k + 1
     a(k) = dfloat(ig)
     b(k) = 1.D+0/dfloat(jg)
 240 continue
 250 continue   
   return
   end

Результаты работы.
Будут спользованы следующие значения параметров:
N: 36000
NB: 200
P: 3
Q: 3

Matrix C...
C(   1,   1)=   0.360000000000000000D+05
C(   1, 36000)=   0.100000000000000554D+01
C( 36000,   1)=   0.129600000000000000D+10
C( 36000, 36000)=   0.360000000000000000D+05
 times for array with leading dimension of 12000
 Time calculation:  1376.7066 sec. Mflops =  67778.2083 

Пример 2. Решение системы линейных уравнений с матрицей общего вида

В данном примере решается система линейных уравнений с матрицей общего вида, которая формируется с помощью генератора случайных чисел. Правая часть системы формируется таким образом, чтобы получить единичное решение. Для решения системы используются две вычислительные подпрограммы: PDGETRF (для факторизации матрицы) и PDGETRS (для нахождения решения). Общий шаблон вызовов функций мало отличается от предыдущего примера.

Программа pdlusl.f

program pdlusl
   include 'mpif.h'
   integer N,NB
   
   REAL(KIND=8), DIMENSION(:), allocatable :: a,b,x
   INTEGER, DIMENSION(:), allocatable :: ipvt
   double precision time(6),ops,total,norma,normx,t1
   double precision resid,residn,eps,tall,xsc
   INTEGER NPROW,NPCOL,NP,NQ,NROW,NCOL
   integer nxn
   integer NIN,init(4)
   PARAMETER ( NOUT = 6, NIN = 10 )
   DOUBLE PRECISION  ONE
   PARAMETER  ( ONE = 1.0D+0 )
   INTEGER   DESCA( 9 ), DESCB( 9 ), DESCX( 9 )

   NRHS = 1
c
   CALL BLACS_PINFO( IAM, NPROCS )
c  Выбор сетки процессоров
   NPROW = INT(SQRT(REAL(NPROCS)))
   NPCOL = NPROCS/NPROW
 1  IF (NPROW*NPCOL.NE. NPROCS) THEN
   NPROW = NPROW - 1
   NPCOL = NPROCS/NPROW
   GOTO 1
   END IF
 с  Считывание параметров задачи   
   IF( IAM.EQ.0 ) THEN
     OPEN( NIN, FILE='lusl.dat', STATUS='OLD' )
     READ( NIN, FMT = * ) N
     READ( NIN, FMT = * ) NB
     CLOSE( NIN )
     WRITE( NOUT, FMT = * )
     WRITE( NOUT, FMT = 9999 )
  &  'The following parameter values will be used:'
     WRITE( NOUT, FMT = 9998 ) 'N  ', N
     WRITE( NOUT, FMT = 9998 ) 'NB  ', NB
     WRITE( NOUT, FMT = 9998 ) 'P  ', NPROW
     WRITE( NOUT, FMT = 9998 ) 'Q  ', NPCOL
     WRITE( NOUT, FMT = * )
   END IF
   call MPI_BCAST(N,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)
   call MPI_BCAST(NB,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)
 
   ops = (2.0d0*dfloat(n)**3)/3.0d0 + 2.0d0*dfloat(n)**2
   c Формируем рабочую сетку процессоров
   CALL BLACS_GET( -1, 0, ICTXT )
   CALL BLACS_GRIDINIT( ICTXT, 'Row-major', NPROW, NPCOL )
   CALL BLACS_GRIDINFO( ICTXT, NPROW, NPCOL, MYROW, MYCOL )
c Определяем размерности локальных массивов  
   NP  = NUMROC( N, NB, MYROW, 0, NPROW )
   NQ  = NUMROC( N, NB, MYCOL, 0, NPCOL )
   NROW = N/NPROW + NB
   NCOL = N/NPCOL + NB
   nxn = NROW*NCOL
   NS = NROW
с Аллокатируем память под рабочие массивы   
   allocate(A(nxn) )
   allocate(b(NS+1) )
   allocate(x(NS+1) )
   allocate(ipvt(NS+1) )
с Формируем дескрипторы   
   CALL DESCINIT(DESCA,N,N,NB,NB,0,0,ICTXT,MAX(1,NP ),INFO )
   CALL DESCINIT(DESCB,N,NRHS,NB,NB,0,0,ICTXT,MAX(1,NP),INFO)
   CALL DESCINIT(DESCX,N,NRHS,NB,NB,0,0,ICTXT,MAX(1,NP),INFO)

   lda = DESCA(9)
   	 tall = 0.0
   с Заполняем матрицу A и вектор B
   call pmatgenl(a,DESCA,np,nq,b,DESCB,nprow,npcol,myrow,mycol,
	  & init)

    t1 = MPI_Wtime()
C  Выполняем факторизацию матрицы
    CALL PDGETRF(N,N,A,1,1,DESCA,ipvt, INFO )

    time(1) = MPI_Wtime() - t1
    	 t1 = MPI_Wtime()   
С  Решаем систему уравнений
   CALL PDGETRS('No',N,NRHS,A,1,1,DESCA,ipvt,B,1,1,DESCB,INFO)
с
    time(2) = MPI_Wtime() - t1
    total = time(1) + time(2)
c  Вычисление невязок решения и печать результатов
     k = 0
     norma = 0.0 
     do 10 i = 1,np
      x(i) = b(i)
     do 10 j = 1,nq
      k = k + 1
      norma = dmax1(dabs(a(k)), norma)
  10  continue

     do 20 i = 1,np
      b(i) = -b(i)
  20  continue
     resid = 0.0
     normx = 0.0
     do 30 i = 1,np
      resid = dmax1( resid, dabs(b(i)) )
      normx = dmax1( normx, dabs(x(i)) )
  30  continue
     eps = 2.22d-16
     residn = resid/( n*norma*normx*eps )
     if (iam.eq.0) then
     write(6,40)
  40  format(//' norm. resid  resid   machep',
   $     '     x(1)     x(n)')
     write(6,50) residn,resid,eps,x(1),x(np)
  50  format(1p5e16.8)
c
     write(6,60) n
  60 format(' times are reported for matrices of order ',i5)
     write(6,80) lda
  80 format(' times for array with leading dimension of ',i6)

     write(6,70)
  70  format(4x,'factor',5x,'solve',6x,'total',5x,'mflops',
   $     7x,'unit',6x,'ratio')
c
     time(3) = total
     time(4) = ops/(1000000.0*total)
     time(5) = 2.0d0/time(4)
     time(6) = total/cray
     tall = tall + total  
     write(6,110) (time(i),i=1,6)
 110  format(6(1pe11.3))
     write(6,*)' end of iteration: ', II
     end if
   CALL BLACS_GRIDEXIT( ICTXT )
   CALL BLACS_EXIT(0)
 9998 FORMAT( 2X, A5, '  :    ', I6 )
 9999 FORMAT(2X, 60A )
c
   stop
   end
   
   subroutine pmatgenl(a,DESCA,np,nq,b,DESCB,nprow,npcol,
   & myrow,mycol,init)
   integer init(4),i,j,DESCA(*),DESCB(*),
   & nprow,npcol,myrow,mycol
   double precision a(*),b(*),rand
c
   nb = DESCA(5)
   ICTXT = DESCA(2)
c
   init(1) = 1
   init(2) = myrow + 2
   init(3) = mycol + 3
   init(4) = 1325
c  Генерация матрицы A
   k = 0
   do 250 j = 1,nq
   do 240 i = 1,np
     k = k + 1
     a(k) = rand(init) - 0.5
 240 continue
 250 continue
    na = k
c  Вычисление вектора B
    do 350 i = 1,np
    k = i
     b(i) = 0
     do 351 j = 1,nq
     b(i) = b(i) + a(k)   
     k = k + np
 351 continue	
    CALL DGSUM2D( ICTXT, 'Row', ' ', 1, 1, b(i), 1, -1, 0)
 350 continue
   return
   end

Пример 3. Решение системы линейных алгебраических уравнений с ленточной матрицей

В данном примере решается система линейных алгебраических уравнений с симметричной положительно определенной ленточной матрицей. Используются подпрограммы PDPBTRF и PDPBTRS библиотеки ScaLAPACK. Матрица A формируется следующим образом:

10-41000 ...
-410-4100 ...
1-410-410 ...
01-410-41 ...
001-410-4 ...
0001-410 ...

с хранением верхнего треугольника (параметр UPLO ='U'), где звездочкой помечены неиспользуемые позиции

**a13a24a35a46 ... an-2,n
*a12a23a34a45a56 ... an-1,n
a11a22a33a44a55a66 ... an,n

Программа bandlu.f

program bandlu
   include 'mpif.h'

   REAL(KIND=8), DIMENSION(:), allocatable :: A, AF , b, x, bg
   INTEGER, DIMENSION(:), allocatable :: ipvt
   integer BW 
   INTEGER  DESCA(7), DESCB(7), DESCX(7)

   CALL BLACS_PINFO( IAM, NPROCS )
   NRHS = 1
   NPROW = 1
   NPCOL = NPROCS
С  Считывание пареметров задачи
   IF( IAM.EQ.0 ) THEN
     OPEN( NIN, FILE='band.dat', STATUS='OLD' )
     READ( NIN, FMT = * ) N
     READ( NIN, FMT = * ) BW
     CLOSE( NIN )
   END IF
   call MPI_BCAST(N,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)
   call MPI_BCAST(BW,1,MPI_INTEGER,0,MPI_COMM_WORLD,ierr)
С  Определение размерностей массивов 
   NDD = mod(N, NPCOL)
   IF (NDD.EQ.0) THEN
   NB = N/NPCOL
   ELSE
   NB = N/NPCOL + 1
   END IF
   NB = MAX(NB,2*BW)
   NB = MIN(N,NB)
C  Резервирование памяти под массива   
   nxn = NB*3
   allocate(A(nxn) )
   allocate(AF(nxn) )
   allocate(b(N) )
   allocate(x(N) )
   allocate(bg(N) )
   allocate(ipvt(N) )
С  Печать параметров задачи
   IF (IAM.EQ.0) THEN
   WRITE( 6, * ) 'N  ', N
   WRITE( 6, * ) 'NRHS ', NRHS
   WRITE( 6, * ) 'BW  ', BW
   WRITE( 6, * ) 'P  ', NPROW
   WRITE( 6, * ) 'Q  ', NPCOL
   WRITE( 6, * ) 'NB  ', NB
   END IF
С Определение размеров рабочих массивов
   LWORK = 2*BW*BW
   LAF = (NB+2*BW)*BW
С Формирование сетки процессоров
   CALL BLACS_GET( -1, 0, ICTXT )
   CALL BLACS_GRIDINIT( ICTXT, 'Row-major', NPROW, NPCOL )
   CALL BLACS_GRIDINFO( ICTXT, NPROW, NPCOL,MYROW,MYCOL)

   NP  = NUMROC( (BW+1), (BW+1), MYROW, 0, NPROW )
   NQ  = NUMROC( N, NB, MYCOL, 0, NPCOL )
С Формирование дескрипторов массивов 
   DESCA(1) = 501
   DESCA(2) = ICTXT
   DESCA(3) = N
   DESCA(4) = NB
   DESCA(5) = 0
   DESCA(6) = BW+1
   DESCA(7) = 0
   DESCB(1) = 502
   DESCB(2) = ICTXT
   DESCB(3) = N
   DESCB(4) = NB
   DESCB(5) = 0
   DESCB(6) = NB
   DESCB(7) = 0
   lda = NB
C  Формирование левых и правых частей системы уравнений 
   call pmatgenb(a,DESCA,bw,b,DESCB,nprow,npcol,myrow,
   $   mycol,n,bg)
С  Вызов процедуры факторизации матрицы
   CALL PDPBTRF('U',N,BW,A,1,DESCA,AF,LAF,WORK,LWORK,INFO3)
С  Вызов процедуры решения системы уравнений
   CALL PDPBTRS('U',N,BW,NRHS,A,1,DESCA,B,1,DESCB,AF,LAF,
   $			   WORK, LWORK, INFO)
С  Печать контрольных элементов матрицы   
	 if (iam.eq.0) then
   write(6,40)
 40 format('x(1),...,x(4)')
   write(6,50)  (b(i),i=1,4)
 50 format(4F16.8)
   end if
   CALL BLACS_GRIDEXIT( ICTXT )
   CALL BLACS_EXIT (0)
 500 continue
   stop
   end
С Подпрограмма генерации (I,J) элемента матрицы
   double precision function matij(i,j)
   double precision rab
   rab = 0.0d0
   if (i.eq.j)			rab = 10.0d0
   if (i.eq.j+1.or.j.eq.i+1) rab = -4.0d0
   if (i.eq.j+2.or.j.eq.i+2) rab = 1.0d0
   matij = rab
   return
   end
С  Подпрограмма генерации матрицы А и вектора В
   subroutine pmatgenb(a,DESCA,bw, b,DESCB, nprow, npcol, 
   $				myrow, mycol,n,bg)
   integer i, j, DESCA(*), DESCB(*), nprow, npcol, 
   $ bw, bw1, myrow, mycol
   double precision a(bw+1,*), b(*), bg(*), matij
   nb = DESCA(4)
   ICTXT = DESCA(2)
   n = DESCA(3)
   BW1 = BW + 1
С Генерация вектора правой части таким образом, 
С чтобы решение равнялось X(I) = I 
   do 231 i = 1,n
   bg(i) = 0.0
   n1 = max(1,i-bw)
   n2 = min(n,i+bw)
   do 231 j = n1,n2
   bg(i) = bg(i) + matij(i,j)*j
 231 continue
С  Генерация локальных частей матрицы A
   jcs = MYCOL*NB
   NC = MIN(NB,N-jcs)
   do 250 j = 1,NC
   jc = jcs + j
   do 240 i = 1,BW1
   ic = jc - BW1 + i
   if (ic.ge.1) a(i,j) = matij(ic,jc)
 240 continue
 250 continue
   do 350 i = 1,NC
   b(i) = bg(jcs+i)
 351 continue
 350 continue
   return
   end


Вперед: 5.2. Библиотека подпрограмм Aztec
Назад: 5.1.3. Использование библиотеки ScaLAPACK
К содержанию: Оглавление