Source code for intake_avro.source

from intake.source import base
import io

from . import __version__


class AvroTableSource(base.DataSource):
    """
    Source to load tabular Avro datasets.

    Parameters
    ----------
    urlpath: str
        Location of the data files; can include protocol and glob characters.
    """
    version = __version__
    container = 'dataframe'
    name = 'avro_table'

    def __init__(self, urlpath, metadata=None, storage_options=None):
        self._urlpath = urlpath
        self._storage_options = storage_options or {}
        self._head = None
        super(AvroTableSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        from dask.bytes.core import open_files
        import uavro.core as avrocore

        self._files = open_files(self._urlpath, mode='rb',
                                 **self._storage_options)

        # read the header (including column dtypes) from the first file only
        if self._head is None:
            with self._files[0] as f:
                self._head = avrocore.read_header(f)

        dtypes = self._head['dtypes']

        # Avro schemas have a "namespace" and a "name" that could be metadata
        return base.Schema(datashape=None,
                           dtype=dtypes,
                           shape=(None, len(dtypes)),
                           npartitions=len(self._files),
                           extra_metadata={})

    def _get_partition(self, i):
        return read_file_uavro(self._files[i], self._head)

    def read(self):
        self._get_schema()
        return self.to_dask().compute()

    def to_dask(self):
        """Create lazy dask dataframe object"""
        import dask.dataframe as dd
        from dask import delayed

        self.discover()
        # one delayed partition per input file, all sharing the same header
        dpart = delayed(read_file_uavro)
        return dd.from_delayed([dpart(f, self._head) for f in self._files],
                               meta=self.dtype)

    def to_spark(self):
        """Pass URL to spark to load as a DataFrame

        Note that this requires ``org.apache.spark.sql.avro.AvroFileFormat``
        to be installed on your Spark classpath.

        This feature is experimental.
        """
        from intake_spark.base import SparkHolder
        sh = SparkHolder(True, [
            ['read'],
            ['format', ["com.databricks.spark.avro"]],
            ['load', [self._urlpath]]
        ], {})
        return sh.setup()
def read_file_uavro(f, head):
    """Read a whole Avro file into a pandas DataFrame using uavro."""
    import uavro.core as avrocore
    with f as f:
        # load the entire file into memory, then parse it with the pre-read header
        data = f.read()
    return avrocore.filelike_to_dataframe(io.BytesIO(data), len(data), head,
                                          scan=True)
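
# --- Usage sketch (illustrative; not part of the original module) ---
# A minimal sketch of driving AvroTableSource directly; the file glob below is
# hypothetical, and in practice the source is usually declared in an Intake
# catalog rather than constructed by hand.
def _example_table_usage():
    source = AvroTableSource('data/part-*.avro')  # hypothetical glob of Avro files
    print(source.discover())   # schema: column dtypes, shape, number of partitions
    ddf = source.to_dask()     # lazy dask dataframe, one partition per file
    df = source.read()         # eager read: computes all partitions into pandas
    return df, ddf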
class AvroSequenceSource(base.DataSource):
    """
    Source to load Avro datasets as sequence of Python dicts.

    Parameters
    ----------
    urlpath: str
        Location of the data files; can include protocol and glob characters.
    """
    version = __version__
    container = 'python'
    name = 'avro_sequence'

    def __init__(self, urlpath, metadata=None, storage_options=None):
        self._urlpath = urlpath
        self._storage_options = storage_options or {}
        self._head = None
        super(AvroSequenceSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        from dask.bytes.core import open_files

        self._files = open_files(self._urlpath, mode='rb',
                                 **self._storage_options)

        # Avro schemas have a "namespace" and a "name" that could be metadata
        return base.Schema(datashape=None,
                           dtype=None,
                           shape=None,
                           npartitions=len(self._files),
                           extra_metadata={})

    def _get_partition(self, i):
        self._get_schema()
        return read_file_fastavro(self._files[i])

    def read(self):
        self._get_schema()
        return self.to_dask().compute()

    def to_dask(self):
        """Create lazy dask bag object"""
        from dask import delayed
        import dask.bag as db

        self._get_schema()
        # one delayed partition per input file
        dpart = delayed(read_file_fastavro)
        return db.from_delayed([dpart(f) for f in self._files])
def read_file_fastavro(f):
    """Read a whole Avro file into a list of records using fastavro."""
    import fastavro
    with f as f:
        return list(fastavro.reader(f))
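
# --- Usage sketch (illustrative; not part of the original module) ---
# The sequence source yields each Avro record as a Python dict; the path below
# is hypothetical and the call pattern mirrors the tabular source above.
def _example_sequence_usage():
    source = AvroSequenceSource('data/part-*.avro')  # hypothetical glob of Avro files
    bag = source.to_dask()      # lazy dask bag of records
    records = source.read()     # eager read: list of dicts across all files
    return records, bag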