redis 集群批量操作实现
0浏览
收藏
怎么入门数据库编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《redis 集群批量操作实现》,涉及到操作、批量、redis集群,有需要的可以收藏一下
Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:
初始化 JedisCluster类
@Configuration
public class JedisClusterConfig {
@Value("${spring.redis.cluster.nodes}")
private String clusterNodes;
@Value("${spring.redis.cache.commandTimeout}")
private Integer commandTimeout;
@Bean
public JedisCluster getJedisCluster() {
String[] serverArray = clusterNodes.split(",");
Set
nodes = new HashSet();
for (String ipPort : serverArray) {
String[] ipPortPair = ipPort.split(":");
nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}
return new JedisCluster(nodes, commandTimeout);
}
}
工具类 JedisClusterUtil
@Component
public class JedisClusterUtil {
@Autowired
private JedisCluster jedisCluster;
@Resource(name = "redisTemplate4Json")
protected RedisTemplate
redisTemplate;
/**
* ZSet批量查询
* @param keys
* @return
*/
public List
batchZRange(List
keys) { List
resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundZSetOperations
operations = redisTemplate.boundZSetOps(keys.get(0)); Set
set = operations.reverseRange(0, 0); resList.add(set.iterator().next()); return resList; } Map
> jedisPoolMap = getJedisPool(keys); List
keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List
res = new ArrayList(); Map
resultMap = new HashMap(); //执行 for (Map.Entry
> entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.zrevrange(key, 0, 0); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i set = (Set
) res.get(i); if (null == set || set.isEmpty()) { resultMap.put(keyList.get(i), null); } else { byte[] byteStr = set.iterator().next().toString().getBytes(); Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr); resultMap.put(keyList.get(i), obj); } } } } catch (Exception e) { e.printStackTrace(); } finally { returnResource(jedis, currentJedisPool); } } resList = sortList(keys, resultMap); return resList; } /** * Value批量查询 * @param keys * @return */ public List batchGet(List
keys){ List
resList = new ArrayList(); if (keys == null || keys.size() == 0) { return resList; } if (keys.size() == 1) { BoundValueOperations
operations = redisTemplate.boundValueOps(keys.get(0)); resList.add(operations.get()); return resList; } Map
> jedisPoolMap = getJedisPool(keys); List
keyList; JedisPool currentJedisPool = null; Pipeline currentPipeline; List
res = new ArrayList(); Map
resultMap = new HashMap(); for (Map.Entry
> entry : jedisPoolMap.entrySet()) { Jedis jedis = null; try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : keyList) { currentPipeline.get(key); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for (int i = 0; i > getJedisPool(List
keys){ //JedisCluster继承了BinaryJedisCluster //BinaryJedisCluster的JedisClusterConnectionHandler属性 //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache //从而获取slot和JedisPool直接的映射 MetaObject metaObject = SystemMetaObject.forObject(jedisCluster); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //保存地址+端口和命令的映射 Map
> jedisPoolMap = new HashMap(); JedisPool currentJedisPool = null; List
keyList; for (String key : keys) { //计算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //通过哈希槽获取节点的连接 currentJedisPool = cache.getSlotPool(crc); //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的 //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样 //的,可以作为map的key if (jedisPoolMap.containsKey(currentJedisPool)) { jedisPoolMap.get(currentJedisPool).add(key); } else { keyList = new ArrayList(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } return jedisPoolMap; } private List
sortList(List
keys, Map
params) { List
resultList = new ArrayList(); Iterator
it = keys.iterator(); while (it.hasNext()) { String key = it.next(); resultList.add(params.get(key)); } return resultList; } /** * 释放jedis资源 * * @param jedis */ public void returnResource(Jedis jedis, JedisPool jedisPool) { if (jedis != null && jedisPool != null) { jedisPool.returnResource(jedis); } }
注意:一定要完成后释放 jedis 资源 不然会造成卡死现象
终于介绍完啦!小伙伴们,这篇关于《redis 集群批量操作实现》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布数据库相关知识,快来关注吧!
版本声明 本文转载于:脚本之家 如有侵犯,请联系 删除
- Redis IP地址的绑定的实现
- 详解Redis实现限流的三种方式
