我们在开发的时候经常会用到redis。java中首选的redis客户端是jedis。jedis封装了redis的所有功能,并且提供了更多额外好用的功能。
你在开发时,可以通过jedis连接redis。jedis支持直连模式和连接池模式。
- 直连模式:
public static void main(String[] arg){
//1个实例
Jedis jedis = new Jedis("127.0.0.1",6379,100);
jedis.incr("threadSafe");
jedis.close();
}
你不应该将一个redis实例共享给不同线程,时用时消的用法会造成产生很多的socket和tcp连接,即浪费时间又耗费资源,也提高了服务不可用的风险。如果你只是在本机环境或者使用者相对少的环境中使用,这种模式比较适宜。但是,如果你需要大量使用redis时,应该采用连接池模式。
- 连接池模式:
public static void main(String[] arg){
JedisPool pool = new JedisPool(new JedisPoolConfig(),"localhost");
try (Jedis jedis = pool.getResource()) {
jedis.set("foo", "bar");
String foobar = jedis.get("foo");
jedis.zadd("sose", 0, "car"); jedis.zadd("sose", 0, "bike");
Set<String> sose = jedis.zrange("sose", 0, -1);
}
pool.close();
}
为了避免由单一redis实例引发的未知的服务不可用的风险,我们应该采用这种方案。这种方案背后的原理就是连接池(common-pool2)。当服务到达时服务会从连接池中borrow一个jedis连接,当用完或者发生错误时,连接会归还到连接池中。如果没有borrow到连接,那么服务就会报错且关闭。
我的上一篇池化技术已经提到过连接池的概念,所以这篇文章是我想记录并分享我所学习到的jedis连接池的知识。通过上一段代码我们看到当jedisPool实例化以后,我们从pool中通过getResource来获取一个jedis实例。那么我们的代码研究就从这里开始。
//jedis-3.6.0.jar/redis/clients/jedis/jedisPool.class
public Jedis getResource() {
//调用父集getResource
Jedis jedis = (Jedis)super.getResource();
//将当前实例暴露出dataSource,供后续redis操作使用
jedis.setDataSource(this);
return jedis;
}
//jedis-3.6.0.jar/redis/clients/jedis/client/util/Pool.class
//genericObjectPool是ObjectPool的实现,ObjectPool是common.pool2中关于池化对象的接口。其中定义了几个标准的对象方法,这些方法就是管理池的核心方法。
protected GenericObjectPool<T> internalPool;
……
public T getResource() {
try {
//通过调用common.pool2中的borrowObject,完成对jedis实例的借取。
return this.internalPool.borrowObject();
//如果出现了异常了,按照异常的分类进行处理。
} catch (NoSuchElementException var2) {
if (null == var2.getCause()) {
throw new JedisExhaustedPoolException("Could not get a resource since the pool is exhausted", var2);
} else {
throw new JedisException("Could not get a resource from the pool", var2);
}
} catch (Exception var3) {
throw new JedisConnectionException("Could not get a resource from the pool", var3);
}
}
在common-pool2中,对象池的核心接口叫做ObjectPool,他定义了对象池的实现的行为。
- addObject方法:往池中添加一个对象。池子里的所有对象都是通过这个方法进来的。
- borrowObject方法:从池中借走到一个对象。借走不等于删除。对象一直都属于池子,只是状态的变化。
- returnObject方法:把对象归还给对象池。归还不等于添加。对象一直都属于池子,只是状态的变化。
- invalidateObject:销毁一个对象。这个方法才会将对象从池子中删除,当然这其中最重要的就是释放对象本身持有的各种资源。
- getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
- getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
- clear:清理对象池。注意是清理不是清空,改方法要求的是,清理所有空闲对象,释放相关资源。
- close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。
在common-pool2中,objectPool的核心实现类就是GenericObjectPool。
//commons-pool2.jar/org/apache/commons/pool2/impl/GenericObjectedPool
@Override
public T borrowObject() throws Exception {
//getMaxWaitMillis()是BaseGenericObjectPool中设定的volatile类型的值,代表最长等待时间(毫秒),配置文件中的"maxWait"
return borrowObject(getMaxWaitMillis());
}
接下来我们来分析下borrowObject方法,刚才说过borrowObject是实现了ObjectPool。那么先看一下这个接口中对borrowObject的描述。
/**
* Obtains an instance from this pool.
* <p>
* Instances returned from this method will have been either newly created
* with {@link PooledObjectFactory#makeObject} or will be a previously
* idle object and have been activated with
* {@link PooledObjectFactory#activateObject} and then validated with
* {@link PooledObjectFactory#validateObject}.
* </p>
* <p>
* By contract, clients <strong>must</strong> return the borrowed instance
* using {@link #returnObject}, {@link #invalidateObject}, or a related
* method as defined in an implementation or sub-interface.
* </p>
* <p>
* The behavior of this method when the pool has been exhausted
* is not strictly specified (although it may be specified by
* implementations).
* </p>
*
* @return an instance from this pool.
*
* @throws IllegalStateException
* after {@link #close close} has been called on this pool.
* @throws Exception
* when {@link PooledObjectFactory#makeObject} throws an
* exception.
* @throws NoSuchElementException
* when the pool is exhausted and cannot or will not return
* another instance.
*/
/*
这个method返回实例,这个实例将是一个被makeObject()创建的对象,或者是一个之前就是idle并且经过activateObject()激活过的并且经过validatedObject()验证过的对象。
根据约定,客户端必须调用过returenObject(),invalidateObject()或者一个在子类中实现了归还逻辑的方法后归还。
这个方法在池子耗尽时的表现没有指明具体如何处理(尽管他有可能被他的实现制定过。)
*/
T borrowObject() throws Exception, NoSuchElementException,
IllegalStateException;
接着我们再来看看这个方法的具体实现。
//commons-pool2.jar/org/apache/commons/pool2/impl/GenericObjectPool.class
/**
* Equivalent to <code>{@link #borrowObject(long)
* borrowObject}({@link #getMaxWaitMillis()})</code>.
* <p>
* {@inheritDoc}
* </p>
*/
//通过获取配置中MaxWait配置,当做传输调用重载的方法。
@Override
public T borrowObject() throws Exception {
return borrowObject(getMaxWaitMillis());
}
/**
* Borrows an object from the pool using the specific waiting time which only
* applies if {@link #getBlockWhenExhausted()} is true.
* <p>
* If there is one or more idle instance available in the pool, then an
* idle instance will be selected based on the value of {@link #getLifo()},
* activated and returned. If activation fails, or {@link #getTestOnBorrow()
* testOnBorrow} is set to {@code true} and validation fails, the
* instance is destroyed and the next available instance is examined. This
* continues until either a valid instance is returned or there are no more
* idle instances available.
* </p>
* <p>
* If there are no idle instances available in the pool, behavior depends on
* the {@link #getMaxTotal() maxTotal}, (if applicable)
* {@link #getBlockWhenExhausted()} and the value passed in to the
* {@code borrowMaxWaitMillis} parameter. If the number of instances
* checked out from the pool is less than {@code maxTotal,} a new
* instance is created, activated and (if applicable) validated and returned
* to the caller. If validation fails, a {@code NoSuchElementException}
* is thrown.
* </p>
* <p>
* If the pool is exhausted (no available idle instances and no capacity to
* create new ones), this method will either block (if
* {@link #getBlockWhenExhausted()} is true) or throw a
* {@code NoSuchElementException} (if
* {@link #getBlockWhenExhausted()} is false). The length of time that this
* method will block when {@link #getBlockWhenExhausted()} is true is
* determined by the value passed in to the {@code borrowMaxWaitMillis}
* parameter.
* </p>
* <p>
* When the pool is exhausted, multiple calling threads may be
* simultaneously blocked waiting for instances to become available. A
* "fairness" algorithm has been implemented to ensure that threads receive
* available instances in request arrival order.
* </p>
*
* @param borrowMaxWaitMillis The time to wait in milliseconds for an object
* to become available
*
* @return object instance from the pool
*
* @throws NoSuchElementException if an instance cannot be returned
*
* @throws Exception if an object instance cannot be returned due to an
* error
*/
/*
当getBlockWhenExhausted()返回true时,需要提供等待时间从对象池中借出对象,
如果有1个或多个空闲实例的话,就通过getLifo()方法,也就是先进先出的策略选择由哪个空闲实例承接工作。如果激活失败了,或者testOnBorrow设置为true并且检测失败了,这个实例就会销毁,接下来由下一个可用实例进行承接。这个流程一直持续,直到没有可用的实例可供使用或者没有空闲实例可供使用。
如果在对象池中没有空闲实例了,接下来的走向取决于maxTotal,getBlockWhenExhausted()和传进来的等待时间参数决定。如果当前对象池中的实力数量小于maxtotal,一个新的实例将被创建,激活并且检测最终返回给调用者。如果检测失败,一个NoSuchElementException异常将被抛出。
如果对象池已经占满了(也就是说没有可用的空闲实例并且没有容量可以被创建),这个方法要不堵塞(如果getBlockWhenExhausted设置为true的情况下才会堵塞)要不就抛出一个NoSuchElementException(相反getBlockWhenExhausted为false的情况)异常。具体阻塞的时间跟传入的borrowMaxWaitMillis时间大小有关。(还记得刚才说的maxWait的设置么?)
当对象池已经耗尽,多个调用线程将被同时堵塞,他们一起等待可用的jedis实例接客。接客的规则采用公平算法。其实就是先到先被接~哈哈
*/
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
//这一步就是检查一下对象池的状态,如果之前已经关闭了,就会抛出IllegalStateException异常。
assertOpen();
//移除已经被抛弃的实例
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
/*
被抛弃的实例不为null
borrow时去除被抛弃的实例
空闲线程实例<2
当前活动的实例数量大于最大实例数量-3
*/
removeAbandoned(ac);
}
//定义一个PooledObject类型的对象,T是真正的实例对象。如jedis,db等。
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
/*
获取一个本地的拷贝,这个拷贝是目前的配置。这样就能够保证整体的一致性。
这个getBlockWhenExhausted()最终是获取的一个volatile类型的值,这个值是从配置中获取的,代表当线程池满时,是否对新来的任务给予堵塞。但是,我们得注意volatile这个类型了,而且设计者在这个地方单独获取出来这个值,肯定是为了什么。接着往下看吧。
*/
final boolean blockWhenExhausted = getBlockWhenExhausted();
//是否为新建,请区别于created。为什么仅在这里定义了变量而没有初始化呢?
boolean create;
//当前的毫秒,用来就算
final long waitTime = System.currentTimeMillis();
//循环开始,第一次p=null开始循环
while (p == null) {
//初始化create=false,代表目前这个线程不是新建的。
create = false;
/*
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
idleObjects是一个阻塞有序队列,队列的类型是刚才的封装类型,封装的是实际的资源。从名字上可以看出,这里叫做空闲对象组
可以看到,从空闲对象组里拿出第一个元素。pollFirst()是LinkedBlockingDeque.java中的方法。就是通过操作节点来把第一个节点删除并返回。
*/
p = idleObjects.pollFirst();
//如果p为空代表弹出的是null,说明没有空闲。那就创建一个新的对象。
if (p == null) {
p = create();
//p可能会创建失败,创建失败的时候 create=false。 创建成功create=true
if (p != null) {
create = true;
}
}
//如果设置了对象池耗尽后堵塞等待的标示
if (blockWhenExhausted) {
if (p == null) {
//p没有创建成功
if (borrowMaxWaitMillis < 0) {
//如果没有设置为-1,只有这种情况会符合小于0.所以就直接从空闲对象组中获取第一个有效的空闲对象。这个方法有锁的实现,所以这块可能会一直堵塞着直到有一个空闲对象可以使用。
p = idleObjects.takeFirst();
} else {
//如果设置等待时间了,就在这段时间里获取。超过了就返回null
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
//到这里如果还是空,就说明没等待到空闲的资源了。所以就要报异常了。
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
//如果没有设置blockWhenExhausted标示且p是null,那真是获取不到资源了,因为池子已经被占满了。
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
/*
到这里其实p应该不是null了,说明肯定是有资源了。有可能是获得了空闲资源或是创建了一个新的资源。这个时候就要让系统给他分配内存了。跟进allocate()代码中,发现这是一个synchronized方法,保证了线程安全。(这块会有线程安全问题。为什么呢?)方法体判断了当前池状态(空闲状态或者测试中状态)然后更改状态位,置lastBorrowTime,lastUseTime,borrowedCount++等,然后返回boolean。
我们可以推算出,当前的池资源状态可能会被另一个线程改写。有一种情况就是,空闲的连接还需要进行test和validate才能够正确分配。但是如果在test或validate阶段失败了,那线程就不能够被分配资源了,因为前后状态不一致了。所以需要allocate的同步操作来保证当时的空闲资源不被改写。
*/
if (!p.allocate()) {
//没有成功分配资源,当然p就应该置为null,等待重新进行遍历。
p = null;
}
//到这里,分配成功的p应该进行校验阶段了。没有成功分配的p目前还是null,所以跳过下面的判断。
if (p != null) {
//从这开始p是有资源了
try {
/*
private final PooledObjectFactory<T> factory;
通过池工厂(PooledObjectFactory.java)对返回来的实例重新初始化。因为PooledObjectFactory是个接口。所以,我以Jedis为T进行解析(JedisFactory.java),别的实际类型会有不同的行为,但是基本差不多。
这个方法里就是获取了BinaryJedis,判断当前的库是不是库0.如果不是就就简单做个select操作。如果是的话什么都不做。
这块的设计有一个小心思,就是作者用这种代码结构来给开发者留出了很大的实现空间。
*/
factory.activateObject(p);
} catch (final Exception e) {
//激活异常
try {
//摧毁对象,释放资源
destroy(p, DestroyMode.NORMAL);
} catch (final Exception e1) {
// 摧毁的程序都报异常了,我可管不了了。采取鸵鸟算法。
}
//help GC
p = null;
//如果是新建的资源,就说明分配成功,但是激活失败。抛个错吧。
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
//用什么错误原因呢?就用上面这个吧
nsee.initCause(e);
throw nsee;
}
}
/*
如果分配成功了, 激活也成功了,并且开启了TestOnBorrow,逻辑进来再让我蹂躏下吧。如果失败了跳过这个逻辑吧。
testOnBorrow的目的是不论资源是否是从池子里借的,都要在返回钱进行一次验证。如果失败了资源会被清除出池子并且毁掉。然后再向池对象尝试借一次。
*/
if (p != null && getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
/*
我依然以JedisFactory.java分析吧。
依然是首先获取一个BinaryJedis资源。判断连接的Port和设置的port是否相等。判断连接的host和设置额host是否相等。相等就返回true。否则就返回false。
*/
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
//如果检查没有通过依然是摧毁。如果是新建的就报错。和上面激活的流程一样。
if (!validate) {
try {
destroy(p, DestroyMode.NORMAL);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
//恭喜获得了一个有效的p资源。然后统计一下数据吧,比如消耗时间。
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
//返回吧。记住返回的是一个PooledObject对象。真正的资源需要通过getObject()获取。
return p.getObject();
}
至此,看懂了jedis是如何从连接池中拿到资源的了。这里面经常有一些线程安全的判断,这也是难点所在,因为池化技术最重要的功能就是对线程资源的管理与利用。所以,代码中用了很多sychronize方法。不光这些,作者还要考虑到资源的释放和销毁还有扩展。
最后看到了getObject()方法。为什么作者不直接暴露资源呢?为什么还要包一层呢?接下来我来分析一下吧。PooledObject是一个接口,我们先看看接口的描述是什么。详情看我的之前一篇文章common-pool对象池学习
/**
* Defines the wrapper that is used to track the additional information, such as
* state, for the pooled objects.
* <p>
* Implementations of this class are required to be thread-safe.
* </p>
*
* @param <T> the type of object in the pool
*
* @since 2.0
*/
/*
定义一个封装结构,这个封装结构使用来记录额外信息的。比如池资源的状态等。
实现这个接口的class文件必须是线程安全的。(为什么呢?)
*/
public interface PooledObject<T> extends Comparable<PooledObject<T>> {}
参考资料: