著录数据打包

阅读: 评论:0

著录数据打包

著录数据打包

使用 MyBatis 连接 Oracle ,然后向 MongoDB 中写入。

数据量 1.3 亿,每分钟 7 - 10 万。

SyncPacker_201603.java

package syncPacker;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.ssage.BasicNameValuePair;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;db.BasicDBObject;
db.DB;
db.DBCollection;
db.Mongo;
import com.pro.framework.action.BaseController;import logCenter.SendLog;
import net.sf.json.JSONObject;
import syncPacker.bean.PatentBibliographicChangeBean;
import syncPacker.bean.SyncDataPackageBean;
import utils.DateUtils;
import utils.DatetimeUtils;
import utils.HttpUtils;/*** 增量数据分块打包 全处理* * localhost:8080/PatentSearchExtend/syncPacker!pack.action* * :8080/PatentSearchExtend/syncPacker!pack.action?bean.* tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000*/
@Service
public class SyncPacker_201603 extends BaseController {private static final long serialVersionUID = 1L;// 初始化:数据库接口
    @Autowiredprivate SyncPackerDao dao;// 初始化:发送端地址@Value("${url_syncSender}")private String url_syncSender;// 初始化:本地文件存储路径@Value("${path_syncPacker_package}")private String path_syncPacker_package;// 初始化:读取最大数据包名称的地址@Value("${url_selectMaxPackageNumber}")private String url_selectMaxPackageNumber;// 初始化:存储最大数据包名称的地址@Value("${url_insertSyncDataPackage}")private String url_insertSyncDataPackage;// 初始化:查询条件Beanprivate SyncDataPackageBean bean = new SyncDataPackageBean();// 初始化:查询结果Listprivate List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();// 初始化:形成的数据包名称private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();// 初始化:已打包的增量数据ID表单private List<String> pdaIdList = new ArrayList<String>();// 初始化:用于删除数据的临时ID Listprivate List<String> idPartList = new ArrayList<String>();// 初始化:传输协议private HttpUtils httpUtils = new HttpUtils();// 初始化:键值串private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();// POST 结果private String str_postResult;// 初始化:发送url返回的信息private JSONObject json_postResult;// 初始化:历史最大包编号private String maxPackageNumber;// 初始化:记录打包完成后的数据版本private Integer centerNodeDataVersion;// 发送远程日志private SendLog sendLog = new SendLog();// 记录本地日志private Logger logger = Logger(SyncPacker_201603.class);// 初始化:判断程序是否正在运行public static boolean isRunning = false;// 本次处理完成后的最大包编号private String packedPackageNumber;// 用于返回json的成功信息private String success = "success";/** 文件补发用:指定数据重新打包 */// localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102public String packByPackageNumber() throws Exception {logMemory("本次请求处理开始。", PackageNumberStart() + " - " + PackageNumberEnd() );String job_start = CurrentTimeString(0);try {for ( int i = Integer.PackageNumberStart()); i <= Integer.PackageNumberEnd()); i++) {                // 开始时间String package_start = CurrentTimeString(0);// 包编号String packageNumber = String.format( "%06d", i );logMemory("开始,包编号:", packageNumber );//(1)读历史表pbcList = selectList_changeHistory( packageNumber ); if( null == pbcList ) {logMemory("<span style="color:red;">数据为空 !!!</span>", "");} else{logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));//(2)插入MongoDB
                    insertMongoDB( pbcList );pbcList.clear();};logMemory("传输结束,包编号:" + packageNumber ," 用时: "+ DistanceTimes_string(package_start, CurrentTimeString(0)) );}} catch (Exception e) {// 日志输出logMemory("系统发生异常", e.getMessage());e.printStackTrace();}logMemory("本次请求处理完成,程序结束。", PackageNumberStart() + " - " + PackageNumberEnd()+ " 用时: " + DistanceTimes_string( job_start, CurrentTimeString(0)) );return SUCCESS;}/*** 读历史表* @param packageNumber* @return*/private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){for ( int i = 0; i < 100; i++ ) {try {return dao.selectList_changeHistory( packageNumber );                } catch (Exception e) {// TODO Auto-generated catch block
//                e.printStackTrace();logMemory("系统发生异常", e.getMessage());try {Thread.sleep(500);logMemory("暂停 0.5 秒", "" );} catch (InterruptedException e1) {// TODO Auto-generated catch block
                    e1.printStackTrace();}} }// loop endreturn null ;}/*** 插入 MongoDB*/public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) {    //# MongoDB(数据加载目标)String syncLoadIntoMongoDbService = "10.78.2.23:27017";String syncLoadIntoMongoDbName = "patent_search_extend";String syncLoadIntoMongoTable = "patent_bibliographic_20160319";// 加载开始
//        logger.Now() + " Load start: " + syncLoadIntoMongoDbService );
Mongo m = null;try {m = new Mongo( syncLoadIntoMongoDbService );} catch (UnknownHostException e) {e.printStackTrace();logger.Now() + " UnknownHostException:" + e.getMessage());}// 库名DB db = m.getDB( syncLoadIntoMongoDbName );
//        logger.info( Now() + " Db:" + syncLoadIntoMongoDbName );// 表名DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
//        logger.Now() + " Table:" + syncLoadIntoMongoTable );// 循环列表,将每个元素插入数据库for( PatentBibliographicChangeBean pbcBean : pbcList ){        //(1)读取一条著录数据
//            JSONObject json = packByPackageNumber_readBibl( Id() );//            if( null == json ) return ;// 列,值BasicDBObject insDoc = new BasicDBObject();insDoc.put("abstract_No"    , Abstract_No()    );insDoc.put("app_Addr"       , App_Addr()       );insDoc.put("app_Cn"         , App_Cn()         );insDoc.put("app_Country"    , App_Country()    );insDoc.put("app_Date"       , App_Date()       );insDoc.put("app_Name"       , App_Name()       );insDoc.put("app_Sn"         , App_Sn()         );insDoc.put("app_Type"       , App_Type()       );insDoc.put("app_Zip"        , App_Zip()        );insDoc.put("ecla"           , Ecla()           );insDoc.put("fi"             , Fi()             );insDoc.put("ft"             , Ft()             );insDoc.put("id"             , Id()             );insDoc.put("inv_Title"      , Inv_Title()      );insDoc.put("invent_Type"    , Invent_Type()    );insDoc.put("inventor"       , Inventor()       );insDoc.put("ipc_Standard"   , Ipc_Standard()   );insDoc.put("locarno"        , Locarno()        );insDoc.put("operation_Time" , Operation_Time() );insDoc.put("operation_Type" , Operation_Type() );insDoc.put("package_Name"   , Package_Name()   );insDoc.put("pct_App_Cn"     , Pct_App_Cn()     );insDoc.put("pct_App_Date"   , Pct_App_Date()   );insDoc.put("pct_App_Sn"     , Pct_App_Sn()     );insDoc.put("pct_Date"       , Pct_Date()       );insDoc.put("pct_Pub_Cn"     , Pct_Pub_Cn()     );insDoc.put("pct_Pub_Date"   , Pct_Pub_Date()   );insDoc.put("pct_Pub_Lang"   , Pct_Pub_Lang()   );insDoc.put("pct_Pub_Sn"     , Pct_Pub_Sn()     );insDoc.put("prn"            , Prn()            );insDoc.put("prn_Cn"         , Prn_Cn()         );insDoc.put("prn_Date"       , Prn_Date()       );insDoc.put("prn_Sn"         , Prn_Sn()         );insDoc.put("prn_Type"       , Prn_Type()       );insDoc.put("pub_Cn"         , Pub_Cn()         );insDoc.put("pub_Date"       , Pub_Date()       );insDoc.put("pub_Sn"         , Pub_Sn()         );insDoc.put("pub_Type"       , Pub_Type()       );insDoc.put("uc"             , Uc()             );collection.insert(insDoc);insDoc.clear();}// 循环遍历pdaBeanList
//        System.out.println("loading ...");
//        logger.Now() + " rows:" + pdaBeanList.size());// 当前记录编号// int currentRowNumber = 0;if ( m != null) m.close();//        System.out.println("Load finished.");
    }/** 记录日志 */private void logMemory(String behavior, String content) {// 向服务器发送日志
//        sendLog.send("syncPacker", behavior, content);// 记录本地日志logger.Now() + " " + behavior + " :" + content);// 控制台输出日志
//        System.out.println("syncPacker : " + Now() + " " + behavior + " :" + content);
    }@Overridepublic String insert() throws Exception {return null;}@Overridepublic String update() throws Exception {return null;}@Overridepublic String selectList() throws Exception {return null;}@Overridepublic String delete() throws Exception {return null;}public static boolean isRunning() {return isRunning;}public static void setRunning(boolean isRunning) {SyncPacker_201603.isRunning = isRunning;}public Integer getCenterNodeDataVersion() {return centerNodeDataVersion;}public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {NodeDataVersion = centerNodeDataVersion;}public String getSuccess() {return success;}public void setSuccess(String success) {this.success = success;}public String getMaxPackageNumber() {return maxPackageNumber;}public void setMaxPackageNumber(String maxPackageNumber) {this.maxPackageNumber = maxPackageNumber;}public String getPackedPackageNumber() {return packedPackageNumber;}public void setPackedPackageNumber(String packedPackageNumber) {this.packedPackageNumber = packedPackageNumber;}public SyncDataPackageBean getBean() {return bean;}public void setBean(SyncDataPackageBean bean) {this.bean = bean;}
}

 

一个备份,记一下吧:

  1 package syncPacker;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileReader;
  6 import java.io.FileWriter;
  7 import java.io.IOException;
  8 import java.UnknownHostException;
  9 import java.util.ArrayList;
 10 import java.util.Date;
 11 import java.util.HashMap;
 12 import java.util.List;
 13 import java.util.Map;
 14 
 15 import org.ssage.BasicNameValuePair;
 16 import org.apache.log4j.Logger;
 17 import org.springframework.beans.factory.annotation.Autowired;
 18 import org.springframework.beans.factory.annotation.Value;
 19 import org.springframework.stereotype.Service;
 20 
 21 db.BasicDBObject;
 22 db.DB;
 23 db.DBCollection;
 24 db.Mongo;
 25 import com.pro.framework.action.BaseController;
 26 
 27 import logCenter.SendLog;
 28 import net.sf.json.JSONObject;
 29 import syncPacker.bean.PatentBibliographicChangeBean;
 30 import syncPacker.bean.SyncDataPackageBean;
 31 import utils.DateUtils;
 32 import utils.DatetimeUtils;
 33 import utils.HttpUtils;
 34 
 35 /**
 36  * 增量数据分块打包 全处理
 37  * 
 38  * localhost:8080/PatentSearchExtend/syncPacker!pack.action
 39  * 
 40  * :8080/PatentSearchExtend/syncPacker!pack.action?bean.
 41  * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000
 42  */
 43 @Service
 44 public class SyncPacker_201603 extends BaseController {
 45 
 46     private static final long serialVersionUID = 1L;
 47 
 48     // 初始化:数据库接口
 49     @Autowired
 50     private SyncPackerDao dao;
 51 
 52     // 初始化:发送端地址
 53     @Value("${url_syncSender}")
 54     private String url_syncSender;
 55 
 56     // 初始化:本地文件存储路径
 57     @Value("${path_syncPacker_package}")
 58     private String path_syncPacker_package;
 59 
 60     // 初始化:读取最大数据包名称的地址
 61     @Value("${url_selectMaxPackageNumber}")
 62     private String url_selectMaxPackageNumber;
 63 
 64     // 初始化:存储最大数据包名称的地址
 65     @Value("${url_insertSyncDataPackage}")
 66     private String url_insertSyncDataPackage;
 67 
 68     // 初始化:查询条件Bean
 69     private SyncDataPackageBean bean = new SyncDataPackageBean();
 70     // 初始化:查询结果List
 71     private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();
 72     // 初始化:形成的数据包名称
 73     private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();
 74     // 初始化:已打包的增量数据ID表单
 75     private List<String> pdaIdList = new ArrayList<String>();
 76     // 初始化:用于删除数据的临时ID List
 77     private List<String> idPartList = new ArrayList<String>();
 78     // 初始化:传输协议
 79     private HttpUtils httpUtils = new HttpUtils();
 80     // 初始化:键值串
 81     private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();
 82     // POST 结果
 83     private String str_postResult;
 84     // 初始化:发送url返回的信息
 85     private JSONObject json_postResult;
 86     // 初始化:历史最大包编号
 87     private String maxPackageNumber;
 88     // 初始化:记录打包完成后的数据版本
 89     private Integer centerNodeDataVersion;
 90     // 发送远程日志
 91     private SendLog sendLog = new SendLog();
 92     // 记录本地日志
 93     private Logger logger = Logger(SyncPacker_201603.class);
 94     // 初始化:判断程序是否正在运行
 95     public static boolean isRunning = false;
 96 
 97     // 本次处理完成后的最大包编号
 98     private String packedPackageNumber;
 99     // 用于返回json的成功信息
100     private String success = "success";
101 
102     /** 主处理:数据同步_增量数据分块打包 */
103     // @Scheduled(cron = "${scheduled_syncPacker}")
104     public String pack() {
105 
106         logMemory("scheduled_syncPacker start", "");
107         if (isRunning) {
108             logger.Now() + "正在运行,退出");
109             return SUCCESS;
110         }
111         isRunning = true;
112         // 数据包数量
113         int packagesAmount = 0;
114         Date startTime = new Date();
115 
116         for (int i = 1; i <= 1; i++) {
117             logMemory("循环内部(单个数据包)开始:内部编号", "" + i);
118 
119             try {
120                 // 1.读取定量数据
121                 selectPbcList();
122                 if (pbcList.size() <= 0) {
123                     logMemory("无数据", "退出");
124                     break;
125                 }
126 
127                 // 2.形成文件
128                 if (save2disk()) {
129                     packagesAmount++;
130                     // 3.转移数据
131                     transferFromChange2History();
132                 }
133             } catch (Exception e) {
134                 // 日志输出
135                 logMemory("系统发生异常", e.getMessage());
136                 e.printStackTrace();
137                 break;
138             } finally {
139                 // 清空临时List
140                 paramList.clear();
141                 pbcList.clear();
142                 pbcList_withPackageName.clear();
143                 pdaIdList.clear();
144                 ();
145             }
146             logMemory("循环内部(单个数据包)结束,内部编号", "" + i);
147         }
148         // loop end
149 
150         // 清空临时List
151         paramList.clear();
152         pbcList.clear();
153         pbcList_withPackageName.clear();
154         pdaIdList.clear();
155         ();
156 
157         isRunning = false;
158         logMemory("本次打包结束,新增数据包数量:" + packagesAmount + ",历时:"
159                 + DistanceTimes_string(startTime, new Date()), "");
160         logMemory("finished", "scheduled_syncPacker end");
161 
162         return SUCCESS;
163     }
164 
165     /** selectPbcList() */
166     // 读取 专利著录变化 数据 数据库有连接失败的时候,尝试 10 次
167     private void selectPbcList() {
168 
169         // 日志输出
170         logMemory("增量数据分块打包开始执行", "");
171 
172         for (int i = 0; i < 10; i++) {
173             try {
174                 // -------------- 查询分块著录信息增量数据 --------------
175                 logMemory("准备读取数据", "");
176                 // 每次取10000
177                 pbcList = dao.selectList(bean);
178                 logMemory("读取完毕", "数据量" + pbcList.size());
179                 return;
180             } catch (Exception e) {
181                 // 日志输出
182                 logMemory("系统发生异常", e.getMessage());
183                 e.printStackTrace();
184             }
185         }
186         // loop end
187         return;
188     }
189 
190     /**
191      * save2disk()
192      * 
193      * 将打包文件存盘
194      */
195     private boolean save2disk() throws Exception {
196 
197         // -------------- 分块打包增量数据 --------------
198 
199         // 查询上一次处理最后生成的数据包名称
200         logMemory("准备查询最后一个数据包名称", "");
201 
202         // 发送url,请求对应服务器返回最大数据包名称
203         // syncDataPackageBean = dao.lastDataPackageInfo(0);
204         str_postResult = httpUtils.post(url_selectMaxPackageNumber, null);
205         logMemory("取最大包编号:" + url_selectMaxPackageNumber, " 返回 " + str_postResult);
206 
207         // 判断是否成功发送信息
208         if (str_postResult == null || "".equals(str_postResult)) {
209             // 发送失败,跳出循环,结束程序
210             logMemory("失败!", " 返回为空");
211             isRunning = false;
212             return false;
213         }
214 
215         // 解析Post返回的Json
216         json_postResult = JSONObject.fromObject(str_postResult);
217 
218         // 判断是否成功获取到最大包编号
219         if ("success".equalsIgnoreCase(String("success"))) {
220             // 发送成功,进行下一步操作
221             maxPackageNumber = ("maxPackageNumber").toString();
222         } else {
223             // 发送失败,跳出循环,结束程序
224             logMemory("失败!", "没有返回成功标志。");
225             isRunning = false;
226             return false;
227         }
228 
229         logMemory("上一次最后一个数据包名称", maxPackageNumber);
230         logMemory("准备将数据写入文件", " 路径:" + path_syncPacker_package);
231 
232         // 得到新数据包名,同时分块打包增量数据
233         pbcList_withPackageName = DataPacking_2.CompressPdaData(pbcList, maxPackageNumber, path_syncPacker_package);
234 
235         // 日志输出
236         logMemory("新数据包名称", (0).getPackage_Name() + ".txt");
237         logMemory("转换完成,数量", String.valueOf(pbcList_withPackageName.size()));
238 
239         // 记录打包完成的数据版本信息(流水号)
240         centerNodeDataVersion = Integer.parseInt((0).getPackage_Name());
241 
242         // -------------- 记录已完成的数据包名称 --------------
243         // 发送打包完成的数据包名称(流水号)
244         // dao.insertPacName(syncDataPackageBean);
245         paramList = new ArrayList<BasicNameValuePair>();
246         paramList.add(0,
247                 new BasicNameValuePair("packedPackageNumber", (0).getPackage_Name()));
248 
249         str_postResult = httpUtils.post(url_insertSyncDataPackage, paramList);
250         logMemory("新增数据包:" + url_insertSyncDataPackage, " 返回 " + str_postResult);
251 
252         // 判断是否成功发送信息
253         if (str_postResult == null || "".equals(str_postResult)) {
254             // 发送失败,跳出循环,结束程序
255             logMemory("失败!", " 返回为空");
256             isRunning = false;
257             return false;
258         } else {
259             // 解析Post返回的Json
260             json_postResult = JSONObject.fromObject(str_postResult);
261             // 判断是否成功发送信息
262             if (!"success".equalsIgnoreCase(String("success"))) {
263                 // 发送失败,跳出循环,结束程序
264                 logMemory("失败!", "没有返回成功标志。");
265                 isRunning = false;
266                 return false;
267             }
268         }
269         logMemory("新增数据包,完成", "");
270         return true;
271     }
272 
273     /**
274      * transferFromChange2History
275      * 
276      * 将数据从 Change 表移至 History 表
277      */
278     private void transferFromChange2History() throws Exception {
279 
280         // -------------- 添加数据至归档表 --------------
281         logMemory("准备将已打包的记录移入归档表,数据量:", String.valueOf(pbcList_withPackageName.size()));
282 
283         // 根据打包后的全量数据量,循环执行插入历史表
284         for (int j = 0; j < pbcList_withPackageName.size(); j++) {
285             // 执行SQL插入操作语句
286             dao.appendIntoChangeHistory((j));
287             pdaIdList.add((j).getId());
288         }
289         // 日志输出
290         logMemory("插入归档表,结束", String.valueOf(pdaIdList.size()));
291 
292         // -------------- 删除操作表的数据 --------------
293         logMemory("准备删除专利著录信息变化表中的数据,数据量", String.valueOf(pdaIdList.size()));
294         // List元素数量
295         double total = pdaIdList.size();
296         // 总SQL删除语句执行次数
297         int loop = (int) il(total / 1000);
298         // List下标控制
299         int idnumber;
300         for (int k = 0; k < loop; k++) {
301             // 每次只能删除 1000 条,否则SQL长度超长,出错。
302             for (int z = 0; z < 1000; z++) {
303                 idnumber = k * 1000 + z;
304                 if (idnumber >= total) {
305                     break;
306                 }
307                 // 循环拼接删除条件List
308                 idPartList.add((idnumber).getId());
309             }
310             // 执行批量删除操作
311             veFromChangeTable(idPartList);
312             idPartList.clear();
313         }
314         // 日志输出
315         logMemory("全部删除完毕,删除数据量:", String.valueOf(pdaIdList.size()));
316     }
317 
318     /** 获取历史最大Json编号 */
319     // :8080/PatentSearchExtend/syncPacker!selectMaxPackageNum.action
320     public String selectMaxPackageNum() {
321         try {
322             maxPackageNumber = MaxPackageNumber();
323         } catch (Exception e) {
324             e.printStackTrace();
325             this.setSuccess(false);
326         }
327         return SUCCESS;
328     }
329 
330     /** 存储当前打包Json编号 */
331     // :8080/PatentSearchExtend/syncPacker!insertSyncDataPackage.action
332     public String insertSyncDataPackage() {
333         try {
334             dao.insertSyncDataPackage(packedPackageNumber);
335         } catch (Exception e) {
336             e.printStackTrace();
337             this.setSuccess(false);
338         }
339         return SUCCESS;
340     }
341 
342     /** 文件补发用:指定数据重新打包 */
343     // localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102
344     public String packByPackageNumber() throws Exception {
345 
346         logMemory("本次请求处理开始。", PackageNumberStart() + " - " + PackageNumberEnd() );
347         
348         try {
349             StringBuffer fileFullName;
350             // 函数计算用Integer型数据
351 //            int packageNumberStart = Integer.PackageNumberStart());
352 //            int packageNumberEnd = Integer.PackageNumberEnd());
353             // 重新设置bean地数据包编号
354 //            bean.setPackageNumberStart(String.format("%06d", packageNumberStart));
355 //            bean.setPackageNumberEnd(String.format("%06d", packageNumberEnd));
356             // 输出日志
357 //            logMemory("本次手动补包的开始编号为", PackageNumberStart());
358 //            logMemory("本次手动补包的结束编号为", PackageNumberEnd());
359             
360             // 打包指定数据包编号对应的数据
361 //            for (int i = 0; i < Integer.PackageNumberEnd())
362 //                    - Integer.PackageNumberStart()) + 1; i++) {
363             for ( int i = Integer.PackageNumberStart()); 
364                     i <= Integer.PackageNumberEnd()); i++) {                
365 
366                 String date_start = CurrentTimeString(0);
367                 
368                 String packageNumber = String.format( "%06d", i );
369                 
370                 logMemory("开始,包编号:", packageNumber );
371                 
372                 // 删除现有文件
373 //                logMemory("开始配置数据包存储路径", "");
374 //                fileFullName = new StringBuffer();
375                 fileFullName.append(path_syncPacker_package);
376 //                fileFullName.append("d:/Patent_Bibliographic_Change_Packages/PBC_");
377 //                fileFullName.append( packageNumber );
378 //                fileFullName.append(".txt");
379 //                logMemory("数据包全路径为", String());
380 //                 
381 //                File file;
382 //                file = new String());
383 //                ists()&&file.isFile()) file.delete();
384                 
385 //                if (file.isFile() && ists()) {
386 //                    logMemory("原数据包存在", String());
387 //                    file.delete();
388 //                    logMemory("原数据包已删除", String());
389 //                } else {
390 //                    logMemory("原数据包不存在,准备创建新文件", String());
391 //                }
392                 
393                 // 打包数据
394 //                logMemory("设置新的文件包编号", String.format("%06d", "" + i));
395 //                bean.setPackageNumber(String.format("%06d", "" + i));
396 //                pbcList = packList(bean);
397                 
398                 pbcList = packByPackageNumber_readHistory( packageNumber ); 
399                 
400                 if( null == pbcList ) continue ;
401                 
402                 logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));
403                 
404 //                Map<String, JSONObject> bibliographicMap = new HashMap();
405 //                FileWriter writer = new FileWriter( String(), true);  
406 //                
407 //                for( PatentBibliographicChangeBean pbcBean : pbcList ){
408 //                    
409 //                    // 读取 著录 数据
410 //                    bibliographicMap.put( Id(), packByPackageNumber_readBibl( Id() ));
411 //                    try {  
412 //                        //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件  
413 //                        writer.write( packByPackageNumber_readBibl( Id() ).toString() + 'r' );  
414 //                    } catch (IOException e) {  
415 //                        e.printStackTrace();  
416 //                    } 
417 //                }
418 //                writer.close();  
419                 insertMongoDB( pbcList );
420                 
421                 File file = new File(fileName); 
422 //                BufferedReader reader = null;  
423 //                try {  
424 //                    System.out.println("以行为单位读取文件内容,一次读一整行:");  
425 //                    reader = new BufferedReader(new FileReader(file));  
426 //                    String tempString = null;  
427 //                    int line = 1;  
428 //                    // 一次读入一行,直到读入null为文件结束  
429 //                    while ((tempString = adLine()) != null) {  
430 //                        // 显示行号  
431 //                        System.out.println("line " + line + ": " + tempString);  
432 //                        line++;  
433 //                    }  
434 //                    reader.close();  
435 //                } catch (IOException e) {  
436 //                    e.printStackTrace();  
437 //                } finally {  
438 //                    if (reader != null) {  
439 //                        try {  
440 //                            reader.close();  
441 //                        } catch (IOException e1) {  
442 //                        }  
443 //                    }  
444 //                }  
445                 
446 //                for( String mapKey : bibliographicMap.keySet()){
447 //                    JSONObject bibliographic = ( mapKey );
448 //                    System.out.println( bibliographic );
449 //                }
450                 
451 //                logMemory("数据打包完毕数据", String());
452                 pbcList.clear();
453                 
454                 Now() + " 传输结束,包编号:" + packageNumber ," 用时: "
455                         + DistanceTimes_string(date_start, CurrentTimeString(0)));
456             }
457         } catch (Exception e) {
458             // 日志输出
459             logMemory("系统发生异常", e.getMessage());
460             e.printStackTrace();
461         }
462         logMemory("本次请求处理完成,程序结束。", PackageNumberStart() + " - " + PackageNumberEnd() );
463         
464         return SUCCESS;
465     }
466     
467 
468     /**
469      * 读历史表
470      * @param packageNumber
471      * @return
472      */
473     private List<PatentBibliographicChangeBean> packByPackageNumber_readHistory( String packageNumber ){
474         
475         for (int i = 0; i < 10; i++) {
476             try {
477                 return dao.packByPackageNumber_readHistory( packageNumber );                
478             } catch (Exception e) {
479                 // TODO Auto-generated catch block
480                 e.printStackTrace();
481                 logMemory("系统发生异常", e.getMessage());
482                 try {
483                     Thread.sleep(500);
484                 } catch (InterruptedException e1) {
485                     // TODO Auto-generated catch block
486                     e1.printStackTrace();
487                 }
488             } 
489         }// loop end
490         return null ;
491         
492     }
493     
494     
495     /**
496      * 读 著录数据
497      * @return
498      */
499     private String packByPackageNumber_readBibliographic( String patentId ){
500 
501         StringBuffer sb = new StringBuffer();
502         
503         try {
504             pbcList = dao.packByPackageNumber_readBibliographic( patentId );
505             
506             for( PatentBibliographicChangeBean pbcBean : pbcList ){
507                 // 读取 著录 数据
508                 sb.append( String() );
509             }
510             
511         } catch (Exception e) {
512             // TODO Auto-generated catch block
513             e.printStackTrace();
514         } 
515         String() ;
516     }
517     
518     private JSONObject packByPackageNumber_readBibl( String patentId ){
519 
520         StringBuffer sb = new StringBuffer();
521         JSONObject json = new JSONObject();        
522 
523         for (int i = 0; i < 10; i++) {
524             try {
525                 pbcList = dao.packByPackageNumber_readBibliographic( patentId );
526                 
527                 for( PatentBibliographicChangeBean pbcBean : pbcList ){
528                     // 读取 著录 数据
529                     json = JSONObject.fromObject( pbcBean );
530                 }
531                 return json ;
532                 
533             } catch (Exception e) {
534                 // TODO Auto-generated catch block
535                 e.printStackTrace();
536                 logMemory("系统发生异常", e.getMessage());
537                 try {
538                     Thread.sleep(500);
539                 } catch (InterruptedException e1) {
540                     // TODO Auto-generated catch block
541                     e1.printStackTrace();
542                 }
543             } 
544         }// loop end
545         
546         return null ;
547     }
548     
549 
550     /**
551      * 插入 MongoDB
552      */
553     public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) {    
554 
555         //# MongoDB(数据加载目标)
556         String syncLoadIntoMongoDbService = "10.78.2.23:27017";
557         String syncLoadIntoMongoDbName = "patent_search_extend";
558         String syncLoadIntoMongoTable = "patent_bibliographic_20160319";
559 
560         // 加载开始
561 //        logger.Now() + " Load start: " + syncLoadIntoMongoDbService );
562 
563         Mongo m = null;
564         try {
565             m = new Mongo( syncLoadIntoMongoDbService );
566         } catch (UnknownHostException e) {
567             e.printStackTrace();
568             logger.Now() + " UnknownHostException:" + e.getMessage());
569         }
570 
571         // 库名
572         DB db = m.getDB( syncLoadIntoMongoDbName );
573 //        logger.info( Now() + " Db:" + syncLoadIntoMongoDbName );
574 
575         // 表名
576         DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
577 //        logger.Now() + " Table:" + syncLoadIntoMongoTable );
578         
579         // 循环列表,将每个元素插入数据库
580         for( PatentBibliographicChangeBean pbcBean : pbcList ){        
581             
582             // 读取一条著录数据
583             JSONObject json = packByPackageNumber_readBibl( Id() );
584             
585             if( null == json ) return ;
586 
587             // 列,值
588             BasicDBObject insDoc = new BasicDBObject();
589             
590             insDoc.put("abstract_No"    , ( "abstract_No"    ).toString());
591             insDoc.put("app_Addr"       , ( "app_Addr"       ).toString());
592             insDoc.put("app_Cn"         , ( "app_Cn"         ).toString());
593             insDoc.put("app_Country"    , ( "app_Country"    ).toString());
594             insDoc.put("app_Date"       , ( "app_Date"       ).toString());
595             insDoc.put("app_Name"       , ( "app_Name"       ).toString());
596             insDoc.put("app_Sn"         , ( "app_Sn"         ).toString());
597             insDoc.put("app_Type"       , ( "app_Type"       ).toString());
598             insDoc.put("app_Zip"        , ( "app_Zip"        ).toString());
599             insDoc.put("ecla"           , ( "ecla"           ).toString());
600             insDoc.put("fi"             , ( "fi"             ).toString());
601             insDoc.put("ft"             , ( "ft"             ).toString());
602             insDoc.put("id"             , ( "id"             ).toString());
603             insDoc.put("inv_Title"      , ( "inv_Title"      ).toString());
604             insDoc.put("invent_Type"    , ( "invent_Type"    ).toString());
605             insDoc.put("inventor"       , ( "inventor"       ).toString());
606             insDoc.put("ipc_Standard"   , ( "ipc_Standard"   ).toString());
607             insDoc.put("locarno"        , ( "locarno"        ).toString());
608             insDoc.put("operation_Time" , ( "operation_Time" ).toString());
609             insDoc.put("operation_Type" , ( "operation_Type" ).toString());
610             insDoc.put("package_Name"   , ( "package_Name"   ).toString());
611             insDoc.put("pct_App_Cn"     , ( "pct_App_Cn"     ).toString());
612             insDoc.put("pct_App_Date"   , ( "pct_App_Date"   ).toString());
613             insDoc.put("pct_App_Sn"     , ( "pct_App_Sn"     ).toString());
614             insDoc.put("pct_Date"       , ( "pct_Date"       ).toString());
615             insDoc.put("pct_Pub_Cn"     , ( "pct_Pub_Cn"     ).toString());
616             insDoc.put("pct_Pub_Date"   , ( "pct_Pub_Date"   ).toString());
617             insDoc.put("pct_Pub_Lang"   , ( "pct_Pub_Lang"   ).toString());
618             insDoc.put("pct_Pub_Sn"     , ( "pct_Pub_Sn"     ).toString());
619             insDoc.put("prn"            , ( "prn"            ).toString());
620             insDoc.put("prn_Cn"         , ( "prn_Cn"         ).toString());
621             insDoc.put("prn_Date"       , ( "prn_Date"       ).toString());
622             insDoc.put("prn_Sn"         , ( "prn_Sn"         ).toString());
623             insDoc.put("prn_Type"       , ( "prn_Type"       ).toString());
624             insDoc.put("pub_Cn"         , ( "pub_Cn"         ).toString());
625             insDoc.put("pub_Date"       , ( "pub_Date"       ).toString());
626             insDoc.put("pub_Sn"         , ( "pub_Sn"         ).toString());
627             insDoc.put("pub_Type"       , ( "pub_Type"       ).toString());
628             insDoc.put("uc"             , ( "uc"             ).toString());
629             
630             collection.insert(insDoc);
631             insDoc.clear();
632             
633         }
634 
635         // 循环遍历pdaBeanList
636 //        System.out.println("loading ...");
637 //        logger.Now() + " rows:" + pdaBeanList.size());
638 
639         // 当前记录编号
640         // int currentRowNumber = 0;
641 
642         if ( m != null) m.close();
643         
644 //        System.out.println("Load finished.");
645     }
646     
647     
648     
649     
650     
651     
652     
653     
654     
655     
656     
657     
658     
659     
660     
661     
662     
663 
664     /** 记录日志 */
665     private void logMemory(String behavior, String content) {
666         // 向服务器发送日志
667 //        sendLog.send("syncPacker", behavior, content);
668         // 记录本地日志
669         logger.Now() + " " + behavior + " :" + content);
670         // 控制台输出日志
671 //        System.out.println("syncPacker : " + Now() + " " + behavior + " :" + content);
672     }
673 
674     @Override
675     public String insert() throws Exception {
676         return null;
677     }
678 
679     @Override
680     public String update() throws Exception {
681         return null;
682     }
683 
684     @Override
685     public String selectList() throws Exception {
686         return null;
687     }
688 
689     @Override
690     public String delete() throws Exception {
691         return null;
692     }
693 
694     public static boolean isRunning() {
695         return isRunning;
696     }
697 
698     public static void setRunning(boolean isRunning) {
699         SyncPacker_201603.isRunning = isRunning;
700     }
701 
702     public Integer getCenterNodeDataVersion() {
703         return centerNodeDataVersion;
704     }
705 
706     public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {
707         NodeDataVersion = centerNodeDataVersion;
708     }
709 
710     public String getSuccess() {
711         return success;
712     }
713 
714     public void setSuccess(String success) {
715         this.success = success;
716     }
717 
718     public String getMaxPackageNumber() {
719         return maxPackageNumber;
720     }
721 
722     public void setMaxPackageNumber(String maxPackageNumber) {
723         this.maxPackageNumber = maxPackageNumber;
724     }
725 
726     public String getPackedPackageNumber() {
727         return packedPackageNumber;
728     }
729 
730     public void setPackedPackageNumber(String packedPackageNumber) {
731         this.packedPackageNumber = packedPackageNumber;
732     }
733 
734     public SyncDataPackageBean getBean() {
735         return bean;
736     }
737 
738     public void setBean(SyncDataPackageBean bean) {
739         this.bean = bean;
740     }
741 }

 

 

# Logger = INFO, Logger= INFO, syncLoader# + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - +
# 数据加载器( syncLoader )
# + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - +
usoft.d.syncLoader.SyncLoaderAction = INFO, syncLoader
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
log4j.appender.syncLoader.File           = ${}/logs/syncLoader_20160318/log.html
log4j.appender.syncLoader                = org.apache.log4j.RollingFileAppender
log4j.appender.syncLoader.Encoding       = GBK
log4j.appender.syncLoader.MaxFileSize    = 5MB
log4j.appender.syncLoader.MaxBackupIndex = 100
log4j.appender.syncLoader.Threshold      = INFO
log4j.appender.syncLoader.layout         = org.apache.log4j.HTMLLayout

 

  1 <?xml version="1.0" encoding="UTF-8" ?> 
  2 <!DOCTYPE mapper 
  3     PUBLIC "-//mybatis//DTD Mapper 3.0//EN" 
  4     ".dtd">
  5 
  6 <mapper namespace="syncPacker.SyncPackerDao">
  7 
  8     <!-- 查询分块著录信息 -->
  9     <select id="selectList"
 10         parameterType="syncPacker.bean.SyncDataPackageBean"
 11         resultType="syncPacker.bean.PatentBibliographicChangeBean">
 12             select t.id,
 13                    t.operation_time,
 14                    t.operation_type,
 15                    sv.app_cn,
 16                    sv.app_sn,
 17                    sv.app_type,
 18                    sv.app_date,
 19                    sv.pub_cn,
 20                    sv.pub_sn,
 21                    sv.pub_type,
 22                    sv.pub_date,
 23                    sv.invent_type,
 24                    sv.app_name,
 25                    sv.inventor,
 26                    sv.app_country,
 27                    sv.app_addr,
 28                    sv.app_zip,
 29                    sv.inv_title,
 30                    sv.abstract_no,
 31                    sv.pct_date,
 32                    sv.pct_app_cn,
 33                    sv.pct_app_sn,
 34                    sv.pct_app_date,
 35                    sv.pct_pub_cn,
 36                    sv.pct_pub_sn,
 37                    sv.pct_pub_lang,
 38                    sv.pct_pub_date,
 39                    sv.ipc_standard,
 40                    sv.ecla,
 41                    sv.uc,
 42                    sv.fi,
 43                    sv.ft,
 44                    sv.prn_cn,
 45                    sv.prn_sn,
 46                    sv.prn_type,
 47                    sv.prn_date,
 48                    sv.prn,
 49                    sv.locarno
 50               from (select t.id, t.operation_time, t.operation_type
 51                      from ${tableName} t
 52                      where rownum &lt;= ${maxRowsPerSyncPackerPackage}
 53                      order by operation_time asc) t
 54               left join sipo_vdb sv
 55                 on t.id = sv.id
 56     </select>
 57          
 58     <!-- 添加数据至归档表 -->
 59     <insert id="appendIntoChangeHistory"
 60         parameterType="syncPacker.bean.PatentBibliographicChangeBean">
 61             insert into e_bibliographic_change_history
 62               (id,
 63                operation_time,
 64                operation_type,
 65                package_number)
 66             values
 67               (#{Id},
 68                #{Operation_Time},
 69                #{Operation_Type},
 70                #{Package_Name})
 71     </insert>
 72 
 73     <!-- 删除操作表的数据 -->
 74     <delete id="removeFromChangeTable" parameterType="java.util.List">
 75         delete from
 76             e_bibliographic_change_temp 
 77         where id in
 78         <foreach collection="list" index="index" item="item" open="(" separator="," close=")">
 79             #{item}
 80         </foreach>
 81     </delete>
 82 
 83     <!-- 获取当前最大序列号的数据包信息 -->
 84     <select id="getMaxPackageNumber" resultType="String">
 85         select
 86             ifnull(max(sync_data_package.packageNumber),0) packageNumber
 87         from
 88             sync_data_package
 89     </select>
 90 
 91     <!-- 添加数据包编号至发送记录表 -->
 92     <insert id="insertSyncDataPackage" parameterType="String">
 93         insert into
 94             sync_data_package
 95             (packageNumber)
 96         values
 97             (#{theMaxSerialNumber})
 98     </insert>
 99     
100     <!-- 重新打包指定数据 -->
101     <select id="repackList"
102         parameterType="syncPacker.bean.SyncDataPackageBean"
103         resultType="syncPacker.bean.PatentBibliographicChangeBean">
104         select t.id,
105                t.operation_time,
106                t.operation_type,
107                sv.app_cn,
108                sv.app_sn,
109                sv.app_type,
110                sv.app_date,
111                sv.pub_cn,
112                sv.pub_sn,
113                sv.pub_type,
114                sv.pub_date,
115                sv.invent_type,
116                sv.app_name,
117                sv.inventor,
118                sv.app_country,
119                sv.app_addr,
120                sv.app_zip,
121                sv.inv_title,
122                sv.abstract_no,
123                sv.pct_date,
124                sv.pct_app_cn,
125                sv.pct_app_sn,
126                sv.pct_app_date,
127                sv.pct_pub_cn,
128                sv.pct_pub_sn,
129                sv.pct_pub_lang,
130                sv.pct_pub_date,
131                sv.ipc_standard,
132                sv.ecla,
133                sv.uc,
134                sv.fi,
135                sv.ft,
136                sv.prn_cn,
137                sv.prn_sn,
138                sv.prn_type,
139                sv.prn_date,
140                sv.prn,
141                sv.locarno
142           from (select t.id, t.operation_time, t.operation_type, t.package_number
143                   from E_BIBLIOGRAPHIC_CHANGE_HISTORY t
144                  where t.package_number = #{packageNumber}
145                  order by transfer_time asc) t
146           left join sipo_vdb sv
147             on t.id = sv.id
148     </select>
149     
150     
151     <!-- 
152         重新打包: 读取著录变化数据
153         参数:包编号
154      -->        
155     <select id="selectList_changeHistory"
156         parameterType="java.lang.String"
157         resultType="syncPacker.bean.PatentBibliographicChangeBean">
158         
159         select            
160             sv.id,
161             sv.app_cn,
162             sv.app_sn,
163             sv.app_type,
164             sv.app_date,
165             sv.pub_cn,
166             sv.pub_sn,
167             sv.pub_type,
168             sv.pub_date,
169             sv.invent_type,
170             sv.app_name,
171             sv.inventor,
172             sv.app_country,
173             sv.app_addr,
174             sv.app_zip,
175             sv.inv_title,
176             sv.abstract_no,
177             sv.pct_date,
178             sv.pct_app_cn,
179             sv.pct_app_sn,
180             sv.pct_app_date,
181             sv.pct_pub_cn,
182             sv.pct_pub_sn,
183             sv.pct_pub_lang,
184             sv.pct_pub_date,
185             sv.ipc_standard,
186             sv.ecla,
187             sv.uc,
188             sv.fi,
189             sv.ft,
190             sv.prn_cn,
191             sv.prn_sn,
192             sv.prn_type,
193             sv.prn_date,
194             sv.prn,
195             sv.locarno
196             
197         from 
198             sipo_vdb sv
199             
200         where 
201             exists ( 
202                 select 1 from e_bibliographic_change_history t 
203                 where sv.ID = t.id and t.package_number = #{packageNumber}  
204             )
205             
206     </select>
207     
208 </mapper> 

 

转载于:.html

本文发布于:2024-01-30 15:01:55,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170659811520846.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23