Java 多线程断点续传

多线程断点续传

步骤拆分

1.下载文件

2.断点续传

3.多线程下载

4.多线程断点续传

下载文件

断点续传下载

步骤分解

  1. 推荐至少两个线程,当然也可以一个HttpURLConnection打开后,获取完文件长度,再关闭;然后再打开,然后设置文件数据范围

    1. 主线程:HttpURLConnection.getContentLengthLong() 获取文件大小
    2. 子线程:HttpURLConnection.setRequestProperty(“Range”, “bytes=” + filePos + “-“ + fileEnd) 获取文件数据范围
  2. File.length()与HttpURLConnection.getContentLengthLong()比较文件大小

  3. RandomAccessFile写入文件(如下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
RandomAccessFile raf = new RandomAccessFile(fileDir, "rws");

// 搜索续传的起始位置
raf.seek(filePos);

// 文件写入
byte[] buffer = new byte[1024];
int length;
InputStream is = connection.getInputStream();
while ((length = is.read(buffer)) != -1) {
raf.write(buffer, 0, length);
}

raf.close();

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.playgroud;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

/**
* 单线程下的断点续传
*/
public class DownloadTest implements Runnable {

// 下载的URL
private String url;

// 文件下载路径
private String storePath;

// 文件名
private String fileName;

// 文件下载的容量
private long fileReqLength = 0;

public DownloadTest(String url, String storePath) {
this.url = url;
this.storePath = storePath;

// TODO 考虑获取不到文件名的情况
// 获取文件名
this.fileName = url.substring(url.lastIndexOf('/') + 1, url
.contains("?") ? url.lastIndexOf('?') : url.length());
}

/**
* 下载文件
*/
public void download(String url, long fileStart, long fileEnd) {

URL downloadURL = null;
HttpURLConnection connection = null;

try {
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.setRequestProperty("Range", "bytes=" + fileStart + "-" + fileEnd);
connection.connect();

System.out.println("开始断点续传...");

} catch (Exception e) {
e.printStackTrace();
} finally {
connection.disconnect();
}

try {
RandomAccessFile raf = new RandomAccessFile(storePath + "/" + fileName, "rws");
raf.seek(fileStart);

// 写入到文件
int length;
byte[] buffer = new byte[1024];
InputStream is = connection.getInputStream();
while ((length = is.read(buffer)) != -1) {
raf.write(buffer, 0, length);
}

raf.close();
is.close();

} catch (Exception e) {
System.out.println("文件续传失败!");
e.printStackTrace();
}

System.out.println("文件下载完成!");
}

public void openHttpConnection() {
URL downloadURL = null;
HttpURLConnection connection = null;

try {
System.out.println("连接中...");
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.connect();
} catch (Exception e) {
System.out.println("URL无效");
e.printStackTrace();
}

// http请求设置
connection.setConnectTimeout(10000);

try {
connection.connect();
if (connection.getResponseCode() == 200) {
System.out.println("连接服务器成功!");

fileReqLength = connection.getContentLength();

} else {
System.out.println("连接服务器失败: " + connection.getResponseCode()
+ ":" + connection.getResponseMessage());
}
} catch (IOException e) {
System.out.println("连接文件服务器失败!");
e.printStackTrace();
} finally {
connection.disconnect();
}
}

// 连接文件服务器
@Override
public void run() {

// 打开http连接,获取文件总大小
openHttpConnection();

// 文件全路径
File downloadFile = new File(storePath + "/" + fileName);
System.out.println("文件名: " + downloadFile.toString());
System.out.println("文件类型: ");
System.out.println("文件大小: " + (fileReqLength / 1024 / 1024) + "MB");

if (!downloadFile.exists()) {
// 文件不存在则直接下载
download(url, 0, fileReqLength);
} else {
// 文件存在,则比较文件大小
long fileLocalLength = downloadFile.length();
if (fileReqLength == fileLocalLength) {
System.out.println("文件已下载完成!");
} else {
System.out.println("文件已连接,已下载: " + (fileLocalLength / 1024 / 1024)
+ "MB" + ";剩余: " + (fileReqLength - fileLocalLength) / 1024 / 1024 + "MB");

// 断点续传
long filePos = fileLocalLength;
download(url, filePos, fileReqLength);
}
}
}

// Main Thread
public static void main(String[] args) {

String storePath = "/Users/kalosora/Desktop";
String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
DownloadTest downloadTest = new DownloadTest(url, storePath);

Thread t1 = new Thread(downloadTest);
t1.start();
}
}

普通文件下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class DownloadTest {

// 下载的URL
private String url;

// 文件下载路径
private String storePath;

// 文件名称
private String fileName;

public DownloadTest(String url, String storePath, String fileName) {
this.url = url;
this.storePath = storePath;
this.fileName = fileName;
}

/**
* 下载文件
*/
public void download() throws IOException {
URL downloadURL = new URL(url);
HttpURLConnection connection = (HttpURLConnection) downloadURL.openConnection();
//File tempFile = new File(storePath);

// 设置超时时间,单位ms
connection.setConnectTimeout(10000);

// 获取文件长度,单位byte
long fileLength = connection.getContentLengthLong();
// FIXME 文件大小判断不准确
System.out.println("文件大小: " + fileLength / 1024 / 1024 + "MB");

// TODO 获取文件类型和文件名 lastIndexOf
System.out.println("文件类型:");

// 在本地新建文件
File fileDir = new File(storePath + "/" + fileName);
System.out.println("文件路径:" + fileDir);
if(!fileDir.exists())
{

// 写入文件
// TODO 断点续传
// RandomAccessFile raf = new RandomAccessFile(fileDir,"rws");
// raf.setLength(fileLength);
// raf.close();

System.out.println("下载中...");

FileOutputStream fos = new FileOutputStream(fileDir);
InputStream is = connection.getInputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = is.read(buffer)) != -1)
{
fos.write(buffer,0,length);
}

is.close();
fos.close();

System.out.println("文件下载完毕!");

}

}

// Main Thread
public static void main(String[] args) throws IOException {

String storePath = "/Users/kalosora/Desktop";
String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
String fileName = "QQ7.9.exe";
DownloadTest downloadTest = new DownloadTest(url, storePath, fileName);

downloadTest.download();
}
}

多线程下载

步骤分解

  1. 文件分块,与文件的起止下载位置

1、文件分块。 文件分块大小(blockSize)= (文件大小 +线程数 - 1 )/ 线程数 ;
2、确定每一个线程所要下载的 文件的起始和结束位置。
现假设为每个线程分别编号:0,1, 2,3;则
第一个线程负责的下载位置是: 0blockSize - (0+1) blockSize -1,
第二个线程负责的下载位置是: 1* blockSize - (1+1)blockSize -1,以此类推第i个线程负责的下载位置是:i blockSize - (i+1)blockSize -1;即线程(编号为id)下载开始位置 start = id block;
即线程(编号为id)下载结束位置 end = (id+1)*block -1;

  1. 线程同步:使用CountDownLatch同步各线程的任务,只有所有任务都完成了,才算执行完毕。

实例

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package com.playgroud;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
* 多线程下载
*/
public class DownloadMultiply {

// 下载的URL
private String url;

// 文件下载路径
private String storePath;

// 文件名
private String fileName;

// 文件下载的容量
private Long fileReqLength;

// 下载的线程数
private Integer threadNum;

// 线程同步器
private CountDownLatch countDownLatch;

/**
* 构造函数
* 1.参数设置:URL、文件保存路径、下载线程数
* 2.获取下载的文件名、文件大小
*
* @param url
* @param storePath
* @param threadNum
*/
public DownloadMultiply(String url, String storePath, Integer threadNum) {
this.url = url;
this.storePath = storePath;
this.threadNum = threadNum;
this.fileReqLength = getFileReqLength(this.url);
this.countDownLatch = new CountDownLatch(threadNum);

// TODO 考虑获取不到文件名的情况
// 获取文件名
this.fileName = url.substring(url.lastIndexOf('/') + 1, url
.contains("?") ? url.lastIndexOf('?') : url.length());
}

/**
* 获取下载的文件大小
*
* @param url
* @return
*/
public Long getFileReqLength(String url) {
URL downloadURL = null;
HttpURLConnection connection = null;
Long fileReqLength = null;

try {
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.connect();

// 获取文件大小
fileReqLength = connection.getContentLengthLong();
} catch (Exception e) {
System.out.println("无效的URL链接: " + e.getMessage());
} finally {
connection.disconnect();
}

return fileReqLength;
}

/**
* 文件下载入口
*/
public void fileDownload() {
Long threadLength;
Long remainLength;

// 输出初始化信息
System.out.println("连接中...");
if (fileReqLength > 0) {
System.out.println("连接文件服务器成功!");
System.out.println("当前任务下载线程数:" + threadNum);
System.out.println("文件名为:" + fileName + ",共" + (fileReqLength / 1024 / 1024) + "MB");
} else {
System.out.println("连接文件服务器失败");
}

// 判断文件是否下载过:1.下载过且文件大小一致,则无需下载;
// 2.如果没有下载过或者下载未完成,则继续下载,会影响各线程的下载长度
File fileLocal = new File(storePath + "/" + fileName);
Long fileLocalLength = fileLocal.length();
if (fileLocalLength == fileReqLength) {
// TODO md5文件一致性校验
System.out.println("文件已完成下载!");
return;
} else {
if (!fileLocal.exists()) {
// 若文件不存在则创建
remainLength = fileReqLength;

try {
fileLocal.createNewFile();
System.out.println("文件开始下载...");
} catch (IOException e) {
System.out.println("下载文件创建失败:" + e.getMessage());
}
} else {
// 如果存在则计算剩余的下载量
remainLength = fileReqLength - fileLocalLength;

System.out.println("文件剩余:" + (remainLength / 1024 / 1024) + "MB");
System.out.println("文件开始续传...");
}

// 计算单个线程的下载量
//threadLength = (remainLength % threadNum == 0 ? remainLength / threadNum : remainLength / (threadNum + 1));
threadLength = (remainLength + threadNum - 1) / threadNum;
System.out.println("单个线程下载量:" + (threadLength / 1024 / 1024) + "Mb,即 " + threadLength + " byte");
}

ExecutorService executorService = Executors.newCachedThreadPool();
IntStream.range(0, threadNum).forEach(x -> {

executorService.submit(new downloadThread(x, threadLength, countDownLatch));
});

try {
countDownLatch.await();
} catch (InterruptedException e) {
System.out.println("线程同步异常:" + e.getMessage());
}
executorService.shutdown();

System.out.println("文件下载完成!");
}

/**
* 内部类,开启下载线程
*/
class downloadThread implements Runnable {

// 当前线程ID
private Integer threadID;

// 文件下载的起始位置
private Long startPos;

// 文件下载的结束位置
private Long endPos;

// 文件下载的长度
private Long threadLength;

// 线程同步器,运行完成后提交
private CountDownLatch countDownLatch;

/**
* 构造函数,初始化文件下载的数据区域
*/
public downloadThread(Integer threadID, Long threadLength, CountDownLatch countDownLatch) {
this.threadID = threadID;
this.threadLength = threadLength;
this.countDownLatch = countDownLatch;

// TODO 未实现断点续传,仅能够多线程下载,断点续传需要创建临时文件记录上次下载的位置
// 计算每个线程的下载位置
this.startPos = threadID * threadLength;
this.endPos = threadID * threadLength + threadLength - 1;
}

@Override
public void run() {
System.out.println("[线程" + threadID + "] 从 " + startPos + " ~ " + endPos);

URL downloadURL = null;
HttpURLConnection connection = null;

// 打开HTTP连接
try {
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.setRequestProperty("Range", "bytes=" + startPos + "-" + endPos);
connection.setConnectTimeout(100000);
connection.connect();
} catch (Exception e) {
System.out.println("[线程" + threadID + "] 无法访问URL链接: " + e.getMessage());
}

// 获取文件
try {
RandomAccessFile raf = new RandomAccessFile(storePath + "/" + fileName, "rws");
raf.seek(startPos);

int length;
byte[] buffer = new byte[1024];
InputStream is = connection.getInputStream();
while((length = is.read(buffer)) != -1)
{
raf.write(buffer,0,length);
}

is.close();
raf.close();

} catch (Exception e) {
System.out.println("[线程" + threadID + "] 访问文件失败: " + e.getMessage());
}

connection.disconnect();
countDownLatch.countDown();
}
}
}

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
public class playground {

// Main Thread
public static void main(String[] args) {

String storePath = "/Users/kalosora/Desktop";
String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
Integer threadNum = 4;

DownloadMultiply downloadMultiply = new DownloadMultiply(url, storePath,threadNum);
downloadMultiply.fileDownload();
}
}

多线程断点续传

可以有两种实现形式:

  1. 有多少个线程就生成多少个文件,各自记录下载的起止位置,最后全部合并为一个文件(尝试失败,多个文件要处理比较复杂)
  2. 生成一个目标文件,生成一个(或根据线程数目)临时文件,在日志文件中记录各个线程的下载位置,则不需要合并文件,仅清除日志文件(推荐)

步骤分解

  • 优雅结束进程,以便在临时文件中记录下载的进度、下载的线程数、下载大小等内容

    • 写入一行,就记录一次位置,这样就不需要考虑优雅结束进程的问题;但是会导致部分数据丢失
    • 一直写入,在接收到interrupt的时候,再记录;不会丢失数据,但是处理逻辑更复杂(尝试)
    • 使用Properties类来记录
  • 文件合并(方式二)

  • 文件一致性校验

  • 迅雷链接的解析

    • 迅雷的编码规则为:原地址前面加”AA”,后面加”ZZ”,然后进行Base64编码,最后加上迅雷下载协议”Thunder://“组成完整的下载链接

实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
package com.playgroud;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
* 多线程断点续传
* <p>
* 思路梳理
* 1.判断文件是否存在
* 1.1如果不存在则生成一个tmp文件,4个日志,分别记录每个线程的下载位置
* 1.2如果tmp文件存在,则校验文件完整性,必须有4个日志文件,如果没有则清空任务并重新下载
* 2.继续下载的位置 = id*block + (已下载的文件长度)
* 3.下载截止的位置 = (id+1)*block - 1
* 4.文件完整性校验
* <p>
* 改进:Springboot 如果是把下载位置存到数据库,并且定时刷新,那么是否只需要一个tmp文件就可以解决?
* 1.生成tmp文件,并且在数据库中生成4条记录,对应4个线程
* 2.线程执行定时任务刷新文件的开始位置,此时截止位置仍然是固定的
* 3.如果任务中断,则只需要重新寻址到数据库记录的开始位置,然后下载即可
* 这样可以减少文件的个数,也能够降低任务中断带来的重新寻址问题
*/
public class DownloadMultiCon {

// 下载的URL
private String url;

// 文件下载路径
private String storePath;

// 文件名
private String fileName;

// 临时文件名
private String fileTemp;

// 文件下载的容量
private Long fileReqLength;

// 下载的线程数
private Integer threadNum;

// 线程同步器
private CountDownLatch countDownLatch;

// 线程ID集合,每次初始化都记录下载的线程id
private CopyOnWriteArrayList<Integer> threadIDList;

// 静态变量
private static final String THREAD_NAME = "download-thread-";
private static final String THREAD_TEMP_FILE = "download-file-";
private static final String THREAD_TEMP_KEY = "download-thread-key-";

/**
* 构造函数
* 1.参数设置:URL、文件保存路径、下载线程数
* 2.获取下载的文件名、文件大小
*
* @param url
* @param storePath
* @param threadNum
*/
public DownloadMultiCon(String url, String storePath, Integer threadNum) {
this.url = url;
this.storePath = storePath;
this.threadNum = threadNum;
this.fileReqLength = getFileReqLength(this.url);
this.countDownLatch = new CountDownLatch(threadNum + 1); // 多一个守护线程负责中断下载线程
this.threadIDList = new CopyOnWriteArrayList<>();

// TODO 考虑获取不到文件名的情况
// 获取文件名
this.fileName = url.substring(url.lastIndexOf('/') + 1, url
.contains("?") ? url.lastIndexOf('?') : url.length());
this.fileTemp = fileName;
}

/**
* 获取下载的文件大小
*
* @param url
* @return
*/
public Long getFileReqLength(String url) {
URL downloadURL = null;
HttpURLConnection connection = null;
Long fileReqLength = null;

try {
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.connect();

// 获取文件大小
fileReqLength = connection.getContentLengthLong();
} catch (Exception e) {
System.out.println("无效的URL链接: " + e.getMessage());
} finally {
connection.disconnect();
}

return fileReqLength;
}

/**
* 用户线程,开启下载线程
*/
class downloadThread implements Runnable {

// 当前线程ID
private Integer threadID;

// 文件下载的起始位置
private final Long startPos;

// 文件下载的结束位置
private final Long endPos;

// 文件下载的长度
private final Long threadLength;

// 线程同步器,运行完成后提交
private CountDownLatch countDownLatch;

// 线程同步的数组
private CopyOnWriteArrayList<Integer> threadIDList;

/**
* 构造函数,初始化文件下载的数据区域
*/
public downloadThread(Integer threadID, CountDownLatch countDownLatch, CopyOnWriteArrayList<Integer> threadIDList, Long threadLength) {
this.threadID = threadID;
this.countDownLatch = countDownLatch;
this.threadIDList = threadIDList;
this.threadLength = threadLength;

// 计算每个线程的下载位置
this.startPos = threadID * threadLength;
this.endPos = ((threadID + 1) * threadLength) - 1;
}

@Override
public void run() {

// 获取当前线程id,并加入到集合中
int threadLocalID = (int) Thread.currentThread().getId();
threadIDList.add(threadLocalID);

// 设置线程名称,通过截取可以获得每个线程的threadID,以便记录下载进度
Thread.currentThread().setName(THREAD_NAME + threadID);

// 获取properties文件,获得下载的位置
String fileThreadName = storePath + "/" + THREAD_TEMP_FILE + threadID + ".properties";
File fileThread = new File(fileThreadName);
Properties prop = new Properties();
try {
FileInputStream fis = new FileInputStream(fileThread);
prop.load(fis);

} catch (Exception e) {
System.out.println("获取线程文件 " + fileThreadName + " 失败:" + e.getMessage());
}

// 如果配置文件为空,则设置key,否则读取key
String propKey = THREAD_TEMP_KEY + threadID;
Long realStartPos;
if (prop.size() == 0) {
realStartPos = startPos;
prop.setProperty(propKey, String.valueOf(realStartPos));
} else {
realStartPos = Long.valueOf(prop.getProperty(propKey));
}

// 获取剩余长度
Long remainLength = endPos - realStartPos;
System.out.println("[下载线程" + threadID + "]开始任务,剩余 " + remainLength + " byte");

// 获取下载范围
URL downloadURL = null;
HttpURLConnection connection = null;
try {
downloadURL = new URL(url);
connection = (HttpURLConnection) downloadURL.openConnection();
connection.setConnectTimeout(100000);
connection.setRequestProperty("Range", "bytes=" + realStartPos + "-" + endPos);
connection.connect();
} catch (Exception e) {
System.out.println("打开下载链接失败:" + e.getMessage());
}

// 链接到本地文件
try {
RandomAccessFile file = new RandomAccessFile(storePath + "/" + fileName, "rws");
file.seek(realStartPos);

byte[] buffer = new byte[1024];
int length = 0;
InputStream is = connection.getInputStream();
OutputStream fos = new FileOutputStream(fileThreadName);
while((length = is.read(buffer)) != -1)
{
file.write(buffer,0,length);

// 每次写入都更新下载的开始位置
realStartPos += length;
prop.setProperty(propKey, String.valueOf(realStartPos));
prop.store(fos,Thread.currentThread().getName() + " - Update" );
}

// 释放资源
is.close();
fos.close();
file.close();
connection.disconnect();
} catch (Exception e) {
System.out.println("链接本地文件失败:" + e.getMessage());
}

System.out.println("[下载线程" + threadID + "] 任务完成");
countDownLatch.countDown();

}
}

/**
* 守护线程,负责处理临时文件:记录每个线程的下载位置
*/
class logDeamonThred implements Runnable {

@Override
public void run() {
System.out.println("[守护线程] 开始运行");

// 把守护线程添加到线程集合中
int threadLocalID = (int) Thread.currentThread().getId();
threadIDList.add(threadLocalID);

Scanner userScan = new Scanner(System.in);
String userInput = null;
do {
System.out.println("下载任务进行中,输入exit暂停任务...");
userInput = userScan.nextLine();
} while (!userInput.equals("exit"));

System.out.println("用户请求暂停任务!");

// 遍历线程ID,找到以后暂时挂起
Set<Thread> setOfThread = Thread.getAllStackTraces().keySet();
for (Integer threadID : threadIDList) {
for (Thread thread : setOfThread) {
if (thread.getId() == threadID) {
thread.interrupt();
}
}
}

userScan.close();
countDownLatch.countDown();
}
}

/**
* 文件下载入口
*/
public void fileDownload() {

Long threadLength = fileReqLength / threadNum;
Long remainLength = 0L;

// 输出初始化信息
System.out.println("连接中...");
if (fileReqLength.longValue() > 0L) {
System.out.println("连接文件服务器成功!");
System.out.println("当前任务下载线程数:" + threadNum);
System.out.println("文件名为:" + fileName + ",共" + (fileReqLength / 1024 / 1024) + "MB");
} else {
System.out.println("连接文件服务器失败");
}

// 判断文件是否下载过:1.下载过且文件大小一致,则无需下载;
// 2.如果没有下载过或者下载未完成,则继续下载
File fileLocal = new File(storePath + "/" + fileName);
Long fileLocalLength = fileLocal.length();
if (fileLocalLength.longValue() == fileReqLength.longValue()) {
System.out.println("文件已经下载完成,无需下载!");
return;
} else if (fileLocalLength.longValue() >= 0) {
// 文件不存在则创建
if (!fileLocal.exists()) {
try {
fileLocal.createNewFile();

// 创建临时文件
for (int i = 0; i < threadNum; i++) {
String fileThreadName = storePath + "/" + THREAD_TEMP_FILE + i + ".properties";
File fileThread = new File(fileThreadName);
if (!fileThread.exists()) {
fileThread.createNewFile();
}
}
} catch (IOException e) {
System.out.println("文件创建失败:" + e.getMessage());
}
}

// TODO 考虑临时文件缺失的问题

// 开启守护线程
Thread logThread = new Thread(new logDeamonThred());
logThread.setDaemon(true);
logThread.start();

// 开启多线程,并将线程保存在集合中
ExecutorService executorService = Executors.newCachedThreadPool();
IntStream.range(0, threadNum).forEach(x -> {

executorService.submit(new downloadThread(x, countDownLatch, threadIDList, threadLength));
});

try {
countDownLatch.await();
} catch (
InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}

// TODO 清理临时文件
// TODO 文件完整性校验

System.out.println("文件下载完成!");
}

}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class playground {

// Main Thread
public static void main(String[] args) {

String storePath = "/Users/kalosora/Desktop/多线程下载";
String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
Integer threadNum = 4;

DownloadMultiCon downloadMultiCon = new DownloadMultiCon(url, storePath,threadNum);
downloadMultiCon.fileDownload();

}
}

改进:Web项目

#TODO#

  • 下载速度、进度的显示

  • 多个任务下,线程间速度的均摊

  • 解析迅雷、BT链接

    • 迅雷的编码规则为:原地址前面加”AA”,后面加”ZZ”,然后进行Base64编码,最后加上迅雷下载协议”thunder://“组成完整的下载链接
    • BitTorrent Java实现

技术点总结

多线程断点续传

  1. 断点续传需要提供的能力:

    1. RandomAccessFile类提供随机文件读取与写入的能力
    2. File类提供文件创建、计算长度的能力
    3. HttpURLConnection类提供访问http请求、查找数据资源范围的能力
  2. 多线程下载需要的能力:

    1. ExecutorService类帮助进行线程管理

    2. CountDownLatch类帮助同步子线程任务进度

    3. 相关公式帮助计算

      1. 单个线程的块大小:blockSize = (文件大小 +线程数 - 1 )/ 线程数 ;
      2. 下载的开始位置:start = id* block;
      3. 下载的结束位置:end = (id+1)*block -1;
  3. 多线程断点续传综合:

    1. Properties类提供日志文件的读写能力,存放各个线程下载的信息

参考来源

断点续传相关

断点续传原理

断点续传与同步器的一个实例

多线程相关

多线程下载最简单实例

多线程断点续传

如何优雅结束java进程

断点续传完整实例-支持迅雷

多线程断点续传的简单实例

多线程断点续传的详细讲解

java如何实现BT下载

现成的JAVA BT库

eclipse bittorrent

0%