线程的调度
- 所有线程分享CPU
- 线程的调度——分时复用
- 操作系统把CPU时间划分为多个均等的时间片。例如:每5ms一个时间片,在每个时间片内运行一个线程。不同时间片运行不同的线程,让线程在同一段时间段内进行交替运行,即并发
- 在系统调度过程中,当线程的时间片用完被切换后,过一段时间会重新接着运行
主线程不能轻易退出,一旦退出,表示整个程序结束
- 线程切换
- 存储当前线程的状态,并把当前的线程切到后台,进入队列等待
- 从队列中取得一个正在排队的线程,运行一个时间片后,再次切换,这个队列称为候选队列,表示这里面的线程都希望自己被立刻执行,即:这些线程的各种资源齐全,只差CPU
- 每个线程都要自觉地让出CPU,让别的线程也有机会被运行。
- 程序中使用Sleep()函数,可以主动让自己的线程提前让出CPU
- Sleep时间到的时候,该线程并不是被立刻执行,而是进入了候选队列
- 不同操作系统从候选队列中挑选下一个线程的方法不同,一般是相同优先级根据到达队列的先后顺序进行排队,优先级高的能够排在更前面
创建一个线程
- 线程的创建是由操作系统完成的
- 基于OSAPI实现
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
//定义一个类
class Buddhist :public OS_Thread {
private:
virtual int Routine() {
//线程体,执行它的任务
for (int i = 0; i < 10; ++i) {
cout << "ma mi ma mi hong\n";
OS_Thread::Sleep(1);
}
return 0;
}
};
class Confucian :public OS_Thread {
private:
virtual int Routine() {
//线程体,执行它的任务
for (int i = 0; i < 100; ++i) {
cout << "人之初,性本善\n";
OS_Thread::Sleep(1);
}
return 0;
}
};
int main()
{
//运行线程
Buddhist buddhist;
buddhist.Run();
Confucian confucian;
confucian.Run();
printf("------主线程开始-------");
for (int i = 0; i < 10; ++i) {
cout << "我是主线程\n";
OS_Thread::Sleep(1);
}
getchar();//防止主线程退出,从而导致其他线程中止
return 0;
}
线程的停止与回收
- 线程的停止
- 正常停止
(1) 正常中止的办法:
a. 设置标志法
b. 在线程主函数Routine()中,检查标识量,当标识量为true时,应该退出
c. 线程在处理任务的时候,会不停的检测标识量,及时退出线程。退出的时候,保存当前任务的进度,以便下次继续。或者保存所有其它需要保存的数据
d. 获取线程正常退出的时间,线程退出也需要时间。使用静态函数Join()来实现,即:OS_Thread::Join(&线程对象)函数
(2) Join函数的作用:
a. 等待目标线程的退出
b. 回收这个线程的相关系统资源
(3) Join的调用位置:
不能只看字面上的位置,而是要从运行时的角度来看待问题,即:类中只有Routine()函数中的那部分才是线程
注:不能回收自己,即:在自己的Routine()中调用自己,Join(this) - 异常停止
主线程(主程序)退出的时候,有线程正在运行,但是所有的线程都会被立刻终止,这种终止是不正常的,因为它可能正在处理某个任务,从而造成了不完整数据
- 代码示例
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
class MyTask :public OS_Thread {
public:
void Start() {
m_quitflag = false;
Run();
}
void Stop() {
m_quitflag = true;
Join(this);//没有在Routine()中,没有让线程自己调用自己
}
private:
virtual int Routine() {
for (int i = 0; !this->m_quitflag&&i < 10; ++i) {
cout << "c++最榜\n";
OS_Thread::Sleep(1);
}
cout << "线程退出\n";//正常退出
//保存数据
return 0;
}
private:
bool m_quitflag;
};
int main()
{
MyTask task;
task.Start();
getchar();//不让主程序退出
task.Stop();
return 0;
}
线程间共享数据、互斥锁
- 共享数据(生命周期长)的种类
全局对象
堆对象(动态创建的对象,即:new) - 数据的完整性
定义:定义一个数组char key[16],规定:key的每一个元素的值都必须相等,否则视为不完整的
例如,key[0],key[1],……,key[15]的值全为100
两个线程:
KeyGenerator:定时生成key,更新key,即:写操作
KeyChecker:获取key,检验其完整性,即:读操作
数据不完整性的根本原因:线程在运行时,可能会在任意位置被切换。
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
char g_key[16];//Generator更新,Checker获取
OS_Mutex g_mutex;
class KeyGenerator :public OS_Thread {
private:
virtual int Routine() {
int times = 0;
while (!quit_flag) {
//更新key
char key_new[16];
for (int i = 0; i < 16; ++i) {
OS_Thread::Msleep(5);
key_new[i] = times;
}
g_mutex.Lock();
memcpy(g_key, key_new, 16);
g_mutex.Unlock();
++times;
if (times >= 128)times = 0;
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
class KeyChecker :public OS_Thread {
private:
virtual int Routine() {
while (!quit_flag) {
char key_copy[16];
g_mutex.Lock();
memcpy(key_copy, g_key, 16);
g_mutex.Unlock();
for (int i = 1; i < 16; ++i) {
if (key_copy[i] != key_copy[i - 1]) {
cout << "不完整!!\n";
return 0;
}
}
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
int main()
{
KeyGenerator a;
a.Start();
KeyChecker b;
b.Start();
getchar();
return 0;
}
- 互斥锁——系统资源
- c++一般称为Mutex,Java里则一般称为Lock
- 定义:
当多个线程同时访问一块内存,就有可能出现的数据不完整性的问题,此时我们需要一种机制来“同步”各线程对它的访问。(所谓“同步”,是指协调、安排,使之步调一致),这种机制就是“互斥锁”机制 - 使用:
在访问共享数据之前,先获取Mutex
在访问完毕后,释放Mutex - 机制:
在一个线程获取Mutex之后,另外一个线程的Mutex操作会阻塞,直到该Mutex被释放后
使用模式
创建全局对象,或者堆对象(动态创建的对象)
char g_data[128];//共享数据
OS_Mutex g_mutex;
在线程中要访问共享数据,必须要先获取锁
g_mutex.Lock();//此函数会进行阻塞,一直等待拥有锁
for(int i=0;i<128;++i) g_data[i]=i;
g_mutex.Unlock();//释放锁
#include <stdio.h>
#include "osapi/osapi.h"
OS_Mutex g_mutex;
char g_key[16]; // Generator更新它,Checker获取它
class KeyGenerator : public OS_Thread
{
private:
virtual int Routine()
{
int times = 0;
while(1)
{
// 更新key
g_mutex.Lock();
for(int i=0; i<16; i++)
{
OS_Thread::Msleep(5);
g_key[i] = times;
}
g_mutex.Unlock();
times ++;
if(times >= 128) times = 0;
//OS_Thread::Msleep(50);
}
return 0;
}
};
class KeyChecker : public OS_Thread
{
private:
// 线程主函数
virtual int Routine()
{
while(1)
{
// 数据处理
// 检查完整性
g_mutex.Lock();
for(int i=1; i<16; i++)
{
if(g_key[i] != g_key[i-1])
{
printf("不完整!!\n");
PrintKey();
//return 0;
}
}
g_mutex.Unlock();
//OS_Thread::Msleep(50);
}
return 0; // 正常退出
}
void PrintKey()
{
printf("Key: ");
for(int i=0; i<16; i++)
printf("%02X ", g_key[i]);
printf("\n");
}
};
int main()
{
KeyGenerator a;
a.Run();
KeyChecker b;
b.Run();
getchar();
return 0;
}
- 使用原则
当一个线程占有锁时,应该尽快地完成对共享数据的访问,因为别的线程还在等待这个锁
一般策略:直接把数据拷贝一份出来,然后再做处理(假设处理数据需要较长的时间)
例如:让处理线程处理拷贝的那一份,然后再获取锁,将处理完的数据拷贝回去。尽量缩短锁的占有时间
#include<iostream>
#include "osapi/osapi.h"
OS_Mutex g_mutex;
char g_key[16]; // Generator更新它,Checker获取它
class KeyGenerator : public OS_Thread
{
private:
virtual int Routine()
{
int times = 0;
while(1)
{
// 生成key: 需要80ms
char key_new[16];
for(int i=0; i<16; i++)
{
OS_Thread::Msleep(5);
key_new[i] = times;
}
// 更新key: 占有锁的时间非常短
g_mutex.Lock();
memcpy(g_key, key_new, 16);
g_mutex.Unlock();
times ++;
if(times >= 128) times = 0;
//OS_Thread::Msleep(50);
}
return 0;
}
};
class KeyChecker : public OS_Thread
{
private:
// 线程主函数
virtual int Routine()
{
while(1)
{
// 尽量缩短对共享数据的访问时间
char copy[16];
g_mutex.Lock();
memcpy(copy, g_key, 16);
g_mutex.Unlock();
// 数据处理
// 检查完整性
for(int i=1; i<16; i++)
{
if(copy[i] != copy[i-1])
{
std::cout<<"不完整性\n";
PrintKey();
//return 0;
}
}
//OS_Thread::Msleep(50);
}
return 0; // 正常退出
}
void PrintKey()
{
std::cout<<"Key: \n";
for(int i=0; i<16; i++)
printf("%02X ", g_key[i]);
std::cout<<"\n";
}
};
int main()
{
KeyGenerator a;
a.Run();
KeyChecker b;
b.Run();
getchar();
return 0;
}
线程安全的函数
- 可重入(reentrant)的函数,又称为线程安全(thread safe)的函数
是指一个函数,在多个线程里同时调用(并发调用)的时候,功能仍然正常 - 判断函数可重入
- 在单线程的情况下,该函数表现正常,如果单线程不行,说明函数写错了
- 在多线程并发调用此函数时,该函数仍然表现正常,则称该函数是可重入
- 以下函数很可能不可重入
- 一个全局函数(写在类体之外的函数),如果它借助于全局对象来实现,并且有写的操作,那么就是不可重入的
- 一个类的成员函数,它访问并修改了成员变量,纳闷一般情况下他就是不可重入的
- 将不可重入的函数修改为可重入的
- 不借助外部变量来实现
- 尽量用本函数内部定义的局部变量来实现。或者在本函数内部动态创建对象,并在退出前进行销毁
- 加上互斥锁控制
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
int result;
OS_Mutex g_mutex;
//求和:1+2+3……+n
int sum(int n) {
g_mutex.Lock();
result=0;
for (int i = 1; i <= n; ++i) {
result += i;
}
int r = result;
g_mutex.Unlock();
return r;
}
class MyTask :public OS_Thread {
private:
virtual int Routine() {
while (!quit_flag) {
int ret = sum(100);
//cout << "ret=" << ret << endl;
if (ret != 5050)cout << "ret=" << ret << endl;
OS_Thread::Msleep(5);
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
int main()
{
MyTask task1, task2;
task1.Start();
task2.Start();
getchar();
return 0;
}
线程间的通知机制,信号量
- 轮询机制:每隔一定的时间,查询一次
缺点:查询不能太频繁(浪费CPU),也不能太不频繁(缓冲区满),难以把握。性能不佳
标志:有Sleep
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
OS_Mutex g_mutex;
int g_buf[100];//缓冲区,最多存放一百个数
int g_count = 0;
//第一个线程:生产者
class Producer :public OS_Thread {
private:
virtual int Routine() {
while (!quit_flag) {
int r = rand() % 20 + 1;//生成一个1……20之间的随机数
OS_Thread::Msleep(50 * r);//休息时间在50-1000毫秒之间
//存放一个物品(这里存放的数据代表物品)
g_mutex.Lock();
g_buf[g_count] = r;
++g_count;
cout << "放入物品:" << r << endl;
g_mutex.Unlock();
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
//第二个线程:消费者
class Consumer :public OS_Thread {
private:
virtual int Routine() {
//轮询机制:频繁地查询当前物品个数
while (!quit_flag) {
OS_Thread::Msleep(50);
g_mutex.Lock();
if (g_count > 0) {
for (int i = 0; i < g_count; ++i) {
cout << "消耗物品:" << g_buf[i] << endl;
}
g_count = 0;
}
g_mutex.Unlock();
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
int main()
{
srand(time(nullptr));
//启动第一个线程
Producer producer;
producer.Start();
//启动第二个线程
Consumer consumer;
consumer.Start();
getchar();
return 0;
}
- 通知机制
信号量:Semaphore,用于实现线程间的通信机制(和Mutex一样,是一个系统级对象)
使用
//定义一个信号量,并将内部的值置为0
OS_Semaphore g_sem(0);
//第一个线程通知,信号量内部的值加1
g_sem.Post();
//第二个线程等待通知,信号量的值减1,如果信号量的值为0,线程进行等待
g_sem.Wait();
//超时等待
int ret=g_sem.Wait(1000); //单位为毫秒
if(ret!=0){ //如果返回值不为0,表明已经超时
//超时处理
}
线程在等待信号的时候,是不占cpu的,相当于被阻塞的状态
#include <iostream>
#include"osapi/osapi.h"
using namespace std;
OS_Mutex g_mutex;
int g_buf[100];//缓冲区,最多存放一百个数
int g_count = 0;
OS_Semaphore g_sem(0);
//第一个线程:生产者
class Producer :public OS_Thread {
private:
virtual int Routine() {
while (!quit_flag) {
int r = rand() % 20 + 1;//生成一个1……20之间的随机数
OS_Thread::Msleep(50 * r);//休息时间在50-1000毫秒之间
//存放一个物品(这里存放的数据代表物品)
g_mutex.Lock();
g_buf[g_count] = r;
++g_count;
cout << "放入物品:" << r << endl;
g_mutex.Unlock();
g_sem.Post();//把信号量的值加1
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
//第二个线程:消费者
class Consumer :public OS_Thread {
private:
virtual int Routine() {
while (!quit_flag) {
g_sem.Wait();
g_mutex.Lock();
if (g_count > 0) {
for (int i = 0; i < g_count; ++i) {
cout << "消耗物品:" << g_buf[i] << endl;
}
g_count = 0;
}
g_mutex.Unlock();
}
return 0;
}
private:
bool quit_flag;
public:
void Start() {
quit_flag = false;
Run();
}
void Stop() {
quit_flag = true;
Join(this);
}
};
int main()
{
srand(time(nullptr));
//启动第一个线程
Producer producer;
producer.Start();
//启动第二个线程
Consumer consumer;
consumer.Start();
getchar();
return 0;
}