项目需要处理一批表:这些表的数据量都不大,且都带有经纬度信息,但缺少流域信息,需要根据经纬度计算出每条数据所属的流域。项目比较简单,按照 DeepSeek 给出的思路完成了开发,AI 真好用。
阿里AI个人版本IDEA安装
IDEA中使用DeepSeek满血版的手把手教程来了!
代码实现
1、controller
/**
 * REST entry point for the batch watershed (water-code) update job.
 */
@Tag(name = "批量流域处理任务")
@ApiSupport(order = 2)
@RequestMapping("/job")
@RestController
public class SysBatchJobController {
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    JobOperator jobOperator;
    @Autowired
    @Qualifier("updateWaterCodeJob")
    private Job updateWaterCodeJob;

    /**
     * Launches {@code updateWaterCodeJob} (multi-threaded, paged update of the configured tables).
     *
     * <p>The {@code time} parameter is set to the current epoch millis so that each request
     * creates a new job instance and the job can be launched repeatedly. Whether this call
     * returns before the job finishes depends on the JobLauncher's task executor, which is not
     * configured in this class.
     *
     * @return the id of the launched job execution
     * @throws Exception if the job cannot be launched
     */
    @GetMapping("/asyncJob")
    public Long asyncJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(updateWaterCodeJob, jobParameters);
        return execution.getId();
    }
}
2、批量处理表
/**
* Configuration row describing one business table whose watershed (water-code) column is to be
* filled in from its longitude/latitude columns. Mapped to table ads_t_sys_batch_update_table.
* NOTE(review): @Data on a subclass generates equals/hashCode that ignore BaseEntity's fields
* (Lombok's default is callSuper = false); confirm that is intended.
*/
@Builder
@AllArgsConstructor
@Data
@TableName(value = "ads_t_sys_batch_update_table")
public class SysBatchUpdateTable extends BaseEntity implements Serializable {
private static final long serialVersionUID = -7367871287146067724L;
/**
* Primary key of this configuration row, generated as a UUID (IdType.ASSIGN_UUID).
**/
@TableId(type = IdType.ASSIGN_UUID)
private String pkId;
/**
* Name of the table to update.
**/
@TableField(value = "table_name")
private String tableName;
/**
* ID of the database (data source) that holds the table.
* NOTE(review): the partitioner's configuration query does not select data_source_id; every
* table is accessed through the single injected DataSource. Confirm whether this is still needed.
**/
@TableField(value = "data_source_id")
private String dataSourceId;
/**
* Name of the table's primary-key column.
**/
@TableField(value = "key_id")
private String keyId;
/**
* Name of the column that receives the watershed (water) code.
**/
@TableField(value = "water_code_column")
private String waterCodeColumn;
/**
* Name of the table's longitude column.
**/
@TableField(value = "lon_column")
private String lonColumn;
/**
* Name of the table's latitude column.
**/
@TableField(value = "lat_column")
private String latColumn;
/**
* Explicit no-arg constructor: declaring @AllArgsConstructor removes Java's implicit default
* constructor, and frameworks that instantiate the entity reflectively presumably need one.
*/
public SysBatchUpdateTable() {
}
}
3、Mapper(参数传递比较麻烦,可以考虑把表名、字段名等参数动态拼接到 SQL 中构造)
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdatacd.panorama.system.domain.po.SysBatchUpdateTable;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
/**
* MyBatis mapper whose namespace holds the dynamic per-table SQL used by the batch water-code job;
* it also inherits MyBatis-Plus CRUD for the SysBatchUpdateTable configuration entity via BaseMapper.
*/
@Mapper
public interface UpdateTableMapper extends BaseMapper<SysBatchUpdateTable> {
/**
* Reads one page of rows from a configured table whose water-code column is still NULL.
* NOTE(review): the mapped SQL reads keyId, waterCodeColumn, lonColumn, latColumn, tableName,
* _pagesize and _skiprows from a parameter map. In this project it is invoked by statement id
* from MyBatisPagingItemReader with such a map, not through this single-String signature.
* @param tableName name of the table to read
* @return one map per row (pkId, keyId, waterCode, waterCodeColumn, lon, lat, tableName)
*/
List<Map<String, Object>> selectUpdateTableByPage(String tableName);
/**
* Sets the water-code column of a single row.
* NOTE(review): the mapped SQL also references waterCodeColumn and keyId, which this signature
* does not pass, and the parameters carry no @Param names; calling it as declared is likely to
* fail when MyBatis resolves those placeholders. The batch writer does not use this method.
* @param tableName name of the table to update
* @param waterCode watershed code to store
* @param pkId primary-key value of the row to update
*/
void updateWaterCode(String tableName,
String waterCode,
String pkId);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdatacd.panorama.system.mapper.UpdateTableMapper">
<!-- Dynamic per-table SQL. Dollar-brace placeholders are spliced into the SQL text verbatim
     (used here for table and column names taken from the ads_t_sys_batch_update_table
     configuration rows); hash-brace placeholders are bound as JDBC parameters. The spliced
     identifiers must therefore only ever come from trusted configuration. -->
<select id="selectUpdateTableByPage" resultType="java.util.HashMap">
<!-- Returns one page of rows whose water-code column is still NULL. Besides the row data it
     echoes the key column name, the water-code column name and the table name as bound values,
     so that BatchUpdateWriter can build its UPDATE from the item alone. _pagesize and _skiprows
     are supplied by MyBatisPagingItemReader.
     NOTE(review): BatchUpdateWriter sets this column for every processed row, so rows that got a
     non-null code drop out of this WHERE filter between pages while OFFSET keeps advancing by the
     page size. This looks like it can skip unprocessed rows on later pages; confirm, and consider
     a paging scheme that does not depend on OFFSET. -->
SELECT
${keyId} as pkId,
#{keyId} as keyId,
${waterCodeColumn} as waterCode,
#{waterCodeColumn} as waterCodeColumn,
${lonColumn} as lon,
${latColumn} as lat,
#{tableName} as tableName
FROM ${tableName} a where ${waterCodeColumn} is null
ORDER BY ${keyId} <!-- deterministic order so LIMIT/OFFSET pages are stable -->
LIMIT #{_pagesize} OFFSET #{_skiprows}
</select>
<!-- Single-row update of the water-code column by primary key.
     NOTE(review): the batch writer does not use this statement (it runs its own UPDATE through
     NamedParameterJdbcTemplate), and the water-code column and key column placeholders are not
     parameters of UpdateTableMapper.updateWaterCode; confirm how they are supplied before use. -->
<update id="updateWaterCode">
UPDATE ${tableName}
SET ${waterCodeColumn} = #{waterCode}
WHERE ${keyId} = #{pkId} <!-- key column name comes from the configuration, not a fixed "id" -->
</update>
</mapper>
4、配置文件
spring:
  batch:
    job:
      enabled: false # 启动时不自动运行 job
    jdbc:
      initialize-schema: always
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql
在数据源 URL 中加上批处理参数 rewriteBatchedStatements=true,让 MySQL 驱动把 JDBC 批量更新合并发送,减少网络往返:
url: jdbc:mysql://localhost:3306/xxxx?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true
5、主配置类调整(按表分区)
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring Batch wiring for the watershed (water-code) update job, partitioned by table.
 *
 * <p>{@code updateWaterCodeJob} runs {@code masterStep}, which asks {@link MultiTablePartitioner}
 * for one partition per configured table and runs {@code updateBatchTableData}
 * (paged reader, {@link TableProcessor}, {@link BatchUpdateWriter}) for each partition on the
 * {@code batchTaskExecutor} thread pool.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * The main job: a single partitioned master step.
     *
     * <p>{@link BatchJobListener} is a {@code JobExecutionListener}, so it is registered on the
     * job. Handed to a step builder's {@code listener(Object)} it would only be registered if it
     * declared {@code @BeforeStep}/{@code @AfterStep} methods, and would therefore never run.
     *
     * @param masterStep the partitioned master step
     * @return the job
     */
    @Bean("updateWaterCodeJob")
    public Job updateWaterCodeJob(
            @Qualifier("masterStep") Step masterStep
    ) {
        return jobBuilderFactory.get("updateWaterCodeJob")
                .listener(new BatchJobListener())
                .start(masterStep)
                .build();
    }

    /**
     * Master step: partitions the work by table and runs {@code updateBatchTableData} for each
     * partition on {@link #taskExecutor()}.
     *
     * @param updateBatchTableData the worker step executed once per partition
     * @param multiTablePartitioner creates one partition per configured table
     * @return the partitioned step
     */
    @Bean("masterStep")
    public Step masterStep(
            @Qualifier("updateBatchTableData") Step updateBatchTableData,
            @Qualifier("multiTablePartitioner") MultiTablePartitioner multiTablePartitioner
    ) {
        return stepBuilderFactory.get("masterStep")
                // One partition per configured table.
                .partitioner(updateBatchTableData.getName(), multiTablePartitioner)
                .step(updateBatchTableData)
                // Passed to the partitioner, which ignores it: partition count = table count.
                .gridSize(10)
                .taskExecutor(taskExecutor())
                .build();
    }

    /**
     * Thread pool that executes the partitions (one task per table).
     *
     * <p>NOTE: ThreadPoolTaskExecutor's queue is unbounded by default, so the pool never grows
     * beyond its core size: at most 5 tables are processed concurrently and the max pool size of
     * 10 is not reached. Set a queue capacity or raise the core size to change that.
     *
     * @return the partition executor
     */
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("table-processor-");
        return executor;
    }

    /**
     * Worker step: reads pages of rows needing a water code, computes it, and batch-updates
     * the rows in chunks of 100. Every failing item is skipped (AlwaysSkipItemSkipPolicy).
     *
     * @param myBatisPagingItemReader step-scoped reader for the partition's table
     * @param batchUpdateWriter writes the computed codes back
     * @param tableProcessor computes the water code from longitude/latitude
     * @return the chunk-oriented step
     */
    @Bean("updateBatchTableData")
    public Step updateBatchTableData(
            @Qualifier("dynamicTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
            @Qualifier("batchUpdateWriter") BatchUpdateWriter batchUpdateWriter,
            @Qualifier("tableProcessor") TableProcessor tableProcessor
    ) {
        return stepBuilderFactory.get("updateBatchTableData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(tableProcessor)
                .writer(batchUpdateWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }

    /**
     * Step-scoped paging reader over one configured table; the table and column names come from
     * the partition's step execution context.
     *
     * @param keyId name of the table's primary-key column
     * @param waterCodeColumn name of the column receiving the water code
     * @param lonColumn name of the longitude column
     * @param latColumn name of the latitude column
     * @param tableName name of the table to read
     * @return a reader issuing {@code selectUpdateTableByPage} with a page size of 2000
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> dynamicTableReader(
            @Value("#{stepExecutionContext['keyId']}") String keyId,
            @Value("#{stepExecutionContext['waterCodeColumn']}") String waterCodeColumn,
            @Value("#{stepExecutionContext['lonColumn']}") String lonColumn,
            @Value("#{stepExecutionContext['latColumn']}") String latColumn,
            @Value("#{stepExecutionContext['tableName']}") String tableName
    ) {
        MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("com.bigdatacd.panorama.system.mapper.UpdateTableMapper.selectUpdateTableByPage");
        Map<String, Object> param = new HashMap<>();
        param.put("keyId", keyId);
        param.put("waterCodeColumn", waterCodeColumn);
        param.put("lonColumn", lonColumn);
        param.put("latColumn", latColumn);
        param.put("tableName", tableName);
        reader.setParameterValues(param);
        reader.setPageSize(2000);
        return reader;
    }
}
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
 * Writes computed water codes back to the source table, one JDBC batch per chunk.
 *
 * <p>Each item carries the identifiers echoed by {@code selectUpdateTableByPage}:
 * {@code tableName}, {@code waterCodeColumn} and {@code keyId} (names), plus the values
 * {@code pkId} and {@code waterCode}. A chunk is read by one partition's reader, which reads a
 * single table, so the first item's identifiers are used to build the statement.
 */
@Component("batchUpdateWriter")
@StepScope
public class BatchUpdateWriter implements ItemWriter<Map<String, Object>> {
    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Map<String, Object>> items) {
        if (items.isEmpty()) {
            // Defensive: items.get(0) below would throw on an empty chunk.
            return;
        }
        Map<String, Object> first = items.get(0);
        // Identifiers cannot be bound as JDBC parameters, so they are concatenated. They come
        // from the ads_t_sys_batch_update_table configuration, not from end-user input.
        String sql = "UPDATE " + (String) first.get("tableName")
                + " SET " + (String) first.get("waterCodeColumn") + " = :waterCode"
                + " WHERE " + (String) first.get("keyId") + " = :pkId";
        // Only the two named parameters referenced by the SQL are bound per row.
        SqlParameterSource[] batchArgs = items.stream()
                .map(item -> new MapSqlParameterSource()
                        .addValue("waterCode", item.get("waterCode"))
                        .addValue("pkId", item.get("pkId")))
                .toArray(SqlParameterSource[]::new);
        jdbcTemplate.batchUpdate(sql, batchArgs);
    }
}
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Creates one partition per active row of {@code ads_t_sys_batch_update_table}.
 *
 * <p>Each partition's {@link ExecutionContext} carries the table name and the key, water-code,
 * longitude and latitude column names that the step-scoped reader consumes. {@code gridSize} is
 * ignored: the number of partitions equals the number of usable configuration rows. Rows with a
 * missing or blank mapping are skipped with a warning, because these values are spliced into SQL.
 */
@Component
@Slf4j
public class MultiTablePartitioner implements Partitioner {
    /** Keys copied from each configuration row into the partition context; all are required. */
    private static final String[] CONTEXT_KEYS = {"keyId", "waterCodeColumn", "lonColumn", "latColumn", "tableName"};

    private final DataSource dataSource;

    public MultiTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sql = "SELECT key_id as keyId,water_code_column as waterCodeColumn,lon_column as lonColumn,lat_column as latColumn,page_sql as pageSql,table_name as tableName FROM ads_t_sys_batch_update_table where deleted = 0 and data_status = '0'";
        List<Map<String, Object>> tables = jdbcTemplate.queryForList(sql);
        log.info("查询{}", sql);
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            Map<String, Object> table = tables.get(i);
            String missingKey = firstMissingKey(table);
            if (missingKey != null) {
                // A null value would otherwise reach the SQL as the literal text "null".
                log.warn("Skipping batch update table config {}: '{}' is empty", table, missingKey);
                continue;
            }
            ExecutionContext ctx = new ExecutionContext();
            for (String key : CONTEXT_KEYS) {
                ctx.putString(key, String.valueOf(table.get(key)));
            }
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }

    /** Returns the first required key whose value in {@code row} is null or blank, or null if none. */
    private static String firstMissingKey(Map<String, Object> row) {
        for (String key : CONTEXT_KEYS) {
            Object value = row.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                return key;
            }
        }
        return null;
    }
}
import com.bigdatacd.panorama.common.utils.GeoJsonUtil;
import lombok.Builder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Item processor that attaches the watershed (water) code for each row's coordinates.
 *
 * <p>Parses the item's {@code lon} and {@code lat} values as doubles, stores the result of
 * {@code GeoJsonUtil.getWaterCode} under {@code waterCode} (null when no polygon contains the
 * point), and returns the same map instance. A missing or unparsable coordinate raises an
 * exception, which the step's skip policy handles.
 */
@Component("tableProcessor")
@Builder
public class TableProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {
    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        double longitude = Double.parseDouble(item.get("lon").toString());
        double latitude = Double.parseDouble(item.get("lat").toString());
        String waterCode = GeoJsonUtil.getWaterCode(longitude, latitude);
        item.put("waterCode", waterCode);
        return item;
    }
}
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job listener that prints the start time, end time and total duration of each job execution.
 *
 * <p>Start times are stored per job execution id, so one listener instance can serve
 * concurrently running executions without one run overwriting another's start time.
 */
public class BatchJobListener implements JobExecutionListener {
    /**
     * Start time (epoch millis) of each running execution, keyed by job execution id.
     * NOTE(review): assumes ids are assigned (by the JobRepository) before beforeJob runs.
     */
    private final java.util.Map<Long, Long> startTimes = new java.util.concurrent.ConcurrentHashMap<>();

    @Override
    public void beforeJob(JobExecution jobExecution) {
        long beginTime = System.currentTimeMillis();
        startTimes.put(jobExecution.getId(), beginTime);
        System.out.println(jobExecution.getJobInstance().getJobName() + " beforeJob...... " + beginTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        long endTime = System.currentTimeMillis();
        String jobName = jobExecution.getJobInstance().getJobName();
        System.out.println(jobName + " afterJob...... " + endTime);
        Long beginTime = startTimes.remove(jobExecution.getId());
        // Only report a duration when this listener saw the matching beforeJob call.
        if (beginTime != null) {
            System.out.println(jobName + "一共耗时:【" + (endTime - beginTime) + "】毫秒");
        }
    }
}
6、通过经纬度计算流域工具类
import lombok.extern.slf4j.Slf4j;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.*;
import org.opengis.feature.Feature;
import org.opengis.feature.Property;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: GeoJSON工具类
* @author: Mr.xulong
* @date: 2023年01月09日 14:39
*/
@Slf4j
public class GeoJsonUtil {
/*public static void main(String[] args) {
try {
FeatureCollection featureCollection = getFeatureCollection("sichuanliuyu.json");
double x = 106.955085;
double y = 32.09546061139062;
System.out.println(JSON.toJSONString(properties(x,y,featureCollection)));
} catch (Exception e) {
e.printStackTrace();
}
}*/
private static String geoJsonFilePath = "sichuanliuyu.json";
private GeoJsonUtil() {
}
/**
* 获取区域数据集合
*
* @return
*/
public static FeatureCollection getFeatureCollection() {
// 读取 GeoJson 文件
InputStream resourceAsStream = GeoJsonUtil.class.getResourceAsStream("/json/" + geoJsonFilePath);
FeatureJSON featureJSON = new FeatureJSON();
try {
return featureJSON.readFeatureCollection(resourceAsStream);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 判断指定区域集合是否包含某个点
* @param longitude
* @param latitude
* @param featureCollection
* @return
*/
public static boolean contains(double longitude, double latitude, FeatureCollection featureCollection) {
FeatureIterator features = featureCollection.features();
try {
while (features.hasNext()) {
Feature next = features.next();
if (isContains(longitude, latitude, next)) {
return true;
}
}
} finally {
features.close();
}
return false;
}
/**
* 判断指定区域集合是否包含某个点,如果包含,则返回所需属性
* @param longitude
* @param latitude
* @param featureCollection
* @return
*/
public static Map<String, Object> properties(double longitude, double latitude, FeatureCollection featureCollection) {
FeatureIterator features = featureCollection.features();
try {
while (features.hasNext()) {
Feature next = features.next();
boolean contains = isContains(longitude, latitude, next);
// 如果点在面内则返回所需属性
if (contains) {
HashMap<String, Object> properties = new HashMap<>();
properties.put("waterCode", next.getProperty("FID").getValue());
properties.put("waterName", next.getProperty("name").getValue());
return properties;
}
}
} finally {
features.close();
}
return null;
}
/**
* 判断指定区域集合是否包含某个点,如果包含,则返回所需属性
* @param longitude
* @param latitude
* @return
*/
public static Map<String, Object> properties(double longitude, double latitude) {
FeatureCollection featureCollection = getFeatureCollection();
FeatureIterator features = featureCollection.features();
try {
while (features.hasNext()) {
Feature next = features.next();
boolean contains = isContains(longitude, latitude, next);
// 如果点在面内则返回所需属性
if (contains) {
HashMap<String, Object> properties = new HashMap<>();
properties.put("waterCode", next.getProperty("FID").getValue());
properties.put("waterName", next.getProperty("name").getValue());
return properties;
}
}
} finally {
features.close();
}
return null;
}
/**
* 判断指定区域集合是否包含某个点,如果包含,则返回所需属性
* @param longitude
* @param latitude
* @return
*/
public static String getWaterCode(double longitude, double latitude) {
FeatureCollection featureCollection = getFeatureCollection();
FeatureIterator features = featureCollection.features();
try {
while (features.hasNext()) {
Feature next = features.next();
boolean contains = isContains(longitude, latitude, next);
// 如果点在面内则返回所需属性
if (contains) {
return String.valueOf(next.getProperty("FID").getValue());
}
}
} finally {
features.close();
}
return null;
}
private static boolean isContains(double longitude, double latitude, Feature feature) {
// 获取边界数据
Property geometry = feature.getProperty("geometry");
Object value = geometry.getValue();
// 创建坐标的point
GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
boolean contains = false;
// 判断是单面还是多面
if (value instanceof MultiPolygon) {
MultiPolygon multiPolygon = (MultiPolygon) value;
contains = multiPolygon.contains(point);
} else if (value instanceof Polygon) {
Polygon polygon = (Polygon) value;
contains = polygon.contains(point);
}
return contains;
}
}
7、地图依赖
<geotools.version>27-SNAPSHOT</geotools.version>
<!--地图-->
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-shapefile</artifactId>
<version>${geotools.version}</version>
<exclusions>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-geojson</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-swing</artifactId>
<version>${geotools.version}</version>
</dependency>
<repositories>
<repository>
<id>osgeo</id>
<name>OSGeo Release Repository</name>
<url>https://repo.osgeo.org/repository/release/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<repository>
<id>osgeo-snapshot</id>
<name>OSGeo Snapshot Repository</name>
<url>https://repo.osgeo.org/repository/snapshot/</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
参考git项目 springbatch: 这是一个SpringBoot开发的SpringBatch批处理示例,示例主要是将文件30W条数据使用多线程导入到t_cust_temp表,然后又将t_cust_temp表数据使用分片导入到t_cust_info表。下载即可用。