[Java] CompletableFuture 사용하기

0. 시작하며

https://nooroongzi.tistory.com/12

 

[SpringBoot] SSE알림과 비동기

0. 시작하며 프로젝트에서 SSE(Server Sent Event) 방식의 알림을 구현을 담당했습니다. 이때발생한 에러와 트러블 슈팅을 기록하려합니다. 1. 알림 구현 방식 후보군 1-1 Short Polling 클라이언트가 서버

nooroongzi.tistory.com

위 글에서 이어지는 글 입니다. SpringBoot에서의 비동기처리를할때 많이 고려되는  CompletableFuture에대하여 알아보겠습니다.

 

1.CompletableFuture란?

자바에서 비동기프로그래밍을 가능하게 하는 인터페이스 입니다.

이름에서도 알 수 있듯 Future에서 발전된형태이며 단점을 보완했습니다. 이를 사용하기 전의 단점을 먼저 알아보겠습니다.

2. Thread를 상속받기

자바에서 멀티쓰레드 환경을 만드는 방법은 여러가지가 있지만 Thread를 상속받아 만들어보면 다음과 같습니다.

public class test {
	public static void main(String[] args) {
        HelloThread helloThread = new HelloThread();
        helloThread.start();
        System.out.println("hello : " + Thread.currentThread().getName());
    }
    static class HelloThread extends Thread {
        @Override
        public void run() {
            System.out.println("world : " + Thread.currentThread().getName());
        }
    }
}
//출력
//hello: main           world: Thread-0
//world: Thread-0 혹은  hello: main

Thread를 상속받은 Class를 사용하여 멀티쓰레드 환경을 만들 수 있습니다. 하지만 작업의 순서를 보장할 수 없어 사용에 주의가 필요합니다.

 

3. Runnable()

위를 함수형 인터페이스로서 발전시킨것이 Runnable입니다. 사용방법은 다음과 같습니다.

public class test {
    public static void main(String[] args) {

        Thread t = new Thread(new MyRunnable());
        t.start();
    }
    public static class MyRunnable implements Runnable {
        public void run() {
            System.out.println("my : " + Thread.currentThread().getName());
        }
    }
}

Runnable은 함수형 인터페이스이므로 위 코드를 다음과 같이 줄일 수 있습니다.

 

public class test {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> System.out.println("my : " + Thread.currentThread().getName()));
        thread.start();
    }
}

 

이렇게 만들어진 쓰레드를 sleep, interrupt, join등의 명령을통해 제어할 수 있습니다. Thread를 직접 상속받지 않기때문에 좀 더 유연한 멀티쓰레드 구현 방법이라고 할 수 있습니다. 

 

하지만 코드레벨에서 하나하나 모든 쓰레드에 작업을 지정해주는건 무리가 있는 작업입니다. 그래서 고급언어는 쓰레드를 만들과 관리하는 작업을 분리했고 자바의 경우 보통 Executors에 위임하여 처리합니다.

 

4. Executors

Executors의 주요 인터페이스

1. Executor

  • 가장 기본적인 인터페이스이고 execute(Runable)을 제공합니다.

2. ExecutorServie

  • Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공합니다. (행동대장)
  • ExecutorService는 명시적으로 작업을 멈춰주지 않으면 대기 상태를 유지하여 지속적으로 리소스를 필요로 합니다.

3. ScheduledExecutorService

  • ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있습니다.

 

Executors의 주요 작업

 

1. 쓰레드 만들기

Excutors는 애플레케이션이 사용할 Thread Pool을 다음과 같이 만들고 관리할 수 있습니다.

ExecutorService executorService = Executors.newFixedThreadPool(2); // 사이즈 지정 
ExecutorService executorService = Executors.newCachedThreadPool(); // 캐시 기능
ExecutorService executorService = Executors.newSingleThreadExecutor(); // 하나의 쓰레드

//각 Thread Pool의 기능과 특징은 추후 정리

이때 Thread 갯수를 초과하는 작업이 들어와도 무리없이 실행되게 됩니다. 이는 Excutors 내부적으로 BlockingQueue에 작업을 대기시키고 순차적으로 처리하기 때문입니다.

 

2. 쓰레드 관리

쓰레드의 생명주기를 자동으로 관리합니다. 특히 사용되지 않는 쓰레드는 자동으로 종료될 수 있습니다. 또한 명시적으로도 다음과 같이 종료할 수 있습니다.

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.shutdown(); // 처리중인 작업 기다렸다가 종료
executorService.shutdownNow(); // 당장 종료

 

3. 작업 실행하기

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
	System.out.println("Hello :" + Thread.currentThread().getName());
});

 

4. schedule 작업 

//3초 후 getRunnable실행하고 shutdown
public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.schedule(getRunnable("Hello"), 3, TimeUnit.SECONDS);

	executorService.shutdown(); 
}
private static Runnable getRunnable(String msg){
    return () -> System.out.println(msg + Thread.currentThread().getName());
}

//1초 후 getRunnable 실행, 2초마다 반복 실행
public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleAtFixedRate(getRunnable("Hi"),1,2,TimeUnit.SECONDS);
}
private static Runnable getRunnable(String msg){
	return () -> System.out.println(msg + Thread.currentThread().getName());
}

 

위와 같은 작업은 Fork/Join 프레임워크에 의해 동작됩니다. 이는 ExecutorService의 구현체로 손쉽게 멀티프로세서를 활용할 수 있도록 도와주는 역할을 합니다.

 

5. Future

Callable이 return하는 타입에 지정할 수 있는 객체입니다. 비동기적인 작업의 현재 상태를 조회(get)하거나 완료된 결과를 가져올 수 있습니다. ( Callable은 Runnable과 유사하지만 return을 명시할 수 있다는 차이점이 있습니다.) Future의 몇가지 메서드를 알아보겠습니다.

 

1. get()

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
	Thread.sleep(2000L);
	return "NooRoongZi";
});
System.out.println("Hello");
String result = helloFuture.get();
System.out.println(result);
executorService.shutdown();

 

위 코드를 실행한다면 "Hello" 를 출력하고 약 2초뒤 "NooRoongZi"가 출력됩니다.

반대로 Future의 get함수가 Hello보다 먼저 호출된다면 2초를 기다린 후 "NooRoongZi"가 출력되고 "Hello"가 출력됩니다.

 

즉, Future 의 get()은 해당 스레드의 작업을 기다리는 Blocking Call 입니다. ( 타임아웃 기능이 있습니다. )

 

2. isDone()

작업이 완료되었다면 true, 아니면 false를 반환합니다.

 

3. cancle()

잘 취소 되었다면 true, 아니면 false를 반환합니다. 인자로 true값을 전달하면 진행중인 작업을 즉시 interrupt하고 false를 전달하면 끝날때 까지 기다립니다.

 

cancle() 후 isDone() => true  : 작업이 취소되어 true가 반환됩니다.

cancle() 후 get() → CancellationException발생

 

4. invokeAll()

여러 작업을 동시에 실행하여 모든 작업의 결과를 가져올 수 있습니다. 하지만 가장 오래 걸리는 작업만큼 시간이 소요됩니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> Noo = () ->{
            Thread.sleep(3000L);
            return "Noo";
        };
        Callable<String> Roong = () ->{
            Thread.sleep(2000L);
            return "Roong";
        };
        Callable<String> Zi = () ->{
            Thread.sleep(1000L);
            return "Zi";
        };

        List<Future<String>> list = executorService.invokeAll(Arrays.asList(Noo, Roong, Zi));
        
        for(Future<String> str: list){
            System.out.println(str.get());
        }
        executorService.shutdown();
    }

 

5. invokeAny()

마찬가지로 동시에 모든 작업을 실행하지만 가장 빨리 끝나는 작업의 결과물만 조회됩니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> Noo = () ->{
            Thread.sleep(3000L);
            return "Noo";
        };
        Callable<String> Roong = () ->{
            Thread.sleep(2000L);
            return "Roong";
        };
        Callable<String> Zi = () ->{
            Thread.sleep(1000L);
            return "Zi";
        };

        String string = executorService.invokeAll(Arrays.asList(Noo, Roong, Zi));
        System.out.println(string);
        executorService.shutdown();
  }

6. CompletableFuture

드디어 CompletableFuture까지 왔습니다. 이제 Future로는 어렵던 작업들을 보완하여 사용할 수 있습니다. 리턴값이 없는 경우 runAsync()를, 리턴값이 있는 경우 supplyAsync()를 사용하여 작업을 실행할 수 있고 기본적으로 Fork/Join Pool을 이용하여 작동합니다.

Future로 어려웠던 작업들

  1. Future는 취소하거나, get()에 타임아웃을 설정할 수는 있지만 외부에서 완료 시킬 수 없습니다.
  2. 블로킹 코드(get())을 사용하지 않고서는 작업이 끝났을 때 콜백함수를 실행할 수 없습니다.
  3. 여러 Future를 조합하기 어렵습니다. (ex. Event 정보 가져온 다음 Event에 참석하는 회원 목록가져오기)
  4. 예외 처리용 API를 제공하지 않습니다.

콜백 적용하기

  • get 함수 전에 적용할 수 있습니다.
  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백입니다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenApply((s) -> {
    System.out.println(Thread.currentThread().getName());
    return s.toUpperCase();
});

System.out.println(future.get());
  • thenAccept(Consumer): 리턴값을 또 다른 작업을 처리하는 콜백입니다. (리턴없이)
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenAccept((s) -> {
    System.out.println(Thread.currentThread().getName());
    System.out.println(s.toUpperCase());
});

    voidCompletableFuture.get();
  • thenRun(Runnable): 리턴값 받지 않고 다른 작업을 처리하는 콜백입니다.
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenRun(() -> {
    System.out.println(Thread.currentThread().getName());
});

voidCompletableFuture.get();
  • 추가적으로 ThreadPool을 지정하여 다른 쓰레드에서 처리할 수 있습니다. 
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }, executorService).thenRunAsync(() -> {
        System.out.println(Thread.currentThread().getName());
    }, executorService);

    voidCompletableFuture.get();
}

// CompletableFuture, 콜백함수 모두
// 2번째 인자로 ThreadPool을 넣어주면 해당 TP로 처리함
//콜백함수는 메서드명뒤에 Async가 붙음

 

CompletableFuture 조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합할 수 있습니다
public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = hello.thenCompose(test::getWorld);
        System.out.println(future.get()) ;
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message+"World";
        });
    }
}

//출력
//Hello ForkJoinPool.commonPool-worker-19
//World ForkJoinPool.commonPool-worker-5
//HelloWorld
  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });

    CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + "@@@" + w);
    System.out.println(future.get());
}
  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });

    List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
    CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture<List<String>> res = CompletableFuture.allOf(futuresArray)
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

    res.get().forEach(System.out::println);
}
//어떤 작업도 blocking이 되지 않음
  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });

    CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(
        System.out::println);
    future.get();
}

 

예외 처리

  • exeptionally(Function)
public static void main(String[] args) throws ExecutionException, InterruptedException {
    boolean throwError = false;

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError){
            throw new IllegalArgumentException();
        }

        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).exceptionally(ex ->{
        System.out.println(ex);
        return "ERROR";
    });
    System.out.println(hello.get());
}
  • handle(BiFunction)
public static void main(String[] args) throws ExecutionException, InterruptedException {
    boolean throwError = true;

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError){
            throw new IllegalArgumentException();
        }

        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).handle((res, ex) ->{
        if(ex != null){
            System.out.println(ex);
            return "ERROR";
        }
        return res;
    });
    System.out.println(hello.get());
}

 

참고

https://www.inflearn.com/course/the-java-java8

 

더 자바, Java 8 강의 - 인프런

자바 8에 추가된 기능들은 자바가 제공하는 API는 물론이고 스프링 같은 제 3의 라이브러리 및 프레임워크에서도 널리 사용되고 있습니다. 이 시대의 자바 개발자라면 반드시 알아야 합니다. 이

www.inflearn.com

 

'Java' 카테고리의 다른 글

[JAVA] Builder 패턴을 알아보자  (0) 2024.03.24
[JAVA] 팩토리 메서드 패턴  (0) 2024.03.21
[Java] ConcurrentHashMap사용하기  (0) 2023.12.24
[Java] 디자인 패턴과 싱글톤  (0) 2023.09.06
객체 지향 프로그래밍  (0) 2023.08.30