Programming-[Backend]/Java

자바 기초 강의 정리 - 4. CompletableFuture, Thread-safe, Concurrent, Atomic

컴퓨터 탐험가 찰리 2024. 6. 16. 12:08
728x90
반응형

인프런 얄코의 제대로 파는 자바 강의를 듣고 정리한 내용이다.

 

중요하거나 실무를 하면서 놓치고 있었던 부분들 위주로만 요약 정리한다.

자세한 내용은 강의를 직접 수강하는 것이 좋다.

https://www.inflearn.com/course/%EC%A0%9C%EB%8C%80%EB%A1%9C-%ED%8C%8C%EB%8A%94-%EC%9E%90%EB%B0%94/dashboard

 

 


 

1. CompletableFuture

 

completableFuture도 Future와 마찬가지로 비동기처리의 결과를 담고 있는 클래스이다. Future와 동일하게 .get() 메서드를 호출하면 main 쓰레드의 흐름에 관여한다. CompletableFuture.supplyAsync() 메서드로 사용하여 인자없이 결과를 반환하는 Supplier와 같은 메서드를 사용할 수 있다.

 

이후 CompletableFuture로 받아온 값에 .thenAccept() 메서드를 체이닝으로 걸어서 Consumer처럼 사용할 수 있다. 인자를 통해 뭔가 실행하고, return하지는 않는 메서드를 적용하는 것이다.

 

.thenApply()로 Function을 적용하거나, .exceptionally()를 통해 에러를 처리할 수도 있다. .thenCompose() 를 사용하면 CompletableFuture의 반환 결과끼리 조합하여 처리할 수도 있다. 강의에서 나오는 코드의 일부를 참고하면 이해된다.

 

public class thenComposeEx () throws ExecutionException, InterruptedException{

    CompletableFuture<SwordMan> getBlueChamp = getChamp(Side.BLUE);
    CompletableFuture<SwordMan> getRedChamp = getChamp(Side.RED);

    System.out.println('=== 양 진영의 검사 훈련 중 ===');

    getBlueChamp.thenCompose(
            b -> b.thenApply(
                    r -> {
                        // Apply 처리 코드
                        return something;
                    }
            )
    )
            .thenApply()
            .exceptionally() //예외 발생 시 처리 코드
            .get()
}

 

 

allOf

allOf 메서드는 여러 CompletableFuture 작업들을 동기적으로 실행한다. 다시 말해 allOf() 내부에서 실행되는 모든 비동기적인CompletableFuture의 결과가 나올 때까지 기다린다. .thenRun()으로 실행하고 .join()으로 각 CompletableFuture의 결과값들을 받아올 수 있다. 아래 강의에서의 예제 코드를 참고하자

public static void allOfMethod() throws ExecutionException, InterruptedException {
    var roll1 = rollDiceFuture();
    var roll2 = rollDiceFuture();
    var roll3 = rollDiceFuture();

    CompletableFuture.allOf(
            roll1, roll2, roll3
    ).thenRun(
            () -> {
                System.out.println("결과 출력");


                var int1 = roll1.join();
                var int2 = roll2.join();
                var int3 = roll3.join();

                String result = IntStream.of(int1, int2, int3)
                        .boxed()
                        .map(i -> i == -1 ? "무효" : String.valueOf(i))
                        .collect(Collectors.joining(", "));
            }
    ).get();
}

 

 

2. 병렬 스트림

 

자바 스트림의 일부 메서드는 병렬로 처리 가능하다. filter, map, reduce 등은 여러 쓰레드에서 작업 후 결과를 합산하면 된다. 다만 데이터의 크기가 적으면 쓰레드 생성 시간이 더 많이 걸릴 수 있기 때문에 오히려 병렬로 처리하는게 느릴 수 있다. 또는 병렬과 직렬로 Stream을 변환해가면서 작업하는 혼합 방식이 더 빠를 수도 있다. 성능이 중요한 경우 테스트를 해보면서 최적 성능을 찾아가면 된다.

 

기본적으로 Stream.of()를 통해 생성된 Stream에 isParallel() 메서드를 실행해보면 false가 나온다. 여기에 parallel()을 통해 병렬 스트림으로 변경해줄 수 있고, sequential() 메서드를 통해서 다시 직렬로 바꿔줄 수 있다. 그리고 Stream을 만들 때부터 parallelStream()을 실행하면 바로 병렬 스트림으로 생성된다.

public class StreamTest {

    public static void main(String[] args) {
        Stream<Character> a = Stream.of('a', 'b', 'c');

        boolean parallel = a.isParallel();

        a.parallel();
        a.sequential();
        
        
        Stream<Integer> numStream = Arrays.asList(1,2,3).parallelStream();
    }
}

 

 

3. Thread-safe 클래스

 

3.1 Concurrent 컬렉션

아래처럼 HashMap에 10,000개의 숫자값을 넣는 Runnable을 만들고 쓰레드 3개로 run을 실행하면 동시에 실행은 되지만 hashMap이므로 키 값이 중복되지 않아서 최종 hashMap의 size는 10,000일 것이라고 예상할 수 있다. 그러나 실제 테스트해보면 size가 10,000개를 넘어가는 것을 확인할 수 있다.

public class Concurrent {
    public static void main(String[] args) {

        Map<String, Integer> hashMap = new HashMap<>();

        Runnable toHashMap = () -> {
            for (int i = 0; i < 10000; i++) {
                hashMap.put("key" + i, i);
            }
        };

        Thread t1 = new Thread(toHashMap);
        Thread t2 = new Thread(toHashMap);
        Thread t3 = new Thread(toHashMap);

        t1.start();
        t2.start();
        t3.start();


        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            
        }
    }
}

 

 

이것은 실제 요소는 10,000개가 존재하는데, size 값을 올리는 과정에서 문제가 생긴 것이다. hashMap도 instance인데 size를 올리는 과정 중 여러 쓰레드가 동시에 접근하면서 문제가 발생하는 것이다.

 

따라서 이를 방지할려면 멀티쓰레드 환경에서는 ConcurrentHashMap을 사용해야한다.

 

concurrentHashMap은 자료형을 여러 구간으로 나눠서 동기화를 적용하므로 속도도 빠르고 최종 값도 예상한대로 일치하게 나온다. 이외에도 ConcurrentLinkedQueue, CopyOnWriteArrayList, CopyOnWriteArraySet, ConcurrentSkipListSet, ConcurrentSkipListMap 등이 있다.

 

 

3.2 Atomic 클래스

 

어떤 값에 대한 접근 자체를 한 쓰레드씩만 허용하는 클래스이다. 위에서 살펴본 ConcurrentHashMap 예제와 비슷한 예제를 살펴본다.

 

 

import java.util.concurrent.atomic.AtomicInteger;

public class Atomic {

    static int count = 0;
    static AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {

        Runnable intCount = () -> {
            for (int i = 0; i < 10000; i++) {
                count++;
                atomicInteger.getAndIncrement();
            }
        };

        Thread t1 = new Thread(intCount);
        Thread t2 = new Thread(intCount);
        Thread t3 = new Thread(intCount);

        t1.start();
        t2.start();
        t3.start();


        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {

        }

        int result = count;
        int atomicResult = atomicInteger.get();
    }
}

 

 

 

결과를 보면 int 타입은 3개의 쓰레드에 의해 동시적으로 접근하여 30,000보다 적은 수가 나왔지만 atomicInteger는 정확히 30,000이 나온것을 확인할 수 있다.

 

참고로 Atomic 클래스는 float, double 같은 부동소수점 자료형은 연산이 보다 복잡하여 지원되지 않는다.

AtomicReference

AtomicReference는 어떤 참조형 클래스를 감싸서 Atomic하게 만들어준다. 다만 그냥 감싼다고 끝이 아니고, Runnable을 구성하는 로직 자체에 Atomic한 로직을 적용해야한다는 점을 참고로 알고 있으면 될 것 같다.

728x90
반응형