1use crate::{
20 encode::{encode, encode_internal, encode_to_vec},
21 rabin::Rabin,
22 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23 ser_schema::SchemaAwareWriteSerializer,
24 types::Value,
25 AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData};
29
/// Number of buffered bytes after which a data block is flushed automatically.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes opening every Avro object container file: `O`, `b`, `j`, version 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
32
/// Writer for the Avro object container file format.
///
/// Appended values are encoded into an in-memory `buffer`. When the buffer reaches
/// `block_size` bytes, or on an explicit flush, the buffer is compressed with `codec`
/// and written to `writer` as one data block followed by the sync `marker`.
/// The file header is written lazily, before the first data is emitted.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Writer schema used to validate and encode every appended value.
    schema: &'a Schema,
    /// Destination of the header and of every data block.
    writer: W,
    /// Resolved form of `schema`. The builder never sets it (`skip`), so writers
    /// created through the builder resolve it lazily on the first append.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression codec applied to each data block (default: `Codec::Null`).
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffered byte count at or above which an append triggers a flush
    /// (default: `DEFAULT_BLOCK_SIZE`).
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// Encoded values of the block currently being built; compressed in place
    /// and then cleared by a flush.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    /// Number of values held in `buffer`.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block
    /// (randomly generated by default).
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the header has already been written. `true` suppresses writing it
    /// at all, as done by the `append_to*` constructors.
    #[builder(default = false)]
    has_header: bool,
    /// Extra metadata entries written into the header.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
55
56impl<'a, W: Write> Writer<'a, W> {
57 pub fn new(schema: &'a Schema, writer: W) -> Self {
61 Writer::with_codec(schema, writer, Codec::Null)
62 }
63
64 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
67 let mut w = Self::builder()
68 .schema(schema)
69 .writer(writer)
70 .codec(codec)
71 .build();
72 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
73 w
74 }
75
76 pub fn with_schemata(
81 schema: &'a Schema,
82 schemata: Vec<&'a Schema>,
83 writer: W,
84 codec: Codec,
85 ) -> Self {
86 let mut w = Self::builder()
87 .schema(schema)
88 .writer(writer)
89 .codec(codec)
90 .build();
91 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
92 w
93 }
94
95 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
99 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
100 }
101
102 pub fn append_to_with_codec(
105 schema: &'a Schema,
106 writer: W,
107 codec: Codec,
108 marker: [u8; 16],
109 ) -> Self {
110 let mut w = Self::builder()
111 .schema(schema)
112 .writer(writer)
113 .codec(codec)
114 .marker(marker)
115 .has_header(true)
116 .build();
117 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
118 w
119 }
120
121 pub fn append_to_with_codec_schemata(
124 schema: &'a Schema,
125 schemata: Vec<&'a Schema>,
126 writer: W,
127 codec: Codec,
128 marker: [u8; 16],
129 ) -> Self {
130 let mut w = Self::builder()
131 .schema(schema)
132 .writer(writer)
133 .codec(codec)
134 .marker(marker)
135 .has_header(true)
136 .build();
137 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
138 w
139 }
140
141 pub fn schema(&self) -> &'a Schema {
143 self.schema
144 }
145
146 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
155 let n = self.maybe_write_header()?;
156
157 let avro = value.into();
158 self.append_value_ref(&avro).map(|m| m + n)
159 }
160
161 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
169 let n = self.maybe_write_header()?;
170
171 match self.resolved_schema {
173 Some(ref rs) => {
174 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
175 self.num_values += 1;
176
177 if self.buffer.len() >= self.block_size {
178 return self.flush().map(|b| b + n);
179 }
180
181 Ok(n)
182 }
183 None => {
184 let rs = ResolvedSchema::try_from(self.schema)?;
185 self.resolved_schema = Some(rs);
186 self.append_value_ref(value)
187 }
188 }
189 }
190
191 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
201 let n = self.maybe_write_header()?;
202
203 match self.resolved_schema {
204 Some(ref rs) => {
205 let mut serializer = SchemaAwareWriteSerializer::new(
206 &mut self.buffer,
207 self.schema,
208 rs.get_names(),
209 None,
210 );
211 value.serialize(&mut serializer)?;
212 self.num_values += 1;
213
214 if self.buffer.len() >= self.block_size {
215 return self.flush().map(|b| b + n);
216 }
217
218 Ok(n)
219 }
220 None => {
221 let rs = ResolvedSchema::try_from(self.schema)?;
222 self.resolved_schema = Some(rs);
223 self.append_ser(value)
224 }
225 }
226 }
227
228 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
236 where
237 I: IntoIterator<Item = T>,
238 {
239 let mut num_bytes = 0;
254 for value in values {
255 num_bytes += self.append(value)?;
256 }
257 num_bytes += self.flush()?;
258
259 Ok(num_bytes)
260 }
261
262 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
271 where
272 I: IntoIterator<Item = T>,
273 {
274 let mut num_bytes = 0;
289 for value in values {
290 num_bytes += self.append_ser(value)?;
291 }
292 num_bytes += self.flush()?;
293
294 Ok(num_bytes)
295 }
296
297 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
305 let mut num_bytes = 0;
306 for value in values {
307 num_bytes += self.append_value_ref(value)?;
308 }
309 num_bytes += self.flush()?;
310
311 Ok(num_bytes)
312 }
313
314 pub fn flush(&mut self) -> AvroResult<usize> {
319 if self.num_values == 0 {
320 return Ok(0);
321 }
322
323 self.codec.compress(&mut self.buffer)?;
324
325 let num_values = self.num_values;
326 let stream_len = self.buffer.len();
327
328 let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
329 + self.append_raw(&stream_len.into(), &Schema::Long)?
330 + self
331 .writer
332 .write(self.buffer.as_ref())
333 .map_err(Error::WriteBytes)?
334 + self.append_marker()?;
335
336 self.buffer.clear();
337 self.num_values = 0;
338
339 self.writer.flush().map_err(Error::FlushWriter)?;
340
341 Ok(num_bytes)
342 }
343
344 pub fn into_inner(mut self) -> AvroResult<W> {
349 self.maybe_write_header()?;
350 self.flush()?;
351 Ok(self.writer)
352 }
353
354 fn append_marker(&mut self) -> AvroResult<usize> {
356 self.writer.write(&self.marker).map_err(Error::WriteMarker)
359 }
360
361 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
363 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
364 }
365
366 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
368 self.writer.write(bytes).map_err(Error::WriteBytes)
369 }
370
371 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
374 if !self.has_header {
375 if key.starts_with("avro.") {
376 return Err(Error::InvalidMetadataKey(key));
377 }
378 self.user_metadata
379 .insert(key, Value::Bytes(value.as_ref().to_vec()));
380 Ok(())
381 } else {
382 Err(Error::FileHeaderAlreadyWritten)
383 }
384 }
385
386 fn header(&self) -> Result<Vec<u8>, Error> {
388 let schema_bytes = serde_json::to_string(self.schema)
389 .map_err(Error::ConvertJsonToString)?
390 .into_bytes();
391
392 let mut metadata = HashMap::with_capacity(2);
393 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
394 metadata.insert("avro.codec", self.codec.into());
395 match self.codec {
396 #[cfg(feature = "bzip")]
397 Codec::Bzip2(settings) => {
398 metadata.insert(
399 "avro.codec.compression_level",
400 Value::Bytes(vec![settings.compression_level]),
401 );
402 }
403 #[cfg(feature = "xz")]
404 Codec::Xz(settings) => {
405 metadata.insert(
406 "avro.codec.compression_level",
407 Value::Bytes(vec![settings.compression_level]),
408 );
409 }
410 #[cfg(feature = "zstandard")]
411 Codec::Zstandard(settings) => {
412 metadata.insert(
413 "avro.codec.compression_level",
414 Value::Bytes(vec![settings.compression_level]),
415 );
416 }
417 _ => {}
418 }
419
420 for (k, v) in &self.user_metadata {
421 metadata.insert(k.as_str(), v.clone());
422 }
423
424 let mut header = Vec::new();
425 header.extend_from_slice(AVRO_OBJECT_HEADER);
426 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
427 header.extend_from_slice(&self.marker);
428
429 Ok(header)
430 }
431
432 fn maybe_write_header(&mut self) -> AvroResult<usize> {
433 if !self.has_header {
434 let header = self.header()?;
435 let n = self.append_bytes(header.as_ref())?;
436 self.has_header = true;
437 Ok(n)
438 } else {
439 Ok(0)
440 }
441 }
442}
443
444fn write_avro_datum<T: Into<Value>, W: Write>(
450 schema: &Schema,
451 value: T,
452 writer: &mut W,
453) -> Result<(), Error> {
454 let avro = value.into();
455 if !avro.validate(schema) {
456 return Err(Error::Validation);
457 }
458 encode(&avro, schema, writer)?;
459 Ok(())
460}
461
462fn write_avro_datum_schemata<T: Into<Value>>(
463 schema: &Schema,
464 schemata: Vec<&Schema>,
465 value: T,
466 buffer: &mut Vec<u8>,
467) -> AvroResult<usize> {
468 let avro = value.into();
469 let rs = ResolvedSchema::try_from(schemata)?;
470 let names = rs.get_names();
471 let enclosing_namespace = schema.namespace();
472 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
473 return Err(Error::Validation);
474 }
475 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
476}
477
478pub struct GenericSingleObjectWriter {
482 buffer: Vec<u8>,
483 resolved: ResolvedOwnedSchema,
484}
485
486impl GenericSingleObjectWriter {
487 pub fn new_with_capacity(
488 schema: &Schema,
489 initial_buffer_cap: usize,
490 ) -> AvroResult<GenericSingleObjectWriter> {
491 let fingerprint = schema.fingerprint::<Rabin>();
492 let mut buffer = Vec::with_capacity(initial_buffer_cap);
493 let header = [
494 0xC3,
495 0x01,
496 fingerprint.bytes[0],
497 fingerprint.bytes[1],
498 fingerprint.bytes[2],
499 fingerprint.bytes[3],
500 fingerprint.bytes[4],
501 fingerprint.bytes[5],
502 fingerprint.bytes[6],
503 fingerprint.bytes[7],
504 ];
505 buffer.extend_from_slice(&header);
506
507 Ok(GenericSingleObjectWriter {
508 buffer,
509 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
510 })
511 }
512
513 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
515 if self.buffer.len() != 10 {
516 Err(Error::IllegalSingleObjectWriterState)
517 } else {
518 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
519 writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
520 let len = self.buffer.len();
521 self.buffer.truncate(10);
522 Ok(len)
523 }
524 }
525
526 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
528 self.write_value_ref(&v, writer)
529 }
530}
531
532pub struct SpecificSingleObjectWriter<T>
534where
535 T: AvroSchema,
536{
537 inner: GenericSingleObjectWriter,
538 schema: Schema,
539 header_written: bool,
540 _model: PhantomData<T>,
541}
542
543impl<T> SpecificSingleObjectWriter<T>
544where
545 T: AvroSchema,
546{
547 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
548 let schema = T::get_schema();
549 Ok(SpecificSingleObjectWriter {
550 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
551 schema,
552 header_written: false,
553 _model: PhantomData,
554 })
555 }
556}
557
558impl<T> SpecificSingleObjectWriter<T>
559where
560 T: AvroSchema + Into<Value>,
561{
562 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
565 let v: Value = data.into();
566 self.inner.write_value_ref(&v, writer)
567 }
568}
569
570impl<T> SpecificSingleObjectWriter<T>
571where
572 T: AvroSchema + Serialize,
573{
574 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
577 let mut bytes_written: usize = 0;
578
579 if !self.header_written {
580 bytes_written += writer
581 .write(self.inner.buffer.as_slice())
582 .map_err(Error::WriteBytes)?;
583 self.header_written = true;
584 }
585
586 let names: HashMap<Name, &Schema> = HashMap::new();
587 let mut serializer = SchemaAwareWriteSerializer::new(writer, &self.schema, &names, None);
588 bytes_written += data.serialize(&mut serializer)?;
589
590 Ok(bytes_written)
591 }
592
593 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
596 self.write_ref(&data, writer)
597 }
598}
599
600fn write_value_ref_resolved(
601 schema: &Schema,
602 resolved_schema: &ResolvedSchema,
603 value: &Value,
604 buffer: &mut Vec<u8>,
605) -> AvroResult<usize> {
606 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
607 Some(reason) => Err(Error::ValidationWithReason {
608 value: value.clone(),
609 schema: schema.clone(),
610 reason,
611 }),
612 None => encode_internal(
613 value,
614 schema,
615 resolved_schema.get_names(),
616 &schema.namespace(),
617 buffer,
618 ),
619 }
620}
621
622fn write_value_ref_owned_resolved(
623 resolved_schema: &ResolvedOwnedSchema,
624 value: &Value,
625 buffer: &mut Vec<u8>,
626) -> AvroResult<()> {
627 let root_schema = resolved_schema.get_root_schema();
628 if let Some(reason) = value.validate_internal(
629 root_schema,
630 resolved_schema.get_names(),
631 &root_schema.namespace(),
632 ) {
633 return Err(Error::ValidationWithReason {
634 value: value.clone(),
635 schema: root_schema.clone(),
636 reason,
637 });
638 }
639 encode_internal(
640 value,
641 root_schema,
642 resolved_schema.get_names(),
643 &root_schema.namespace(),
644 buffer,
645 )?;
646 Ok(())
647}
648
649pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
656 let mut buffer = Vec::new();
657 write_avro_datum(schema, value, &mut buffer)?;
658 Ok(buffer)
659}
660
661pub fn to_avro_datum_schemata<T: Into<Value>>(
666 schema: &Schema,
667 schemata: Vec<&Schema>,
668 value: T,
669) -> AvroResult<Vec<u8>> {
670 let mut buffer = Vec::new();
671 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
672 Ok(buffer)
673}
674
675#[cfg(not(target_arch = "wasm32"))]
676fn generate_sync_marker() -> [u8; 16] {
677 let mut marker = [0_u8; 16];
678 std::iter::repeat_with(rand::random)
679 .take(16)
680 .enumerate()
681 .for_each(|(i, n)| marker[i] = n);
682 marker
683}
684
/// Generates a random 16-byte sync marker on wasm32.
///
/// The marker is built from four `quad_rand::rand()` words, each laid out big-endian.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
695
696#[cfg(test)]
697mod tests {
698 use std::{cell::RefCell, rc::Rc};
699
700 use super::*;
701 use crate::{
702 decimal::Decimal,
703 duration::{Days, Duration, Millis, Months},
704 schema::{DecimalSchema, FixedSchema, Name},
705 types::Record,
706 util::zig_i64,
707 Reader,
708 };
709 use pretty_assertions::assert_eq;
710 use serde::{Deserialize, Serialize};
711
712 use apache_avro_test_helper::TestResult;
713
714 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
715
716 const SCHEMA: &str = r#"
717 {
718 "type": "record",
719 "name": "test",
720 "fields": [
721 {
722 "name": "a",
723 "type": "long",
724 "default": 42
725 },
726 {
727 "name": "b",
728 "type": "string"
729 }
730 ]
731 }
732 "#;
733 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
734
735 #[test]
736 fn test_to_avro_datum() -> TestResult {
737 let schema = Schema::parse_str(SCHEMA)?;
738 let mut record = Record::new(&schema).unwrap();
739 record.put("a", 27i64);
740 record.put("b", "foo");
741
742 let mut expected = Vec::new();
743 zig_i64(27, &mut expected)?;
744 zig_i64(3, &mut expected)?;
745 expected.extend([b'f', b'o', b'o']);
746
747 assert_eq!(to_avro_datum(&schema, record)?, expected);
748
749 Ok(())
750 }
751
752 #[test]
753 fn test_union_not_null() -> TestResult {
754 let schema = Schema::parse_str(UNION_SCHEMA)?;
755 let union = Value::Union(1, Box::new(Value::Long(3)));
756
757 let mut expected = Vec::new();
758 zig_i64(1, &mut expected)?;
759 zig_i64(3, &mut expected)?;
760
761 assert_eq!(to_avro_datum(&schema, union)?, expected);
762
763 Ok(())
764 }
765
766 #[test]
767 fn test_union_null() -> TestResult {
768 let schema = Schema::parse_str(UNION_SCHEMA)?;
769 let union = Value::Union(0, Box::new(Value::Null));
770
771 let mut expected = Vec::new();
772 zig_i64(0, &mut expected)?;
773
774 assert_eq!(to_avro_datum(&schema, union)?, expected);
775
776 Ok(())
777 }
778
779 fn logical_type_test<T: Into<Value> + Clone>(
780 schema_str: &'static str,
781
782 expected_schema: &Schema,
783 value: Value,
784
785 raw_schema: &Schema,
786 raw_value: T,
787 ) -> TestResult {
788 let schema = Schema::parse_str(schema_str)?;
789 assert_eq!(&schema, expected_schema);
790 let ser = to_avro_datum(&schema, value.clone())?;
792 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
793 assert_eq!(ser, raw_ser);
794
795 let mut r = ser.as_slice();
797 let de = crate::from_avro_datum(&schema, &mut r, None)?;
798 assert_eq!(de, value);
799 Ok(())
800 }
801
802 #[test]
803 fn date() -> TestResult {
804 logical_type_test(
805 r#"{"type": "int", "logicalType": "date"}"#,
806 &Schema::Date,
807 Value::Date(1_i32),
808 &Schema::Int,
809 1_i32,
810 )
811 }
812
813 #[test]
814 fn time_millis() -> TestResult {
815 logical_type_test(
816 r#"{"type": "int", "logicalType": "time-millis"}"#,
817 &Schema::TimeMillis,
818 Value::TimeMillis(1_i32),
819 &Schema::Int,
820 1_i32,
821 )
822 }
823
824 #[test]
825 fn time_micros() -> TestResult {
826 logical_type_test(
827 r#"{"type": "long", "logicalType": "time-micros"}"#,
828 &Schema::TimeMicros,
829 Value::TimeMicros(1_i64),
830 &Schema::Long,
831 1_i64,
832 )
833 }
834
835 #[test]
836 fn timestamp_millis() -> TestResult {
837 logical_type_test(
838 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
839 &Schema::TimestampMillis,
840 Value::TimestampMillis(1_i64),
841 &Schema::Long,
842 1_i64,
843 )
844 }
845
846 #[test]
847 fn timestamp_micros() -> TestResult {
848 logical_type_test(
849 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
850 &Schema::TimestampMicros,
851 Value::TimestampMicros(1_i64),
852 &Schema::Long,
853 1_i64,
854 )
855 }
856
857 #[test]
858 fn decimal_fixed() -> TestResult {
859 let size = 30;
860 let inner = Schema::Fixed(FixedSchema {
861 name: Name::new("decimal")?,
862 aliases: None,
863 doc: None,
864 size,
865 default: None,
866 attributes: Default::default(),
867 });
868 let value = vec![0u8; size];
869 logical_type_test(
870 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
871 &Schema::Decimal(DecimalSchema {
872 precision: 20,
873 scale: 5,
874 inner: Box::new(inner.clone()),
875 }),
876 Value::Decimal(Decimal::from(value.clone())),
877 &inner,
878 Value::Fixed(size, value),
879 )
880 }
881
882 #[test]
883 fn decimal_bytes() -> TestResult {
884 let inner = Schema::Bytes;
885 let value = vec![0u8; 10];
886 logical_type_test(
887 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
888 &Schema::Decimal(DecimalSchema {
889 precision: 4,
890 scale: 3,
891 inner: Box::new(inner.clone()),
892 }),
893 Value::Decimal(Decimal::from(value.clone())),
894 &inner,
895 value,
896 )
897 }
898
899 #[test]
900 fn duration() -> TestResult {
901 let inner = Schema::Fixed(FixedSchema {
902 name: Name::new("duration")?,
903 aliases: None,
904 doc: None,
905 size: 12,
906 default: None,
907 attributes: Default::default(),
908 });
909 let value = Value::Duration(Duration::new(
910 Months::new(256),
911 Days::new(512),
912 Millis::new(1024),
913 ));
914 logical_type_test(
915 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
916 &Schema::Duration,
917 value,
918 &inner,
919 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
920 )
921 }
922
923 #[test]
924 fn test_writer_append() -> TestResult {
925 let schema = Schema::parse_str(SCHEMA)?;
926 let mut writer = Writer::new(&schema, Vec::new());
927
928 let mut record = Record::new(&schema).unwrap();
929 record.put("a", 27i64);
930 record.put("b", "foo");
931
932 let n1 = writer.append(record.clone())?;
933 let n2 = writer.append(record.clone())?;
934 let n3 = writer.flush()?;
935 let result = writer.into_inner()?;
936
937 assert_eq!(n1 + n2 + n3, result.len());
938
939 let mut data = Vec::new();
940 zig_i64(27, &mut data)?;
941 zig_i64(3, &mut data)?;
942 data.extend(b"foo");
943 data.extend(data.clone());
944
945 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
947 let last_data_byte = result.len() - 16;
949 assert_eq!(
950 &result[last_data_byte - data.len()..last_data_byte],
951 data.as_slice()
952 );
953
954 Ok(())
955 }
956
957 #[test]
958 fn test_writer_extend() -> TestResult {
959 let schema = Schema::parse_str(SCHEMA)?;
960 let mut writer = Writer::new(&schema, Vec::new());
961
962 let mut record = Record::new(&schema).unwrap();
963 record.put("a", 27i64);
964 record.put("b", "foo");
965 let record_copy = record.clone();
966 let records = vec![record, record_copy];
967
968 let n1 = writer.extend(records)?;
969 let n2 = writer.flush()?;
970 let result = writer.into_inner()?;
971
972 assert_eq!(n1 + n2, result.len());
973
974 let mut data = Vec::new();
975 zig_i64(27, &mut data)?;
976 zig_i64(3, &mut data)?;
977 data.extend(b"foo");
978 data.extend(data.clone());
979
980 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
982 let last_data_byte = result.len() - 16;
984 assert_eq!(
985 &result[last_data_byte - data.len()..last_data_byte],
986 data.as_slice()
987 );
988
989 Ok(())
990 }
991
992 #[derive(Debug, Clone, Deserialize, Serialize)]
993 struct TestSerdeSerialize {
994 a: i64,
995 b: String,
996 }
997
998 #[test]
999 fn test_writer_append_ser() -> TestResult {
1000 let schema = Schema::parse_str(SCHEMA)?;
1001 let mut writer = Writer::new(&schema, Vec::new());
1002
1003 let record = TestSerdeSerialize {
1004 a: 27,
1005 b: "foo".to_owned(),
1006 };
1007
1008 let n1 = writer.append_ser(record)?;
1009 let n2 = writer.flush()?;
1010 let result = writer.into_inner()?;
1011
1012 assert_eq!(n1 + n2, result.len());
1013
1014 let mut data = Vec::new();
1015 zig_i64(27, &mut data)?;
1016 zig_i64(3, &mut data)?;
1017 data.extend(b"foo");
1018
1019 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1021 let last_data_byte = result.len() - 16;
1023 assert_eq!(
1024 &result[last_data_byte - data.len()..last_data_byte],
1025 data.as_slice()
1026 );
1027
1028 Ok(())
1029 }
1030
1031 #[test]
1032 fn test_writer_extend_ser() -> TestResult {
1033 let schema = Schema::parse_str(SCHEMA)?;
1034 let mut writer = Writer::new(&schema, Vec::new());
1035
1036 let record = TestSerdeSerialize {
1037 a: 27,
1038 b: "foo".to_owned(),
1039 };
1040 let record_copy = record.clone();
1041 let records = vec![record, record_copy];
1042
1043 let n1 = writer.extend_ser(records)?;
1044 let n2 = writer.flush()?;
1045 let result = writer.into_inner()?;
1046
1047 assert_eq!(n1 + n2, result.len());
1048
1049 let mut data = Vec::new();
1050 zig_i64(27, &mut data)?;
1051 zig_i64(3, &mut data)?;
1052 data.extend(b"foo");
1053 data.extend(data.clone());
1054
1055 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1057 let last_data_byte = result.len() - 16;
1059 assert_eq!(
1060 &result[last_data_byte - data.len()..last_data_byte],
1061 data.as_slice()
1062 );
1063
1064 Ok(())
1065 }
1066
1067 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1068 Writer::with_codec(schema, Vec::new(), Codec::Deflate)
1069 }
1070
1071 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1072 Writer::builder()
1073 .writer(Vec::new())
1074 .schema(schema)
1075 .codec(Codec::Deflate)
1076 .block_size(100)
1077 .build()
1078 }
1079
1080 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1081 let mut record = Record::new(schema).unwrap();
1082 record.put("a", 27i64);
1083 record.put("b", "foo");
1084
1085 let n1 = writer.append(record.clone())?;
1086 let n2 = writer.append(record.clone())?;
1087 let n3 = writer.flush()?;
1088 let result = writer.into_inner()?;
1089
1090 assert_eq!(n1 + n2 + n3, result.len());
1091
1092 let mut data = Vec::new();
1093 zig_i64(27, &mut data)?;
1094 zig_i64(3, &mut data)?;
1095 data.extend(b"foo");
1096 data.extend(data.clone());
1097 Codec::Deflate.compress(&mut data)?;
1098
1099 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1101 let last_data_byte = result.len() - 16;
1103 assert_eq!(
1104 &result[last_data_byte - data.len()..last_data_byte],
1105 data.as_slice()
1106 );
1107
1108 Ok(())
1109 }
1110
1111 #[test]
1112 fn test_writer_with_codec() -> TestResult {
1113 let schema = Schema::parse_str(SCHEMA)?;
1114 let writer = make_writer_with_codec(&schema);
1115 check_writer(writer, &schema)
1116 }
1117
1118 #[test]
1119 fn test_writer_with_builder() -> TestResult {
1120 let schema = Schema::parse_str(SCHEMA)?;
1121 let writer = make_writer_with_builder(&schema);
1122 check_writer(writer, &schema)
1123 }
1124
1125 #[test]
1126 fn test_logical_writer() -> TestResult {
1127 const LOGICAL_TYPE_SCHEMA: &str = r#"
1128 {
1129 "type": "record",
1130 "name": "logical_type_test",
1131 "fields": [
1132 {
1133 "name": "a",
1134 "type": [
1135 "null",
1136 {
1137 "type": "long",
1138 "logicalType": "timestamp-micros"
1139 }
1140 ]
1141 }
1142 ]
1143 }
1144 "#;
1145 let codec = Codec::Deflate;
1146 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1147 let mut writer = Writer::builder()
1148 .schema(&schema)
1149 .codec(codec)
1150 .writer(Vec::new())
1151 .build();
1152
1153 let mut record1 = Record::new(&schema).unwrap();
1154 record1.put(
1155 "a",
1156 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1157 );
1158
1159 let mut record2 = Record::new(&schema).unwrap();
1160 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1161
1162 let n1 = writer.append(record1)?;
1163 let n2 = writer.append(record2)?;
1164 let n3 = writer.flush()?;
1165 let result = writer.into_inner()?;
1166
1167 assert_eq!(n1 + n2 + n3, result.len());
1168
1169 let mut data = Vec::new();
1170 zig_i64(1, &mut data)?;
1172 zig_i64(1234, &mut data)?;
1173
1174 zig_i64(0, &mut data)?;
1176 codec.compress(&mut data)?;
1177
1178 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1180 let last_data_byte = result.len() - 16;
1182 assert_eq!(
1183 &result[last_data_byte - data.len()..last_data_byte],
1184 data.as_slice()
1185 );
1186
1187 Ok(())
1188 }
1189
1190 #[test]
1191 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1192 let schema = Schema::parse_str(SCHEMA)?;
1193 let mut writer = Writer::new(&schema, Vec::new());
1194
1195 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1196 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1197 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1198 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1199
1200 let mut record = Record::new(&schema).unwrap();
1201 record.put("a", 27i64);
1202 record.put("b", "foo");
1203
1204 writer.append(record.clone())?;
1205 writer.append(record.clone())?;
1206 writer.flush()?;
1207 let result = writer.into_inner()?;
1208
1209 assert_eq!(result.len(), 260);
1210
1211 Ok(())
1212 }
1213
1214 #[test]
1215 fn test_avro_3881_metadata_empty_body() -> TestResult {
1216 let schema = Schema::parse_str(SCHEMA)?;
1217 let mut writer = Writer::new(&schema, Vec::new());
1218 writer.add_user_metadata("a".to_string(), "b")?;
1219 let result = writer.into_inner()?;
1220
1221 let reader = Reader::with_schema(&schema, &result[..])?;
1222 let mut expected = HashMap::new();
1223 expected.insert("a".to_string(), vec![b'b']);
1224 assert_eq!(reader.user_metadata(), &expected);
1225 assert_eq!(reader.into_iter().count(), 0);
1226
1227 Ok(())
1228 }
1229
1230 #[test]
1231 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1232 let schema = Schema::parse_str(SCHEMA)?;
1233 let mut writer = Writer::new(&schema, Vec::new());
1234
1235 let mut record = Record::new(&schema).unwrap();
1236 record.put("a", 27i64);
1237 record.put("b", "foo");
1238 writer.append(record.clone())?;
1239
1240 match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1241 Err(e @ Error::FileHeaderAlreadyWritten) => {
1242 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1243 }
1244 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1245 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1246 }
1247
1248 Ok(())
1249 }
1250
1251 #[test]
1252 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1253 let schema = Schema::parse_str(SCHEMA)?;
1254 let mut writer = Writer::new(&schema, Vec::new());
1255
1256 let key = "avro.stringKey".to_string();
1257 match writer.add_user_metadata(key.clone(), "value") {
1258 Err(ref e @ Error::InvalidMetadataKey(_)) => {
1259 assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1260 }
1261 Err(e) => panic!(
1262 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1263 ),
1264 Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1265 }
1266
1267 Ok(())
1268 }
1269
1270 #[test]
1271 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1272 let schema = Schema::parse_str(SCHEMA)?;
1273
1274 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1275 user_meta_data.insert(
1276 "stringKey".to_string(),
1277 Value::String("stringValue".to_string()),
1278 );
1279 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1280 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1281
1282 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1283 .writer(Vec::new())
1284 .schema(&schema)
1285 .user_metadata(user_meta_data.clone())
1286 .build();
1287
1288 assert_eq!(writer.user_metadata, user_meta_data);
1289
1290 Ok(())
1291 }
1292
1293 #[derive(Serialize, Clone)]
1294 struct TestSingleObjectWriter {
1295 a: i64,
1296 b: f64,
1297 c: Vec<String>,
1298 }
1299
1300 impl AvroSchema for TestSingleObjectWriter {
1301 fn get_schema() -> Schema {
1302 let schema = r#"
1303 {
1304 "type":"record",
1305 "name":"TestSingleObjectWrtierSerialize",
1306 "fields":[
1307 {
1308 "name":"a",
1309 "type":"long"
1310 },
1311 {
1312 "name":"b",
1313 "type":"double"
1314 },
1315 {
1316 "name":"c",
1317 "type":{
1318 "type":"array",
1319 "items":"string"
1320 }
1321 }
1322 ]
1323 }
1324 "#;
1325 Schema::parse_str(schema).unwrap()
1326 }
1327 }
1328
1329 impl From<TestSingleObjectWriter> for Value {
1330 fn from(obj: TestSingleObjectWriter) -> Value {
1331 Value::Record(vec![
1332 ("a".into(), obj.a.into()),
1333 ("b".into(), obj.b.into()),
1334 (
1335 "c".into(),
1336 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1337 ),
1338 ])
1339 }
1340 }
1341
1342 #[test]
1343 fn test_single_object_writer() -> TestResult {
1344 let mut buf: Vec<u8> = Vec::new();
1345 let obj = TestSingleObjectWriter {
1346 a: 300,
1347 b: 34.555,
1348 c: vec!["cat".into(), "dog".into()],
1349 };
1350 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1351 &TestSingleObjectWriter::get_schema(),
1352 1024,
1353 )
1354 .expect("Should resolve schema");
1355 let value = obj.into();
1356 let written_bytes = writer
1357 .write_value_ref(&value, &mut buf)
1358 .expect("Error serializing properly");
1359
1360 assert!(buf.len() > 10, "no bytes written");
1361 assert_eq!(buf.len(), written_bytes);
1362 assert_eq!(buf[0], 0xC3);
1363 assert_eq!(buf[1], 0x01);
1364 assert_eq!(
1365 &buf[2..10],
1366 &TestSingleObjectWriter::get_schema()
1367 .fingerprint::<Rabin>()
1368 .bytes[..]
1369 );
1370 let mut msg_binary = Vec::new();
1371 encode(
1372 &value,
1373 &TestSingleObjectWriter::get_schema(),
1374 &mut msg_binary,
1375 )
1376 .expect("encode should have failed by here as a dependency of any writing");
1377 assert_eq!(&buf[10..], &msg_binary[..]);
1378
1379 Ok(())
1380 }
1381
#[test]
fn test_writer_parity() -> TestResult {
    // The same fixture is written three ways: the specific writer's `write`,
    // the specific writer's `write_value`, and the generic writer's
    // `write_value` after converting to a `Value`. The outputs must match.
    let obj1 = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");
    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");

    let mut specific_write_out: Vec<u8> = Vec::new();
    specific_writer
        .write(obj1.clone(), &mut specific_write_out)
        .expect("Serialization expected");

    let mut specific_value_out: Vec<u8> = Vec::new();
    specific_writer
        .write_value(obj1.clone(), &mut specific_value_out)
        .expect("Serialization expected");

    let mut generic_value_out: Vec<u8> = Vec::new();
    generic_writer
        .write_value(obj1.into(), &mut generic_value_out)
        .expect("Serialization expected");

    assert_eq!(specific_write_out, specific_value_out);
    assert_eq!(specific_write_out, generic_value_out);

    Ok(())
}
1416
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    // The schema's field is named `date` but lists `time` as an alias; the
    // struct below only has `time`, so serialization has to go through the
    // alias for this to succeed.
    const SCHEMA: &str = r#"
{
"type": "record",
"name": "Conference",
"fields": [
{"type": "string", "name": "name"},
{"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
]
}"#;

    #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    };

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let written = writer.append_ser(conf)?;
    assert_eq!(198, written);

    Ok(())
}
1449
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    // `time` maps (via alias) onto the `["null", "long"]` field `date`, but
    // the struct supplies an `f64`, so serialization must fail with an error
    // naming the field, the record schema and the mismatched types.
    const SCHEMA: &str = r#"
{
"type": "record",
"name": "Conference",
"fields": [
{"type": "string", "name": "name"},
{"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
]
}"#;

    #[derive(Debug, PartialEq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<f64>,
    }

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let error = match writer.append_ser(conf) {
        Ok(bytes) => panic!("Expected an error, but got {} bytes written", bytes),
        Err(e) => e,
    };

    let expected = r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#;
    assert_eq!(error.to_string(), expected);

    Ok(())
}
1487
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const SCHEMA: &str = r#"
{
"type": "record",
"name": "ExampleSchema",
"fields": [
{"name": "exampleField", "type": "string"}
]
}
"#;

    /// In-memory sink whose clones all share one byte vector, so the test can
    /// inspect what actually reached it after handing a clone to a `BufWriter`.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        fn len(&self) -> usize {
            let bytes = self.0.borrow();
            bytes.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            // Same effect as `Vec<u8>`'s `Write::write`: append everything.
            self.0.borrow_mut().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let shared_buffer = TestBuffer::default();
    let schema = Schema::parse_str(SCHEMA)?;

    // The Avro writer sits on top of a `BufWriter`, which holds bytes back
    // until flushed; `Writer::flush` must push them through to `shared_buffer`.
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(shared_buffer.clone()));

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    assert_eq!(
        shared_buffer.len(),
        167,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1541}