네트워크, 서버

Completion Port 모델

춤추는수달 2022. 4. 30. 09:55

IOCP

Completion Port 모델은 Completion Port를 활용해 비동기 함수의 완료 통지를 받는 네트워크 모델이다.  Overlapped 모델보다 멀티 쓰레드를 활용하기 좋다. 

Completion Port(이하 CP)란 무엇인가?

CP란 비동기 입출력 작업의 완료를 통지해주는 녀석이다. 동작 방식만 보면 마치 Overlapped 모델의 APC 큐와 비슷한데, 큰 차이점은 하나의 쓰레드에서 발생한 콜백만 처리할 수 있는 APC 큐와는 달리, 모든 스레드에서 공용으로 사용할 수 있다는 점이다. 이 CP를 활용하는 방법은 대략 아래와 같다.

  1. CP를 생성함.
  2. 비동기 입출력 함수를 호출함.
  3. 새로운 쓰레드를 생성하여 CP가 완료 통지를 보내주길 기다리도록 함.
  4. CP는 비동기 입출력 함수가 완료되면 대기중인 쓰레드에게 완료되었음을 알려줌.

이렇게 멀티 쓰레드로 활용하기 좋기 때문에 보통 Overlapped 모델보다 성능이 뛰어나다.


함수들

CreateIoCompletionPort : CP 생성 및 소켓을 등록하는 함수. CP를 생성하려면 매개변수로 모두 0을 전달해주면 된다. 소켓 등록 시 세 번쨰 인자로 전달해준 포인터는 완료 통지를 받을 때 그대로 다시 전달받을 수 있으므로 필요한 정보를 담아서 넘겨주면 된다. 여기선 세션 및 소켓의 정보를 넘겨 어떤 소켓의 비동기 함수가 완료되었는지 구분할 수 있도록 하겠다.

//CP 생성
HANDLE iocpHandle = ::CreateCompletionPort(INVALID_ID_HANDLE_VALUE, NULL,0,0);
//CP에 소켓 등록
//등록할 소켓, iocp 핸들, 소켓 구분할 키, iocp가 활용할 최대 스레드 수(0이면 자동으로 최대 코어 수)
CreateCompletionPort(clientSocket, iocpHandle, (ULONG_PTR)session, 0);

WSARecv, WSASend : 비동기 입출력 함수. Overlapped 모델에서 사용한 그것과 같다. 6번째 인자인 Overlapped 구조체는 딱히 유용한 정보를 담고있지 않다. 하지만 위의  CreateIoCompletionPort 함수의 3번째 인자와 마찬가지로 비동기 작업 완료 시 그대로 받아볼 수 있다. 때문에 여기서는 Overlapped 구조체와 비동기 작업의 타입을 포함하는 새로운 구조체(OverlappedEx)를 선언하여 이 구조체에 포함된 Overlapoped 구조체의 포인터를 전달해주었다. 이렇게 하면 비동기 작업 완료 시 어떤 종류의 비동기 작업이 완료되었는지 확인할 수 있다.

WSABUF wsaBuf;
wsaBuf.buf = session->recvBuffer;
wsaBuf.len = BUFSIZE;

OverlappedEx* overlappedEx = new OverlappedEx();
overlappedEx->type = IO_TYPE::READ;
        
DWORD recvLen = 0;
DWORD flags = 0;
::WSARecv(session->socket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);

GetQueuedCompletionStatus : 결과 처리 함수. 호출하면 완료된 I/O가 존재하고, CPU를 할당받을 때까지 대기한다. 즉 대기가 풀리면 WSARecv 등의 비동기 입출력 함수로 요청한 I/O가 완료되었다는 의미이다. 그럼 넘겨준 인자를 통해 결과 데이터를 받아볼 수 있고, 그것을 가지고 원하는 작업을 하면 된다. 세 번째 인자로 받는 포인터에는 CreateIoCompletionPort 함수에 세 번째 인자로 전해주었던 그것이 채워지고, 4번째 인자로 받는 포인터는 비동기 입출력 함수에 6번째인자로 전해주었던 그것이 채워진다. 이를 활용해 어떤 소켓의 어떤 비동기 작업이 완료되었는지 파악할 수 있다. 

DWORD bytesTransferred = 0;
Session* session = nullptr;
OverlappedEx* overlappedEx = nullptr;
//iocp 핸들, 전송된 데이터 바이트수, 소켓 구분 키, overlapped 객체, 대기시간
BOOL ret = ::GetQueuedCompletionStatus(iocpHandle, &bytesTransferred,
    (ULONG_PTR*)&session, (LPOVERLAPPED*)&overlappedEx, INFINITE);

if (ret == FALSE || bytesTransferred == 0)
{
    // TODO : 연결 끊김
    continue;
}

ASSERT_CRASH(overlappedEx->type == IO_TYPE::READ);

cout << "Recv Data IOCP = " << bytesTransferred << endl;

흐름

  1. 소켓을 생성함
  2. CP를 생성함
  3. 소켓을 CP에 등록함.
  4. 완료 통지를 받을 쓰레드를 생성함.
  5. 생성된 쓰레드에서 GetQueuedCompletionStatus 함수를 호출해 완료 통지가 올 때까지 대기함.
  6. 비동기 입출력 함수를 호출함.
  7. 비동기 입출력 작업이 완료되면 대기하던 쓰레드가 완료 통지를 받음.
  8. 쓰레드에서 입출력된 내용을 활용해 작업함.

 


코드

const int32 BUFSIZE = 1000;

struct Session
{
	SOCKET socket = INVALID_SOCKET;
	char recvBuffer[BUFSIZE] = {};
	int32 recvBytes = 0;	
};

enum IO_TYPE
{
	READ,
	WRITE,
	ACCEPT,
	CONNECT,
};

struct OverlappedEx
{
	WSAOVERLAPPED overlapped = {};
	int32 type = 0; // read, write, accept, connect ...
};

void WorkerThreadMain(HANDLE iocpHandle)
{
	while (true)
	{
		DWORD bytesTransferred = 0;
		Session* session = nullptr;
		OverlappedEx* overlappedEx = nullptr;

		BOOL ret = ::GetQueuedCompletionStatus(iocpHandle, &bytesTransferred,
			(ULONG_PTR*)&session, (LPOVERLAPPED*)&overlappedEx, INFINITE);

		if (ret == FALSE || bytesTransferred == 0)
		{
			// TODO : 연결 끊김
			continue;
		}

		ASSERT_CRASH(overlappedEx->type == IO_TYPE::READ);

		cout << "Recv Data IOCP = " << bytesTransferred << endl;

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		DWORD recvLen = 0;
		DWORD flags = 0;
		::WSARecv(session->socket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);
	}
}

int main()
{
	// Overlapped 모델 (Completion Routine 콜백 기반)
	// - 비동기 입출력 함수 완료되면, 쓰레드마다 있는 APC 큐에 일감이 쌓임
	// - Alertable Wait 상태로 들어가서 APC 큐 비우기 (콜백 함수)
	// 단점) APC큐 쓰레드마다 있다! Alertable Wait 자체도 조금 부담!
	// 단점) 이벤트 방식 소켓:이벤트 1:1 대응

	// IOCP (Completion Port) 모델
	// - APC -> Completion Port (쓰레드마다 있는건 아니고 1개. 중앙에서 관리하는 APC 큐?)
	// - Alertable Wait -> CP 결과 처리를 GetQueuedCompletionStatus
	// 쓰레드랑 궁합이 굉장히 좋다!

	// CreateIoCompletionPort
	// GetQueuedCompletionStatus

	vector<Session*> sessionManager;

	// CP 생성
	HANDLE iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

	// WorkerThreads
	for (int32 i = 0; i < 5; i++)
		GThreadManager->Launch([=]() { WorkerThreadMain(iocpHandle); });

	// Main Thread = Accept 담당
	while (true)
	{
		SOCKADDR_IN clientAddr;
		int32 addrLen = sizeof(clientAddr);

		SOCKET clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
		if (clientSocket == INVALID_SOCKET)
			return 0;

		Session* session = xnew<Session>();
		session->socket = clientSocket;
		sessionManager.push_back(session);

		cout << "Client Connected !" << endl;

		// 소켓을 CP에 등록
		::CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle, /*Key*/(ULONG_PTR)session, 0);

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		OverlappedEx* overlappedEx = new OverlappedEx();
		overlappedEx->type = IO_TYPE::READ;

		// ADD_REF
		DWORD recvLen = 0;
		DWORD flags = 0;
		::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);

		// 유저가 게임 접속 종료!
		//Session* s = sessionManager.back();
		//sessionManager.pop_back();
		//xdelete(s);
		
		//::closesocket(session.socket);
		//::WSACloseEvent(wsaEvent);
	}

	GThreadManager->Join();
	
	// 윈속 종료
	::WSACleanup();
}

https://www.inflearn.com/course/%EC%96%B8%EB%A6%AC%EC%96%BC-3d-mmorpg-4/lecture/79369?tab=curriculum&volume=0.90&speed=1.5 

 

[C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버 - 인프런 | 학습 페이지

지식을 나누면 반드시 나에게 돌아옵니다. 인프런을 통해 나의 지식에 가치를 부여하세요....

www.inflearn.com

이 글은 위 강의를 참고하여 작성함.

'네트워크, 서버' 카테고리의 다른 글

JobQueue  (0) 2022.07.16
Google Protocol Buffer  (0) 2022.07.07
Overlapped 모델(콜백함수)  (0) 2022.04.30
WSAEventSelect model  (0) 2022.04.29
Select model  (0) 2022.04.28