import copy
from intake.source import base
from . import __version__


class AvroTableSource(base.DataSource):
"""
Source to load tabular Avro datasets.
Parameters
----------
urlpath: str
Location of the data files; can include protocol and glob characters.
blocksize: int or None
Partition the input files by roughly this number of bytes. Actual
partition sizes will depend on the inherent structure of the data
files. If None, each input file will be one partition, no file
scanning will be needed ahead of time
storage_options: dict or None
Parameters to pass on to the file-system backend
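
    Examples
    --------
    A minimal usage sketch; the glob path ``'data/*.avro'`` here is
    hypothetical, not part of the package:

    >>> source = AvroTableSource('data/*.avro', blocksize=None)
    >>> df = source.read()       # materialise as a pandas DataFrame
    >>> ddf = source.to_dask()   # or keep it lazy as a dask DataFrame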
"""
version = __version__
container = 'dataframe'
name = 'avro_table'

    def __init__(self, urlpath, blocksize=100000000,
metadata=None, storage_options=None):
self._urlpath = urlpath
self._storage_options = storage_options or {}
self._bs = blocksize
self._df = None
super(AvroTableSource, self).__init__(metadata=metadata)

    def _get_schema(self):
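        # Build the dask dataframe lazily on first access and cache it, so
        # repeated schema or partition requests do not re-scan the files.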
if self._df is None:
from uavro import dask_read_avro
from uavro.core import read_header
from dask.bytes import open_files
self._df = dask_read_avro(self._urlpath, blocksize=self._bs,
storage_options=self._storage_options)
files = open_files(self._urlpath, **self._storage_options)
with copy.copy(files[0]) as f:
# we assume the same header for all files
self.metadata.update(read_header(f))
self.npartitions = self._df.npartitions
dtypes = {k: str(v) for k, v in self._df.dtypes.items()}
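        # The row count is unknown without a full scan, hence shape (None, ncols).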
return base.Schema(datashape=None,
dtype=dtypes,
shape=(None, len(dtypes)),
npartitions=self.npartitions,
extra_metadata={})

    def _get_partition(self, i):
self._get_schema()
return self._df.get_partition(i).compute()

    def read(self):
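        """Read all data into an in-memory pandas DataFrame."""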
self._get_schema()
return self._df.compute()

    def to_dask(self):
"""Create lazy dask dataframe object"""
self._get_schema()
return self._df

    def to_spark(self):
"""Pass URL to spark to load as a DataFrame
Note that this requires ``org.apache.spark.sql.avro.AvroFileFormat``
to be installed in your spark classes.
This feature is experimental.
"""
from intake_spark.base import SparkHolder
sh = SparkHolder(True, [
['read'],
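            # the Databricks spark-avro reader; recent Spark versions also
            # ship an equivalent built-in "avro" format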
['format', ["com.databricks.spark.avro"]],
['load', [self._urlpath]]
], {})
return sh.setup()


class AvroSequenceSource(base.DataSource):
"""
Source to load Avro datasets as sequence of Python dicts.
Parameters
----------
urlpath: str
Location of the data files; can include protocol and glob characters.
blocksize: int or None
Partition the input files by roughly this number of bytes. Actual
partition sizes will depend on the inherent structure of the data
files. If None, each input file will be one partition, no file
scanning will be needed ahead of time
storage_options: dict or None
Parameters to pass on to the file-system backend
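
    Examples
    --------
    A minimal usage sketch; the glob path ``'data/*.avro'`` here is
    hypothetical, not part of the package:

    >>> source = AvroSequenceSource('data/*.avro')
    >>> records = source.read()   # materialise as a list of dicts
    >>> bag = source.to_dask()    # or keep it lazy as a dask Bag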
"""
version = __version__
container = 'python'
name = 'avro_sequence'

    def __init__(self, urlpath, blocksize=100000000, metadata=None,
storage_options=None):
self._urlpath = urlpath
self._bs = blocksize
self._storage_options = storage_options or {}
self._bag = None
super(AvroSequenceSource, self).__init__(metadata=metadata)

    def _get_schema(self):
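        # Build the dask bag lazily on first access and cache it.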
if self._bag is None:
from dask.bag import read_avro
self._bag = read_avro(self._urlpath, blocksize=self._bs,
storage_options=self._storage_options)
self.npartitions = self._bag.npartitions
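        # Records are plain dicts, so no dtype or tabular shape is reported.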
return base.Schema(datashape=None,
dtype=None,
shape=None,
npartitions=self._bag.npartitions,
extra_metadata={})

    def _get_partition(self, i):
self._get_schema()
return self._bag.to_delayed()[i].compute()

    def read(self):
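        """Read all records into an in-memory list of dicts."""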
self._get_schema()
return self._bag.compute()

    def to_dask(self):
"""Create lazy dask bag object"""
self._get_schema()
return self._bag