上一篇簡單的hello world
ReaderUtil readerUtil = new ReaderUtil(new GenericObjectPool<StringBuffer>(new StringBufferFactory()));
理解commons pool憎夷,需要了解主要類,接口
- PooledObject : 池化對象,在對象的基礎(chǔ)上,添加對象的屬性,方法,方便判斷對象的可用狀
- ObjectPool : 對象池,利用它管理,獲取對象
- PoolableObjectFactory : 可池化對象的維護工廠
- GenericObjectPoolConfig : 連接池配置
- LinkedBlockingDeque : 線程安全的阻塞隊列數(shù)據(jù)結(jié)構(gòu)
- Evictor : 驅(qū)逐者線程,當(dāng)該線程啟動的是否,負責(zé)判斷隊列中的對象是否需要驅(qū)逐,驅(qū)逐完如果空閑對象數(shù)量小于最小可以使用的數(shù)量,維持最小的idel個對象,就創(chuàng)建等于最小數(shù)量的對象數(shù)
一步步來吧
- PooledObject : 池化對象,在對象的基礎(chǔ)上,添加對象的屬性,方法,方便判斷對象的可用狀態(tài)昧旨,比如池化StringBuffer字符操作對象
PooledObject pooledObject = new DefaultPooledObject(new StringBuffer());
public DefaultPooledObject(final T object) {
//真正操作的還是StringBuffer對象拾给,只是為了方便維護,讓DefaultPooledObject裝飾一番
this.object = object;
}
ObjectPool : 對象池,利用它管理,獲取對象臼予,看看主要方法
ObjectPool接口實現(xiàn)GenericObjectPool
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig config) {
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("factory may not be null");
}
this.factory = factory;
//支持阻塞的線程安全隊列
idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
//設(shè)置連接池配置
setConfig(config);
//是否啟動驅(qū)逐線程
startEvictor(getTimeBetweenEvictionRunsMillis());
}
主要方法
//從空閑隊列LinkedBlockingDeque獲取連接對象
T borrowObject() throws Exception, NoSuchElementException,
IllegalStateException;
//把使用完的對象歸還到空閑隊列LinkedBlockingDeque
void returnObject(T obj) throws Exception;
//是否啟動驅(qū)逐線程
final void startEvictor(final long delay);
獲取連接對象
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
assertOpen();
final AbandonedConfig ac = this.abandonedConfig;
//如果配置了遺棄鸣戴,當(dāng)前空閑隊列對象數(shù)量小于2,并且正在使用中的對象數(shù)量大于(池中最多對象數(shù)量-3)
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
//正在使用狀態(tài)的粘拾,并且使用時間超過配置時間還沒有歸還的對象窄锅,則銷毀
removeAbandoned(ac);
}
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
create = false;
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
//如果沒有設(shè)置獲取等待超時就一直等待,直到有對象可以獲取
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
//設(shè)置了獲取超時時間缰雇,如果超過設(shè)置時間還沒有獲取到,直接返回null
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
if (!p.allocate()) {
p = null;
}
if (p != null) {
try {
//激活對象
factory.activateObject(p);
} catch (final Exception e) {
try {
//激活失敗入偷,銷毀對象
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
boolean validate = false;
Throwable validationThrowable = null;
try {
//檢查對象是否有效可用
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
//無效則銷毀
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
removeAbandoned銷毀狀態(tài)在使用中,超過一段時間沒有歸還的對象
private void removeAbandoned(final AbandonedConfig ac) {
// Generate a list of abandoned objects to remove
final long now = System.currentTimeMillis();
final long timeout =
now - (ac.getRemoveAbandonedTimeout() * 1000L);
final ArrayList<PooledObject<T>> remove = new ArrayList<PooledObject<T>>();
final Iterator<PooledObject<T>> it = allObjects.values().iterator();
while (it.hasNext()) {
final PooledObject<T> pooledObject = it.next();
synchronized (pooledObject) {
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedTime() <= timeout) {
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
}
// Now remove the abandoned objects
final Iterator<PooledObject<T>> itr = remove.iterator();
while (itr.hasNext()) {
final PooledObject<T> pooledObject = itr.next();
if (ac.getLogAbandoned()) {
pooledObject.printStackTrace(ac.getLogWriter());
}
try {
invalidateObject(pooledObject.getObject());
} catch (final Exception e) {
e.printStackTrace();
}
}
}
歸還連接
public void returnObject(final T obj) {
//從所有對象池中獲取返回的對象
final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));
if (p == null) {
//如果沒有遺棄配置AbandonedConfig,拋出異常,有則直接返回
if (!isAbandonedConfig()) {
throw new IllegalStateException(
"Returned object not currently part of this pool");
}
return; // Object was abandoned and removed
}
synchronized(p) {
final PooledObjectState state = p.getState();
//判斷對象狀態(tài)是否是正在使用械哟,如果不是拋出異常疏之,是則修改對象狀態(tài)為正在歸還,防止被遺棄
if (state != PooledObjectState.ALLOCATED) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
p.markReturning(); // Keep from being marked abandoned
}
final long activeTime = p.getActiveTimeMillis();
if (getTestOnReturn()) {
//如果對象無效
if (!factory.validateObject(p)) {
try {
//銷毀對象暇咆,在空閑隊列锋爪,所有集合中剔除對象,并且更新銷毀對象數(shù)量爸业,創(chuàng)建對象數(shù)量
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
//試圖確逼浣荆空閑池中存在有可用的實例
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
}
try {
//鈍化對象,下次之前可以再復(fù)用該對象,比如對象是StringBuffer扯旷,可以用setLength(0)清空
factory.passivateObject(p);
} catch (final Exception e1) {
swallowException(e1);
try {
//同上
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
//同上
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
//釋放資源
if (!p.deallocate()) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
final int maxIdleSave = getMaxIdle();
//空閑隊列是否已經(jīng)等于配置的最多空閑數(shù)量拯爽,如果是則銷毀對象,不是則歸還到空閑隊列中
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
} else {
//如果配置的是先進先出钧忽,先進后出歸還到空閑隊列中
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
updateStatsReturn(activeTime);
}
PoolableObjectFactory : 可池化對象的維護工廠
public interface PooledObjectFactory<T> {
/**
* 創(chuàng)建對象
* Create an instance that can be served by the pool and wrap it in a
* {@link PooledObject} to be managed by the pool.
*
* @return a {@code PooledObject} wrapping an instance that can be served by the pool
*
* @throws Exception if there is a problem creating a new instance,
* this will be propagated to the code requesting an object.
*/
PooledObject<T> makeObject() throws Exception;
/**
* 銷毀對象
* Destroys an instance no longer needed by the pool.
* <p>
* It is important for implementations of this method to be aware that there
* is no guarantee about what state <code>obj</code> will be in and the
* implementation should be prepared to handle unexpected errors.
* <p>
* Also, an implementation must take in to consideration that instances lost
* to the garbage collector may never be destroyed.
* </p>
*
* @param p a {@code PooledObject} wrapping the instance to be destroyed
*
* @throws Exception should be avoided as it may be swallowed by
* the pool implementation.
*
* @see #validateObject
* @see ObjectPool#invalidateObject
*/
void destroyObject(PooledObject<T> p) throws Exception;
/**
* 檢驗對象的有效性
* Ensures that the instance is safe to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be validated
*
* @return <code>false</code> if <code>obj</code> is not valid and should
* be dropped from the pool, <code>true</code> otherwise.
*/
boolean validateObject(PooledObject<T> p);
/**
* 激活對象
* Reinitialize an instance to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be activated
*
* @throws Exception if there is a problem activating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void activateObject(PooledObject<T> p) throws Exception;
/**
* 鈍化對象毯炮,簡單來說就是在歸還對象的時候,清空對象,下次借用的可以直接使用
* Uninitialize an instance to be returned to the idle object pool.
*
* @param p a {@code PooledObject} wrapping the instance to be passivated
*
* @throws Exception if there is a problem passivating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void passivateObject(PooledObject<T> p) throws Exception;
}
看看jedis的實現(xiàn)JedisFactory耸黑,方便理解
class JedisFactory implements PooledObjectFactory<Jedis> {
@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.getDB() != database) {
jedis.select(database);
}
}
@Override
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
jedis.quit();
} catch (Exception e) {
}
jedis.disconnect();
} catch (Exception e) {
}
}
}
@Override
public PooledObject<Jedis> makeObject() throws Exception {
final HostAndPort hostAndPort = this.hostAndPort.get();
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
try {
jedis.connect();
if (password != null) {
jedis.auth(password);
}
if (database != 0) {
jedis.select(database);
}
if (clientName != null) {
jedis.clientSetname(clientName);
}
} catch (JedisException je) {
jedis.close();
throw je;
}
return new DefaultPooledObject<Jedis>(jedis);
}
@Override
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
// TODO maybe should select db 0? Not sure right now.
}
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
final BinaryJedis jedis = pooledJedis.getObject();
try {
HostAndPort hostAndPort = this.hostAndPort.get();
String connectionHost = jedis.getClient().getHost();
int connectionPort = jedis.getClient().getPort();
return hostAndPort.getHost().equals(connectionHost)
&& hostAndPort.getPort() == connectionPort && jedis.isConnected()
&& jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
}
}
GenericObjectPoolConfig : 連接池配置
- lifo: true為先進先出桃煎;false為先進后出,默認為true,表示對象的出借方式
- maxWaitMillis: 當(dāng)連接池資源耗盡時崎坊,調(diào)用者最大等待阻塞的時間(ms)备禀,默認為-1表示永不超時,建議設(shè)置值奈揍,如果資源一直等待超時曲尸,會卡死服務(wù)
- maxTotal: 連接池中最大連接數(shù),默認為8.
- maxIdle: 連接池中最大空閑的連接數(shù),默認為8.該參數(shù)一般盡量與maxTotal相同,以提高并發(fā)數(shù)
- minIdle: 連接池中最小空閑的連接數(shù),默認為0男翰,該參數(shù)一般盡量比maxIdle小一些
- blockWhenExhausted: 當(dāng)連接池資源耗盡時另患,是否會阻塞等待,默認為true:阻塞
- testOnBorrow: 調(diào)用者獲取連接池資源時蛾绎,是否檢測是有有效昆箕,如果無效則從連接池中移除,并嘗試?yán)^續(xù)獲取租冠。默認為false鹏倘。建議保持默認值
- testOnReturn: 向連接池歸還連接時,是否檢測“連接”對象的有效性顽爹。默認為false纤泵。建議保持默認值
- testOnCreate:向連接池添加創(chuàng)建對象時,是否檢測“連接”對象的有效性镜粤。默認為false捏题。建議保持默認值
- testWhileIdle: 當(dāng)驅(qū)逐空閑隊列的連接對象時,是否允許空閑時進行有效性測試肉渴,默認為false
- timeBetweenEvictionRunsMillis: “空閑連接”驅(qū)逐線程公荧,檢測的周期,毫秒數(shù)同规。如果為負值循狰,表示不運行“驅(qū)逐線程”。默認為-1
- numTestsPerEvictionRun:驅(qū)逐線程一次運行檢查多少條“連接”券勺,不要設(shè)置太大绪钥,太大需要更多的時間來執(zhí)行
- minEvictableIdleTimeMillis: 連接空閑的最小時間,達到此值后空閑連接將可能會被移除朱灿。負值(-1)表示不移除
- softMinEvictableIdleTimeMillis: 連接空閑的最小時間昧识,達到此值后空閑鏈接將會被移除,且保留“minIdle”個空閑連接數(shù)盗扒。默認為-1.
PooledObject對象的狀態(tài)
/**
* Provides the possible states that a {@link PooledObject} may be in.
*
* @version $Revision: $
*
* @since 2.0
*/
public enum PooledObjectState {
/**
* In the queue, not in use.
* 位于隊列中跪楞,未使用
*/
IDLE,
/**
* In use.
* 在使用
*/
ALLOCATED,
/**
* In the queue, currently being tested for possible eviction.
* 位于隊列中,當(dāng)前正在測試侣灶,可能會被回收
*/
EVICTION,
/**
* Not in the queue, currently being tested for possible eviction. An
* attempt to borrow the object was made while being tested which removed it
* from the queue. It should be returned to the head of the queue once
* eviction testing completes.
* TODO: Consider allocating object and ignoring the result of the eviction
* test.
* 不在隊列中甸祭,當(dāng)前正在測試,可能會被回收褥影。從池中借出對象時需要從隊列出移除并進行測試
*/
EVICTION_RETURN_TO_HEAD,
/**
* In the queue, currently being validated.
* 2.0沒有用到
*/
VALIDATION,
/**
* Not in queue, currently being validated. The object was borrowed while
* being validated and since testOnBorrow was configured, it was removed
* from the queue and pre-allocated. It should be allocated once validation
* completes.
* 2.0沒有用到
*/
VALIDATION_PREALLOCATED,
/**
* Not in queue, currently being validated. An attempt to borrow the object
* was made while previously being tested for eviction which removed it from
* the queue. It should be returned to the head of the queue once validation
* completes.
* 2.0沒有用到
*/
VALIDATION_RETURN_TO_HEAD,
/**
* Failed maintenance (e.g. eviction test or validation) and will be / has
* been destroyed
* 回收或驗證失敗池户,將銷毀
*/
INVALID,
/**
* Deemed abandoned, to be invalidated.
* 即將無效
*/
ABANDONED,
/**
* Returning to the pool.
* 正在返還到池中
*/
RETURNING
}
LinkedBlockingDeque是保存空閑隊列的地方,借出,歸還都在這里
雙向鏈表實現(xiàn)的雙向并發(fā)阻塞隊列校焦。該阻塞隊列同時支持FIFO和FILO兩種操作方式赊抖,即可以從隊列的頭和尾同時操作(插入/刪除);并且寨典,該阻塞隊列是支持線程安全
private static final class Node<E> {
/**
* The item, or null if this node has been removed.
*/
E item;
/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
Node<E> prev;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
Node<E> next;
/**
* Create a new list node.
*
* @param x The list item
* @param p Previous item
* @param n Next item
*/
Node(final E x, final Node<E> p, final Node<E> n) {
item = x;
prev = p;
next = n;
}
}