1、redis.properties
##redisIP地址 #redis.host=10.14.2.212 redis.host=127.0.0.1 ##redis默认端口号 redis.port=6379 #redis密码 redis.pass=a7217sec!@###redis.database=0 ##指定使用第几个库redis.maxIdle=300 redis.maxActive=600 redis.maxWait=1000 stOnBorrow=true redis.timeout=1000redis.pool.maxActive=200 redis.pool.maxIdle=50 redis.pool.minIdle=0 redis.pool.maxWait=15000 ##向调用者输出链接资源时,是否检测是否有效,如果无效则从连接池中移除,尝试继续获取,默认为false,建议保留默认值 stOnBorrow=false ##向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值. stOnReturn=false ##maxActive: 链接池中最大连接数,默认为8. ##maxIdle: 链接池中最大空闲的连接数,默认为8. ##minIdle: 连接池中最少空闲的连接数,默认为0. ##maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时. ##minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除。 ##softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1. ##numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3. ##testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值. ##testOnReturn: 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值. ##testWhileIdle:false 向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值. ##timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1. ##whenExhaustedAction: 当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1: ## -> 0 : 抛出异常, ## -> 1 : 阻塞,直到有可用链接资源 ## -> 2 : 强制创建新的链接资源
2、l
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns=""xmlns:xsi=""xmlns:context=""xmlns:p=""xsi:schemaLocation=" .0.xsd .0.xsd"><context:annotation-config /><!-- 引入外部属性文件. --><context:property-placeholder location="classpath*:redis.properties" ignore-unresolvable="true" /><!-- 注册 --><context:component-scan base-package="com.dis.demo01"/><bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"><!--<property name="maxTotal" value="${redis.pool.maxActive}" />--><!--最大空闲连接数 --><property name="maxIdle" value="${redis.pool.maxIdle}" /><!--初始化连接数 --><property name="minIdle" value="${redis.pool.minIdle}" /><!--最大等待时间 --><property name="maxWaitMillis" value="${redis.pool.maxWait}" /><!--对拿到的connection进行validateObject校验 --><property name="testOnBorrow" value="${stOnBorrow}" /><!--在进行returnObject对返回的connection进行validateObject校验 --><property name="testOnReturn" value="${stOnReturn}" /><!--定时对线程池中空闲的链接进行validateObject校验 --><property name="testWhileIdle" value="false" /></bean><bean id="connectionFactory" class="org.tion.jedis.JedisConnectionFactory"p:host-name="${redis.host}" p:port="${redis.port}" p:password="mxb123" p:pool-config-ref="poolConfig"/><bean id="redisTemplate" class="org.StringRedisTemplate"><property name="connectionFactory" ref="connectionFactory" /><property name="keySerializer"><bean class="org.dis.serializer.StringRedisSerializer" /></property><property name="valueSerializer">
<!-- 采用jdk序列 --><bean class="org.dis.serializer.JdkSerializationRedisSerializer" /></property></bean><!-- 发送邮件的队列,这边只是个例子 --><bean id="sentEmailQueue" class="com.dis.demo01.SendEmailQueue"><property name="redisQueue" ><bean class="com.dis.demo01.RedisQueue" destroy-method="destroy"><property name="redisTemplate" ref="redisTemplate"></property><property name="queueName" value="email"></property></bean></property></bean></beans>
3、IRedisQueue.java
package com.dis.demo01;import urrent.TimeUnit;/*** @author xiaomei* @version V1.0* @Title: IRedisQueue* @Package com.dis.demo01* @Description:* @date 11/7/17*/ public interface IRedisQueue<T> {/*** 从头开始拿* 拿出,没有就等待* @return* @throws InterruptedException*/public T take() throws InterruptedException;/*** 从尾开始拿* 拿出,没有就等待* @return* @throws InterruptedException*/public T takeOpposite() throws InterruptedException;/*** 从头开始拿* 拿出,没有就等待 seconds 秒* @param seconds* @return* @throws InterruptedException*/public T poll(int seconds) throws InterruptedException;/*** 从头开始拿* 拿出,没有就等待 seconds 秒* @param seconds* @return* @throws InterruptedException*/public T pollOpposite(int seconds) throws InterruptedException;/*** 从尾开始放* 入队* @param value* @return* @throws InterruptedException*/public void add(T value);/*** 从头开始放* 入队* @param value* @return* @throws InterruptedException*/public void addOpposite(T value);// // /** // * 从头开始删除 // * @return // */ // public T remove(); // // // /** // * 从尾开始删除 // * @return // */ // public T removeOpposite(); // /*** 删除所有*/public void clearAll();}
4、RedisQueue.java
package com.dis.demo01;import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.tion.RedisConnection; import org.tion.RedisConnectionFactory; import org.BoundListOperations; import org.RedisConnectionUtils; import org.RedisTemplate; import org.springframework.util.CollectionUtils;import java.util.List; import urrent.locks.Lock; import urrent.locks.ReentrantLock;/*** @author xiaomei* @version V1.0* @Title: RedisQueue* @Package com.dis.demo01* @Description:* @date 11/7/17*/ public class RedisQueue<T> implements InitializingBean,DisposableBean,IRedisQueue<T> {//队列名称public String queueName;//原生keyprivate byte[] rawKey;private RedisTemplate redisTemplate;private RedisConnectionFactory factory;private RedisConnection connection; //为了堵塞private BoundListOperations<String, T> listOperations; private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑 如果分布式的话,就是用分式式的锁 @Overridepublic T take() throws InterruptedException {return poll(0);}@Overridepublic T takeOpposite() throws InterruptedException {return pollOpposite(0);}@Overridepublic T poll(int seconds) throws InterruptedException {lock.lockInterruptibly();try{List<byte[]> results = connection.bRPop(seconds, rawKey);if(CollectionUtils.isEmpty(results)){return null;}return (ValueSerializer().(1));}finally{lock.unlock();}}@Overridepublic T pollOpposite(int seconds) throws InterruptedException {lock.lockInterruptibly();try{List<byte[]> results = connection.bLPop(seconds, rawKey);if(CollectionUtils.isEmpty(results)){return null;}return (ValueSerializer().(1));}finally{lock.unlock();}}@Overridepublic void add(T value) {listOperations.rightPush(value);}@Overridepublic void addOpposite(T value) {listOperations.leftPush(value);}@Overridepublic void clearAll() { // listOperations. }@Overridepublic void afterPropertiesSet() throws Exception {factory = ConnectionFactory();connection = Connection(factory);rawKey = KeySerializer().serialize(queueName);listOperations = redisTemplate.boundListOps(queueName);}@Overridepublic void destroy() throws Exception { leaseConnection(connection, factory);}public void setQueueName(String queueName) {this.queueName = queueName;}public void setRedisTemplate(RedisTemplate redisTemplate) {disTemplate = redisTemplate;}}
5、SendEmailQueue
package com.dis.demo01;import com.ity.EmailEntity; import org.springframework.beans.factory.annotation.Autowired; import t.support.ClassPathXmlApplicationContext; import org.RedisTemplate;/*** @author xiaomei* @version V1.0* @Title: SendEmailQueue* @Package com.dis.demo01* @Description:* @date 11/7/17*/ public class SendEmailQueue implements Runnable{//@Autowiredprivate IRedisQueue<EmailEntity> redisQueue;public IRedisQueue<EmailEntity> getRedisQueue() {return redisQueue;}public void setRedisQueue(IRedisQueue<EmailEntity> redisQueue) {disQueue = redisQueue;}public void sentEmail(EmailEntity emailEntity) {redisQueue.add(emailEntity);}public EmailEntity getEmail() throws InterruptedException {return redisQueue.poll(1);}@Overridepublic void run() {try {while (true){System.out.println("take"+getEmail());}} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(l");SendEmailQueue sendEmailQueue = (SendEmailQueue) Bean("sentEmailQueue"); // Thread thread = new Thread(){ // @Override // public void run() { // for (int i = 0 ; i <100 ; i++){ // EmailEntity emailEntity = new EmailEntity(); // emailEntity.setEmailAddr(i+"-- - @qq"); // emailEntity.setUserName("name:"+i); // sendEmailQueue.sentEmail(emailEntity); // } // } // }; // thread.run(); sendEmailQueue.run();} }
6、EmailEntity.java 注意必须 实现Serializeable 不然不能序列化
package com.ity;import java.io.Serializable;/*** @author 梅谢兵* @version V1.0* @Title: EmailEntity* @Package com.ity* @Description:* @date 11/7/17*/ public class EmailEntity implements Serializable {private String userName;private String emailAddr;public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public String getEmailAddr() {return emailAddr;}public void setEmailAddr(String emailAddr) {ailAddr = emailAddr;}@Overridepublic String toString() {return "EmailEntity{" +"userName='" + userName + ''' +", emailAddr='" + emailAddr + ''' +'}';} }
采用的是生产者和消费者的模式。
最后加上一直图
转载于:.html
本文发布于:2024-01-28 09:47:49,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17064064736549.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |