From 4616488e4892a25f892c409477558fe242e68df4 Mon Sep 17 00:00:00 2001 From: FanBin Date: Thu, 21 Aug 2025 12:00:40 +0800 Subject: [PATCH] first version --- pom.xml | 23 ++- .../com/fanbin/FlinkKafkaConsumerExample.java | 4 +- .../com/fanbin/FlinkKafkaTimeExample.java | 168 ++++++++++++++++++ .../java/com/fanbin/KafkaMessageProducer.java | 6 +- .../com/fanbin/TransactionAlertSystem.java | 24 +-- .../java/com/fanbin/leetcode/ThreeNum.java | 38 ++-- src/test/java/SolutionTest.java | 45 +++++ 7 files changed, 271 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/fanbin/FlinkKafkaTimeExample.java create mode 100644 src/test/java/SolutionTest.java diff --git a/pom.xml b/pom.xml index 3edf433..4f05069 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,12 @@ + + org.junit.jupiter + junit-jupiter + 5.10.2 + test + org.apache.spark @@ -166,14 +172,15 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M7 + + --add-exports java.base/sun.nio.ch=ALL-UNNAMED + + - - - - - - - - + \ No newline at end of file diff --git a/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java b/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java index 1dfa784..9589447 100644 --- a/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java +++ b/src/main/java/com/fanbin/FlinkKafkaConsumerExample.java @@ -9,6 +9,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import java.time.Duration; import java.util.Properties; public class FlinkKafkaConsumerExample { @@ -51,7 +52,8 @@ public class FlinkKafkaConsumerExample { // 从Kafka读取数据 DataStream kafkaStream = env.fromSource(source, - WatermarkStrategy.noWatermarks(), // 不使用水印 + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(5)), // 不使用水印 "Kafka Source"); // 处理数据 - 打印到控制台 diff --git a/src/main/java/com/fanbin/FlinkKafkaTimeExample.java b/src/main/java/com/fanbin/FlinkKafkaTimeExample.java new file mode 100644 index 0000000..93a8f56 --- /dev/null +++ b/src/main/java/com/fanbin/FlinkKafkaTimeExample.java @@ -0,0 +1,168 @@ + +package com.fanbin; + +import org.apache.flink.api.common.eventtime.*; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +public class FlinkKafkaTimeExample { + + private static final String KAFKA_BROKERS = "localhost:9092"; + private static final String INPUT_TOPIC = "flink-time-input"; + private static final String OUTPUT_TOPIC = "flink-time-output"; + private static final String CONSUMER_GROUP = "flink-time-group"; + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); // 方便观察结果 + + // 1. Kafka 消费者配置 + Properties kafkaProps = new Properties(); + kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS); + kafkaProps.setProperty("group.id", CONSUMER_GROUP); + + // 2. 创建 Kafka 数据源 + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( + INPUT_TOPIC, + new SimpleStringSchema(), + kafkaProps + ); + + // 设置从最早开始消费 + kafkaConsumer.setStartFromEarliest(); + + // 3. 从 Kafka 获取数据流 + DataStream kafkaStream = env.addSource(kafkaConsumer); + + // 4. 解析数据 (格式: "timestamp|value") + DataStream> parsedStream = kafkaStream + .map(new MapFunction>() { + @Override + public Tuple2 map(String value) throws Exception { + String[] parts = value.split("\\|"); + if (parts.length == 2) { + return new Tuple2<>(Long.parseLong(parts[0]), parts[1]); + } + return new Tuple2<>(System.currentTimeMillis(), "INVALID"); + } + }) + .filter(t -> !t.f1.equals("INVALID")); + + parsedStream.map(event -> event.f0 - System.currentTimeMillis()) + .print("最大延迟"); + // 5. Event Time 处理示例 + DataStream eventTimeResult = parsedStream + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(5)) + .withTimestampAssigner((event, ts) -> event.f0) + ) + .keyBy(event -> event.f1) + .window(TumblingEventTimeWindows.of(Time.seconds(10))) + .reduce((a, b) -> new Tuple2<>(Math.max(a.f0, b.f0), a.f1 + "|" + b.f1)) + .map(value -> "[EventTime] Window: " + value); + + // 6. Processing Time 处理示例 + DataStream procTimeResult = parsedStream + .keyBy(event -> event.f1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) + .reduce((a, b) -> new Tuple2<>(System.currentTimeMillis(), a.f1 + "|" + b.f1)) + .map(value -> "[ProcTime] Window: " + value); + + // 7. Ingestion Time 处理示例 + DataStream ingestTimeResult = parsedStream + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, ts) -> System.currentTimeMillis()) + ) + .keyBy(event -> event.f1) + .window(TumblingEventTimeWindows.of(Time.seconds(10))) + .reduce((a, b) -> new Tuple2<>(Math.max(a.f0, b.f0), a.f1 + "|" + b.f1)) + .map(value -> "[IngestTime] Window: " + value); + + // 8. Kafka 生产者配置 (将结果写回 Kafka) + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", KAFKA_BROKERS); + + // 9. 创建 Kafka 生产者并输出结果 + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( + OUTPUT_TOPIC, + new SimpleStringSchema(), + producerProps + ); + + // 合并结果流并写入 Kafka + eventTimeResult.union(procTimeResult, ingestTimeResult).print(); + eventTimeResult.union(procTimeResult, ingestTimeResult).addSink(kafkaProducer); + // 10. 启动测试数据生成器 + new Thread(() -> { + try { + KafkaTestDataGenerator.generateTestData(); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + env.execute("Flink Kafka Time Example"); + } + + // Kafka 测试数据生成器 + public static class KafkaTestDataGenerator { + public static void generateTestData() throws Exception { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_BROKERS); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + try (org.apache.kafka.clients.producer.KafkaProducer producer = + new org.apache.kafka.clients.producer.KafkaProducer<>(props)) { + + long baseTime = System.currentTimeMillis(); + + // 发送测试数据(模拟乱序事件) + sendMessage(producer, baseTime - 8000, "A"); // 8秒前 + sendMessage(producer, baseTime - 6000, "A"); // 8秒前 + + sendMessage(producer, baseTime - 3000, "B"); // 3秒前 + sendMessage(producer, baseTime - 12000, "A"); // 12秒前(乱序) + sendMessage(producer, baseTime - 5000, "C"); // 5秒前 + Thread.sleep(2000); + sendMessage(producer, baseTime - 2000, "B"); // 2秒前 + + // 添加更多数据触发窗口计算 + Thread.sleep(15000); + for (int i = 0; i < 5; i++) { + sendMessage(producer, baseTime + i * 1000, "D" + i); + Thread.sleep(500); + } + } + } + + private static void sendMessage( + org.apache.kafka.clients.producer.KafkaProducer producer, + long timestamp, String value) throws Exception { + + String message = timestamp + "|" + value; + producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>( + INPUT_TOPIC, null, System.currentTimeMillis(), null, message)); + + System.out.println("[Kafka Producer] Sent: " + message); + Thread.sleep(500); // 模拟数据间隔 + } + } +} \ No newline at end of file diff --git a/src/main/java/com/fanbin/KafkaMessageProducer.java b/src/main/java/com/fanbin/KafkaMessageProducer.java index 789ee80..c2609c6 100644 --- a/src/main/java/com/fanbin/KafkaMessageProducer.java +++ b/src/main/java/com/fanbin/KafkaMessageProducer.java @@ -8,7 +8,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaMessageProducer { - private static final String TOPIC_NAME = "topic_100"; + private static final String TOPIC_NAME = "topic_10"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; public static void main(String[] args) { @@ -23,9 +23,9 @@ public class KafkaMessageProducer { // 创建Kafka生产者实例 try (KafkaProducer producer = new KafkaProducer<>(props)) { // 发送示例消息 - for (int i = 0; i < 1000_0000; i++) { + for (int i = 0; i < 1000; i++) { String key = "key-" + i; - String value = "这是第 " + i + " 条测试消息"; + String value = "{ 'value': '" + i + "' }"; ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, key, value); // 异步发送消息并处理回调 diff --git a/src/main/java/com/fanbin/TransactionAlertSystem.java b/src/main/java/com/fanbin/TransactionAlertSystem.java index 9ea2be4..27fb2b9 100644 --- a/src/main/java/com/fanbin/TransactionAlertSystem.java +++ b/src/main/java/com/fanbin/TransactionAlertSystem.java @@ -18,12 +18,12 @@ import java.time.Duration; public class TransactionAlertSystem { public static void main(String[] args) throws Exception { - Configuration config = new Configuration(); - config.setInteger(RestOptions.PORT, 8081); +// Configuration config = new Configuration(); +// config.setInteger(RestOptions.PORT, 8081); StreamExecutionEnvironment env = StreamExecutionEnvironment - .createLocalEnvironmentWithWebUI(config); - env.setParallelism(4); + .getExecutionEnvironment(); +// env.setParallelism(4); DataStream transactions = env.fromElements( new Transaction("acc1", 0.5, System.currentTimeMillis()), @@ -43,14 +43,14 @@ public class TransactionAlertSystem { alerts.print(); - FileSink sink = FileSink - .forRowFormat(new Path("output/alerts"), new SimpleStringEncoder("UTF-8")) - .build(); - FileSink sink1 = FileSink - .forRowFormat(new Path("output/alerts"), new SimpleStringEncoder("UTF-8")) - .build(); - alerts.sinkTo(sink); - transactions.sinkTo(sink1); +// FileSink sink = FileSink +// .forRowFormat(new Path("output/alerts"), new SimpleStringEncoder("UTF-8")) +// .build(); +// FileSink sink1 = FileSink +// .forRowFormat(new Path("output/alerts"), new SimpleStringEncoder("UTF-8")) +// .build(); +// alerts.sinkTo(sink); +// transactions.sinkTo(sink1); env.execute("Transaction Monitoring Job"); } diff --git a/src/main/java/com/fanbin/leetcode/ThreeNum.java b/src/main/java/com/fanbin/leetcode/ThreeNum.java index 4b10fc8..20ac74c 100644 --- a/src/main/java/com/fanbin/leetcode/ThreeNum.java +++ b/src/main/java/com/fanbin/leetcode/ThreeNum.java @@ -4,22 +4,34 @@ import java.util.ArrayList; import java.util.List; class ThreeNum { - public List> threeSum(int[] nums) { - List> result = new ArrayList>(20); + public int lengthOfLongestSubstring(String s) { + if(s.isEmpty()) return 0; + int max = 1; + List myList = new ArrayList<>(); + char[] charArr = s.toCharArray(); + for (int i=0; i newList = new ArrayList<>(); - for(int i=0; i< nums.length; i++) { - for(int j=i+1; j < nums.length; j++) { - for (int k=j+1; k< nums.length; k++) { - if (nums[i] + nums[j] + nums[k] == 0) { - List arr = new ArrayList(16); - arr.add(i); - arr.add(j); - arr.add(k); - result.add(arr); - } + int index = myList.indexOf(String.valueOf(charArr[i])); + for (int j = index; j < myList.size(); j++) { + newList.add(myList.get(j)); } + + myList = newList; + } else { + myList.add(String.valueOf(charArr[i])); } } - return result.stream().distinct().toList(); + + return Math.max(max, myList.size()); + } + + public static void main(String[] args) { + ThreeNum threeNum = new ThreeNum(); + String input = "aabaab!bb"; + int result = threeNum.lengthOfLongestSubstring(input); + System.out.println("最长不重复子串长度: " + result); } } diff --git a/src/test/java/SolutionTest.java b/src/test/java/SolutionTest.java new file mode 100644 index 0000000..e2c18b9 --- /dev/null +++ b/src/test/java/SolutionTest.java @@ -0,0 +1,45 @@ +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +class SolutionTest { + + @Test + void reverseListWithMultipleNodes() { + ListNode head = new ListNode(1, new ListNode(2, new ListNode(3, new ListNode(4, new ListNode(5))))); + Solution solution = new Solution(); + ListNode reversed = solution.reverseList(head); + assertEquals(5, reversed.val); + assertEquals(4, reversed.next.val); + assertEquals(3, reversed.next.next.val); + assertEquals(2, reversed.next.next.next.val); + assertEquals(1, reversed.next.next.next.next.val); + assertNull(reversed.next.next.next.next.next); + } + + @Test + void reverseListWithTwoNodes() { + ListNode head = new ListNode(1, new ListNode(2)); + Solution solution = new Solution(); + ListNode reversed = solution.reverseList(head); + assertEquals(2, reversed.val); + assertEquals(1, reversed.next.val); + assertNull(reversed.next.next); + } + + @Test + void reverseListWithSingleNode() { + ListNode head = new ListNode(1); + Solution solution = new Solution(); + ListNode reversed = solution.reverseList(head); + assertEquals(1, reversed.val); + assertNull(reversed.next); + } + + @Test + void reverseListWithEmptyList() { + ListNode head = null; + Solution solution = new Solution(); + ListNode reversed = solution.reverseList(head); + assertNull(reversed); + } +} \ No newline at end of file