Source code for ragnar.objstream

from typing import Union

from ragnar.stream import Stream
from functools import partial
import types


[docs]class ObjStream(Stream): """This object inherits from the Stream object, specific for record-based data. :param columns: Object headers. :type repeatable: list :param skip_first: 0mits the first row (usually headers) :type repeatable: bool, optional """ def __init__(self, value, **kwargs): super().__init__(value, **kwargs) self.is_dict_source = None self.skip_first = kwargs.pop("skip_first", False) self.columns = kwargs.pop("columns", None) def __applyto_func__(self, row, **kwargs): keys = kwargs.pop("keys", None) idxs = kwargs.pop("idxs", None) func = kwargs.pop("func", None) entire_object = kwargs.pop("entire_object", False) if isinstance(row, types.GeneratorType): row = list(row) if self.is_dict_source: row = row[0] if isinstance(row, list): if self.is_dict_source is None: self.is_dict_source = False for index, element in enumerate(row): if index in idxs: if entire_object: yield func(dict(zip(self.columns, row))) else: yield func(element) else: yield element if isinstance(row, dict): if self.is_dict_source is None: self.is_dict_source = True for key in keys: if entire_object: row[key] = func(row) else: row[key] = func(row[key]) yield row def __next__(self): try: if self.skip_first: self.__show_next__() self.skip_first = False if self.is_dict_source: # Internally the stream creates a list of the object so the first one is extracted return self.__show_next__()[0] elif self.is_dict_source is None: # The content is known after the end of the first iteration obj = self.__show_next__() if self.is_dict_source and self.columns: raise Exception( "For dictionary type data sources it is not allowed to add the parameter columns" ) if self.is_dict_source: return obj[0] else: return obj else: return self.__show_next__() except StopIteration: self.__rebuild__() raise StopIteration
[docs] def applyto(self, fields: Union[str, list], func, entire_object=False): """ This method adds a function to apply to the execution stack. :param fields: fields where to apply the function. :type fields: typing.Union[list, str] :param func: method to be included in the execution stack :param entire_object: passing the field or the entire record as a dictionary to the function :type entire_object: bool, optional """ columns = getattr(self, "columns", None) idxs = [] cols = [] if columns and isinstance(fields, list): for col in fields: idxs.append(columns.index(col)) elif columns and isinstance(fields, str): idxs.append(columns.index(fields)) elif columns is None and isinstance(fields, list): cols = fields elif columns is None and isinstance(fields, str): cols = [fields] self.do( partial( self.__applyto_func__, keys=cols, idxs=idxs, func=func, entire_object=entire_object, ) ) return self