/**
* author:annegu
* date:2009-07-16
*/
annegu做了一个简单的Http多线程的下载程序,来讨论一下多线程并发下载以及断点续传的问题。
这个程序的功能,就是可以分多个线程从目标地址上下载数据,每个线程负责下载一部分,并可以支持断点续传和超时重连。
下载的方法是download(),它接收两个参数,分别是要下载的页面的url和编码方式。在这个负责下载的方法中,主要分了三个步骤。第一步是用来设置断点续传时候的一些信息的,第二步就是主要的分多线程来下载了,最后是数据的合并。
1、多线程下载:
/** http://www.bt285.cn http://www.5a520.cn
*/
public String download(String urlStr, String charset) {
this.charset = charset;
long contentLength = 0;
CountDownLatch latch = new CountDownLatch(threadNum);
long[] startPos = new long[threadNum];
long endPos = 0;
try {
// 从url中获得下载的文件格式与名字
this.fileName = urlStr.substring(urlStr.lastIndexOf("/") + 1);
this.url = new URL(urlStr);
URLConnection con = url.openConnection();
setHeader(con);
// 得到content的长度
contentLength = con.getContentLength();
// 把context分为threadNum段的话,每段的长度。
this.threadLength = contentLength / threadNum;
// 第一步,分析已下载的临时文件,设置断点,如果是新的下载任务,则建立目标文件。在第4点中说明。
startPos = setThreadBreakpoint(fileDir, fileName, contentLength, startPos);
//第二步,分多个线程下载文件
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
// 创建子线程来负责下载数据,每段数据的起始位置为(threadLength * i + 已下载长度)
startPos[i] += threadLength * i;
/**//*设置子线程的终止位置,非最后一个线程即为(threadLength * (i + 1) - 1)
最后一个线程的终止位置即为下载内容的长度*/
if (i == threadNum - 1) {
endPos = contentLength;
} else {
endPos = threadLength * (i + 1) - 1;
}
// 开启子线程,并执行。
ChildThread thread = new ChildThread(this, latch, i, startPos[i], endPos);
childThreads[i] = thread;
exec.execute(thread);
}
try {
// 等待CountdownLatch信号为0,表示所有子线程都结束。
latch.await();
exec.shutdown();
// 第三步,把分段下载下来的临时文件中的内容写入目标文件中。在第3点中说明。
tempFileToTargetFile(childThreads);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
首先来看最主要的步骤:多线程下载。
首先从url中提取目标文件的名称,并在对应的目录创建文件。然后取得要下载的文件大小,根据分成的下载线程数量平均分配每个线程需要下载的数据量,就是threadLength.然后就可以分多个线程来进行下载任务了。
在这个例子中,并没有直接显示的创建Thread对象,而是用Executor来管理Thread对象,并且用CachedThreadPool来创建的线程池,当然也可以用FixedThreadPool.CachedThreadPool在程序执行的过程中会创建与所需数量相同的线程,当程序回收旧线程的时候就停止创建新线程。FixedThreadPool可以预先新建参数给定个数的线程,这样就不用在创建任务的时候再来创建线程了,可以直接从线程池中取出已准备好的线程。下载线程的数量是通过一个全局变量threadNum来控制的,默认为5.
好了,这5个子线程已经通过Executor来创建了,下面它们就会各自为政,互不干涉的执行了。线程有两种实现方式:实现Runnable接口;继承Thread类。
ChildThread就是子线程,它作为DownloadTask的内部类,继承了Thread,它的构造方法需要5个参数,依次是一个对DownloadTask的引用,一个CountDownLatch,id(标识线程的id号),startPosition(下载内容的开始位置),endPosition(下载内容的结束位置)。
这个CountDownLatch是做什么用的呢?
现在我们整理一下思路,要实现分多个线程来下载数据的话,我们肯定还要把这多个线程下载下来的数据进行合。主线程必须等待所有的子线程都执行结束之后,才能把所有子线程的下载数据按照各自的id顺序进行合并。CountDownLatch就是来做这个工作的。
CountDownLatch用来同步主线程,强制主线程等待所有的子线程执行的下载操作完成。在主线程中,CountDownLatch对象被设置了一个初始计数器,就是子线程的个数5个,代码①处。在新建了5个子线程并开始执行之后,主线程用CountDownLatch的await()方法来阻塞主线程,直到这个计数器的值到达0,才会进行下面的操作,代码②处。
对每个子线程来说,在执行完下载指定区间与长度的数据之后,必须通过调用CountDownLatch的countDown()方法来把这个计数器减1.
2、在全面开启下载任务之后,主线程就开始阻塞,等待子线程执行完毕,所以下面我们来看一下具体的下载线程ChildThread.
/**
*author by http://www.5a520.cn http://www.feng123.com
*/
public class ChildThread extends Thread {
public static final int STATUS_HASNOT_FINISHED = 0;
public static final int STATUS_HAS_FINISHED = 1;
public static final int STATUS_HTTPSTATUS_ERROR = 2;
private DownloadTask task;
private int id;
private long startPosition;
private long endPosition;
private final CountDownLatch latch;
private File tempFile = null;
//线程状态码
private int status = ChildThread.STATUS_HASNOT_FINISHED;
public ChildThread(DownloadTask task, CountDownLatch latch, int id, long startPos, long endPos) {
super();
this.task = task;
this.id = id;
this.startPosition = startPos;
this.endPosition = endPos;
this.latch = latch;
try {
tempFile = new File(this.task.fileDir + this.task.fileName + "_" + id);
if(!tempFile.exists()){
tempFile.createNewFile();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
System.out.println("Thread " + id + " run ");
HttpURLConnection con = null;
InputStream inputStream = null;
BufferedOutputStream outputStream = null;
int count = 0;
long threadDownloadLength = endPosition - startPosition;
try {
outputStream = new BufferedOutputStream(new FileOutputStream(tempFile.getPath(), true));
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
③ for(;;){
④ startPosition += count;
try {
//打开URLConnection
con = (HttpURLConnection) task.url.openConnection();
setHeader(con);
con.setAllowUserInteraction(true);
//设置连接超时时间为10000ms
⑤ con.setConnectTimeout(10000);
//设置读取数据超时时间为10000ms
con.setReadTimeout(10000);
if(startPosition < endPosition){
//设置下载数据的起止区间
con.setRequestProperty("Range", "bytes=" + startPosition + "-"
+ endPosition);
System.out.println("Thread " + id + " startPosition is " + startPosition);
System.out.println("Thread " + id + " endPosition is " + endPosition);
//判断http status是否为HTTP/1.1 206 Partial Content或者200 OK
//如果不是以上两种状态,把status改为STATUS_HTTPSTATUS_ERROR
⑥ if (con.getResponseCode() != HttpURLConnection.HTTP_OK
&& con.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) {
System.out.println("Thread " + id + ": code = "
+ con.getResponseCode() + ", status = "
+ con.getResponseMessage());
status = ChildThread.STATUS_HTTPSTATUS_ERROR;
this.task.statusError = true;
outputStream.close();
con.disconnect();
System.out.println("Thread " + id + " finished.");
latch.countDown();
break;
}
inputStream = con.getInputStream();
int len = 0;
byte[] b = new byte[1024];
while ((len = inputStream.read(b)) != -1) {
outputStream.write(b, 0, len);
count += len;
//每读满5000个byte,往磁盘上flush一下
if(count % 5000 == 0){
⑦ outputStream.flush();
}
}
System.out.println("count is " + count);
if(count >= threadDownloadLength){
hasFinished = true;
}
⑧ outputStream.flush();
outputStream.close();
inputStream.close();
con.disconnect();
}
System.out.println("Thread " + id + " finished.");
latch.countDown();
break;
} catch (IOException e) {
try {
⑨ outputStream.flush();
⑩ TimeUnit.SECONDS.sleep(getSleepSeconds());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (IOException e2) {
e2.printStackTrace();
}
continue;
}
}
}
}
在ChildThread的构造方法中,除了设置一些从主线程中带来的id, 起始位置之外,就是新建了一个临时文件用来存放当前线程的下载数据。临时文件的命名规则是这样的:下载的目标文件名+“_”+线程编号。
现在让我们来看看从网络中读数据是怎么读的。我们通过URLConnection来获得一个http的连接。有些网站为了安全起见,会对请求的http连接进行过滤,因此为了伪装这个http的连接请求,我们给httpHeader穿一件伪装服。下面的setHeader方法展示了一些非常常用的典型的httpHeader的伪装方法。比较重要的有:Uer-Agent模拟从Ubuntu的firefox浏览器发出的请求;Referer模拟浏览器请求的前一个触发页面,例如从skycn站点来下载软件的话,Referer设置成skycn的首页域名就可以了;Range就是这个连接获取的流文件的起始区间。
private void setHeader(URLConnection con) {
con.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.3) Gecko/2008092510 Ubuntu/8.04 (hardy) Firefox/3.0.3");
con.setRequestProperty("Accept-Language", "en-us,en;q=0.7,zh-cn;q=0.3");
con.setRequestProperty("Accept-Encoding", "aa");
con.setRequestProperty("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
con.setRequestProperty("Keep-Alive", "300");
con.setRequestProperty("Connection", "keep-alive");
con.setRequestProperty("If-Modified-Since", "Fri, 02 Jan 2009 17:00:05 GMT");
con.setRequestProperty("If-None-Match", "\"1261d8-4290-df64d224\"");
con.setRequestProperty("Cache-Control", "max-age=0");
con.setRequestProperty("Referer", "http://http://www.bt285.cn");
}