最容易使用的任务队列是调度器(scheduler)队列,因为该队列中的任务不会在中断模式运行,因此可以做更多事,特别是它们还能睡眠。内核中有多处使用该队列完成各种任务。
在内核2.4.0-test11,实际实现调度器队列的任务队列被内核的其余部分隐藏了。使用这个队列的代码必须调用schedule_task把任务放入队列,而不能直接使用queue_task:
int schedule_task(struct tq_struct *task);
其中的 task 当然就是要调度的任务。返回值直接来自queue_task:如果任务不在队列中就返回非零。
再提一次,从版本 2.4.0-test11 开始内核使用了一个特殊进程 keventd,它唯一的任务就是运行 scheduler 队列中的任务。keventd 为它运行的任务提供了可预期的进程上下文,而不象以前的实现,任务是在完全随机的进程上下文中运行的。
对于 keventd 的执行有几点是值得牢记的。首先,这个队列中的任务可以睡眠,一些内核代码就使用了这一优点。但是,好的代码应该只睡眠很短的时间,因为在 keventd 睡眠的时候,调度器队列中的其他任务就不会再运行了。还有一点需要牢记,你的任务是和其它任务共享调度器队列,这些任务也可以睡眠。正常情况下,调度器队列中的任务会很快运行(也许甚至在schedule_task返回之前)。但如果其它某个任务睡眠了,轮到你的任务执行时,中间流逝的时间会显得很久。所以那些有严格的执行时限的任务应该使用其它队列。
/proc/jiqsched 文件是使用调度器队列的示例文件,该文件对应的 read 函数以如下的方式将任务放进队列中:
int jiq_read_sched(char *buf, char **start, off_t offset,
int len, int *eof, void *data)
{
jiq_data.len = 0; /* nothing printed, yet */
jiq_data.buf = buf; /* print in this place */
jiq_data.jiffies = jiffies; /* initial time */
/* jiq_print will queue_task() again in jiq_data.queue */
jiq_data.queue = SCHEDULER_QUEUE;
schedule_task(&jiq_task); /* ready to run */
interruptible_sleep_on(&jiq_wait); /* sleep till completion */
*eof = 1;
return jiq_data.len;
}
读取 /proc/jiqsched 文件产生如下输出:
time delta interrupt pid cpu command
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
601687 0 0 2 1 keventd
上面的输出中,time域是任务运行时的jiffies值,delta是自任务最近一次运行以来jiffies的增量,interrupt是in_interrupt函数的输出,pid是运行进程的ID,cpu是正被使用的CPU的编号(在单处理器系统中始终为 0),command是当前进程正在运行的命令。
在这个例子中,我们看到,任务总是在 keventd 进程中运行,而且运行得非常快,一个不断把自己重复提交给调度器队列的任务可以在一次定时器滴答中运行数百甚至数千次。即使是在一个负载很重的系统,调度器队列的延迟也是非常小的。
定时器队列的使用方法和调度器队列不同,它(tq_timer)是可以直接操作的。还有,定时器队列是在中断模式下执行的。另外,该队列一定会在下一个时钟滴答被运行,这消除了可能因系统负载造成的延迟。
示例代码使用定时器队列实现了/proc/jiqtimer。使用这个队列要用到 queue_task 函数。
int jiq_read_timer(char *buf, char **start, off_t offset,
int len, int *eof, void *data)
{
jiq_data.len = 0; /* nothing printed, yet */
jiq_data.buf = buf; /* print in this place */
jiq_data.jiffies = jiffies; /* initial time */
jiq_data.queue = &tq_timer; /* reregister yourself here */
queue_task(&jiq_task, &tq_timer); /* ready to run */
interruptible_sleep_on(&jiq_wait); /* sleep till completion */
*eof = 1;
return jiq_data.len;
}
下面是在我的系统在编译一个新内核时运行命令head /proc/jiqtimer输出的结果:
time delta interrupt pid cpu command
45084845 1 1 8783 0 cc1
45084846 1 1 8783 0 cc1
45084847 1 1 8783 0 cc1
45084848 1 1 8783 0 cc1
45084849 1 1 8784 0 as
45084850 1 1 8758 1 cc1
45084851 1 1 8789 0 cpp
45084852 1 1 8758 1 cc1
45084853 1 1 8758 1 cc1
45084854 1 1 8758 1 cc1
45084855 1 1 8758 1 cc1
注意,这次在任务的每次执行之间正好都经过了一个定时器滴答,而且正在运行的可能是任意一个进程。
最后一个可由模块代码使用的预定义队列是立即队列。这个队列通过底半处理机制运行,所以要用它还需额外的步骤。底半处理程序只有在通知内核需要它运行时才会运行,这是通过“标记”底半部完成的。对于tq_immediate,必须调用mark_bh(IMMEDIATE_BH)。注意必须在任务插入队列后才能调用mark_bh,否则可能在任务还没加入队列时内核就开始运行队列了。
立即队列是系统处理得最快的队列――它反应最快并且在中断期间运行。立即队列既可以由调度器执行,也可以在一个进程从系统调用返回时被尽快地执行,这取决于哪个事件先发生。典型的输出大致如下:
time delta interrupt pid cpu command
45129449 0 1 8883 0 head
45129453 4 1 0 0 swapper
45129453 0 1 601 0 X
45129453 0 1 601 0 X
45129453 0 1 601 0 X
45129453 0 1 601 0 X
45129454 1 1 0 0 swapper
45129454 0 1 601 0 X
45129454 0 1 601 0 X
45129454 0 1 601 0 X
45129454 0 1 601 0 X
45129454 0 1 601 0 X
45129454 0 1 601 0 X
45129454 0 1 601 0 X
显然该队列不能用于延迟任务的执行――它是个“立即”队列。相反,它的目的是使任务尽快地得以执行,但是要在“安全的时间”内。这对中断处理非常有用,因为它提供了在实际的中断处理程序之外执行处理程序代码的一个入口点,例如接收网络包的机制就类似这样。
注意不要把任务重新注册到立即队列中(尽管/proc/jiqimmed为了演示而这么做),这种做法没什么好处,而且在某些版本/平台的搭配上运行时会锁死计算机。因为在有些实现中会不断重运行立即队列直到它空为止。这种情况出现过,例如在PC上运行2.0版本的时候。
运行自己的工作队列
声明新的任务队列并不困难。驱动程序可以随意地声明一个甚至多个新任务队列。这些队列的使用和我们前面讨论过的预定义队列差不多。
与预定义队列不同的是,内核不会自动处理定制的任务队列。定制的任务队列要由程序员自己维护,并安排运行方法。
下面的宏声明一个定制队列并扩展为变量声明。最好把它放在文件开头的地方,所有函数的外面:
DECLARE_TASK_QUEUE(tq_custom);
声明完队列,就可以调用下面的函数对任务进行排队。上面的宏和下面的调用相匹配:
queue_task(&custom_task, &tq_custom);
当要运行累积的任务队列时,执行下面一行,运行tq_custom队列:
run_task_queue(&tq_custom);
如果现在想测试定制的任务队列,则需要在某个预定义的队列中注册一个函数来触发这个队列。尽管看起来象绕了弯路,但其实并非如此。当需要累积任务以便同时得到执行时,定制的任务队列是非常有用的,尽管需要用另一个队列来决定这个“同时”。
就在 2.4 内核发布之前,开发者们增加了一种用于内核任务延迟的新机制。这种新机制称为tasklet,现在是实现底半任务的推荐方法。实际上,现在的底半处理程序本身就是用 tasklet 实现的。
tasklet 在很多方面类似任务队列。它们都是把任务延迟到安全时间执行的一种方式,都在中断期间运行。象任务队列一样,即使被调度多次,tasklet 也只运行一次,不过tasklet 可以在 SMP 系统上和其它(不同的) tasklet 并行地运行。在SMP系统上,tasklet 还被确保在第一个调度它的 CPU 上运行,因为这样可以提供更好的高速缓存行为,从而提高性能。
每个 tasklet 都与一个函数相联系,当 tasklet 要运行的时候该函数被调用。该函数只有一个 unsigned long 类型的参数,这多少使一些内核开发者的生活变得轻松;但对那些宁愿传递一个指针的开发人员来说肯定是增加了苦恼。把long类型的参数转换为一个指针类型在所有已支持的平台上都是安全的操作,在内存管理中(第13章讨论)更是普遍使用。这个tasklet的函数的类型是void,无返回值。
tasklet 的实现部分在
DECLARE_TASKLET(name, function, data);
用指定的名字 name 声明一个 tasklet,在该 tasklet 执行时(后面要讲到),指定的函数 function 被调用,传递的参数值为 (unsigned long) data 。
DECLARE_TASKLET_DISABLED(name, function, data);
和上面一样声明一个 tasklet,不过初始状态是“禁止的”,意味着可以被调度但不会执行,直到被“使能”以后才能执行。
用2.4的头文件编译 jiq 示例驱动程序,可以实现 /proc/jiqtasklet,它和其他的 jiq 入口工作类似,只不过使用了tasklet。我们并没有在 sysdep.h 中为旧版本模拟实现 tasklet。该模块如下定义它的 tasklet:
void jiq_print_tasklet (unsigned long);
DECLARE_TASKLET (jiq_tasklet, jiq_print_tasklet, (unsigned long)
&jiq_data);
当驱动程序要调度一个 tasklet 运行的时候,它调用 tasklet_schedule:
tasklet_schedule(&jiq_tasklet);
一旦一个 tasklet 被调度,它就肯定会在一个安全时间运行一次(如果已经被使能)。tasklet 可以重新调度自己,其方式和任务队列一样。在多处理器系统上,一个 tasklet 无须担心自己会在多个处理器上同时运行,因为内核采取了措施确保任何 tasklet 都只能在一个地方运行。但是,如果驱动程序中实现了多个 tasklet,那么就可能会有多个 tasklet 在同时运行。在这种情况下,需要使用自旋锁来保护临界区代码(信号量是可以睡眠的,因为 tasklet 是在中断期间运行,所以不能用于tasklet)。
/proc/jiqtasklet的输出如下:
time delta interrupt pid cpu command
45472377 0 1 8904 0 head
45472378 1 1 0 0 swapper
45472379 1 1 0 0 swapper
45472380 1 1 0 0 swapper
45472383 3 1 0 0 swapper
45472383 0 1 601 0 X
45472383 0 1 601 0 X
45472383 0 1 601 0 X
45472383 0 1 601 0 X
45472389 6 1 0 0 swapper
注意这个tasklet总是在同一个CPU上运行,即使输出来自双CPU系统。
tasklet 子系统提供了一些其它的函数,用于高级的tasklet操作:
void tasklet_disable(struct tasklet_struct *t);
这个函数禁止指定的tasklet。该tasklet仍然可以用 tasklet_schedule 调度,但执行被推迟,直到重新被使能。
void tasklet_enable(struct tasklet_struct *t);
使能一个先前被禁止的tasklet。如果该tastlet已经被调度,它很快就会运行(但一从 tasklet_enable 返回就直接运行)。
void tasklet_kill(struct tasklet_struct *t);
该函数用于对付那些无休止地重新调度自己的 tasklet。tasklet_kill 把指定的 tasklet 从它所在的所有队列删除。为避免与正重新调度自己的tasklet产生竞态,该函数会等到tasklet执行,然后再把它移出队列。这样就可以确保 tasklet 不会在中途被打断。然而,如果目标 tasklet 当前既没有运行也没有重调度自己,tasklet_kill会挂起。tasklet_kill不能在中断期间被调用。
常常的,我们有‘housekeeping’的任务需要在某个时间做或者偶尔经常如此。如果任务由进程完成,我们可以将它放在 crontab 文件中。如果任务由内核模块完成,我们有两种可能。第一个是在 crontab 文件中放置一个在必要的时候通过系统调用唤醒模块的进程,例如通过打开文件。这是非常低效的,然而--我们运行一个不在 crontab 中的新进程, 读一个新的可执行的进程到内存,而所有这些只是唤醒在内存中的内核模块。
替代的,我们可以创建一个对每个定时器中断被调用一次的函数。我们的办法是创建一个包含在 tq_struct结构中的任务,而该结构包含该函数的指针。然后我们使用 queue_task 将那个任务放置在被称为tq_timer 的任务列表中,该列表是在下一个定时器中断将被执行的任务的列表。因为我们我们想该函数在下一次定时器中断时继续被执行,我们需要在它被调用后将它放回 tq_timer。
这还有一点我们需要记住的。当一个模块被 rmmod 移除时,它的引用计数器首先被检查,如果它为0,module_cleanup 将被调用。然后模块连同它的所有函数被从内存中清除。没有人去检查看在定时器任务列表中是否碰巧包含一个这样的不再可见的函数的指针。一段时间后(从计算机的观点看,而从人的观点看它什么也不是,它少于百分之一秒),内核有了一个定时器中断并试图去调用任务列表中的函数。不幸的,那个函数不在那儿。在大多情况下它刚才所在内存页没有被使用,而你会得到一个难看的错误消息。但是如果别的某些代码现在位于同一个内存位置,事情会变得 非常 难看。不幸的,我们没有一个简单的办法将一个任务从任务列表中注销。
既然 cleanup_module 不能返回错误代码(它是一个void函数),解决的办法是根本不让它返回。替代的,它调用sleep_on 或 module_sleep_on(他们实际上是相同的。 )使 rmmod 进程睡眠。在此之前,它通过设置一个全局变量通知在定时器中断将被调用的函数停止连接自己。然后,在下一次定时器中断, rmmod进程被唤醒,当我们的函数不再在那个队列中时移除那个模块就是安全的了。
范例 sched.c
/* sched.c - 安排一个函数在每次定时器中断时被调用 */
/* Copyright (C) 1998 by Ori Pomerantz */
/* 必要头文件 */
/* 标准头文件 */
#include /* 内核工作 */
#include /* 明确指定是模块 */
/* 处理 CONFIG_MODVERSIONS */
#if CONFIG_MODVERSIONS==1
#define MODVERSIONS
#include
#endif
/* 我们使用 proc 文件系统所必要的 */
#include
/* 我们在这儿安排任务 */
#include
/* 我们也需要睡眠和唤醒的能力 */
#include
/* 2.2.3 版/usr/include/linux/version.h 包含该宏
* 但2.0.35 版不包含 - 加入以备需要 */
#ifndef KERNEL_VERSION
#define KERNEL_VERSION(a,b,c) ((a)*65536+(b)*256+(c))
#endif
/* 定时器中断已经被调用的次数 */
static int TimerIntrpt = 0;
/* 这被清除模块使用,以防止当 intrpt_routine 仍在任务队列里时模块被清除 */
static struct wait_queue *WaitQ = NULL;
static void intrpt_routine(void *);
/* 这个任务的任务队列结构,来自 tqueue.h */
static struct tq_struct Task = {
NULL, /* 列表的下一项 - queue_task 将为我们做这个 */
0, /* 一个标志,意思是我们还没有被插入任务队列 */
intrpt_routine, /* 运行的函数 */
NULL /* 函数的void* 参数 */
};
/* 这个函数将在每次定时器中断时被调用。注意 void* 指针 -
* 任务函数可以用于多个目的,每次得到不同的参数。 */
static void intrpt_routine(void *irrelevant)
{
/* 增加计数器 */
TimerIntrpt++;
/* 如果清除模块想我们死亡 */
if (WaitQ != NULL)
wake_up(&WaitQ); /* 现在 cleanup_module 可以返回 */
else
/* 将我们放回任务队列 */
queue_task(&Task, &tq_timer);
}
/* 将数据放入proc 文件 */
int procfile_read(char *buffer,
char **buffer_location, off_t offset,
int buffer_length, int zero)
{
int len; /* 实际使用的字节数 */
/* 这是静态的因此当我们离开这个函数时它仍然在内存中 */
static char my_buffer[80];
static int count = 1;
/* 我们将所有的信息放在一个里面,因此当有人问我们是否有更多 信息时答案是否。 */
if (offset > 0)
return 0;
/* 填充缓冲区并得到它的长度 */
len = sprintf(my_buffer,
"Timer was called %d times so far\n",
TimerIntrpt);
count++;
/* 告诉调用我们的函数缓冲区在哪儿 */
*buffer_location = my_buffer;
/* 返回长度 */
return len;
}
struct proc_dir_entry Our_Proc_File =
{
0, /* 节点数 - 忽略,它将被 proc_register_dynamic 填充*/
5, /* 文件名长度 */
"sched", /* 文件名 */
S_IFREG | S_IRUGO,
/* 文件模式 - 这是一个可以被其拥有者,用户组和其他任何人读取的普通文件 */
1, /* 连接数 (文件被引用的目录)*/
0, 0, /* 文件的UID和GID - 我们将它赋予root */
80, /* 由ls报告的文件长度 */
NULL, /* 节点函数(连接,删除,等等) - 不支持 */
procfile_read,
/* 文件的读函数,当某人试图从中读什么时被调用 */
NULL
/* 可以在这儿设置一个填充文件节点的函数,以使我们可以修改权限,拥有关系等。 */
};
/* 初始化模块--登记 proc 文件 */
int init_module()
{
/* 将任务放置在 tq_timer 任务队列,因此在下次定时器中断时它将被执行 */
queue_task(&Task, &tq_timer);
/* proc_register_dynamic 成功则成功,否则失败 */
#if LINUX_VERSION_CODE > KERNEL_VERSION(2,2,0)
return proc_register(&proc_root, &Our_Proc_File);
#else
return proc_register_dynamic(&proc_root, &Our_Proc_File);
#endif
}
/* 清除 */
void cleanup_module()
{
/* 注销 /proc 文件 */
proc_unregister(&proc_root, Our_Proc_File.low_ino);
/* 睡眠,直到 intrpt_routine 上次被调用。这是必要的,因为否则我们将释放 intrpt_routine
* 和tq_timer仍然引用的任务占有的内存。注意不允许信号中断。
*
* 既然 WaitQ 现在不为 NULL,这自动的告诉中断程序它死亡的时间到了。 */
sleep_on(&WaitQ);