Tutorium 9

Gruppen 2 und 4
Samuel Teuber
propa@teuber.dev
https://teuber.dev/propa

Übungsblätter

  • Fragen?
  • Probleme?
  • Zu schwierig?

Übungsblatt 8

Aufgabe 1: Grundlagen der Parallelisierung

Es sollen Kanten in einem Bild (angegeben durch eine Matrix bestehend aus Pixeln) detektiertwerden. Hierfür können Faltungsmatrizen zur Kantendetektion angewandt werden, welche für jeden Pixel des Bildes einen Farbgradienten berechnen. Eine Faltungsmatrix beschreibt hierbeifür einen Pixel, welche der umliegenden Pixel für die Berechnung des Gradienten verwendet werden und den gewichteten Einfluss jedes umliegenden Pixels auf den Gradienten.

Parallelisierungsmethodologie?

Datenparallelismus!

Aufgabe 1: Grundlagen der Parallelisierung

Es soll ein Integral numerisch berechnet werden.

Parallelisierungsmethodologie?

Datenparallelismus!

Aufgabe 1: Grundlagen der Parallelisierung

Es soll einen Webserver programmiert werden, der mehrere Anfragen bearbeiten kann.

Parallelisierungsmethodologie?

Taskparallelismus!

Aufgabe 2: Zeiger-Arithmetik

#include<stdio.h>
int global[] = {1, 2, 3, 4, 5};
int *magic(int x[],int y) {
    printf("m");
    global[1] = *(global + y) + 3;
    return&x[y - 2];
}

int main() {
    printf("%i", *magic(&global[1], *(global + 1)));
    return 0;
}

Ausgabe?

m6

Aufgabe 3: MPI Send/Receive

int main(int argc, char*argv[]) {
    MPI_Init(&argc, &argv);
    int my_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    if (my_rank < 2) {
        int other_rank = 1 - my_rank;
        int tag        = 0;
        char message[14];
        sprintf(message, "Hello, I am %d", my_rank);
        MPI_Status status;

        MPI_Send(message, strlen(message) + 1, MPI_CHAR, other_rank, tag,MPI_COMM_WORLD);
        MPI_Recv(message, 100, MPI_CHAR, other_rank, tag, MPI_COMM_WORLD, &status);
        printf("%s\n", message);
    }
    MPI_Finalize();
    return EXIT_SUCCESS;
}
  • Funktionierte beim Test
  • Funktioniert nicht bei Kunden Problem??

Kommunikationsmodus (Buffered vs. Synchronized/Ready)

Lösungsstrategien?

  • Unterschiedliches Verhalten je nach Prozess
  • Erzwinge gepufferten Modus: MPI_Bsend
    Wichtig: Genügend Systempuffer vorhanden, Nutzung erzwingen mit MPI_Buffer_attach
  • Nicht blockierende Kommunikation nutzen (MPI_Isend)
    Wichtig: Weiterer Puffer für Receive notwendig
  • Kombinierte Sende- und Empfangsoperation: MPI_Sendrecv oder MPI_Sendrecv_replace

Aufgabe 4: Send/Receive zu kollektiven Operationen

grafik.png

Funktion: Erhöht alle Werte von elements um Differenz zwischen null und kleinstem Wert in Array, falls kleinster Wert kleiner null

grafik.png

MPI_Bcast(&offset, 1, MPI_INT, rootRank, MPI_COMM_WORLD);

grafik.png

int localOffet = 0;
for (int i = 0; i < elementsPerProcess; i++) {
    if (local[i] < localOffset) localOffset = local[i];
}
MPI_Reduce(&localOffset, &offset, 1, MPI_INT, MPI_MIN, rootRank, MPI_COMM_WORLD);

Aufgabe 5: MPI Reduce

|void my_int_sum_reduce(int *sendbuf, int *recvbuf, int count, int root, MPI_Comm comm) {
|    int size, rank;
|    MPI_Comm_size(comm,&size);
|    MPI_Comm_rank(comm,&rank);
|    if (rank == root) {
|        for (int i = 0; i<count; i++) {
|            recvbuf[i] = sendbuf[i]
|        }
|        for (int i = 0; i < size; i++) {
|            if (i == root) continue;
|            
|            int other[count];
|            MPI_Recv(other, count, MPI_INT, i, 0, comm, MPI_STATUS_IGNORE);
|            for (int j = 0; j < count; j++)
|                recvbuf[j] += other[j];
|        }
|    } else {
|        MPI_Send(sendbuf, count, MPI_INT, root, 0, comm);
|    }
|}

Aufgabe 6: MPI Matrix Multiplikation

grafik.png

Java und Multithreading

Java: Kennen wir schon

class Main {
    public static void main(String[] args) {
        System.out.println("Hello World!");
    }
}

Lambdas in Java

Neues Feature seit Java 8

Statt...

public void add(int i, int j) {
    return i + j;
}

schreiben wir...

(int i, j) -> i + j

Grundlage: Functional Interfaces und Method References

Functional Interfaces: Interface mit einzelner Funktion

@FunctionalInterface
interface Predicate {
    boolean check(Integer value);
}
//...
public int sum(List<Integer> values, Predicate predicate) {
    //...
}

Method References: Methodenreferenz als Argument für Funktionsinterface

public class SimpleCheckers {
    public static boolean checkSomething(Integer value) { /*...*/ }
}
sum(values, SimpleCheckers::checkSomething)
sum(values, (Integer v) -> v>0);

Threads in Java

  • Innerhalb von einem Prozess
  • Geteilter Heap
  • Jeder Thread hat eigenen Stack
  • Code-Ausführung zunächst unabhängig

Beispiel: Java1

Thread Scheduling in Java

  • Scheduling kennen wir schon aus anderen Vorlesungen (z.B. OS)
  • Wird auch hier verwendet: Verteilung von Threads auf verfügbare Kerne
  • Priorität kann mit (new Thread())->setPriority(int priority) beeinflusst werden

Race Conditions: Counters

  • Wenn die Reihenfolge, in welcher Threads ihre Instruktionen ausführen, das Ergebnis beeinflussen
  • Notwendige Bedingung bei geteiltem Speicher: Lese- und Schreibzugriff

Beispiel: Java2

Atomic Statements in Java

Executed complete or not at all

  • Lesen/Schreiben von Variablen
  • Lesen/Schreiben primitiver Variablen
    • Immer für 32 Bit Werte
    • Je nach System evtl. nicht für 64 Bit Werte (z.B. long oder double)
  • Lesen/Schreiben auf allen volatile Variablen

NICHT: a++

Möglicher Lösungsansatz: Mutual Exclusion

Critical Section darf nur von einzigem Thread betreten werden

Performance vs Korrektheit: Amdahl's Law

Java: Monitore

public void doSomething() {
    synchronized(someObject) {
        // critical section for someObject
    }
}
public synchronized void doSomething() {
    // critical section for doSomething
}

Übungsaufgabe

  • Wie beheben wir die Race Condition im Counter Beispiel?

Java2

  • Wie fügen wir eine decrement Methode (ohne Race Condition) hinzu?

Deadlocks: Coffman Bedingungen

  • Mutual Exclusion
  • Hold and Wait
  • No preemption
  • Circular wait

Philosophenproblem:

  • 5 Philosophen sitzen an Tisch, 5 Teller Spaghetti, 5 Gabeln
  • Jeder Philosoph braucht 2 Gabeln um zu essen (jeweils links und rechts von ihm)
  • Wenn verfügbar: Philosoph nimmt sich Gabel rechts
  • Wenn verfügbar: Philosoph nimmt sich Gabel links
  • Wenn nicht verfügbar: Wartet (und behält Besteck in der Hand)

Java4

Livelock

  • Kein Deadlock
  • Threads arbeiten, aber...
  • ...kommen nicht voran

Guarded Block

while(!condition) {}

busy waiting!

Besser:

  • while(!condition) {wait();}
  • notify()
  • notifyAll()

Warum trotzdem while?

Übungsaufgabe: Consumer/Producer

Java3

class Producer extends Thread {
    private Queue queue;
    public Producer(Queue queueParam) {
        this.queue = queueParam;
    }
    public void run() {
        while(true) {
            queue.produce(new Task(ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextInt(0, 100)));
        }
    }
}
class Consumer extends Thread {
    private Queue queue;
    public Consumer(Queue queueParam) {
        this.queue = queueParam;
    }
    public void run() {
        while (true) {
            Task t = this.queue.consume();
            System.out.println(t.add());
        }
    }
}

Lösung:

class Task{
    int a;
    int b;

    public Task(int aParam, int bParam) {
        this.a = aParam;
        this.b = bParam;
    }

    public int add() {
        return this.a + this.b;
    }
}

class Queue{
    public Task[] tasks = new Task[3];
    public volatile int nextPut = 0;
    public volatile int nextTake = 0;
    public volatile int taskNum = 0;
    public synchronized void produce(Task newTask) {
        while (taskNum == tasks.length) {
            try {
                wait();
            } catch (InterruptedException e) {
                //..
            }
        }
        assert nextTake <= nextPut && Math.abs(nextPut-nextTake)==taskNum;
        tasks[nextPut] = newTask;
        nextPut=(nextPut+1)%tasks.length;
        taskNum++;
        notifyAll();
        assert nextTake <= nextPut && Math.abs(nextPut-nextTake)==taskNum;
    }

    public synchronized Task consume(){
        while (taskNum == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                //..
            }
        }
        assert nextTake <= nextPut && Math.abs(nextPut-nextTake)==taskNum;
        Task t = tasks[nextTake];
        tasks[nextTake] = null;
        nextTake = (nextTake+1) % tasks.length;
        taskNum--;
        notifyAll();
        assert nextTake <= nextPut && Math.abs(nextPut-nextTake)==taskNum;
        return t;
    }
}

Weitere Konzepte

  • Immutable Objects (private und final)
  • Defensive Copies (bei In- und Output)

Zu Hause anschauen...

Happens-Before: Memory Consistency

public class Main {
    static int flag = 0;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> flag = 1);
        Thread t2 = new Thread(() -> {
            System.out.print(flag);
            System.out.print(flag);
        });
        t1.start();
        t2.start();
    }
}

Ausgabe 00 - Warum?

  • t2 wird vor t1 ausgeführt
  • t1 wird vor zweitem sysout in t2 ausgeführt werden, aber flag in Cache von t2

Lösung: volatile flag

Happens-Before: Reordering

public class Main {
    public static void main(String[] args) {
        State state = new State();
        (new Thread( () -> {
            state.a = 1;
            state.b = 1;
            state.c = state.a + 1;
        })).start();
        (new Thread( () -> {
            if(state.c == 2 && state.b == 0) {
                System.out.println("Wrong");
            }
        })).start();
    }
}

public class State {
    int a = 0;
    int b = 0;
    int c = 0;
}

Ausgabe: Wrong

Instruction reordering - tatsächlich ausgeführt in Thread 1:

state.a = 1;
state.c = state.a + 1;
state.b = 1;

Lösung: volatile int c = 0;

Java Advanced

Atomic Types

java.util.concurrent.atomic

Bsp: AtomicInteger

  • int get()
  • int incrementAndGet()
  • int decrementAndGet()
  • bool compareAndSet(int oldValue, int newValue)

Zurück zum Counter Beispiel

Locks und Semaphoren

  • ReentrantLock
    Lock lock= new ReentrantLock(false);
    // ...
    lock.lock();
    c++;
    lock.unlock();
    
  • Semaphore
    Semaphore sem = Semaphore(int capacity, boolean fair);
    // ...
    sem.acquire();
    c++
    sem.release();
    
  • Barrieren
    • CyclicBarrier(int n) (reentrant)
    • CountDownLatch(int n) (nicht reentrant)

Executor & ExecutorService

ExecutorService executor = Executors.newSingleThreadExecutor();

executor.execute(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});
try{
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
} catch(InterruptedExceptionex) {
} finally { if(!executor.isTerminated()) { executor.shutdownNow(); }}

Auch: newFixedThreadPool(int c)

Futures

ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<Integer>> list;
// ....
for (int i= 0; i< 10; i++) {
    final int currentValue= i;
    Callable<Integer> myCallable= () -> { return fib(currentValue); };
    list.add(executorService.submit(myCallable));
    // Abrufbar mit
    Integer result = future.get();
}
// ...
for(Future<Integer> curFuture : list) {
    Integer result = curFuture.get();
    // ...
}
executorService.shutdownNow();

Zusätzlich

  • CompletableFutures (Callback)
  • Fork-Join
  • Streams