C++实现多线程下载器

多线程下载原理

对于一个指定长度的文件F,假如它的长度为L,若需要n个线程来下载,有两种情况需要考虑:

  1. 当L%n==0时:文件可以被平均分配,n个线程各自下载 L/n 个字节,第 i 个线程(i 从 0 开始)负责的字节范围是 [i*(L/n), (i+1)*(L/n)-1]

  2. 当L%n!=0时:前n个线程仍然各自下载 L/n(整除)个字节,剩余的 L%n 个字节再额外分配一个线程下载,其范围是 [n*(L/n), L-1]

注:由于各个线程下载的是文件的不同部分,它们发出请求时就应该只请求各自负责的那一部分,即:使用http协议中的范围请求(Range 请求头,例如 Range: bytes=0-1023)

开发环境配置

程序设计

  • 完整项目下载地址:下载

  • 主要框架图

  • 主线程

    1. 获取文件长度

      /**
          要下载文件的大小
      **/
      long getDownloadFileLenth(const char* url)
      {
          double downloadFileLenth = 0;
          CURL* handle = curl_easy_init();
          curl_easy_setopt(handle, CURLOPT_URL, url);
          curl_easy_setopt(handle, CURLOPT_HEADER, 1);	//只需要header头
          curl_easy_setopt(handle, CURLOPT_NOBODY, 1);	//不需要body
          if (curl_easy_perform(handle) == CURLE_OK)
          {
              curl_easy_getinfo(handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &downloadFileLenth);
          }
          else
          {
              downloadFileLenth = -1;
          }
          return downloadFileLenth;
      }
      
    2. 线程分配

             // Split the remote file into byte ranges and start one DownThread per range.
             // NOTE(review): getDownloadFileLenth returns -1 on failure; that value is not checked here.
             long fileLength = getDownloadFileLenth(url);
              // Integer division: every regular worker gets partSize bytes.
              long partSize = fileLength / threadNum;
              // Compute the byte range each thread is responsible for.
              // Iterations 0..threadNum-1 create the regular workers; iteration
              // i == threadNum creates one extra worker for the remainder, if any.
              for (int i = 0; i <= threadNum; ++i) {
                  // NOTE(review): this node is allocated before the break below and is not freed on that path.
                  tNode* tnode = new tNode;
                  if (i < threadNum) {
                      // Inclusive range [i * partSize, (i + 1) * partSize - 1].
                      tnode->startPos = i * partSize;
                      tnode->endPos = (i + 1) * partSize - 1;
                  }
                  else {
                      // Extra worker: covers the bytes left over by the integer division, up to the last byte.
                      if (fileLength % threadNum != 0) {
                          tnode->startPos = i * partSize;
                          tnode->endPos = fileLength - 1;
                      }
                      else
                          break;
                  }
                  // Each worker gets its own easy handle; fp is shared by all workers.
                  tnode->curl = curl_easy_init();
                  tnode->fp = fp;
                  tnode->id = i;
                  // Configure the handle (URL, callbacks, range), then start the worker thread.
                  downLoad(url, tnode);
                  DownThread* task = new DownThread(tnode);
                  task->start();
      
      /**
          文件下载的准备工作
      **/
      void downLoad(char* url,tNode* tnode) {
          char progress_data[3] = "* ";
          char range[64] = { 0 };
          snprintf(range, sizeof(range), "%ld-%ld", tnode->startPos, tnode->endPos);
          if (tnode->curl)
          {
              //设置请求路径
              curl_easy_setopt(tnode->curl, CURLOPT_URL, url);
      
              //允许跳转
              curl_easy_setopt(tnode->curl, CURLOPT_FOLLOWLOCATION, 1);
              curl_easy_setopt(tnode->curl, CURLOPT_SSL_VERIFYPEER, FALSE);
              curl_easy_setopt(tnode->curl, CURLOPT_SSL_VERIFYHOST, FALSE);
      
              //写入文件
              curl_easy_setopt(tnode->curl, CURLOPT_WRITEDATA, tnode);
              curl_easy_setopt(tnode->curl, CURLOPT_WRITEFUNCTION, curlWriteFunction);
      
              //设置进度回调
              curl_easy_setopt(tnode->curl, CURLOPT_NOPROGRESS, FALSE);
              curl_easy_setopt(tnode->curl, CURLOPT_PROGRESSFUNCTION, my_progress_func);
              curl_easy_setopt(tnode->curl, CURLOPT_PROGRESSDATA, &tnode->id);
      
              //设置范围请求
              curl_easy_setopt(tnode->curl, CURLOPT_NOSIGNAL, 1L);
              curl_easy_setopt(tnode->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
              curl_easy_setopt(tnode->curl, CURLOPT_LOW_SPEED_TIME, 5L);
              curl_easy_setopt(tnode->curl, CURLOPT_RANGE, range);
          }
      }
      
      
      /**
       * libcurl write callback: stores received bytes at the worker's current
       * position in the shared output file.
       *
       * At most the bytes that still fit in [startPos, endPos] are written, and
       * startPos is advanced by the number of bytes actually stored, so it always
       * points at the next unwritten byte.
       *
       * @param ptr    Received data.
       * @param size   Size of one element in bytes.
       * @param nmemb  Number of elements; size * nmemb bytes are available.
       * @param tnode  Worker state (file, current position, last byte of range).
       * @return       Number of bytes stored. A value different from size * nmemb
       *               (surplus data, seek or write failure) makes libcurl abort
       *               the transfer with a write error.
       */
      size_t curlWriteFunction(void* ptr, size_t size, size_t nmemb, tNode* tnode)
      {
          const size_t incoming = size * nmemb;
          size_t written = 0;

          // `mutex` is declared outside this block; it is held around the
          // seek + write pair on the FILE* stored in tnode->fp.
          mutex.Lock();
          // endPos is inclusive, hence the + 1.
          const long remaining = tnode->endPos - tnode->startPos + 1;
          if (remaining > 0) {
              size_t toWrite = incoming;
              if (toWrite > static_cast<size_t>(remaining)) {
                  toWrite = static_cast<size_t>(remaining);
              }
              if (fseek(tnode->fp, tnode->startPos, SEEK_SET) == 0) {
                  // Element size 1 so the return value is a byte count.
                  written = fwrite(ptr, 1, toWrite, tnode->fp);
                  tnode->startPos += static_cast<long>(written);
              }
          }
          mutex.Unlock();
          return written;
      }
      
      /**
       * libcurl progress callback: prints the download progress of one worker.
       *
       * @param progress_data  Pointer to the worker id (may be null, printed as -1).
       * @param t              Total number of bytes expected for this transfer.
       * @param d              Number of bytes downloaded so far.
       * @param ultotal        Upload total (unused).
       * @param ulnow          Uploaded so far (unused).
       * @return               Always 0, so the transfer continues.
       */
      int my_progress_func(int* progress_data,
          double t, /* total bytes to download */
          double d, /* bytes downloaded so far */
          double ultotal,
          double ulnow)
      {
          (void)ultotal;
          (void)ulnow;

          const int workerId = (progress_data != nullptr) ? *progress_data : -1;
          // The total is 0 until it is known; report 0 % instead of dividing by zero.
          const double percent = (t > 0.0) ? d * 100.0 / t : 0.0;

          // printf instead of the MSVC-only printf_s: same output, portable.
          printf("当前线程为:%d %g / %g (%g %%)\n", workerId, d, t, percent);
          return 0;
      }
      
  • 下载线程

    #pragma once
    #include"osapi/osapi.h"
    #include"tNode.h"
    #include<atomic>
    #include<iostream>
    using namespace std;
    class DownThread :public OS_Thread {
    public:
        DownThread(tNode* tnode) :tnode(tnode) {
            id = tnode->id;
            alive = false;
        }
        bool getAlive()const {
            return this->alive;
        }
        void start() {
            Run();
            alive = true;
        }
        int getId() {
            return id;
        }
    private:
        virtual int Routine() {
            int res = curl_easy_perform(tnode->curl);
            if (res != 0) {
                cout << "下载出错" << endl;
            }
            curl_easy_cleanup(tnode->curl);
            this->alive = false;
            delete tnode;
            return 0;
        }
    private:
        bool alive;
        tNode* tnode;
        int id;
    };
    
  • 回收管理

    //ThreadMonitor.h
    #pragma once
    #include"osapi/osapi.h"
    #include"DownThread.h"
    #include<iostream>
    #include<list>
    using namespace std;
    /**
     * Background thread that reclaims finished DownThread objects.
     *
     * Tasks are registered with monitor(); Routine() periodically removes every
     * task whose getAlive() is false, joins it and deletes it.
     */
    class ThreadMonitor :public OS_Thread
    {
    public:
    	// Singleton accessor.
    	static ThreadMonitor* object();
    private:
    	// quit_flag gets a defined value even if start() has not been called yet.
    	ThreadMonitor() :quit_flag(false) {}
    	~ThreadMonitor(){}
    	// The singleton must not be copied.
    	ThreadMonitor(const ThreadMonitor&) = delete;
    	ThreadMonitor& operator=(const ThreadMonitor&) = delete;
    public:
    	void start();
    	void stop();
    	void monitor(DownThread* task);
    private:
    	virtual int Routine();
    private:
    	list<DownThread*>tasks;       // registered tasks, protected by mutex
    	std::atomic<bool> quit_flag;  // written by start()/stop(), read by Routine()
    	OS_Mutex mutex;
    };
    
    //ThreadMonitor.cpp
    #include "ThreadMonitor.h"
    
    // Singleton accessor: hands out the address of the one function-local
    // static ThreadMonitor instance.
    ThreadMonitor* ThreadMonitor::object() {
    	static ThreadMonitor instance;
    	ThreadMonitor* const self = &instance;
    	return self;
    }
    
    void ThreadMonitor::start() {
    	quit_flag = false;
    	Run();
    }
    
    // Asks the monitor loop to exit and waits for the monitor thread to end.
    void ThreadMonitor::stop() {
    	// Routine() loops while quit_flag is false, so the flag must be raised
    	// here; otherwise the Join below never returns.
    	quit_flag = true;
    	Join(this);
    }
    
    // Registers a task for reclamation: appends it to the task list under the
    // list mutex. Routine() reclaims any listed task whose getAlive() is false.
    void ThreadMonitor::monitor(DownThread* task) {
    	mutex.Lock();
    	tasks.insert(tasks.end(), task);
    	mutex.Unlock();
    }
    
    int ThreadMonitor::Routine() {
    	while (!quit_flag) {
    		mutex.Lock();
    		//遍历tasks,找到已经完成的线程并回收
    		for (auto iter = tasks.begin(); iter != tasks.end();) {
    			DownThread* task = *iter;
    			if (task->getAlive())++iter;
    			else {
    				cout << "已经回收DownThread:" << task->getId() << endl;
    				iter = tasks.erase(iter);
    				Join(task);
    				delete(task);
    			}
    		}
    		mutex.Unlock();
    		OS_Thread::Sleep(1);
    	}
    	return 0;
    }
    
  • 其他

    //tNode.h
    #pragma once
    #include <cstdio>
    #include <curl\curl.h>
    #pragma comment(lib,"libcurl-d_imp.lib")
    // Per-worker download state, passed to the libcurl callbacks.
    struct tNode
    {
        int id;         // worker id; its address is given to the progress callback
        FILE* fp;       // output file the write callback seeks in and writes to
        long startPos;  // next file offset to write; starts at the first byte of the range
        long endPos;    // last byte of the range (inclusive)
        CURL* curl;     // easy handle used for this worker's transfer
    };
    

运行结果

char fileName[50] = "TIM.exe";// output file path, relative to the current project directory
char url[128] = "https://dldir1.qq.com/qqfile/qq/PCTIM/TIM3.2.0/TIM3.2.0.21856.exe";// address of the file to download
int threadNum = 36;// number of download threads to use