Libjingle中 thread的数据结构
1.MessageQueueManager:是一个单实例类
接口:Instance();//获取此类对象
Add(),Remove,clear.它通过一个vector管理着MessageQueue;
2.Message,表示消息
数据成员包括:MessageHandler *phandler,message_id,MessageData *pdata,ts_sensitive;
3.文件中包含一个全局的Message链表list,MessageList;
4.DelayedMessage:它会进入一个优先级队列,按照trigger time进行排序,如果有相同的trigger time,则按num_进行排序。
DelayedMessage :提供了operator < (const DelayedMessage&dmsg) const 便于进行排序。成员包括:cmsDelay_,msTrigger_,num_,Message msg_;
5.SocketFactory:为一个纯虚类
包括的接口:virtual Socket *CreateSocket(int type) = 0;//返回一个新的socket用来进行阻塞通信,type的类型可以为SOCK_DGRAM 和SOCK_STREAM。
virtualAsyncSocket* CreateAsyncSocket(int type) = 0;//返回一个新的socket用来进行非阻塞通信,type的类型可以为SOCK_DGRAM 和SOCK_STREAM
6.Socket:为一个纯虚类,包含的接口如下:
GetLocalAddress()//返回和socket绑定的地址,如果没有进行绑定,返回any-address;
GetRemoteAddress()//返回socket连接的远端地址,如果没有连接,返回any-address
Bind();Connect();Send();SendTo();0Recv(),RecvFrom();Accept();Listen();Close();GetError(),SetError();IsBlocking(),
包含的连接状态如下:ConnState{CS_CLOSED,CS_CONNECTING,CS_CONNECTED}
GetState()//获取连接状态
EstimateMTU()//估计MTU
通过一下数据结构可以设置socket的一些选项
enum Option{OPT_DONTFRAGMENT,
OPT_RCVBUF,//接收缓冲区大小
OPT_SNDBUF,//发送缓冲区大小
OPT_NODELAY,//是否启用Nagle算法
}
GetOption(),SetOption
7.AsyncSocket:public Socket
包含的独有的数据结构如下:
sigslot::signal1<AsyncSocket *>SignalReadEvent;//read to read
SignalWriteEvent//等待写
SignalConnectEvent;//已经连接
SignalCloseEvent;//已经关闭
8.AsyncSocketAdapter:publicAsyncSocket,publicsigslot::has_slos这个子类负责具体的操作。
9.SocketServer:public SocketFactory
//提供了在一系列的socket上等待活动的能力,thread类对socketserver 进行了漂亮的封装。
SocketServer也是一个socketfactory.包含的接口如下
SetMessageQueue()//当socket server 已经被安装到thread,这个函数会被调用,用来允许socket server用thread的message queue
Wait(int cms,bool process_io)//沉睡直到:1,cms毫秒已经过去
2,WakeUp被调用,当沉睡时,如果process_io为真时io可以进行
WakeUp();
10.MessageQueue
SocketServer *socketserver();
Set_socketserver(Socketserver *ss);
注意:MessageQueue的行为已经改变,当MQ已经停止,再post和send就会失败,然而,任何的pending sends和ready posts在get 返回false之前会被传送。为了保证这些消息的传输,当一个MessageHandler和一个MessageQueue可能各自独立的释放时我们要消除竞争条件。
Quit(),IsQuitting(),Restart().
Get(Message *pmsg,int cmsWait = kForever,boolprocess_io = true)//Get()将会处理IO直到一个消息是可用的(返回true),2,cmsWait秒已经过去(return false),3。Stop被调用。
Peek(Message *pmsg,int cmsWait = 0);
Post(MessageHandler *phandler,uint32 di =0,MessageData *pdata = NULL,bool time_sensitive = false);//把消息post到message queue中,这样thread就可以通过get来获取,获取之后来dispatch这个message.
PostDelayed(int cmsDelay,Messagehandler*phandler,uint32 id = 0,MessageData *pdata = NULL)
PostAt(uint32 tstamp,MessageHandler*phandler,uint32 id = 0,MessageData *pdata = NULL)
Clear(MessageHandler *phandler,uint32 id =MOID_ANY,MessageList *removed = NULL);
Dispatch(Message *pmsg);//Dispatch message 之后可以触发task的onMessage,然后在task的onMessage函数中进行RunTask()操作,在RunTasks()中调用了InternalRunTasks.在InternalRunTasks中的核心代码如下:
for (size_t i = 0; i < tasks_.size(); ++i) {
while (!tasks_[i]->Blocked()) {
tasks_[i]->Step();
did_run = true;
}
其中tasks是一个任务的集合,while是一个轮询,如果没有这个task没有block,则在step中进行任务。
ReceiveSends();
GetDelay();
Empty();
Size()
Sigslot::signal0<> SignalQueueDestroyed;//当这个信号被发送之后,这个queue的任何引用都不能再被使用。
11.ThreadManager:为一个全局的静态类,
Instance();CurrentThread();SetCurrentThread(Thread* thread);
Thread *WrapCurrentThread();
UnwrapCurrentThread();
12.线程优先级enum ThreadPriority{PRIORITY_IDLE= -1,PRIORITY_NORMAL =0,PRIORITY_ABOVE_NORMAL = 1,PRIORITY_HIGH= 2}
13,Runnable接口类
核心函数:Run(Thread * thread);
14.Thread:public MessageQueue
Thread *Current();
IsCurrent()
Bool SleepMs(int millis);
Name(),SetName;//必须在Start()之前调用
Priority(),
SetPriority(ThreadPriority priority);//必须在Start()之前调用
Start(Runnable *runnable= NULL);//开始线程的执行
Stop(),让线程停止和等待线程被join。不要在currentthread上调用stop,
Run();//默认情况下,Thread::Run()调用ProcessMessages(kForever),为了做其他工作,重载Run(),为了接收和发送消息,偶尔调用ProcessMessages.
Send(MessageHandler *phandler,uint32 id =0,MessageData *pdata = NULL)
Clear()
ReceiveSends();
ProcessMessages(int cms);//ProcessMessages 直到以下两种情况发生时就会进行io和发送消息,1。Cms 毫秒已经过去(return true)2.Stop()被调用(return false);
Join()//阻塞调用线程直到这个线程已经结束。
主要数据成员:std::list<_SendMessage> sendlist_;
15:在start函数中请求了线程调度策略的设置和线程优先级的设置,并且在此函数中进行线程的创建。
其中创建线程的函数如下:
int error_code = pthread_create(&thread_, &attr, PreRun, init);,创建成功之后会会回调PreRun函数。
在PreRun函数中调用Run函数,并且把当前线程指定为threadmanager的currentthread.
在Run中调用了ProcessMessages(kForever);
16.class SignalThread:publicsigslot::has_slots<>,protected MessageHandler;//worker threads的基类。
包含的接口:
SetName()//上下文为主线程,在start之前调用
SetPriority(ThreadPriority priority);//上下文为主线程,在start之前调用
Start();//上下文为主线程,开始worker thread
Release();//上下文为主线程,如果worker thread 已经完成,立马删除这个对象,否则,SignalWorkDone 会发送,安排这个对象在worker thread完成之后进行删除。
Destroy(bool wait)//上下文为主线程,如果worker thread没有正在运行,立即删除这个对象。否则,让worker thread 退出处理,安排这个对象被删除一旦这个worker thread 退出,SignalWorkDone 不会被发送。如果wait 为true,直到the thread被删除才会返回。
Sigslot::signal1<SignalThread *>SignalWorkDone;//上下问为主线程,当work完成时会发送。
Thread *worker();//返回线程
OnWorkStart();//上下文为主线程,子类应该在重载用于pre-work的建立
DoWork()//上下文为主线程,子类应该重写用于do work;
ContinueWork();OnWorkStop;OnWorkDone();
OnMessage()//上下文为任意线程
17.class TaskParent
包含的接口和数据成员:
GetParent(),GetRunner(),AllChildrenDone(),AnyChildError(),OnStopped(Task*task),AbortAllChildren(),parent(),
私有的接口和成员:
Initialize(),OnChildStopped(Task*child);AddChild(Task *child),
Parent_,runner_,child_error_,typedefstd::set<Task *> ChildSet;
Scoped_ptr<ChildSet > children_;
18.class Task:public TaskParent
Task(TaskParent*parent);unique_id();Start();Step();GetState();HasError(),Blocked(),IsDone(),ElapsedTime();
Abort//外部调用,用来停止task
TimedOut,timeout_time(),
Timeout_seconds(),set_timeout_seconds()
sigslot::signal0<> SignalTimeout
Wake()//在task内部调用当task可能不被阻塞时。
task的状态如下
enum{STATE_BLOCKED =-1,STATE_INIT,STATE_START,STATE_DONE,STATE_ERROR,STATE_RESPONSE,STATE_NEXT},子类如果需要更多的状态可以在这里添加
Error();//内部调用让task wake和发送一个错误的信号。
CurrentTime();
GetStateName;
Process(int state);
Stop();
ProcessStart();
ProcessResponse(),ResetTimeout();ClearTimeout;OnTimeout(),
私有接口和私有数据
void Done();
int state_;bool blocked_;bool done_;boolaborted_;bool busy_;bool error_; int64 start_time_;int64 timeout_time_;inttimeout_seconds_;bool timeout_suspended_;int32 unique_id_
19.TaskRunner:public TaskParent,publicsigslot::has_slots<>
WakeTasks(),StartTask(Task*task);RunTasks();PollTasks();
UpdateTaskTimeout(Task *task, int64 previous_task_timeout_time)
std::vector<Task *> tasks_;Task*next_timeout_task_;
其中还有不少的疑问,望了解的朋友不啬赐教,大家一起交流,共同进步。
