refactor(stream): 重构流处理相关代码 #8

Merged
minglipro merged 1 commits from dev into master 2025-09-15 17:27:11 +08:00
14 changed files with 181 additions and 2113 deletions
Showing only changes of commit cc3156572f - Show all commits

View File

@ -16,10 +16,10 @@
# ProjectName mingli-utils
# ModuleName mingli-utils
# CurrentFile gradle.properties
# LastUpdate 2025-09-15 13:54:50
# LastUpdate 2025-09-15 17:24:10
# UpdateUser MingLiPro
#
JDKVERSIONS=1.8
GROUPSID=com.mingliqiye.utils
ARTIFACTID=mingli-utils
VERSIONS=4.0.4
VERSIONS=4.0.5

View File

@ -1,279 +0,0 @@
package com.mingliqiye.utils.stream;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jetbrains.annotations.NotNull;
/**
* 自定义的输入输出流工具类支持线程安全的数据读写操作<br>
* 谁闲着没事干用本地输入输出<br>
* 实现了 AutoCloseableCloseable Flushable 接口便于资源管理
*
* @author MingLiPro
*/
public class InOutSteam implements AutoCloseable, Closeable, Flushable {
private final Lock lock = new ReentrantLock();
List<Byte> bytes;
/**
* 从内部缓冲区中读取最多 len 个字节到指定的字节数组 b
*
* @param b 目标字节数组用于存储读取的数据
* @param off 起始偏移位置
* @param len 最大读取字节数
* @return 实际读取的字节数如果已到达流末尾则返回 -1
* @throws IndexOutOfBoundsException 如果 off len 参数非法
*/
public int read(byte@NotNull [] b, int off, int len) {
if (bytes == null) return -1;
if (bytes.isEmpty()) {
return 0;
}
if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException();
}
try {
lock.lock();
int bytesRead = Math.min(len, bytes.size());
for (int i = 0; i < bytesRead; i++) {
b[off + i] = bytes.get(i);
}
if (bytesRead > 0) {
bytes.subList(0, bytesRead).clear();
}
return bytesRead;
} finally {
lock.unlock();
}
}
/**
* 读取一个字节的数据
*
* @return 下一个字节数据0~255如果已到达流末尾则返回 -1
*/
public int read() {
if (bytes == null) return -1;
try {
lock.lock();
if (bytes.isEmpty()) {
return -1;
}
return bytes.remove(0);
} finally {
lock.unlock();
}
}
/**
* 从内部缓冲区中读取最多 b.length 个字节到指定的字节数组 b
*
* @param b 目标字节数组用于存储读取的数据
* @return 实际读取的字节数如果已到达流末尾则返回 -1
*/
public int read(byte@NotNull [] b) {
if (bytes == null) return -1;
return read(b, 0, b.length);
}
/**
* 跳过并丢弃此输入流中数据的 n 个字节
*
* @param n 要跳过的字节数
* @return 实际跳过的字节数
*/
public long skip(long n) {
if (bytes == null) return -1;
if (bytes.isEmpty()) {
return 0;
}
try {
lock.lock();
if (n <= 0) {
return 0;
}
long bytesToSkip = Math.min(n, bytes.size());
// 移除跳过的字节
if (bytesToSkip > 0) {
bytes.subList(0, (int) bytesToSkip).clear();
}
return bytesToSkip;
} finally {
lock.unlock();
}
}
/**
* 返回此输入流下一个方法调用可以不受阻塞地从此输入流读取或跳过的估计字节数
*
* @return 可以无阻塞读取的字节数
*/
public int available() {
if (bytes == null) return -1;
try {
lock.lock();
return bytes.size();
} finally {
lock.unlock();
}
}
/**
* 获取与当前对象关联的 InputStream 实例
*
* @return InputStream 实例
*/
public InputStream getInputStream() {
if (inputStream == null) {
inputStream = inputStream();
}
return inputStream;
}
private InputStream inputStream;
private OutputStream outputStream;
/**
* 获取与当前对象关联的 OutputStream 实例
*
* @return OutputStream 实例
*/
public OutputStream getOutputStream() {
if (outputStream == null) {
outputStream = outputStream();
}
return outputStream;
}
/**
* 创建并返回一个包装后的 InputStream 实例
*
* @return 新创建的 InputStream 实例
*/
private InputStream inputStream() {
return new InputStreanWrapper(
new InputStream() {
@Override
public int read() {
return InOutSteam.this.read();
}
@Override
public int read(byte@NotNull [] b, int off, int len) {
return InOutSteam.this.read(b, off, len);
}
@Override
public long skip(long n) {
return InOutSteam.this.skip(n);
}
@Override
public int available() {
return InOutSteam.this.available();
}
}
);
}
/**
* 创建并返回一个 OutputStream 实例
*
* @return 新创建的 OutputStream 实例
*/
private OutputStream outputStream() {
return new OutputStream() {
@Override
public void write(int b) {
InOutSteam.this.write(b);
}
@Override
public void write(byte@NotNull [] b, int off, int len) {
InOutSteam.this.write(b, off, len);
}
};
}
/**
* 构造函数初始化内部字节列表
*/
public InOutSteam() {
bytes = new ArrayList<>();
}
/**
* 将指定的字节写入此输出流
*
* @param b 要写入的字节
*/
public void write(int b) {
try {
lock.lock();
bytes.add((byte) b);
} finally {
lock.unlock();
}
}
/**
* 将指定字节数组中的部分数据写入此输出流
*
* @param b 数据源字节数组
* @param off 起始偏移位置
* @param len 写入的字节数
* @throws IndexOutOfBoundsException 如果 off len 参数非法
*/
public void write(byte@NotNull [] b, int off, int len) {
if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException("Invalid offset or length");
}
try {
lock.lock();
for (int i = off; i < off + len; i++) {
bytes.add(b[i]);
}
} finally {
lock.unlock();
}
}
/**
* 将整个字节数组写入此输出流
*
* @param b 要写入的字节数组
*/
public void write(byte@NotNull [] b) {
write(b, 0, b.length);
}
/**
* 刷新此输出流并强制写出所有缓冲的输出字节
* 当前实现为空方法
*/
@Override
public void flush() {}
/**
* 关闭此流并释放与其相关的所有资源
* 清空并置空内部字节列表
*/
@Override
public void close() {
bytes.clear();
bytes = null;
}
}

View File

@ -1,123 +0,0 @@
/*
* Copyright 2025 mingliqiye
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile InputStreamUtils.java
* LastUpdate 2025-09-15 08:30:57
* UpdateUser MingLiPro
*/
package com.mingliqiye.utils.stream;
import com.mingliqiye.utils.collection.Collections;
import lombok.val;
import org.jetbrains.annotations.NotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* 输入流工具类提供对InputStream的常用操作封装
*/
public class InputStreamUtils {
/**
* 默认缓冲区大小1MB
*/
public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
/**
* 将输入流读取到字节数组中<br>
* 请在外部自行关闭输入流对象 避免资源泄露
*
* @param inputStream 输入流对象用于读取数据
* @return 包含输入流所有数据的字节数组
* @throws IOException 当读取输入流或写入输出流时发生IO异常
*/
public static byte[] readToArray(InputStream inputStream)
throws IOException {
// 使用ByteArrayOutputStream来收集输入流中的所有数据
try (val byteArrayOutputStream = new ByteArrayOutputStream()) {
val bytes = new byte[DEFAULT_BUFFER_SIZE];
int length;
// 循环读取输入流数据直到读取完毕
while ((length = inputStream.read(bytes)) != -1) {
byteArrayOutputStream.write(bytes, 0, length);
}
return byteArrayOutputStream.toByteArray();
}
}
/**
* 将输入流读取到Byte列表中<br>
* 请在外部自行关闭输入流对象 避免资源泄露
*
* @param inputStream 输入流对象用于读取数据
* @return 包含输入流所有数据的Byte列表
* @throws IOException 当读取输入流时发生IO异常
*/
public static @NotNull List<Byte> readToList(InputStream inputStream)
throws IOException {
return Collections.newArrayLists(readToArray(inputStream));
}
/**
* 将输入流读取为字符串<br>
* 请在外部自行关闭输入流对象 避免资源泄露
*
* @param inputStream 输入流对象用于读取数据
* @return 输入流对应的字符串内容
* @throws IOException 当读取输入流时发生IO异常
*/
public static String readToString(InputStream inputStream)
throws IOException {
return new String(readToArray(inputStream));
}
/**
* 将输入流的数据传输到输出流中<br>
* 请在外部自行关闭输入流输出流对象 避免资源泄露
*
* @param inputStream 源输入流用于读取数据
* @param outputStream 目标输出流用于写入数据
* @return 传输的总字节数
* @throws IOException 当读取或写入流时发生IO异常
*/
public static long transferTo(
InputStream inputStream,
OutputStream outputStream
) throws IOException {
if (inputStream == null) {
throw new IllegalArgumentException("inputStream can not be null");
}
if (outputStream == null) {
throw new IllegalArgumentException("outputStream can not be null");
}
val bytes = new byte[DEFAULT_BUFFER_SIZE];
int length;
long readAll = 0L;
// 循环读取并写入数据直到输入流读取完毕
while ((length = inputStream.read(bytes)) != -1) {
outputStream.write(bytes, 0, length);
readAll += length;
}
outputStream.flush();
return readAll;
}
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 2025 mingliqiye
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile InputStreanWrapper.java
* LastUpdate 2025-09-14 22:12:16
* UpdateUser MingLiPro
*/
package com.mingliqiye.utils.stream;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
public class InputStreanWrapper extends InputStream implements AutoCloseable {
@Getter
private final InputStream inputStream;
public InputStreanWrapper(InputStream inputStream) {
this.inputStream = inputStream;
}
private static InputStreanWrapper of(InputStream inputStream) {
return new InputStreanWrapper(inputStream);
}
@Override
public int available() throws IOException {
return inputStream.available();
}
@Override
public int read() throws IOException {
return inputStream.read();
}
@Override
public int read(byte@NotNull [] b) throws IOException {
return inputStream.read(b);
}
@Override
public int read(byte@NotNull [] b, int off, int len) throws IOException {
return inputStream.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return inputStream.skip(n);
}
@Override
public void mark(int readlimit) {
inputStream.mark(readlimit);
}
@Override
public void reset() throws IOException {
inputStream.reset();
}
@Override
public boolean markSupported() {
return inputStream.markSupported();
}
@Override
public void close() {
try {
inputStream.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 输入流转换为输出流 <br>
* jdk8 兼容实现 jdk9+ <br>
* 请使用 InputStream.transferTo()
*
* @param outputStream 输出流
* @return 转换的字节数
* @throws IOException IO错误
*/
public long transferToOutputStream(OutputStream outputStream)
throws IOException {
return InputStreamUtils.transferTo(inputStream, outputStream);
}
public byte[] readToArray() throws IOException {
return InputStreamUtils.readToArray(inputStream);
}
public List<Byte> readToList() throws IOException {
return InputStreamUtils.readToList(inputStream);
}
public String readToString() throws IOException {
return InputStreamUtils.readToString(inputStream);
}
}

View File

@ -1,81 +0,0 @@
package com.mingliqiye.utils.stream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
public class OutputStreamWrapper extends OutputStream implements AutoCloseable {
@Getter
private final OutputStream outputStream;
private final ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
public OutputStreamWrapper(OutputStream outputStream) {
this.outputStream = outputStream;
}
public static OutputStreamWrapper of(OutputStream outputStream) {
return new OutputStreamWrapper(outputStream);
}
@Override
public void write(int b) throws IOException {
byteArrayOutputStream.write(b);
}
@Override
public void write(byte@NotNull [] b) throws IOException {
byteArrayOutputStream.write(b);
}
public void write(List<Byte> b) throws IOException {
write(b, 0, b.size());
}
@Override
public void write(byte@NotNull [] b, int off, int len) throws IOException {
byteArrayOutputStream.write(b, off, len);
}
public void write(List<Byte> b, int off, int len) throws IOException {
byte[] bytes = new byte[b.size()];
for (int i = 0; i < b.size(); i++) {
bytes[i] = b.get(i);
}
byteArrayOutputStream.write(bytes, off, len);
}
@Override
public void flush() throws IOException {
outputStream.write(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
}
public int getBufferCachedSize() {
return byteArrayOutputStream.size();
}
public byte[] getBufferCachedBytes() {
return byteArrayOutputStream.toByteArray();
}
@Override
public void close() {
try {
outputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public long transferFromOutputStream(InputStream inputStream)
throws IOException {
return InputStreamUtils.transferTo(inputStream, outputStream);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +0,0 @@
package com.mingliqiye.utils.stream.interfaces;
public interface GetIdable<T> extends Getable<T> {
T getId();
default T get() {
return getId();
}
}

View File

@ -1,9 +0,0 @@
package com.mingliqiye.utils.stream.interfaces;
public interface GetKeyable<T> {
T getKey();
default T get() {
return getKey();
}
}

View File

@ -1,9 +0,0 @@
package com.mingliqiye.utils.stream.interfaces;
public interface GetNameable<T> extends Getable<T> {
T getName();
default T get() {
return getName();
}
}

View File

@ -1,5 +0,0 @@
package com.mingliqiye.utils.stream.interfaces;
public interface Getable<T> {
T get();
}

View File

@ -16,15 +16,13 @@
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile ByteUtils.kt
* LastUpdate 2025-09-15 00:07:22
* LastUpdate 2025-09-15 17:26:34
* UpdateUser MingLiPro
*/
@file:JvmName("ByteUtils")
package com.mingliqiye.utils.bytes
import com.mingliqiye.utils.stream.SuperStream
const val ESC_ASC: Byte = 0x10
const val ESC_DESC: Byte = 0x1B
const val ESC_NONE: Byte = 0x00
@ -43,6 +41,6 @@ const val ESC_RESERVED: Byte = 0x06
fun ByteArray.getByteArrayString(): MutableList<String> {
return this.toList().stream()
.map { a -> String.format("0X%02X", a!!.toInt() and 0xFF) }
.collect(SuperStream.Collectors.toList())
.collect(com.mingliqiye.utils.stream.toList()) as MutableList<String>
}

View File

@ -16,7 +16,7 @@
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile Collection.kt
* LastUpdate 2025-09-15 09:30:37
* LastUpdate 2025-09-15 17:26:00
* UpdateUser MingLiPro
*/
@ -24,8 +24,8 @@
package com.mingliqiye.utils.collection
import com.mingliqiye.utils.stream.SuperStream
import java.util.*
import java.util.stream.Collectors
inline fun <reified T> Collection<T>.toArray(): Array<T> {
@ -34,15 +34,15 @@ inline fun <reified T> Collection<T>.toArray(): Array<T> {
inline fun <reified T, V> Collection<T>.toMap(noinline v: (T) -> V): Map<T, V> {
return this.stream().collect(
SuperStream.Collectors.toMap(
SuperStream.Collectors::getThis, v
com.mingliqiye.utils.stream.toMapValueThis(
v
)
)
}
inline fun <reified T, V, K> Collection<T>.toMap(noinline k: (T) -> K, noinline v: (T) -> V): Map<K, V> {
return this.stream().collect(
SuperStream.Collectors.toMap(
Collectors.toMap(
k, v
)
)

View File

@ -0,0 +1,50 @@
/*
* Copyright 2025 mingliqiye
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile InputStreamUtils.kt
* LastUpdate 2025-09-15 17:26:34
* UpdateUser MingLiPro
*/
@file:JvmName("InputStreamUtils")
package com.mingliqiye.utils.io
import java.io.InputStream
import java.io.OutputStream
import java.nio.charset.Charset
fun InputStream.readAllText(charset: Charset = Charsets.UTF_8): String {
return this.readAllBytes().toString(charset)
}
fun InputStream.readAllBytes(): ByteArray {
return this.readBytes()
}
fun InputStream.exportBytes(out: OutputStream) {
out.write(this.readAllBytes())
out.flush()
}
fun InputStream.readToList(): List<Byte> {
return this.readBytes().toList()
}

View File

@ -0,0 +1,122 @@
/*
* Copyright 2025 mingliqiye
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ProjectName mingli-utils
* ModuleName mingli-utils.main
* CurrentFile SuperStream.kt
* LastUpdate 2025-09-15 17:17:48
* UpdateUser MingLiPro
*/
@file:JvmName("Colls")
package com.mingliqiye.utils.stream
import java.util.stream.Collector
import java.util.stream.Collectors
import java.util.stream.Stream
class SuperStream<T> private constructor(val stream: Stream<T>) : Stream<T> by stream {
companion object {
@JvmStatic
fun <T> of(stream: Stream<T>): SuperStream<T> {
return SuperStream(stream)
}
@JvmStatic
fun <T> of(collection: Collection<T>): SuperStream<T> {
return SuperStream(collection.stream())
}
@JvmStatic
fun <T : Map<K, V>, K, V> of(map: T): SuperStream<Map.Entry<K, V>> {
return of(map.entries)
}
@JvmStatic
fun <T> of(vararg array: T): SuperStream<T> {
return of(array.toList())
}
@JvmStatic
fun <T> of(iterator: Iterator<T>): SuperStream<T> {
val data = ArrayList<T>(20)
while (iterator.hasNext()) {
data.add(iterator.next())
}
return of(data)
}
}
}
interface Gettable<T> {
fun get(): T
}
interface KeyGettable<T> : Gettable<T> {
fun getKey(): T
override fun get(): T {
return getKey()
}
}
interface IdGettable<T> : Gettable<T> {
fun getId(): T
override fun get(): T {
return getId()
}
}
fun <T> getThis(t: T): T {
return t
}
fun <T, U> toMapValueThis(valueMapper: java.util.function.Function<in T, out U>): Collector<T, *, Map<T, U>> {
return Collectors.toMap(
java.util.function.Function<T, T> { it },
valueMapper
) as Collector<T, *, Map<T, U>>
}
fun <T, K> toMap(keyMapper: java.util.function.Function<in T, out K>): Collector<T, *, Map<K, T>> {
return Collectors.toMap(
keyMapper,
java.util.function.Function<T, T> { it },
) as Collector<T, *, Map<K, T>>
}
fun <K> toMapGet(): Collector<Gettable<K>, *, Map<K, Gettable<K>>> {
return Collectors.toMap(
java.util.function.Function<Gettable<K>, K> { it.get() },
java.util.function.Function<Gettable<K>, Gettable<K>> { it },
) as Collector<Gettable<K>, *, Map<K, Gettable<K>>>
}
fun <K, V> toMap(): Collector<Map.Entry<K, V>, *, Map<K, V>> {
return Collectors.toMap(
{ entry: Map.Entry<K, V> -> entry.key },
{ entry: Map.Entry<K, V> -> entry.value }
) as Collector<Map.Entry<K, V>, *, Map<K, V>>
}
fun <T> toList(): Collector<T, *, List<T>> {
return Collectors.toList<T>()
}
fun <T> toSet(): Collector<T, *, Set<T>> {
return Collectors.toSet<T>()
}