你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

libjingle thread 代码阅读

2022/5/16 5:03:08

Libjingle中 thread的数据结构

1MessageQueueManager:是一个单实例类

接口:Instance();//获取此类对象

Add(),Remove,clear.它通过一个vector管理着MessageQueue;

 

2.Message,表示消息

数据成员包括:MessageHandler *phandler,message_id,MessageData *pdata,ts_sensitive;

 

3.文件中包含一个全局的Message链表list,MessageList;

 

4.DelayedMessage:它会进入一个优先级队列,按照trigger time进行排序,如果有相同的trigger time,则按num_进行排序。

 

DelayedMessage :提供了operator < (const DelayedMessage&dmsg) const 便于进行排序。成员包括:cmsDelay_,msTrigger_,num_,Message msg_;

 

5.SocketFactory:为一个纯虚类

包括的接口:virtual Socket *CreateSocket(int type) = 0;//返回一个新的socket用来进行阻塞通信,type的类型可以为SOCK_DGRAM SOCK_STREAM

virtualAsyncSocket* CreateAsyncSocket(int type) = 0;//返回一个新的socket用来进行非阻塞通信,type的类型可以为SOCK_DGRAM SOCK_STREAM

 

6.Socket:为一个纯虚类,包含的接口如下:

GetLocalAddress()//返回和socket绑定的地址,如果没有进行绑定,返回any-address;

GetRemoteAddress()//返回socket连接的远端地址,如果没有连接,返回any-address

Bind();Connect();Send();SendTo();0Recv(),RecvFrom();Accept();Listen();Close();GetError(),SetError();IsBlocking(),

包含的连接状态如下:ConnState{CS_CLOSED,CS_CONNECTING,CS_CONNECTED}

GetState()//获取连接状态

EstimateMTU()//估计MTU

通过一下数据结构可以设置socket的一些选项

enum Option{OPT_DONTFRAGMENT,

OPT_RCVBUF,//接收缓冲区大小

OPT_SNDBUF,//发送缓冲区大小

OPT_NODELAY,//是否启用Nagle算法

}

GetOption(),SetOption

7.AsyncSocket:public Socket

包含的独有的数据结构如下:

sigslot::signal1<AsyncSocket *>SignalReadEvent;//read to read

SignalWriteEvent//等待写

SignalConnectEvent;//已经连接

SignalCloseEvent;//已经关闭

 

8.AsyncSocketAdapter:publicAsyncSocket,publicsigslot::has_slos这个子类负责具体的操作。

9.SocketServer:public SocketFactory

//提供了在一系列的socket上等待活动的能力,thread类对socketserver 进行了漂亮的封装。

SocketServer也是一个socketfactory.包含的接口如下

SetMessageQueue()//socket server 已经被安装到thread,这个函数会被调用,用来允许socket serverthreadmessage queue

Wait(int cms,bool process_io)//沉睡直到:1cms毫秒已经过去

2WakeUp被调用,当沉睡时,如果process_io为真时io可以进行

WakeUp();

 

10.MessageQueue

SocketServer *socketserver();

Set_socketserver(Socketserver *ss);

注意:MessageQueue的行为已经改变,当MQ已经停止,再postsend就会失败,然而,任何的pending sendsready postsget 返回false之前会被传送。为了保证这些消息的传输,当一个MessageHandler和一个MessageQueue可能各自独立的释放时我们要消除竞争条件。

Quit(),IsQuitting(),Restart().

Get(Message *pmsg,int cmsWait = kForever,boolprocess_io = true)//Get()将会处理IO直到一个消息是可用的(返回true,2,cmsWait秒已经过去(return false),3Stop被调用。

Peek(Message *pmsg,int cmsWait = 0);

Post(MessageHandler *phandler,uint32 di =0,MessageData *pdata = NULL,bool time_sensitive = false);//把消息postmessage queue中,这样thread就可以通过get来获取,获取之后来dispatch这个message.

PostDelayed(int cmsDelay,Messagehandler*phandler,uint32 id = 0,MessageData *pdata = NULL)

PostAt(uint32 tstamp,MessageHandler*phandler,uint32 id = 0,MessageData *pdata = NULL)

Clear(MessageHandler *phandler,uint32 id =MOID_ANY,MessageList *removed = NULL);

Dispatch(Message *pmsg);//Dispatch message 之后可以触发taskonMessage,然后在taskonMessage函数中进行RunTask()操作,在RunTasks()中调用了InternalRunTasks.InternalRunTasks中的核心代码如下:

for (size_t i = 0; i < tasks_.size(); ++i) {

      while (!tasks_[i]->Blocked()) {

        tasks_[i]->Step();

        did_run = true;

      }

其中tasks是一个任务的集合,while是一个轮询,如果没有这个task没有block,则在step中进行任务。

ReceiveSends();

GetDelay();

Empty();

Size()

Sigslot::signal0<> SignalQueueDestroyed;//当这个信号被发送之后,这个queue的任何引用都不能再被使用。

 

11ThreadManager:为一个全局的静态类,

Instance();CurrentThread();SetCurrentThread(Thread* thread);

Thread *WrapCurrentThread();

UnwrapCurrentThread();

 

12.线程优先级enum ThreadPriority{PRIORITY_IDLE= -1,PRIORITY_NORMAL =0,PRIORITY_ABOVE_NORMAL = 1PRIORITY_HIGH= 2}

13,Runnable接口类

核心函数:Run(Thread * thread);

14.Thread:public MessageQueue

Thread *Current();

IsCurrent()

Bool SleepMs(int millis);

Name(),SetName;//必须在Start()之前调用

Priority(),

SetPriority(ThreadPriority priority);//必须在Start()之前调用

Start(Runnable *runnable= NULL);//开始线程的执行

Stop(),让线程停止和等待线程被join。不要在currentthread上调用stop,

Run();//默认情况下,Thread::Run()调用ProcessMessages(kForever),为了做其他工作,重载Run(),为了接收和发送消息,偶尔调用ProcessMessages.

Send(MessageHandler *phandler,uint32 id =0,MessageData *pdata = NULL)

Clear()

ReceiveSends();

ProcessMessages(int cms);//ProcessMessages 直到以下两种情况发生时就会进行io和发送消息,1Cms 毫秒已经过去(return true2.Stop()被调用(return false;

Join()//阻塞调用线程直到这个线程已经结束。

主要数据成员:std::list<_SendMessage> sendlist_;

 

15:start函数中请求了线程调度策略的设置和线程优先级的设置,并且在此函数中进行线程的创建。

其中创建线程的函数如下:

int error_code = pthread_create(&thread_, &attr, PreRun, init);,创建成功之后会会回调PreRun函数。

PreRun函数中调用Run函数,并且把当前线程指定为threadmanagercurrentthread.

Run中调用了ProcessMessages(kForever);

 

16.class SignalThread:publicsigslot::has_slots<>,protected MessageHandler;//worker threads的基类。

包含的接口:

SetName()//上下文为主线程,在start之前调用

SetPriority(ThreadPriority priority);//上下文为主线程,在start之前调用

Start();//上下文为主线程,开始worker thread

Release();//上下文为主线程,如果worker thread 已经完成,立马删除这个对象,否则,SignalWorkDone 会发送,安排这个对象在worker thread完成之后进行删除。

Destroy(bool wait)//上下文为主线程,如果worker thread没有正在运行,立即删除这个对象。否则,让worker thread 退出处理,安排这个对象被删除一旦这个worker thread 退出,SignalWorkDone 不会被发送。如果wait true,直到the thread被删除才会返回。

Sigslot::signal1<SignalThread *>SignalWorkDone;//上下问为主线程,当work完成时会发送。

Thread *worker();//返回线程

OnWorkStart();//上下文为主线程,子类应该在重载用于pre-work的建立

DoWork()//上下文为主线程,子类应该重写用于do work;

ContinueWork();OnWorkStop;OnWorkDone();

OnMessage()//上下文为任意线程

 

17.class TaskParent

包含的接口和数据成员:

GetParent(),GetRunner(),AllChildrenDone(),AnyChildError(),OnStopped(Task*task),AbortAllChildren(),parent(),

私有的接口和成员:

Initialize(),OnChildStopped(Task*child);AddChild(Task *child),

Parent_,runner_,child_error_,typedefstd::set<Task *> ChildSet;

Scoped_ptr<ChildSet > children_;

 

18.class Task:public TaskParent

Task(TaskParent*parent);unique_id();Start();Step();GetState();HasError(),Blocked(),IsDone(),ElapsedTime();

Abort//外部调用,用来停止task

TimedOut,timeout_time(),

Timeout_seconds(),set_timeout_seconds()

sigslot::signal0<> SignalTimeout

Wake()//task内部调用当task可能不被阻塞时。

task的状态如下

enum{STATE_BLOCKED =-1,STATE_INIT,STATE_START,STATE_DONE,STATE_ERROR,STATE_RESPONSE,STATE_NEXT},子类如果需要更多的状态可以在这里添加

Error();//内部调用让task wake和发送一个错误的信号。

 

CurrentTime();

GetStateName;

Process(int state);

Stop();

ProcessStart();

ProcessResponse(),ResetTimeout();ClearTimeout;OnTimeout(),

私有接口和私有数据

void Done();

int state_;bool blocked_;bool done_;boolaborted_;bool busy_;bool error_; int64 start_time_;int64 timeout_time_;inttimeout_seconds_;bool timeout_suspended_;int32 unique_id_

 

19.TaskRunner:public TaskParent,publicsigslot::has_slots<>

WakeTasks(),StartTask(Task*task);RunTasks();PollTasks();

UpdateTaskTimeout(Task *task, int64 previous_task_timeout_time)

std::vector<Task *> tasks_;Task*next_timeout_task_;


其中还有不少的疑问,望了解的朋友不啬赐教,大家一起交流,共同进步。