你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

Hadoop yarn源码分析(八) AsyncDispatcher事件异步分发器 2021SC@SDUSC

2021/11/27 10:12:06

2021SC@SDUSC

Hadoop yarn源码分析(八) AsyncDispatcher事件异步调度器 2021SC@SDUSC

  • 一、AsyncDispatcher 概述
  • 二、AsyncDispatcher 属性
    • 2.1 AsyncDispatcher成员变量
    • 2.2 AsyncDispatcher构造器
  • 三、AsyncDispatcher 源码分析
    • 3.1 serviceInit()方法
    • 3.2 serviceStart()方法
    • 3.3 run()方法
    • 3.3 dispatch()方法
    • 3.5 serviceStop()方法

一、AsyncDispatcher 概述

作为yarn中的事件异步调度器,AsyncDispatcher是RM中基于阻塞队列的调度事件的组件,它在一个特定的单线程中分派事件,并将分派的事件交给AsyncDispatcher中已经注册的对应的EventHandler事件处理器来处理。可以用一个线程池来调度时间,从而完成对多个事件的处理。
处理过程如下:处理请求进入系统后,由AsyncDispatcher(异步调度器)传递给相应的EventHandler(事件处理器)。之后,该事件处理器可能将事件转发给另外一个事件处理器,也有可能交给一个含有限状态机的事件处理器,最终的处理结果以事件的形式传送给AsyncDispatcher。新的事件会被AsyncDispatcher处理后再次转发,直到处理完成。
AsyncDispatcher
在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间又通过事件相互关联。每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件。

二、AsyncDispatcher 属性

2.1 AsyncDispatcher成员变量

AsyncDispatcher中还有一些标志位,如下:
1.stopped:是否停止的标志位
2.drainEventsOnStop:在stop功能中开启/禁用流尽分发器事件的配置标志位。如果启动成功,则AsyncDispatcher停止前需要先处理完eventQueue中的事件,否则直接停止。
3.drained:stop功能中所有剩余分发器事件已经被处理或流尽的标志位
4.waitForDrained:drain标志位上的等待锁。
5.blockNewEvents:在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效。
6.exitOnDispatchException:确保调度程序崩溃,但不做系统退出system-exit的标志位。

//package org.apache.hadoop.yarn.event.AsyncDispatcher.java
  //待处理事务阻塞队列
  private final BlockingQueue<Event> eventQueue;
  //AsyncDispatcher标志位,是否停止
  private volatile boolean stopped = false;

  //控制详细信息队列事件打印的配置
  private int detailsInterval;
  private boolean printTrigger = false;

  //在stop功能中开启/禁用流尽分发器事件的配置标志位
  private volatile boolean drainEventsOnStop = false;

  //stop功能中所有剩余分发器事件已经被处理或流尽的标志位
  private volatile boolean drained = true;
  //drained的等待锁
  private final Object waitForDrained = new Object();

  // 在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效
  private volatile boolean blockNewEvents = false;
  //实例
  private final EventHandler<Event> handlerInstance = new GenericEventHandler();

  //事件调度线程
  private Thread eventHandlingThread;
  //事件类型枚举类Enum到事件处理器EventHandler实例的映射集合
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  
  //确保调度程序崩溃,但不做系统退出system-exit
  private boolean exitOnDispatchException = true;

2.2 AsyncDispatcher构造器

无参构造: AsyncDispatcher中的eventQueue,是一个阻塞队列,默认的实现为线程安全的链式阻塞队列LinkedBlockingQueue,这在其无参构造方法中有体现。
有参构造:初始化eventDispatchers集合为HashMap,专门存储枚举Enum类型事件至事件处理器EventHandler实例的映射关系。所有被调度器分发的事件,都必须按照eventDispatchers来注册一个事件处理器EventHandler,若未注册,则调度器不会分发事件。

  //无参构造
  public AsyncDispatcher() {
    //调用有参构造,传入LinkedBlockingQueue实例
    this(new LinkedBlockingQueue<Event>());
  }

  //有参构造
  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
    this.eventTypeMetricsMap = new HashMap<Class<? extends Enum>,
        EventTypeMetrics>();
  }

三、AsyncDispatcher 源码分析

线程启动

3.1 serviceInit()方法

取参数yarn.dispatcher.exit-on-error, 参数未配置默认为false

  @Override
  protected void serviceInit(Configuration conf) throws Exception{
    super.serviceInit(conf);
    this.detailsInterval = getConfig().getInt(YarnConfiguration.
                    YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
            YarnConfiguration.
                    DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
  }

3.2 serviceStart()方法

创建一个事件调度线程eventHandlingThread
,并启动线程。

  @Override
  protected void serviceStart() throws Exception {
    //启动
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName(dispatcherThreadName);
    //启动事件调度线程eventHandlingThread
    eventHandlingThread.start();
  }

3.3 run()方法

eventHandlingThread由createThread()方法来定义的,并在其中有一个run()方法。
run方法中的while循环,判断标志位stopped是否为false,即当AsyncDispatcher未停止且当前线程并未中断的时候,一直运行。
先将标志位drained赋值为eventQueue是否为空,若停止过程中阻止新的事件加入队列,将标志位blockNewsEvents设为true,若待处理的事件都已调度完,则将drained赋值为true,调用waitForDrained的notify()方法通知等待者。
当遇到正常时事件event时,用take()方法取走blockingQueue中首位对象,若blockingQueue为空,则阻塞,并等待至有新的数据加入,并调用dispatch()方法分发事件。

Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        //标志位stopped为false,且当前线程未中断的话,一直运行
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          //判断事件调度队列是否为空,并将值赋给标志位drained
          drained = eventQueue.isEmpty();
          //如果停止过程中阻止新的事件加入待处理队列,即标志位blockNewEvents为true
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            //从eventQueue中取出一个事件
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            if (eventTypeMetricsMap.
                get(event.getType().getDeclaringClass()) != null) {
              long startTime = clock.getTime();
              dispatch(event);
              eventTypeMetricsMap.get(event.getType().getDeclaringClass())
                  .increment(event.getType(),
                      clock.getTime() - startTime);
            } else {
              //调用dispatch()方法进行分发调度
              dispatch(event);
            }
            if (printTrigger) {
              //记录可能会导致队列中的事件过多的最新的调度事件类型
              LOG.info("Latest dispatch event type: " + event.getType());
              printTrigger = false;
            }
          }
        }
      }
    };
  }

3.3 dispatch()方法

@SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //所有事件都通过此循环
    LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
        event);
    //根据event确定事件类型枚举类
    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      //根据事件类型枚举类type,从eventDispatchers中获取事件处理器EventHandler实例handler
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //将队列的状态记入日志中
      LOG.error(FATAL, "Error in dispatcher thread", t);
      // 如果调用serviceStop,退出该线程.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        stopped = true;
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }

3.5 serviceStop()方法

先对标志位drainEventsOnStop进行判断,若为true,则AsyncDispatcher需要在挺值钱处理完待调度处理队列eventQueue中事件,需要先将标志位blockNewEvents设置为true,阻止新的事件加入,记录info的日志信息。waitForDrained上通过synchronized进行同步:若队列中的事件还没处理完,同时eventHandlingThread线程仍然存活,waitForDrained释放锁,调用wait方法,等待1s,并记录info的日志信息。将标志位stopped设为true,标志着AsyncDispatcher服务停止。
若eventHandlingThread线程不为null,则中断eventHandlingThread线程,等待eventHandlingThread的结束,调用父类中的serviceStop()方法。

@Override
  protected void serviceStop() throws Exception {
    if (drainEventsOnStop) {
      blockNewEvents = true;
      LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
      long endTime = System.currentTimeMillis() + getConfig()
          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);

      synchronized (waitForDrained) {
        while (!isDrained() && eventHandlingThread != null
            && eventHandlingThread.isAlive()
            && System.currentTimeMillis() < endTime) {
          waitForDrained.wait(100);
          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
              eventHandlingThread.getState());
        }
      }
    }
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    //停止
    super.serviceStop();
  }