多线程下载原理
对于一个指定长度的文件F,假如它的长度为L,若需要n个线程来来下载。有两种情况需要考虑:
-
当L%n==0时
-
当L%n!=0时
注:由于各个线程都是下载文件的不同部分,那么它们请求的时候就该请求文件的不同部分,即:使用http协议中的范围请求
开发环境配置
-
开发工具:Visual Studio Community 2019 16.7.3
-
第三方库:
-
libcurl(curl-7.72.0)
配置参考:libcurl库源码编译,安装c++_wanghualin033的博客-CSDN博客
我已经编译完成,需要自取:https://wws.lanzous.com/iKOU7h6o4ri
-
osapi
-
程序设计
-
完整项目下载地址:下载
-
主要框架图
-
主线程
-
获取文件长度
/** 要下载文件的大小 **/ long getDownloadFileLenth(const char* url) { double downloadFileLenth = 0; CURL* handle = curl_easy_init(); curl_easy_setopt(handle, CURLOPT_URL, url); curl_easy_setopt(handle, CURLOPT_HEADER, 1); //只需要header头 curl_easy_setopt(handle, CURLOPT_NOBODY, 1); //不需要body if (curl_easy_perform(handle) == CURLE_OK) { curl_easy_getinfo(handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &downloadFileLenth); } else { downloadFileLenth = -1; } return downloadFileLenth; }
-
线程分配
long fileLength = getDownloadFileLenth(url); long partSize = fileLength / threadNum; //计算每个线程负责的大小 for (int i = 0; i <= threadNum; ++i) { tNode* tnode = new tNode; if (i < threadNum) { tnode->startPos = i * partSize; tnode->endPos = (i + 1) * partSize - 1; } else { if (fileLength % threadNum != 0) { tnode->startPos = i * partSize; tnode->endPos = fileLength - 1; } else break; } tnode->curl = curl_easy_init(); tnode->fp = fp; tnode->id = i; downLoad(url, tnode); DownThread* task = new DownThread(tnode); task->start();
/** 文件下载的准备工作 **/ void downLoad(char* url,tNode* tnode) { char progress_data[3] = "* "; char range[64] = { 0 }; snprintf(range, sizeof(range), "%ld-%ld", tnode->startPos, tnode->endPos); if (tnode->curl) { //设置请求路径 curl_easy_setopt(tnode->curl, CURLOPT_URL, url); //允许跳转 curl_easy_setopt(tnode->curl, CURLOPT_FOLLOWLOCATION, 1); curl_easy_setopt(tnode->curl, CURLOPT_SSL_VERIFYPEER, FALSE); curl_easy_setopt(tnode->curl, CURLOPT_SSL_VERIFYHOST, FALSE); //写入文件 curl_easy_setopt(tnode->curl, CURLOPT_WRITEDATA, tnode); curl_easy_setopt(tnode->curl, CURLOPT_WRITEFUNCTION, curlWriteFunction); //设置进度回调 curl_easy_setopt(tnode->curl, CURLOPT_NOPROGRESS, FALSE); curl_easy_setopt(tnode->curl, CURLOPT_PROGRESSFUNCTION, my_progress_func); curl_easy_setopt(tnode->curl, CURLOPT_PROGRESSDATA, &tnode->id); //设置范围请求 curl_easy_setopt(tnode->curl, CURLOPT_NOSIGNAL, 1L); curl_easy_setopt(tnode->curl, CURLOPT_LOW_SPEED_LIMIT, 1L); curl_easy_setopt(tnode->curl, CURLOPT_LOW_SPEED_TIME, 5L); curl_easy_setopt(tnode->curl, CURLOPT_RANGE, range); } }
/** 将数据写入文件中 **/ size_t curlWriteFunction(void* ptr, size_t size, size_t nmemb, tNode* tnode) { //ptr指向待下载数据,size为被写入每个元素的大小,大小为字节,nmemb是元素个数,大小为size字节 size_t written = 0; mutex.Lock(); if (tnode->startPos + size * nmemb <= tnode->endPos) { fseek(tnode->fp, tnode->startPos, SEEK_SET); written = fwrite(ptr, size, nmemb, tnode->fp); tnode->startPos += size * nmemb; } else { fseek(tnode->fp, tnode->startPos, SEEK_SET); written = fwrite(ptr, 1, tnode->endPos - tnode->startPos + 1, tnode->fp); tnode->startPos = tnode->endPos; } mutex.Unlock(); return written; } /** 显示文件下载进度 **/ int my_progress_func(int* progress_data, double t, /* 下载数据大小 */ double d, /* 当前已经下载大小 */ double ultotal, double ulnow) { printf_s("当前线程为:%d %g / %g (%g %%)\n", *progress_data, d, t, d * 100.0 / t); return 0; }
-
-
下载线程
#pragma once #include"osapi/osapi.h" #include"tNode.h" #include<iostream> using namespace std; class DownThread :public OS_Thread { public: DownThread(tNode* tnode) :tnode(tnode) { id = tnode->id; alive = false; } bool getAlive()const { return this->alive; } void start() { Run(); alive = true; } int getId() { return id; } private: virtual int Routine() { int res = curl_easy_perform(tnode->curl); if (res != 0) { cout << "下载出错" << endl; } curl_easy_cleanup(tnode->curl); this->alive = false; delete tnode; return 0; } private: bool alive; tNode* tnode; int id; };
-
回收管理
//ThreadMonitor.h #pragma once #include"osapi/osapi.h" #include"DownThread.h" #include<iostream> #include<list> using namespace std; class ThreadMonitor :public OS_Thread { public: //单例模式 static ThreadMonitor* object(); private: ThreadMonitor(){} ~ThreadMonitor(){} public: void start(); void stop(); void monitor(DownThread* task); private: virtual int Routine(); private: list<DownThread*>tasks;//线程的列表 bool quit_flag; OS_Mutex mutex; };
//ThreadMonitor.cpp #include "ThreadMonitor.h" //单例模式的实现 ThreadMonitor* ThreadMonitor::object() { static ThreadMonitor only; return &only; } void ThreadMonitor::start() { quit_flag = false; Run(); } void ThreadMonitor::stop() { quit_flag = false; Join(this); } void ThreadMonitor::monitor(DownThread* task) { mutex.Lock(); tasks.push_back(task); mutex.Unlock(); } int ThreadMonitor::Routine() { while (!quit_flag) { mutex.Lock(); //遍历tasks,找到已经完成的线程并回收 for (auto iter = tasks.begin(); iter != tasks.end();) { DownThread* task = *iter; if (task->getAlive())++iter; else { cout << "已经回收DownThread:" << task->getId() << endl; iter = tasks.erase(iter); Join(task); delete(task); } } mutex.Unlock(); OS_Thread::Sleep(1); } return 0; }
-
其他
//tNode.h #pragma once #include <cstdio> #include <curl\curl.h> #pragma comment(lib,"libcurl-d_imp.lib") struct tNode { int id; FILE* fp; long startPos; long endPos; CURL* curl; };
运行结果
char fileName[50] = "TIM.exe";//保存路径,在当前项目目录下
char url[128] = "https://dldir1.qq.com/qqfile/qq/PCTIM/TIM3.2.0/TIM3.2.0.21856.exe";//下载路径
int threadNum = 36;//要使用的线程数量