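//! `async` API to read the pages of a parquet column chunk as a [`Stream`] of
//! [`CompressedPage`]s.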
use std::io::SeekFrom;

use async_stream::try_stream;
use futures::io::{copy, sink};
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream};
use parquet_format_safe::thrift::protocol::TCompactInputStreamProtocol;

use crate::compression::Compression;
use crate::error::{Error, Result};
use crate::metadata::{ColumnChunkMetaData, Descriptor};
use crate::page::{CompressedPage, ParquetPageHeader};

use super::reader::{finish_page, get_page_header, PageMetaData};
use super::PageFilter;
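/// Returns a stream of compressed data pages from `reader`, seeking it to the start
/// of the column chunk described by `column_metadata` first.
///
/// A minimal usage sketch; it assumes `reader` (any `AsyncRead + AsyncSeek + Unpin + Send`)
/// and `column_metadata` were obtained elsewhere (e.g. from the file's metadata):
///
/// ```ignore
/// use std::sync::Arc;
/// use futures::{pin_mut, StreamExt};
///
/// // accept every page; a real filter can inspect the descriptor and page header
/// let filter: PageFilter = Arc::new(|_descriptor, _header| true);
/// let pages =
///     get_page_stream(&column_metadata, &mut reader, vec![], filter, 1024 * 1024).await?;
/// pin_mut!(pages); // the stream is `!Unpin`
/// while let Some(page) = pages.next().await {
///     let _page = page?; // decompress and decode the page here
/// }
/// ```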
pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
    column_metadata: &'a ColumnChunkMetaData,
    reader: &'a mut RR,
    scratch: Vec<u8>,
    pages_filter: PageFilter,
    max_page_size: usize,
) -> Result<impl Stream<Item = Result<CompressedPage>> + 'a> {
    get_page_stream_with_page_meta(
        column_metadata.into(),
        reader,
        scratch,
        pages_filter,
        max_page_size,
    )
    .await
}
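/// Returns a stream of compressed data pages, assuming `reader` is already positioned
/// at the start of the column chunk (no seek is performed, so `AsyncSeek` is not required).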
pub async fn get_page_stream_from_column_start<'a, R: AsyncRead + Unpin + Send>(
    column_metadata: &'a ColumnChunkMetaData,
    reader: &'a mut R,
    scratch: Vec<u8>,
    pages_filter: PageFilter,
    max_header_size: usize,
) -> Result<impl Stream<Item = Result<CompressedPage>> + 'a> {
    let page_metadata: PageMetaData = column_metadata.into();
    Ok(_get_page_stream(
        reader,
        page_metadata.num_values,
        page_metadata.compression,
        page_metadata.descriptor,
        scratch,
        pages_filter,
        max_header_size,
    ))
}
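/// Returns a stream of compressed data pages described by [`PageMetaData`],
/// seeking `reader` to `page_metadata.column_start` before reading.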
pub async fn get_page_stream_with_page_meta<RR: AsyncRead + Unpin + Send + AsyncSeek>(
    page_metadata: PageMetaData,
    reader: &mut RR,
    scratch: Vec<u8>,
    pages_filter: PageFilter,
    max_page_size: usize,
) -> Result<impl Stream<Item = Result<CompressedPage>> + '_> {
    let column_start = page_metadata.column_start;
    reader.seek(SeekFrom::Start(column_start)).await?;
    Ok(_get_page_stream(
        reader,
        page_metadata.num_values,
        page_metadata.compression,
        page_metadata.descriptor,
        scratch,
        pages_filter,
        max_page_size,
    ))
}
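/// Reads pages back-to-back: each iteration decodes a page header, skips pages rejected
/// by `pages_filter`, and otherwise reads the page's bytes into `scratch`, until the
/// number of values seen in data pages reaches `total_num_values`.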
fn _get_page_stream<R: AsyncRead + Unpin + Send>(
    reader: &mut R,
    total_num_values: i64,
    compression: Compression,
    descriptor: Descriptor,
    mut scratch: Vec<u8>,
    pages_filter: PageFilter,
    max_page_size: usize,
) -> impl Stream<Item = Result<CompressedPage>> + '_ {
    let mut seen_values = 0i64;
    try_stream! {
        while seen_values < total_num_values {
            // the page header
            let page_header = read_page_header(reader, max_page_size).await?;

            // only data pages carry values; dictionary pages yield `None` and thus
            // do not advance `seen_values`
            let data_header = get_page_header(&page_header)?;
            seen_values += data_header
                .as_ref()
                .map(|x| x.num_values() as i64)
                .unwrap_or_default();

            let read_size: usize = page_header.compressed_page_size.try_into()?;

            if let Some(data_header) = data_header {
                if !pages_filter(&descriptor, &data_header) {
                    // the page is filtered out; skip over its bytes without allocating
                    copy(reader.take(read_size as u64), &mut sink()).await?;
                    continue;
                }
            }

            if read_size > max_page_size {
                Err(Error::WouldOverAllocate)?
            }

            // the header is followed by the page's compressed buffer
            scratch.clear();
            scratch.try_reserve(read_size)?;
            let bytes_read = reader
                .take(read_size as u64)
                .read_to_end(&mut scratch)
                .await?;

            if bytes_read != read_size {
                Err(Error::oos(
                    "The page header reported the wrong page size".to_string(),
                ))?
            }

            yield finish_page(page_header, &mut scratch, compression, &descriptor, None)?;
        }
    }
}
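/// Reads the thrift-encoded [`ParquetPageHeader`] at the reader's current position.
/// `max_page_size` caps what the thrift decoder may allocate while decoding,
/// guarding against malformed or adversarial headers.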
async fn read_page_header<R: AsyncRead + Unpin + Send>(
    reader: &mut R,
    max_page_size: usize,
) -> Result<ParquetPageHeader> {
    let mut prot = TCompactInputStreamProtocol::new(reader, max_page_size);
    let page_header = ParquetPageHeader::stream_from_in_protocol(&mut prot).await?;
    Ok(page_header)
}