▸JAVA/기본 문법

Thread (스레드)_스레드풀 [3/3]

코데방 2020. 2. 1.
728x90

[ 스레드 풀 (Thread Pool) ]

  • 미리 스레드를 생성해두고 재활용해가며 작업을 할당
  • 미리 생성해둔 스레드를 모아둔 묶음을 스레드 풀이라고 함

이전 글들에서 다른 스레드는 각 스레드 당 하나의 run() 메소드를 가지고 있었습니다. 즉, 스레드를 하나 만들어서 실행하면 하나의 스레드는 한개의 로직(run)을 실행하게 됩니다. 그리고 이 로직이 끝나면 스레드는 종료되고 사라집니다.

 

스레드를 만든다는 것은 JVM이 운영체제(OS)에게 스레드 실행에 필요한 연산 자원(메모리 등)을 할당 받는다는 의미입니다. NIO의 바이트버퍼에서 일반 allocate()에 비해 allocateDirect()가 메모리 버퍼 생성과 해제에 더 시간이 많이 들었던 것처럼, 대부분 JVM 내에서 실행되는 것이 아니라 OS에 요청해서 자원을 할당받고 해제하는 것은 상대적으로 더 많은 시간과 자원을 소모하게 됩니다. 오버헤드가 발생한다고도 표현합니다.

 

따라서 잦은 스레드의 생성과 해제는 시스템 성능을 저하시킬 수 있기 때문에 미리 스레드를 여러 개 만들어 두고 작업이 들어오는대로 할당해서 사용하게 하고, 다 쓰고난 뒤에도 그대로 뒀다가 나중에 필요할 때 재사용하기 위한 장치가 스레드 풀(Thread Pool)입니다. 스레드 묶음이라고 생각하면 됩니다. 물론 스레드를 거의 쓸 일이 없는데 미리 생성해두는 것은 비효율적이기 때문에 용도에 맞게 잘 사용해야 합니다. 

 


[ java.util.concurrent 패키지 ]

  • 스레드 풀을 지원하는 패키지
  • ExecutorService : 스레드풀 객체 (인터페이스)
  • Executors : 스레드풀 객체 생성을 지원하는 클래스

[ 스레드풀 생성하기 (Executors Class) ]

  • newFixedThreadPool(int n) : n개의 스레드를 가지는 스레드풀 생성
  • newCachedThreadPool() : 스레드 풀에 재사용할 수 있는 스레드가 있으면 사용하고 없으면 생성, 60초동안 사용하지 않는 스레드는 삭제

두 메소드 모두 static 메소드로 고정 갯수를 생성해두거나 유동적으로 늘어나고 삭제되는 스레드 풀 객체를 생성해줍니다. 상황에 맞게 사용하면 되는데 예를 들어 필요한 스레드 갯수가 어느 정도 정해져있는 경우에는 고정적으로 사용하면 되고, 예측할 수 없으면서 스레드를 사용하는 작업이 빠르게 교체되는 경우 유동적으로 사용하면 됩니다.

package hs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService e1 = Executors.newFixedThreadPool(10);
		ExecutorService e2 = Executors.newCachedThreadPool();
	}
}

 

 


 

[ 스레드풀의 스레드 동작시키기 ]

  • void execute(Runnable command) : Runnable 객체로 스레드 동작
  • Future<T> submit(Callable<T> task) : Runnable 또는 Callable 객체로 스레드 동작

execute() 메소드의 경우 이전까지 다뤄왔던 Runnable을 구현한 객체의 run() 메소드를 실행시켜주는 메소드입니다. Thread 클래스의 start() 메소드와 유사합니다. 다만 스레드풀에 이미 생성돼 있는 스레드 객체에 Runnable 객체를 넣어 run() 메소드를 실행시켜줍니다.

 

execute() 메소드는 작업 처리 결과를 반환하지 않고, 만약 작업이 정상적으로 종료되지 않고 예외가 발생해 종료될 경우 스레드를 제거한 뒤 다른 작업을 처리하기 위한 새로운 스레드를 생성합니다. 

 

package hs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService e1 = Executors.newFixedThreadPool(10);
		ExecutorService e2 = Executors.newCachedThreadPool();
		
		e1.execute(new R1());
		
	}
}

class R1 implements Runnable {
	
	@Override
	public void run() {
		System.out.println("Runable Thread");
	}
}

 

 

submit() 메소드의 경우 새로운 방식입니다. execute() 메소드와 다르게 스레드 동작 결과에 대한 반환값이 있습니다. 또한 스레드 동작 중 예외가 발생하더라도 스레드를 삭제하지 않고 다시 재사용해 스레드 삭제/생성에 따른 오버헤드를 줄일 수 있습니다. Runnable 객체를 사용하는 execute() 메소드는 예외 처리가 없기 때문에 예외 발생 시 스레드 자체가 종료되는 반면 Collable 객체를 사용하는 submit() 메소드는 예외처리 로직이 되어 있어 예외 발생 시에도 스레드를 종료시키지 않고 처리할 수 있기 때문입니다.

 

물론 Runnable 객체를 사용해도 try-catch 구문을 통해 예외 발생 시 정상적으로 메소드를 종료시켜 스레드가 삭제되지 않도록 할 수도 있고 리턴값은 없지만 참조변수(공유객체)를 통해 결과값을 얻을 수도 있습니다. 다만, 이미 기능이 포함된 Callable 객체를 사용하는 것이 여러모로 편리합니다. 또한 스레드 블록킹 기능 등이 추가적으로 있는데 이부분은 아랫쪽에서 조금 더 자세히 설명하겠습니다. 해당 기능들이 필요없다면 간결한 Runnable 사용이 더 좋을 수 있습니다.

 

submit()은 Runnable을 매개변수로 받을 수도 있지만 일반적으로 Callable 객체를 매개변수로 사용하고, Callable 인터페이스는 run() 메소드 대신 call() 메소드를 가진 함수형 인터페이스입니다. 구현 방식은 Runnable 인터페이스와 비슷한데 제네릭 지정이 되어 있습니다. 제네릭으로 지정한 타입의 객체를 리턴해주는 메소드입니다. 기존 스레드 객체의 확장판 개념이라고 보면 됩니다.

 

 

코드는 아래와 같이 사용할 수 있습니다. Callable 객체의 제네릭 타입은 리턴시키고 싶은 객체 타입으로 지정해주면 되고 call() 메소드를 오버라이딩 해주면 됩니다. call() 메소드의 리턴값은 Callable에서 지정한 제네릭 타입이지만, subit() 메소들의 리턴값은 Future<V> 타입의 객체입니다. 즉, Future 객체에 실제 call()메소드의 리턴값이 별도로 저장돼 있는 것입니다. 해당 값을 추출하는 메소드는 Future 객체의 get() 메소드입니다.  

 

package hs;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService e1 = Executors.newFixedThreadPool(10);

		// Callable 객체 생성
		C1 callable = new C1(new StringBuffer("Hello"));
		
		// 스레드풀을 통한 Callable 객체 실행 및 리턴값 저장
		Future<StringBuffer> f1 = e1.submit(callable);
		
		// 리턴된 Future 객체에서 값 추출하기
		try {
		
			System.out.println(f1.get());

		} catch(Exception e) {
			
			System.out.println("Error");
		}
	}
}

class C1 implements Callable<StringBuffer> {

	StringBuffer a;

	C1(StringBuffer a) {

		this.a = a;
	}

	@Override
	public StringBuffer call() throws Exception {

		a.append(" World~!");
		return a;
	}
}

 

 


 

[ Future 객체를 통한 스레드 블록킹 (작업 결과 통보받기) ]

  • 해당 스레드에서 실제 리턴값(결과)이 올 때까지 결과를 호출한 스레드를 블록킹해서 동작을 정지함
  • 지속적으로 해당 스레드가 끝났는지 확인해야하는 방식에 비해 효율적임
  • Thread 클래스의 join() 메소드와 유사한 기능

만약 main 스레드에서 두 개의 스레드를 호출해서 로직을 돌린 후, 두 스레드에서 작업한 결과값을 가지고 최종 결과물을 산출한다고 가정해 봅니다. 그렇다면 main 스레드는 두 스레드가 모두 종료된 이후에 최종 작업을 진행해야 합니다. 가장 처음 시도해볼 수 있는 것은 아래와 같이 스레드를 2개 만들어서 이전 글에서 다룬 join() 메소드를 통해 구현하는 것입니다. join() 메소드는 해당 스레드가 종료될 때까지 호출한 쪽의 스레드를 블록킹(Blocking) 시켜 동작하지 않고 기다리도록 만들어줍니다. 

 

package hs;

public class Main {

	public static void main(String[] args) {

		int[] arr = new int[2];
		Thread t1 = new Thread(new T1(arr));
		Thread t2 = new Thread(new T2(arr));

		t1.start();
		t2.start();

		try {
			t1.join();
			t2.join();
			System.out.println("T1결과 + T2결과 = " + (arr[0] + arr[1]));

		// 중간에 어느 스레드가 interrupt 돼서 적절한 결과가 없을 경우를 처리
		} catch (InterruptedException e) {
			System.out.println("작업 실패");
		}
	}
}

class T1 implements Runnable {

	int[] arr;

	T1(int[] arr) {
		this.arr = arr;
	}

	@Override
	public void run() {

		int n = 0;
		for (int i = 0; i < 3; i++) {
			n += i * 10;
			System.out.println("나는 T1 : " + n);
		}
		arr[0] = n;
	}
}

class T2 implements Runnable {

	int[] arr;

	T2(int[] arr) {
		this.arr = arr;
	}

	@Override
	public void run() {

		int n = 0;
		for (int i = 0; i < 3; i++) {
			n += i + 10;
			System.out.println("나는 T2 : " + n);
		}
		arr[1] = n;
	}
}

 


 

스레드풀을 사용하면 직접 Thread 객체를 만들지 않기 때문에 join() 메소드를 사용할 수 없습니다. 스레드풀은 배열 형태가 아니고 스레드풀 안의 어느 스레드를 실제로 점유할지 미리 알 수도 없기 때문에 직접 종속관계를 만들어주기도 어렵습니다. 이 때 사용하는 방식이 Future 클래스의 get() 메소드입니다.

 

위에서 설명한 것처럼 submit() 메소드는 execute() 메소드와 달리 Future<V> 타입의 객체를 결과값으로 리턴합니다. 따라서 위에서처럼 공유객체인 int[] arr을 생성해서 결과값을 따로 저장할 필요가 없이 리턴값을 받아서 처리할 수 있습니다.

 

스레드 실행의 결과값인 Future 객체는 스레드가 끝나서 리턴값이 나와야 실제로 값을 담을 수 있기 때문에 "미래의 결과"라는 점에서 이름이 Future(미래)입니다. 그리고 Future가 가지고 있는 <V> 타입의 값을 얻는 메소드가 get() 메소드입니다. 아직 끝나지 않은 스레드의 값을 가져오는 메소드이므로, 스레드가 종료되기 전까지 호출한 스레드를 블록킹 시켜서 중지해줍니다. join() 메소드와 동일한 방식입니다. 

 

package hs;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService threadpool = Executors.newFixedThreadPool(4);

		// 스레드 가동
		Future<Integer> f1 = threadpool.submit(new E1());
		Future<Integer> f2 = threadpool.submit(new E2());

		try {
			int a = f1.get();
			int b = f2.get();
			System.out.println("E1 + E2 = " + (a + b));

		// 스레드가 interrupt 될 경우
		} catch (InterruptedException i) {
			System.out.println("스레드가 중간에 interrupt됨");

		// 스레드가 예외를 뱉어낼 경우
		} catch (ExecutionException e) {
			System.out.println("예외가 발생함");
		}
	}
}

class E1 implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {

		int n = 0;
		for (int i = 0; i < 3; i++) {
			n += i * 10;
			System.out.println("나는 E1 : " + n);
		}

		return n; // 오토래핑으로 자동 객체화
	}
}

class E2 implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {

		int n = 0;
		for (int i = 0; i < 3; i++) {
			n += i + 10;
			System.out.println("나는 E2 : " + n);
		}
		return n;
	}
}

 

 


 

 

만약 리턴시킬 결과값이 없고 그냥 스레드가 끝나는 순서만 중요하다 하면 Runnable 객체를 사용해주면 됩니다. 이 때는 제네릭이 없는 Future 객체를 이용해 submit()의 리턴값을 받아줘도 되고 그냥 바로 get()메소드를 호출해도 됩니다.

 

package hs;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService threadpool = Executors.newFixedThreadPool(4);

		// 스레드 가동

		try {
			threadpool.submit(new E1()).get();
			threadpool.submit(new E2()).get();
			System.out.println("나는 MAIN 스레드다!");

		// 스레드가 interrupt 될 경우
		} catch (InterruptedException i) {
			System.out.println("스레드가 중간에 interrupt됨");
		
		// 스레드가 예외를 뱉어낼 경우
		} catch (ExecutionException e) {
			System.out.println("예외가 발생함");
		}
	}
}

class E1 implements Runnable {
	
	@Override
	public void run() {
		
		for(int i = 0; i < 3; i++) {
			System.out.println("나는 E1이다.");
		}
	}
}

class E2 implements Runnable {
	
	@Override
	public void run() {
		
		for(int i = 0; i < 3; i++) {
			System.out.println("나는 E2다.");
		}
	}
}

 

 


 

[ ExecutorCompletionService Class - 먼저 종료되는 스레드 순으로 작업 처리하기 ]

 

위에서 사용한 get() 메소드는 순서대로 작동합니다. 스레드1을 get() 하고 스레드2를 get() 했다면 스레드2가 먼저 종료되더라도 스레드1의 결과가 나올 때까지 기다리게 됩니다. 스레드1이 끝나야 스레드2의 결과값을 받고 마지막 연산을 수행할 수 있습니다.

 

하지만 만약 각 스레드의 로직이 상이해서 종료되는 시간차이가 많이 나고, 각 스레드의 결과값을 각자 처리해야한다면 이 방식은 병렬처리의 이점이 없습니다. 따라서 먼저 종료되는 스레드의 결과값을 먼저 받아서 처리하기 위한 클래스가 따로 존재합니다.

 

ExecutorCompletionService 클래스는 CompletionService 인터페이스를 상속받은 객체로 이러한 역할을 수행해줍니다. 이름은 길지만 사용법은 아래와 같이 간단합니다.

 

  1. 스레드풀을 생성한다.
  2. ExecutorCompletionService 클래스의 인스턴스를 생성하면서 스레드풀을 매개변수로 넣어준다.
  3. ExecutorCompletionService 클래스의 submit() 메소드로 스레드를 실행한다.
  4. ExecutorCompletionService 클래스의 take() 메소드로 먼저오는 결과값을 받아서 처리한다.
  5. 반복문 등으로 4번 과정을 반복한다.

반복문으로 4번 과정을 처리하면 무한 루프가 돕니다. 예외 발생은 루프의 정상적인 탈출 과정이 아닙니다. 따라서 필요에 따라 break를 걸어주거나 또는 프로그램 내내 실행되도록 별도의 스레드를 써서 실행해주면 됩니다. 예시 코드는 아래와 같습니다. 반복문 횟수를 적게 돌린 2번 스레드의 결과를 먼저 얻은 것을 알 수 있습니다. 

 

package hs;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {

	public static void main(String[] args) {

		// 스레드풀 생성
		ExecutorService threadpool = Executors.newFixedThreadPool(4);

		// 스레드가 끝나는 순서로 처리할 수 있는 객체 생성
		ExecutorCompletionService<T1> c1 = 
				new ExecutorCompletionService<>(threadpool);
		
		// ExecutorCompletionService가 상속받은
		// CompletionService 인터페이스의 submit() 메소드로 스레드 실행 
		c1.submit(new E1(new T1()));
		c1.submit(new E2(new T1()));
		
		int i = 0;
		while(true) {
			
			try {
				// 먼저 끝나는 스레드의 결과를 담아줌
				Future<T1> f = c1.take();
				// 해당 결과의 값을 넣어줌
				
				T1 result = f.get();
				System.out.println(result.treadName + " : " + result.n);
				
				// 결과가 두 개 이상 쌓이면 종료
				if(i > 1)
					break;
				
			// c1.take()에 대한 예외처리
			} catch(InterruptedException interrupt) {
				System.out.println("스레드가 중간에 interrupt 되었습니다.(take 실패)");
				break;
				
			// c1.get()에 대한 예외처리
			} catch(ExecutionException e) {
				System.out.println("값을 가져올 수 없습니다.(get 실패)");
				break;
			}
		}
	}
}

// 계산결과값과 담당 스레드 이름을 가진 객체
class T1 {
	
	int n;
	String treadName;
}

class E1 implements Callable<T1> {

	T1 t1;
	
	E1(T1 t1) {
		this.t1 = t1;
	}
	
	@Override
	public T1 call() throws Exception {

		// 값
		t1.n = 0;
		for (int i = 0; i < 100000; i++) {
			t1.n += i + 5;
		}

		// 현재 스레드 이름
		t1.treadName = Thread.currentThread().getName();
		return t1; // 오토래핑으로 자동 객체화
	}
}

class E2 implements Callable<T1> {
	
	T1 t1;
	
	E2(T1 t1) {
		this.t1 = t1;
	}
	
	@Override
	public T1 call() throws Exception {

		// 값
		t1.n = 0;
		for (int i = 0; i < 1000; i++) {
			t1.n += i + 10;
		}
		
		// 현재 스레드 이름
		t1.treadName = Thread.currentThread().getName();
		return t1;
	}
}

 

 

만약 위에서 take() 메소드 대신 poll() 메소드를 사용하면 실행 시점에 완료된 스레드가 없다면 null값을 리턴합니다. 매개변수로 시간을 제공하면 해당 시간동안 블록킹돼서 기다리다가 그래도 없으면 null값을 리턴합니다. 일정 시점에 완료되지 않은 결과값은 사용하지 않겠다고 하면 poll() 메소드를 사용해서 처리하면 됩니다.

 


 

[ 스레드 종료 ]

  • ExecutorService 클래스의 shutdown() : 현재 실행 중인 작업이 끝나고 스레드풀 종료
  • ExecutorService 클래스의 shutdownNow() : 즉시 스레드풀 종료

스레드풀은 데몬 스레드가 아닙니다. 데몬 스레드란 주 스레드의 작업을 돕는 보조 스레드로 주 스레드가 종료되면 같이 종료되는 스레드입니다. 하지만 스레드풀은 메인 스레드가 종료되더라도 계속 남아있어 프로그램이 종료되지 않기 때문에 프로그램 종료 시 또는 필요할 때 종료시켜줘야 합니다. 위의 두 메소드가 종료 메소드입니다. 그냥 쓰면 되니까 예시 코드는 쓰지 않겠습니다. 

728x90

댓글

💲 추천 글