-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMergeFilesforMulti-Thread.java
More file actions
133 lines (122 loc) · 4.1 KB
/
Copy pathMergeFilesforMulti-Thread.java
File metadata and controls
133 lines (122 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//package com.sinatest;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
class fileWorker implements Runnable {
private File inputFile;
private String outputFile;
//构造函数
public fileWorker(File inputFile, String outputFile) {
this.inputFile = inputFile;
this.outputFile = outputFile;
}
@Override
public synchronized void run() {
File output = new File(outputFile);
if (!output.exists()) {
try {
output.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
FileInputStream fin = null;
FileOutputStream fout = null;
FileChannel fic = null;
FileChannel foc = null;
try {
fin = new FileInputStream(inputFile);
fout = new FileOutputStream(output, true);
// 从FileInputStream创建用于输入的FileChannel
fic = fin.getChannel();
// 从FileOutputStream 创建用于输出的FileChannel
foc = fout.getChannel();
// 16KB缓冲区
ByteBuffer bb = ByteBuffer.allocate(1024 << 4);
// 根据 read返回实际读出的字节数 中止循环
while (fic.read(bb) > 0) {
// 缓冲区翻转用于输出到foc
bb.flip();
foc.write(bb);
// 清空缓冲区用于下次读取
bb.clear();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 安全释放资源
if (null != fic)
try {
fic.close();
} catch (IOException e) {
e.printStackTrace();
}
if (null != foc)
try {
foc.close();
} catch (IOException e) {
e.printStackTrace();
}
if (null != fin)
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
if (null != fout)
try {
fout.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class Main {
//线程池线程数量
public static final int THREAD_POOL_SIZE = 5;
//遍历文件夹
public static List<File> filePathsList = new ArrayList<File>();
//缓存队列
private static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue();
//1、遍历文件夹
//2、线程池读取
//3、线程写入
public static void main(String[] args) {
//读取文件类型
String fileSuffix = ".txt";
//读取文件目录
String fileFolder = "D://Project//Java//MergeFile//datatest";
//合并文件夹路径
String outputFilePath = "D://Project//Java//MergeFile//output.txt";
//遍历文件夹
getFileList(fileFolder, fileSuffix);
//创建线程池
ExecutorService es = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
//每一个线程读取一个文件
for (File filePath : filePathsList) {
es.execute(new fileWorker(filePath, outputFilePath));
}
}
public static void getFileList(String fileFolder, String fileSuffix) {
File f = new File(fileFolder);
File[] filePaths = f.listFiles();
for (File s : filePaths) {
if (s.isDirectory()) {
getFileList(s.toString(), fileSuffix);
} else {
if (-1 != s.getName().lastIndexOf(fileSuffix)) {
filePathsList.add(s);
}
}
}
}
}