
Commit 9082501373 ("first version") by FanBin, 1 month ago, on branch master
  1. .gitignore (+6)
  2. hadoop.dll (BIN)
  3. pom.xml (+179)
  4. src/main/java/com/fanbin/FlinkKafkaConsumerExample.java (+66)
  5. src/main/java/com/fanbin/JVM/CustomClassLoader.java (+40)
  6. src/main/java/com/fanbin/JVM/CustomClassLoaderDemo.java (+34)
  7. src/main/java/com/fanbin/KafkaMessageProducer.java (+54)
  8. src/main/java/com/fanbin/Main.java (+42)
  9. src/main/java/com/fanbin/MapPutTest.java (+33)
  10. src/main/java/com/fanbin/SparkFileStreamingReader.java (+45)
  11. src/main/java/com/fanbin/SparkKafkaReader.java (+86)
  12. src/main/java/com/fanbin/ThreadPoolFutureExample.java (+127)
  13. src/main/java/com/fanbin/TransactionAlertSystem.java (+154)
  14. src/main/java/com/fanbin/TransactionMonitoringCEP.java (+148)
  15. src/main/java/com/fanbin/leetcode/LC11.java (+29)
  16. src/main/java/com/fanbin/leetcode/ThreeNum.java (+27)
  17. src/main/resources/log4j.properties (+20)
  18. winutils.exe (BIN)

6
.gitignore

@@ -0,0 +1,6 @@
/.idea/
/.vscode/
/checkpoints/
/data/
/output/
/target/

BIN
hadoop.dll

Binary file not shown.

179
pom.xml

@@ -0,0 +1,179 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fanbin</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>central</id>
<name>Maven Central Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<flink.version>1.17.1</flink.version>
</properties>
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<!-- Spark Kafka Connector -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink File Connector (add under the <dependencies> node) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
<!-- Flink CEP core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<classifier>tests</classifier>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-shaded-guava</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- Flink JSON Support -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Set the main class in the jar manifest -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<mainClass>com.fanbin.TransactionAlertSystem</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- Package all dependencies into an executable (fat) jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.fanbin.TransactionAlertSystem</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-surefire-plugin</artifactId>-->
<!-- <version>3.0.0-M7</version>-->
<!-- <configuration>-->
<!-- <argLine>&#45;&#45;add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>-->
<!-- </configuration>-->
<!-- </plugin>-->
</project>

66
src/main/java/com/fanbin/FlinkKafkaConsumerExample.java

@@ -0,0 +1,66 @@
package com.fanbin;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkKafkaConsumerExample {
private static final String TOPIC_NAME = "topic_10";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "flink-consumer-group";
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 8081); // Web UI port
// Create a local execution environment with the Web UI enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(config);
// Set the parallelism
env.setParallelism(4);
// Configure Kafka consumer properties (currently unused; kept for the commented-out setProperties call below)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.setProperty("group.id", GROUP_ID);
// properties.setProperty("rest.port", "8081"); // Flink REST port
// Build the KafkaSource
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(TOPIC_NAME)
.setGroupId(GROUP_ID)
// .setProperties(properties)
.setStartingOffsets(OffsetsInitializer.earliest()) // consume from the earliest offset
.setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize the value only
.build();
// Read from Kafka
DataStream<String> kafkaStream = env.fromSource(source,
WatermarkStrategy.noWatermarks(), // no watermarks needed
"Kafka Source");
// Process the data: print to the console
kafkaStream.print();
// Execute the Flink job
env.execute("Flink Kafka Consumer Example");
}
}

40
src/main/java/com/fanbin/JVM/CustomClassLoader.java

@@ -0,0 +1,40 @@
package com.fanbin.JVM;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
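// Loads .class files from an arbitrary directory. Because loadClass() delegates to the
// parent loaders first, findClass() below only runs for classes they cannot resolve.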
public class CustomClassLoader extends ClassLoader {
private final String classPath;
public CustomClassLoader(String classPath) {
this.classPath = classPath;
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
try {
byte[] classData = loadClassData(name);
return defineClass(name, classData, 0, classData.length);
} catch (IOException e) {
throw new ClassNotFoundException("Class not found: " + name, e);
}
}
private byte[] loadClassData(String className) throws IOException {
String path = classPath + File.separatorChar +
className.replace('.', File.separatorChar) + ".class";
try (FileInputStream fis = new FileInputStream(path);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
bos.write(buffer, 0, bytesRead);
}
return bos.toByteArray();
}
}
}

34
src/main/java/com/fanbin/JVM/CustomClassLoaderDemo.java

@@ -0,0 +1,34 @@
package com.fanbin.JVM;
import java.lang.reflect.Method;
public class CustomClassLoaderDemo {
public static void main(String[] args) throws Exception {
// 1. Create the custom class loader
String classPath = "target/classes";
CustomClassLoader loader = new CustomClassLoader(classPath);
// 2. Load the class (MapPutTest lives in the com.fanbin package)
String className = "com.fanbin.MapPutTest";
Class<?> dynamicClass = loader.loadClass(className);
// 3. Create an instance
Object instance = dynamicClass.getDeclaredConstructor().newInstance();
// 4. Invoke a method via reflection (MapPutTest exposes generateRandomString)
Method doWorkMethod = dynamicClass.getMethod("generateRandomString");
doWorkMethod.invoke(instance);
// 5. Call through a shared interface (preferred when the loaded class implements it)
if (instance instanceof MyInterface) {
((MyInterface) instance).doWork();
}
}
// Shared interface (loaded by the system class loader)
public interface MyInterface {
void doWork();
}
}

54
src/main/java/com/fanbin/KafkaMessageProducer.java

@@ -0,0 +1,54 @@
package com.fanbin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaMessageProducer {
private static final String TOPIC_NAME = "topic_100";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Configure the Kafka producer properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for acknowledgement from all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count
// Create the Kafka producer
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// Send sample messages
for (int i = 0; i < 1000_0000; i++) {
String key = "key-" + i;
String value = "Test message #" + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
// Send asynchronously and handle the result in a callback
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent - Topic: %s, Partition: %d, Offset: %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
System.err.printf("Failed to send message: %s%n", exception.getMessage());
}
});
// Simulate a gap between messages
// TimeUnit.MILLISECONDS.sleep(1);
}
System.out.println("All messages sent");
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// System.err.println("Producer thread interrupted: " + e.getMessage());
} catch (Exception e) {
System.err.println("Producer error: " + e.getMessage());
}
}
}

42
src/main/java/com/fanbin/Main.java

@@ -0,0 +1,42 @@
package com.fanbin;
import java.util.concurrent.CountDownLatch;
public class Main {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
private static final CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) {
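// This sets up a classic lock-ordering deadlock: thread1 takes lock1 and then waits for
// lock2, while thread2 takes lock2 and then waits for lock1, so both block forever.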
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 1: Holding lock1...");
latch.countDown(); // let thread2 start as well
try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // make sure both threads contend at the same time
System.out.println("Thread 1: need lock2!");
synchronized (lock2) {
System.out.println("Thread 1: Acquired lock2!");
}
}
});
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("Thread 2: Holding lock2...");
latch.countDown(); // let thread1 start as well
try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // make sure both threads contend at the same time
System.out.println("Thread 2: need lock1!");
synchronized (lock1) {
System.out.println("Thread 2: Acquired lock1!");
}
}
});
thread1.start();
thread2.start();
}
}

33
src/main/java/com/fanbin/MapPutTest.java

@@ -0,0 +1,33 @@
package com.fanbin;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
public class MapPutTest {
public static void main(String[] args) {
System.out.println(Integer.toBinaryString(hash("1231111111111111111111111111111111111111111111111111")));
}
static int hash(Object key) {
int h;
// Replicates HashMap.hash(): spread the high 16 bits of hashCode() into the low 16 bits
System.out.println(Integer.toBinaryString(key.hashCode()));
System.out.println(Integer.toBinaryString(key.hashCode() >>> 16));
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
private static final int LENGTH = 20;
public static String generateRandomString() {
Random random = new Random();
StringBuilder sb = new StringBuilder(LENGTH);
for (int i = 0; i < LENGTH; i++) {
int index = random.nextInt(CHARACTERS.length());
sb.append(CHARACTERS.charAt(index));
}
return sb.toString();
}
}
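
Note: MapPutTest.hash() above mirrors the hash-spreading step used by java.util.HashMap (XOR the high 16 bits of hashCode() into the low 16 bits). A minimal sketch of why that matters, assuming the usual power-of-two table size where the bucket is chosen by (n - 1) & hash; HashSpreadSketch is a hypothetical helper and not part of this commit:

public class HashSpreadSketch {
    // Same spreading step as HashMap.hash(): mix the high 16 bits into the low 16 bits.
    static int spread(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

    public static void main(String[] args) {
        int capacity = 16; // HashMap table lengths are powers of two
        String key = "1231111111111111111111111111111111111111111111111111";
        int raw = key.hashCode();
        // The bucket index only uses the low bits: (n - 1) & hash.
        // Spreading lets differences in the high bits influence the chosen bucket.
        System.out.println("bucket without spread: " + ((capacity - 1) & raw));
        System.out.println("bucket with spread:    " + ((capacity - 1) & spread(key)));
    }
}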

45
src/main/java/com/fanbin/SparkFileStreamingReader.java

@@ -0,0 +1,45 @@
package com.fanbin;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
public class SparkFileStreamingReader {
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
// Create the SparkSession
SparkSession spark = SparkSession.builder()
.appName("Spark File Streaming Reader")
.master("local[*]")
.getOrCreate();
// Read a stream from the file system (CSV, JSON, Parquet, etc. are supported)
Dataset<Row> df = spark.readStream()
.format("csv")
.schema(new StructType()
.add("name", DataTypes.StringType, true)
.add("age", DataTypes.IntegerType, true))
.option("header", "true") // the CSV files have a header row
.load("data/input/"); // directory to monitor
// Simple sanity check: print the schema
df.printSchema();
// Write the results back to the file system as CSV
StreamingQuery query = df.repartition(2).writeStream()
.outputMode("append")
.format("csv") // parquet/csv/json are also supported
.option("path", "data/output/") // output directory
.option("checkpointLocation", "data/checkpoint/") // streaming checkpoint directory
.start();
query.awaitTermination();
}
}

86
src/main/java/com/fanbin/SparkKafkaReader.java

@@ -0,0 +1,86 @@
package com.fanbin;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.MetricRegistry;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkKafkaReader {
private static final Logger logger = LoggerFactory.getLogger(SparkKafkaReader.class);
private static final MetricRegistry metrics = new MetricRegistry();
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
// Create the SparkSession
SparkSession spark = SparkSession.builder()
.appName("Spark Kafka Reader")
.master("local[*]")
.getOrCreate();
// Read from Kafka
Dataset<Row> df = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") // Kafka broker address
.option("subscribe", "topic_100") // subscribed topic
// .option("kafka.group.id", "spark-hdfs-group")
.option("startingOffsets", "earliest") // start from the earliest offsets on first run
.option("failOnDataLoss", "false") // do not fail the query on data loss
.load();
// Select and process the data (example: cast key/value to strings)
Dataset<Row> result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
StreamingQuery query = result.writeStream()
.foreachBatch((batchDF, batchId) -> {
// Batch start log
logger.info("🚀 Starting batch {}", batchId);
metrics.counter("batches.started").inc();
try {
// Pre-processing metrics
long inputCount = batchDF.count();
metrics.histogram("batch.size").update(inputCount);
logger.info("📥 Input records: {}", inputCount);
// Business logic
Dataset<Row> processedDF = processData(batchDF);
// Post-processing metrics
long outputCount = processedDF.count();
metrics.histogram("output.size").update(outputCount);
logger.info("📤 Output records: {}", outputCount);
// 0: 100 --> 101 105 (A) 110(B)
// Write the results (roughly one output partition per million records)
processedDF.repartition((int) (outputCount/1000000 ==0 ? 1 : outputCount/1000000)).write().mode("append").csv("data/output/topic_100/");
// Success metrics
metrics.counter("batches.completed").inc();
logger.info("✅ Batch {} completed successfully", batchId);
} catch (Exception e) {
// Error handling
metrics.counter("batches.failed").inc();
logger.error("❌ Batch {} failed: {}", batchId, e.getMessage(), e);
}
})
.trigger(Trigger.ProcessingTime("30 seconds"))
.option("checkpointLocation", "checkpoints/topic_100") // a checkpoint location is required
// .option("kafka.group.id", "spark-hdfs-group")
.start();
query.awaitTermination();
}
private static Dataset<Row> processData(Dataset<Row> df) {
return df.withColumn("processed_time", functions.current_timestamp());
}
}

127
src/main/java/com/fanbin/ThreadPoolFutureExample.java

@@ -0,0 +1,127 @@
package com.fanbin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ThreadPoolFutureExample {
// Custom task class
static class CalculationTask implements Callable<Integer> {
private final int taskId;
private final int value;
public CalculationTask(int taskId, int value) {
this.taskId = taskId;
this.value = value;
}
@Override
public Integer call() throws Exception {
System.out.println("Task-" + taskId + " started. Thread: " +
Thread.currentThread().getName());
// Simulate an expensive computation
int result = 0;
for (int i = 1; i <= value; i++) {
result += i;
Thread.sleep(1000); // simulate work per step
}
// Simulate a failure for every fourth task
if (taskId % 4 == 0) {
throw new RuntimeException("Task-" + taskId + " simulated exception");
}
System.out.println("Task-" + taskId + " completed. Result: " + result);
return result;
}
}
static class MyExceptionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("任务被拒绝: " + r.toString());
}
}
public static void main(String[] args) {
// 1. Create the thread pool (core 2, max 6, 60s idle keep-alive, bounded queue of 5)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
6,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new MyExceptionHandler() // custom rejection handler: rejected tasks are only logged, so their Futures never complete
);
// 2. Create the task list (15 tasks against a capacity of 6 threads + 5 queue slots, so some are rejected)
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 1; i <= 15; i++) {
CalculationTask task = new CalculationTask(i, i * 10);
// Submit the task and keep its Future
Future<Integer> future = executor.submit(task);
futures.add(future);
}
// 3. Collect the task results
for (int i = 0; i < futures.size(); i++) {
try {
// Use a timeout so a rejected or slow task does not block forever
Integer result = futures.get(i).get(20, TimeUnit.SECONDS);
System.out.println("Got result for task-" + (i+1) + ": " + result);
} catch (InterruptedException e) {
System.err.println("Task interrupted: " + e.getMessage());
} catch (ExecutionException e) {
System.err.println("Task execution failed for task-" + (i+1) +
": " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.err.println("Task-" + (i+1) + " timed out. Cancelling...");
futures.get(i).cancel(true); // attempt to interrupt the task
}
}
// 4. Run one extra task that returns a value
Future<Integer> specialTask = executor.submit(() -> {
System.out.println("Special factorial task started");
int result = 1;
for (int i = 1; i <= 5; i++) {
result *= i;
Thread.sleep(100);
}
return result;
});
// 5. Poll until the special task completes
while (!specialTask.isDone()) {
System.out.println("Waiting for special task to complete...");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
System.out.println("Special task result: " + specialTask.get());
} catch (Exception e) {
System.err.println("Special task failed: " + e.getMessage());
}
// 6. Shut down the thread pool
executor.shutdown(); // stop accepting new tasks
try {
// Wait up to 5 seconds for in-flight tasks to finish
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Forcing shutdown of remaining tasks");
executor.shutdownNow(); // attempt to cancel remaining tasks
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("All tasks completed. Final thread pool status:");
System.out.println(" Completed tasks: " + executor.getCompletedTaskCount());
System.out.println(" Active threads: " + executor.getActiveCount());
}
}

154
src/main/java/com/fanbin/TransactionAlertSystem.java

@@ -0,0 +1,154 @@
package com.fanbin;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class TransactionAlertSystem {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(config);
env.setParallelism(4);
DataStream<Transaction> transactions = env.fromElements(
new Transaction("acc1", 0.5, System.currentTimeMillis()),
new Transaction("acc1", 600.0, System.currentTimeMillis() + 1000),
new Transaction("acc1", 300.0, System.currentTimeMillis() + 2000),
new Transaction("acc2", 0.8, System.currentTimeMillis() + 3000),
new Transaction("acc2", 100.0, System.currentTimeMillis() + 4000),
new Transaction("acc3", 0.9, System.currentTimeMillis() + 5000),
new Transaction("acc3", 700.0, System.currentTimeMillis() + 6000)
).assignTimestampsAndWatermarks(
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.flatMap(new SuspiciousTransactionPatternDetector());
alerts.print();
FileSink<Alert> sink = FileSink
.forRowFormat(new Path("output/alerts"), new SimpleStringEncoder<Alert>("UTF-8"))
.build();
// Write the raw transactions to their own directory so the two sinks do not share a path
FileSink<Transaction> sink1 = FileSink
.forRowFormat(new Path("output/transactions"), new SimpleStringEncoder<Transaction>("UTF-8"))
.build();
alerts.sinkTo(sink);
transactions.sinkTo(sink1);
env.execute("Transaction Monitoring Job");
}
public static class Transaction {
private String accountId;
private double amount;
private long timestamp;
public Transaction() {}
public Transaction(String accountId, double amount, long timestamp) {
this.accountId = accountId;
this.amount = amount;
this.timestamp = timestamp;
}
public String getAccountId() {
return accountId;
}
public double getAmount() {
return amount;
}
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "Transaction{" +
"accountId='" + accountId + '\'' +
", amount=" + amount +
", timestamp=" + timestamp +
'}';
}
}
public static class Alert {
private String accountId;
private double smallAmount;
private double largeAmount;
private long smallTimestamp;
private long largeTimestamp;
public Alert(String accountId, double smallAmount, double largeAmount,
long smallTimestamp, long largeTimestamp) {
this.accountId = accountId;
this.smallAmount = smallAmount;
this.largeAmount = largeAmount;
this.smallTimestamp = smallTimestamp;
this.largeTimestamp = largeTimestamp;
}
@Override
public String toString() {
return "🚨 ALERT for account " + accountId +
": Detected suspicious pattern!\n" +
" Small transaction: $" + smallAmount + " at " + smallTimestamp + "\n" +
" Followed by large transaction: $" + largeAmount + " at " + largeTimestamp;
}
}
private static class SuspiciousTransactionPatternDetector
extends RichFlatMapFunction<Transaction, Alert> {
private transient ValueState<Transaction> lastSmallTransactionState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Transaction> descriptor =
new ValueStateDescriptor<>("lastSmallTx", Transaction.class);
lastSmallTransactionState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Transaction transaction, Collector<Alert> out) throws Exception {
Transaction lastSmallTx = lastSmallTransactionState.value();
if (transaction.getAmount() < 1.0) {
lastSmallTransactionState.update(transaction);
} else if (transaction.getAmount() > 500.0) {
if (lastSmallTx != null) {
if (transaction.getTimestamp() > lastSmallTx.getTimestamp()) {
out.collect(new Alert(
transaction.getAccountId(),
lastSmallTx.getAmount(),
transaction.getAmount(),
lastSmallTx.getTimestamp(),
transaction.getTimestamp()
));
}
lastSmallTransactionState.clear();
}
} else {
lastSmallTransactionState.clear();
}
}
}
}

148
src/main/java/com/fanbin/TransactionMonitoringCEP.java

@@ -0,0 +1,148 @@
package com.fanbin;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
public class TransactionMonitoringCEP {
public static void main(String[] args) throws Exception {
// 1. Set up the execution environment
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 8081); // Web UI port
// Create a local execution environment with the Web UI enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(config);
env.setParallelism(4); // convenient for debugging; tune for production
// 2. Create a simulated transaction stream
DataStream<TransactionEvent> transactionStream = env
.fromElements(
// Normal transaction sequence
new TransactionEvent("acc-001", 0.5, System.currentTimeMillis() - 10000),
new TransactionEvent("acc-001", 100, System.currentTimeMillis() - 5000),
new TransactionEvent("acc-001", 0.8, System.currentTimeMillis() - 3000),
new TransactionEvent("acc-001", 600, System.currentTimeMillis() - 1000), // should trigger an alert
// A triggering sequence for another account
new TransactionEvent("acc-002", 0.3, System.currentTimeMillis() - 800),
new TransactionEvent("acc-002", 700, System.currentTimeMillis() - 100), // should trigger an alert
// A sequence that should not trigger an alert
new TransactionEvent("acc-003", 0.9, System.currentTimeMillis() - 600),
new TransactionEvent("acc-003", 300, System.currentTimeMillis() - 300) // amount below $500
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<TransactionEvent>forMonotonousTimestamps()
.withTimestampAssigner(
(SerializableTimestampAssigner<TransactionEvent>) (event, recordTimestamp) -> event.timestamp
)
)
.keyBy(event -> event.accountId); // key by account ID
// 3. Define the CEP pattern
Pattern<TransactionEvent, ?> alertPattern = Pattern.<TransactionEvent>begin("smallTx")
.where(new SimpleCondition<TransactionEvent>() {
@Override
public boolean filter(TransactionEvent event) {
return event.amount < 1.0; // transactions under $1
}
})
.next("largeTx") // strictly the next event for that key
.where(new SimpleCondition<TransactionEvent>() {
@Override
public boolean filter(TransactionEvent event) {
return event.amount > 500.0; // transactions over $500
}
})
.within(Time.minutes(5)); // both events must occur within 5 minutes
// 4. Apply the pattern to the stream
PatternStream<TransactionEvent> patternStream = CEP.pattern(
transactionStream,
alertPattern
);
// 5. Handle matched event sequences
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<TransactionEvent, Alert>() {
@Override
public Alert select(Map<String, List<TransactionEvent>> pattern) throws Exception {
TransactionEvent smallTx = pattern.get("smallTx").get(0);
TransactionEvent largeTx = pattern.get("largeTx").get(0);
return new Alert(
smallTx.accountId,
smallTx.amount,
largeTx.amount,
"检测到小额交易后立即大额交易"
);
}
});
// 6. Emit the alerts
alerts.print();
// 7. Execute the job
env.execute("Transaction Monitoring with CEP");
}
// Transaction event class
public static class TransactionEvent {
public String accountId;
public double amount;
public long timestamp;
public TransactionEvent() {}
public TransactionEvent(String accountId, double amount, long timestamp) {
this.accountId = accountId;
this.amount = amount;
this.timestamp = timestamp;
}
@Override
public String toString() {
return String.format("账户: %s, 金额: $%.2f, 时间: %d", accountId, amount, timestamp);
}
}
// Alert class
public static class Alert {
public String accountId;
public double smallAmount;
public double largeAmount;
public String message;
public Alert() {}
public Alert(String accountId, double smallAmount, double largeAmount, String message) {
this.accountId = accountId;
this.smallAmount = smallAmount;
this.largeAmount = largeAmount;
this.message = message;
}
@Override
public String toString() {
return String.format(
"【交易异常报警】账户: %s\n" +
"-> 小额交易: $%.2f\n" +
"-> 紧随的大额交易: $%.2f\n" +
"-> 报警原因: %s",
accountId, smallAmount, largeAmount, message
);
}
}
}

29
src/main/java/com/fanbin/leetcode/LC11.java

@@ -0,0 +1,29 @@
package com.fanbin.leetcode;
public class LC11 {
public static int maxArea(int[] height) {
int left = 0, right = height.length - 1;
int maxArea = 0;
while (left < right) {
// Compute the area of the current container
int width = right - left;
int currentHeight = Math.min(height[left], height[right]);
int currentArea = width * currentHeight;
maxArea = Math.max(maxArea, currentArea);
// Move the pointer at the shorter line inward
if (height[left] < height[right]) {
left++;
} else {
right--;
}
}
return maxArea;
}
public static void main(String[] args) {
int[] height = {6,2,6,2,5,4,8,3,7};
int result = maxArea(height);
System.out.println("最大面积为: " + result);
}
}

27
src/main/java/com/fanbin/leetcode/ThreeNum.java

@@ -0,0 +1,27 @@
package com.fanbin.leetcode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class ThreeNum {
public List<List<Integer>> threeSum(int[] nums) {
List<List<Integer>> result = new ArrayList<List<Integer>>(20);
for(int i=0; i< nums.length; i++) {
for(int j=i+1; j < nums.length; j++) {
for (int k=j+1; k< nums.length; k++) {
if (nums[i] + nums[j] + nums[k] == 0) {
// Collect the values (not the indices) and sort the triplet so that
// duplicate triplets compare equal and distinct() can remove them
List<Integer> arr = new ArrayList<Integer>(3);
arr.add(nums[i]);
arr.add(nums[j]);
arr.add(nums[k]);
arr.sort(Integer::compareTo);
result.add(arr);
}
}
}
}
return result.stream().distinct().toList();
}
}
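
Note: a quick driver for the brute-force threeSum above, using the classic LeetCode input; ThreeNumDemo is hypothetical and not part of this commit (it sits in the same package because ThreeNum is package-private):

package com.fanbin.leetcode;

public class ThreeNumDemo {
    public static void main(String[] args) {
        int[] nums = {-1, 0, 1, 2, -1, -4};
        // Expected triplets (order-insensitive): [-1, 0, 1] and [-1, -1, 2]
        System.out.println(new ThreeNum().threeSum(nums));
    }
}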

20
src/main/resources/log4j.properties

@@ -0,0 +1,20 @@
# Root logger configuration
log4j.rootLogger=DEBUG, console, file
# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=spark-streaming.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Per-package log levels
log4j.logger.org.apache.spark=DEBUG
log4j.logger.org.apache.kafka=DEBUG
log4j.logger.com.fanbin=DEBUG

BIN
winutils.exe

Binary file not shown.