|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
要多google,因为我不可能,也不可以给你解答所有内容,我只能告诉你一些关键点,甚至我会故意隐瞒答案,因为在寻找答案的过程中。
1、基础道理
偶然候我们必要完成一个大众的模块,必要对多个其他的模块供应服务,最经常使用的体例就是完成一个SocketServer,承受客户的哀求,并前往给客户了局。
这常常触及到假如办理多个毗连及怎样多线程的供应服务的成绩,经常使用的体例就是毗连池和线程池,基础流程以下:
起首服务器端有一个监听线程,不休监听来自客户真个毗连。
当一个客户端毗连到监听线程后,便创建了一个新的毗连。
监听线程将新创建的毗连放进毗连池举行办理,然后持续监听新来的毗连。
线程池中有多个服务线程,每一个线程都监听一个义务行列,一个创建的毗连对应一个服务义务,当服务线程发明有新的义务的时分,便用此毗连向客户端供应服务。
一个SocketServer所可以供应的毗连数可设置,假如凌驾设置的个数则回绝新的毗连。
当服务线程完成服务的时分,客户端封闭毗连,服务线程封闭毗连,余暇并守候处置新的义务。
毗连池的监控线程扫除个中封闭的毗连对象,从而能够创建新的毗连。
2、对Socket的封装
Socket的挪用次要包括以下的步骤:
挪用对照庞大,我们起首辨别两类Socket,一类是ListeningSocket,一类是ConnectedSocket.
ListeningSocket由MySocketServer卖力,一旦accept,则天生一个ConnectedSocket,又MySocket卖力。
MySocket次要完成的办法以下:
intMySocket::write(constchar*buf,intlength)
{
intret=0;
intleft=length;
intindex=0;
while(left>0)
{
ret=send(m_socket,buf+index,left,0);
if(ret==0)
break;
elseif(ret==-1)
{
break;
}
left-=ret;
index+=ret;
}
if(left>0)
return-1;
return0;
}
intMySocket::read(char*buf,intlength)
{
intret=0;
intleft=length;
intindex=0;
while(left>0)
{
ret=recv(m_socket,buf+index,left,0);
if(ret==0)
break;
elseif(ret==-1)
return-1;
left-=ret;
index+=ret;
}
returnindex;
}
intMySocket::status()
{
intstatus;
intret;
fd_setcheckset;
structtimevaltimeout;
FD_ZERO(&checkset);
FD_SET(m_socket,&checkset);
timeout.tv_sec=10;
timeout.tv_usec=0;
status=select((int)m_socket+1,&checkset,0,0,&timeout);
if(status<0)
ret=-1;
elseif(status==0)
ret=0;
else
ret=0;
returnret;
}
intMySocket::close()
{
structlingerlin;
lin.l_onoff=1;
lin.l_linger=0;
setsockopt(m_socket,SOL_SOCKET,SO_LINGER,(constchar*)&lin,sizeof(lin));
::close(m_socket);
return0;
}
MySocketServer的次要办法完成以下:
intMySocketServer::init(intport)
{
if((m_socket=socket(AF_INET,SOCK_STREAM,0))==-1)
{
return-1;
}
structsockaddr_inserverAddr;
memset(&serverAddr,0,sizeof(structsockaddr_in));
serverAddr.sin_addr.s_addr=htonl(INADDR_ANY);
serverAddr.sin_family=AF_INET;
serverAddr.sin_port=htons(port);
if(bind(m_socket,(structsockaddr*)&serverAddr,sizeof(serverAddr))==-1)
{
::close(m_socket);
return-1;
}
if(listen(m_socket,SOMAXCONN)==-1)
{
::close(m_socket);
return-1;
}
structlingerlin;
lin.l_onoff=1;
lin.l_linger=0;
setsockopt(m_socket,SOL_SOCKET,SO_LINGER,(constchar*)&lin,sizeof(lin));
m_port=port;
m_inited=true;
return0;
}
MySocket*MySocketServer::accept()
{
intsock;
structsockaddr_inclientAddr;
socklen_tclientAddrSize=sizeof(clientAddr);
if((sock=::accept(m_socket,(structsockaddr*)&clientAddr,&clientAddrSize))==-1)
{
returnNULL;
}
MySocket*socket=newMySocket(sock);
returnsocket;
}
MySocket*MySocketServer::accept(inttimeout)
{
structtimevaltimeout;
timeout.tv_sec=timeout;
timeout.tv_usec=0;
fd_setcheckset;
FD_ZERO(&checkset);
FD_SET(m_socket,&checkset);
intstatus=(int)select((int)(m_socket+1),&checkset,NULL,NULL,&timeout);
if(status<0)
returnNULL;
elseif(status==0)
returnNULL;
if(FD_ISSET(m_socket,&checkset))
{
returnaccept();
}
}
3、线程池的完成
一个线程池一样平常有一个义务行列,启动的各个线程从义务行列中合作义务,失掉的线程则举行处置:list<MyTask*>m_taskQueue;
义务行列由锁回护,使得线程平安:pthread_mutex_tm_queueMutex
义务行列必要前提变量来撑持临盆者消耗者形式:pthread_cond_tm_cond
假如义务列表为空,则线程守候,守候中的线程个数为:m_numWaitThreads
必要一个列表来保护线程池中的线程:vector<MyThread*>m_threads
每一个线程必要一个线程运转函数:
void*__thread_new_proc(void*p)
{
((MyThread*)p)->run();
return0;
}
每一个线程由MyThread类卖力,次要函数以下:
intMyThread::start()
{
pthread_attr_tattr;
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr,SCHED_FIFO);
intret=pthread_create(&m_thread,&attr,thread_func,args);
pthread_attr_destroy(&attr);
if(ret!=0)
return–1;
}
intMyThread::stop()
{
intret=pthread_kill(m_thread,SIGINT);
if(ret!=0)
return–1;
}
intMyThread::join()
{
intret=pthread_join(m_thread,NULL);
if(ret!=0)
return–1;
}
voidMyThread::run()
{
while(false==m_bStop)
{
MyTask*pTask=m_threadPool->getNextTask();
if(NULL!=pTask)
{
pTask->process();
}
}
}
线程池由MyThreadPool卖力,次要函数以下:
intMyThreadPool::init()
{
pthread_condattr_tcond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr,PTHREAD_PROCESS_SHARED);
intret=pthread_cond_init(&m_cond,&cond_attr);
pthread_condattr_destroy(&cond_attr);
if(ret_val!=0)
return–1;
pthread_mutexattr_tattr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);
ret=pthread_mutex_init(&m_queueMutex,&attr);
pthread_mutexattr_destroy(&attr);
if(ret_val!=0)
return–1;
for(inti=0;i<m_poolSize;++i)
{
MyThread*thread=newMyThread(i+1,this);
m_threads.push_back(thread);
}
return0;
}
intMyThreadPool::start()
{
intret;
for(inti=0;i<m_poolSize;++i)
{
ret=m_threads->start();
if(ret!=0)
break;
}
ret=pthread_cond_broadcast(&m_cond);
if(ret!=0)
return–1;
return0;
}
voidMyThreadPool::addTask(MyTask*ptask)
{
if(NULL==ptask)
return;
pthread_mutex_lock(&m_queueMutex);
m_taskQueue.push_back(ptask);
if(m_waitingThreadCount>0)
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_queueMutex);
}
MyTask*MyThreadPool::getNextTask()
{
MyTask*pTask=NULL;
pthread_mutex_lock(&m_queueMutex);
while(m_taskQueue.begin()==m_taskQueue.end())
{
++m_waitingThreadCount;
pthread_cond_wait(&n_cond,&m_queueMutex);
--m_waitingThreadCount;
}
pTask=m_taskQueue.front();
m_taskQueue.pop_front();
pthread_mutex_unlock(&m_queueMutex);
returnpTask;
}
个中每个义务的实行由MyTask卖力,其次要办法以下:
voidMyTask::process()
{
//用read从客户端读取指令
//对指令举行处置
//用write向客户端写进了局
}
4、毗连池的完成
每一个毗连池保留一个链表保留已创建的毗连:list<MyConnection*>*m_connections
固然这个链表也必要锁来举行多线程回护:pthread_mutex_tm_connectionMutex;
此处一个MyConnection也是一个MyTask,由一个线程来卖力。
使用gcc或g++进行编译,使用gdb进行调试; |
|