您的位置: 首页 - 站长

seo建站淘客wordpress注册不跳转

当前位置: 首页 > news >正文

seo建站淘客,wordpress注册不跳转,设计师培训多久,局网站建设总结目录 一、前言二、分布式锁具备的特点三、Redis分布式锁的实现核心思路四、分布式锁代码实现#xff08;解决分布式锁原子性、死锁、误删、可重入、自动续期等问题#xff09;4.1、分布式锁实现工具类4.2、测试分布式锁效果 五、分布式锁常见问题以及解决方法5.1、分布式锁死… 目录 一、前言二、分布式锁具备的特点三、Redis分布式锁的实现核心思路四、分布式锁代码实现解决分布式锁原子性、死锁、误删、可重入、自动续期等问题4.1、分布式锁实现工具类4.2、测试分布式锁效果 五、分布式锁常见问题以及解决方法5.1、分布式锁死锁问题5.1.1、逻辑说明5.1.2、解决方案 5.2、分布式锁原子性问题5.2.1、逻辑说明5.2.2、解决方案 5.3、分布式锁可重入问题5.3.1、逻辑说明5.3.2、解决方案 5.4、分布式锁如何防止误删5.4.1、逻辑说明5.4.2、解决方案 5.5、分布式锁自动续期问题5.5.1、逻辑说明5.5.2、解决方案 一、前言 分布式锁是一种用于在分布式系统中实现同步和互斥访问的机制。在分布式系统中多个节点同时访问共享资源可能会导致数据不一致或竞争条件的发生。分布式锁提供了一种保护共享资源的方式以确保在任意时刻只有一个节点可以访问该资源如同一时刻每个订单只能有一个线程操作取消订单功能。 常见分布式锁实现 MySQLMySQL本身就带有锁机制由于业务特性使用MySQL作为分布式锁并不合适而且性能一般一般很少使用MySQL来实现分布式锁。ZooKeeperZooKeeper是企业级开发中较好的一个实现分布式锁的方案相对于RedisZooKeeper的部署和维护复杂一些。此外ZooKeeper的性能相对较低适用于对性能要求不高的场景。RedisRedis分布式锁的实现通常使用了SETNX(SET if Not eXists)命令和EXPIRE命令。使用SETNX可以尝试将一个键值对设置到Redis中只有在该键不存在的情况下才能成功。成功获取锁的客户端可以设置一个过期时间确保即使在发生故障的情况下锁也能自动释放。
二、分布式锁具备的特点 实现的分布式锁需要具备一下特征 特点描述独占性任何时刻有且只有一个线程持有使用该锁高可用 高性能程序不易崩溃时刻都保证较高的可用性在redis集群环境下不能因为某一个节点挂了而出现获取锁和释放锁失败的情况在高并发请求下分布式锁依旧具有良好的性能防死锁不能出现死锁问题必须有超时重试机制或者撤销操作有个终止跳出的途径不乱抢多线程下防止张冠李戴只能解锁自己的锁不能把别人的锁给释放了重入性同一节点的同一线程如果获得锁之后该线程可以再次获取使用这个锁 三、Redis分布式锁的实现核心思路 在常规的实现方式中Redis锁机制一般是由 setnx 命令实现是”set if not exists”的简写语法setnx key value将key设置值为value如果key不存在会返回1这种情况下等同 set 命令。 当key存在时什么也不做会返回0并且要使用 expire 设置一个锁的过期时间避免应用程序异常导致锁一直没有释放。 例如 127.0.0.1:6379 setnx key1 1 (integer) 1 127.0.0.1:6379 setnx key1 1 (integer) 0 127.0.0.1:6379 expire key1 60 (integer) 1但是上面的setnx和expire实现分布式锁的方式是不安全两条命令非原子性的并不能保证一致性可以通过一些第三方框架或者自己通过lua脚本实现原子操作下面会通过代码分析分布式锁来实现。 四、分布式锁代码实现解决分布式锁原子性、死锁、误删、可重入、自动续期等问题 这里使用的是SpringBoot环境会使用RedisTemplate的API操作Redis实现分布式锁解决分布式锁原子性、死锁、误删、可重入、自动续期等问题。 需要SpringBoot集成调用Redis资料的可以跳转https://blog.csdn.net/weixin_44606481/article/details/133907103 4.1、分布式锁实现工具类 这个分布式锁实现工具类已经将分布式锁原子性、死锁、误删、可重入、自动续期等问题都已解决为了做演示和重点讲解问题解决步骤这里没有进行特定封装可以根据需要自行封装增强拓展性。 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;/*** author Kerwin/ Component public class RedisLockUtils {Autowiredprivate RedisTemplateString, String redisTemplate;// 活跃锁keyvalue集合续期的时候会使用private volatile static CopyOnWriteArraySet activeLockKeySet new CopyOnWriteArraySet();// 定时线程池 用于续期private static ScheduledExecutorService executorService Executors.newScheduledThreadPool(5);/** 加锁* Param key* Param value 锁的value一般使用线程ID在解锁时需要使用* Param expireTime 过期时间 单位秒/public boolean lock(String key, String value, long expireTime) {// 为了实现锁的可重入这里要自己封装一个lua脚本如果不考虑可重入可以直接使用redisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit)String lockLua getLockLua();boolean result executeLua(lockLua, key, value, String.valueOf(expireTime));// 当加锁成功且活跃锁keyvalue不在集合中则添加续期任务if (result !activeLockKeySet.contains(key value)) {// 将活跃锁keyvalue放入集合中activeLockKeySet.add(key value);// 加锁成功添加续期任务resetExpire(key, value, expireTime);}return result;}/** 获取加锁lua脚本/private String getLockLua() {// 封装加锁lua脚本 PS: 这个lua脚本应该是要定义成全局的我这里为了演示定义成局部组装方便介绍每一步流程// lua脚本参数介绍 KEYS[1]传入的key ARGV[1]传入的value ARGV[2]传入的过期时间// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组ListK keys, Object… args在lua脚本中取key值和参数值时使用KEYS和ARGV数组下标从1开始StringBuilder lockLua new StringBuilder();// 通过SETNX命令设置锁如果设置成功则添加一个过期时间并且返回1否则判断是否为重入锁lockLua.append(if redis.call(SETNX, KEYS[1], ARGV[1]) 1 then\n);lockLua.append( redis.call(EXPIRE, KEYS[1], tonumber(ARGV[2]))\n);lockLua.append( return 1\n);lockLua.append(else\n);// 当锁已经存在时判断传入的value是否相等如果相等代表为重入锁返回1并且重置过期时间否则返回0lockLua.append( if redis.call(GET, KEYS[1]) ARGV[1] then\n);lockLua.append( redis.call(EXPIRE, KEYS[1], tonumber(ARGV[2]))\n);lockLua.append( return 1\n);lockLua.append( else\n);lockLua.append( return 0\n);lockLua.append( end\n);lockLua.append(end);return lockLua.toString();}/** 解锁* Param key* Param value 锁的value一般使用线程ID在解锁时需要判断是当前线程才运行删除/public boolean unlock(String key, String value) {// 为了实现避免误删锁这里要自己封装一个lua脚本String unLockLua getUnlockLua();boolean result executeLua(unLockLua, key, value);if (result) {// 将活跃锁keyvalue从集合中删除activeLockKeySet.remove(key value);}return result;}/** 获取解锁lua脚本/private String getUnlockLua() {// 封装解锁lua脚本 PS: 这个lua脚本应该是要定义成全局的我这里为了演示定义成局部组装方便介绍每一步流程// lua脚本参数介绍 KEYS[1]传入的key ARGV[1]传入的value// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组ListK keys, Object… args在lua脚本中取key值和参数值时使用KEYS和ARGV数组下标从1开始StringBuilder unlockLua new StringBuilder();// 判断传入的锁key是否存在如果不存在则直接返回1如果存在则判断传入的value值是否和获取到的value相等unlockLua.append(if redis.call(EXISTS,KEYS[1]) 0 then\n);unlockLua.append( return 1\n);unlockLua.append(else\n);// 判断传入的value值是否和获取到的value相等如果相等则代表是当前线程删除锁执行删除对应key逻辑然后返回1否则返回0unlockLua.append( if redis.call(GET,KEYS[1]) ARGV[1] then\n);unlockLua.append( return redis.call(DEL,KEYS[1])\n);unlockLua.append( else\n);unlockLua.append( return 0\n);unlockLua.append( end\n);unlockLua.append(end);return unlockLua.toString();}/** 封装redisTemplate执行lua脚本返回boolean类型执行器* param scriptText lua脚本* param key 传入数组keys的第一个元素这里就是我们锁key* param args 传入数组args的第一个元素这里就是我们传入的value/private boolean executeLua(String scriptText, String key, Object… args) {// 通过 DefaultRedisScript 来执行 lua脚本DefaultRedisScriptBoolean redisScript new DefaultRedisScript();// Boolean 对应 lua脚本返回的 0 1redisScript.setResultType(Boolean.class);// 指定需要执行的 lua脚本redisScript.setScriptText(scriptText);// 注意 需要提供 ListK keys, Object… args 代表 keys 和 ARGVreturn redisTemplate.execute(redisScript, Collections.singletonList(key), args);}/** 锁续期* Param key* Param value 锁的value一般使用线程ID在解锁时需要使用* Param expireTime 过期时间 单位秒/private void resetExpire(String key, String value, long expireTime) {// 如果keyvalue在集合中不存在则不再进行续期操作if (!activeLockKeySet.contains(key value)) {return;}//设置过期时间推荐设置成过期时间的1/3时间续期一次比如30s过期10s续期一次long delay expireTime 3 ? 1 : expireTime / 3;executorService.schedule(() - {System.out.println(自动续期 keykey valuevalue);// 执行续期操作如果续期成功则再次添加续期任务如果不成功则将不在进行任务续期并且将活跃锁keyvalue从集合中删除if (executeLua(getResetExpireLua(), key, value, String.valueOf(expireTime))) {System.out.println(自动续期成功开启下一轮自动续期);resetExpire(key, value, expireTime);} else {System.out.println(自动续期失败锁key已经删除或不是指定value持有的锁取消自动续期);activeLockKeySet.remove(key value);}}, delay, TimeUnit.SECONDS);}/** 获取锁续期lua脚本/private String getResetExpireLua() {// 封装续期lua脚本 PS: 这个lua脚本应该是要定义成全局的我这里为了演示定义成局部组装方便介绍每一步流程// lua脚本参数介绍 KEYS[1]传入的key ARGV[1]传入的value ARGV[2]传入的过期时间// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组ListK keys, Object… args在lua脚本中取key值和参数值时使用KEYS和ARGV数组下标从1开始StringBuilder resetExpireLua new StringBuilder();// 判断传入的锁key是否存在且获取到的value值是否和传入的value值相等如果相等则重置过期时间然后返回1否则返回0resetExpireLua.append(if redis.call(EXISTS,KEYS[1]) 1 and redis.call(GET,KEYS[1]) ARGV[1] then\n);resetExpireLua.append( redis.call(EXPIRE,KEYS[1],tonumber(ARGV[2]))\n);resetExpireLua.append( return 1\n);resetExpireLua.append(else\n);resetExpireLua.append( return 0\n);resetExpireLua.append(end);return resetExpireLua.toString();} } 4.2、测试分布式锁效果 这个测试类中模拟1000个用户抢10个商品测试是否会出现超卖情况提供了两个方法一个使用分布式锁一个不使用分布式锁当使用分布式锁是不会出现超卖当没有使用分布式时肯定会出现超卖。 import com.redisscene.utils.RedisLockUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.;RunWith(SpringRunner.class) SpringBootTest(classes RedisSceneApplication.class) public class RedisLockTest {Autowiredprivate RedisLockUtils redisLockUtils;// 产品库存lock前缀private final String productLockKeyPrefix PRODUCT_LOCK_KEY:;// 模拟产品库存信息private static MapString, String productMap new HashMap();static {productMap.put(id, P0001);productMap.put(title, 分布式锁);productMap.put(stock, 10);productMap.put(sold, 0);}Testpublic void t1() throws InterruptedException {// 定义一个线程池队列根据需要设置ThreadPoolExecutor executor new ThreadPoolExecutor(100, 100, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(1000));CountDownLatch countDownLatch new CountDownLatch(1000);// 模拟10000个人抢10个商品for (int i 0; i 1000; i) {executor.execute(() - {// 加锁扣减库存deductStockLock();// 不加锁扣减库存 // deductStockNotLock();countDownLatch.countDown();});}countDownLatch.await();executor.shutdown();System.out.println(productMap productMap.toString());}/*** 扣减库存使用分布式锁/private void deductStockLock() {// 通过UUID和线程ID组合成value标识当前线程解锁的时候判断是否是当前线程持有的锁String uuidValue UUID.randomUUID() : Thread.currentThread().getId();// 组装锁keyString lockKey productLockKeyPrefix productMap.get(id);boolean lock false;try {// 获取锁lock redisLockUtils.lock(lockKey, uuidValue, 30);// 测试锁续期效果 这里模拟业务处理时间40s超过Thread.sleep(40000);// 再次获取锁测试可重入效果lock redisLockUtils.lock(lockKey, uuidValue, 30);// 如果没有获取到锁则直接返回if (!lock) {// 这里直接响应失败也可以进行重试return;}System.out.println(获取锁成功 uuidValue uuidValue);// 获取到锁执行业务逻辑处理库存信息假设每个线程每次购买1个商品Integer stock Integer.valueOf(productMap.get(stock));if (stock 0) { // System.out.println(库存不足);return;}// 库存 - 1productMap.put(stock, String.valueOf(Integer.valueOf(productMap.get(stock)) - 1));// 已售 1productMap.put(sold, String.valueOf(Integer.valueOf(productMap.get(sold)) 1));} catch (Exception e) {e.printStackTrace();} finally {if (lock) {// 解锁redisLockUtils.unlock(lockKey, uuidValue);}}}/** 扣减库存不使用分布式锁*/private void deductStockNotLock() {// 判断库存是否足够假设每个线程每次购买1个商品Integer stock Integer.valueOf(productMap.get(stock));if (stock 0) { // System.out.println(库存不足);return;}// 暂停10毫秒方便呈现不加锁超卖效果try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}// 库存 - 1productMap.put(stock, String.valueOf(Integer.valueOf(productMap.get(stock)) - 1));// 已售 1productMap.put(sold, String.valueOf(Integer.valueOf(productMap.get(sold)) 1));} }五、分布式锁常见问题以及解决方法 在章节四中的分布式锁实现工具类中已经将下面的问题都解决并且注释的也比较详细。 5.1、分布式锁死锁问题 5.1.1、逻辑说明 使用分布式锁最害怕的问题就是出现死锁自己业务上写出的死锁这里不做说明这里只介绍异常情况出现死锁应该如何解决在业务异常没有处理好或者应用服务宕机没有解锁就会出现死锁问题锁key一直存储在Redis中不会被释放后续业务恢复去获取锁时因为锁已经存在一直都无法获取到锁这就是死锁问题但是死锁问题很好解决只要给锁key加上一个过期时间即可。 5.1.2、解决方案 代码中使用RedisTemplate添加过期时间 // setIfAbsent方法等同于setnx当这个key不存在时插入成功返回true当key存在时不做处理返回false boolean lock redisTemplate.opsForValue().setIfAbsent(key1, value1); // 加锁成功设置一个30s过期时间 if(lock){redisTemplate.expire(key,30,TimeUnit.SECONDS); }存在问题 获取锁和添加过期时间是两步操作并没有原子性在并发操作时会存在问题下面会通过lua脚本来解决原子性问题。 5.2、分布式锁原子性问题 5.2.1、逻辑说明 在5.1中给锁key设置超时时间解决了死锁问题但是因为是分为两个步骤操作需要分别调用Redis不具备原子性要保证原子性只要保证将两个步骤合并成一个Redis调用即可核心思想是通过lua脚本来实现可以直接通过RedisTemplate操作Redis执行lua脚本Redis执行lua脚本也是单线程的所以可以保证原子性。 5.2.2、解决方案 1、使用lua脚本实现加锁并且添加过期时间 – KEYS[1]传入的key ARGV[1]传入的value ARGV[2]传入的过期时间 – 使用setnx插入key如果成功给key设置一个过期时间然后返回1如果失败直接返回0 if redis.call(SETNX, KEYS[1], ARGV[1]) 1 thenredis.call(EXPIRE, KEYS[1], tonumber(ARGV[2]))return 1 elsereturn 0 end2、使用RedisTemplate的 setIfAbsent方法setIfAbsent方法等同于setnx而且这个方法还实现了原子性给key添加过期时间操作具体实现和我们上面lua脚本类似 // 当这个key不存在时插入成功并且设置一个超时时间然后返回true当key存在时不做处理返回false boolean lock redisTemplate.opsForValue().setIfAbsent(key1, value1, 30, TimeUnit.SECONDS);5.3、分布式锁可重入问题 5.3.1、逻辑说明 可重入锁又名递归锁是指在同一个线程在外层方法获取锁的时候再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象)不会因为之前已经获取过还没释放而阻塞。如果是1个有 synchronized 修饰的递归调用方法程序第2次进入被自己阻塞了就很麻烦。所以Java中ReentrantLock和synchronized都是可重入锁可重入锁的一个优点是可一定程度避免死锁分布式锁实现可重入也比较简单只要在保证原子性的lua脚本中在判断value值即可。 5.3.2、解决方案 lua脚本实现 – KEYS[1]传入的key ARGV[1]传入的value ARGV[2]传入的过期时间 – 通过SETNX命令设置锁如果设置成功则添加一个过期时间并且返回1否则判断是否为重入锁 if redis.call(SETNX, KEYS[1], ARGV[1]) 1 thenredis.call(EXPIRE, KEYS[1], tonumber(ARGV[2]))return 1 else– 当锁已经存在时判断传入的value是否相等如果相等代表为重入锁返回1并且重置过期时间否则返回0if redis.call(GET, KEYS[1]) ARGV[1] thenredis.call(EXPIRE, KEYS[1], tonumber(ARGV[2]))return 1elsereturn 0end end5.4、分布式锁如何防止误删 5.4.1、逻辑说明 持有锁的线程在锁的内部出现了阻塞导致他的锁自动释放这时其他线程线程2来尝试获得锁就拿到了这把锁然后线程2在持有锁执行过程中线程1反应过来继续执行而线程1执行过程中走到了删除锁逻辑此时就会把本应该属于线程2的锁进行删除这就是误删要想解决误删只需要判断一下这把锁是否属于自己只能删除属于自己的锁这里会将一个UUID 线程ID 作为锁的 value在删除锁的时候判断value值是否相同即可。 5.4.2、解决方案 lua脚本实现 – KEYS[1]传入的key ARGV[1]传入的value – 判断传入的锁key是否存在如果不存在则直接返回1如果存在则判断传入的value值是否和获取到的value相等 if redis.call(EXISTS,KEYS[1]) 0 thenreturn 1 else– 判断传入的value值是否和获取到的value相等如果相等则代表是当前线程删除锁执行删除对应key逻辑然后返回1否则返回0if redis.call(GET,KEYS[1]) ARGV[1] thenreturn redis.call(DEL,KEYS[1])elsereturn 0end end5.5、分布式锁自动续期问题 5.5.1、逻辑说明 假设我们给某个业务分布式锁设置了30s的过期时间但是这个业务要执行40s在30s后锁过期其它线程也可以获取到这把锁但是前一个线程还没有执行完毕这样显然是有问题的要对还没有执行完的业务进行锁的自动续期操作。 5.5.2、解决方案 我这里采用的是定时线程池ScheduledExecutorService来实现在加锁时同步给定时线程池添加一个定时任务定时时间一般设置为过期时间的1/3其中还有考虑重入问题会使用一个set集合存储keyvalue的组合值每个keyvalue只能被添加一次续期方法在 4.1、分布式锁实现工具类的resetExpire方法中有详细描述。