7 changed files with 271 additions and 37 deletions
@@ -0,0 +1,168 @@
package com.fanbin;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class FlinkKafkaTimeExample {

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String INPUT_TOPIC = "flink-time-input";
    private static final String OUTPUT_TOPIC = "flink-time-output";
    private static final String CONSUMER_GROUP = "flink-time-group";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // parallelism of 1 keeps the printed output easy to follow
        // 1. Kafka consumer configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP);

        // 2. Create the Kafka source
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                INPUT_TOPIC,
                new SimpleStringSchema(),
                kafkaProps
        );

        // Start consuming from the earliest offset
        kafkaConsumer.setStartFromEarliest();

        // 3. Read the stream from Kafka
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
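        // Note: FlinkKafkaConsumer is the legacy connector API; newer Flink releases ship the
        // KafkaSource builder instead. A rough, untested sketch of the equivalent source
        // (assuming the flink-connector-kafka dependency that provides KafkaSource and
        // OffsetsInitializer is on the classpath):
        //
        //     KafkaSource<String> source = KafkaSource.<String>builder()
        //             .setBootstrapServers(KAFKA_BROKERS)
        //             .setTopics(INPUT_TOPIC)
        //             .setGroupId(CONSUMER_GROUP)
        //             .setStartingOffsets(OffsetsInitializer.earliest())
        //             .setValueOnlyDeserializer(new SimpleStringSchema())
        //             .build();
        //     DataStream<String> kafkaStream =
        //             env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");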
        // 4. Parse the records (format: "timestamp|value")
        DataStream<Tuple2<Long, String>> parsedStream = kafkaStream
                .map(new MapFunction<String, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> map(String value) throws Exception {
                        String[] parts = value.split("\\|");
                        if (parts.length == 2) {
                            try {
                                return new Tuple2<>(Long.parseLong(parts[0]), parts[1]);
                            } catch (NumberFormatException ignored) {
                                // fall through to the INVALID marker below
                            }
                        }
                        return new Tuple2<>(System.currentTimeMillis(), "INVALID");
                    }
                })
                .filter(t -> !t.f1.equals("INVALID"));

        // Print how far each event timestamp lags behind the wall clock
        // (event time minus current time, so past events show up as negative values)
        parsedStream.map(event -> event.f0 - System.currentTimeMillis())
                .print("event-time lag (ms)");
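        // A well-formed record looks like "1700000000000|A": an epoch-millis event timestamp,
        // a literal '|', then the value string. Anything else is tagged INVALID and dropped by
        // the filter above.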
        // 5. Event Time example: timestamps come from the record itself, and watermarks allow
        //    up to 5 seconds of out-of-orderness
        DataStream<String> eventTimeResult = parsedStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.f0)
                )
                .keyBy(event -> event.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> new Tuple2<>(Math.max(a.f0, b.f0), a.f1 + "|" + b.f1))
                .map(value -> "[EventTime] Window: " + value);
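        // How the event-time windows fire: the bounded-out-of-orderness strategy keeps the
        // watermark roughly 5 seconds behind the largest event timestamp seen so far, so a
        // 10-second tumbling window only closes once some record arrives whose timestamp is
        // at least 5 seconds past the window's end. Records older than the current watermark
        // whose window has already fired are silently dropped here, since no allowedLateness
        // or late-data side output is configured.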
        // 6. Processing Time example: windows are driven purely by the operator's wall clock,
        //    so the embedded timestamps are ignored for windowing
        DataStream<String> procTimeResult = parsedStream
                .keyBy(event -> event.f1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> new Tuple2<>(System.currentTimeMillis(), a.f1 + "|" + b.f1))
                .map(value -> "[ProcTime] Window: " + value);
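        // Note: processing-time windows close on wall-clock boundaries (every 10 seconds here)
        // regardless of the timestamps embedded in the records, so rerunning the same input can
        // produce different window contents. The reduce above stamps each result with the time
        // of the last merge rather than with anything taken from the data.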
        // 7. Ingestion Time example: approximated by assigning the wall-clock arrival time as
        //    the event timestamp and using monotonously increasing watermarks
        DataStream<String> ingestTimeResult = parsedStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<Long, String>>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> System.currentTimeMillis())
                )
                .keyBy(event -> event.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> new Tuple2<>(Math.max(a.f0, b.f0), a.f1 + "|" + b.f1))
                .map(value -> "[IngestTime] Window: " + value);
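        // Recent Flink versions no longer expose a dedicated IngestionTime characteristic
        // (setStreamTimeCharacteristic was deprecated around 1.12), so this pattern, wall-clock
        // timestamps plus forMonotonousTimestamps watermarks, is the usual way to emulate it.
        // Note that the reduce still keeps the original event-time field in f0.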
        // 8. Kafka producer configuration (writes the results back to Kafka)
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", KAFKA_BROKERS);

        // 9. Create the Kafka sink and emit the results
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                OUTPUT_TOPIC,
                new SimpleStringSchema(),
                producerProps
        );

        // Union the three result streams, print them, and write them to Kafka
        DataStream<String> allResults = eventTimeResult.union(procTimeResult, ingestTimeResult);
        allResults.print();
        allResults.addSink(kafkaProducer);
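        // Note: FlinkKafkaProducer is likewise the legacy sink API; on newer Flink releases the
        // KafkaSink builder is the rough equivalent (untested sketch, assuming the same
        // flink-connector-kafka dependency):
        //
        //     KafkaSink<String> sink = KafkaSink.<String>builder()
        //             .setBootstrapServers(KAFKA_BROKERS)
        //             .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        //                     .setTopic(OUTPUT_TOPIC)
        //                     .setValueSerializationSchema(new SimpleStringSchema())
        //                     .build())
        //             .build();
        //     allResults.sinkTo(sink);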
        // 10. Start the test data generator in a background thread
        new Thread(() -> {
            try {
                KafkaTestDataGenerator.generateTestData();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        env.execute("Flink Kafka Time Example");
    }
    // Kafka test data generator
    public static class KafkaTestDataGenerator {
        public static void generateTestData() throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_BROKERS);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                         new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {

                long baseTime = System.currentTimeMillis();

                // Send test data (simulating out-of-order events)
                sendMessage(producer, baseTime - 8000, "A");   // 8 seconds ago
                sendMessage(producer, baseTime - 6000, "A");   // 6 seconds ago

                sendMessage(producer, baseTime - 3000, "B");   // 3 seconds ago
                sendMessage(producer, baseTime - 12000, "A");  // 12 seconds ago (out of order)
                sendMessage(producer, baseTime - 5000, "C");   // 5 seconds ago
                Thread.sleep(2000);
                sendMessage(producer, baseTime - 2000, "B");   // 2 seconds ago
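                // With the 5-second out-of-orderness bound, the baseTime - 12000 record arrives
                // after newer records have already pushed the watermark forward; depending on
                // how the 10-second windows happen to align with baseTime it can land in a
                // window that has already fired and be dropped from the event-time results.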
                // Send more data later so the watermark advances and the windows fire
                Thread.sleep(15000);
                for (int i = 0; i < 5; i++) {
                    sendMessage(producer, baseTime + i * 1000, "D" + i);
                    Thread.sleep(500);
                }
            }
        }

        private static void sendMessage(
                org.apache.kafka.clients.producer.KafkaProducer<String, String> producer,
                long timestamp, String value) throws Exception {

            String message = timestamp + "|" + value;
            producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(
                    INPUT_TOPIC, null, System.currentTimeMillis(), null, message));

            System.out.println("[Kafka Producer] Sent: " + message);
            Thread.sleep(500); // simulate a gap between records
        }
    }
}
@@ -0,0 +1,45 @@
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

class SolutionTest {

    @Test
    void reverseListWithMultipleNodes() {
        ListNode head = new ListNode(1, new ListNode(2, new ListNode(3, new ListNode(4, new ListNode(5)))));
        Solution solution = new Solution();
        ListNode reversed = solution.reverseList(head);
        assertEquals(5, reversed.val);
        assertEquals(4, reversed.next.val);
        assertEquals(3, reversed.next.next.val);
        assertEquals(2, reversed.next.next.next.val);
        assertEquals(1, reversed.next.next.next.next.val);
        assertNull(reversed.next.next.next.next.next);
    }

    @Test
    void reverseListWithTwoNodes() {
        ListNode head = new ListNode(1, new ListNode(2));
        Solution solution = new Solution();
        ListNode reversed = solution.reverseList(head);
        assertEquals(2, reversed.val);
        assertEquals(1, reversed.next.val);
        assertNull(reversed.next.next);
    }

    @Test
    void reverseListWithSingleNode() {
        ListNode head = new ListNode(1);
        Solution solution = new Solution();
        ListNode reversed = solution.reverseList(head);
        assertEquals(1, reversed.val);
        assertNull(reversed.next);
    }

    @Test
    void reverseListWithEmptyList() {
        ListNode head = null;
        Solution solution = new Solution();
        ListNode reversed = solution.reverseList(head);
        assertNull(reversed);
    }
}
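// The Solution and ListNode classes these tests exercise are not part of this hunk. A minimal
// sketch that would satisfy the assertions above (hypothetical; the real implementation in the
// repository may differ):

class ListNode {
    int val;
    ListNode next;
    ListNode(int val) { this.val = val; }
    ListNode(int val, ListNode next) { this.val = val; this.next = next; }
}

class Solution {
    // Iterative in-place reversal: repoint each node's next pointer to the previous node.
    ListNode reverseList(ListNode head) {
        ListNode prev = null;
        ListNode curr = head;
        while (curr != null) {
            ListNode next = curr.next;
            curr.next = prev;
            prev = curr;
            curr = next;
        }
        return prev;
    }
}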