We recently used Kafka in a project, so I'm writing up some notes here.
I won't cover what Kafka is for; please look that up yourself.
A quick note on why this project uses it: the project simulates an exchange that matches securities trades. Matching generates frequent database operations: adding entrusts (orders), updating entrusts, adding transactions (fills), and adding or updating positions. If the database cannot keep up with that write rate, operations fail, exceptions are thrown, and data is lost. We also plan to adopt Kafka as the project's data bus for exchanging data later on, so at this stage we already route database operations through Kafka; only small changes will be needed down the road.
For Kafka installation, deployment, and a demo, see:
The overall idea:
1. When a message arrives that would normally trigger a database operation, we do not touch the database; instead we publish the message to Kafka.
2. The Kafka consumer then performs the actual database operation, inserting or updating the record; if that fails, for now we only log the error (see the sketch below).
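A minimal sketch of the producer-side swap (KafkaProduce and ReadKafkaPropertiesUtil are the classes defined later in this post; buildEntrust() is only a placeholder for however the matching engine constructs the record):

// Before: the matching code wrote straight to the database.
// aIDayEntrustDao.insertOne(domain);

// After: serialize the record and publish it to Kafka; the consumer
// thread performs the actual insert later.
DayEntrustDomain domain = buildEntrust();                 // placeholder
String topic = ReadKafkaPropertiesUtil.getTopic();        // "test"
String key = ReadKafkaPropertiesUtil.getKey989847();      // "989847" = add entrust
KafkaProduce.sendMsg(topic, key, JSON.toJSONString(domain));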
The Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.1</version>
</dependency>
The kafka.properties file:

##produce
bootstrap.servers=10.20.135.20:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume
zookeeper.connect=10.20.135.20:2181
group.id=test-consumer-group
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
#enable.auto.commit=false
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder

# Kafka message configuration: the topic plus four function-code keys
# (989847 = add entrust, 989848 = update entrust,
#  989849 = add transaction, 989850 = add or update position)
kafka.topic=test
kafka.consumer.key.989847=989847
kafka.consumer.key.989848=989848
kafka.consumer.key.989849=989849
kafka.consumer.key.989850=989850
A utility class that loads this configuration:
import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads the configured kafka.properties file.
 */
public class ReadKafkaPropertiesUtil {

    /** Logger */
    private static Logger LOGGER = LoggerFactory.getLogger(ReadKafkaPropertiesUtil.class);

    /** Properties */
    private static Properties properties;

    /** Read kafka.properties */
    static {
        // Path of kafka.properties
        LOGGER.debug(" read kafka.properties ");
        properties = new Properties();
        String path = ReadKafkaPropertiesUtil.class.getResource("/").getFile().toString() + "kafka.properties";
        LOGGER.debug(" read kafka.properties path:" + path);
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            LOGGER.error(" Kafka Produce init kafka properties " + e);
        }
    }

    /**
     * Returns the Kafka configuration.
     *
     * @return
     */
    public static Properties getProperties() {
        return properties;
    }

    /**
     * Returns the Kafka topic.
     *
     * @return
     */
    public static String getTopic() {
        return properties.getProperty("kafka.topic");
    }

    /**
     * Returns kafka.consumer.key.989847.
     *
     * @return
     */
    public static String getKey989847() {
        return properties.getProperty("kafka.consumer.key.989847");
    }

    /**
     * Returns kafka.consumer.key.989848.
     *
     * @return
     */
    public static String getKey989848() {
        return properties.getProperty("kafka.consumer.key.989848");
    }

    /**
     * Returns kafka.consumer.key.989849.
     *
     * @return
     */
    public static String getKey989849() {
        return properties.getProperty("kafka.consumer.key.989849");
    }

    /**
     * Returns kafka.consumer.key.989850.
     *
     * @return
     */
    public static String getKey989850() {
        return properties.getProperty("kafka.consumer.key.989850");
    }

    /** Private constructor. */
    private ReadKafkaPropertiesUtil() {
    }
}
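One caveat with the loader above: building a filesystem path from getResource("/") only works while the classes sit unpacked on disk; inside a packed war/jar it breaks. A more robust variant (an alternative sketch, not the original code) streams the file straight off the classpath:

    // Sketch: load kafka.properties from the classpath instead of composing
    // a filesystem path; this also works when the app runs from a packed war.
    static {
        properties = new Properties();
        try (java.io.InputStream in =
                ReadKafkaPropertiesUtil.class.getResourceAsStream("/kafka.properties")) {
            properties.load(in);
        } catch (Exception e) {
            LOGGER.error(" Kafka Produce init kafka properties " + e);
        }
    }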
The producer that publishes messages to Kafka:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.hundsun.ftenant.common.exception.TengException;

/**
 * Kafka producer.
 */
public class KafkaProduce {

    /** Logger */
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaProduce.class);

    private static final String SEND_MESSAGE_FAILED_NUM = "12000002";
    private static final String SEND_MESSAGE_FAILED_MESSAGE = " send message to kafka error :";

    /**
     * Sends a message.
     *
     * @param topic
     * @param key
     * @param value
     */
    public static void sendMsg(String topic, String key, String value) {
        Properties properties = ReadKafkaPropertiesUtil.getProperties();
        // Instantiate the producer
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(properties);
        // Wrap the message
        ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, value);
        // Send the data
        kp.send(pr, new Callback() {
            // Callback
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    LOGGER.error(" Kafka Produce send message error " + exception);
                    LOGGER.error(" Kafka Produce send message info: metadata: " + JSON.toJSONString(metadata));
                    throw new TengException(SEND_MESSAGE_FAILED_NUM, SEND_MESSAGE_FAILED_MESSAGE + exception.getMessage());
                }
            }
        });
        // Close the producer
        kp.close();
    }
}
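Two caveats on this class. First, an exception thrown inside a send callback runs on the producer's I/O thread, so the TengException above never reaches the caller of sendMsg; it only surfaces in the producer's own logs. Second, sendMsg builds and closes a new KafkaProducer on every call, which is expensive; KafkaProducer is thread-safe, so a single long-lived instance can be shared. A minimal sketch of that variant (same ReadKafkaPropertiesUtil as above):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProduce {

    // One long-lived, thread-safe producer shared by all callers.
    private static final KafkaProducer<String, String> PRODUCER =
            new KafkaProducer<String, String>(ReadKafkaPropertiesUtil.getProperties());

    public static void sendMsg(String topic, String key, String value) {
        PRODUCER.send(new ProducerRecord<String, String>(topic, key, value));
    }
}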
Register both listeners in web.xml:

<!-- The ContextLoaderListener must be registered before the Kafka listener,
     because the Kafka listener loads the DAO classes through the ServletContextEvent.
     When that listener starts, Spring has not finished loading yet and cannot
     resolve annotations such as @Service, so the ServletContextEvent is used
     to initialize the needed classes by hand. -->
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- kafka -->
<listener>
    <listener-class>com.hundsun.cloudtrade.match.kafka.KafkaConsumeLinstener</listener-class>
</listener>
The listener that starts the consumer thread:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer listener.
 */
public class KafkaConsumeLinstener implements ServletContextListener {

    /** Logger */
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumeLinstener.class);

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        LOGGER.debug(" init kafka ");
        Thread t = new Thread(new KafkaConsumeRunnable(sce));
        t.start();
        LOGGER.debug(" init kafka consume thread end ");
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // TODO Auto-generated method stub
    }
}
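contextDestroyed is left as an empty stub, so the consumer thread simply runs until the JVM exits. If a clean shutdown matters, one option is to keep a reference to the runnable and close its Kafka connector when the context is torn down. This sketch assumes KafkaConsumeRunnable keeps its ConsumerConnector in a field and exposes it through a hypothetical getConsumerConnector() accessor, which the class below does not currently do:

    private KafkaConsumeRunnable runnable;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        runnable = new KafkaConsumeRunnable(sce);
        new Thread(runnable).start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // shutdown() unblocks the ConsumerIterator in run(), so the thread can
        // finish its current message and exit instead of being killed mid-loop.
        runnable.getConsumerConnector().shutdown(); // hypothetical accessor
    }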
The consumer thread:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.servlet.ServletContextEvent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.support.WebApplicationContextUtils;

import com.alibaba.fastjson.JSON;
import com.hundsun.cloudtrade.match.dao.IDayEntrustDao;
import com.hundsun.cloudtrade.match.dao.IDayHoldDao;
import com.hundsun.cloudtrade.match.dao.IDayTransactionDao;
import com.hundsun.ftenant.common.kafka.ReadKafkaPropertiesUtil;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Kafka consume thread.
 */
public class KafkaConsumeRunnable implements Runnable {

    /** Logger */
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumeRunnable.class);

    // Entrusts (orders)
    private final IDayEntrustDao aIDayEntrustDao;
    // Transactions (fills)
    private final IDayTransactionDao aIDayTransactionDao;
    // Positions
    private final IDayHoldDao aIDayHoldDao;

    /** Kafka message handler */
    private final IKafkaDataConsumer kafkaDataConsumer;

    /**
     * Spring injection is not available here, so the DAO beans needed later
     * are fetched by hand from the web application context.
     *
     * @param sce
     */
    public KafkaConsumeRunnable(ServletContextEvent sce) {
        LOGGER.debug(" kafka consumer init dao class ");
        aIDayHoldDao = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()).getBean(IDayHoldDao.class);
        aIDayEntrustDao = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()).getBean(IDayEntrustDao.class);
        aIDayTransactionDao = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()).getBean(IDayTransactionDao.class);
        kafkaDataConsumer = new KafkaDataConsumer(aIDayHoldDao, aIDayEntrustDao, aIDayTransactionDao);
    }

    /*
     * Read messages from Kafka.
     */
    @Override
    public void run() {
        // Kafka configuration
        Properties properties = ReadKafkaPropertiesUtil.getProperties();
        // Topic to consume
        String TOPIC = ReadKafkaPropertiesUtil.getTopic();
        LOGGER.info(" kafka consumer topic : " + TOPIC);
        LOGGER.info(" kafka consumer properties : " + JSON.toJSONString(properties));
        ConsumerConfig config = new ConsumerConfig(properties);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            // Message fetched from Kafka
            MessageAndMetadata<String, String> keyValue = it.next();
            LOGGER.debug(" kafka get message , key : " + keyValue.key() + " ; value : " + keyValue.message());
            // Hand the message to the handler
            kafkaDataConsumer.dealKafkaMessage(keyValue.key(), keyValue.message());
        }
    }
}
The message-handling interface:

/**
 * Kafka message handler interface.
 */
public interface IKafkaDataConsumer {

    /**
     * Handle a Kafka message.
     *
     * @param key
     * @param message
     */
    public void dealKafkaMessage(String key, String message);
}
And its implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.hundsun.cloudtrade.match.dao.IDayEntrustDao;
import com.hundsun.cloudtrade.match.dao.IDayHoldDao;
import com.hundsun.cloudtrade.match.dao.IDayTransactionDao;
import com.hundsun.cloudtrade.match.domain.DayEntrustDomain;
import com.hundsun.cloudtrade.match.domain.DayHoldDomain;
import com.hundsun.cloudtrade.match.domain.DayTransactionDomain;
import com.hundsun.ftenant.common.kafka.ReadKafkaPropertiesUtil;

/**
 * Concrete Kafka message handling.
 */
public class KafkaDataConsumer implements IKafkaDataConsumer {

    /** Logger */
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaDataConsumer.class);

    // Entrusts (orders)
    private final IDayEntrustDao aIDayEntrustDao;
    // Transactions (fills)
    private final IDayTransactionDao aIDayTransactionDao;
    // Positions
    private final IDayHoldDao aIDayHoldDao;

    /**
     * @param aIDayHoldDao
     * @param aIDayEntrustDao
     * @param aIDayTransactionDao
     */
    public KafkaDataConsumer(IDayHoldDao aIDayHoldDao, IDayEntrustDao aIDayEntrustDao, IDayTransactionDao aIDayTransactionDao) {
        this.aIDayEntrustDao = aIDayEntrustDao;
        this.aIDayTransactionDao = aIDayTransactionDao;
        this.aIDayHoldDao = aIDayHoldDao;
    }

    /*
     * Handle the data.
     *
     * @see com.hundsun.ftenant.common.kafka.IKafkaDataConsumer#dealKafkaMessage(java.lang.String, java.lang.String)
     */
    @Override
    public void dealKafkaMessage(String key, String value) {
        LOGGER.debug(" kafka get message , key : " + key + " ; value : " + value);
        // Result of the database operation
        int result = 0;
        try {
            if (ReadKafkaPropertiesUtil.getKey989847().equals(key)) {
                // Add an entrust
                LOGGER.debug(" kafka 989847 ");
                DayEntrustDomain domain = JSON.parseObject(value, DayEntrustDomain.class);
                result = aIDayEntrustDao.insertOne(domain);
            } else if (ReadKafkaPropertiesUtil.getKey989848().equals(key)) {
                // Update an entrust
                LOGGER.debug(" kafka 989848 ");
                DayEntrustDomain domain = JSON.parseObject(value, DayEntrustDomain.class);
                result = aIDayEntrustDao.updateOne(domain);
            } else if (ReadKafkaPropertiesUtil.getKey989849().equals(key)) {
                // Add a transaction
                LOGGER.debug(" kafka 989849 ");
                DayTransactionDomain domain = JSON.parseObject(value, DayTransactionDomain.class);
                result = aIDayTransactionDao.insertOne(domain);
            } else if (ReadKafkaPropertiesUtil.getKey989850().equals(key)) {
                // Add or update a position
                LOGGER.debug(" kafka 989850 ");
                DayHoldDomain domain = JSON.parseObject(value, DayHoldDomain.class);
                result = aIDayHoldDao.addOrUpdateOne_addAmount(domain);
            }
        } catch (Exception e) {
            LOGGER.error(" insert or update db error. key: " + key + "; value:" + value);
            LOGGER.error(" kafka deal data error " + e);
        }
        LOGGER.debug(" kafka insert or update database result : " + result);
    }
}
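One gap worth flagging: the catch block above only logs a failed database operation, so that message is dropped, which is exactly the kind of data loss the queue was brought in to prevent. A common mitigation is to park failed messages on a separate topic and replay them once the database recovers. A sketch of that idea (the retry topic name "db-op-retry" is made up, and replay would still need deduplication to stay safe):

        } catch (Exception e) {
            LOGGER.error(" insert or update db error. key: " + key + "; value:" + value);
            LOGGER.error(" kafka deal data error " + e);
            // Hypothetical: re-publish the failed message to a retry topic so
            // it can be replayed later instead of being lost.
            KafkaProduce.sendMsg("db-op-retry", key, value);
        }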
That's all of it; comments and discussion are welcome.