Source code for ragnar.stream

import types
from functools import reduce
from collections.abc import Iterable
from itertools import tee, chain


# based on https://github.com/pytoolz/toolz/blob/master/toolz/functoolz.py
def func_cat(value, *forms):
    """Lazily thread ``value`` through each callable in ``forms``.

    Every form is applied element-wise to the output of the previous stage.
    Elements that are ``None`` are skipped before the form is called. A stage
    whose input is not iterable, or whose form is not callable, produces no
    elements at all.

    :param value: the initial dataset, normally an iterable
    :param forms: callables applied one after another to each element
    :return: ``value`` itself when no forms are given, otherwise a generator
        producing the transformed elements
    """

    def _apply_stage(items, fn):
        # Guard clause: an unusable stage produces an empty generator.
        if not (isinstance(items, Iterable) and callable(fn)):
            return
        yield from (fn(item) for item in items if item is not None)

    result = value
    for form in forms:
        # Each stage only wraps the previous one; nothing runs until iterated.
        result = _apply_stage(result, form)
    return result
class Stream(object):
    """Lazy, chainable pipeline over an iterable.

    Operations registered with :meth:`do` and :meth:`filter` are accumulated
    on an execution stack and are only applied to ``value`` when the stream
    is iterated for the first time.

    :param value: the dataset the stream iterates over
    :param repeatable: when ``True`` the stream can be iterated again after
        it has been exhausted; defaults to ``False``
    :type repeatable: bool, optional
    :param kwargs: every keyword argument (including ``repeatable``) is kept
        in an internal store and copied back onto the instance each time a
        repeatable stream is rebuilt
    """

    def __init__(self, value, **kwargs):
        self.value = value

        # Storage used by child classes to keep the variables needed to
        # recreate the stream. Values must be stored under the same name as
        # the attribute in the child class.
        self.__set_store__(dict(kwargs))

        # Working variables
        self.repeatable = kwargs.pop("repeatable", False)

        # Iterator config
        self.iter = None
        self.iter_repeatable = None

        # Func config
        self.__stack__ = []

    def __set_store__(self, parameter: dict):
        """Store the initial settings of the stream.

        :param parameter: a dictionary taken from the ``**kwargs`` arguments
        :return: None
        """
        self.__store__ = type("Store", (object,), parameter)

    def __regenerate_store__(self):
        """Copy every stored setting back onto the instance."""
        for key in vars(self.__store__):
            # Dunder entries belong to the generated class, not to the user.
            if not key.startswith("__"):
                setattr(self, key, getattr(self.__store__, key))

    def __build__(self):
        """Apply the execution stack to ``value`` and create the iterator."""
        for entry in self.__stack__:
            if entry.type == "do":
                mapped = func_cat(self.value, entry.fuction)
                if entry.chain:
                    # from_iterable keeps the pipeline lazy; unpacking with
                    # ``*`` would run every function while building.
                    self.value = chain.from_iterable(mapped)
                else:
                    self.value = mapped
            elif entry.type == "filter":
                self.value = filter(entry.fuction, self.value)

        if self.repeatable:
            # One copy is consumed now, the other is kept for the next pass.
            self.iter, self.iter_repeatable = tee(iter(self.value))
        else:
            self.iter = iter(self.value)

    def __rebuild__(self):
        """Regenerate the iterator of a repeatable stream.

        Does nothing when the stream is not repeatable.
        """
        if self.repeatable:
            self.__regenerate_store__()
            self.iter, self.iter_repeatable = tee(self.iter_repeatable)

    def __iter__(self):
        if self.iter is None:
            self.__build__()
        return self

    def __next__(self):
        # Allow ``next(stream)`` without a previous ``iter(stream)`` call.
        if self.iter is None:
            self.__build__()
        try:
            return self.__show_next__()
        except StopIteration:
            # Prepare the next pass (repeatable streams only) before
            # signalling the end of the current one.
            self.__rebuild__()
            raise

    def __show_next__(self):
        """Fetch the next item, giving a chance to treat it before showing it.

        Items that are generators are materialised into a list.
        """
        item = next(self.iter)
        if isinstance(item, types.GeneratorType):
            return list(item)
        return item

    def __add_to_stack__(self, function_factory):
        """Append an operation descriptor to the execution stack."""
        self.__stack__.append(function_factory)

    def do(self, func, chain=False):
        """Add a function to apply to the execution stack.

        :param func: callable applied to every item of the stream
        :param chain: when ``True`` the results are merged into a single
            dataset. For example, if you read multiple files the results are
            merged so they can be looped over like a single list.
        :type chain: bool, optional
        :return: the stream itself, so calls can be chained
        """
        self.__add_to_stack__(
            type(
                "FunctionFactory",
                (object,),
                {"type": "do", "fuction": func, "chain": chain},
            )
        )
        return self

    def filter(self, func):
        """Add a filter to apply to the execution stack.

        :param func: predicate passed to the builtin ``filter``; items for
            which it returns a falsy value are dropped
        :return: the stream itself, so calls can be chained
        """
        self.__add_to_stack__(
            type("FunctionFactory", (object,), {"type": "filter", "fuction": func})
        )
        return self