▸JAVA/기본 문법

네트워크_소켓(Socket) 통신_NIO 입출력(논블로킹) [2/3]

코데방 2020. 2. 4.
728x90

[ NIO를 이용한 네트워크 입출력 - 논블로킹(non-blocking) 방식 ]

 

파일 입출력편에서 다룬 것 처럼 네트워크도 NIO를 통해 입출력을 수행할 수 있습니다. 스트림 계열의 경우 인풋과 아웃풋을 따로 만들어 관리해줘야 하지만 NIO의 채널을 이용하면 양방향 입출력 통로를 생성해서 사용할 수 있습니다.

 

또한 스트림 계열이 블로킹(blocking) 방식만 지원하는데 반해 NIO는 블로킹 방식에 더해 논블로킹(non-blocking) 방식을 추가로 지원하기 때문에 데이터 송수신양이 많을 경우 기존 스트림 계열보다 병렬처리를 더욱 효과적으로 할 수 있습니다.

 

블로킹(blocking) 방식이란 데이터 송수신을 기다리는 동안 해당 스레드가 대기 상태로 멈추는 것을 말합니다. 스레드편에서 다룬 블로킹과 동일한 의미입니다. 블로킹 방식으로 채널 또는 스트림을 구성하면, 데이터를 송수신 받기 위해 계속 스레드를 중지 상태로 둬야 합니다. 주고받는 데이터가 없는 경우 스레드가 멈춰있기 때문에 여러 사용자가 동시에 사용할 경우 그 수만큼 스레드를 늘려줘야 합니다. 스레드풀을 이용해 어느정도 해결할 수 있지만 사용자가 많은 경우 스레드풀의 스레드를 그만큼 늘려줘야 하기 때문에 시스템에서 오버헤드가 발생하게 됩니다. 스레드를 생성하면서 드는 최소 자원이 있기 때문에 간단한 작업에 스레드를 만들면 작업보다 스레드를 생성하고 없애는데 더 큰 비용이 들어갑니다. 배보다 배꼽이 커지는 것이죠. 

 

논블로킹(non-blocking) 방식이란 데이터 송수신이 없을 때도 스레드가 블로킹 되지 않도록 하므로써 하나의 스레드에서 여러 입출력을 처리할 수 있도록 하는 방식입니다. NIO의 모든 입출력 클래스가 논블로킹 방식을 지원하는 것은 아니지만 네트워크 소켓 채널은 논블로킹 방식이 지원됩니다. 하지만 논블로킹 방식이라도 스레드가 하나인 이상 병렬 처리를 해주지는 못합니다. 그저 빠르게 여러 채널에서 발생하는 유형별 이벤트를 돌아가면서 처리해주는 방식입니다.  따라서 시간이 오래 걸리는 작업은 논블로킹 채널에서도 별도 스레드를 생성해줘야 합니다.

 

논블로킹 채널의 기본적인 동작 방식은 아래와 같습니다.

 

 

 

먼저 외부 객체와 TCP/IP 프로토콜 네트워크를 사용해 채널로 연결해줍니다. IO에서는 소켓 통신이 연결되면 스트림을 별도로 생성해줬지만 채널의 경우 SocketChannel을 생성해주면 두 과정이 동시에 완료됩니다. 그리고 나서 연결된 소켓채널을 채널 관리자 역할의 Selector에 등록해줍니다. 등록해줄 때 채널의 어떤 이벤트를 감지할 것인지 유형도 같이 등록해줍니다. 그리고 해당 채널이 가지고 있는 별도의 객체(Attachment)를 등록해줄 수도 있습니다.

 

Selector에 등록된 채널의 여러 정보를 가지고 있는 객체가 SelectionKey입니다. Selector는 이를 Set에 담아 보관하고 select() 메소드를 실행하면 등록된 채널에 이벤트가 발생하는지 보고 있습니다. 아무 이벤트가 발생하지 않는다면 이 스레드는 블로킹되어 있다가 이벤트가 발생하면 다시 처리를 재개합니다.

 

Selector의 selectedKey() 메소드를 실행하면 발생한 이벤트를 가진 채널의 SelectionKey만 모아둔 Set을 다시 만들어서 리턴해줍니다. 리턴된 Set에 포함된 채널은 어떠한 통신 이벤트가 발생한 것이므로 해당 이벤트 유형에 맞는 로직에 의해 처리됩니다. 발생한 순서대로 꺼내서 처리하기 때문에 이 Set은 큐(Queue)의 자료 구조 형태로 이용됩니다. 

 

이러한 구조로 여러 채널에서 발생한 이벤트를 계속 돌아가면서 처리하기 때문에 발생한 이벤트가 1개라도 있는 이상 스레드가 블로킹 되지 않고 계속 작동하며 여러 채널의 이벤트를 순차적으로 처리해줍니다. 그래서 논블로킹 방식이라고 합니다. 하지만 보시다시피 특정 유형의 이벤트를 처리하는데 아주 오래걸린다고 하면 그 시간만큼은 다른 채널의 이벤트를 처리하지 못합니다. 따라서 빠르게 처리할 수 있는 로직은 하나의 스레드에서 처리하고 시간이 오래 걸리는 로직은 따로 빼서 별도의 스레드로 동작시키는 것이 좋습니다. 

 

만약 채팅 서버를 구현한다고 했을 때 블로킹 방식으로는 클라이언트 1명당 하나의 스레드를 생성해줘야 합니다. 하지만 채팅의 특성 상 오가는 텍스트의 처리가 아주 간단하기 때문에 하나의 스레드에 순차적으로 처리해도 거의 동시에 처리되는 것처럼 구동될 수 있으며, 클라이언트 수만큼 스레드를 늘리는 것은 배보다 배꼽이 더 커지는 형국이 될 수 있습니다. 이 경우 채팅은 논블로킹 채널을 가진 스레드 하나로 해결하고 대용량 파일 전송 등 시간이 걸리는 작업만 별도 스레드로 수행하면 서버를 효율적으로 구성할 수 있습니다. 

 

 

※ 동기(Synchronous)와 비동기(Asynchronous)

 

동기와 비동기라는 용어는 블로킹과 논블로킹과는 다른 것이고, 스레드 간의 관계를 의미합니다. 스레드1과 스레드2가 있는데 1이 2에게 일을 시킵니다. 블로킹 방식에서는 2가 일을 끝내고 리턴해줄 때까지 1은 쉬고 있을 것이고 논블로킹 방식에서는 2에게 일을 시켜두고 1은 계속 할일을 하게 될 것입니다. 이 때 1은 2가 일을 잘 끝냈는지 결과를 알아와야할 수 있습니다. 

 

1이 2의 작업 결과를 알아내는 방식 2가지가 바로 동기와 비동기 방식입니다. 동기(Synchronous) 방식은 1이 2에게 계속 작업이 끝났는지 결과를 물어보는 방식입니다. 비동기(Asynchronous) 방식은 2가 작업이 끝나면 1에게 알려주는 방식입니다. 동기와 비동기 방식을 이용해서 채팅 내 대용량 파일 전송을 처리하는 법에 대해서는 아래 링크글을 참조 부탁드립니다.

 

2020/02/07 - [JAVA/기본 문법] - 콜백(Callback) 패턴을 사용한 비동기 방식의 원리와 사용법

 


 

간단히 채팅 서버 구현을 통해 논블로킹 채널의 사용법을 알아보겠습니다. NIO의 로직만 보기 위해 최대한 간단하게 짜고 Server와 Client 클래스 2개만 써서 객체화는 지양했습니다. 실제 코딩할 때는 유지보수를 위해 기능별로 객체화를 시키는 것이 좋습니다.

 

 


 

 

먼저 서버 쪽 로직입니다. 사실 클라이언트는 어차피 스레드를 2개로 나눠야하기 때문에 논블로킹의 의미가 별로 없습니다. 

 

 

[ 논블로킹(non-blocking) 채팅 서버 구성 ]

 

NIO의 ServerSocketChannel 클래스를 이용해 서버 소켓 채널을 연 뒤 InetSocketAddress 객체를 통해 서비스 포트를 지정해줍니다. 기본적으로 NIO도 블로킹 방식으로 동작하기 때문에 논블로킹을 바꿔주기 위해서는 메소드를 사용해야 합니다. configureBlocking() 메소드의 매개변수를 false로 주면 논블로킹 방식의 채널이 됩니다.

 

HashSet은 모든 클라이언트의 소켓채널을 모아둡니다. 사용자가 들어오고 나가면 다른 사용자들에게 알려줘야하기 때문에 현재 접속돼 있는 모든 사용자의 채널 정보를 가지고 있다가 이벤트가 발생하면 메세지를 모두에게 뿌려주는 용도입니다.

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Server {

	public static void main(String[] args) {

		// 연결된 클라이언트를 관리할 컬렉션
		Set<SocketChannel> allClient = new HashSet<>();

		try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {

			// 서비스 포트 설정 및 논블로킹 모드로 설정
			serverSocket.bind(new InetSocketAddress(15000));
			serverSocket.configureBlocking(false);

 


 

이제 채널을 관리할 Selector를 생성하고 서버 소켓 채널을 등록해줍니다. 서버 소켓 채널은 서비스 포트에 접속하려는 클라이언트의 접속 요청을 받아주는 역할이므로 OP_ACCEPT 모드로 등록해줍니다. 접속을 요청하는 이벤트를 셀렉터가 감지하도록 한다는 의미입니다. 유형에 대한 상수는 SelectionKey 클래스가 가지고 있습니다.

 

그리고 클라이언트가 보내주는 데이터를 저장할 버퍼와 클라이언트에게 보내줄 데이터를 저장할 버퍼를 생성해줍니다. 하나만 써도 되고 두 개로 나눠써도 됩니다. 만약 이 통신이 끊기지 않고 상시로 연결되어 데이터를 주고 받아야 한다면 ByteBuffer의 allocateDirect() 메소드로 버퍼를 생성해주는 것이 좋습니다. 차이점은 아래 글을 참조하시면 됩니다.

 

2019/12/16 - [JAVA/기본 문법] - 외부 데이터 입출력_java.nio [3/3]

 

상수 유형
OP_ACCEPT  접속 요청에 대한 이벤트를 감지
OP_CONNECT  소켓 통신이 허용된 이벤트를 감지
OP_READ  읽기 가능한 상태인지 감지 (해당 채널의 소켓에 데이터가 수신된 이벤트 발생 감지)
OP_WRITE  쓰기 가능한 상태인지 감지 (연결이 되어 있으면 항상 쓰기가 가능한 상태로 인식)
package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Server {

	public static void main(String[] args) {

		// 연결된 클라이언트를 관리할 컬렉션
		Set<SocketChannel> allClient = new HashSet<>();

		try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {

			// 서비스 포트 설정 및 논블로킹 모드로 설정
			serverSocket.bind(new InetSocketAddress(15000));
			serverSocket.configureBlocking(false);

			// 채널 관리자(Selector) 생성 및 채널 등록
			Selector selector = Selector.open();
			serverSocket.register(selector, SelectionKey.OP_ACCEPT);

			System.out.println("----------서버 접속 준비 완료----------");
			// 버퍼의 모니터 출력을 위한 출력 채널 생성

			// 입출력 시 사용할 바이트버퍼 생성
			ByteBuffer inputBuf = ByteBuffer.allocate(1024);
			ByteBuffer outputBuf = ByteBuffer.allocate(1024);

 


 

여기까지 됐으면 클라이언트가 접속할 수 있는 서버의 준비는 끝난 상태입니다. 이제 셀렉터가 서버 소켓 채널의 이벤트를 감지하도록 select() 메소드를 실행시킵니다. 클라이언트 요청이 없는 상태라면 발생하는 접속 요청 이벤트가 없으므로 스레드가 블로킹되어 이벤트 발생을 기다립니다.

 

이벤트가 하나라도 생기면 Selector가 가지고 있는 SelectionKey들 중, 이벤트가 발생한 채널의 객체만 다시 모아둔 Set을 받습니다. 이 Set안의 SelectionKey들은 모두 이벤트가 발생한 채널의 객체이기 때문에 큐 구조로 모두 순회하면서 처리해줘야 합니다. 여러 방법의 반복문으로 처리해줘도 되지만 순회를 편하게 만들어주는 Iterator 객체로 받아서 처리하도록 했습니다.

 

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Server {

	public static void main(String[] args) {

		// 연결된 클라이언트를 관리할 컬렉션
		Set<SocketChannel> allClient = new HashSet<>();

		try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {

			// 서비스 포트 설정 및 논블로킹 모드로 설정
			serverSocket.bind(new InetSocketAddress(15000));
			serverSocket.configureBlocking(false);

			// 채널 관리자(Selector) 생성 및 채널 등록
			Selector selector = Selector.open();
			serverSocket.register(selector, SelectionKey.OP_ACCEPT);

			System.out.println("----------서버 접속 준비 완료----------");
			// 버퍼의 모니터 출력을 위한 출력 채널 생성

			// 입출력 시 사용할 바이트버퍼 생성
			ByteBuffer inputBuf = ByteBuffer.allocate(1024);
			ByteBuffer outputBuf = ByteBuffer.allocate(1024);

			// 클라이언트 접속 시작
			while (true) {

				selector.select(); // 이벤트 발생할 때까지 스레드 블로킹

				// 발생한 이벤트를 모두 Iterator에 담아줌
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

 


 

이제 발생한 이벤트를 가진 채널의 정보(SelectionKey)가 Iterator에 순서대로 담겨있습니다. 하나씩 꺼내서 처리해주면 됩니다. 가장 처음 발생한 이벤트는 당연히 클라이언트의 접속 요청일 것입니다.

 

등록된 이벤트가 감지된 SelectionKey는 상태값을 가지고 있습니다. 현재 발생한 이벤트의 종류를 알 수 있게 해주는 메소드를 실행해 유형별로 로직을 짜면 됩니다. 

 

SelectionKey의 상태 확인 메소드 설명
boolean isAcceptable()  이벤트가 발생한 채널의 감지 유형이 OP_ACCEPT인 경우
boolean isReadable()  이벤트가 발생한 채널의 감지 유형이 OP_READ인 경우
boolean isWritable()  이벤트가 발생한 채널의 감지 유형이 OP_WRITE인 경우
boolean isConnectable()  이벤트가 발생한 채널의 감지 유형이 OP_CONNECT인 경우
boolean isValid()  현재 채널이 유효한 경우

 

 

서버에서는 서버 연결 요청을 수락하는 이벤트(OP_ACCEPT)와 클라이언트가 보내준 데이터를 받는 이벤트(OP_READ)만 발생합니다. 서버에서 능동적으로 클라이언트에게 메세지를 만들어 보내는 일은 없습니다. 헷갈리지 말아야할 것은 위의 메소드와 작업 유형은 채널에서 발생하는 이벤트를 감지하기 위한 것이지 READ 모드라고 채널에서 읽기만 가능한 것이 아닙니다. 채널에서 READ 이벤트가 발생했을 때 읽은 데이터를 다시 채널로 보내줄 수도 있습니다.

 

먼저 연결 요청이 들어온 클라이언트를 처리하는 로직을 추가합니다. 응답 요청에 대한 채널은 서버 소켓 채널이므로 SelectionKey가 가지고 있는 채널 객체를 ServerSocketChannel 객체 타입으로 저장해줍니다. 그리고 서버 소켓 채널의 accept() 메소드를 통해 해당 통신을 수락한 후, 연결된 채널인 SocketChannel을 생성합니다. 

 

이제 이 SocketChannel은 '클라이언트-서버' 간에 맺어진 세션에 대한 채널이 되고, 이를 통해 둘은 데이터를 주고 받을 수 있게 되었습니다. 이 채널에서 클라이언트가 채팅 내용을 보내주게 되면 서버에서 받아 처리해야하므로 이 채널도 Selector에 등록을 해줘야 합니다. 이를 위해 채널을 논블로킹 모드로 변경해주고 셀렉터에 등록해줍니다. 감지 모드는 OP_READ로 등록해주는데 연결이 완료된 클라이언트에 대해서는 서버가 감지할 이벤트는 READ 밖에 없기 때문입니다. 클라이언트가 먼저 데이터를 보내지 않는 이상(READ 이벤트가 발생하지 않는 이상) 서버는 아무 일도 하지 않습니다.

 

Selector에 등록할 때 유형 다음 매개변수로 넣은 것은 등록된 채널로 생성되는 SelectionKey에 첨부할 추가 객체입니다. 하나의 채널은 하나의 클라이언트이기 때문에 해당 채널의 아이디 정보를 알아야 다른 사람들에게 누가 내용을 입력했는지 알려줄 수 있습니다. ClientInfo라는 간단한 클래스를 만들어 아이디 정보를 입력할 수 있게 한 뒤해당 채널에 하나를 붙여줬습니다. register할 때 등록해줘도 되고 추가적으로는 SelectionKey의 attach(obj) 메소드를 통해 추가해줄 수 있습니다.

 

또한 위에서 언급한대로 사용자가 들어오거나 나가면 다른 모든 클라이언트에게 알려줘야 하기 때문에 이를 위해 만들어둔 HashSet에 채널 객체를 등록해줍니다. Set계열 컬렉션을 쓴 이유는 채널은 중복되지 않는 고유한 값이기 때문입니다. 같은 채널이 등록될 일도 없긴 하지만 고유한 값을 담을 때는 Set계열을 많이 사용합니다. 마지막으로 연결 즉시 아이디를 입력받기 위한 문구를 클라이언트에게 출력합니다. ByteBuffer의 사용법은 아래 링크를 참조하시면 됩니다. 자바 프로그램 간 통신이기 때문에 Charset 클래스는 사용하지 않았습니다. 

 

2019/12/17 - [JAVA/라이브러리(API)] - java.nio 패키지 사용법(Channel / Buffer / Charset) [1/1]

 

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Server {

	public static void main(String[] args) {

		// 연결된 클라이언트를 관리할 컬렉션
		Set<SocketChannel> allClient = new HashSet<>();

		try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {

			// 서비스 포트 설정 및 논블로킹 모드로 설정
			serverSocket.bind(new InetSocketAddress(15000));
			serverSocket.configureBlocking(false);

			// 채널 관리자(Selector) 생성 및 채널 등록
			Selector selector = Selector.open();
			serverSocket.register(selector, SelectionKey.OP_ACCEPT);

			System.out.println("----------서버 접속 준비 완료----------");
			// 버퍼의 모니터 출력을 위한 출력 채널 생성

			// 입출력 시 사용할 바이트버퍼 생성
			ByteBuffer inputBuf = ByteBuffer.allocate(1024);
			ByteBuffer outputBuf = ByteBuffer.allocate(1024);

			// 클라이언트 접속 시작
			while (true) {

				selector.select(); // 이벤트 발생할 때까지 스레드 블로킹

				// 발생한 이벤트를 모두 Iterator에 담아줌
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

				// 발생한 이벤트들을 담은 Iterator의 이벤트를 하나씩 순서대로 처리함
				while (iterator.hasNext()) {

					// 현재 순서의 처리할 이벤트를 임시 저장하고 Iterator에서 지워줌
					SelectionKey key = iterator.next();
					iterator.remove();

					// 연결 요청중인 클라이언트를 처리할 조건문 작성
					if (key.isAcceptable()) {

						// 연결 요청중인 이벤트이므로 해당 요청에 대한 소켓 채널을 생성해줌
						ServerSocketChannel server = (ServerSocketChannel) key.channel();
						SocketChannel clientSocket = server.accept();

						// Selector의 관리를 받기 위해서 논블로킹 채널로 바꿔줌
						clientSocket.configureBlocking(false);

						// 연결된 클라이언트를 컬렉션에 추가
						allClient.add(clientSocket);

						// 아이디를 입력받기 위한 출력을 해당 채널에 해줌
						clientSocket.write(ByteBuffer.wrap("아이디를 입력해주세요 : ".getBytes()));

						// 아이디를 입력받을 차례이므로 읽기모드로 셀렉터에 등록해줌
						clientSocket.register(selector, SelectionKey.OP_READ, new ClientInfo());
                        




// 접속한 사용자의 ID를 가진 클래스
class ClientInfo {

	// 아직 아이디 입력이 안된 경우 true
	private boolean idCheck = true;
	private String id;

	// ID가 들어있는지 확인
	boolean isID() {

		return idCheck;
	}

	// ID를 입력받으면 false로 변경
	private void setCheck() {

		idCheck = false;
	}

	// ID 정보 반환
	String getID() {

		return id;
	}

	// ID 입력
	void setID(String id) {
		this.id = id;
		setCheck();
	}
}

 


 

이제 클라이언트가 데이터를 보낸 이벤트가 감지될 경우의 로직입니다. 이미 소켓 채널로 연결된 채널에 대한 SelectionKey이기 때문에 SocketChannel 객체로 만들어서 가져옵니다. 또한 해당 SelectionKey에 아까 만들어서 붙여둔 ClientInfo 객체도 가져와서 저장합니다. ID가 없는 상태의 첫 메세지라면 ID로 입력해주고 이미 ID가 등록돼 있다면 가져와서 사용합니다.

 

이 채널에서 read(ByteBuffer) 메소드를 통해 버퍼로 데이터를 읽어옵니다. 그런데 만약 사용자가 채팅을 종료한 경우가 발생할 수 있습니다. 사용자가 채팅을 종료할 경우 소켓 채널의 연결이 종료되고 종료됐다는 이벤트 또한 일종의 데이터 수신이기 때문에 isReadable() 메소드는 true를 반환합니다. 즉, 소켓 채널의 종료 또한 이 로직으로 들어온다는 의미입니다. 소켓 채널이 종료되었으므로 데이터를 읽어오는 read() 메소드는 예외를 뱉어냅니다. 따라서 try-catch 구문을 적용해 read()에서 예외 발생 시 클라이언트가 통신을 끊은 것으로 판단하여 접속 종료를 알리는 로직을 추가로 구성할 수 있습니다. 종료가 발생하면 해당 채널(SelectionKey)의 아이디가 종료되었다는 메세지를 Set 컬렉션의 모든 클라이언트 채널에 출력해줍니다. 

 

접속이 종료된 채널은 반드시 key.cancel() 메소드를 수행해 Selector에 등록된 채널정보를 지워줘야 합니다. 그렇지 않으면 Selector는 이 종료된 채널에서 계속 이벤트가 발생한다고 인식해서 아래와 같이 예외 처리 로직이 무한 반복됩니다.

 

 

또한 클라이언트 연결 정보를 가진 Set 컬렉션에서도 지워줘야 나중에 다른 클라이언트들에게 메세지를 출력할 때도 연결이 유효하지 않은 채널을 사용할 때 생기는 예외가 발생됩니다. IOException의 "Connection reset by peer" 이라는 메세지를 보게 된다면 연결이 종료된 채널에 무언가 읽기 쓰기를 하는 것이 아닌지 확인해봐야 합니다.

 

 

여기까지 왔다면 정상적으로 연결된 클라이언트가 보낸 메세지를 수신해 ByteBuffer에 넣어준 상태가 됩니다. 먼저 ID가 없는 경우를 판단해서 버퍼의 내용을 아이디로 넣어주는 로직을 작성합니다. 참고로 클라이언트가 보낸 바이트 데이터에는 개행문자(\n)도 포함되어 있습니다. 아이디에 개행문자까지 같이 넣어버리면 "아이디 : 대화내용"으로 출력할 때 아이디 다음 줄바꿈이 생겨버립니다. 따라서 아이디를 넣어줄 때는 현재 버퍼의 전체 크기 중 마지막 한글자인 2byte를 제외하고 가져와서 넣어줍니다.

 

그리고 종료때와 마찬가지로 입장했다는 메세지를 모든 클라이언트에게 전송합니다. ByteBuffer를 사용할 때는 position, limit 의 개념과 flip(), clear() 메소드를 잘 이해하고 있어야 합니다. 

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Server {

	public static void main(String[] args) {

		// 연결된 클라이언트를 관리할 컬렉션
		Set<SocketChannel> allClient = new HashSet<>();

		try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {

			// 서비스 포트 설정 및 논블로킹 모드로 설정
			serverSocket.bind(new InetSocketAddress(15000));
			serverSocket.configureBlocking(false);

			// 채널 관리자(Selector) 생성 및 채널 등록
			Selector selector = Selector.open();
			serverSocket.register(selector, SelectionKey.OP_ACCEPT);

			System.out.println("----------서버 접속 준비 완료----------");
			// 버퍼의 모니터 출력을 위한 출력 채널 생성

			// 입출력 시 사용할 바이트버퍼 생성
			ByteBuffer inputBuf = ByteBuffer.allocate(1024);
			ByteBuffer outputBuf = ByteBuffer.allocate(1024);

			// 클라이언트 접속 시작
			while (true) {

				selector.select(); // 이벤트 발생할 때까지 스레드 블로킹

				// 발생한 이벤트를 모두 Iterator에 담아줌
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

				// 발생한 이벤트들을 담은 Iterator의 이벤트를 하나씩 순서대로 처리함
				while (iterator.hasNext()) {

					// 현재 순서의 처리할 이벤트를 임시 저장하고 Iterator에서 지워줌
					SelectionKey key = iterator.next();
					iterator.remove();

					// 연결 요청중인 클라이언트를 처리할 조건문 작성
					if (key.isAcceptable()) {

						// 연결 요청중인 이벤트이므로 해당 요청에 대한 소켓 채널을 생성해줌
						ServerSocketChannel server = (ServerSocketChannel) key.channel();
						SocketChannel clientSocket = server.accept();

						// Selector의 관리를 받기 위해서 논블로킹 채널로 바꿔줌
						clientSocket.configureBlocking(false);

						// 연결된 클라이언트를 컬렉션에 추가
						allClient.add(clientSocket);

						// 아이디를 입력받기 위한 출력을 해당 채널에 해줌
						clientSocket.write(ByteBuffer.wrap("아이디를 입력해주세요 : ".getBytes()));

						// 아이디를 입력받을 차례이므로 읽기모드로 셀렉터에 등록해줌
						clientSocket.register(selector, SelectionKey.OP_READ, new ClientInfo());

					
					// 읽기 이벤트(클라이언트 -> 서버)가 발생한 경우
					} else if (key.isReadable()) {

						// 현재 채널 정보를 가져옴 (attach된 사용자 정보도 가져옴)
						SocketChannel readSocket = (SocketChannel) key.channel();
						ClientInfo info = (ClientInfo) key.attachment();

						// 채널에서 데이터를 읽어옴
						try {
							readSocket.read(inputBuf);

							// 만약 클라이언트가 연결을 끊었다면 예외가 발생하므로 처리
						} catch (Exception e) {
							key.cancel(); // 현재 SelectionKey를 셀렉터 관리대상에서 삭제
							allClient.remove(readSocket); // Set에서도 삭제
							
							// 서버에 종료 메세지 출력
							String end = info.getID() + "님의 연결이 종료되었습니다.\n";
							System.out.print(end);

							// 자신을 제외한 클라이언트에게 종료 메세지 출력
							outputBuf.put(end.getBytes());
							for(SocketChannel s : allClient) {
								if(!readSocket.equals(s)) {
									outputBuf.flip();
									s.write(outputBuf);
								}
							}
							outputBuf.clear();
							continue;
						}

						
						// 현재 아이디가 없을 경우 아이디 등록
						if (info.isID()) {
							// 현재 inputBuf의 내용 중 개행문자를 제외하고 가져와서 ID로 넣어줌
							inputBuf.limit(inputBuf.position() - 2);
							inputBuf.position(0);
							byte[] b = new byte[inputBuf.limit()];
							inputBuf.get(b);
							info.setID(new String(b));

							// 서버에 출력
							String enter = info.getID() + "님이 입장하셨습니다.\n";
							System.out.print(enter);
							
							outputBuf.put(enter.getBytes());
							
							// 모든 클라이언트에게 메세지 출력
							for(SocketChannel s : allClient) {
							
								outputBuf.flip();
								s.write(outputBuf);
							}
							
							inputBuf.clear();
							outputBuf.clear();
							continue;
						}
						
						// 읽어온 데이터와 아이디 정보를 결합해 출력한 버퍼 생성
						inputBuf.flip();
						outputBuf.put((info.getID() + " : ").getBytes());
						outputBuf.put(inputBuf);
						outputBuf.flip();
						
						for(SocketChannel s : allClient) {
							if (!readSocket.equals(s)) {
								
								s.write(outputBuf);
								outputBuf.flip();
							}
						}
						
						inputBuf.clear();
						outputBuf.clear();
					}
				}
			}

		} catch (

		IOException e) {

			e.printStackTrace();
		}
	}
}

// 접속한 사용자의 ID를 가진 클래스
class ClientInfo {

	// 아직 아이디 입력이 안된 경우 true
	private boolean idCheck = true;
	private String id;

	// ID가 들어있는지 확인
	boolean isID() {

		return idCheck;
	}

	// ID를 입력받으면 false로 변경
	private void setCheck() {

		idCheck = false;
	}

	// ID 정보 반환
	String getID() {

		return id;
	}

	// ID 입력
	void setID(String id) {
		this.id = id;
		setCheck();
	}
}

 

 


 

이것으로 간단한 논블로킹 채팅 서버 구현이 끝났습니다. 만약 블로킹 서버로 위의 로직을 구현한다면 클라이언트 하나 당 하나의 스레드를 할당해줘야 합니다. 채널에서 입력을 받을 때까지 블로킹 되기 때문에 한 스레드로는 구현이 불가능합니다. 하지만 NIO를 사용해서 논블로킹으로 구성할 시 채팅처럼 수행 시간이 짧은 간단한 로직들은 하나의 스레드로 해결할 수 있습니다. 만약 여기서 대용량 파일을 첨부하게끔 한다면 이 기능은 별도 스레드로 동작하게만들어야야 합니다. 논블로킹은 채널의 이벤트를 빠르게 돌아가면서 처리하는 것이지 병렬 처리가 아니라는 점을 명심해야 합니다. 스레드에서 CPU Core 한개가 여러 스레드를 돌아가면서 조금씩 처리해 동시에 병렬 처리되는 것처럼 보이는 것과 동일한 원리입니다.

 

 


 

[ 블로킹/다중 스레드 방식의 채팅 클라이언트 구성 ]

 

클라이언트의 경우 서버와 조금 다릅니다. 서버에서는 직접 입력받는 기능이 없지만 클라이언트는 키보드로 보낼 데이터를 입력받아야 합니다. 얼핏 키보드와 연결된 채널과 서버와 연결된 채널 두 개를 Selector에 등록하고 각자가 READ 이벤트가 발생하면 처리하면 안되나 생각할 수 있지만, Selector에 등록할 수 있는 채널은 논블로킹을 지원하는 채널만 가능합니다. 아무리 찾아봐도 키보드에서 입력받을 수 있는 채널 중에 논블로킹을 지원하는 클래스는 없는 것 같습니다. 따라서 채팅 클라이언트는 논블로킹 방식으로 만들기가 어려워 보입니다.

 

논블로킹 채널은 SelectableChannel을 상속받는 클래스에서만 지원이 가능합니다.

 

 

 

따라서 키보드에서 입력 받기 위해서는 키보드 입력이 발생할 때까지 해당 클래스를 블로킹 해야하기 때문에 서버에서 데이터를 주고 받는 스레드 하나와 키보드에서 입력 받는 스레드 두 개를 생성해서 구성했습니다. 서버의 경우 접속 클라이언트가 무한대로 늘어날 수 있기 때문에 과도한 스레드 생성을 최대한 피해야 하지만 클라이언트의 경우 딱 2개의 스레드만 있으면 되기 때문에 (물론 기능에 따라 더 필요할 수도 있음)  크게 성능에 영향을 주진 않을 것으로 보입니다. 추후 키보드 입력을 논블로킹 방식으로 받을 수 있는 방법을 발견하면 추가해 두겠습니다.

 

 


 

클라이언트는 코드가 간단합니다. 먼저 서버로 연결되는 SocketChannel을 생성해줍니다. 또 서버에서 받은 데이터를 모니터로 출력해줄 채널과 키보드에서 입력받을 채널 두 개를 생성해줍니다. print()나 println() 같은 기본 메소드로 버퍼의 내용을 바로 출력하려면 버퍼의 내용을 바이트 배열로 옮겨주는 과정이 한번 더 들어가기 때문에 그냥 채널을 하나 더 만들었습니다.

 

채널을 다 생성하고 버퍼를 만들고 나면 키보드 입력을 담당할 스레드를 만들어서 실행시켜줍니다. 마지막으로 서버에서 보내준 내용을 버퍼에 담아 모니터에 출력하는 로직을 추가합니다.

 

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;

public class Client {

	public static void main(String[] args) {

		Thread systemIn;
		// 서버 IP와 포트로 연결되는 소켓채널 생성
		try (SocketChannel socket = SocketChannel.open
				(new InetSocketAddress("172.30.1.29", 15000))) {

			// 모니터 출력에 출력할 채널 생성
			WritableByteChannel out = Channels.newChannel(System.out);

			// 버퍼 생성
			ByteBuffer buf = ByteBuffer.allocate(1024);

			// 출력을 담당할 스레드 생성 및 실행
			systemIn = new Thread(new SystemIn(socket));
			systemIn.start();

			while (true) {

				socket.read(buf); // 읽어서 버퍼에 넣고
				buf.flip();
				out.write(buf); // 모니터에 출력
				buf.clear();
			}

		} catch (IOException e) {

			System.out.println("서버와 연결이 종료되었습니다.");
		}
	}
}

 


 

키보드 입력을 담당할 클래스입니다. 여기서는 키보드(stdin)에서 입력이 들어올 때까지 블로킹돼서 기다리다가 입력이 되면 버퍼에 넣고 서버로 보내주는 역할을 담당합니다. 생성자로 서버와 연결된 채널을 공유해줍니다. 그리고 키보드로 입력받을 채널과 버퍼를 생성한 후 입력이 들어오면 서버로 보내주는 것을 반복시키면 됩니다.

 

package hs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;

public class Client {

	public static void main(String[] args) {

		Thread systemIn;
		// 서버 IP와 포트로 연결되는 소켓채널 생성
		try (SocketChannel socket = SocketChannel.open
				(new InetSocketAddress("172.30.1.29", 15000))) {

			// 모니터 출력에 출력할 채널 생성
			WritableByteChannel out = Channels.newChannel(System.out);

			// 버퍼 생성
			ByteBuffer buf = ByteBuffer.allocate(1024);

			// 출력을 담당할 스레드 생성 및 실행
			systemIn = new Thread(new SystemIn(socket));
			systemIn.start();

			while (true) {

				socket.read(buf); // 읽어서 버퍼에 넣고
				buf.flip();
				out.write(buf); // 모니터에 출력
				buf.clear();
			}

		} catch (IOException e) {

			System.out.println("서버와 연결이 종료되었습니다.");
		}
	}
}

// 입력을 담당하는 클래스
class SystemIn implements Runnable {

	SocketChannel socket;
	

	// 연결된 소켓 채널과 모니터 출력용 채널을 생성자로 받음
	SystemIn(SocketChannel socket) {
		this.socket = socket;
	}

	@Override
	public void run() {

		// 키보드 입력받을 채널과 저장할 버퍼 생성
		ReadableByteChannel in = Channels.newChannel(System.in);
		ByteBuffer buf = ByteBuffer.allocate(1024);

		try {
			while (true) {
				in.read(buf); // 읽어올때까지 블로킹되어 대기상태
				buf.flip();
				socket.write(buf); // 입력한 내용을 서버로 출력
				buf.clear();
			}

		} catch (IOException e) {
			System.out.println("채팅 불가.");
		}
	}
}

 

728x90

댓글

💲 추천 글