你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

Netty——任务队列

2021/12/18 8:39:32

一、概述

任务队列中的 Task 有3种典型的使用场景:

  • 1)用户程序自定义的普通任务
  • 2)用户自定义的定时任务
  • 3)非当前 Reactor 线程调用 Channel 的各种方法

例如在推送系统的业务线程里,根据用户的标识,找到对应的Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景,最终 Write 会提交到任务队列种被异步消费。

二、示例

1、自定义普通任务

在NettyServerHandler的channelRead方法里修改代码,编写一个自定义普通任务提交到channel的TaskQueue中。

/*customize task*/
ctx.channel()
    .eventLoop()
    .execute(
    new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException exception) {
                System.out.println("Exception: " + exception.getMessage());
            }
            System.out.println("server ctx = " + ctx);
            /*transform the msg into byte buf*/
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("client sent: " + buf.toString(CharsetUtil.UTF_8));
            System.out.println("Address of Client: " + ctx.channel().remoteAddress());
            ctx.writeAndFlush(Unpooled.copiedBuffer("Asynchronous execution reply",
										              CharsetUtil.UTF_8));
        }
    });

2、用户自定义定时任务

这种类型的任务将提交到scheduleTaskQueue,任然在ServerHandler里面添加

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException exception) {
            System.out.println("Exception: " + exception.getMessage());
        }
        ctx.writeAndFlush(Unpooled
                          .copiedBuffer("Asynchronous execution reply",
                                        CharsetUtil.UTF_8));
    }
},5, TimeUnit.SECONDS);