c++ 多线程 基于OSAPI

线程的调度

  • 所有线程分享CPU
  • 线程的调度——分时复用
    1. 操作系统把CPU时间划分为多个均等的时间片。例如:每5ms一个时间片,在每个时间片内运行一个线程。不同时间片运行不同的线程,让线程在同一段时间段内进行交替运行,即并发
    2. 在系统调度过程中,当线程的时间片用完被切换后,过一段时间会重新接着运行
      主线程不能轻易退出,一旦退出,表示整个程序结束
  • 线程切换
    1. 存储当前线程的状态,并把当前的线程切到后台,进入队列等待
    2. 从队列中取得一个正在排队的线程,运行一个时间片后,再次切换,这个队列称为候选队列,表示这里面的线程都希望自己被立刻执行,即:这些线程的各种资源齐全,只差CPU
    3. 每个线程都要自觉地让出CPU,让别的线程也有机会被运行。
    4. 程序中使用Sleep()函数,可以主动让自己的线程提前让出CPU
    5. Sleep时间到的时候,该线程并不是被立刻执行,而是进入了候选队列
    6. 不同操作系统从候选队列中挑选下一个线程的方法不同,一般是相同优先级根据到达队列的先后顺序进行排队,优先级高的能够排在更前面

创建一个线程

  • 线程的创建是由操作系统完成的
  • 基于OSAPI实现
#include <iostream>
#include"osapi/osapi.h"

using namespace std;
//定义一个类
class Buddhist :public OS_Thread {
private:
    virtual int Routine() {
        //线程体,执行它的任务
        for (int i = 0; i < 10; ++i) {
            cout << "ma mi ma mi hong\n";
            OS_Thread::Sleep(1);
        }
        return 0;
    }
};

class Confucian :public OS_Thread {
private:
    virtual int Routine() {
        //线程体,执行它的任务
        for (int i = 0; i < 100; ++i) {
            cout << "人之初,性本善\n";
            OS_Thread::Sleep(1);
        }
        return 0;
    }
};

int main()
{
    //运行线程
    Buddhist buddhist;
    buddhist.Run();

    Confucian confucian;
    confucian.Run();

    printf("------主线程开始-------");
    for (int i = 0; i < 10; ++i) {
        cout << "我是主线程\n";
        OS_Thread::Sleep(1);
    }

    getchar();//防止主线程退出,从而导致其他线程中止
    return 0;
}

线程的停止与回收

  • 线程的停止
  1. 正常停止
    (1) 正常中止的办法
    a. 设置标志法
    b. 在线程主函数Routine()中,检查标识量,当标识量为true时,应该退出
    c. 线程在处理任务的时候,会不停的检测标识量,及时退出线程。退出的时候,保存当前任务的进度,以便下次继续。或者保存所有其它需要保存的数据
    d. 获取线程正常退出的时间,线程退出也需要时间。使用静态函数Join()来实现,即:OS_Thread::Join(&线程对象)函数
    (2) Join函数的作用:
    a. 等待目标线程的退出
    b. 回收这个线程的相关系统资源
    (3) Join的调用位置:
    不能只看字面上的位置,而是要从运行时的角度来看待问题,即:类中只有Routine()函数中的那部分才是线程
    注:不能回收自己,即:在自己的Routine()中调用自己,Join(this)
  2. 异常停止
    主线程(主程序)退出的时候,有线程正在运行,但是所有的线程都会被立刻终止,这种终止是不正常的,因为它可能正在处理某个任务,从而造成了不完整数据
  • 代码示例
#include <iostream>
#include"osapi/osapi.h"

using namespace std;


class MyTask :public OS_Thread {
public:
    void Start() {
        m_quitflag = false;
        Run();
    }
    void Stop() {
        m_quitflag = true;
        Join(this);//没有在Routine()中,没有让线程自己调用自己
    }
private:
    virtual int Routine() {
        for (int i = 0; !this->m_quitflag&&i < 10; ++i) {
            cout << "c++最榜\n";
            OS_Thread::Sleep(1);
        }
        cout << "线程退出\n";//正常退出
        //保存数据
        return 0;
    }
private:
    bool m_quitflag;
};

int main()
{
    MyTask task;
    task.Start();
    getchar();//不让主程序退出
    task.Stop();
    return 0;
}

线程间共享数据、互斥锁

  • 共享数据(生命周期长)的种类
    全局对象
    堆对象(动态创建的对象,即:new)
  • 数据的完整性
    定义:定义一个数组char key[16],规定:key的每一个元素的值都必须相等,否则视为不完整的
    例如,key[0],key[1],……,key[15]的值全为100
    两个线程:
    KeyGenerator:定时生成key,更新key,即:写操作
    KeyChecker:获取key,检验其完整性,即:读操作
    数据不完整性的根本原因:线程在运行时,可能会在任意位置被切换。
#include <iostream>
#include"osapi/osapi.h"
using namespace std;

char g_key[16];//Generator更新,Checker获取
OS_Mutex g_mutex;

class KeyGenerator :public OS_Thread {
private:
    virtual int Routine() {
        int times = 0;
        while (!quit_flag) {
            //更新key
            char key_new[16];
            for (int i = 0; i < 16; ++i) {
                OS_Thread::Msleep(5);
                key_new[i] = times;
            }
            g_mutex.Lock();
            memcpy(g_key, key_new, 16);
            g_mutex.Unlock();
            ++times;
            if (times >= 128)times = 0;
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

class KeyChecker :public OS_Thread {
private:
    virtual int Routine() {
        while (!quit_flag) {
            char key_copy[16];
            g_mutex.Lock();
            memcpy(key_copy, g_key, 16);
            g_mutex.Unlock();
            for (int i = 1; i < 16; ++i) {
                if (key_copy[i] != key_copy[i - 1]) {
                    cout << "不完整!!\n";
                    return 0;
                }
            }
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

int main()
{
    KeyGenerator a;
    a.Start();
    KeyChecker b;
    b.Start();
    getchar();
    return 0;
}
  • 互斥锁——系统资源
  1. c++一般称为Mutex,Java里则一般称为Lock
  2. 定义:
    当多个线程同时访问一块内存,就有可能出现的数据不完整性的问题,此时我们需要一种机制来“同步”各线程对它的访问。(所谓“同步”,是指协调、安排,使之步调一致),这种机制就是“互斥锁”机制
  3. 使用:
    在访问共享数据之前,先获取Mutex
    在访问完毕后,释放Mutex
  4. 机制:
    在一个线程获取Mutex之后,另外一个线程的Mutex操作会阻塞,直到该Mutex被释放后
    使用模式
    创建全局对象,或者堆对象(动态创建的对象)
char g_data[128];//共享数据
OS_Mutex g_mutex;

在线程中要访问共享数据,必须要先获取锁

g_mutex.Lock();//此函数会进行阻塞,一直等待拥有锁
for(int i=0;i<128;++i) g_data[i]=i;
g_mutex.Unlock();//释放锁
#include <stdio.h>

#include "osapi/osapi.h"

OS_Mutex g_mutex;
char g_key[16]; // Generator更新它,Checker获取它

class KeyGenerator : public OS_Thread
{
private:
	virtual int Routine()
	{
		int times = 0;
		while(1)
		{
			// 更新key
			g_mutex.Lock();
			for(int i=0; i<16; i++)
			{
				OS_Thread::Msleep(5);
				g_key[i] = times;
			}
			g_mutex.Unlock();

			times ++;
			if(times >= 128) times = 0;
			//OS_Thread::Msleep(50);
		}
		return 0; 
	}
};

class KeyChecker : public OS_Thread
{
private:
	// 线程主函数
	virtual int Routine()
	{
		while(1)
		{		
			// 数据处理
			// 检查完整性
			g_mutex.Lock();
			for(int i=1; i<16; i++)
			{
				if(g_key[i] != g_key[i-1])
				{
					printf("不完整!!\n");
					PrintKey();
					//return 0;
				}
			}
			g_mutex.Unlock();

			//OS_Thread::Msleep(50);
		}
		return 0; // 正常退出
	}

	void PrintKey()
	{
		printf("Key: ");
		for(int i=0; i<16; i++)
			printf("%02X ", g_key[i]);					
		printf("\n");
	}
};


int main()
{
	KeyGenerator a;
	a.Run();

	KeyChecker b;
	b.Run();

	getchar();


	return 0;
}

  1. 使用原则
    当一个线程占有锁时,应该尽快地完成对共享数据的访问,因为别的线程还在等待这个锁
    一般策略:直接把数据拷贝一份出来,然后再做处理(假设处理数据需要较长的时间)
    例如:让处理线程处理拷贝的那一份,然后再获取锁,将处理完的数据拷贝回去。尽量缩短锁的占有时间
#include<iostream>
#include "osapi/osapi.h"

OS_Mutex g_mutex;
char g_key[16]; // Generator更新它,Checker获取它

class KeyGenerator : public OS_Thread
{
private:
	virtual int Routine()
	{
		int times = 0;
		while(1)
		{
			// 生成key: 需要80ms
			char key_new[16];			
			for(int i=0; i<16; i++)
			{
				OS_Thread::Msleep(5);
				key_new[i] = times;
			}

			// 更新key: 占有锁的时间非常短
			g_mutex.Lock();
			memcpy(g_key, key_new, 16);
			g_mutex.Unlock();

			times ++;
			if(times >= 128) times = 0;
			//OS_Thread::Msleep(50);
		}
		return 0; 
	}
};

class KeyChecker : public OS_Thread
{
private:
	// 线程主函数
	virtual int Routine()
	{
		while(1)
		{		
			// 尽量缩短对共享数据的访问时间
			char copy[16];
			g_mutex.Lock();
			memcpy(copy, g_key, 16);
			g_mutex.Unlock();

			// 数据处理
			// 检查完整性			
			for(int i=1; i<16; i++)
			{
				if(copy[i] != copy[i-1])
				{
					std::cout<<"不完整性\n";
					PrintKey();
					//return 0;
				}
			}			

			//OS_Thread::Msleep(50);
		}
		return 0; // 正常退出
	}

	void PrintKey()
	{
		std::cout<<"Key: \n";
		for(int i=0; i<16; i++)
			printf("%02X ", g_key[i]);					
		std::cout<<"\n";
	}
};


int main()
{
	KeyGenerator a;
	a.Run();

	KeyChecker b;
	b.Run();

	getchar();


	return 0;
}


线程安全的函数

  • 可重入(reentrant)的函数,又称为线程安全(thread safe)的函数
    是指一个函数,在多个线程里同时调用(并发调用)的时候,功能仍然正常
  • 判断函数可重入
  1. 在单线程的情况下,该函数表现正常,如果单线程不行,说明函数写错了
  2. 在多线程并发调用此函数时,该函数仍然表现正常,则称该函数是可重入
  • 以下函数很可能不可重入
  1. 一个全局函数(写在类体之外的函数),如果它借助于全局对象来实现,并且有写的操作,那么就是不可重入的
  2. 一个类的成员函数,它访问并修改了成员变量,纳闷一般情况下他就是不可重入的
  • 将不可重入的函数修改为可重入的
  1. 不借助外部变量来实现
  2. 尽量用本函数内部定义的局部变量来实现。或者在本函数内部动态创建对象,并在退出前进行销毁
  3. 加上互斥锁控制
#include <iostream>
#include"osapi/osapi.h"
using namespace std;

int result;
OS_Mutex g_mutex;
//求和:1+2+3……+n
int sum(int n) {
    g_mutex.Lock();
    result=0;
    for (int i = 1; i <= n; ++i) {
        result += i;
    }
    int r = result;
    g_mutex.Unlock();
    return r;
}

class MyTask :public OS_Thread {
private:
    virtual int Routine() {
        while (!quit_flag) {
            int ret = sum(100);
            //cout << "ret=" << ret << endl;
            if (ret != 5050)cout << "ret=" << ret << endl;
            OS_Thread::Msleep(5);
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};
int main()
{
    MyTask task1, task2;
    task1.Start();
    task2.Start();
    getchar();
    return 0;
}

线程间的通知机制,信号量

  • 轮询机制:每隔一定的时间,查询一次
    缺点:查询不能太频繁(浪费CPU),也不能太不频繁(缓冲区满),难以把握。性能不佳
    标志:有Sleep
#include <iostream>
#include"osapi/osapi.h"
using namespace std;

OS_Mutex g_mutex;
int g_buf[100];//缓冲区,最多存放一百个数
int g_count = 0;

//第一个线程:生产者
class Producer :public OS_Thread {
private:
    virtual int Routine() {
        while (!quit_flag) {
            int r = rand() % 20 + 1;//生成一个1……20之间的随机数
            OS_Thread::Msleep(50 * r);//休息时间在50-1000毫秒之间
            //存放一个物品(这里存放的数据代表物品)
            g_mutex.Lock();
            g_buf[g_count] = r;
            ++g_count;
            cout << "放入物品:" << r << endl;
            g_mutex.Unlock();
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

//第二个线程:消费者
class Consumer :public OS_Thread {
private:
    virtual int Routine() {
        //轮询机制:频繁地查询当前物品个数
        while (!quit_flag) {
            OS_Thread::Msleep(50);
            g_mutex.Lock();
            if (g_count > 0) {
                for (int i = 0; i < g_count; ++i) {
                    cout << "消耗物品:" << g_buf[i] << endl;
                }
                g_count = 0;
            }
            g_mutex.Unlock();
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

int main()
{
    srand(time(nullptr));
    //启动第一个线程
    Producer producer;
    producer.Start();
    //启动第二个线程
    Consumer consumer;
    consumer.Start();
    getchar();
    return 0;
}
  • 通知机制
    信号量:Semaphore,用于实现线程间的通信机制(和Mutex一样,是一个系统级对象
    使用
//定义一个信号量,并将内部的值置为0
OS_Semaphore g_sem(0);
//第一个线程通知,信号量内部的值加1
g_sem.Post();
//第二个线程等待通知,信号量的值减1,如果信号量的值为0,线程进行等待
g_sem.Wait();
//超时等待
int ret=g_sem.Wait(1000); //单位为毫秒
if(ret!=0){ //如果返回值不为0,表明已经超时
	//超时处理
}
线程在等待信号的时候,是不占cpu的,相当于被阻塞的状态
#include <iostream>
#include"osapi/osapi.h"
using namespace std;

OS_Mutex g_mutex;
int g_buf[100];//缓冲区,最多存放一百个数
int g_count = 0;

OS_Semaphore g_sem(0);

//第一个线程:生产者
class Producer :public OS_Thread {
private:
    virtual int Routine() {
        while (!quit_flag) {
            int r = rand() % 20 + 1;//生成一个1……20之间的随机数
            OS_Thread::Msleep(50 * r);//休息时间在50-1000毫秒之间
            //存放一个物品(这里存放的数据代表物品)
            g_mutex.Lock();
            g_buf[g_count] = r;
            ++g_count;
            cout << "放入物品:" << r << endl;
            g_mutex.Unlock();
            g_sem.Post();//把信号量的值加1
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

//第二个线程:消费者
class Consumer :public OS_Thread {
private:
    virtual int Routine() {
        while (!quit_flag) {
            g_sem.Wait();
            g_mutex.Lock();
            if (g_count > 0) {
                for (int i = 0; i < g_count; ++i) {
                    cout << "消耗物品:" << g_buf[i] << endl;
                }
                g_count = 0;
            }
            g_mutex.Unlock();
        }
        return 0;
    }
private:
    bool quit_flag;
public:
    void Start() {
        quit_flag = false;
        Run();
    }
    void Stop() {
        quit_flag = true;
        Join(this);
    }
};

int main()
{
    srand(time(nullptr));
    //启动第一个线程
    Producer producer;
    producer.Start();
    //启动第二个线程
    Consumer consumer;
    consumer.Start();
    getchar();
    return 0;
}