前言
由于工作需要使用到ftp服務(wù)睬棚,一開(kāi)始是每次建立ftp連接,上傳文件成功后杜秸,再釋放連接拼余,后來(lái)發(fā)現(xiàn)這個(gè)方法太浪費(fèi)資源和時(shí)間了,就想到了使用ftp連接池的方式實(shí)現(xiàn)亩歹,這樣匙监,預(yù)先創(chuàng)建好ftp連接池,需要上傳的時(shí)候從池子取一個(gè)連接小作,上傳成功后再放回池子即可亭姥,省下了創(chuàng)建和釋放ftp連接的時(shí)間。
實(shí)現(xiàn)
ftp服務(wù)的配置文件
在
config.properties
配置好ftp服務(wù)
ftp.ip=127.0.0.1
ftp.username=root
ftp.password=root
ftp.port=21
FtpClientConfig
FtpClientConfig
是用于讀取config.properties
的一個(gè)實(shí)體類
public class FtpClientConfig {
private String host;
private int port;
private String username;
private String password;
...
FtpClientFactory
FtpClientFactory
可以理解為一個(gè)工廠類顾稀,用于生成ftp連接达罗、銷毀ftp連接以及檢測(cè)ftp連接是否有效。
- 生成ftp連接
在生成ftp連接的時(shí)候静秆,我們可以設(shè)定連接的超時(shí)時(shí)間等粮揉,ftp有主動(dòng)模式和被動(dòng)模式兩種模式。
- 主動(dòng)模式:FTP客戶端隨機(jī)開(kāi)啟一個(gè)大于1024的端口N向服務(wù)器的21號(hào)端口發(fā)起連接抚笔,然后開(kāi)放N+1號(hào)端口進(jìn)行監(jiān)聽(tīng)扶认,并向服務(wù)器發(fā)出PORT N+1命令。服務(wù)器接收到命令后殊橙,會(huì)用其本地的FTP數(shù)據(jù)端口(通常是20)來(lái)連接客戶端指定的端口N+1辐宾,進(jìn)行數(shù)據(jù)傳輸
- 被動(dòng)模式:FTP客戶端隨機(jī)開(kāi)啟一個(gè)大于1024的端口N向服務(wù)器的21號(hào)端口發(fā)起連接,同時(shí)會(huì)開(kāi)啟N+1號(hào)端口膨蛮。然后向服務(wù)器發(fā)送PASV命令叠纹,通知服務(wù)器自己處于被動(dòng)模式。服務(wù)器收到命令后敞葛,會(huì)開(kāi)放一個(gè)大于1024的端口P進(jìn)行監(jiān)聽(tīng)誉察,然后用PORT P命令通知客戶端,自己的數(shù)據(jù)端口是P惹谐〕制客戶端收到命令后,會(huì)通過(guò)N+1號(hào)端口連接服務(wù)器的端口P豺鼻,然后在兩個(gè)端口之間進(jìn)行數(shù)據(jù)傳輸综液。
public FTPClient makeClient() throws Exception{
FTPClient ftpClient = new FTPClient();
ftpClient.setConnectTimeout(1000 * 10);
try {
ftpClient.connect(config.getHost(), config.getPort());
boolean result = ftpClient.login(config.getUsername(), config.getPassword());
if(!result) {
log.info("ftp登錄失敗,username: {}",config.getUsername());
return null;
}
ftpClient.setControlEncoding(encode);
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
//被動(dòng)模式 被動(dòng)模式是客戶端向服務(wù)端發(fā)送PASV命令,服務(wù)端隨機(jī)開(kāi)啟一個(gè)端口并通知客戶端儒飒,客戶端根據(jù)該端口與服務(wù)端建立連接,然后發(fā)送數(shù)據(jù)檩奠。服務(wù)端是兩種模式的桩了,
//使用哪種模式取決于客戶端附帽,同時(shí)關(guān)鍵點(diǎn)在于網(wǎng)絡(luò)環(huán)境適合用哪種模式,比如客戶端在防火墻內(nèi)井誉,則最好選擇被動(dòng)模式
//在mac下測(cè)試用被動(dòng)模式?jīng)]問(wèn)題蕉扮,用主動(dòng)模式則報(bào)錯(cuò),在linux服務(wù)器上則相反
//ftpClient.enterLocalPassiveMode();
ftpClient.enterLocalActiveMode();
} catch (Exception e) {
log.error("makeClient exception",e);
destroyClient(ftpClient);
throw e;
}
return ftpClient;
}
- 銷毀ftp連接
public void destroyClient(FTPClient ftpClient) {
try {
if(ftpClient != null && ftpClient.isConnected()) {
ftpClient.logout();
}
} catch (Exception e) {
log.error("ftpClient logout exception",e);
} finally {
try {
if(ftpClient != null) {
ftpClient.disconnect();
}
} catch (Exception e2) {
log.error("ftpClient disconnect exception",e2);
}
}
}
- 檢測(cè)ftp連接
public boolean validateClient(FTPClient ftpClient) {
try {
return ftpClient.sendNoOp();
} catch (Exception e) {
log.error("ftpClient validate exception",e);
}
return false;
}
FtpClientPool
FtpClientPool
就是我們真正使用的類颗圣,我們使用了BlockingQueue
阻塞對(duì)列來(lái)實(shí)現(xiàn)連接池的效果喳钟,如果需要進(jìn)行ftp連接,就從連接池獲取一個(gè)連接在岂,完成后就把連接歸還到池子里奔则。使用阻塞對(duì)列是為了防止多線程時(shí)多個(gè)線程同時(shí)獲取了同一個(gè)ftp連接導(dǎo)致失敗。
private static final int DEFAULT_POOL_SIZE = 16;
private BlockingQueue<FTPClient> pool;
private FtpClientFactory factory;
public FtpClientPool(FtpClientFactory factory) {
this(factory, DEFAULT_POOL_SIZE);
}
public FtpClientPool(FtpClientFactory factory,int size) {
this.factory = factory;
this.pool = new ArrayBlockingQueue<>(size);
initPool(size);
}
- 初始化
private void initPool(int maxPoolSize) {
try {
int count = 0;
while (count < maxPoolSize) {
pool.offer(factory.makeClient(),10,TimeUnit.SECONDS);
count ++;
}
} catch (Exception e) {
log.error("ftp連接池初始化失敗",e);
}
}
- 從阻塞對(duì)列獲取一個(gè)ftp連接
public FTPClient borrowClient() throws Exception{
FTPClient client = pool.take();
if(client == null) {
client = factory.makeClient();
//addClient(client);
returnClient(client);
}else if(!factory.validateClient(client)) {
invalidateClient(client);
client = factory.makeClient();
//addClient(client);
returnClient(client);
}
return client;
}
- 歸還一個(gè)ftp連接
public void returnClient(FTPClient ftpClient) throws Exception{
try {
if(ftpClient != null && !pool.offer(ftpClient, 10, TimeUnit.SECONDS)) {
factory.destroyClient(ftpClient);
}
} catch (Exception e) {
log.error("歸還對(duì)象失敗",e);
throw e;
}
}
FtpClientKeepAlive
如果服務(wù)器設(shè)置了ftp連接在一段時(shí)間內(nèi)不使用會(huì)自動(dòng)斷開(kāi)連接蔽午,就會(huì)導(dǎo)致我們的連接超過(guò)時(shí)間就會(huì)失敗易茬,為了避免一直重復(fù)創(chuàng)建連接,這里使用了長(zhǎng)連接及老,
FtpClientKeepAlive
負(fù)責(zé)保持長(zhǎng)連接抽莱,如果連接失效,就重新創(chuàng)建連接骄恶。
根據(jù)服務(wù)器超時(shí)時(shí)間設(shè)置長(zhǎng)連接保持的時(shí)間食铐,每隔一段時(shí)間,從阻塞對(duì)列獲取連接來(lái)進(jìn)行驗(yàn)證僧鲁。
public class FtpClientKeepAlive {
private static final Logger log = LoggerFactory.getLogger(FtpClientKeepAlive.class);
private KeepAliveThread keepAliveThread;
@Autowired
private FtpClientPool ftpClientPool;
public void init() {
// 啟動(dòng)心跳檢測(cè)線程
if (keepAliveThread == null) {
keepAliveThread = new KeepAliveThread();
Thread thread = new Thread(keepAliveThread);
thread.start();
}
}
class KeepAliveThread implements Runnable {
@Override
public void run() {
FTPClient ftpClient = null;
while (true) {
try {
BlockingQueue<FTPClient> pool = ftpClientPool.getPool();
if (pool != null && pool.size() > 0) {
Iterator<FTPClient> it = pool.iterator();
while (it.hasNext()) {
ftpClient = it.next();
boolean result = ftpClient.sendNoOp();
log.info("心跳結(jié)果: {}",result);
if (!result) {
ftpClientPool.invalidateClient(ftpClient);
}
}
}
} catch (Exception e) {
log.error("ftp心跳檢測(cè)異常", e);
ftpClientPool.invalidateClient(ftpClient);
}
// 每30s發(fā)送一次心跳璃岳,服務(wù)器超時(shí)時(shí)間為60s
try {
Thread.sleep(1000 * 30);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
log.error("ftp休眠異常", e);
}
}
}
}
}
spring-ftp.xml
由于項(xiàng)目是使用spring的,所以在xml配置文件里進(jìn)行bean的配置悔捶。
<!-- 省略了spring beans頭部配置-->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:config.properties" />
</bean>
<bean id="ftpClientConfig" class="com.zeromk.study.util.FtpClientConfig">
<property name="host" value="${ftp.ip}"/>
<property name="port" value="${ftp.port}"/>
<property name="username" value="${ftp.username}"/>
<property name="password" value="${ftp.password}"/>
</bean>
<bean id="ftpClientFactory" class="com.zeromk.study.util.FtpClientFactory">
<constructor-arg index="0" ref="ftpClientConfig"/>
</bean>
<bean id="ftpClientPool" class="com.zeromk.study.util.FtpClientPool">
<constructor-arg index="0" ref="ftpClientFactory"/>
<constructor-arg index="1" value="8"/>
</bean>
<bean id="ftpClientKeepAlive" class="com.zeromk.study.util.FtpClientKeepAlive" init-method="init">
</bean>
源碼
詳細(xì)代碼請(qǐng)參考github铃慷。
參考
https://github.com/jimiyi/ftpService
http://blog.51cto.com/11010174/1983978