Preface
This post takes a look at the DAOs that ship with jesque.
DAO list
- FailureDAO
- KeysDAO
- QueueInfoDAO
- WorkerInfoDAO
FailureDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java
/**
 * FailureDAO provides access to job failures.
 *
 * @author Greg Haines
 */
public interface FailureDAO {

    /**
     * @return total number of failures
     */
    long getCount();

    /**
     * @param offset offset into the failures
     * @param count number of failures to return
     * @return a sub-list of the failures
     */
    List<JobFailure> getFailures(long offset, long count);

    /**
     * Clear the list of failures.
     */
    void clear();

    /**
     * Re-queue a job for execution.
     * @param index the index into the failure list
     * @return the date the job was re-queued
     */
    Date requeue(long index);

    /**
     * Remove a failure from the list.
     * @param index the index of the failure to remove
     */
    void remove(long index);
}
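This DAO mainly operates on the namespace:failed key, which is a Redis list.
- getCount
Uses LLEN to get the length of the list
- clear
Uses DEL to delete the namespace:failed list
- getFailures
Uses LRANGE to query a sub-range
- requeue
Fetches the failed job by index, resets its retry time, and puts it back into the queue
- remove
Removes by index using LREM: it first uses LSET to write a random value at that index, then uses LREM to remove that random value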
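Redis has no command for removing a list element by index, and LREM removes by value, which is ambiguous when the list contains duplicates. That is why remove overwrites the slot with a random value first. The following is a minimal in-memory sketch of that idea, not jesque's actual code: a java.util.List stands in for the Redis list, and the class and method names (RemoveByIndexSketch, removeAt) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// In-memory stand-in for a Redis list; it only illustrates the LSET + LREM idiom.
final class RemoveByIndexSketch {

    /** Removes the element at index by first overwriting it with a unique sentinel. */
    static void removeAt(List<String> list, int index) {
        // Plays the role of the random value written by remove.
        String sentinel = UUID.randomUUID().toString();
        list.set(index, sentinel); // analogous to: LSET key index sentinel
        list.remove(sentinel);     // analogous to: LREM key 1 sentinel (first match only)
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>(Arrays.asList("a", "dup", "dup", "z"));
        removeAt(failures, 2);
        System.out.println(failures); // prints [a, dup, z]
    }
}
```

Because the sentinel is unique, removing by value hits exactly the slot that was overwritten, even though "dup" appears twice. In this sketch the two steps are not atomic; whether and how the real implementation guards against concurrent modification is not covered here.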
KeysDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java
/**
 * KeysDAO provides access to available keys.
 *
 * @author Greg Haines
 */
public interface KeysDAO {

    /**
     * Get basic key info.
     * @param key the key name
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key);

    /**
     * Get basic key info plus a sub-list of the array value for the key, if applicable.
     * @param key the key name
     * @param offset the offset into the array
     * @param count the number of values to return
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key, int offset, int count);

    /**
     * Get basic info on all keys.
     * @return a list of key informations
     */
    List<KeyInfo> getKeyInfos();

    /**
     * @return information about the backing Redis database
     */
    Map<String, String> getRedisInfo();
}
- getKeyInfo
Uses TYPE to get the type of the key
- getKeyInfos
Uses KEYS *
- getRedisInfo
Uses INFO
QueueInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java
/**
 * QueueInfoDAO provides access to the queues in use by Jesque.
 *
 * @author Greg Haines
 */
public interface QueueInfoDAO {

    /**
     * @return the list of queue names
     */
    List<String> getQueueNames();

    /**
     * @return total number of jobs pending in all queues
     */
    long getPendingCount();

    /**
     * @return total number of jobs processed
     */
    long getProcessedCount();

    /**
     * @return the list of queue informations
     */
    List<QueueInfo> getQueueInfos();

    /**
     * @param name the queue name
     * @param jobOffset the offset into the queue
     * @param jobCount the number of jobs to return
     * @return the queue information or null if the queue does not exist
     */
    QueueInfo getQueueInfo(String name, long jobOffset, long jobCount);

    /**
     * Delete the given queue.
     * @param name the name of the queue
     */
    void removeQueue(String name);
}
- getQueueNames
Uses SMEMBERS on namespace:queues
- getPendingCount
Computes the size of each queue, with the command depending on the queue type
private long size(final Jedis jedis, final String queueName) {
    final String key = key(QUEUE, queueName);
    final long size;
    if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZCARD
        size = jedis.zcard(key);
    } else { // Else, use LLEN
        size = jedis.llen(key);
    }
    return size;
}
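Delayed queues are sorted sets, so their size comes from ZCARD
Non-delayed queues are lists, so their size comes from LLEN
- getProcessedCount
Reads the string value stored under the stat key directly
- getQueueInfos
Also computes the size of each queue along the way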
- removeQueue
public void removeQueue(final String name) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(QUEUES), name);
            jedis.del(key(QUEUE, name));
            return null;
        }
    });
}
This touches two objects: the queues set (the name is removed from it) and the queue key itself (it is deleted)
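The getPendingCount logic described above (pick ZCARD or LLEN per queue, then add everything up) can be sketched without a Redis connection. This is an illustrative in-memory model only: a java.util.List stands in for a plain queue, a Map of member to score stands in for a delayed queue's sorted set, and the names PendingCountSketch and pendingCount are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model: List = plain queue (Redis list), Map = delayed queue (Redis sorted set).
final class PendingCountSketch {

    /** Size of one queue, dispatching on its kind as the size() helper does. */
    static long size(Object queue) {
        if (queue instanceof Map) {
            return ((Map<?, ?>) queue).size(); // analogous to ZCARD
        }
        return ((List<?>) queue).size();       // analogous to LLEN
    }

    /** Sum of the sizes of all queues. */
    static long pendingCount(Map<String, Object> queues) {
        long total = 0;
        for (Object queue : queues.values()) {
            total += size(queue);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Double> delayed = new TreeMap<>();
        delayed.put("job-c", 1700000000.0); // member -> score (due time)

        Map<String, Object> queues = new LinkedHashMap<>();
        queues.put("mail", Arrays.asList("job-a", "job-b"));
        queues.put("reports", delayed);

        System.out.println(pendingCount(queues)); // prints 3
    }
}
```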
WorkerInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java
/**
 * WorkerInfoDAO provides access to information about workers.
 *
 * @author Greg Haines
 */
public interface WorkerInfoDAO {

    /**
     * @return total number of workers known
     */
    long getWorkerCount();

    /**
     * @return number of active workers
     */
    long getActiveWorkerCount();

    /**
     * @return number of paused workers
     */
    long getPausedWorkerCount();

    /**
     * @return information about all active workers
     */
    List<WorkerInfo> getActiveWorkers();

    /**
     * @return information about all paused workers
     */
    List<WorkerInfo> getPausedWorkers();

    /**
     * @return information about all workers
     */
    List<WorkerInfo> getAllWorkers();

    /**
     * @param workerName the name of the worker
     * @return information about the given worker or null if that worker does not exist
     */
    WorkerInfo getWorker(String workerName);

    /**
     * @return a map of worker informations by hostname
     */
    Map<String, List<WorkerInfo>> getWorkerHostMap();

    /**
     * Removes the metadata about a worker.
     *
     * @param workerName
     *            The worker name to remove
     */
    void removeWorker(String workerName);
}
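- getAllWorkers
Uses SMEMBERS on namespace:workers
- getActiveWorkers
Uses SMEMBERS on namespace:workers, then filters for workers whose state is working
- getPausedWorkers
Uses SMEMBERS on namespace:workers, then filters for workers whose state is paused
- getWorkerCount
Uses SCARD directly on namespace:workers
- getActiveWorkerCount
Uses SMEMBERS on namespace:workers, then filters for workers whose state is working
- getPausedWorkerCount
Uses SMEMBERS on namespace:workers, then filters for workers whose state is paused
- getWorkerHostMap
Uses SMEMBERS on namespace:workers, then groups the workers into a map by host
This lookup is essentially general-purpose; the other counts are mostly derived from it
- removeWorker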
public void removeWorker(final String workerName) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(WORKERS), workerName);
            jedis.del(key(WORKER, workerName), key(WORKER, workerName, STARTED),
                    key(STAT, FAILED, workerName), key(STAT, PROCESSED, workerName));
            return null;
        }
    });
}
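This removes the name from the workers set and deletes the other related keys (the worker key, its started key, and its failed and processed stat keys)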
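Most of the WorkerInfoDAO read methods listed above follow the same pattern: load all workers, then filter by state or group by host. A rough in-memory sketch of that pattern follows. The Worker class here is a hypothetical stand-in, not jesque's WorkerInfo, and the state strings "working" and "paused" are taken from the descriptions above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class WorkerViewsSketch {

    /** Hypothetical, simplified worker record for illustration only. */
    static final class Worker {
        final String name;
        final String host;
        final String state;

        Worker(String name, String host, String state) {
            this.name = name;
            this.host = host;
            this.state = state;
        }
    }

    /** Filter step behind getActiveWorkers / getPausedWorkers. */
    static List<Worker> withState(List<Worker> all, String state) {
        return all.stream()
                .filter(w -> state.equals(w.state))
                .collect(Collectors.toList());
    }

    /** Grouping step behind getWorkerHostMap. */
    static Map<String, List<Worker>> byHost(List<Worker> all) {
        return all.stream()
                .collect(Collectors.groupingBy(w -> w.host, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Worker> all = Arrays.asList(
                new Worker("w1", "hostA", "working"),
                new Worker("w2", "hostA", "paused"),
                new Worker("w3", "hostB", "working"));

        System.out.println(withState(all, "working").size()); // prints 2
        System.out.println(byHost(all).keySet());             // prints [hostA, hostB]
    }
}
```

The active and paused counts then fall out as the sizes of the filtered lists, whereas the total worker count can be read with a single SCARD and needs no per-worker lookup.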