/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.pool2.impl;
import org.apache.commons.pool2.PooledObject;
/**
* To provide a custom eviction policy (i.e. something other than {@link
* DefaultEvictionPolicy} for a pool, users must provide an implementation of
* this interface that provides the required eviction policy.
*
* @param <T> the type of objects in the pool
*
* @since 2.0
*/
public interface EvictionPolicy<T> {
/**
* This method is called to test if an idle object in the pool should be
* evicted or not.
*
* @param config The pool configuration settings related to eviction
* @param underTest The pooled object being tested for eviction
* @param idleCount The current number of idle objects in the pool including
* the object under test
* @return {@code true} if the object should be evicted, otherwise
* {@code false}
*/
boolean evict(EvictionConfig config, PooledObject<T> underTest, int idleCount);
}
默認(rèn)實現(xiàn)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.pool2.impl;
import org.apache.commons.pool2.PooledObject;
/**
* Provides the default implementation of {@link EvictionPolicy} used by the
* pools. Objects will be evicted if the following conditions are met:
* <ul>
* <li>the object has been idle longer than
* {@link GenericObjectPool#getMinEvictableIdleTimeMillis()} /
* {@link GenericKeyedObjectPool#getMinEvictableIdleTimeMillis()}</li>
* <li>there are more than {@link GenericObjectPool#getMinIdle()} /
* {@link GenericKeyedObjectPoolConfig#getMinIdlePerKey()} idle objects in
* the pool and the object has been idle for longer than
* {@link GenericObjectPool#getSoftMinEvictableIdleTimeMillis()} /
* {@link GenericKeyedObjectPool#getSoftMinEvictableIdleTimeMillis()}
* </ul>
* <p>
* This class is immutable and thread-safe.
* </p>
*
* @param <T> the type of objects in the pool
*
* @since 2.0
*/
public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> {
@Override
public boolean evict(final EvictionConfig config, final PooledObject<T> underTest,
final int idleCount) {
if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() &&
config.getMinIdle() < idleCount) ||
config.getIdleEvictTime() < underTest.getIdleTimeMillis()) {
return true;
}
return false;
}
}
對應(yīng)配置
/**
* Create a new eviction configuration with the specified parameters.
* Instances are immutable.
*
* @param poolIdleEvictTime Expected to be provided by
* {@link BaseGenericObjectPool#getMinEvictableIdleTimeMillis()}
* @param poolIdleSoftEvictTime Expected to be provided by
* {@link BaseGenericObjectPool#getSoftMinEvictableIdleTimeMillis()}
* @param minIdle Expected to be provided by
* {@link GenericObjectPool#getMinIdle()} or
* {@link GenericKeyedObjectPool#getMinIdlePerKey()}
*/
public EvictionConfig(final long poolIdleEvictTime, final long poolIdleSoftEvictTime,
final int minIdle) {
if (poolIdleEvictTime > 0) {
idleEvictTime = poolIdleEvictTime;
} else {
idleEvictTime = Long.MAX_VALUE;
}
if (poolIdleSoftEvictTime > 0) {
idleSoftEvictTime = poolIdleSoftEvictTime;
} else {
idleSoftEvictTime = Long.MAX_VALUE;
}
this.minIdle = minIdle;
}
final EvictionConfig evictionConfig = new EvictionConfig(
getMinEvictableIdleTimeMillis(),
getSoftMinEvictableIdleTimeMillis(),
getMinIdlePerKey());
啟動驅(qū)逐器
/**
* Sets the number of milliseconds to sleep between runs of the idle object evictor thread.
* <ul>
* <li>When positive, the idle object evictor thread starts.</li>
* <li>When non-positive, no idle object evictor thread runs.</li>
* </ul>
*
* @param timeBetweenEvictionRunsMillis
* number of milliseconds to sleep between evictor runs
*
* @see #getTimeBetweenEvictionRunsMillis
*/
public final void setTimeBetweenEvictionRunsMillis(
final long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
startEvictor(timeBetweenEvictionRunsMillis);
}
timeBetweenEvictionRunsMillis >0時才啟動
* <p>Starts the evictor with the given delay. If there is an evictor
* running when this method is called, it is stopped and replaced with a
* new evictor with the specified delay.</p>
*
* <p>This method needs to be final, since it is called from a constructor.
* See POOL-195.</p>
*
* @param delay time in milliseconds before start and between eviction runs
*/
final void startEvictor(final long delay) {
synchronized (evictionLock) {
EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
evictor = null;
evictionIterator = null;
if (delay > 0) {
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
}
}
驅(qū)逐定時器
/**
* Adds the specified eviction task to the timer. Tasks that are added with a
* call to this method *must* call {@link #cancel()} to cancel the
* task to prevent memory and/or thread leaks in application server
* environments.
*
* @param task Task to be scheduled.
* @param delay Delay in milliseconds before task is executed.
* @param period Time in milliseconds between executions.
*/
static synchronized void schedule(
final BaseGenericObjectPool<?>.Evictor task, final long delay, final long period) {
if (null == executor) {
executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory());
executor.setRemoveOnCancelPolicy(true);
}
final ScheduledFuture<?> scheduledFuture =
executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS);
task.setScheduledFuture(scheduledFuture);
}
驅(qū)逐器線程name commons-pool-evictor-thread 并只有一個線程,多個對象共享
類 EvictionTimer
/** Executor instance */
private static ScheduledThreadPoolExecutor executor;
/**
* Thread factory that creates a daemon thread, with the context class loader from this class.
*/
private static class EvictorThreadFactory implements ThreadFactory {
@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = new Thread(null, runnable, "commons-pool-evictor-thread");
thread.setDaemon(true); // POOL-363 - Required for applications using Runtime.addShutdownHook().
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
thread.setContextClassLoader(EvictorThreadFactory.class.getClassLoader());
return null;
});
return thread;
}
}
不足minIdle 時補足 ensureMinIdle ( Inner classes)
/**
* The idle object evictor {@link TimerTask}.
*
* @see GenericKeyedObjectPool#setTimeBetweenEvictionRunsMillis
*/
class Evictor implements Runnable {
private ScheduledFuture<?> scheduledFuture;
/**
* Run pool maintenance. Evict objects qualifying for eviction and then
* ensure that the minimum number of idle instances are available.
* Since the Timer that invokes Evictors is shared for all Pools but
* pools may exist in different class loaders, the Evictor ensures that
* any actions taken are under the class loader of the factory
* associated with the pool.
*/
@Override
public void run() {
final ClassLoader savedClassLoader =
Thread.currentThread().getContextClassLoader();
try {
if (factoryClassLoader != null) {
// Set the class loader for the factory
final ClassLoader cl = factoryClassLoader.get();
if (cl == null) {
// The pool has been dereferenced and the class loader
// GC'd. Cancel this timer so the pool can be GC'd as
// well.
cancel();
return;
}
Thread.currentThread().setContextClassLoader(cl);
}
// Evict from the pool
try {
evict();
} catch(final Exception e) {
swallowException(e);
} catch(final OutOfMemoryError oome) {
// Log problem but give evictor thread a chance to continue
// in case error is recoverable
oome.printStackTrace(System.err);
}
// Re-create idle instances. 補足min idle
try {
ensureMinIdle();
} catch (final Exception e) {
swallowException(e);
}
} finally {
// Restore the previous CCL
Thread.currentThread().setContextClassLoader(savedClassLoader);
}
}
空閑鏈接超過maxIdle 時會直接destroy
maxIdleSave <= idleObjects.size() [ idleObjects.size()>=maxIdleSave]
final int maxIdleSave = getMaxIdle();
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
}
連接池dbcp 和luttence redis pool都是common pool實現(xiàn)
連接池druid 與dbcp 不同
public class DestroyTask implements Runnable {
@Override
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
- 縮身 驅(qū)逐方法
idleMillis < minEvictableIdleTimeMillis 不驅(qū)逐
if (idleMillis < minEvictableIdleTimeMillis) {
break;
}
idleMillis > maxEvictableIdleTimeMillis 時驅(qū)逐
if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis) {
break;
}
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
} else if (keepAlive) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
} finally {
lock.unlock();
}
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
if (validate) {
holer.lastActiveTimeMillis = System.currentTimeMillis();
put(holer);
} else {
JdbcUtils.close(connection);
}
}
Arrays.fill(keepAliveConnections, null);
}
}
mysql內(nèi)存泄露 5.1.46
/**
* Creates a connection to a MySQL Server.
*
* @param hostToConnectTo
* the hostname of the database server
* @param portToConnectTo
* the port number the server is listening on
* @param info
* a Properties[] list holding the user and password
* @param databaseToConnectTo
* the database to connect to
* @param url
* the URL of the connection
* @param d
* the Driver instantation of the connection
* @exception SQLException
* if a database access error occurs
*/
protected ConnectionImpl(String hostToConnectTo, int portToConnectTo, Properties info,
String databaseToConnectTo, String url)
throws SQLException {
...
NonRegisteringDriver.trackConnection(this);
}
protected static void trackConnection(Connection newConn) {
ConnectionPhantomReference phantomRef = new ConnectionPhantomReference((ConnectionImpl) newConn, refQueue);
connectionPhantomRefs.put(phantomRef, phantomRef);
}
protected static final ConcurrentHashMap<ConnectionPhantomReference, ConnectionPhantomReference> connectionPhantomRefs = new ConcurrentHashMap<ConnectionPhantomReference, ConnectionPhantomReference>();
protected static final ReferenceQueue<ConnectionImpl> refQueue = new ReferenceQueue<ConnectionImpl>();
清除線程
Thread referenceThread = new Thread("Abandoned connection cleanup thread") {
public void run() {
while (true) {
try {
Reference<? extends ConnectionImpl> ref = refQueue.remove();
try {
((ConnectionPhantomReference) ref).cleanup();
} finally {
connectionPhantomRefs.remove(ref);
}
} catch (Exception ex) {
// no where to really log this if we're static
}
}
}
};
referenceThread.setDaemon(true);
referenceThread.start();
}
清除線程會處理物理鏈接 ((ConnectionPhantomReference) ref).cleanup();
static class ConnectionPhantomReference extends PhantomReference<ConnectionImpl> {
private NetworkResources io;
ConnectionPhantomReference(ConnectionImpl connectionImpl, ReferenceQueue<ConnectionImpl> q) {
super(connectionImpl, q);
try {
io = connectionImpl.getIO().getNetworkResources();
} catch (SQLException e) {
// if we somehow got here and there's really no i/o, we deal with it later
}
}
void cleanup() {
if (io != null) {
try {
io.forceClose();
} finally {
io = null;
}
}
}
}
驅(qū)逐線程會調(diào)用 ConnectionImpl close方法
public void close() throws SQLException {
try {
synchronized(this.getConnectionMutex()) {
if (this.connectionLifecycleInterceptors != null) {
Iterator var2 = this.connectionLifecycleInterceptors.iterator();
while(var2.hasNext()) {
ConnectionLifecycleInterceptor cli = (ConnectionLifecycleInterceptor)var2.next();
cli.close();
}
}
this.realClose(true, true, false, (Throwable)null);
}
} catch (CJException var7) {
throw SQLExceptionsMapping.translateException(var7, this.getExceptionInterceptor());
}
}