commit 9082501373bc2d8cf8cc46b8e63c677700ed46b8
Author: FanBin
Date:   Tue Jul 22 22:20:01 2025 +0800

    first version

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d613c08
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+/.idea/
+/.vscode/
+/checkpoints/
+/data/
+/output/
+/target/
diff --git a/hadoop.dll b/hadoop.dll
new file mode 100644
index 0000000..60fb8b1
Binary files /dev/null and b/hadoop.dll differ
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..3edf433
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.fanbin</groupId>
+    <artifactId>demo</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <repositories>
+        <repository>
+            <id>central</id>
+            <name>Maven Central Repository</name>
+            <url>https://repo.maven.apache.org/maven2</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <flink.version>1.17.1</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.2.2</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>com.fanbin.TransactionAlertSystem</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.fanbin.TransactionAlertSystem</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java b/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java
new file mode 100644
index 0000000..1dfa784
--- /dev/null
+++ b/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java
@@ -0,0 +1,66 @@
+package com.fanbin;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Properties;
+
+public class FlinkKafkaConsumerExample {
+    private static final String TOPIC_NAME = "topic_10";
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final String GROUP_ID = "flink-consumer-group";
+
+    public static void main(String[] args) throws Exception {
+        // Set up the Flink execution environment
+//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.setInteger(RestOptions.PORT, 8081); // Web UI port
+
+        // Create a local execution environment with the Web UI enabled
+        StreamExecutionEnvironment env = StreamExecutionEnvironment
+                .createLocalEnvironmentWithWebUI(config);
+
+        // Set the parallelism
+        env.setParallelism(4);
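+        // Note: createLocalEnvironmentWithWebUI serves the UI at
+        // http://localhost:8081 only when flink-runtime-web is on the
+        // classpath (it is declared in pom.xml). With parallelism 4 the
+        // Kafka source below runs four parallel readers; if the topic has
+        // fewer than four partitions, the extra readers simply stay idle.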
+
+        // Configure Kafka consumer properties (kept for reference; the
+        // builder below sets the same values directly)
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
+        properties.setProperty("group.id", GROUP_ID);
+//        properties.setProperty("rest.port", "8081"); // Flink REST port
+
+        // Build the KafkaSource
+        KafkaSource<String> source = KafkaSource.<String>builder()
+                .setBootstrapServers(BOOTSTRAP_SERVERS)
+                .setTopics(TOPIC_NAME)
+                .setGroupId(GROUP_ID)
+//                .setProperties(properties)
+                .setStartingOffsets(OffsetsInitializer.earliest()) // consume from the earliest offset
+                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize the record value only
+                .build();
+
+        // Read from Kafka
+        DataStream<String> kafkaStream = env.fromSource(source,
+                WatermarkStrategy.noWatermarks(), // no watermarks needed for this job
+                "Kafka Source");
+
+        // Process the data - print it to the console
+        kafkaStream.print();
+
+        // Execute the Flink job
+        env.execute("Flink Kafka Consumer Example");
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/JVM/CustomClassLoader.java b/src/main/java/com/fanbin/JVM/CustomClassLoader.java
new file mode 100644
index 0000000..eff9de4
--- /dev/null
+++ b/src/main/java/com/fanbin/JVM/CustomClassLoader.java
@@ -0,0 +1,40 @@
+package com.fanbin.JVM;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+public class CustomClassLoader extends ClassLoader {
+
+    private final String classPath;
+
+    public CustomClassLoader(String classPath) {
+        this.classPath = classPath;
+    }
+
+    @Override
+    protected Class<?> findClass(String name) throws ClassNotFoundException {
+        try {
+            byte[] classData = loadClassData(name);
+            return defineClass(name, classData, 0, classData.length);
+        } catch (IOException e) {
+            throw new ClassNotFoundException("Class not found: " + name, e);
+        }
+    }
+
+    // Read the raw bytes of the .class file from the configured directory
+    private byte[] loadClassData(String className) throws IOException {
+        String path = classPath + File.separatorChar
+                + className.replace('.', File.separatorChar) + ".class";
+        try (FileInputStream fis = new FileInputStream(path);
+             ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+            byte[] buffer = new byte[4096];
+            int bytesRead;
+            while ((bytesRead = fis.read(buffer)) != -1) {
+                bos.write(buffer, 0, bytesRead);
+            }
+            return bos.toByteArray();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/JVM/CustomClassLoaderDemo.java b/src/main/java/com/fanbin/JVM/CustomClassLoaderDemo.java
new file mode 100644
index 0000000..3ccfde8
--- /dev/null
+++ b/src/main/java/com/fanbin/JVM/CustomClassLoaderDemo.java
@@ -0,0 +1,34 @@
+package com.fanbin.JVM;
+
+import java.lang.reflect.Method;
+
+public class CustomClassLoaderDemo {
+
+    public static void main(String[] args) throws Exception {
+        // 1. Create the custom class loader
+        String classPath = "target/classes";
+        CustomClassLoader loader = new CustomClassLoader(classPath);
+
+        // 2. Load the class by its fully qualified name
+        String className = "com.fanbin.MapPutTest";
+        Class<?> dynamicClass = loader.loadClass(className);
+
+        // 3. Create an instance
+        Object instance = dynamicClass.getDeclaredConstructor().newInstance();
+
+        // 4. Invoke a method via reflection (generateRandomString is static,
+        // so the instance argument is ignored)
+        Method doWorkMethod = dynamicClass.getMethod("generateRandomString");
+        doWorkMethod.invoke(instance);
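+        // Note: loadClass() delegates to the parent first, so a class that is
+        // also on the application classpath (as target/classes is when running
+        // from the IDE) will be loaded by the application class loader rather
+        // than by findClass() above. Pointing classPath at a directory outside
+        // the classpath forces this loader to define the class itself - but
+        // then the instanceof check below would fail, because a class defined
+        // by a different loader is a different runtime type.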
+
+        // 5. Call through an interface (recommended)
+        if (instance instanceof MyInterface) {
+            ((MyInterface) instance).doWork();
+        }
+    }
+
+    // Shared interface (loaded by the system class loader)
+    public interface MyInterface {
+        void doWork();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/KafkaMessageProducer.java b/src/main/java/com/fanbin/KafkaMessageProducer.java
new file mode 100644
index 0000000..d4de14c
--- /dev/null
+++ b/src/main/java/com/fanbin/KafkaMessageProducer.java
@@ -0,0 +1,54 @@
+package com.fanbin;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaMessageProducer {
+    private static final String TOPIC_NAME = "topic_100";
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+
+    public static void main(String[] args) {
+        // Configure the Kafka producer
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge
+        props.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count
+
+        // Create the producer
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            // Send ten million sample messages
+            for (int i = 0; i < 1000_0000; i++) {
+                String key = "key-" + i;
+                String value = "This is test message " + i;
+                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
+
+                // Send asynchronously and handle the result in a callback
+                producer.send(record, (metadata, exception) -> {
+                    if (exception == null) {
+                        System.out.printf("Message sent - Topic: %s, Partition: %d, Offset: %d%n",
+                                metadata.topic(), metadata.partition(), metadata.offset());
+                    } else {
+                        System.err.printf("Message send failed: %s%n", exception.getMessage());
+                    }
+                });
+
+                // Simulate a gap between messages
+//                TimeUnit.MILLISECONDS.sleep(1);
+            }
+
+            System.out.println("All messages sent");
+//        } catch (InterruptedException e) {
+//            Thread.currentThread().interrupt();
+//            System.err.println("Producer thread interrupted: " + e.getMessage());
+        } catch (Exception e) {
+            System.err.println("Producer error: " + e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/Main.java b/src/main/java/com/fanbin/Main.java
new file mode 100644
index 0000000..824659b
--- /dev/null
+++ b/src/main/java/com/fanbin/Main.java
@@ -0,0 +1,42 @@
+package com.fanbin;
+
+import java.util.concurrent.CountDownLatch;
+
+public class Main {
+    private static final Object lock1 = new Object();
+    private static final Object lock2 = new Object();
+    private static final CountDownLatch latch = new CountDownLatch(2);
+
+    public static void main(String[] args) {
+        Thread thread1 = new Thread(() -> {
+            synchronized (lock1) {
+                System.out.println("Thread 1: Holding lock1...");
+                latch.countDown(); // let thread2 start as well
+
+                try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // ensure both threads contend at the same time
+                System.out.println("Thread 1: need lock2!");
+
+                synchronized (lock2) {
+                    System.out.println("Thread 1: Acquired lock2!");
+                }
+            }
+        });
+
+        Thread thread2 = new Thread(() -> {
+            synchronized (lock2) {
+                System.out.println("Thread 2: Holding lock2...");
+                latch.countDown(); // let thread1 start as well
+
+                try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // ensure both threads contend at the same time
+                System.out.println("Thread 2: need lock1!");
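+                // At this point thread1 holds lock1 and waits for lock2 while
+                // this thread holds lock2 and waits for lock1 - a circular
+                // wait, so the program deadlocks by design. Acquiring the
+                // locks in a consistent global order (always lock1 first), or
+                // using ReentrantLock.tryLock with a timeout, would avoid it.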
+
+                synchronized (lock1) {
+                    System.out.println("Thread 2: Acquired lock1!");
+                }
+            }
+        });
+
+        thread1.start();
+        thread2.start();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/MapPutTest.java b/src/main/java/com/fanbin/MapPutTest.java
new file mode 100644
index 0000000..b965a8e
--- /dev/null
+++ b/src/main/java/com/fanbin/MapPutTest.java
@@ -0,0 +1,33 @@
+package com.fanbin;
+
+import java.util.Random;
+
+public class MapPutTest {
+
+    public static void main(String[] args) {
+        System.out.println(Integer.toBinaryString(hash("1231111111111111111111111111111111111111111111111111")));
+    }
+
+    // HashMap-style hash spreading: XOR the high 16 bits of hashCode() into
+    // the low 16 bits, so keys that differ only in their high bits still land
+    // in different buckets (the table index uses only the low bits)
+    static int hash(Object key) {
+        int h;
+        System.out.println(Integer.toBinaryString(key.hashCode()));
+        System.out.println(Integer.toBinaryString(key.hashCode() >>> 16));
+        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
+    }
+
+    private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+    private static final int LENGTH = 20;
+
+    public static String generateRandomString() {
+        Random random = new Random();
+        StringBuilder sb = new StringBuilder(LENGTH);
+        for (int i = 0; i < LENGTH; i++) {
+            int index = random.nextInt(CHARACTERS.length());
+            sb.append(CHARACTERS.charAt(index));
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/SparkFileStreamingReader.java b/src/main/java/com/fanbin/SparkFileStreamingReader.java
new file mode 100644
index 0000000..d379545
--- /dev/null
+++ b/src/main/java/com/fanbin/SparkFileStreamingReader.java
@@ -0,0 +1,45 @@
+package com.fanbin;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkFileStreamingReader {
+    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
+        // Create the SparkSession
+        SparkSession spark = SparkSession.builder()
+                .appName("Spark File Streaming Reader")
+                .master("local[*]")
+                .getOrCreate();
+
+        // Read a stream from the file system (CSV, JSON, Parquet, etc.)
+        Dataset<Row> df = spark.readStream()
+                .format("csv")
+                .schema(new StructType()
+                        .add("name", DataTypes.StringType, true)
+                        .add("age", DataTypes.IntegerType, true)) // streaming CSV requires an explicit schema
+                .option("header", "true") // the CSV files have a header row
+                .load("data/input/"); // directory to monitor
+
+        // Simple processing: show the schema
+        df.printSchema();
+
+        // Write the results back to the file system as CSV
+        StreamingQuery query = df.repartition(2).writeStream()
+                .outputMode("append")
+                .format("csv") // parquet/csv/json are all supported
+                .option("path", "data/output/") // output directory
+                .option("checkpointLocation", "data/checkpoint/") // streaming checkpoint directory
+                .start();
+
+        query.awaitTermination();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/SparkKafkaReader.java b/src/main/java/com/fanbin/SparkKafkaReader.java
new file mode 100644
index 0000000..de4c265
--- /dev/null
+++ b/src/main/java/com/fanbin/SparkKafkaReader.java
@@ -0,0 +1,86 @@
+package com.fanbin;
+
+import java.util.concurrent.TimeoutException;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkKafkaReader {
+    private static final Logger logger = LoggerFactory.getLogger(SparkKafkaReader.class);
+    private static final MetricRegistry metrics = new MetricRegistry();
+
+    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
+        // Create the SparkSession
+        SparkSession spark = SparkSession.builder()
+                .appName("Spark Kafka Reader")
+                .master("local[*]")
+                .getOrCreate();
+
+        // Read from Kafka
+        Dataset<Row> df = spark.readStream()
+                .format("kafka")
+                .option("kafka.bootstrap.servers", "localhost:9092") // Kafka broker address
+                .option("subscribe", "topic_100") // topic to subscribe to
+//                .option("kafka.group.id", "spark-hdfs-group")
+                .option("startingOffsets", "earliest") // start from the earliest offsets on first run
+                .option("failOnDataLoss", "false") // tolerate deleted or expired offsets
+                .load();
+
+        // Select and process the data (cast key and value to strings)
+        Dataset<Row> result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+        StreamingQuery query = result.writeStream()
+                .foreachBatch((batchDF, batchId) -> {
+                    // Batch start
+                    logger.info("🚀 Starting batch {}", batchId);
+                    metrics.counter("batches.started").inc();
+
+                    try {
+                        // Pre-processing metrics
+                        long inputCount = batchDF.count();
+                        metrics.histogram("batch.size").update(inputCount);
+                        logger.info("📥 Input records: {}", inputCount);
+
+                        // Business processing
+                        Dataset<Row> processedDF = processData(batchDF);
+
+                        // Post-processing metrics
+                        long outputCount = processedDF.count();
+                        metrics.histogram("output.size").update(outputCount);
+                        logger.info("📤 Output records: {}", outputCount);
+
+                        // Write the results, roughly one output partition per million records
+                        processedDF.repartition((int) (outputCount / 1000000 == 0 ? 1 : outputCount / 1000000))
+                                .write().mode("append").csv("data/output/topic_100/");
+
+                        // Success metrics
+                        metrics.counter("batches.completed").inc();
+                        logger.info("✅ Batch {} completed successfully", batchId);
+                    } catch (Exception e) {
+                        // Error handling
+                        metrics.counter("batches.failed").inc();
+                        logger.error("❌ Batch {} failed: {}", batchId, e.getMessage(), e);
+                    }
+                })
+                .trigger(Trigger.ProcessingTime("30 seconds"))
+                .option("checkpointLocation", "checkpoints/topic_100") // a checkpoint location is mandatory
+//                .option("kafka.group.id", "spark-hdfs-group")
+                .start();
+        query.awaitTermination();
+    }
+
+    private static Dataset<Row> processData(Dataset<Row> df) {
+        return df.withColumn("processed_time", functions.current_timestamp());
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/ThreadPoolFutureExample.java b/src/main/java/com/fanbin/ThreadPoolFutureExample.java
new file mode 100644
index 0000000..968f2ff
--- /dev/null
+++ b/src/main/java/com/fanbin/ThreadPoolFutureExample.java
@@ -0,0 +1,127 @@
+package com.fanbin;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class ThreadPoolFutureExample {
+
+    // A task that computes a value
+    static class CalculationTask implements Callable<Integer> {
+        private final int taskId;
+        private final int value;
+
+        public CalculationTask(int taskId, int value) {
+            this.taskId = taskId;
+            this.value = value;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            System.out.println("Task-" + taskId + " started. Thread: "
+                    + Thread.currentThread().getName());
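+            // Note: an exception thrown from call() does not kill the worker
+            // thread; the FutureTask captures it and rethrows it from
+            // future.get() wrapped in an ExecutionException (handled in
+            // step 3 of main below).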
+
+            // Simulate a slow computation
+            int result = 0;
+            for (int i = 1; i <= value; i++) {
+                result += i;
+                Thread.sleep(1000); // simulated cost per step
+            }
+
+            // Fail every fourth task on purpose
+            if (taskId % 4 == 0) {
+                throw new RuntimeException("Task-" + taskId + " simulated exception");
+            }
+
+            System.out.println("Task-" + taskId + " completed. Result: " + result);
+            return result;
+        }
+    }
+
+    static class MyExceptionHandler implements RejectedExecutionHandler {
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            System.err.println("Task rejected: " + r.toString());
+        }
+    }
+
+    public static void main(String[] args) {
+        // 1. Create the thread pool (2 core threads, at most 6 threads,
+        // 60-second idle keep-alive, queue capacity 5)
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                2,
+                6,
+                60, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(5),
+                new MyExceptionHandler() // rejected tasks are only logged, never run
+        );
+
+        // 2. Create and submit the tasks
+        List<Future<Integer>> futures = new ArrayList<>();
+        for (int i = 1; i <= 15; i++) {
+            CalculationTask task = new CalculationTask(i, i * 10);
+            // Submit the task and keep its Future
+            Future<Integer> future = executor.submit(task);
+            futures.add(future);
+        }
+
+        // 3. Collect the results
+        for (int i = 0; i < futures.size(); i++) {
+            try {
+                // Use a timeout to avoid waiting forever
+                Integer result = futures.get(i).get(20, TimeUnit.SECONDS);
+                System.out.println("Got result for task-" + (i + 1) + ": " + result);
+            } catch (InterruptedException e) {
+                System.err.println("Task interrupted: " + e.getMessage());
+            } catch (ExecutionException e) {
+                System.err.println("Task execution failed for task-" + (i + 1)
+                        + ": " + e.getCause().getMessage());
+            } catch (TimeoutException e) {
+                System.err.println("Task-" + (i + 1) + " timed out. Cancelling...");
+                futures.get(i).cancel(true); // attempt to interrupt the task
+            }
+        }
+
+        // 4. Run one extra task with a return value
+        Future<Integer> specialTask = executor.submit(() -> {
+            System.out.println("Special factorial task started");
+            int result = 1;
+            for (int i = 1; i <= 5; i++) {
+                result *= i;
+                Thread.sleep(100);
+            }
+            return result;
+        });
+
+        // 5. Poll until the special task finishes
+        while (!specialTask.isDone()) {
+            System.out.println("Waiting for special task to complete...");
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        try {
+            System.out.println("Special task result: " + specialTask.get());
+        } catch (Exception e) {
+            System.err.println("Special task failed: " + e.getMessage());
+        }
+
+        // 6. Shut down the pool
+        executor.shutdown(); // stop accepting new tasks
+        try {
+            // Wait up to 5 seconds for running tasks to finish
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                System.err.println("Forcing shutdown of remaining tasks");
+                executor.shutdownNow(); // attempt to cancel everything still running
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
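+
+        // Note: with 15 tasks submitted to a pool capped at 6 threads and a
+        // queue of 5, four submissions are rejected and only logged by
+        // MyExceptionHandler; their Futures never complete, which is why the
+        // get() calls in step 3 use a timeout.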
+
+        System.out.println("All tasks completed. Final thread pool status:");
+        System.out.println("  Completed tasks: " + executor.getCompletedTaskCount());
+        System.out.println("  Active threads: " + executor.getActiveCount());
+    }
+}
diff --git a/src/main/java/com/fanbin/TransactionAlertSystem.java b/src/main/java/com/fanbin/TransactionAlertSystem.java
new file mode 100644
index 0000000..9ea2be4
--- /dev/null
+++ b/src/main/java/com/fanbin/TransactionAlertSystem.java
@@ -0,0 +1,154 @@
+package com.fanbin;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+
+public class TransactionAlertSystem {
+
+    public static void main(String[] args) throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(RestOptions.PORT, 8081);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment
+                .createLocalEnvironmentWithWebUI(config);
+        env.setParallelism(4);
+
+        DataStream<Transaction> transactions = env.fromElements(
+                new Transaction("acc1", 0.5, System.currentTimeMillis()),
+                new Transaction("acc1", 600.0, System.currentTimeMillis() + 1000),
+                new Transaction("acc1", 300.0, System.currentTimeMillis() + 2000),
+                new Transaction("acc2", 0.8, System.currentTimeMillis() + 3000),
+                new Transaction("acc2", 100.0, System.currentTimeMillis() + 4000),
+                new Transaction("acc3", 0.9, System.currentTimeMillis() + 5000),
+                new Transaction("acc3", 700.0, System.currentTimeMillis() + 6000)
+        ).assignTimestampsAndWatermarks(
+                WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
+                        .withTimestampAssigner((event, timestamp) -> event.timestamp));
+
+        DataStream<Alert> alerts = transactions
+                .keyBy(Transaction::getAccountId)
+                .flatMap(new SuspiciousTransactionPatternDetector());
+
+        alerts.print();
+
+        // Write alerts and raw transactions to separate directories
+        FileSink<Alert> sink = FileSink
+                .forRowFormat(new Path("output/alerts"), new SimpleStringEncoder<Alert>("UTF-8"))
+                .build();
+        FileSink<Transaction> sink1 = FileSink
+                .forRowFormat(new Path("output/transactions"), new SimpleStringEncoder<Transaction>("UTF-8"))
+                .build();
+        alerts.sinkTo(sink);
+        transactions.sinkTo(sink1);
+
+        env.execute("Transaction Monitoring Job");
+    }
+
+    public static class Transaction {
+        private String accountId;
+        private double amount;
+        private long timestamp;
+
+        public Transaction() {}
+
+        public Transaction(String accountId, double amount, long timestamp) {
+            this.accountId = accountId;
+            this.amount = amount;
+            this.timestamp = timestamp;
+        }
+
+        public String getAccountId() {
+            return accountId;
+        }
+
+        public double getAmount() {
+            return amount;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        @Override
+        public String toString() {
+            return "Transaction{" +
+                    "accountId='" + accountId + '\'' +
+                    ", amount=" + amount +
+                    ", timestamp=" + timestamp +
+                    '}';
+        }
+    }
+
+    public static class Alert {
+        private String accountId;
+        private double smallAmount;
+        private double largeAmount;
+        private long smallTimestamp;
+        private long largeTimestamp;
+
+        public Alert(String accountId, double smallAmount, double largeAmount,
+                     long smallTimestamp, long largeTimestamp) {
+            this.accountId = accountId;
+            this.smallAmount = smallAmount;
+            this.largeAmount = largeAmount;
+            this.smallTimestamp = smallTimestamp;
+            this.largeTimestamp = largeTimestamp;
+        }
+
+        @Override
+        public String toString() {
+            return "🚨 ALERT for account " + accountId +
+                    ": Detected suspicious pattern!\n" +
+                    "  Small transaction: $" + smallAmount + " at " + smallTimestamp + "\n" +
+                    "  Followed by large transaction: $" + largeAmount + " at " + largeTimestamp;
+        }
+    }
+
+    private static class SuspiciousTransactionPatternDetector
+            extends RichFlatMapFunction<Transaction, Alert> {
+
+        private transient ValueState<Transaction> lastSmallTransactionState;
+
+        @Override
+        public void open(Configuration parameters) {
+            ValueStateDescriptor<Transaction> descriptor =
+                    new ValueStateDescriptor<>("lastSmallTx", Transaction.class);
+            lastSmallTransactionState = getRuntimeContext().getState(descriptor);
+        }
+
+        @Override
+        public void flatMap(Transaction transaction, Collector<Alert> out) throws Exception {
+            Transaction lastSmallTx = lastSmallTransactionState.value();
+
+            if (transaction.getAmount() < 1.0) {
+                // Remember the latest small (under $1) transaction for this account
+                lastSmallTransactionState.update(transaction);
+            } else if (transaction.getAmount() > 500.0) {
+                if (lastSmallTx != null) {
+                    if (transaction.getTimestamp() > lastSmallTx.getTimestamp()) {
+                        out.collect(new Alert(
+                                transaction.getAccountId(),
+                                lastSmallTx.getAmount(),
+                                transaction.getAmount(),
+                                lastSmallTx.getTimestamp(),
+                                transaction.getTimestamp()
+                        ));
+                    }
+                    lastSmallTransactionState.clear();
+                }
+            } else {
+                // A mid-sized transaction breaks the pattern
+                lastSmallTransactionState.clear();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/TransactionMonitoringCEP.java b/src/main/java/com/fanbin/TransactionMonitoringCEP.java
new file mode 100644
index 0000000..c209859
--- /dev/null
+++ b/src/main/java/com/fanbin/TransactionMonitoringCEP.java
@@ -0,0 +1,148 @@
+package com.fanbin;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternStream;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.List;
+import java.util.Map;
+
+public class TransactionMonitoringCEP {
+
+    public static void main(String[] args) throws Exception {
+        // 1. Set up the execution environment
+        Configuration config = new Configuration();
+        config.setInteger(RestOptions.PORT, 8081); // Web UI port
+
+        // Create a local execution environment with the Web UI enabled
+        StreamExecutionEnvironment env = StreamExecutionEnvironment
+                .createLocalEnvironmentWithWebUI(config);
+        env.setParallelism(4); // convenient for debugging; tune for production
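+        // The stream below is keyed by accountId, so the CEP pattern is
+        // matched independently for every account; event time comes from the
+        // watermark strategy attached to the source.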
+        // 2. Create a simulated stream of transactions
+        DataStream<TransactionEvent> transactionStream = env
+                .fromElements(
+                        // a normal sequence
+                        new TransactionEvent("acc-001", 0.5, System.currentTimeMillis() - 10000),
+                        new TransactionEvent("acc-001", 100, System.currentTimeMillis() - 5000),
+                        new TransactionEvent("acc-001", 0.8, System.currentTimeMillis() - 3000),
+                        new TransactionEvent("acc-001", 600, System.currentTimeMillis() - 1000), // should trigger an alert
+
+                        // a triggering sequence on another account
+                        new TransactionEvent("acc-002", 0.3, System.currentTimeMillis() - 800),
+                        new TransactionEvent("acc-002", 700, System.currentTimeMillis() - 100), // should trigger an alert
+
+                        // a sequence that should NOT trigger an alert
+                        new TransactionEvent("acc-003", 0.9, System.currentTimeMillis() - 600),
+                        new TransactionEvent("acc-003", 300, System.currentTimeMillis() - 300) // amount below $500
+                )
+                .assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<TransactionEvent>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<TransactionEvent>) (event, recordTimestamp) -> event.timestamp
+                                )
+                )
+                .keyBy(event -> event.accountId); // group by account ID
+
+        // 3. Define the CEP pattern
+        Pattern<TransactionEvent, ?> alertPattern = Pattern.<TransactionEvent>begin("smallTx")
+                .where(new SimpleCondition<TransactionEvent>() {
+                    @Override
+                    public boolean filter(TransactionEvent event) {
+                        return event.amount < 1.0; // a transaction under $1
+                    }
+                })
+                .next("largeTx") // strictly the next event for the same key
+                .where(new SimpleCondition<TransactionEvent>() {
+                    @Override
+                    public boolean filter(TransactionEvent event) {
+                        return event.amount > 500.0; // a transaction over $500
+                    }
+                })
+                .within(Time.minutes(5)); // within five minutes
+
+        // 4. Apply the pattern to the stream
+        PatternStream<TransactionEvent> patternStream = CEP.pattern(
+                transactionStream,
+                alertPattern
+        );
+
+        // 5. Handle the matches
+        DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<TransactionEvent, Alert>() {
+            @Override
+            public Alert select(Map<String, List<TransactionEvent>> pattern) throws Exception {
+                TransactionEvent smallTx = pattern.get("smallTx").get(0);
+                TransactionEvent largeTx = pattern.get("largeTx").get(0);
+
+                return new Alert(
+                        smallTx.accountId,
+                        smallTx.amount,
+                        largeTx.amount,
+                        "Large transaction immediately after a small one"
+                );
+            }
+        });
+
+        // 6. Print the alerts
+        alerts.print();
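+
+        // Because next() requires strict contiguity, acc-001 matches only on
+        // its (0.8, 600) pair - the earlier 0.5 is followed by a 100, which
+        // breaks the pattern - so this job should emit exactly two alerts:
+        // one for acc-001 and one for acc-002 (acc-003's 300 is below $500).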
+
+        // 7. Execute the job
+        env.execute("Transaction Monitoring with CEP");
+    }
+
+    // Transaction event
+    public static class TransactionEvent {
+        public String accountId;
+        public double amount;
+        public long timestamp;
+
+        public TransactionEvent() {}
+
+        public TransactionEvent(String accountId, double amount, long timestamp) {
+            this.accountId = accountId;
+            this.amount = amount;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Account: %s, amount: $%.2f, time: %d", accountId, amount, timestamp);
+        }
+    }
+
+    // Alert event
+    public static class Alert {
+        public String accountId;
+        public double smallAmount;
+        public double largeAmount;
+        public String message;
+
+        public Alert() {}
+
+        public Alert(String accountId, double smallAmount, double largeAmount, String message) {
+            this.accountId = accountId;
+            this.smallAmount = smallAmount;
+            this.largeAmount = largeAmount;
+            this.message = message;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "[TRANSACTION ALERT] Account: %s\n" +
+                    "-> Small transaction: $%.2f\n" +
+                    "-> Followed by large transaction: $%.2f\n" +
+                    "-> Reason: %s",
+                    accountId, smallAmount, largeAmount, message
+            );
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fanbin/leetcode/LC11.java b/src/main/java/com/fanbin/leetcode/LC11.java
new file mode 100644
index 0000000..7d77be8
--- /dev/null
+++ b/src/main/java/com/fanbin/leetcode/LC11.java
@@ -0,0 +1,29 @@
+package com.fanbin.leetcode;
+
+public class LC11 {
+    public static int maxArea(int[] height) {
+        int left = 0, right = height.length - 1;
+        int maxArea = 0;
+        while (left < right) {
+            // Compute the area of the current container
+            int width = right - left;
+            int currentHeight = Math.min(height[left], height[right]);
+            int currentArea = width * currentHeight;
+            maxArea = Math.max(maxArea, currentArea);
+
+            // Move the shorter boundary inward
+            if (height[left] < height[right]) {
+                left++;
+            } else {
+                right--;
+            }
+        }
+
+        return maxArea;
+    }
+
+    public static void main(String[] args) {
+        int[] height = {6, 2, 6, 2, 5, 4, 8, 3, 7};
+        int result = maxArea(height);
+        System.out.println("Maximum area: " + result);
+    }
+}
diff --git a/src/main/java/com/fanbin/leetcode/ThreeNum.java b/src/main/java/com/fanbin/leetcode/ThreeNum.java
new file mode 100644
index 0000000..9e379d3
--- /dev/null
+++ b/src/main/java/com/fanbin/leetcode/ThreeNum.java
@@ -0,0 +1,27 @@
+package com.fanbin.leetcode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+class ThreeNum {
+    // Brute-force 3Sum in O(n^3). Sorting first makes every collected triple
+    // non-decreasing, so distinct() can deduplicate triples of equal values.
+    public List<List<Integer>> threeSum(int[] nums) {
+        Arrays.sort(nums);
+        List<List<Integer>> result = new ArrayList<>(20);
+
+        for (int i = 0; i < nums.length; i++) {
+            for (int j = i + 1; j < nums.length; j++) {
+                for (int k = j + 1; k < nums.length; k++) {
+                    if (nums[i] + nums[j] + nums[k] == 0) {
+                        List<Integer> arr = new ArrayList<>(3);
+                        // Collect the values, not the indices
+                        arr.add(nums[i]);
+                        arr.add(nums[j]);
+                        arr.add(nums[k]);
+                        result.add(arr);
+                    }
+                }
+            }
+        }
+        return result.stream().distinct().toList();
+    }
+}
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ce43159
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Root logger: DEBUG level, writing to the console and file appenders
+log4j.rootLogger=DEBUG, console, file
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+# File appender
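+# (RollingFileAppender rolls the log over once it reaches MaxFileSize and
+# keeps up to MaxBackupIndex rotated files.)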
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=spark-streaming.log
+log4j.appender.file.MaxFileSize=100MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+# Per-package log levels
+log4j.logger.org.apache.spark=DEBUG
+log4j.logger.org.apache.kafka=DEBUG
+log4j.logger.com.fanbin=DEBUG
\ No newline at end of file
diff --git a/winutils.exe b/winutils.exe
new file mode 100644
index 0000000..4fd286d
Binary files /dev/null and b/winutils.exe differ