#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A wrapper class for Spark Column to behave like pandas Series."""
import datetime
import re
import inspect
import sys
import warnings
from collections.abc import Mapping
from functools import partial, reduce
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    IO,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
    no_type_check,
    overload,
    TYPE_CHECKING,
)

import numpy as np
import pandas as pd
from pandas.core.accessor import CachedAccessor
from pandas.io.formats.printing import pprint_thing
from pandas.api.types import (  # type: ignore[attr-defined]
    is_list_like,
    is_hashable,
    CategoricalDtype,
)
from pandas.tseries.frequencies import DateOffset

from pyspark import SparkContext
from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    IntegralType,
    LongType,
    NumericType,
    Row,
    StructType,
    TimestampType,
)
from pyspark.sql.window import Window
from pyspark import pandas as ps  # For running doctests and reference resolution in PyCharm.
from pyspark.pandas._typing import Axis, Dtype, Label, Name, Scalar, T
from pyspark.pandas.accessors import PandasOnSparkSeriesMethods
from pyspark.pandas.categorical import CategoricalAccessor
from pyspark.pandas.config import get_option
from pyspark.pandas.correlation import (
    compute,
    CORRELATION_VALUE_1_COLUMN,
    CORRELATION_VALUE_2_COLUMN,
    CORRELATION_CORR_OUTPUT_COLUMN,
    CORRELATION_COUNT_OUTPUT_COLUMN,
)
from pyspark.pandas.base import IndexOpsMixin
from pyspark.pandas.exceptions import SparkPandasIndexingError
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.generic import Frame
from pyspark.pandas.internal import (
    InternalField,
    InternalFrame,
    DEFAULT_SERIES_NAME,
    NATURAL_ORDER_COLUMN_NAME,
    SPARK_DEFAULT_INDEX_NAME,
    SPARK_DEFAULT_SERIES_NAME,
)
from pyspark.pandas.missing.series import MissingPandasLikeSeries
from pyspark.pandas.plot import PandasOnSparkPlotAccessor
from pyspark.pandas.utils import (
    combine_frames,
    is_name_like_tuple,
    is_name_like_value,
    name_like_string,
    same_anchor,
    scol_for,
    sql_conf,
    validate_arguments_and_invoke_function,
    validate_axis,
    validate_bool_kwarg,
    verify_temp_column_name,
    SPARK_CONF_ARROW_ENABLED,
    log_advice,
)
from pyspark.pandas.datetimes import DatetimeMethods
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.spark.accessors import SparkSeriesMethods
from pyspark.pandas.strings import StringMethods
from pyspark.pandas.typedef import (
    infer_return_type,
    spark_type_to_pandas_dtype,
    ScalarType,
    SeriesType,
    create_type_for_series_type,
)
from pyspark.pandas.typedef.typehints import as_spark_type

if TYPE_CHECKING:
    from pyspark.sql._typing import ColumnOrName
    from pyspark.pandas.groupby import SeriesGroupBy
    from pyspark.pandas.resample import SeriesResampler
    from pyspark.pandas.indexes import Index
    from pyspark.pandas.spark.accessors import SparkIndexOpsMethods


# This regular expression pattern is compiled and defined here to avoid compiling the same
# pattern every time it is used in __repr__ in Series.
# This pattern basically seeks the footer string from pandas'
REPR_PATTERN = re.compile(r"Length: (?P<length>[0-9]+)")

_flex_doc_SERIES = """
Return {desc} of series and other, element-wise (binary operator `{op_name}`).

Equivalent to ``{equiv}``

Parameters
----------
other : Series or scalar value

Returns
-------
Series
    The result of the operation.

See Also
--------
Series.{reverse}

{series_examples}
"""

_add_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.add(df.b)
a    4.0
b    NaN
c    6.0
d    NaN
dtype: float64

>>> df.a.radd(df.b)
a    4.0
b    NaN
c    6.0
d    NaN
dtype: float64
"""

_sub_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.subtract(df.b)
a    0.0
b    NaN
c    2.0
d    NaN
dtype: float64

>>> df.a.rsub(df.b)
a    0.0
b    NaN
c   -2.0
d    NaN
dtype: float64
"""

_mul_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.multiply(df.b)
a    4.0
b    NaN
c    8.0
d    NaN
dtype: float64

>>> df.a.rmul(df.b)
a    4.0
b    NaN
c    8.0
d    NaN
dtype: float64
"""

_div_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.divide(df.b)
a    1.0
b    NaN
c    2.0
d    NaN
dtype: float64

>>> df.a.rdiv(df.b)
a    1.0
b    NaN
c    0.5
d    NaN
dtype: float64
"""

_pow_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.pow(df.b)
a     4.0
b     NaN
c    16.0
d     NaN
dtype: float64

>>> df.a.rpow(df.b)
a     4.0
b     NaN
c    16.0
d     NaN
dtype: float64
"""

_mod_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.mod(df.b)
a    0.0
b    NaN
c    0.0
d    NaN
dtype: float64

>>> df.a.rmod(df.b)
a    0.0
b    NaN
c    2.0
d    NaN
dtype: float64
"""

_floordiv_example_SERIES = """
Examples
--------
>>> df = ps.DataFrame({'a': [2, 2, 4, np.nan],
...                    'b': [2, np.nan, 2, np.nan]},
...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])
>>> df
     a    b
a  2.0  2.0
b  2.0  NaN
c  4.0  2.0
d  NaN  NaN

>>> df.a.floordiv(df.b)
a    1.0
b    NaN
c    2.0
d    NaN
dtype: float64

>>> df.a.rfloordiv(df.b)
a    1.0
b    NaN
c    0.0
d    NaN
dtype: float64
"""

# Needed to disambiguate Series.str and str type
str_type = str
class Series(Frame, IndexOpsMixin, Generic[T]):
    """
    pandas-on-Spark Series that corresponds to pandas Series logically. This holds Spark Column
    internally.

    :ivar _internal: an internal immutable Frame to manage metadata.
    :type _internal: InternalFrame
    :ivar _psdf: Parent's pandas-on-Spark DataFrame
    :type _psdf: ps.DataFrame

    Parameters
    ----------
    data : array-like, dict, or scalar value, pandas Series
        Contains data stored in Series
        Note that if `data` is a pandas Series, other arguments should not be used.
    index : array-like or Index (1d)
        Values must be hashable and have the same length as `data`.
        Non-unique index values are allowed. Will default to RangeIndex (0, 1, 2, ..., n)
        if not provided. If both a dict and index sequence is used, the index will
        override the keys found in the dict.
    dtype : numpy.dtype or None
        If None, dtype will be inferred
    copy : boolean, default False
        Copy input data
    """

    def __init__(  # type: ignore[no-untyped-def]
        self, data=None, index=None, dtype=None, name=None, copy=False, fastpath=False
    ):
        assert data is not None

        self._anchor: DataFrame
        self._col_label: Label
        if isinstance(data, DataFrame):
            assert dtype is None
            assert name is None
            assert not copy
            assert not fastpath

            self._anchor = data
            self._col_label = index
        else:
            if isinstance(data, pd.Series):
                assert index is None
                assert dtype is None
                assert name is None
                assert not copy
                assert not fastpath
                s = data
            else:
                from pyspark.pandas.indexes.base import Index

                if isinstance(index, Index):
                    raise TypeError(
                        "The given index cannot be a pandas-on-Spark index. "
                        "Try pandas index or array-like."
                    )
                s = pd.Series(
                    data=data, index=index, dtype=dtype, name=name, copy=copy, fastpath=fastpath
                )
            internal = InternalFrame.from_pandas(pd.DataFrame(s))
            if s.name is None:
                internal = internal.copy(column_labels=[None])
            anchor = DataFrame(internal)

            self._anchor = anchor
            self._col_label = anchor._internal.column_labels[0]
            object.__setattr__(anchor, "_psseries", {self._column_label: self})

    @property
    def _psdf(self) -> DataFrame:
        return self._anchor

    @property
    def _internal(self) -> InternalFrame:
        return self._psdf._internal.select_column(self._column_label)

    @property
    def _column_label(self) -> Optional[Label]:
        return self._col_label

    def _update_anchor(self, psdf: DataFrame) -> None:
        assert psdf._internal.column_labels == [self._column_label], (
            psdf._internal.column_labels,
            [self._column_label],
        )
        self._anchor = psdf
        object.__setattr__(psdf, "_psseries", {self._column_label: self})

    def _with_new_scol(self, scol: Column, *, field: Optional[InternalField] = None) -> "Series":
        """
        Copy pandas-on-Spark Series with the new Spark Column.

        :param scol: the new Spark Column
        :return: the copied Series
        """
        name = name_like_string(self._column_label)
        internal = self._internal.copy(
            data_spark_columns=[scol.alias(name)],
            data_fields=[
                field if field is None or field.struct_field is None else field.copy(name=name)
            ],
        )
        return first_series(DataFrame(internal))

    spark: "SparkIndexOpsMethods" = CachedAccessor(  # type: ignore[assignment]
        "spark", SparkSeriesMethods
    )

    @property
    def dtypes(self) -> Dtype:
        """Return the dtype object of the underlying data.

        >>> s = ps.Series(list('abc'))
        >>> s.dtype == s.dtypes
        True
        """
        return self.dtype

    @property
    def axes(self) -> List["Index"]:
        """
        Return a list of the row axis labels.

        Examples
        --------

        >>> psser = ps.Series([1, 2, 3])
        >>> psser.axes
        [Int64Index([0, 1, 2], dtype='int64')]
        """
        return [self.index]

    # Arithmetic Operators
    pow.__doc__ = _flex_doc_SERIES.format(
        desc="Exponential power of series",
        op_name="**",
        equiv="series ** other",
        reverse="rpow",
        series_examples=_pow_example_SERIES,
    )
    rfloordiv.__doc__ = _flex_doc_SERIES.format(
        desc="Reverse Integer division",
        op_name="//",
        equiv="other // series",
        reverse="floordiv",
        series_examples=_floordiv_example_SERIES,
    )

    # create accessor for pandas-on-Spark specific methods.
    pandas_on_spark = CachedAccessor("pandas_on_spark", PandasOnSparkSeriesMethods)

    # keep the name "koalas" for backward compatibility.
    koalas = CachedAccessor("koalas", PandasOnSparkSeriesMethods)

    # Comparison Operators
    def eq(self, other: Any) -> "Series":
        """
        Compare if the current value is equal to the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a == 1
        a     True
        b    False
        c    False
        d    False
        Name: a, dtype: bool

        >>> df.b.eq(1)
        a     True
        b    False
        c     True
        d    False
        Name: b, dtype: bool
        """
        return self == other
    equals = eq
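
    # Note: because of the alias above, ``equals`` here behaves like ``eq``, i.e. an
    # element-wise comparison that returns a boolean Series, whereas pandas'
    # ``Series.equals`` tests whole-object equality and returns a single bool.
    # A minimal sketch of the aliased behaviour:
    #
    # >>> s = ps.Series([1, 2, 3])
    # >>> s.equals(2)  # same as s.eq(2) or (s == 2)
    # 0    False
    # 1     True
    # 2    False
    # dtype: bool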
    def gt(self, other: Any) -> "Series":
        """
        Compare if the current value is greater than the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a > 1
        a    False
        b     True
        c     True
        d     True
        Name: a, dtype: bool

        >>> df.b.gt(1)
        a    False
        b    False
        c    False
        d    False
        Name: b, dtype: bool
        """
        return self > other
    def ge(self, other: Any) -> "Series":
        """
        Compare if the current value is greater than or equal to the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a >= 2
        a    False
        b     True
        c     True
        d     True
        Name: a, dtype: bool

        >>> df.b.ge(2)
        a    False
        b    False
        c    False
        d    False
        Name: b, dtype: bool
        """
        return self >= other
    def lt(self, other: Any) -> "Series":
        """
        Compare if the current value is less than the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a < 1
        a    False
        b    False
        c    False
        d    False
        Name: a, dtype: bool

        >>> df.b.lt(2)
        a     True
        b    False
        c     True
        d    False
        Name: b, dtype: bool
        """
        return self < other
    def le(self, other: Any) -> "Series":
        """
        Compare if the current value is less than or equal to the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a <= 2
        a     True
        b     True
        c    False
        d    False
        Name: a, dtype: bool

        >>> df.b.le(2)
        a     True
        b    False
        c     True
        d    False
        Name: b, dtype: bool
        """
        return self <= other
    def ne(self, other: Any) -> "Series":
        """
        Compare if the current value is not equal to the other.

        >>> df = ps.DataFrame({'a': [1, 2, 3, 4],
        ...                    'b': [1, np.nan, 1, np.nan]},
        ...                   index=['a', 'b', 'c', 'd'], columns=['a', 'b'])

        >>> df.a != 1
        a    False
        b     True
        c     True
        d     True
        Name: a, dtype: bool

        >>> df.b.ne(1)
        a    False
        b     True
        c    False
        d     True
        Name: b, dtype: bool
        """
        return self != other
    def divmod(self, other: Any) -> Tuple["Series", "Series"]:
        """
        Return Integer division and modulo of series and other, element-wise
        (binary operator `divmod`).

        Parameters
        ----------
        other : Series or scalar value

        Returns
        -------
        2-Tuple of Series
            The result of the operation.

        See Also
        --------
        Series.rdivmod
        """
        return self.floordiv(other), self.mod(other)
    def rdivmod(self, other: Any) -> Tuple["Series", "Series"]:
        """
        Return Integer division and modulo of series and other, element-wise
        (binary operator `rdivmod`).

        Parameters
        ----------
        other : Series or scalar value

        Returns
        -------
        2-Tuple of Series
            The result of the operation.

        See Also
        --------
        Series.divmod
        """
        return self.rfloordiv(other), self.rmod(other)
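
    # A minimal sketch of the tuple returned by ``divmod``, which simply pairs the
    # ``floordiv`` and ``mod`` flex operators as implemented above (``rdivmod``
    # pairs the reflected variants the same way):
    #
    # >>> s = ps.Series([7, 8, 9])
    # >>> quotient, remainder = s.divmod(3)
    # >>> quotient
    # 0    2
    # 1    2
    # 2    3
    # dtype: int64
    # >>> remainder
    # 0    1
    # 1    2
    # 2    0
    # dtype: int64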
[docs]defbetween(self,left:Any,right:Any,inclusive:Union[bool,str]="both")->"Series":""" Return boolean Series equivalent to left <= series <= right. This function returns a boolean vector containing `True` wherever the corresponding Series element is between the boundary values `left` and `right`. NA values are treated as `False`. Parameters ---------- left : scalar or list-like Left boundary. right : scalar or list-like Right boundary. inclusive : {"both", "neither", "left", "right"} or boolean. "both" by default. Include boundaries. Whether to set each bound as closed or open. Booleans are deprecated in favour of `both` or `neither`. Returns ------- Series Series representing whether each element is between left and right (inclusive). See Also -------- Series.gt : Greater than of series and other. Series.lt : Less than of series and other. Notes ----- This function is equivalent to ``(left <= ser) & (ser <= right)`` Examples -------- >>> s = ps.Series([2, 0, 4, 8, np.nan]) Boundary values are included by default: >>> s.between(0, 4) 0 True 1 True 2 True 3 False 4 False dtype: bool With `inclusive` set to "neither" boundary values are excluded: >>> s.between(0, 4, inclusive="neither") 0 True 1 False 2 False 3 False 4 False dtype: bool With `inclusive` set to "right" only right boundary value is included: >>> s.between(0, 4, inclusive="right") 0 True 1 False 2 True 3 False 4 False dtype: bool With `inclusive` set to "left" only left boundary value is included: >>> s.between(0, 4, inclusive="left") 0 True 1 True 2 False 3 False 4 False dtype: bool `left` and `right` can be any scalar value: >>> s = ps.Series(['Alice', 'Bob', 'Carol', 'Eve']) >>> s.between('Anna', 'Daniel') 0 False 1 True 2 True 3 False dtype: bool """ifinclusiveisTrueorinclusiveisFalse:warnings.warn("Boolean inputs to the `inclusive` argument are deprecated in ""favour of `both` or `neither`.",FutureWarning,)ifinclusive:inclusive="both"else:inclusive="neither"ifinclusive=="both":lmask=self>=leftrmask=self<=rightelifinclusive=="left":lmask=self>=leftrmask=self<rightelifinclusive=="right":lmask=self>leftrmask=self<=rightelifinclusive=="neither":lmask=self>leftrmask=self<rightelse:raiseValueError("Inclusive has to be either string of 'both',""'left', 'right', or 'neither'.")returnlmask&rmask
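
    # A minimal sketch of the equivalence stated in the Notes above: with the default
    # ``inclusive="both"``, ``between`` builds exactly the conjunction of the two
    # comparison masks.
    #
    # >>> s = ps.Series([2, 0, 4, 8, np.nan])
    # >>> (s >= 0) & (s <= 4)  # equivalent to s.between(0, 4)
    # 0     True
    # 1     True
    # 2     True
    # 3    False
    # 4    False
    # dtype: bool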
[docs]defcov(self,other:"Series",min_periods:Optional[int]=None,ddof:int=1)->float:""" Compute covariance with Series, excluding missing values. .. versionadded:: 3.3.0 Parameters ---------- other : Series Series with which to compute the covariance. min_periods : int, optional Minimum number of observations needed to have a valid result. ddof : int, default 1 Delta degrees of freedom. The divisor used in calculations is ``N - ddof``, where ``N`` represents the number of elements. .. versionadded:: 3.4.0 Returns ------- float Covariance between Series and other Examples -------- >>> from pyspark.pandas.config import set_option, reset_option >>> s1 = ps.Series([0.90010907, 0.13484424, 0.62036035]) >>> s2 = ps.Series([0.12528585, 0.26962463, 0.51111198]) >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.cov(s2) -0.016857... >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.cov(s2, ddof=2) -0.033715... """ifnotisinstance(other,Series):raiseTypeError("unsupported type: %s"%type(other))ifnotnp.issubdtype(self.dtype,np.number):# type: ignore[arg-type]raiseTypeError("unsupported dtype: %s"%self.dtype)ifnotnp.issubdtype(other.dtype,np.number):# type: ignore[arg-type]raiseTypeError("unsupported dtype: %s"%other.dtype)ifnotisinstance(ddof,int):raiseTypeError("ddof must be integer")min_periods=1ifmin_periodsisNoneelsemin_periodsifsame_anchor(self,other):sdf=self._internal.spark_frame.select(self.spark.column,other.spark.column)else:combined=combine_frames(self.to_frame(),other.to_frame())sdf=combined._internal.spark_frame.select(*combined._internal.data_spark_columns)sdf=sdf.dropna()iflen(sdf.head(min_periods))<min_periods:returnnp.nanelse:sdf=sdf.select(SF.covar(F.col(sdf.columns[0]),F.col(sdf.columns[1]),ddof))returnsdf.head(1)[0][0]
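
    # Usage sketch: as in the doctests above, computing the covariance of two Series
    # that are backed by different DataFrames requires enabling
    # ``compute.ops_on_diff_frames`` so the two frames can be combined first.
    #
    # >>> s1 = ps.Series([0.90010907, 0.13484424, 0.62036035])
    # >>> s2 = ps.Series([0.12528585, 0.26962463, 0.51111198])
    # >>> with ps.option_context("compute.ops_on_diff_frames", True):
    # ...     s1.cov(s2, min_periods=3)
    # -0.016857...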
    # TODO: NaN and None when ``arg`` is an empty dict
    # TODO: Support ps.Series ``arg``
[docs]defmap(self,arg:Union[Dict,Callable[[Any],Any],pd.Series],na_action:Optional[str]=None)->"Series":""" Map values of Series according to input correspondence. Used for substituting each value in a Series with another value, that may be derived from a function, a ``dict``. .. note:: make sure the size of the dictionary is not huge because it could downgrade the performance or throw OutOfMemoryError due to a huge expression within Spark. Consider the input as a function as an alternative instead in this case. Parameters ---------- arg : function, dict or pd.Series Mapping correspondence. na_action : If `ignore`, propagate NA values, without passing them to the mapping correspondence. Returns ------- Series Same index as caller. See Also -------- Series.apply : For applying more complex functions on a Series. DataFrame.applymap : Apply a function element-wise on a whole DataFrame. Notes ----- When ``arg`` is a dictionary, values in Series that are not in the dictionary (as keys) is converted to ``None``. However, if the dictionary is a ``dict`` subclass that defines ``__missing__`` (i.e. provides a method for default values), then this default is used rather than ``None``. Examples -------- >>> s = ps.Series(['cat', 'dog', None, 'rabbit']) >>> s 0 cat 1 dog 2 None 3 rabbit dtype: object ``map`` accepts a ``dict``. Values that are not found in the ``dict`` are converted to ``None``, unless the dict has a default value (e.g. ``defaultdict``): >>> s.map({'cat': 'kitten', 'dog': 'puppy'}) 0 kitten 1 puppy 2 None 3 None dtype: object It also accepts a pandas Series: >>> pser = pd.Series(['kitten', 'puppy'], index=['cat', 'dog']) >>> s.map(pser) 0 kitten 1 puppy 2 None 3 None dtype: object It also accepts a function: >>> def format(x) -> str: ... return 'I am a {}'.format(x) >>> s.map(format) 0 I am a cat 1 I am a dog 2 I am a None 3 I am a rabbit dtype: object To avoid applying the function to missing values (and keep them as NaN) na_action='ignore' can be used: >>> s.map('I am a {}'.format, na_action='ignore') 0 I am a cat 1 I am a dog 2 None 3 I am a rabbit dtype: object """ifisinstance(arg,(dict,pd.Series)):is_start=True# In case dictionary is empty.current=F.when(F.lit(False),F.lit(None).cast(self.spark.data_type))forto_replace,valueinarg.items():ifis_start:current=F.when(self.spark.column==F.lit(to_replace),value)is_start=Falseelse:current=current.when(self.spark.column==F.lit(to_replace),value)ifhasattr(arg,"__missing__"):tmp_val=arg[np._NoValue]# type: ignore[attr-defined]# Remove in case it's set in defaultdict.delarg[np._NoValue]# type: ignore[attr-defined]current=current.otherwise(F.lit(tmp_val))else:current=current.otherwise(F.lit(None).cast(self.spark.data_type))returnself._with_new_scol(current)else:returnself.pandas_on_spark.transform_batch(lambdapser:pser.map(arg,na_action))
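
    # A minimal sketch of the ``__missing__`` fallback described in the Notes above:
    # a ``dict`` subclass that defines ``__missing__`` (such as
    # ``collections.defaultdict``) supplies the default value instead of ``None``,
    # which the implementation above picks up via its ``__missing__`` check.
    #
    # >>> from collections import defaultdict
    # >>> mapping = defaultdict(lambda: 'unknown', {'cat': 'kitten', 'dog': 'puppy'})
    # >>> ps.Series(['cat', 'dog', 'rabbit']).map(mapping)
    # 0     kitten
    # 1      puppy
    # 2    unknown
    # dtype: object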
    @property
    def shape(self) -> Tuple[int]:
        """Return a tuple of the shape of the underlying data."""
        return (len(self),)

    @property
    def name(self) -> Name:
        """Return name of the Series."""
        name = self._column_label
        if name is not None and len(name) == 1:
            return name[0]
        else:
            return name

    @name.setter
    def name(self, name: Name) -> None:
        self.rename(name, inplace=True)

    # TODO: Currently, changing index labels taking dictionary/Series is not supported.
[docs]defrename(self,index:Optional[Union[Name,Callable[[Any],Any]]]=None,**kwargs:Any)->"Series":""" Alter Series index labels or name. Parameters ---------- index : scalar or function, optional Functions are transformations to apply to the index. Scalar will alter the Series.name attribute. inplace : bool, default False Whether to return a new Series. If True then value of copy is ignored. Returns ------- Series Series with index labels or name altered. Examples -------- >>> s = ps.Series([1, 2, 3]) >>> s 0 1 1 2 2 3 dtype: int64 >>> s.rename("my_name") # scalar, changes Series.name 0 1 1 2 2 3 Name: my_name, dtype: int64 >>> s.rename(lambda x: x ** 2) # function, changes labels 0 1 1 2 4 3 dtype: int64 """ifindexisNone:passifcallable(index):ifkwargs.get("inplace",False):raiseValueError("inplace True is not supported yet for a function 'index'")frame=self.to_frame()new_index_name=verify_temp_column_name(frame,"__index_name__")frame[new_index_name]=self.index.map(index)frame.set_index(new_index_name,inplace=True)frame.index.name=self.index.namereturnfirst_series(frame).rename(self.name)elifisinstance(index,(pd.Series,dict)):raiseValueError("'index' of %s type is not supported yet"%type(index).__name__)elifnotis_hashable(index):raiseTypeError("Series.name must be a hashable type")elifnotisinstance(index,tuple):index=(index,)name=name_like_string(index)scol=self.spark.column.alias(name)field=self._internal.data_fields[0].copy(name=name)internal=self._internal.copy(column_labels=[index],data_spark_columns=[scol],data_fields=[field],column_label_names=None,)psdf:DataFrame=DataFrame(internal)ifkwargs.get("inplace",False):self._col_label=indexself._update_anchor(psdf)returnselfelse:returnfirst_series(psdf)
[docs]defrename_axis(self,mapper:Optional[Any]=None,index:Optional[Any]=None,inplace:bool=False)->Optional["Series"]:""" Set the name of the axis for the index or columns. Parameters ---------- mapper, index : scalar, list-like, dict-like or function, optional A scalar, list-like, dict-like or functions transformations to apply to the index values. inplace : bool, default False Modifies the object directly, instead of creating a new Series. Returns ------- Series, or None if `inplace` is True. See Also -------- Series.rename : Alter Series index labels or name. DataFrame.rename : Alter DataFrame index labels or name. Index.rename : Set new names on index. Examples -------- >>> s = ps.Series(["dog", "cat", "monkey"], name="animal") >>> s # doctest: +NORMALIZE_WHITESPACE 0 dog 1 cat 2 monkey Name: animal, dtype: object >>> s.rename_axis("index").sort_index() # doctest: +NORMALIZE_WHITESPACE index 0 dog 1 cat 2 monkey Name: animal, dtype: object **MultiIndex** >>> index = pd.MultiIndex.from_product([['mammal'], ... ['dog', 'cat', 'monkey']], ... names=['type', 'name']) >>> s = ps.Series([4, 4, 2], index=index, name='num_legs') >>> s # doctest: +NORMALIZE_WHITESPACE type name mammal dog 4 cat 4 monkey 2 Name: num_legs, dtype: int64 >>> s.rename_axis(index={'type': 'class'}).sort_index() # doctest: +NORMALIZE_WHITESPACE class name mammal cat 4 dog 4 monkey 2 Name: num_legs, dtype: int64 >>> s.rename_axis(index=str.upper).sort_index() # doctest: +NORMALIZE_WHITESPACE TYPE NAME mammal cat 4 dog 4 monkey 2 Name: num_legs, dtype: int64 """psdf=self.to_frame().rename_axis(mapper=mapper,index=index,inplace=False)ifinplace:self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf)
    @property
    def index(self) -> "ps.Index":
        """The index (axis labels) Column of the Series.

        See Also
        --------
        Index
        """
        return self._psdf.index

    @property
    def is_unique(self) -> bool:
        """
        Return boolean if values in the object are unique

        Returns
        -------
        is_unique : boolean

        >>> ps.Series([1, 2, 3]).is_unique
        True
        >>> ps.Series([1, 2, 2]).is_unique
        False
        >>> ps.Series([1, 2, 3, None]).is_unique
        True
        """
        scol = self.spark.column

        # Here we check:
        #   1. the distinct count without nulls and count without nulls for non-null values
        #   2. count null values and see if null is a distinct value.
        #
        # This workaround is to calculate the distinct count including nulls in
        # single pass. Note that COUNT(DISTINCT expr) in Spark is designed to ignore nulls.
        return self._internal.spark_frame.select(
            (F.count(scol) == F.countDistinct(scol))
            & (F.count(F.when(scol.isNull(), 1).otherwise(None)) <= 1)
        ).collect()[0][0]
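
    # A rough sketch of the same single-pass uniqueness check written against a plain
    # Spark DataFrame; ``sdf`` and the column name ``"x"`` are hypothetical:
    #
    # >>> from pyspark.sql import functions as F
    # >>> sdf.select(
    # ...     (F.count("x") == F.countDistinct("x"))               # non-null values distinct
    # ...     & (F.count(F.when(F.col("x").isNull(), 1)) <= 1)     # at most one null
    # ... )  # doctest: +SKIP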
[docs]defreset_index(self,level:Optional[Union[int,Name,Sequence[Union[int,Name]]]]=None,drop:bool=False,name:Optional[Name]=None,inplace:bool=False,)->Optional[Union["Series",DataFrame]]:""" Generate a new DataFrame or Series with the index reset. This is useful when the index needs to be treated as a column, or when the index is meaningless and needs to be reset to the default before another operation. Parameters ---------- level : int, str, tuple, or list, default optional For a Series with a MultiIndex, only remove the specified levels from the index. Removes all levels by default. drop : bool, default False Just reset the index, without inserting it as a column in the new DataFrame. name : object, optional The name to use for the column containing the original Series values. Uses self.name by default. This argument is ignored when drop is True. inplace : bool, default False Modify the Series in place (do not create a new object). Returns ------- Series or DataFrame When `drop` is False (the default), a DataFrame is returned. The newly created columns will come first in the DataFrame, followed by the original Series values. When `drop` is True, a `Series` is returned. In either case, if ``inplace=True``, no value is returned. Examples -------- >>> s = ps.Series([1, 2, 3, 4], index=pd.Index(['a', 'b', 'c', 'd'], name='idx')) Generate a DataFrame with default index. >>> s.reset_index() idx 0 0 a 1 1 b 2 2 c 3 3 d 4 To specify the name of the new column use `name`. >>> s.reset_index(name='values') idx values 0 a 1 1 b 2 2 c 3 3 d 4 To generate a new Series with the default set `drop` to True. >>> s.reset_index(drop=True) 0 1 1 2 2 3 3 4 dtype: int64 To update the Series in place, without generating a new one set `inplace` to True. Note that it also requires ``drop=True``. >>> s.reset_index(inplace=True, drop=True) >>> s 0 1 1 2 2 3 3 4 dtype: int64 """inplace=validate_bool_kwarg(inplace,"inplace")ifinplaceandnotdrop:raiseTypeError("Cannot reset_index inplace on a Series to create a DataFrame")ifdrop:psdf=self._psdf[[self.name]]else:psser=selfifnameisnotNone:psser=psser.rename(name)psdf=psser.to_frame()psdf=psdf.reset_index(level=level,drop=drop)ifdrop:ifinplace:self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf)else:returnpsdf
[docs]defto_frame(self,name:Optional[Name]=None)->DataFrame:""" Convert Series to DataFrame. Parameters ---------- name : object, default None The passed name should substitute for the series name (if it has one). Returns ------- DataFrame DataFrame representation of Series. Examples -------- >>> s = ps.Series(["a", "b", "c"]) >>> s.to_frame() 0 0 a 1 b 2 c >>> s = ps.Series(["a", "b", "c"], name="vals") >>> s.to_frame() vals 0 a 1 b 2 c """ifnameisnotNone:renamed=self.rename(name)elifself._column_labelisNone:renamed=self.rename(DEFAULT_SERIES_NAME)else:renamed=selfreturnDataFrame(renamed._internal)
    to_dataframe = to_frame
[docs]defto_string(self,buf:Optional[IO[str]]=None,na_rep:str="NaN",float_format:Optional[Callable[[float],str]]=None,header:bool=True,index:bool=True,length:bool=False,dtype:bool=False,name:bool=False,max_rows:Optional[int]=None,)->Optional[str]:""" Render a string representation of the Series. .. note:: This method should only be used if the resulting pandas object is expected to be small, as all the data is loaded into the driver's memory. If the input is large, set max_rows parameter. Parameters ---------- buf : StringIO-like, optional buffer to write to na_rep : string, optional string representation of NAN to use, default 'NaN' float_format : one-parameter function, optional formatter function to apply to columns' elements if they are floats default None header : boolean, default True Add the Series header (index name) index : bool, optional Add index (row) labels, default True length : boolean, default False Add the Series length dtype : boolean, default False Add the Series dtype name : boolean, default False Add the Series name if not None max_rows : int, optional Maximum number of rows to show before truncating. If None, show all. Returns ------- formatted : string (if not buffer passed) Examples -------- >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], columns=['dogs', 'cats']) >>> print(df['dogs'].to_string()) 0 0.2 1 0.0 2 0.6 3 0.2 >>> print(df['dogs'].to_string(max_rows=2)) 0 0.2 1 0.0 """# Make sure locals() call is at the top of the function so we don't capture local variables.args=locals()ifmax_rowsisnotNone:psseries=self.head(max_rows)else:psseries=selfreturnvalidate_arguments_and_invoke_function(psseries._to_internal_pandas(),self.to_string,pd.Series.to_string,args)
[docs]defto_clipboard(self,excel:bool=True,sep:Optional[str]=None,**kwargs:Any)->None:# Docstring defined below by reusing DataFrame.to_clipboard's.args=locals()psseries=selfreturnvalidate_arguments_and_invoke_function(psseries._to_internal_pandas(),self.to_clipboard,pd.Series.to_clipboard,args)
[docs]defto_dict(self,into:Type=dict)->Mapping:""" Convert Series to {label -> value} dict or dict-like object. .. note:: This method should only be used if the resulting pandas DataFrame is expected to be small, as all the data is loaded into the driver's memory. Parameters ---------- into : class, default dict The collections.abc.Mapping subclass to use as the return object. Can be the actual class or an empty instance of the mapping type you want. If you want a collections.defaultdict, you must pass it initialized. Returns ------- collections.abc.Mapping Key-value representation of Series. Examples -------- >>> s = ps.Series([1, 2, 3, 4]) >>> s_dict = s.to_dict() >>> sorted(s_dict.items()) [(0, 1), (1, 2), (2, 3), (3, 4)] >>> from collections import OrderedDict, defaultdict >>> s.to_dict(OrderedDict) OrderedDict([(0, 1), (1, 2), (2, 3), (3, 4)]) >>> dd = defaultdict(list) >>> s.to_dict(dd) # doctest: +ELLIPSIS defaultdict(<class 'list'>, {...}) """# Make sure locals() call is at the top of the function so we don't capture local variables.args=locals()psseries=selfreturnvalidate_arguments_and_invoke_function(psseries._to_internal_pandas(),self.to_dict,pd.Series.to_dict,args)
[docs]defto_pandas(self)->pd.Series:""" Return a pandas Series. .. note:: This method should only be used if the resulting pandas object is expected to be small, as all the data is loaded into the driver's memory. Examples -------- >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], columns=['dogs', 'cats']) >>> df['dogs'].to_pandas() 0 0.2 1 0.0 2 0.6 3 0.2 Name: dogs, dtype: float64 """log_advice("`to_pandas` loads all data into the driver's memory. ""It should only be used if the resulting pandas Series is expected to be small.")returnself._to_pandas()
    def _to_pandas(self) -> pd.Series:
        """
        Same as `to_pandas()`, without issuing the advice log for internal usage.
        """
        return self._to_internal_pandas().copy()
[docs]defto_list(self)->List:""" Return a list of the values. These are each a scalar type, which is a Python scalar (for str, int, float) or a pandas scalar (for Timestamp/Timedelta/Interval/Period) .. note:: This method should only be used if the resulting list is expected to be small, as all the data is loaded into the driver's memory. """log_advice("`to_list` loads all data into the driver's memory. ""It should only be used if the resulting list is expected to be small.")returnself._to_internal_pandas().tolist()
    tolist = to_list
[docs]defduplicated(self,keep:Union[bool,str]="first")->"Series":""" Indicate duplicate Series values. Duplicated values are indicated as ``True`` values in the resulting Series. Either all duplicates, all except the first or all except the last occurrence of duplicates can be indicated. .. versionadded:: 3.4.0 Parameters ---------- keep : {'first', 'last', False}, default 'first' Method to handle marking duplicates: - 'first' : Mark duplicates as ``True`` except for the first occurrence. - 'last' : Mark duplicates as ``True`` except for the last occurrence. - ``False`` : Mark all duplicates as ``True``. Returns ------- Series Series indicating whether each value has occurred in the preceding values See Also -------- Index.drop_duplicates : Remove duplicate values from Index. DataFrame.duplicated : Equivalent method on DataFrame. Series.drop_duplicates : Remove duplicate values from Series. Examples -------- By default, for each set of duplicated values, the first occurrence is set on False and all others on True: >>> animals = ps.Series(['lama', 'cow', 'lama', 'beetle', 'lama']) >>> animals.duplicated().sort_index() 0 False 1 False 2 True 3 False 4 True dtype: bool which is equivalent to >>> animals.duplicated(keep='first').sort_index() 0 False 1 False 2 True 3 False 4 True dtype: bool By using 'last', the last occurrence of each set of duplicated values is set on False and all others on True: >>> animals.duplicated(keep='last').sort_index() 0 True 1 False 2 True 3 False 4 False dtype: bool By setting keep on ``False``, all duplicates are True: >>> animals.duplicated(keep=False).sort_index() 0 True 1 False 2 True 3 False 4 True dtype: bool """returnself._psdf[[self.name]].duplicated(keep=keep).rename(self.name)
[docs]defdrop_duplicates(self,keep:Union[bool,str]="first",inplace:bool=False)->Optional["Series"]:""" Return Series with duplicate values removed. Parameters ---------- keep : {'first', 'last', ``False``}, default 'first' Method to handle dropping duplicates: - 'first' : Drop duplicates except for the first occurrence. - 'last' : Drop duplicates except for the last occurrence. - ``False`` : Drop all duplicates. inplace : bool, default ``False`` If ``True``, performs operation inplace and returns None. Returns ------- Series Series with duplicates dropped. Examples -------- Generate a Series with duplicated entries. >>> s = ps.Series(['lama', 'cow', 'lama', 'beetle', 'lama', 'hippo'], ... name='animal') >>> s.sort_index() 0 lama 1 cow 2 lama 3 beetle 4 lama 5 hippo Name: animal, dtype: object With the 'keep' parameter, the selection behavior of duplicated values can be changed. The value 'first' keeps the first occurrence for each set of duplicated entries. The default value of keep is 'first'. >>> s.drop_duplicates().sort_index() 0 lama 1 cow 3 beetle 5 hippo Name: animal, dtype: object The value 'last' for parameter 'keep' keeps the last occurrence for each set of duplicated entries. >>> s.drop_duplicates(keep='last').sort_index() 1 cow 3 beetle 4 lama 5 hippo Name: animal, dtype: object The value ``False`` for parameter 'keep' discards all sets of duplicated entries. Setting the value of 'inplace' to ``True`` performs the operation inplace and returns ``None``. >>> s.drop_duplicates(keep=False, inplace=True) >>> s.sort_index() 1 cow 3 beetle 5 hippo Name: animal, dtype: object """inplace=validate_bool_kwarg(inplace,"inplace")psdf=self._psdf[[self.name]].drop_duplicates(keep=keep)ifinplace:self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf)
[docs]defreindex(self,index:Optional[Any]=None,fill_value:Optional[Any]=None)->"Series":""" Conform Series to new index with optional filling logic, placing NA/NaN in locations having no value in the previous index. A new object is produced. Parameters ---------- index: array-like, optional New labels / index to conform to, should be specified using keywords. Preferably an Index object to avoid duplicating data fill_value : scalar, default np.NaN Value to use for missing values. Defaults to NaN, but can be any "compatible" value. Returns ------- Series with changed index. See Also -------- Series.reset_index : Remove row labels or move them to new columns. Examples -------- Create a series with some fictional data. >>> index = ['Firefox', 'Chrome', 'Safari', 'IE10', 'Konqueror'] >>> ser = ps.Series([200, 200, 404, 404, 301], ... index=index, name='http_status') >>> ser Firefox 200 Chrome 200 Safari 404 IE10 404 Konqueror 301 Name: http_status, dtype: int64 Create a new index and reindex the Series. By default values in the new index that do not have corresponding records in the Series are assigned ``NaN``. >>> new_index= ['Safari', 'Iceweasel', 'Comodo Dragon', 'IE10', ... 'Chrome'] >>> ser.reindex(new_index).sort_index() Chrome 200.0 Comodo Dragon NaN IE10 404.0 Iceweasel NaN Safari 404.0 Name: http_status, dtype: float64 We can fill in the missing values by passing a value to the keyword ``fill_value``. >>> ser.reindex(new_index, fill_value=0).sort_index() Chrome 200 Comodo Dragon 0 IE10 404 Iceweasel 0 Safari 404 Name: http_status, dtype: int64 To further illustrate the filling functionality in ``reindex``, we will create a Series with a monotonically increasing index (for example, a sequence of dates). >>> date_index = pd.date_range('1/1/2010', periods=6, freq='D') >>> ser2 = ps.Series([100, 101, np.nan, 100, 89, 88], ... name='prices', index=date_index) >>> ser2.sort_index() 2010-01-01 100.0 2010-01-02 101.0 2010-01-03 NaN 2010-01-04 100.0 2010-01-05 89.0 2010-01-06 88.0 Name: prices, dtype: float64 Suppose we decide to expand the series to cover a wider date range. >>> date_index2 = pd.date_range('12/29/2009', periods=10, freq='D') >>> ser2.reindex(date_index2).sort_index() 2009-12-29 NaN 2009-12-30 NaN 2009-12-31 NaN 2010-01-01 100.0 2010-01-02 101.0 2010-01-03 NaN 2010-01-04 100.0 2010-01-05 89.0 2010-01-06 88.0 2010-01-07 NaN Name: prices, dtype: float64 """returnfirst_series(self.to_frame().reindex(index=index,fill_value=fill_value)).rename(self.name)
[docs]defreindex_like(self,other:Union["Series","DataFrame"])->"Series":""" Return a Series with matching indices as other object. Conform the object to the same index on all axes. Places NA/NaN in locations having no value in the previous index. Parameters ---------- other : Series or DataFrame Its row and column indices are used to define the new indices of this object. Returns ------- Series Series with changed indices on each axis. See Also -------- DataFrame.set_index : Set row labels. DataFrame.reset_index : Remove row labels or move them to new columns. DataFrame.reindex : Change to new indices or expand indices. Notes ----- Same as calling ``.reindex(index=other.index, ...)``. Examples -------- >>> s1 = ps.Series([24.3, 31.0, 22.0, 35.0], ... index=pd.date_range(start='2014-02-12', ... end='2014-02-15', freq='D'), ... name="temp_celsius") >>> s1 2014-02-12 24.3 2014-02-13 31.0 2014-02-14 22.0 2014-02-15 35.0 Name: temp_celsius, dtype: float64 >>> s2 = ps.Series(["low", "low", "medium"], ... index=pd.DatetimeIndex(['2014-02-12', '2014-02-13', ... '2014-02-15']), ... name="winspeed") >>> s2 2014-02-12 low 2014-02-13 low 2014-02-15 medium Name: winspeed, dtype: object >>> s2.reindex_like(s1).sort_index() 2014-02-12 low 2014-02-13 low 2014-02-14 None 2014-02-15 medium Name: winspeed, dtype: object """ifisinstance(other,(Series,DataFrame)):returnself.reindex(index=other.index)else:raiseTypeError("other must be a pandas-on-Spark Series or DataFrame")
[docs]deffillna(self,value:Optional[Any]=None,method:Optional[str]=None,axis:Optional[Axis]=None,inplace:bool=False,limit:Optional[int]=None,)->Optional["Series"]:"""Fill NA/NaN values. .. note:: the current implementation of 'method' parameter in fillna uses Spark's Window without specifying partition specification. This leads to moveing all data into a single partition in a single machine and could cause serious performance degradation. Avoid this method with very large datasets. Parameters ---------- value : scalar, dict, Series Value to use to fill holes. alternately a dict/Series of values specifying which value to use for each column. DataFrame is not supported. method : {'backfill', 'bfill', 'pad', 'ffill', None}, default None Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. If method is not specified, this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None Returns ------- Series Series with NA entries filled. Examples -------- >>> s = ps.Series([np.nan, 2, 3, 4, np.nan, 6], name='x') >>> s 0 NaN 1 2.0 2 3.0 3 4.0 4 NaN 5 6.0 Name: x, dtype: float64 Replace all NaN elements with 0s. >>> s.fillna(0) 0 0.0 1 2.0 2 3.0 3 4.0 4 0.0 5 6.0 Name: x, dtype: float64 We can also propagate non-null values forward or backward. >>> s.fillna(method='ffill') 0 NaN 1 2.0 2 3.0 3 4.0 4 4.0 5 6.0 Name: x, dtype: float64 >>> s = ps.Series([np.nan, 'a', 'b', 'c', np.nan], name='x') >>> s.fillna(method='ffill') 0 None 1 a 2 b 3 c 4 c Name: x, dtype: object """psser=self._fillna(value=value,method=method,axis=axis,limit=limit)ifmethodisnotNone:psser=DataFrame(psser._psdf._internal.resolved_copy)._psser_for(self._column_label)inplace=validate_bool_kwarg(inplace,"inplace")ifinplace:self._psdf._update_internal_frame(psser._psdf._internal,check_same_anchor=False)returnNoneelse:returnpsser.copy()
def_fillna(self,value:Optional[Any]=None,method:Optional[str]=None,axis:Optional[Axis]=None,limit:Optional[int]=None,part_cols:Sequence["ColumnOrName"]=(),)->"Series":axis=validate_axis(axis)ifaxis!=0:raiseNotImplementedError("fillna currently only works for axis=0 or axis='index'")if(valueisNone)and(methodisNone):raiseValueError("Must specify a fillna 'value' or 'method' parameter.")if(methodisnotNone)and(methodnotin["ffill","pad","backfill","bfill"]):raiseValueError("Expecting 'pad', 'ffill', 'backfill' or 'bfill'.")scol=self.spark.columnifnotself.spark.nullableandnotisinstance(self.spark.data_type,(FloatType,DoubleType)):returnself._psdf.copy()._psser_for(self._column_label)cond=self.isnull().spark.columnifvalueisnotNone:ifnotisinstance(value,(float,int,str,bool)):raiseTypeError("Unsupported type %s"%type(value).__name__)iflimitisnotNone:raiseNotImplementedError("limit parameter for value is not support now")scol=F.when(cond,value).otherwise(scol)else:ifmethodin["ffill","pad"]:func=F.lastend=Window.currentRow-1iflimitisnotNone:begin=Window.currentRow-limitelse:begin=Window.unboundedPrecedingelifmethodin["bfill","backfill"]:func=F.firstbegin=Window.currentRow+1iflimitisnotNone:end=Window.currentRow+limitelse:end=Window.unboundedFollowingwindow=(Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(begin,end))scol=F.when(cond,func(scol,True).over(window)).otherwise(scol)returnDataFrame(self._psdf._internal.with_new_spark_column(self._column_label,scol.alias(name_like_string(self.name))# TODO: dtype?))._psser_for(self._column_label)
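
    # A rough sketch of the window expression the helper above builds for
    # ``method='ffill'``; the column reference ``F.col("x")`` is hypothetical, and the
    # real code operates on the Series' Spark column instead:
    #
    # >>> from pyspark.sql import Window, functions as F
    # >>> w = (
    # ...     Window.orderBy(NATURAL_ORDER_COLUMN_NAME)
    # ...     .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
    # ... )
    # >>> filled = F.when(
    # ...     F.col("x").isNull(), F.last(F.col("x"), True).over(w)
    # ... ).otherwise(F.col("x"))  # doctest: +SKIP
    #
    # Without a ``partitionBy`` this window pulls all rows into a single partition,
    # which is why the ``fillna`` docstring above warns against using ``method`` on
    # very large datasets.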
[docs]defdropna(self,axis:Axis=0,inplace:bool=False,**kwargs:Any)->Optional["Series"]:""" Return a new Series with missing values removed. Parameters ---------- axis : {0 or 'index'}, default 0 There is only one axis to drop values from. inplace : bool, default False If True, do operation inplace and return None. **kwargs Not in use. Returns ------- Series Series with NA entries dropped from it. Examples -------- >>> ser = ps.Series([1., 2., np.nan]) >>> ser 0 1.0 1 2.0 2 NaN dtype: float64 Drop NA values from a Series. >>> ser.dropna() 0 1.0 1 2.0 dtype: float64 Keep the Series with valid entries in the same variable. >>> ser.dropna(inplace=True) >>> ser 0 1.0 1 2.0 dtype: float64 """inplace=validate_bool_kwarg(inplace,"inplace")# TODO: last two examples from pandas produce different results.psdf=self._psdf[[self.name]].dropna(axis=axis,inplace=False)ifinplace:self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf)
[docs]defclip(self,lower:Union[float,int]=None,upper:Union[float,int]=None,inplace:bool=False,)->"Series":""" Trim values at input threshold(s). Assigns values outside boundary-to-boundary values. Parameters ---------- lower : float or int, default None Minimum threshold value. All values below this threshold will be set to it. upper : float or int, default None Maximum threshold value. All values above this threshold will be set to it. inplace : bool, default False if True, perform operation in-place Returns ------- Series Series with the values outside the clip boundaries replaced Examples -------- >>> psser = ps.Series([0, 2, 4]) >>> psser 0 0 1 2 2 4 dtype: int64 >>> psser.clip(1, 3) 0 1 1 2 2 3 dtype: int64 Clip can be performed in-place. >>> psser.clip(2, 3, inplace=True) >>> psser 0 2 1 2 2 3 dtype: int64 Notes ----- One difference between this implementation and pandas is that running `pd.Series(['a', 'b']).clip(0, 1)` will crash with "TypeError: '<=' not supported between instances of 'str' and 'int'" while `ps.Series(['a', 'b']).clip(0, 1)` will output the original Series, simply ignoring the incompatible types. """ifis_list_like(lower)oris_list_like(upper):raiseTypeError("List-like value are not supported for 'lower' and 'upper' at the "+"moment")iflowerisNoneandupperisNone:returnselfifisinstance(self.spark.data_type,NumericType):scol=self.spark.columniflowerisnotNone:scol=F.when(scol<lower,lower).otherwise(scol)ifupperisnotNone:scol=F.when(scol>upper,upper).otherwise(scol)ifinplace:internal=self._internal.copy(data_spark_columns=[scol.alias(self._internal.data_spark_column_names[0])],data_fields=[self._internal.data_fields[0]],)self._psdf._update_internal_frame(internal,check_same_anchor=False)returnNoneelse:returnself._with_new_scol(scol.alias(self._internal.data_spark_column_names[0]),field=self._internal.data_fields[0],)else:returnself
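
    # A rough sketch of the chained ``when``/``otherwise`` expressions ``clip`` builds
    # for numeric data; ``psser`` stands for a hypothetical numeric Series and the
    # bounds match the docstring example above:
    #
    # >>> scol = psser.spark.column                        # doctest: +SKIP
    # >>> scol = F.when(scol < 1, 1).otherwise(scol)       # apply the lower bound
    # >>> scol = F.when(scol > 3, 3).otherwise(scol)       # then the upper bound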
[docs]defdrop(self,labels:Optional[Union[Name,List[Name]]]=None,index:Optional[Union[Name,List[Name]]]=None,columns:Optional[Union[Name,List[Name]]]=None,level:Optional[int]=None,inplace:bool=False,)->"Series":""" Return Series with specified index labels removed. Remove elements of a Series based on specifying the index labels. When using a multi-index, labels on different levels can be removed by specifying the level. Parameters ---------- labels : single label or list-like Index labels to drop. index : single label or list-like Redundant for application on Series, but index can be used instead of labels. columns : single label or list-like No change is made to the Series; use ‘index’ or ‘labels’ instead. .. versionadded:: 3.4.0 level : int or level name, optional For MultiIndex, level for which the labels will be removed. inplace: bool, default False If True, do operation inplace and return None .. versionadded:: 3.4.0 Returns ------- Series Series with specified index labels removed. See Also -------- Series.dropna Examples -------- >>> s = ps.Series(data=np.arange(3), index=['A', 'B', 'C']) >>> s A 0 B 1 C 2 dtype: int64 Drop single label A >>> s.drop('A') B 1 C 2 dtype: int64 Drop labels B and C >>> s.drop(labels=['B', 'C']) A 0 dtype: int64 With 'index' rather than 'labels' returns exactly same result. >>> s.drop(index='A') B 1 C 2 dtype: int64 >>> s.drop(index=['B', 'C']) A 0 dtype: int64 With 'columns', no change is made to the Series. >>> s.drop(columns=['A']) A 0 B 1 C 2 dtype: int64 With 'inplace=True', do operation inplace and return None. >>> s.drop(index=['B', 'C'], inplace=True) >>> s A 0 dtype: int64 Also support for MultiIndex >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 1, 2, 0, 1, 2, 0, 1, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx) >>> s lama speed 45.0 weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 >>> s.drop(labels='weight', level=1) lama speed 45.0 length 1.2 cow speed 30.0 length 1.5 falcon speed 320.0 length 0.3 dtype: float64 >>> s.drop(('lama', 'weight')) lama speed 45.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 >>> s.drop([('lama', 'speed'), ('falcon', 'weight')]) lama weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 length 0.3 dtype: float64 """dropped=self._drop(labels=labels,index=index,level=level,inplace=inplace,columns=columns)returnNoneifdroppedisNoneelsefirst_series(dropped)
def_drop(self,labels:Optional[Union[Name,List[Name]]]=None,index:Optional[Union[Name,List[Name]]]=None,level:Optional[int]=None,inplace:bool=False,columns:Optional[Union[Name,List[Name]]]=None,)->Optional[DataFrame]:iflabelsisnotNone:ifcolumnsisnotNoneorindexisnotNone:raiseValueError("Cannot specify both 'labels' and 'index'/'columns'")returnself._drop(index=labels,level=level,inplace=inplace,columns=columns)ifindexisnotNone:internal=self._internaliflevelisNone:level=0iflevel>=internal.index_level:raiseValueError("'level' should be less than the number of indexes")ifis_name_like_tuple(index):index_list=[cast(Label,index)]elifis_name_like_value(index):index_list=[(index,)]elifall(is_name_like_value(idxes,allow_tuple=False)foridxesinindex):index_list=[(idex,)foridexinindex]elifnotall(is_name_like_tuple(idxes)foridxesinindex):raiseValueError("If the given index is a list, it ""should only contains names as all tuples or all non tuples ""that contain index names")else:index_list=cast(List[Label],index)drop_index_scols=[]foridxesinindex_list:try:index_scols=[internal.index_spark_columns[lvl]==idxforlvl,idxinenumerate(idxes,level)]exceptIndexError:raiseKeyError("Key length ({}) exceeds index depth ({})".format(internal.index_level,len(idxes)))drop_index_scols.append(reduce(lambdax,y:x&y,index_scols))cond=~reduce(lambdax,y:x|y,drop_index_scols)dropped_internal=internal.with_filter(cond)ifinplace:self._update_anchor(DataFrame(dropped_internal))returnNoneelse:returnDataFrame(dropped_internal)elifcolumnsisnotNone:returnself._psdfelse:raiseValueError("Need to specify at least one of 'labels', 'index' or 'columns'")
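
    # A rough sketch of how an index-label drop in the helper above becomes a filter
    # over the index Spark column(s); ``internal`` stands for this Series'
    # ``InternalFrame`` and the labels come from the ``drop`` examples above:
    #
    # >>> keep = ~reduce(
    # ...     lambda x, y: x | y,
    # ...     [internal.index_spark_columns[0] == label for label in ["B", "C"]],
    # ... )
    # >>> internal.with_filter(keep)  # doctest: +SKIP
    #
    # i.e. rows whose index matches any of the dropped labels are filtered out.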
[docs]defhead(self,n:int=5)->"Series":""" Return the first n rows. This function returns the first n rows for the object based on position. It is useful for quickly testing if your object has the right type of data in it. Parameters ---------- n : Integer, default = 5 Returns ------- The first n rows of the caller object. Examples -------- >>> df = ps.DataFrame({'animal':['alligator', 'bee', 'falcon', 'lion']}) >>> df.animal.head(2) # doctest: +NORMALIZE_WHITESPACE 0 alligator 1 bee Name: animal, dtype: object """returnfirst_series(self.to_frame().head(n)).rename(self.name)
[docs]deflast(self,offset:Union[str,DateOffset])->"Series":""" Select final periods of time series data based on a date offset. When having a Series with dates as index, this function can select the last few elements based on a date offset. Parameters ---------- offset : str or DateOffset The offset length of the data that will be selected. For instance, '3D' will display all the rows having their index within the last 3 days. Returns ------- Series A subset of the caller. Raises ------ TypeError If the index is not a :class:`DatetimeIndex` Examples -------- >>> index = pd.date_range('2018-04-09', periods=4, freq='2D') >>> psser = ps.Series([1, 2, 3, 4], index=index) >>> psser 2018-04-09 1 2018-04-11 2 2018-04-13 3 2018-04-15 4 dtype: int64 Get the rows for the last 3 days: >>> psser.last('3D') 2018-04-13 3 2018-04-15 4 dtype: int64 Notice the data for 3 last calendar days were returned, not the last 3 observed days in the dataset, and therefore data for 2018-04-11 was not returned. """returnfirst_series(self.to_frame().last(offset)).rename(self.name)
[docs]deffirst(self,offset:Union[str,DateOffset])->"Series":""" Select first periods of time series data based on a date offset. When having a Series with dates as index, this function can select the first few elements based on a date offset. Parameters ---------- offset : str or DateOffset The offset length of the data that will be selected. For instance, '3D' will display all the rows having their index within the first 3 days. Returns ------- Series A subset of the caller. Raises ------ TypeError If the index is not a :class:`DatetimeIndex` Examples -------- >>> index = pd.date_range('2018-04-09', periods=4, freq='2D') >>> psser = ps.Series([1, 2, 3, 4], index=index) >>> psser 2018-04-09 1 2018-04-11 2 2018-04-13 3 2018-04-15 4 dtype: int64 Get the rows for the first 3 days: >>> psser.first('3D') 2018-04-09 1 2018-04-11 2 dtype: int64 Notice the data for 3 first calendar days were returned, not the first 3 observed days in the dataset, and therefore data for 2018-04-13 was not returned. """returnfirst_series(self.to_frame().first(offset)).rename(self.name)
    # TODO: Categorical type isn't supported (due to PySpark's limitation) and
    # some doctests related with timestamps were not added.
[docs]defunique(self)->"Series":""" Return unique values of Series object. Uniques are returned in order of appearance. Hash table-based unique, therefore does NOT sort. .. note:: This method returns newly created Series whereas pandas returns the unique values as a NumPy array. Returns ------- Returns the unique values as a Series. See Also -------- Index.unique groupby.SeriesGroupBy.unique Examples -------- >>> psser = ps.Series([2, 1, 3, 3], name='A') >>> psser.unique().sort_values() 1 1 0 2 2 3 Name: A, dtype: int64 >>> ps.Series([pd.Timestamp('2016-01-01') for _ in range(3)]).unique() 0 2016-01-01 dtype: datetime64[ns] >>> psser.name = ('x', 'a') >>> psser.unique().sort_values() 1 1 0 2 2 3 Name: (x, a), dtype: int64 """sdf=self._internal.spark_frame.select(self.spark.column).distinct()internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,column_labels=[self._column_label],data_spark_columns=[scol_for(sdf,self._internal.data_spark_column_names[0])],data_fields=[self._internal.data_fields[0]],column_label_names=self._internal.column_label_names,)returnfirst_series(DataFrame(internal))
[docs]defsort_values(self,ascending:bool=True,inplace:bool=False,na_position:str="last",ignore_index:bool=False,)->Optional["Series"]:""" Sort by the values. Sort a Series in ascending or descending order by some criterion. Parameters ---------- ascending : bool or list of bool, default True Sort ascending vs. descending. Specify list for multiple sort orders. If this is a list of bools, must match the length of the by. inplace : bool, default False if True, perform operation in-place na_position : {'first', 'last'}, default 'last' `first` puts NaNs at the beginning, `last` puts NaNs at the end ignore_index : bool, default False If True, the resulting axis will be labeled 0, 1, …, n - 1. .. versionadded:: 3.4.0 Returns ------- sorted_obj : Series ordered by values. Examples -------- >>> s = ps.Series([np.nan, 1, 3, 10, 5]) >>> s 0 NaN 1 1.0 2 3.0 3 10.0 4 5.0 dtype: float64 Sort values ascending order (default behaviour) >>> s.sort_values(ascending=True) 1 1.0 2 3.0 4 5.0 3 10.0 0 NaN dtype: float64 Sort values descending order >>> s.sort_values(ascending=False) 3 10.0 4 5.0 2 3.0 1 1.0 0 NaN dtype: float64 Sort values descending order and ignoring index >>> s.sort_values(ascending=False, ignore_index=True) 0 10.0 1 5.0 2 3.0 3 1.0 4 NaN dtype: float64 Sort values inplace >>> s.sort_values(ascending=False, inplace=True) >>> s 3 10.0 4 5.0 2 3.0 1 1.0 0 NaN dtype: float64 Sort values putting NAs first >>> s.sort_values(na_position='first') 0 NaN 1 1.0 2 3.0 4 5.0 3 10.0 dtype: float64 Sort a series of strings >>> s = ps.Series(['z', 'b', 'd', 'a', 'c']) >>> s 0 z 1 b 2 d 3 a 4 c dtype: object >>> s.sort_values() 3 a 1 b 4 c 2 d 0 z dtype: object """inplace=validate_bool_kwarg(inplace,"inplace")psdf=self._psdf[[self.name]]._sort(by=[self.spark.column],ascending=ascending,na_position=na_position)ifinplace:ifignore_index:psdf.reset_index(drop=True,inplace=inplace)self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf.reset_index(drop=True))ifignore_indexelsefirst_series(psdf)
[docs]defsort_index(self,axis:Axis=0,level:Optional[Union[int,List[int]]]=None,ascending:bool=True,inplace:bool=False,kind:str=None,na_position:str="last",ignore_index:bool=False,)->Optional["Series"]:""" Sort object by labels (along an axis) Parameters ---------- axis : index, columns to direct sorting. Currently, only axis = 0 is supported. level : int or level name or list of ints or list of level names if not None, sort on values in specified index level(s) ascending : boolean, default True Sort ascending vs. descending inplace : bool, default False if True, perform operation in-place kind : str, default None pandas-on-Spark does not allow specifying the sorting algorithm now, default None na_position : {‘first’, ‘last’}, default ‘last’ first puts NaNs at the beginning, last puts NaNs at the end. Not implemented for MultiIndex. ignore_index : bool, default False If True, the resulting axis will be labeled 0, 1, …, n - 1. .. versionadded:: 3.4.0 Returns ------- sorted_obj : Series Examples -------- >>> s = ps.Series([2, 1, np.nan], index=['b', 'a', np.nan]) >>> s.sort_index() # doctest: +SKIP a 1.0 b 2.0 None NaN dtype: float64 >>> s.sort_index(ignore_index=True) 0 1.0 1 2.0 2 NaN dtype: float64 >>> s.sort_index(ascending=False) # doctest: +SKIP b 2.0 a 1.0 None NaN dtype: float64 >>> s.sort_index(na_position='first') # doctest: +SKIP None NaN a 1.0 b 2.0 dtype: float64 >>> s.sort_index(inplace=True) >>> s # doctest: +SKIP a 1.0 b 2.0 None NaN dtype: float64 Multi-index series. >>> s = ps.Series(range(4), index=[['b', 'b', 'a', 'a'], [1, 0, 1, 0]], name='0') >>> s.sort_index() a 0 3 1 2 b 0 1 1 0 Name: 0, dtype: int64 >>> s.sort_index(level=1) # doctest: +SKIP a 0 3 b 0 1 a 1 2 b 1 0 Name: 0, dtype: int64 >>> s.sort_index(level=[1, 0]) a 0 3 b 0 1 a 1 2 b 1 0 Name: 0, dtype: int64 """inplace=validate_bool_kwarg(inplace,"inplace")psdf=self._psdf[[self.name]].sort_index(axis=axis,level=level,ascending=ascending,kind=kind,na_position=na_position)ifinplace:ifignore_index:psdf.reset_index(drop=True,inplace=inplace)self._update_anchor(psdf)returnNoneelse:returnfirst_series(psdf.reset_index(drop=True))ifignore_indexelsefirst_series(psdf)
[docs]defswaplevel(self,i:Union[int,Name]=-2,j:Union[int,Name]=-1,copy:bool=True)->"Series":""" Swap levels i and j in a MultiIndex. Default is to swap the two innermost levels of the index. Parameters ---------- i, j : int, str Level of the indices to be swapped. Can pass level name as string. copy : bool, default True Whether to copy underlying data. Must be True. Returns ------- Series Series with levels swapped in MultiIndex. Examples -------- >>> midx = pd.MultiIndex.from_arrays([['a', 'b'], [1, 2]], names = ['word', 'number']) >>> midx # doctest: +SKIP MultiIndex([('a', 1), ('b', 2)], names=['word', 'number']) >>> psser = ps.Series(['x', 'y'], index=midx) >>> psser word number a 1 x b 2 y dtype: object >>> psser.swaplevel() number word 1 a x 2 b y dtype: object >>> psser.swaplevel(0, 1) number word 1 a x 2 b y dtype: object >>> psser.swaplevel('number', 'word') number word 1 a x 2 b y dtype: object """assertcopyisTruereturnfirst_series(self.to_frame().swaplevel(i,j,axis=0)).rename(self.name)
[docs]    def swapaxes(self, i: Axis, j: Axis, copy: bool = True) -> "Series":
        """
        Interchange axes and swap values axes appropriately.

        Parameters
        ----------
        i: {0 or 'index', 1 or 'columns'}. The axis to swap.
        j: {0 or 'index', 1 or 'columns'}. The axis to swap.
        copy : bool, default True.

        Returns
        -------
        Series

        Examples
        --------
        >>> psser = ps.Series([1, 2, 3], index=["x", "y", "z"])
        >>> psser
        x    1
        y    2
        z    3
        dtype: int64
        >>>
        >>> psser.swapaxes(0, 0)
        x    1
        y    2
        z    3
        dtype: int64
        """
        assert copy is True

        i = validate_axis(i)
        j = validate_axis(j)
        if not i == j == 0:
            raise ValueError("Axis must be 0 for Series")

        return self.copy()
[docs]defadd_prefix(self,prefix:str)->"Series":""" Prefix labels with string `prefix`. For Series, the row labels are prefixed. For DataFrame, the column labels are prefixed. Parameters ---------- prefix : str The string to add before each label. Returns ------- Series New Series with updated labels. See Also -------- Series.add_suffix: Suffix column labels with string `suffix`. DataFrame.add_suffix: Suffix column labels with string `suffix`. DataFrame.add_prefix: Prefix column labels with string `prefix`. Examples -------- >>> s = ps.Series([1, 2, 3, 4]) >>> s 0 1 1 2 2 3 3 4 dtype: int64 >>> s.add_prefix('item_') item_0 1 item_1 2 item_2 3 item_3 4 dtype: int64 """assertisinstance(prefix,str)internal=self._internal.resolved_copysdf=internal.spark_frame.select([F.concat(F.lit(prefix),index_spark_column).alias(index_spark_column_name)forindex_spark_column,index_spark_column_nameinzip(internal.index_spark_columns,internal.index_spark_column_names)]+internal.data_spark_columns)returnfirst_series(DataFrame(internal.with_new_sdf(sdf,index_fields=([None]*internal.index_level))))
[docs]defadd_suffix(self,suffix:str)->"Series":""" Suffix labels with string suffix. For Series, the row labels are suffixed. For DataFrame, the column labels are suffixed. Parameters ---------- suffix : str The string to add after each label. Returns ------- Series New Series with updated labels. See Also -------- Series.add_prefix: Prefix row labels with string `prefix`. DataFrame.add_prefix: Prefix column labels with string `prefix`. DataFrame.add_suffix: Suffix column labels with string `suffix`. Examples -------- >>> s = ps.Series([1, 2, 3, 4]) >>> s 0 1 1 2 2 3 3 4 dtype: int64 >>> s.add_suffix('_item') 0_item 1 1_item 2 2_item 3 3_item 4 dtype: int64 """assertisinstance(suffix,str)internal=self._internal.resolved_copysdf=internal.spark_frame.select([F.concat(index_spark_column,F.lit(suffix)).alias(index_spark_column_name)forindex_spark_column,index_spark_column_nameinzip(internal.index_spark_columns,internal.index_spark_column_names)]+internal.data_spark_columns)returnfirst_series(DataFrame(internal.with_new_sdf(sdf,index_fields=([None]*internal.index_level))))
[docs]defautocorr(self,lag:int=1)->float:""" Compute the lag-N autocorrelation. This method computes the Pearson correlation between the Series and its shifted self. .. note:: the current implementation of rank uses Spark's Window without specifying partition specification. This leads to moveing all data into a single partition in a single machine and could cause serious performance degradation. Avoid this method with very large datasets. .. versionadded:: 3.4.0 Parameters ---------- lag : int, default 1 Number of lags to apply before performing autocorrelation. Returns ------- float The Pearson correlation between self and self.shift(lag). See Also -------- Series.corr : Compute the correlation between two Series. Series.shift : Shift index by desired number of periods. DataFrame.corr : Compute pairwise correlation of columns. Notes ----- If the Pearson correlation is not well defined return 'NaN'. Examples -------- >>> s = ps.Series([.2, .0, .6, .2, np.nan, .5, .6]) >>> s.autocorr() # doctest: +ELLIPSIS -0.141219... >>> s.autocorr(0) # doctest: +ELLIPSIS 1.0... >>> s.autocorr(2) # doctest: +ELLIPSIS 0.970725... >>> s.autocorr(-3) # doctest: +ELLIPSIS 0.277350... >>> s.autocorr(5) # doctest: +ELLIPSIS -1.000000... >>> s.autocorr(6) # doctest: +ELLIPSIS nan If the Pearson correlation is not well defined, then 'NaN' is returned. >>> s = ps.Series([1, 0, 0, 0]) >>> s.autocorr() nan """# This implementation is suboptimal because it moves all data to a single partition,# global sort should be used instead of window, but it should be a startifnotisinstance(lag,int):raiseTypeError("lag should be an int; however, got [%s]"%type(lag).__name__)sdf=self._internal.spark_framescol=self.spark.columniflag==0:corr=sdf.select(F.corr(scol,scol)).head()[0]else:lag_scol=F.lag(scol,lag).over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME))lag_col_name=verify_temp_column_name(sdf,"__autocorr_lag_tmp_col__")corr=(sdf.withColumn(lag_col_name,lag_scol).select(F.corr(scol,F.col(lag_col_name))).head()[0])returnnp.nanifcorrisNoneelsecorr
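# A small plain-pandas sketch of the identity stated in the docstring above:
# autocorr(lag) is the Pearson correlation between the series and its shifted self.
import numpy as np
import pandas as pd

s = pd.Series([0.2, 0.0, 0.6, 0.2, np.nan, 0.5, 0.6])
lag = 1
print(s.corr(s.shift(lag)))  # same quantity as s.autocorr(lag)
print(s.autocorr(lag))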
[docs]defcorr(self,other:"Series",method:str="pearson",min_periods:Optional[int]=None)->float:""" Compute correlation with `other` Series, excluding missing values. .. versionadded:: 3.3.0 Parameters ---------- other : Series method : {'pearson', 'spearman', 'kendall'} * pearson : standard correlation coefficient * spearman : Spearman rank correlation * kendall : Kendall Tau correlation coefficient .. versionchanged:: 3.4.0 support 'kendall' for method parameter min_periods : int, optional Minimum number of observations needed to have a valid result. .. versionadded:: 3.4.0 Returns ------- correlation : float Notes ----- The complexity of Kendall correlation is O(#row * #row), if the dataset is too large, sampling ahead of correlation computation is recommended. Examples -------- >>> df = ps.DataFrame({'s1': [.2, .0, .6, .2], ... 's2': [.3, .6, .0, .1]}) >>> s1 = df.s1 >>> s2 = df.s2 >>> s1.corr(s2, method='pearson') -0.85106... >>> s1.corr(s2, method='spearman') -0.94868... >>> s1.corr(s2, method='kendall') -0.91287... >>> s1 = ps.Series([1, np.nan, 2, 1, 1, 2, 3]) >>> s2 = ps.Series([3, 4, 1, 1, 5]) >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.corr(s2, method="pearson") -0.52223... >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.corr(s2, method="spearman") -0.54433... >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.corr(s2, method="kendall") -0.51639... >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.corr(s2, method="kendall", min_periods=5) nan """ifmethodnotin["pearson","spearman","kendall"]:raiseValueError(f"Invalid method {method}")ifnotisinstance(other,Series):raiseTypeError("'other' must be a Series")ifmin_periodsisnotNoneandnotisinstance(min_periods,int):raiseTypeError(f"Invalid min_periods type {type(min_periods).__name__}")min_periods=1ifmin_periodsisNoneelsemin_periodsifsame_anchor(self,other):combined=selfthis=selfthat=otherelse:combined=combine_frames(self._psdf,other._psdf)# type: ignore[assignment]this=combined["this"]that=combined["that"]sdf=combined._internal.spark_frameindex_col_name=verify_temp_column_name(sdf,"__ser_corr_index_temp_column__")this_scol=this._internal.spark_column_for(this._internal.column_labels[0])that_scol=that._internal.spark_column_for(that._internal.column_labels[0])sdf=sdf.select(F.lit(0).alias(index_col_name),this_scol.cast("double").alias(CORRELATION_VALUE_1_COLUMN),that_scol.cast("double").alias(CORRELATION_VALUE_2_COLUMN),)sdf=compute(sdf=sdf,groupKeys=[index_col_name],method=method).select(F.when(F.col(CORRELATION_COUNT_OUTPUT_COLUMN)<min_periods,F.lit(None).cast("double")).otherwise(F.col(CORRELATION_CORR_OUTPUT_COLUMN)))results=sdf.take(1)iflen(results)==0:raiseValueError("attempt to get corr of an empty sequence")else:returnnp.nanifresults[0][0]isNoneelseresults[0][0]
[docs]    def nsmallest(self, n: int = 5) -> "Series":
        """
        Return the smallest `n` elements.

        Parameters
        ----------
        n : int, default 5
            Return this many ascending sorted values.

        Returns
        -------
        Series
            The `n` smallest values in the Series, sorted in increasing order.

        See Also
        --------
        Series.nlargest: Get the `n` largest elements.
        Series.sort_values: Sort Series by values.
        Series.head: Return the first `n` rows.

        Notes
        -----
        Faster than ``.sort_values().head(n)`` for small `n` relative to
        the size of the ``Series`` object.
        In pandas-on-Spark, thanks to Spark's lazy execution and query optimizer,
        the two would have the same performance.

        Examples
        --------
        >>> data = [1, 2, 3, 4, np.nan, 6, 7, 8]
        >>> s = ps.Series(data)
        >>> s
        0    1.0
        1    2.0
        2    3.0
        3    4.0
        4    NaN
        5    6.0
        6    7.0
        7    8.0
        dtype: float64

        The `n` smallest elements where ``n=5`` by default.

        >>> s.nsmallest()
        0    1.0
        1    2.0
        2    3.0
        3    4.0
        5    6.0
        dtype: float64

        >>> s.nsmallest(3)
        0    1.0
        1    2.0
        2    3.0
        dtype: float64
        """
        return self.sort_values(ascending=True).head(n)
[docs]defnlargest(self,n:int=5)->"Series":""" Return the largest `n` elements. Parameters ---------- n : int, default 5 Returns ------- Series The `n` largest values in the Series, sorted in decreasing order. See Also -------- Series.nsmallest: Get the `n` smallest elements. Series.sort_values: Sort Series by values. Series.head: Return the first `n` rows. Notes ----- Faster than ``.sort_values(ascending=False).head(n)`` for small `n` relative to the size of the ``Series`` object. In pandas-on-Spark, thanks to Spark's lazy execution and query optimizer, the two would have same performance. Examples -------- >>> data = [1, 2, 3, 4, np.nan ,6, 7, 8] >>> s = ps.Series(data) >>> s 0 1.0 1 2.0 2 3.0 3 4.0 4 NaN 5 6.0 6 7.0 7 8.0 dtype: float64 The `n` largest elements where ``n=5`` by default. >>> s.nlargest() 7 8.0 6 7.0 5 6.0 3 4.0 2 3.0 dtype: float64 >>> s.nlargest(n=3) 7 8.0 6 7.0 5 6.0 dtype: float64 """returnself.sort_values(ascending=False).head(n)
[docs]defappend(self,to_append:"Series",ignore_index:bool=False,verify_integrity:bool=False)->"Series":""" Concatenate two or more Series. .. deprecated:: 3.4.0 Parameters ---------- to_append : Series or list/tuple of Series ignore_index : boolean, default False If True, do not use the index labels. verify_integrity : boolean, default False If True, raise Exception on creating index with duplicates Returns ------- appended : Series Examples -------- >>> s1 = ps.Series([1, 2, 3]) >>> s2 = ps.Series([4, 5, 6]) >>> s3 = ps.Series([4, 5, 6], index=[3,4,5]) >>> s1.append(s2) 0 1 1 2 2 3 0 4 1 5 2 6 dtype: int64 >>> s1.append(s3) 0 1 1 2 2 3 3 4 4 5 5 6 dtype: int64 With ignore_index set to True: >>> s1.append(s2, ignore_index=True) 0 1 1 2 2 3 3 4 4 5 5 6 dtype: int64 """warnings.warn("The Series.append method is deprecated ""and will be removed in a future version. ""Use pyspark.pandas.concat instead.",FutureWarning,)returnfirst_series(self.to_frame().append(to_append.to_frame(),ignore_index,verify_integrity)).rename(self.name)
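# A short sketch of the replacement suggested by the FutureWarning above:
# pyspark.pandas.concat covers the same cases without the deprecation.
import pyspark.pandas as ps

s1 = ps.Series([1, 2, 3])
s2 = ps.Series([4, 5, 6])

# Equivalent of s1.append(s2) and s1.append(s2, ignore_index=True).
print(ps.concat([s1, s2]))
print(ps.concat([s1, s2], ignore_index=True))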
[docs]defapply(self,func:Callable,args:Sequence[Any]=(),**kwds:Any)->"Series":""" Invoke function on values of Series. Can be a Python function that only works on the Series. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def square(x) -> np.int32: ... return x ** 2 pandas-on-Spark uses return type hint and does not try to infer the type. Parameters ---------- func : function Python function to apply. Note that type hint for return type is required. args : tuple Positional arguments passed to func after the series value. **kwds Additional keyword arguments passed to func. Returns ------- Series See Also -------- Series.aggregate : Only perform aggregating type operations. Series.transform : Only perform transforming type operations. DataFrame.apply : The equivalent function for DataFrame. Examples -------- Create a Series with typical summer temperatures for each city. >>> s = ps.Series([20, 21, 12], ... index=['London', 'New York', 'Helsinki']) >>> s London 20 New York 21 Helsinki 12 dtype: int64 Square the values by defining a function and passing it as an argument to ``apply()``. >>> def square(x) -> np.int64: ... return x ** 2 >>> s.apply(square) London 400 New York 441 Helsinki 144 dtype: int64 Define a custom function that needs additional positional arguments and pass these additional arguments using the ``args`` keyword >>> def subtract_custom_value(x, custom_value) -> np.int64: ... return x - custom_value >>> s.apply(subtract_custom_value, args=(5,)) London 15 New York 16 Helsinki 7 dtype: int64 Define a custom function that takes keyword arguments and pass these arguments to ``apply`` >>> def add_custom_values(x, **kwargs) -> np.int64: ... for month in kwargs: ... x += kwargs[month] ... return x >>> s.apply(add_custom_values, june=30, july=20, august=25) London 95 New York 96 Helsinki 87 dtype: int64 Use a function from the Numpy library >>> def numpy_log(col) -> np.float64: ... return np.log(col) >>> s.apply(numpy_log) London 2.995732 New York 3.044522 Helsinki 2.484907 dtype: float64 You can omit the type hint and let pandas-on-Spark infer its type. >>> s.apply(np.log) London 2.995732 New York 3.044522 Helsinki 2.484907 dtype: float64 """assertcallable(func),"the first argument should be a callable function."try:spec=inspect.getfullargspec(func)return_sig=spec.annotations.get("return",None)should_infer_schema=return_sigisNoneexceptTypeError:# Falls back to schema inference if it fails to get signature.should_infer_schema=Truedefapply_each(s:Any)->pd.Series:returns.apply(func,args=args,**kwds)ifshould_infer_schema:returnself.pandas_on_spark._transform_batch(apply_each,None)else:sig_return=infer_return_type(func)ifnotisinstance(sig_return,ScalarType):raiseValueError("Expected the return type of this function to be of scalar type, ""but found type {}".format(sig_return))return_type=sig_returnreturnself.pandas_on_spark._transform_batch(apply_each,return_type)
# TODO: not all arguments are implemented compared with pandas' for now.
[docs]defaggregate(self,func:Union[str,List[str]])->Union[Scalar,"Series"]:"""Aggregate using one or more operations over the specified axis. Parameters ---------- func : str or a list of str function name(s) as string apply to series. Returns ------- scalar, Series The return can be: - scalar : when Series.agg is called with single function - Series : when Series.agg is called with several functions Notes ----- `agg` is an alias for `aggregate`. Use the alias. See Also -------- Series.apply : Invoke function on a Series. Series.transform : Only perform transforming type operations. Series.groupby : Perform operations over groups. DataFrame.aggregate : The equivalent function for DataFrame. Examples -------- >>> s = ps.Series([1, 2, 3, 4]) >>> s.agg('min') 1 >>> s.agg(['min', 'max']).sort_index() max 4 min 1 dtype: int64 """ifisinstance(func,list):returnfirst_series(self.to_frame().aggregate(func)).rename(self.name)elifisinstance(func,str):returngetattr(self,func)()else:raiseTypeError("func must be a string or list of strings")
    agg = aggregate

    def transpose(self, *args: Any, **kwargs: Any) -> "Series":
        """
        Return the transpose, which is self.

        Examples
        --------
        It returns the same object as the transpose of the given series object, which is by
        definition self.

        >>> s = ps.Series([1, 2, 3])
        >>> s
        0    1
        1    2
        2    3
        dtype: int64

        >>> s.transpose()
        0    1
        1    2
        2    3
        dtype: int64
        """
        return self.copy()

    T = property(transpose)
[docs]deftransform(self,func:Union[Callable,List[Callable]],axis:Axis=0,*args:Any,**kwargs:Any)->Union["Series",DataFrame]:""" Call ``func`` producing the same type as `self` with transformed values and that has the same axis length as input. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def square(x) -> np.int32: ... return x ** 2 pandas-on-Spark uses return type hint and does not try to infer the type. Parameters ---------- func : function or list A function or a list of functions to use for transforming the data. axis : int, default 0 or 'index' Can only be set to 0 now. *args Positional arguments to pass to `func`. **kwargs Keyword arguments to pass to `func`. Returns ------- An instance of the same type with `self` that must have the same length as input. See Also -------- Series.aggregate : Only perform aggregating type operations. Series.apply : Invoke function on Series. DataFrame.transform : The equivalent function for DataFrame. Examples -------- >>> s = ps.Series(range(3)) >>> s 0 0 1 1 2 2 dtype: int64 >>> def sqrt(x) -> float: ... return np.sqrt(x) >>> s.transform(sqrt) 0 0.000000 1 1.000000 2 1.414214 dtype: float64 Even though the resulting instance must have the same length as the input, it is possible to provide several input functions: >>> def exp(x) -> float: ... return np.exp(x) >>> s.transform([sqrt, exp]) sqrt exp 0 0.000000 1.000000 1 1.000000 2.718282 2 1.414214 7.389056 You can omit the type hint and let pandas-on-Spark infer its type. >>> s.transform([np.sqrt, np.exp]) sqrt exp 0 0.000000 1.000000 1 1.000000 2.718282 2 1.414214 7.389056 """axis=validate_axis(axis)ifaxis!=0:raiseNotImplementedError('axis should be either 0 or "index" currently.')ifisinstance(func,list):applied=[]forfinfunc:applied.append(self.apply(f,args=args,**kwargs).rename(f.__name__))internal=self._internal.with_new_columns(applied)returnDataFrame(internal)else:returnself.apply(func,args=args,**kwargs)
[docs]defround(self,decimals:int=0)->"Series":""" Round each value in a Series to the given number of decimals. Parameters ---------- decimals : int Number of decimal places to round to (default: 0). If decimals are negative, it specifies the number of positions to the left of the decimal point. Returns ------- Series object See Also -------- DataFrame.round Examples -------- >>> df = ps.Series([0.028208, 0.038683, 0.877076], name='x') >>> df 0 0.028208 1 0.038683 2 0.877076 Name: x, dtype: float64 >>> df.round(2) 0 0.03 1 0.04 2 0.88 Name: x, dtype: float64 """ifnotisinstance(decimals,int):raiseTypeError("decimals must be an integer")scol=F.round(self.spark.column,decimals)returnself._with_new_scol(scol,field=(self._internal.data_fields[0].copy(nullable=True)ifnotisinstance(self.spark.data_type,DecimalType)elseNone),)
# TODO: add 'interpolation' parameter.
[docs]defquantile(self,q:Union[float,Iterable[float]]=0.5,accuracy:int=10000)->Union[Scalar,"Series"]:""" Return value at the given quantile. .. note:: Unlike pandas', the quantile in pandas-on-Spark is an approximated quantile based upon approximate percentile computation because computing quantile across a large dataset is extremely expensive. Parameters ---------- q : float or array-like, default 0.5 (50% quantile) 0 <= q <= 1, the quantile(s) to compute. accuracy : int, optional Default accuracy of approximation. Larger value means better accuracy. The relative error can be deduced by 1.0 / accuracy. Returns ------- float or Series If the current object is a Series and ``q`` is an array, a Series will be returned where the index is ``q`` and the values are the quantiles, otherwise a float will be returned. Examples -------- >>> s = ps.Series([1, 2, 3, 4, 5]) >>> s.quantile(.5) 3.0 >>> (s + 1).quantile(.5) 4.0 >>> s.quantile([.25, .5, .75]) 0.25 2.0 0.50 3.0 0.75 4.0 dtype: float64 >>> (s + 1).quantile([.25, .5, .75]) 0.25 3.0 0.50 4.0 0.75 5.0 dtype: float64 """ifisinstance(q,Iterable):returnfirst_series(cast("ps.DataFrame",self.to_frame().quantile(q=q,axis=0,numeric_only=False,accuracy=accuracy),)).rename(self.name)else:ifnotisinstance(accuracy,int):raiseTypeError("accuracy must be an integer; however, got [%s]"%type(accuracy).__name__)ifnotisinstance(q,float):raiseTypeError("q must be a float or an array of floats; however, [%s] found."%type(q))q_float=qifq_float<0.0orq_float>1.0:raiseValueError("percentiles should all be in the interval [0, 1].")defquantile(psser:Series)->Column:spark_type=psser.spark.data_typespark_column=psser.spark.columnifisinstance(spark_type,(BooleanType,NumericType)):returnF.percentile_approx(spark_column.cast(DoubleType()),q_float,accuracy)else:raiseTypeError("Could not convert {} ({}) to numeric".format(spark_type_to_pandas_dtype(spark_type),spark_type.simpleString()))returnself._reduce_for_stat_function(quantile,name="quantile")
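# An illustrative sketch of the accuracy trade-off noted above (relative error is roughly
# 1.0 / accuracy), using Spark's percentile_approx directly; the column name "v" is assumed.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(0, 100000).withColumn("v", F.col("id").cast("double"))

sdf.select(
    F.percentile_approx("v", 0.5, 100).alias("rough_median"),      # larger error allowed
    F.percentile_approx("v", 0.5, 10000).alias("tighter_median"),  # default-sized accuracy
).show()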
# TODO: add axis, pct, na_option parameters
[docs]defrank(self,method:str="average",ascending:bool=True,numeric_only:Optional[bool]=None)->"Series":""" Compute numerical data ranks (1 through n) along axis. Equal values are assigned a rank that is the average of the ranks of those values. .. note:: the current implementation of rank uses Spark's Window without specifying partition specification. This leads to moveing all data into a single partition in a single machine and could cause serious performance degradation. Avoid this method with very large datasets. Parameters ---------- method : {'average', 'min', 'max', 'first', 'dense'} * average: average rank of group * min: lowest rank in group * max: highest rank in group * first: ranks assigned in order they appear in the array * dense: like 'min', but rank always increases by 1 between groups ascending : boolean, default True False for ranks by high (1) to low (N) numeric_only : bool, optional If set to True, rank numeric Series, or return an empty Series for non-numeric Series Returns ------- ranks : same type as caller Examples -------- >>> s = ps.Series([1, 2, 2, 3], name='A') >>> s 0 1 1 2 2 2 3 3 Name: A, dtype: int64 >>> s.rank() 0 1.0 1 2.5 2 2.5 3 4.0 Name: A, dtype: float64 If method is set to 'min', it uses lowest rank in group. >>> s.rank(method='min') 0 1.0 1 2.0 2 2.0 3 4.0 Name: A, dtype: float64 If method is set to 'max', it uses highest rank in group. >>> s.rank(method='max') 0 1.0 1 3.0 2 3.0 3 4.0 Name: A, dtype: float64 If method is set to 'first', it is assigned rank in order without groups. >>> s.rank(method='first') 0 1.0 1 2.0 2 3.0 3 4.0 Name: A, dtype: float64 If method is set to 'dense', it leaves no gaps in group. >>> s.rank(method='dense') 0 1.0 1 2.0 2 2.0 3 3.0 Name: A, dtype: float64 If numeric_only is set to 'True', rank only numeric Series, return an empty Series otherwise. >>> s = ps.Series(['a', 'b', 'c'], name='A', index=['x', 'y', 'z']) >>> s x a y b z c Name: A, dtype: object >>> s.rank(numeric_only=True) Series([], Name: A, dtype: float64) """is_numeric=isinstance(self.spark.data_type,(NumericType,BooleanType))ifnumeric_onlyandnotis_numeric:returnps.Series([],dtype="float64",name=self.name)else:returnself._rank(method,ascending).spark.analyzed
def_rank(self,method:str="average",ascending:bool=True,*,part_cols:Sequence["ColumnOrName"]=(),)->"Series":ifmethodnotin["average","min","max","first","dense"]:msg="method must be one of 'average', 'min', 'max', 'first', 'dense'"raiseValueError(msg)ifself._internal.index_level>1:raiseNotImplementedError("rank do not support MultiIndex now")ifascending:asc_func=Column.ascelse:asc_func=Column.descifmethod=="first":window=(Window.orderBy(asc_func(self.spark.column),asc_func(F.col(NATURAL_ORDER_COLUMN_NAME)),).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding,Window.currentRow))scol=F.row_number().over(window)elifmethod=="dense":window=(Window.orderBy(asc_func(self.spark.column)).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding,Window.currentRow))scol=F.dense_rank().over(window)else:ifmethod=="average":stat_func=F.meanelifmethod=="min":stat_func=F.minelifmethod=="max":stat_func=F.maxwindow1=(Window.orderBy(asc_func(self.spark.column)).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding,Window.currentRow))window2=Window.partitionBy(cast("List[ColumnOrName]",[self.spark.column])+list(part_cols)).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)scol=stat_func(F.row_number().over(window1)).over(window2)returnself._with_new_scol(scol.cast(DoubleType()))
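# An illustrative rephrasing of the window primitives composed above, written against a plain
# Spark DataFrame; the two-step form (materialize row_number, then aggregate it per value) is
# a sketch, not the exact expression built by _rank.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1,), (2,), (2,), (3,)], "v int")

ranked = sdf.withColumn("rn", F.row_number().over(Window.orderBy(F.col("v").asc())))
ranked.select(
    "v",
    F.col("rn").alias("first"),                                   # method='first'
    F.dense_rank().over(Window.orderBy("v")).alias("dense"),      # method='dense'
    F.mean("rn").over(Window.partitionBy("v")).alias("average"),  # method='average'
    F.min("rn").over(Window.partitionBy("v")).alias("min"),       # method='min'
    F.max("rn").over(Window.partitionBy("v")).alias("max"),       # method='max'
).show()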
[docs]    def filter(
        self,
        items: Optional[Sequence[Any]] = None,
        like: Optional[str] = None,
        regex: Optional[str] = None,
        axis: Optional[Axis] = None,
    ) -> "Series":
        axis = validate_axis(axis)
        if axis == 1:
            raise ValueError("Series does not support columns axis.")
        return first_series(
            self.to_frame().filter(items=items, like=like, regex=regex, axis=axis),
        ).rename(self.name)
[docs]defdiff(self,periods:int=1)->"Series":""" First discrete difference of element. Calculates the difference of a Series element compared with another element in the DataFrame (default is the element in the same column of the previous row). .. note:: the current implementation of diff uses Spark's Window without specifying partition specification. This leads to moveing all data into a single partition in a single machine and could cause serious performance degradation. Avoid this method with very large datasets. Parameters ---------- periods : int, default 1 Periods to shift for calculating difference, accepts negative values. Returns ------- diffed : Series Examples -------- >>> df = ps.DataFrame({'a': [1, 2, 3, 4, 5, 6], ... 'b': [1, 1, 2, 3, 5, 8], ... 'c': [1, 4, 9, 16, 25, 36]}, columns=['a', 'b', 'c']) >>> df a b c 0 1 1 1 1 2 1 4 2 3 2 9 3 4 3 16 4 5 5 25 5 6 8 36 >>> df.b.diff() 0 NaN 1 0.0 2 1.0 3 1.0 4 2.0 5 3.0 Name: b, dtype: float64 Difference with previous value >>> df.c.diff(periods=3) 0 NaN 1 NaN 2 NaN 3 15.0 4 21.0 5 27.0 Name: c, dtype: float64 Difference with following value >>> df.c.diff(periods=-1) 0 -3.0 1 -5.0 2 -7.0 3 -9.0 4 -11.0 5 NaN Name: c, dtype: float64 """returnself._diff(periods).spark.analyzed
    def _diff(self, periods: int, *, part_cols: Sequence["ColumnOrName"] = ()) -> "Series":
        if not isinstance(periods, int):
            raise TypeError(
                "periods should be an int; however, got [%s]" % type(periods).__name__
            )
        window = (
            Window.partitionBy(*part_cols)
            .orderBy(NATURAL_ORDER_COLUMN_NAME)
            .rowsBetween(-periods, -periods)
        )
        scol = self.spark.column - F.lag(self.spark.column, periods).over(window)
        return self._with_new_scol(scol, field=self._internal.data_fields[0].copy(nullable=True))
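# A small sketch of the lag-over-window subtraction used above, with assumed column names.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(0, 1), (1, 1), (2, 2), (3, 3), (4, 5), (5, 8)], "idx int, b int")

# diff(periods) is "current value minus the value `periods` rows back" in row order.
w = Window.orderBy("idx")
sdf.select("idx", (F.col("b") - F.lag("b", 1).over(w)).alias("diff_1")).show()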
[docs]defidxmax(self,skipna:bool=True)->Union[Tuple,Any]:""" Return the row label of the maximum value. If multiple values equal the maximum, the first row label with that value is returned. Parameters ---------- skipna : bool, default True Exclude NA/null values. If the entire Series is NA, the result will be NA. Returns ------- Index Label of the maximum value. Raises ------ ValueError If the Series is empty. See Also -------- Series.idxmin : Return index *label* of the first occurrence of minimum of values. Examples -------- >>> s = ps.Series(data=[1, None, 4, 3, 5], ... index=['A', 'B', 'C', 'D', 'E']) >>> s A 1.0 B NaN C 4.0 D 3.0 E 5.0 dtype: float64 >>> s.idxmax() 'E' If `skipna` is False and there is an NA value in the data, the function returns ``nan``. >>> s.idxmax(skipna=False) nan In case of multi-index, you get a tuple: >>> index = pd.MultiIndex.from_arrays([ ... ['a', 'a', 'b', 'b'], ['c', 'd', 'e', 'f']], names=('first', 'second')) >>> s = ps.Series(data=[1, None, 4, 5], index=index) >>> s first second a c 1.0 d NaN b e 4.0 f 5.0 dtype: float64 >>> s.idxmax() ('b', 'f') If multiple values equal the maximum, the first row label with that value is returned. >>> s = ps.Series([1, 100, 1, 100, 1, 100], index=[10, 3, 5, 2, 1, 8]) >>> s 10 1 3 100 5 1 2 100 1 1 8 100 dtype: int64 >>> s.idxmax() 3 """sdf=self._internal.spark_framescol=self.spark.columnindex_scols=self._internal.index_spark_columnsifskipna:sdf=sdf.orderBy(scol.desc_nulls_last(),NATURAL_ORDER_COLUMN_NAME)else:sdf=sdf.orderBy(scol.desc_nulls_first(),NATURAL_ORDER_COLUMN_NAME)results=sdf.select([scol]+index_scols).take(1)iflen(results)==0:raiseValueError("attempt to get idxmin of an empty sequence")ifresults[0][0]isNone:# This will only happen when skipna is False because we will# place nulls first.returnnp.nanvalues=list(results[0][1:])iflen(values)==1:returnvalues[0]else:returntuple(values)
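# A small sketch of the ordering trick used above: sort by the value with a null-aware key
# and take the label of the first row; schema and names are assumed for illustration.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [("A", 1.0), ("B", None), ("C", 4.0), ("D", 3.0), ("E", 5.0)], "label string, v double"
)

# skipna=True corresponds to desc_nulls_last; skipna=False would use desc_nulls_first.
print(sdf.orderBy(F.col("v").desc_nulls_last()).select("label").take(1))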
[docs]defidxmin(self,skipna:bool=True)->Union[Tuple,Any]:""" Return the row label of the minimum value. If multiple values equal the minimum, the first row label with that value is returned. Parameters ---------- skipna : bool, default True Exclude NA/null values. If the entire Series is NA, the result will be NA. Returns ------- Index Label of the minimum value. Raises ------ ValueError If the Series is empty. See Also -------- Series.idxmax : Return index *label* of the first occurrence of maximum of values. Notes ----- This method is the Series version of ``ndarray.argmin``. This method returns the label of the minimum, while ``ndarray.argmin`` returns the position. To get the position, use ``series.values.argmin()``. Examples -------- >>> s = ps.Series(data=[1, None, 4, 0], ... index=['A', 'B', 'C', 'D']) >>> s A 1.0 B NaN C 4.0 D 0.0 dtype: float64 >>> s.idxmin() 'D' If `skipna` is False and there is an NA value in the data, the function returns ``nan``. >>> s.idxmin(skipna=False) nan In case of multi-index, you get a tuple: >>> index = pd.MultiIndex.from_arrays([ ... ['a', 'a', 'b', 'b'], ['c', 'd', 'e', 'f']], names=('first', 'second')) >>> s = ps.Series(data=[1, None, 4, 0], index=index) >>> s first second a c 1.0 d NaN b e 4.0 f 0.0 dtype: float64 >>> s.idxmin() ('b', 'f') If multiple values equal the minimum, the first row label with that value is returned. >>> s = ps.Series([1, 100, 1, 100, 1, 100], index=[10, 3, 5, 2, 1, 8]) >>> s 10 1 3 100 5 1 2 100 1 1 8 100 dtype: int64 >>> s.idxmin() 10 """sdf=self._internal.spark_framescol=self.spark.columnindex_scols=self._internal.index_spark_columnsifskipna:sdf=sdf.orderBy(scol.asc_nulls_last(),NATURAL_ORDER_COLUMN_NAME)else:sdf=sdf.orderBy(scol.asc_nulls_first(),NATURAL_ORDER_COLUMN_NAME)results=sdf.select([scol]+index_scols).take(1)iflen(results)==0:raiseValueError("attempt to get idxmin of an empty sequence")ifresults[0][0]isNone:# This will only happen when skipna is False because we will# place nulls first.returnnp.nanvalues=list(results[0][1:])iflen(values)==1:returnvalues[0]else:returntuple(values)
[docs]defpop(self,item:Name)->Union["Series",Scalar]:""" Return item and drop from series. Parameters ---------- item : label Label of index to be popped. Returns ------- Value that is popped from series. Examples -------- >>> s = ps.Series(data=np.arange(3), index=['A', 'B', 'C']) >>> s A 0 B 1 C 2 dtype: int64 >>> s.pop('A') 0 >>> s B 1 C 2 dtype: int64 >>> s = ps.Series(data=np.arange(3), index=['A', 'A', 'C']) >>> s A 0 A 1 C 2 dtype: int64 >>> s.pop('A') A 0 A 1 dtype: int64 >>> s C 2 dtype: int64 Also support for MultiIndex >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 1, 2, 0, 1, 2, 0, 1, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx) >>> s lama speed 45.0 weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 >>> s.pop('lama') speed 45.0 weight 200.0 length 1.2 dtype: float64 >>> s cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 Also support for MultiIndex with several indexes. >>> midx = pd.MultiIndex([['a', 'b', 'c'], ... ['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 0, 0, 0, 1, 1, 1], ... [0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 1, 2, 0, 1, 2, 0, 0, 2]] ... ) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx) >>> s a lama speed 45.0 weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 b falcon speed 320.0 speed 1.0 length 0.3 dtype: float64 >>> s.pop(('a', 'lama')) speed 45.0 weight 200.0 length 1.2 dtype: float64 >>> s a cow speed 30.0 weight 250.0 length 1.5 b falcon speed 320.0 speed 1.0 length 0.3 dtype: float64 >>> s.pop(('b', 'falcon', 'speed')) (b, falcon, speed) 320.0 (b, falcon, speed) 1.0 dtype: float64 """ifnotis_name_like_value(item):raiseTypeError("'key' should be string or tuple that contains strings")ifnotis_name_like_tuple(item):item=(item,)ifself._internal.index_level<len(item):raiseKeyError("Key length ({}) exceeds index depth ({})".format(len(item),self._internal.index_level))internal=self._internalscols=internal.index_spark_columns[len(item):]+[self.spark.column]rows=[internal.spark_columns[level]==indexforlevel,indexinenumerate(item)]sdf=internal.spark_frame.filter(reduce(lambdax,y:x&y,rows)).select(scols)psdf=self._drop(item)self._update_anchor(psdf)ifself._internal.index_level==len(item):# if spark_frame has one column and one data, return data only without framepdf=sdf.limit(2).toPandas()length=len(pdf)iflength==1:val=pdf[internal.data_spark_column_names[0]].iloc[0]ifisinstance(self.dtype,CategoricalDtype):returnself.dtype.categories[val]else:returnvalitem_string=name_like_string(item)sdf=sdf.withColumn(SPARK_DEFAULT_INDEX_NAME,F.lit(str(item_string)))internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,SPARK_DEFAULT_INDEX_NAME)],column_labels=[self._column_label],data_fields=[self._internal.data_fields[0]],)returnfirst_series(DataFrame(internal))else:internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolininternal.index_spark_column_names[len(item):]],index_fields=internal.index_fields[len(item):],index_names=self._internal.index_names[len(item):],data_spark_columns=[scol_for(sdf,internal.data_spark_column_names[0])],)returnfirst_series(DataFrame(internal))
[docs]defcopy(self,deep:bool=True)->"Series":""" Make a copy of this object's indices and data. Parameters ---------- deep : bool, default True this parameter is not supported but just dummy parameter to match pandas. Returns ------- copy : Series Examples -------- >>> s = ps.Series([1, 2], index=["a", "b"]) >>> s a 1 b 2 dtype: int64 >>> s_copy = s.copy() >>> s_copy a 1 b 2 dtype: int64 """returnfirst_series(DataFrame(self._internal))
[docs]defmode(self,dropna:bool=True)->"Series":""" Return the mode(s) of the dataset. Always returns Series even if only one value is returned. .. versionchanged:: 3.4.0 Series name is preserved to follow pandas 1.4+ behavior. Parameters ---------- dropna : bool, default True Don't consider counts of NaN/NaT. Returns ------- Series Modes of the Series. Examples -------- >>> s = ps.Series([0, 0, 1, 1, 1, np.nan, np.nan, np.nan]) >>> s 0 0.0 1 0.0 2 1.0 3 1.0 4 1.0 5 NaN 6 NaN 7 NaN dtype: float64 >>> s.mode() 0 1.0 dtype: float64 If there are several same modes, all items are shown >>> s = ps.Series([0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, ... np.nan, np.nan, np.nan]) >>> s 0 0.0 1 0.0 2 1.0 3 1.0 4 1.0 5 2.0 6 2.0 7 2.0 8 3.0 9 3.0 10 3.0 11 NaN 12 NaN 13 NaN dtype: float64 >>> s.mode().sort_values() 0 1.0 1 2.0 2 3.0 dtype: float64 With 'dropna' set to 'False', we can also see NaN in the result >>> s.mode(False).sort_values() 0 1.0 1 2.0 2 3.0 3 NaN dtype: float64 """scol=self.spark.columnname=self._internal.data_spark_column_names[0]sdf=(self._internal.spark_frame.select(SF.mode(scol,dropna).alias(name)).select(F.array_sort(F.col(name)).alias(name)).select(F.explode(F.col(name)).alias(name)))internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,column_labels=[None])ser_mode=first_series(DataFrame(internal))ser_mode.name=self.namereturnser_mode
# TODO: introduce 'method', 'limit', 'in_place'; fully support 'regex'
[docs]defreplace(self,to_replace:Optional[Union[Any,List,Tuple,Dict]]=None,value:Optional[Union[List,Tuple]]=None,regex:Union[str,bool]=False,)->"Series":""" Replace values given in to_replace with value. Values of the Series are replaced with other values dynamically. .. note:: For partial pattern matching, the replacement is against the whole string, which is different from pandas. That's by the nature of underlying Spark API. Parameters ---------- to_replace : str, list, tuple, dict, Series, int, float, or None How to find the values that will be replaced. * numeric, str: - numeric: numeric values equal to to_replace will be replaced with value - str: string exactly matching to_replace will be replaced with value * list of str or numeric: - if to_replace and value are both lists or tuples, they must be the same length. - str and numeric rules apply as above. * dict: - Dicts can be used to specify different replacement values for different existing values. For example, {'a': 'b', 'y': 'z'} replaces the value ‘a’ with ‘b’ and ‘y’ with ‘z’. To use a dict in this way the value parameter should be None. - For a DataFrame a dict can specify that different values should be replaced in different columns. For example, {'a': 1, 'b': 'z'} looks for the value 1 in column ‘a’ and the value ‘z’ in column ‘b’ and replaces these values with whatever is specified in value. The value parameter should not be None in this case. You can treat this as a special case of passing two lists except that you are specifying the column to search in. See the examples section for examples of each of these. value : scalar, dict, list, tuple, str default None Value to replace any values matching to_replace with. For a DataFrame a dict of values can be used to specify which value to use for each column (columns not in the dict will not be filled). Regular expressions, strings and lists or dicts of such objects are also allowed. regex: bool or str, default False Whether to interpret to_replace and/or value as regular expressions. If this is True then to_replace must be a string. Alternatively, this could be a regular expression in which case to_replace must be None. Returns ------- Series Object after replacement. Examples -------- Scalar `to_replace` and `value` >>> s = ps.Series([0, 1, 2, 3, 4]) >>> s 0 0 1 1 2 2 3 3 4 4 dtype: int64 >>> s.replace(0, 5) 0 5 1 1 2 2 3 3 4 4 dtype: int64 List-like `to_replace` >>> s.replace([0, 4], 5000) 0 5000 1 1 2 2 3 3 4 5000 dtype: int64 >>> s.replace([1, 2, 3], [10, 20, 30]) 0 0 1 10 2 20 3 30 4 4 dtype: int64 Dict-like `to_replace` >>> s.replace({1: 1000, 2: 2000, 3: 3000, 4: 4000}) 0 0 1 1000 2 2000 3 3000 4 4000 dtype: int64 Also support for MultiIndex >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 1, 2, 0, 1, 2, 0, 1, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... 
index=midx) >>> s lama speed 45.0 weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 >>> s.replace(45, 450) lama speed 450.0 weight 200.0 length 1.2 cow speed 30.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 >>> s.replace([45, 30, 320], 500) lama speed 500.0 weight 200.0 length 1.2 cow speed 500.0 weight 250.0 length 1.5 falcon speed 500.0 weight 1.0 length 0.3 dtype: float64 >>> s.replace({45: 450, 30: 300}) lama speed 450.0 weight 200.0 length 1.2 cow speed 300.0 weight 250.0 length 1.5 falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 Regular expression `to_replace` >>> psser = ps.Series(['bat', 'foo', 'bait', 'abc', 'bar', 'zoo']) >>> psser.replace(to_replace=r'^ba.$', value='new', regex=True) 0 new 1 foo 2 bait 3 abc 4 new 5 zoo dtype: object >>> psser.replace(value='new', regex=r'^.oo$') 0 bat 1 new 2 bait 3 abc 4 bar 5 new dtype: object For partial pattern matching, the replacement is against the whole string >>> psser.replace('ba', 'xx', regex=True) 0 xx 1 foo 2 xx 3 abc 4 xx 5 zoo dtype: object """ifisinstance(regex,str):ifto_replaceisnotNone:raiseValueError("'to_replace' must be 'None' if 'regex' is not a bool")to_replace=regexregex=Trueelifnotisinstance(regex,bool):raiseNotImplementedError("'regex' of %s type is not supported"%type(regex).__name__)elifregexisTrue:assertisinstance(to_replace,str),"If 'regex' is True then 'to_replace' must be a string"ifto_replaceisNone:returnself.fillna(method="ffill")ifnotisinstance(to_replace,(str,list,tuple,dict,int,float)):raiseTypeError("'to_replace' should be one of str, list, tuple, dict, int, float")to_replace=list(to_replace)ifisinstance(to_replace,tuple)elseto_replacevalue=list(value)ifisinstance(value,tuple)elsevalueifisinstance(to_replace,list)andisinstance(value,list):ifnotlen(to_replace)==len(value):raiseValueError("Replacement lists must match in length. Expecting {} got {}".format(len(to_replace),len(value)))to_replace={k:vfork,vinzip(to_replace,value)}ifisinstance(to_replace,dict):is_start=Trueiflen(to_replace)==0:current=self.spark.columnelse:forto_replace_,valueinto_replace.items():cond=((F.isnan(self.spark.column)|self.spark.column.isNull())ifpd.isna(to_replace_)else(self.spark.column==F.lit(to_replace_)))ifis_start:current=F.when(cond,value)is_start=Falseelse:current=current.when(cond,value)current=current.otherwise(self.spark.column)else:ifregex:# to_replace must be a stringcond=self.spark.column.rlike(cast(str,to_replace))else:cond=self.spark.column.isin(to_replace)# to_replace may be a scalarifnp.array(pd.isna(to_replace)).any():cond=cond|F.isnan(self.spark.column)|self.spark.column.isNull()current=F.when(cond,value).otherwise(self.spark.column)returnself._with_new_scol(current)# TODO: dtype?
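# A small sketch of the chained CASE WHEN that a dict-style replace builds, falling back to
# the original value when nothing matches; the column name "v" and the mapping are assumed.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(0,), (1,), (2,), (3,), (4,)], "v int")

mapping = {1: 1000, 2: 2000, 3: 3000, 4: 4000}
expr = None
for old, new in mapping.items():
    cond = F.col("v") == F.lit(old)
    expr = F.when(cond, new) if expr is None else expr.when(cond, new)
sdf.select(expr.otherwise(F.col("v")).alias("v")).show()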
[docs]defupdate(self,other:"Series")->None:""" Modify Series in place using non-NA values from passed Series. Aligns on index. Parameters ---------- other : Series Examples -------- >>> from pyspark.pandas.config import set_option, reset_option >>> set_option("compute.ops_on_diff_frames", True) >>> s = ps.Series([1, 2, 3]) >>> s.update(ps.Series([4, 5, 6])) >>> s.sort_index() 0 4 1 5 2 6 dtype: int64 >>> s = ps.Series(['a', 'b', 'c']) >>> s.update(ps.Series(['d', 'e'], index=[0, 2])) >>> s.sort_index() 0 d 1 b 2 e dtype: object >>> s = ps.Series([1, 2, 3]) >>> s.update(ps.Series([4, 5, 6, 7, 8])) >>> s.sort_index() 0 4 1 5 2 6 dtype: int64 >>> s = ps.Series([1, 2, 3], index=[10, 11, 12]) >>> s 10 1 11 2 12 3 dtype: int64 >>> s.update(ps.Series([4, 5, 6])) >>> s.sort_index() 10 1 11 2 12 3 dtype: int64 >>> s.update(ps.Series([4, 5, 6], index=[11, 12, 13])) >>> s.sort_index() 10 1 11 4 12 5 dtype: int64 If ``other`` contains NaNs the corresponding values are not updated in the original Series. >>> s = ps.Series([1, 2, 3]) >>> s.update(ps.Series([4, np.nan, 6])) >>> s.sort_index() 0 4.0 1 2.0 2 6.0 dtype: float64 >>> reset_option("compute.ops_on_diff_frames") """ifnotisinstance(other,Series):raiseTypeError("'other' must be a Series")ifsame_anchor(self,other):scol=(F.when(other.spark.column.isNotNull(),other.spark.column).otherwise(self.spark.column).alias(self._psdf._internal.spark_column_name_for(self._column_label)))internal=self._psdf._internal.with_new_spark_column(self._column_label,scol# TODO: dtype?)self._psdf._update_internal_frame(internal)else:combined=combine_frames(self._psdf,other._psdf,how="leftouter")this_scol=combined["this"]._internal.spark_column_for(self._column_label)that_scol=combined["that"]._internal.spark_column_for(other._column_label)scol=(F.when(that_scol.isNotNull(),that_scol).otherwise(this_scol).alias(self._psdf._internal.spark_column_name_for(self._column_label)))internal=combined["this"]._internal.with_new_spark_column(self._column_label,scol# TODO: dtype?)self._psdf._update_internal_frame(internal.resolved_copy,check_same_anchor=False)
[docs]defwhere(self,cond:"Series",other:Any=np.nan)->"Series":""" Replace values where the condition is False. Parameters ---------- cond : boolean Series Where cond is True, keep the original value. Where False, replace with corresponding value from other. other : scalar, Series Entries where cond is False are replaced with corresponding value from other. Returns ------- Series Examples -------- >>> from pyspark.pandas.config import set_option, reset_option >>> set_option("compute.ops_on_diff_frames", True) >>> s1 = ps.Series([0, 1, 2, 3, 4]) >>> s2 = ps.Series([100, 200, 300, 400, 500]) >>> s1.where(s1 > 0).sort_index() 0 NaN 1 1.0 2 2.0 3 3.0 4 4.0 dtype: float64 >>> s1.where(s1 > 1, 10).sort_index() 0 10 1 10 2 2 3 3 4 4 dtype: int64 >>> s1.where(s1 > 1, s1 + 100).sort_index() 0 100 1 101 2 2 3 3 4 4 dtype: int64 >>> s1.where(s1 > 1, s2).sort_index() 0 100 1 200 2 2 3 3 4 4 dtype: int64 >>> reset_option("compute.ops_on_diff_frames") """assertisinstance(cond,Series)# We should check the DataFrame from both `cond` and `other`.should_try_ops_on_diff_frame=notsame_anchor(cond,self)or(isinstance(other,Series)andnotsame_anchor(other,self))ifshould_try_ops_on_diff_frame:# Try to perform it with 'compute.ops_on_diff_frame' option.psdf=self.to_frame()tmp_cond_col=verify_temp_column_name(psdf,"__tmp_cond_col__")tmp_other_col=verify_temp_column_name(psdf,"__tmp_other_col__")psdf[tmp_cond_col]=condpsdf[tmp_other_col]=other# above logic makes a Spark DataFrame looks like below:# +-----------------+---+----------------+-----------------+# |__index_level_0__| 0|__tmp_cond_col__|__tmp_other_col__|# +-----------------+---+----------------+-----------------+# | 0| 0| false| 100|# | 1| 1| false| 200|# | 3| 3| true| 400|# | 2| 2| true| 300|# | 4| 4| true| 500|# +-----------------+---+----------------+-----------------+condition=(F.when(psdf[tmp_cond_col].spark.column,psdf._psser_for(psdf._internal.column_labels[0]).spark.column,).otherwise(psdf[tmp_other_col].spark.column).alias(psdf._internal.data_spark_column_names[0]))internal=psdf._internal.with_new_columns([condition],column_labels=self._internal.column_labels)returnfirst_series(DataFrame(internal))else:ifisinstance(other,Series):other=other.spark.columncondition=(F.when(cond.spark.column,self.spark.column).otherwise(other).alias(self._internal.data_spark_column_names[0]))returnself._with_new_scol(condition)
[docs]defmask(self,cond:"Series",other:Any=np.nan)->"Series":""" Replace values where the condition is True. Parameters ---------- cond : boolean Series Where cond is False, keep the original value. Where True, replace with corresponding value from other. other : scalar, Series Entries where cond is True are replaced with corresponding value from other. Returns ------- Series Examples -------- >>> from pyspark.pandas.config import set_option, reset_option >>> set_option("compute.ops_on_diff_frames", True) >>> s1 = ps.Series([0, 1, 2, 3, 4]) >>> s2 = ps.Series([100, 200, 300, 400, 500]) >>> s1.mask(s1 > 0).sort_index() 0 0.0 1 NaN 2 NaN 3 NaN 4 NaN dtype: float64 >>> s1.mask(s1 > 1, 10).sort_index() 0 0 1 1 2 10 3 10 4 10 dtype: int64 >>> s1.mask(s1 > 1, s1 + 100).sort_index() 0 0 1 1 2 102 3 103 4 104 dtype: int64 >>> s1.mask(s1 > 1, s2).sort_index() 0 0 1 1 2 300 3 400 4 500 dtype: int64 >>> reset_option("compute.ops_on_diff_frames") """returnself.where(~cond,other)
[docs]defxs(self,key:Name,level:Optional[int]=None)->"Series":""" Return cross-section from the Series. This method takes a `key` argument to select data at a particular level of a MultiIndex. Parameters ---------- key : label or tuple of label Label contained in the index, or partially in a MultiIndex. level : object, defaults to first n levels (n=1 or len(key)) In case of a key partially contained in a MultiIndex, indicate which levels are used. Levels can be referred by label or position. Returns ------- Series Cross-section from the original Series corresponding to the selected index levels. Examples -------- >>> midx = pd.MultiIndex([['a', 'b', 'c'], ... ['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 1, 2, 0, 1, 2, 0, 1, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx) >>> s a lama speed 45.0 weight 200.0 length 1.2 b cow speed 30.0 weight 250.0 length 1.5 c falcon speed 320.0 weight 1.0 length 0.3 dtype: float64 Get values at specified index >>> s.xs('a') lama speed 45.0 weight 200.0 length 1.2 dtype: float64 Get values at several indexes >>> s.xs(('a', 'lama')) speed 45.0 weight 200.0 length 1.2 dtype: float64 Get values at specified index and level >>> s.xs('lama', level=1) a speed 45.0 weight 200.0 length 1.2 dtype: float64 """ifnotisinstance(key,tuple):key=(key,)iflevelisNone:level=0internal=self._internalscols=(internal.index_spark_columns[:level]+internal.index_spark_columns[level+len(key):]+[self.spark.column])rows=[internal.spark_columns[lvl]==indexforlvl,indexinenumerate(key,level)]sdf=internal.spark_frame.filter(reduce(lambdax,y:x&y,rows)).select(scols)ifinternal.index_level==len(key):# if spark_frame has one column and one data, return data only without framepdf=sdf.limit(2).toPandas()length=len(pdf)iflength==1:returnpdf[self._internal.data_spark_column_names[0]].iloc[0]index_spark_column_names=(internal.index_spark_column_names[:level]+internal.index_spark_column_names[level+len(key):])index_names=internal.index_names[:level]+internal.index_names[level+len(key):]index_fields=internal.index_fields[:level]+internal.index_fields[level+len(key):]internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinindex_spark_column_names],index_names=index_names,index_fields=index_fields,data_spark_columns=[scol_for(sdf,internal.data_spark_column_names[0])],)returnfirst_series(DataFrame(internal))
[docs]defpct_change(self,periods:int=1)->"Series":""" Percentage change between the current and a prior element. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to moveing all data into a single partition in a single machine and could cause serious performance degradation. Avoid this method with very large datasets. Parameters ---------- periods : int, default 1 Periods to shift for forming percent change. Returns ------- Series Examples -------- >>> psser = ps.Series([90, 91, 85], index=[2, 4, 1]) >>> psser 2 90 4 91 1 85 dtype: int64 >>> psser.pct_change() 2 NaN 4 0.011111 1 -0.065934 dtype: float64 >>> psser.sort_index().pct_change() 1 NaN 2 0.058824 4 0.011111 dtype: float64 >>> psser.pct_change(periods=2) 2 NaN 4 NaN 1 -0.055556 dtype: float64 """scol=self.spark.columnwindow=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-periods,-periods)prev_row=F.lag(scol,periods).over(window)returnself._with_new_scol((scol-prev_row)/prev_row).spark.analyzed
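# A plain-pandas sanity check of the expression used above:
# pct_change(periods) is (x - x.shift(periods)) / x.shift(periods).
import numpy as np
import pandas as pd

s = pd.Series([90, 91, 85])
periods = 1
manual = (s - s.shift(periods)) / s.shift(periods)
print(np.allclose(manual, s.pct_change(periods), equal_nan=True))  # True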
[docs]defcombine_first(self,other:"Series")->"Series":""" Combine Series values, choosing the calling Series's values first. Parameters ---------- other : Series The value(s) to be combined with the `Series`. Returns ------- Series The result of combining the Series with the other object. See Also -------- Series.combine : Perform element-wise operation on two Series using a given function. Notes ----- Result index will be the union of the two indexes. Examples -------- >>> s1 = ps.Series([1, np.nan]) >>> s2 = ps.Series([3, 4]) >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s1.combine_first(s2) 0 1.0 1 4.0 dtype: float64 """ifnotisinstance(other,ps.Series):raiseTypeError("`combine_first` only allows `Series` for parameter `other`")ifsame_anchor(self,other):this=self.spark.columnthat=other.spark.columncombined=self._psdfelse:combined=combine_frames(self._psdf,other._psdf)this=combined["this"]._internal.spark_column_for(self._column_label)that=combined["that"]._internal.spark_column_for(other._column_label)# If `self` has missing value, use value of `other`cond=F.when(this.isNull(),that).otherwise(this)# If `self` and `other` come from same frame, the anchor should be keptifsame_anchor(self,other):returnself._with_new_scol(cond)# TODO: dtype?index_scols=combined._internal.index_spark_columnssdf=combined._internal.spark_frame.select(*index_scols,cond.alias(self._internal.data_spark_column_names[0])).distinct()internal=self._internal.with_new_sdf(sdf,index_fields=combined._internal.index_fields,data_fields=[None]# TODO: dtype?)returnfirst_series(DataFrame(internal))
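# A small sketch of the null fallback built above: keep the calling column unless it is null,
# otherwise take the other one; column names are assumed for illustration.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 3.0), (None, 4.0)], "this double, that double")

sdf.select(
    F.when(F.col("this").isNull(), F.col("that")).otherwise(F.col("this")).alias("combined")
).show()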
[docs]defdot(self,other:Union["Series",DataFrame])->Union[Scalar,"Series"]:""" Compute the dot product between the Series and the columns of other. This method computes the dot product between the Series and another one, or the Series and each columns of a DataFrame. It can also be called using `self @ other` in Python >= 3.5. .. note:: This API is slightly different from pandas when indexes from both Series are not aligned and config 'compute.eager_check' is False. pandas raise an exception; however, pandas-on-Spark just proceeds and performs by ignoring mismatches with NaN permissively. >>> pdf1 = pd.Series([1, 2, 3], index=[0, 1, 2]) >>> pdf2 = pd.Series([1, 2, 3], index=[0, 1, 3]) >>> pdf1.dot(pdf2) # doctest: +SKIP ... ValueError: matrices are not aligned >>> psdf1 = ps.Series([1, 2, 3], index=[0, 1, 2]) >>> psdf2 = ps.Series([1, 2, 3], index=[0, 1, 3]) >>> with ps.option_context("compute.eager_check", False): ... psdf1.dot(psdf2) # doctest: +SKIP ... 5 Parameters ---------- other : Series, DataFrame. The other object to compute the dot product with its columns. Returns ------- scalar, Series Return the dot product of the Series and other if other is a Series, the Series of the dot product of Series and each row of other if other is a DataFrame. Notes ----- The Series and other must share the same index if other are a Series or a DataFrame. Examples -------- >>> s = ps.Series([0, 1, 2, 3]) >>> s.dot(s) 14 >>> s @ s 14 >>> psdf = ps.DataFrame({'x': [0, 1, 2, 3], 'y': [0, -1, -2, -3]}) >>> psdf x y 0 0 0 1 1 -1 2 2 -2 3 3 -3 >>> with ps.option_context("compute.ops_on_diff_frames", True): ... s.dot(psdf) ... x 14 y -14 dtype: int64 """ifnotsame_anchor(self,other):ifget_option("compute.eager_check")andnotcast(ps.Index,self.index.sort_values()).equals(cast(ps.Index,other.index.sort_values())):raiseValueError("matrices are not aligned")eliflen(self.index)!=len(other.index):raiseValueError("matrices are not aligned")ifisinstance(other,DataFrame):other_copy:DataFrame=other.copy()column_labels=other_copy._internal.column_labelsself_column_label=verify_temp_column_name(other_copy,"__self_column__")other_copy[self_column_label]=selfself_psser=other_copy._psser_for(self_column_label)product_pssers=[cast(Series,other_copy._psser_for(label)*self_psser)forlabelincolumn_labels]dot_product_psser=DataFrame(other_copy._internal.with_new_columns(product_pssers,column_labels=column_labels)).sum()returncast(Series,dot_product_psser).rename(self.name)else:assertisinstance(other,Series)return(self*other).sum()
    def __matmul__(self, other: Union["Series", DataFrame]) -> Union[Scalar, "Series"]:
        """
        Matrix multiplication using binary `@` operator in Python>=3.5.
        """
        return self.dot(other)
[docs]defrepeat(self,repeats:Union[int,"Series"])->"Series":""" Repeat elements of a Series. Returns a new Series where each element of the current Series is repeated consecutively a given number of times. Parameters ---------- repeats : int or Series The number of repetitions for each element. This should be a non-negative integer. Repeating 0 times will return an empty Series. Returns ------- Series Newly created Series with repeated elements. See Also -------- Index.repeat : Equivalent function for Index. Examples -------- >>> s = ps.Series(['a', 'b', 'c']) >>> s 0 a 1 b 2 c dtype: object >>> s.repeat(2) 0 a 1 b 2 c 0 a 1 b 2 c dtype: object >>> ps.Series([1, 2, 3]).repeat(0) Series([], dtype: int64) """ifnotisinstance(repeats,(int,Series)):raiseTypeError("`repeats` argument must be integer or Series, but got {}".format(type(repeats)))ifisinstance(repeats,Series):ifnotsame_anchor(self,repeats):psdf=self.to_frame()temp_repeats=verify_temp_column_name(psdf,"__temp_repeats__")psdf[temp_repeats]=repeatsreturn(psdf._psser_for(psdf._internal.column_labels[0]).repeat(psdf[temp_repeats]).rename(self.name))else:scol=F.explode(F.array_repeat(self.spark.column,repeats.astype("int32").spark.column)).alias(name_like_string(self.name))sdf=self._internal.spark_frame.select(self._internal.index_spark_columns+[scol])internal=self._internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinself._internal.index_spark_column_names],data_spark_columns=[scol_for(sdf,name_like_string(self.name))],)returnfirst_series(DataFrame(internal))else:ifrepeats<0:raiseValueError("negative dimensions are not allowed")psdf=self._psdf[[self.name]]ifrepeats==0:returnfirst_series(DataFrame(psdf._internal.with_filter(F.lit(False))))else:returnfirst_series(cast("ps.DataFrame",ps.concat([psdf]*repeats)))
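# A small sketch of the building block used when `repeats` is a Series anchored to self:
# explode an array_repeat of each value (shown here with a constant count for simplicity).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a",), ("b",), ("c",)], "v string")

sdf.select(F.explode(F.array_repeat(F.col("v"), 2)).alias("v")).show()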
[docs]defasof(self,where:Union[Any,List])->Union[Scalar,"Series"]:""" Return the last row(s) without any NaNs before `where`. The last row (for each element in `where`, if list) without any NaN is taken. If there is no good value, NaN is returned. .. note:: This API is dependent on :meth:`Index.is_monotonic_increasing` which is expensive. Parameters ---------- where : index or array-like of indices Returns ------- scalar or Series The return can be: * scalar : when `self` is a Series and `where` is a scalar * Series: when `self` is a Series and `where` is an array-like Return scalar or Series Notes ----- Indices are assumed to be sorted. Raises if this is not the case and config 'compute.eager_check' is True. If 'compute.eager_check' is False pandas-on-Spark just proceeds and performs by ignoring the indeces's order Examples -------- >>> s = ps.Series([1, 2, np.nan, 4], index=[10, 20, 30, 40]) >>> s 10 1.0 20 2.0 30 NaN 40 4.0 dtype: float64 A scalar `where`. >>> s.asof(20) 2.0 For a sequence `where`, a Series is returned. The first value is NaN, because the first element of `where` is before the first index value. >>> s.asof([5, 20]).sort_index() 5 NaN 20 2.0 dtype: float64 Missing values are not considered. The following is ``2.0``, not NaN, even though NaN is at the index location for ``30``. >>> s.asof(30) 2.0 >>> s = ps.Series([1, 2, np.nan, 4], index=[10, 30, 20, 40]) >>> with ps.option_context("compute.eager_check", False): ... s.asof(20) ... 1.0 """should_return_series=Trueifisinstance(self.index,ps.MultiIndex):raiseValueError("asof is not supported for a MultiIndex")ifisinstance(where,(ps.Index,ps.Series,DataFrame)):raiseValueError("where cannot be an Index, Series or a DataFrame")ifget_option("compute.eager_check")andnotself.index.is_monotonic_increasing:raiseValueError("asof requires a sorted index")ifnotis_list_like(where):should_return_series=Falsewhere=[where]internal=self._internal.resolved_copyindex_scol=internal.index_spark_columns[0]index_type=internal.spark_type_for(index_scol)spark_column=internal.data_spark_columns[0]monotonically_increasing_id_column=verify_temp_column_name(internal.spark_frame,"__monotonically_increasing_id__")cond=[F.max_by(spark_column,F.when((index_scol<=F.lit(index).cast(index_type))&spark_column.isNotNull()ifpd.notna(index)# If index is nan and the value of the col is not null# then return monotonically_increasing_id. This will let max by# to return last index value, which is the behaviour of pandaselsespark_column.isNotNull(),F.col(monotonically_increasing_id_column),),)forindexinwhere]sdf=internal.spark_frame.withColumn(monotonically_increasing_id_column,F.monotonically_increasing_id()).select(cond)ifnotshould_return_series:withsql_conf({SPARK_CONF_ARROW_ENABLED:False}):# Disable Arrow to keep row ordering.result=sdf.limit(1).toPandas().iloc[0,0]returnresultifresultisnotNoneelsenp.nan# The data is expected to be small so it's fine to transpose/use default index.withps.option_context("compute.default_index_type","distributed","compute.max_rows",1):iflen(where)==len(set(where))andnotisinstance(index_type,TimestampType):psdf:DataFrame=DataFrame(sdf)psdf.columns=pd.Index(where)returnfirst_series(psdf.transpose()).rename(self.name)else:# If `where` has duplicate items, leverage the pandas directly# since pandas API on Spark doesn't support the duplicate column name.pdf:pd.DataFrame=sdf.limit(1).toPandas()pdf.columns=pd.Index(where)returnfirst_series(DataFrame(pdf.transpose())).rename(self.name)
    def mad(self) -> float:
        """
        Return the mean absolute deviation of values.

        .. deprecated:: 3.4.0

        Examples
        --------
        >>> s = ps.Series([1, 2, 3, 4])
        >>> s
        0    1
        1    2
        2    3
        3    4
        dtype: int64

        >>> s.mad()
        1.0
        """
        warnings.warn(
            "The 'mad' method is deprecated and will be removed in a future version. "
            "To compute the same result, you may do `(series - series.mean()).abs().mean()`.",
            FutureWarning,
        )
        sdf = self._internal.spark_frame
        spark_column = self.spark.column
        avg = unpack_scalar(sdf.select(F.avg(spark_column)))
        mad = unpack_scalar(sdf.select(F.avg(F.abs(spark_column - avg))))

        return mad
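    # As the deprecation warning in `mad` suggests, the same statistic can be computed
    # with plain Series arithmetic; a minimal sketch using the docstring's data:
    #
    # >>> s = ps.Series([1, 2, 3, 4])
    # >>> (s - s.mean()).abs().mean()
    # 1.0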
[docs]defunstack(self,level:int=-1)->DataFrame:""" Unstack, a.k.a. pivot, Series with MultiIndex to produce DataFrame. The level involved will automatically get sorted. Notes ----- Unlike pandas, pandas-on-Spark doesn't check whether an index is duplicated or not because the checking of duplicated index requires scanning whole data which can be quite expensive. Parameters ---------- level : int, str, or list of these, default last level Level(s) to unstack, can pass level name. Returns ------- DataFrame Unstacked Series. Examples -------- >>> s = ps.Series([1, 2, 3, 4], ... index=pd.MultiIndex.from_product([['one', 'two'], ... ['a', 'b']])) >>> s one a 1 b 2 two a 3 b 4 dtype: int64 >>> s.unstack(level=-1).sort_index() a b one 1 2 two 3 4 >>> s.unstack(level=0).sort_index() one two a 1 3 b 2 4 """ifnotisinstance(self.index,ps.MultiIndex):raiseValueError("Series.unstack only support for a MultiIndex")index_nlevels=self.index.nlevelsiflevel>0and(level>index_nlevels-1):raiseIndexError("Too many levels: Index has only {} levels, not {}".format(index_nlevels,level+1))eliflevel<0and(level<-index_nlevels):raiseIndexError("Too many levels: Index has only {} levels, {} is not a valid level number".format(index_nlevels,level))internal=self._internal.resolved_copyindex_map=list(zip(internal.index_spark_column_names,internal.index_names,internal.index_fields))pivot_col,column_label_names,_=index_map.pop(level)index_scol_names,index_names,index_fields=zip(*index_map)col=internal.data_spark_column_names[0]sdf=internal.spark_framesdf=sdf.groupby(list(index_scol_names)).pivot(pivot_col).agg(F.first(scol_for(sdf,col)))internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinindex_scol_names],index_names=list(index_names),index_fields=list(index_fields),column_label_names=[column_label_names],)internal=internal.copy(data_fields=[field.copy(dtype=self._internal.data_fields[0].dtype)forfieldininternal.data_fields])returnDataFrame(internal)
    def item(self) -> Scalar:
        """
        Return the first element of the underlying data as a Python scalar.

        Returns
        -------
        scalar
            The first element of Series.

        Raises
        ------
        ValueError
            If the data is not length-1.

        Examples
        --------
        >>> psser = ps.Series([10])
        >>> psser.item()
        10
        """
        return self.head(2)._to_internal_pandas().item()
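    # A minimal sketch of the error path of `item` (illustrative only; the underlying
    # pandas call is expected to reject anything but a length-1 Series):
    #
    # >>> ps.Series([10, 20]).item()  # doctest: +SKIP
    # Traceback (most recent call last):
    #   ...
    # ValueError: can only convert an array of size 1 to a Python scalar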
    def items(self) -> Iterable[Tuple[Name, Any]]:
        """
        Lazily iterate over (index, value) tuples.

        This method returns an iterable tuple (index, value). This is
        convenient if you want to create a lazy iterator.

        .. note:: Unlike pandas, items in pandas-on-Spark returns a generator
            rather than a zip object.

        Returns
        -------
        iterable
            Iterable of tuples containing the (index, value) pairs from a
            Series.

        See Also
        --------
        DataFrame.items : Iterate over (column name, Series) pairs.
        DataFrame.iterrows : Iterate over DataFrame rows as (index, Series) pairs.

        Examples
        --------
        >>> s = ps.Series(['A', 'B', 'C'])
        >>> for index, value in s.items():
        ...     print("Index : {}, Value : {}".format(index, value))
        Index : 0, Value : A
        Index : 1, Value : B
        Index : 2, Value : C
        """
        internal_index_columns = self._internal.index_spark_column_names
        internal_data_column = self._internal.data_spark_column_names[0]

        def extract_kv_from_spark_row(row: Row) -> Tuple[Name, Any]:
            k = (
                row[internal_index_columns[0]]
                if len(internal_index_columns) == 1
                else tuple(row[c] for c in internal_index_columns)
            )
            v = row[internal_data_column]
            return k, v

        for k, v in map(
            extract_kv_from_spark_row, self._internal.resolved_copy.spark_frame.toLocalIterator()
        ):
            yield k, v
    def iteritems(self) -> Iterable[Tuple[Name, Any]]:
        """
        This is an alias of ``items``.

        .. deprecated:: 3.4.0
            iteritems is deprecated and will be removed in a future version.
            Use .items instead.
        """
        warnings.warn("Deprecated in 3.4, Use Series.items instead.", FutureWarning)
        return self.items()
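    # A minimal migration sketch for the deprecated alias above: `iteritems` and `items`
    # yield the same (index, value) pairs, so callers can switch the method name without
    # other changes:
    #
    # >>> s = ps.Series(['A', 'B'])
    # >>> list(s.items())
    # [(0, 'A'), (1, 'B')]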
[docs]defdroplevel(self,level:Union[int,Name,List[Union[int,Name]]])->"Series":""" Return Series with requested index level(s) removed. Parameters ---------- level : int, str, or list-like If a string is given, must be the name of a level If list-like, elements must be names or positional indexes of levels. Returns ------- Series Series with requested index level(s) removed. Examples -------- >>> psser = ps.Series( ... [1, 2, 3], ... index=pd.MultiIndex.from_tuples( ... [("x", "a"), ("x", "b"), ("y", "c")], names=["level_1", "level_2"] ... ), ... ) >>> psser level_1 level_2 x a 1 b 2 y c 3 dtype: int64 Removing specific index level by level >>> psser.droplevel(0) level_2 a 1 b 2 c 3 dtype: int64 Removing specific index level by name >>> psser.droplevel("level_2") level_1 x 1 x 2 y 3 dtype: int64 """returnfirst_series(self.to_frame().droplevel(level=level,axis=0)).rename(self.name)
    def tail(self, n: int = 5) -> "Series":
        """
        Return the last `n` rows.

        This function returns the last `n` rows from the object based on
        position. It is useful for quickly verifying data, for example,
        after sorting or appending rows.

        For negative values of `n`, this function returns all rows except
        the first `n` rows, equivalent to ``df[n:]``.

        Parameters
        ----------
        n : int, default 5
            Number of rows to select.

        Returns
        -------
        type of caller
            The last `n` rows of the caller object.

        See Also
        --------
        DataFrame.head : The first `n` rows of the caller object.

        Examples
        --------
        >>> psser = ps.Series([1, 2, 3, 4, 5])
        >>> psser
        0    1
        1    2
        2    3
        3    4
        4    5
        dtype: int64

        >>> psser.tail(3)  # doctest: +SKIP
        2    3
        3    4
        4    5
        dtype: int64
        """
        return first_series(self.to_frame().tail(n=n)).rename(self.name)
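    # A minimal sketch of the negative-`n` case documented above: `tail(-n)` keeps
    # everything except the first `n` rows, equivalent to ``psser[n:]``:
    #
    # >>> psser = ps.Series([1, 2, 3, 4, 5])
    # >>> psser.tail(-2)  # doctest: +SKIP
    # 2    3
    # 3    4
    # 4    5
    # dtype: int64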
    def explode(self) -> "Series":
        """
        Transform each element of a list-like to a row.

        Returns
        -------
        Series
            Exploded lists to rows; index will be duplicated for these rows.

        See Also
        --------
        Series.str.split : Split string values on specified separator.
        Series.unstack : Unstack, a.k.a. pivot, Series with MultiIndex
            to produce DataFrame.
        DataFrame.melt : Unpivot a DataFrame from wide format to long format.
        DataFrame.explode : Explode a DataFrame from list-like
            columns to long format.

        Examples
        --------
        >>> psser = ps.Series([[1, 2, 3], [], [3, 4]])
        >>> psser
        0    [1, 2, 3]
        1           []
        2       [3, 4]
        dtype: object

        >>> psser.explode()  # doctest: +SKIP
        0    1.0
        0    2.0
        0    3.0
        1    NaN
        2    3.0
        2    4.0
        dtype: float64
        """
        if not isinstance(self.spark.data_type, ArrayType):
            return self.copy()

        scol = F.explode_outer(self.spark.column).alias(name_like_string(self._column_label))

        internal = self._internal.with_new_columns([scol], keep_order=False)
        return first_series(DataFrame(internal))
[docs]defargsort(self)->"Series":""" Return the integer indices that would sort the Series values. Unlike pandas, the index order is not preserved in the result. Returns ------- Series Positions of values within the sort order with -1 indicating nan values. Examples -------- >>> psser = ps.Series([3, 3, 4, 1, 6, 2, 3, 7, 8, 7, 10]) >>> psser 0 3 1 3 2 4 3 1 4 6 5 2 6 3 7 7 8 8 9 7 10 10 dtype: int64 >>> psser.argsort().sort_index() 0 3 1 5 2 0 3 1 4 6 5 2 6 4 7 7 8 9 9 8 10 10 dtype: int64 """notnull=self.loc[self.notnull()]sdf_for_index=notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)tmp_join_key=verify_temp_column_name(sdf_for_index,"__tmp_join_key__")sdf_for_index=InternalFrame.attach_distributed_sequence_column(sdf_for_index,tmp_join_key)# sdf_for_index:# +----------------+-----------------+# |__tmp_join_key__|__index_level_0__|# +----------------+-----------------+# | 0| 0|# | 1| 1|# | 2| 2|# | 3| 3|# | 4| 4|# +----------------+-----------------+sdf_for_data=notnull._internal.spark_frame.select(notnull.spark.column.alias("values"),NATURAL_ORDER_COLUMN_NAME)sdf_for_data=InternalFrame.attach_distributed_sequence_column(sdf_for_data,SPARK_DEFAULT_SERIES_NAME)# sdf_for_data:# +---+------+-----------------+# | 0|values|__natural_order__|# +---+------+-----------------+# | 0| 3| 25769803776|# | 1| 3| 51539607552|# | 2| 4| 77309411328|# | 3| 1| 103079215104|# | 4| 2| 128849018880|# +---+------+-----------------+sdf_for_data=sdf_for_data.sort(scol_for(sdf_for_data,"values"),NATURAL_ORDER_COLUMN_NAME).drop("values",NATURAL_ORDER_COLUMN_NAME)tmp_join_key=verify_temp_column_name(sdf_for_data,"__tmp_join_key__")sdf_for_data=InternalFrame.attach_distributed_sequence_column(sdf_for_data,tmp_join_key)# sdf_for_index: sdf_for_data:# +----------------+-----------------+ +----------------+---+# |__tmp_join_key__|__index_level_0__| |__tmp_join_key__| 0|# +----------------+-----------------+ +----------------+---+# | 0| 0| | 0| 3|# | 1| 1| | 1| 4|# | 2| 2| | 2| 0|# | 3| 3| | 3| 1|# | 4| 4| | 4| 2|# +----------------+-----------------+ +----------------+---+sdf=sdf_for_index.join(sdf_for_data,on=tmp_join_key).drop(tmp_join_key)internal=self._internal.with_new_sdf(spark_frame=sdf,data_columns=[SPARK_DEFAULT_SERIES_NAME],index_fields=[InternalField(dtype=field.dtype)forfieldinself._internal.index_fields],data_fields=[None],)psser=first_series(DataFrame(internal))returncast(Series,ps.concat([psser,self.loc[self.isnull()].spark.transform(lambda_:F.lit(-1))]),)
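    # A minimal sketch of the NaN handling described above: missing values do not take
    # part in the sort and are reported as -1 instead (illustrative only):
    #
    # >>> psser = ps.Series([3.0, None, 1.0])
    # >>> psser.argsort().sort_index()  # doctest: +SKIP
    # 0    1
    # 1   -1
    # 2    0
    # dtype: int64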
[docs]defargmax(self,axis:Axis=None,skipna:bool=True)->int:""" Return int position of the largest value in the Series. If the maximum is achieved in multiple locations, the first row position is returned. Parameters ---------- axis : None Dummy argument for consistency with Series. skipna : bool, default True Exclude NA/null values. Returns ------- int Row position of the maximum value. Examples -------- Consider dataset containing cereal calories >>> s = ps.Series({'Corn Flakes': 100.0, 'Almond Delight': 110.0, 'Unknown': np.nan, ... 'Cinnamon Toast Crunch': 120.0, 'Cocoa Puff': 110.0}) >>> s Corn Flakes 100.0 Almond Delight 110.0 Unknown NaN Cinnamon Toast Crunch 120.0 Cocoa Puff 110.0 dtype: float64 >>> s.argmax() 3 >>> s.argmax(skipna=False) -1 """axis=validate_axis(axis,none_axis=0)ifaxis==1:raiseValueError("axis can only be 0 or 'index'")sdf=self._internal.spark_frame.select(self.spark.column,NATURAL_ORDER_COLUMN_NAME)seq_col_name=verify_temp_column_name(sdf,"__distributed_sequence_column__")sdf=InternalFrame.attach_distributed_sequence_column(sdf,seq_col_name,)scol=scol_for(sdf,self._internal.data_spark_column_names[0])ifskipna:sdf=sdf.orderBy(scol.desc_nulls_last(),NATURAL_ORDER_COLUMN_NAME,seq_col_name)else:sdf=sdf.orderBy(scol.desc_nulls_first(),NATURAL_ORDER_COLUMN_NAME,seq_col_name)results=sdf.select(scol,seq_col_name).take(1)iflen(results)==0:raiseValueError("attempt to get argmax of an empty sequence")else:max_value=results[0]# If the maximum is achieved in multiple locations, the first row position is returned.return-1ifmax_value[0]isNoneelsemax_value[1]
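    # A minimal sketch tying `argmax` back to positional indexing (illustrative only):
    # the returned row position can be fed to `iloc` to recover the maximum value itself.
    #
    # >>> s = ps.Series([100.0, 110.0, 120.0, 110.0])
    # >>> s.argmax()
    # 2
    # >>> s.iloc[s.argmax()]  # doctest: +SKIP
    # 120.0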
    def argmin(self, axis: Axis = None, skipna: bool = True) -> int:
        """
        Return int position of the smallest value in the Series.

        If the minimum is achieved in multiple locations,
        the first row position is returned.

        Parameters
        ----------
        axis : None
            Dummy argument for consistency with Series.
        skipna : bool, default True
            Exclude NA/null values.

        Returns
        -------
        int
            Row position of the minimum value.

        Examples
        --------
        Consider dataset containing cereal calories

        >>> s = ps.Series({'Corn Flakes': 100.0, 'Almond Delight': 110.0,
        ...                'Cinnamon Toast Crunch': 120.0, 'Cocoa Puff': 110.0})
        >>> s  # doctest: +SKIP
        Corn Flakes              100.0
        Almond Delight           110.0
        Cinnamon Toast Crunch    120.0
        Cocoa Puff               110.0
        dtype: float64

        >>> s.argmin()  # doctest: +SKIP
        0
        """
        axis = validate_axis(axis, none_axis=0)
        if axis == 1:
            raise ValueError("axis can only be 0 or 'index'")
        sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME)
        seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
        sdf = InternalFrame.attach_distributed_sequence_column(
            sdf,
            seq_col_name,
        )
        scol = scol_for(sdf, self._internal.data_spark_column_names[0])

        if skipna:
            sdf = sdf.orderBy(scol.asc_nulls_last(), NATURAL_ORDER_COLUMN_NAME, seq_col_name)
        else:
            sdf = sdf.orderBy(scol.asc_nulls_first(), NATURAL_ORDER_COLUMN_NAME, seq_col_name)

        results = sdf.select(scol, seq_col_name).take(1)

        if len(results) == 0:
            raise ValueError("attempt to get argmin of an empty sequence")
        else:
            min_value = results[0]
            # If the minimum is achieved in multiple locations, the first row position is returned.
            return -1 if min_value[0] is None else min_value[1]
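    # A minimal sketch of the tie-breaking rule for `argmin` (illustrative only): when the
    # minimum appears more than once, the first row position in natural order is returned.
    #
    # >>> ps.Series([2, 1, 1, 3]).argmin()  # doctest: +SKIP
    # 1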
[docs]defcompare(self,other:"Series",keep_shape:bool=False,keep_equal:bool=False)->DataFrame:""" Compare to another Series and show the differences. .. note:: This API is slightly different from pandas when indexes from both Series are not identical and config 'compute.eager_check' is False. pandas raise an exception; however, pandas-on-Spark just proceeds and performs by ignoring mismatches. >>> psser1 = ps.Series([1, 2, 3, 4, 5], index=pd.Index([1, 2, 3, 4, 5])) >>> psser2 = ps.Series([1, 2, 3, 4, 5], index=pd.Index([1, 2, 4, 3, 6])) >>> psser1.compare(psser2) # doctest: +SKIP ... ValueError: Can only compare identically-labeled Series objects >>> with ps.option_context("compute.eager_check", False): ... psser1.compare(psser2) # doctest: +SKIP ... self other 3 3.0 4.0 4 4.0 3.0 5 5.0 NaN 6 NaN 5.0 Parameters ---------- other : Series Object to compare with. keep_shape : bool, default False If true, all rows and columns are kept. Otherwise, only the ones with different values are kept. keep_equal : bool, default False If true, the result keeps values that are equal. Otherwise, equal values are shown as NaNs. Returns ------- DataFrame Notes ----- Matching NaNs will not appear as a difference. Examples -------- >>> from pyspark.pandas.config import set_option, reset_option >>> set_option("compute.ops_on_diff_frames", True) >>> s1 = ps.Series(["a", "b", "c", "d", "e"]) >>> s2 = ps.Series(["a", "a", "c", "b", "e"]) Align the differences on columns >>> s1.compare(s2).sort_index() self other 1 b a 3 d b Keep all original rows >>> s1.compare(s2, keep_shape=True).sort_index() self other 0 None None 1 b a 2 None None 3 d b 4 None None Keep all original rows and all original values >>> s1.compare(s2, keep_shape=True, keep_equal=True).sort_index() self other 0 a a 1 b a 2 c c 3 d b 4 e e >>> reset_option("compute.ops_on_diff_frames") """combined:DataFrameifsame_anchor(self,other):self_column_label=verify_temp_column_name(other.to_frame(),"__self_column__")other_column_label=verify_temp_column_name(self.to_frame(),"__other_column__")combined=DataFrame(self._internal.with_new_columns([self.rename(self_column_label),other.rename(other_column_label)]))else:ifget_option("compute.eager_check")andnotself.index.equals(other.index):raiseValueError("Can only compare identically-labeled Series 
objects")combined=combine_frames(self.to_frame(),other.to_frame())this_column_label="self"that_column_label="other"ifkeep_equalandkeep_shape:combined.columns=pd.Index([this_column_label,that_column_label])returncombinedthis_data_scol=combined._internal.data_spark_columns[0]that_data_scol=combined._internal.data_spark_columns[1]index_scols=combined._internal.index_spark_columnssdf=combined._internal.spark_frameifkeep_shape:this_scol=(F.when(this_data_scol==that_data_scol,None).otherwise(this_data_scol).alias(this_column_label))this_field=combined._internal.data_fields[0].copy(name=this_column_label,nullable=True)that_scol=(F.when(this_data_scol==that_data_scol,None).otherwise(that_data_scol).alias(that_column_label))that_field=combined._internal.data_fields[1].copy(name=that_column_label,nullable=True)else:sdf=sdf.filter(~this_data_scol.eqNullSafe(that_data_scol))this_scol=this_data_scol.alias(this_column_label)this_field=combined._internal.data_fields[0].copy(name=this_column_label)that_scol=that_data_scol.alias(that_column_label)that_field=combined._internal.data_fields[1].copy(name=that_column_label)sdf=sdf.select(*index_scols,this_scol,that_scol,NATURAL_ORDER_COLUMN_NAME)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinself._internal.index_spark_column_names],index_names=self._internal.index_names,index_fields=combined._internal.index_fields,column_labels=[(this_column_label,),(that_column_label,)],data_spark_columns=[scol_for(sdf,this_column_label),scol_for(sdf,that_column_label)],data_fields=[this_field,that_field],column_label_names=[None],)returnDataFrame(internal)
    # TODO(SPARK-40553): 1, support array-like 'value'; 2, add parameter 'sorter'
[docs]defsearchsorted(self,value:Any,side:str="left")->int:""" Find indices where elements should be inserted to maintain order. Find the indices into a sorted Series self such that, if the corresponding elements in value were inserted before the indices, the order of self would be preserved. .. versionadded:: 3.4.0 Parameters ---------- value : scalar Values to insert into self. side : {‘left’, ‘right’}, optional If ‘left’, the index of the first suitable location found is given. If ‘right’, return the last such index. If there is no suitable index, return either 0 or N (where N is the length of self). Returns ------- int insertion point Notes ----- The Series must be monotonically sorted, otherwise wrong locations will likely be returned. Examples -------- >>> ser = ps.Series([1, 2, 2, 3]) >>> ser.searchsorted(0) 0 >>> ser.searchsorted(1) 0 >>> ser.searchsorted(2) 1 >>> ser.searchsorted(5) 4 >>> ser.searchsorted(0, side="right") 0 >>> ser.searchsorted(1, side="right") 1 >>> ser.searchsorted(2, side="right") 3 >>> ser.searchsorted(5, side="right") 4 """ifsidenotin["left","right"]:raiseValueError(f"Invalid side {side}")sdf=self._internal.spark_frameindex_col_name=verify_temp_column_name(sdf,"__search_sorted_index_col__")value_col_name=verify_temp_column_name(sdf,"__search_sorted_value_col__")sdf=InternalFrame.attach_distributed_sequence_column(sdf.select(self.spark.column.alias(value_col_name)),index_col_name)ifside=="left":results=sdf.select(F.min(F.when(F.lit(value)<=F.col(value_col_name),F.col(index_col_name))),F.count(F.lit(0)),).take(1)else:results=sdf.select(F.min(F.when(F.lit(value)<F.col(value_col_name),F.col(index_col_name))),F.count(F.lit(0)),).take(1)iflen(results)==0:return0else:returnresults[0][1]ifresults[0][0]isNoneelseresults[0][0]
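    # A minimal comparison sketch for `searchsorted` (illustrative only): for a
    # monotonically sorted Series the insertion point is expected to match NumPy's
    # `searchsorted` on the same values.
    #
    # >>> import numpy as np
    # >>> int(np.searchsorted([1, 2, 2, 3], 2, side="right"))
    # 3
    # >>> ps.Series([1, 2, 2, 3]).searchsorted(2, side="right")
    # 3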
[docs]defalign(self,other:Union[DataFrame,"Series"],join:str="outer",axis:Optional[Axis]=None,copy:bool=True,)->Tuple["Series",Union[DataFrame,"Series"]]:""" Align two objects on their axes with the specified join method. Join method is specified for each axis Index. Parameters ---------- other : DataFrame or Series join : {{'outer', 'inner', 'left', 'right'}}, default 'outer' axis : allowed axis of the other object, default None Align on index (0), columns (1), or both (None). copy : bool, default True Always returns new objects. If copy=False and no reindexing is required then original objects are returned. Returns ------- (left, right) : (Series, type of other) Aligned objects. Examples -------- >>> ps.set_option("compute.ops_on_diff_frames", True) >>> s1 = ps.Series([7, 8, 9], index=[10, 11, 12]) >>> s2 = ps.Series(["g", "h", "i"], index=[10, 20, 30]) >>> aligned_l, aligned_r = s1.align(s2) >>> aligned_l.sort_index() 10 7.0 11 8.0 12 9.0 20 NaN 30 NaN dtype: float64 >>> aligned_r.sort_index() 10 g 11 None 12 None 20 h 30 i dtype: object Align with the join type "inner": >>> aligned_l, aligned_r = s1.align(s2, join="inner") >>> aligned_l.sort_index() 10 7 dtype: int64 >>> aligned_r.sort_index() 10 g dtype: object Align with a DataFrame: >>> df = ps.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}, index=[10, 20, 30]) >>> aligned_l, aligned_r = s1.align(df) >>> aligned_l.sort_index() 10 7.0 11 8.0 12 9.0 20 NaN 30 NaN dtype: float64 >>> aligned_r.sort_index() a b 10 1.0 a 11 NaN None 12 NaN None 20 2.0 b 30 3.0 c >>> ps.reset_option("compute.ops_on_diff_frames") """axis=validate_axis(axis)ifaxis==1:raiseValueError("Series does not support columns axis.")self_df=self.to_frame()left,right=self_df.align(other,join=join,axis=axis,copy=False)ifleftisself_df:left_ser=selfelse:left_ser=first_series(left).rename(self.name)return(left_ser.copy(),right.copy())ifcopyelse(left_ser,right)
    # TODO(SPARK-42620): Add `inclusive` parameter and replace `include_start` & `include_end`.
    # See https://github.com/pandas-dev/pandas/issues/43248
[docs]defbetween_time(self,start_time:Union[datetime.time,str],end_time:Union[datetime.time,str],include_start:bool=True,include_end:bool=True,axis:Axis=0,)->"Series":""" Select values between particular times of the day (example: 9:00-9:30 AM). By setting ``start_time`` to be later than ``end_time``, you can get the times that are *not* between the two times. Parameters ---------- start_time : datetime.time or str Initial time as a time filter limit. end_time : datetime.time or str End time as a time filter limit. include_start : bool, default True Whether the start time needs to be included in the result. .. deprecated:: 3.4.0 include_end : bool, default True Whether the end time needs to be included in the result. .. deprecated:: 3.4.0 axis : {0 or 'index', 1 or 'columns'}, default 0 Determine range time on index or columns value. Returns ------- Series Data from the original object filtered to the specified dates range. Raises ------ TypeError If the index is not a :class:`DatetimeIndex` See Also -------- at_time : Select values at a particular time of the day. last : Select final periods of time series based on a date offset. DatetimeIndex.indexer_between_time : Get just the index locations for values between particular times of the day. Examples -------- >>> idx = pd.date_range('2018-04-09', periods=4, freq='1D20min') >>> psser = ps.Series([1, 2, 3, 4], index=idx) >>> psser 2018-04-09 00:00:00 1 2018-04-10 00:20:00 2 2018-04-11 00:40:00 3 2018-04-12 01:00:00 4 dtype: int64 >>> psser.between_time('0:15', '0:45') 2018-04-10 00:20:00 2 2018-04-11 00:40:00 3 dtype: int64 """returnfirst_series(self.to_frame().between_time(start_time,end_time,include_start,include_end,axis)).rename(self.name)
[docs]defat_time(self,time:Union[datetime.time,str],asof:bool=False,axis:Axis=0)->"Series":""" Select values at particular time of day (example: 9:30AM). Parameters ---------- time : datetime.time or str axis : {0 or 'index', 1 or 'columns'}, default 0 Returns ------- Series Raises ------ TypeError If the index is not a :class:`DatetimeIndex` See Also -------- between_time : Select values between particular times of the day. DatetimeIndex.indexer_at_time : Get just the index locations for values at particular time of the day. Examples -------- >>> idx = pd.date_range('2018-04-09', periods=4, freq='12H') >>> psser = ps.Series([1, 2, 3, 4], index=idx) >>> psser 2018-04-09 00:00:00 1 2018-04-09 12:00:00 2 2018-04-10 00:00:00 3 2018-04-10 12:00:00 4 dtype: int64 >>> psser.at_time('12:00') 2018-04-09 12:00:00 2 2018-04-10 12:00:00 4 dtype: int64 """returnfirst_series(self.to_frame().at_time(time,asof,axis)).rename(self.name)
def_cum(self,func:Callable[[Column],Column],skipna:bool,part_cols:Sequence["ColumnOrName"]=(),ascending:bool=True,)->"Series":# This is used to cummin, cummax, cumsum, etc.ifascending:window=(Window.orderBy(F.asc(NATURAL_ORDER_COLUMN_NAME)).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding,Window.currentRow))else:window=(Window.orderBy(F.desc(NATURAL_ORDER_COLUMN_NAME)).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding,Window.currentRow))ifskipna:# There is a behavior difference between pandas and PySpark. In case of cummax,## Input:# A B# 0 2.0 1.0# 1 5.0 NaN# 2 1.0 0.0# 3 2.0 4.0# 4 4.0 9.0## pandas:# A B# 0 2.0 1.0# 1 5.0 NaN# 2 5.0 1.0# 3 5.0 4.0# 4 5.0 9.0## PySpark:# A B# 0 2.0 1.0# 1 5.0 1.0# 2 5.0 1.0# 3 5.0 4.0# 4 5.0 9.0scol=F.when(# Manually sets nulls given the column defined above.self.spark.column.isNull(),F.lit(None),).otherwise(func(self.spark.column).over(window))else:# Here, we use two Windows.# One for real data.# The other one for setting nulls after the first null it meets.## There is a behavior difference between pandas and PySpark. In case of cummax,## Input:# A B# 0 2.0 1.0# 1 5.0 NaN# 2 1.0 0.0# 3 2.0 4.0# 4 4.0 9.0## pandas:# A B# 0 2.0 1.0# 1 5.0 NaN# 2 5.0 NaN# 3 5.0 NaN# 4 5.0 NaN## PySpark:# A B# 0 2.0 1.0# 1 5.0 1.0# 2 5.0 1.0# 3 5.0 4.0# 4 5.0 9.0scol=F.when(# By going through with max, it sets True after the first time it meets null.F.max(self.spark.column.isNull()).over(window),# Manually sets nulls given the column defined above.F.lit(None),).otherwise(func(self.spark.column).over(window))returnself._with_new_scol(scol)def_cumsum(self,skipna:bool,part_cols:Sequence["ColumnOrName"]=())->"Series":psser=selfifisinstance(psser.spark.data_type,BooleanType):psser=psser.spark.transform(lambdascol:scol.cast(LongType()))elifnotisinstance(psser.spark.data_type,NumericType):raiseTypeError("Could not convert {} ({}) to numeric".format(spark_type_to_pandas_dtype(psser.spark.data_type),psser.spark.data_type.simpleString(),))returnpsser._cum(F.sum,skipna,part_cols)def_cumprod(self,skipna:bool,part_cols:Sequence["ColumnOrName"]=())->"Series":ifisinstance(self.spark.data_type,BooleanType):scol=self._cum(lambdascol:F.min(F.coalesce(scol,F.lit(True))),skipna,part_cols).spark.column.cast(LongType())elifisinstance(self.spark.data_type,NumericType):num_zeros=self._cum(lambdascol:F.sum(F.when(scol==0,1).otherwise(0)),skipna,part_cols).spark.columnnum_negatives=self._cum(lambdascol:F.sum(F.when(scol<0,1).otherwise(0)),skipna,part_cols).spark.columnsign=F.when(num_negatives%2==0,1).otherwise(-1)abs_prod=F.exp(self._cum(lambdascol:F.sum(F.log(F.abs(scol))),skipna,part_cols).spark.column)scol=F.when(num_zeros>0,0).otherwise(sign*abs_prod)ifisinstance(self.spark.data_type,IntegralType):scol=F.round(scol).cast(LongType())else:raiseTypeError("Could not convert {} ({}) to numeric".format(spark_type_to_pandas_dtype(self.spark.data_type),self.spark.data_type.simpleString(),))returnself._with_new_scol(scol)# ----------------------------------------------------------------------# Accessor Methods# ----------------------------------------------------------------------dt=CachedAccessor("dt",DatetimeMethods)str=CachedAccessor("str",StringMethods)cat=CachedAccessor("cat",CategoricalAccessor)plot=CachedAccessor("plot",PandasOnSparkPlotAccessor)# 
----------------------------------------------------------------------def_apply_series_op(self,op:Callable[["Series"],Union["Series",Column]],should_resolve:bool=False)->"Series":psser_or_scol=op(self)ifisinstance(psser_or_scol,Series):psser=psser_or_scolelse:psser=self._with_new_scol(psser_or_scol)ifshould_resolve:internal=psser._internal.resolved_copyreturnfirst_series(DataFrame(internal))else:returnpsser.copy()def_reduce_for_stat_function(self,sfun:Callable[["Series"],Column],name:str_type,axis:Optional[Axis]=None,numeric_only:bool=True,skipna:bool=True,**kwargs:Any,)->Scalar:""" Applies sfun to the column and returns a scalar Parameters ---------- sfun : the stats function to be used for aggregation name : original pandas API name. axis : used only for sanity check because series only support index axis. numeric_only : not used by this implementation, but passed down by stats functions. skipna: exclude NA/null values when computing the result. """axis=validate_axis(axis)ifaxis==1:raiseNotImplementedError("Series does not support columns axis.")ifnotskipnaandget_option("compute.eager_check")andself.hasnans:scol=F.first(F.lit(np.nan))else:scol=sfun(self)min_count=kwargs.get("min_count",0)ifmin_count>0:scol=F.when(Frame._count_expr(self)>=min_count,scol)result=unpack_scalar(self._internal.spark_frame.select(scol))returnresultifresultisnotNoneelsenp.nan# Override the `groupby` to specify the actual return type annotation.
[docs]defresample(self,rule:str_type,closed:Optional[str_type]=None,label:Optional[str_type]=None,on:Optional["Series"]=None,)->"SeriesResampler":""" Resample time-series data. Convenience method for frequency conversion and resampling of time series. The object must have a datetime-like index (only support `DatetimeIndex` for now), or the caller must pass the label of a datetime-like series/index to the ``on`` keyword parameter. .. versionadded:: 3.4.0 Parameters ---------- rule : str The offset string or object representing target conversion. Currently, supported units are {'Y', 'A', 'M', 'D', 'H', 'T', 'MIN', 'S'}. closed : {{'right', 'left'}}, default None Which side of bin interval is closed. The default is 'left' for all frequency offsets except for 'A', 'Y' and 'M' which all have a default of 'right'. label : {{'right', 'left'}}, default None Which bin edge label to label bucket with. The default is 'left' for all frequency offsets except for 'A', 'Y' and 'M' which all have a default of 'right'. on : Series, optional For a DataFrame, column to use instead of index for resampling. Column must be datetime-like. Returns ------- SeriesResampler Examples -------- Start by creating a series with 9 one minute timestamps. >>> index = pd.date_range('1/1/2000', periods=9, freq='T') >>> series = ps.Series(range(9), index=index, name='V') >>> series 2000-01-01 00:00:00 0 2000-01-01 00:01:00 1 2000-01-01 00:02:00 2 2000-01-01 00:03:00 3 2000-01-01 00:04:00 4 2000-01-01 00:05:00 5 2000-01-01 00:06:00 6 2000-01-01 00:07:00 7 2000-01-01 00:08:00 8 Name: V, dtype: int64 Downsample the series into 3 minute bins and sum the values of the timestamps falling into a bin. >>> series.resample('3T').sum().sort_index() 2000-01-01 00:00:00 3.0 2000-01-01 00:03:00 12.0 2000-01-01 00:06:00 21.0 Name: V, dtype: float64 Downsample the series into 3 minute bins as above, but label each bin using the right edge instead of the left. Please note that the value in the bucket used as the label is not included in the bucket, which it labels. For example, in the original series the bucket ``2000-01-01 00:03:00`` contains the value 3, but the summed value in the resampled bucket with the label ``2000-01-01 00:03:00`` does not include 3 (if it did, the summed value would be 6, not 3). To include this value, close the right side of the bin interval as illustrated in the example below this one. >>> series.resample('3T', label='right').sum().sort_index() 2000-01-01 00:03:00 3.0 2000-01-01 00:06:00 12.0 2000-01-01 00:09:00 21.0 Name: V, dtype: float64 Downsample the series into 3 minute bins as above, but close the right side of the bin interval. >>> series.resample('3T', label='right', closed='right').sum().sort_index() 2000-01-01 00:00:00 0.0 2000-01-01 00:03:00 6.0 2000-01-01 00:06:00 15.0 2000-01-01 00:09:00 15.0 Name: V, dtype: float64 Upsample the series into 30 second bins. >>> series.resample('30S').sum().sort_index()[0:5] # Select first 5 rows 2000-01-01 00:00:00 0.0 2000-01-01 00:00:30 0.0 2000-01-01 00:01:00 1.0 2000-01-01 00:01:30 0.0 2000-01-01 00:02:00 2.0 Name: V, dtype: float64 See Also -------- DataFrame.resample : Resample a DataFrame. groupby : Group by mapping, function, label, or list of labels. 
"""frompyspark.pandas.indexesimportDatetimeIndexfrompyspark.pandas.resampleimportSeriesResamplerifonisNoneandnotisinstance(self.index,DatetimeIndex):raiseNotImplementedError("resample currently works only for DatetimeIndex")ifonisnotNoneandnotisinstance(as_spark_type(on.dtype),TimestampType):raiseNotImplementedError("`on` currently works only for TimestampType")agg_columns:List[ps.Series]=[]column_label=self._internal.column_labels[0]ifisinstance(self._internal.spark_type_for(column_label),(NumericType,BooleanType)):agg_columns.append(self)iflen(agg_columns)==0:raiseValueError("No available aggregation columns!")returnSeriesResampler(psser=self,resamplekey=on,rule=rule,closed=closed,label=label,agg_columns=agg_columns,)
def__getitem__(self,key:Any)->Any:try:if(isinstance(key,slice)andany(type(n)==intfornin[key.start,key.stop]))or(type(key)==intandnotisinstance(self.index.spark.data_type,(IntegerType,LongType))):# Seems like pandas Series always uses int as positional search when slicing# with ints, searches based on index values when the value is int.returnself.iloc[key]returnself.loc[key]exceptSparkPandasIndexingError:raiseKeyError("Key length ({}) exceeds index depth ({})".format(len(key),self._internal.index_level))def__getattr__(self,item:str_type)->Any:ifitem.startswith("__"):raiseAttributeError(item)ifhasattr(MissingPandasLikeSeries,item):property_or_func=getattr(MissingPandasLikeSeries,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError("'Series' object has no attribute '{}'".format(item))def_to_internal_pandas(self)->pd.Series:""" Return a pandas Series directly from _internal to avoid overhead of copy. This method is for internal use only. """returnself._psdf._internal.to_pandas_frame[self.name]def__repr__(self)->str_type:max_display_count=get_option("display.max_rows")ifmax_display_countisNone:returnself._to_internal_pandas().to_string(name=bool(self.name),dtype=bool(self.dtype))pser=self._psdf._get_or_create_repr_pandas_cache(max_display_count)[self.name]pser_length=len(pser)pser=pser.iloc[:max_display_count]ifpser_length>max_display_count:repr_string=pser.to_string(length=True)rest,prev_footer=repr_string.rsplit("\n",1)match=REPR_PATTERN.search(prev_footer)ifmatchisnotNone:length=match.group("length")dtype_name=str(self.dtype.name)ifself.nameisNone:footer="\ndtype: {dtype}\nShowing only the first {length}".format(length=length,dtype=pprint_thing(dtype_name))else:footer=("\nName: {name}, dtype: {dtype}""\nShowing only the first {length}".format(length=length,name=self.name,dtype=pprint_thing(dtype_name)))returnrest+footerreturnpser.to_string(name=self.name,dtype=self.dtype)def__dir__(self)->Iterable[str_type]:ifnotisinstance(self.spark.data_type,StructType):fields=[]else:fields=[fforfinself.spark.data_type.fieldNames()if" "notinf]returnlist(super().__dir__())+fieldsdef__iter__(self)->None:returnMissingPandasLikeSeries.__iter__(self)ifsys.version_info>=(3,7):# In order to support the type hints such as Series[...]. See DataFrame.__class_getitem__.def__class_getitem__(cls,params:Any)->Type[SeriesType]:returncreate_type_for_series_type(params)elif(3,5)<=sys.version_info<(3,7):# The implementation is in its metaclass so this flag is needed to distinguish# pandas-on-Spark Series.is_series=None
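# A minimal sketch of the `__getitem__` dispatch above (illustrative only): integer keys
# are treated positionally when the index itself is not integer-typed, and as labels
# otherwise, mirroring pandas.
#
# >>> s = ps.Series([10, 20, 30], index=['a', 'b', 'c'])
# >>> s[0]   # non-integer index -> positional lookup  # doctest: +SKIP
# 10
# >>> s2 = ps.Series([10, 20, 30], index=[3, 2, 1])
# >>> s2[1]  # integer index -> label lookup  # doctest: +SKIP
# 30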
def unpack_scalar(sdf: SparkDataFrame) -> Any:
    """
    Takes a dataframe that is supposed to contain a single row with a single scalar value,
    and returns this value.
    """
    lst = sdf.limit(2).toPandas()
    assert len(lst) == 1, (sdf, lst)
    row = lst.iloc[0]
    lst2 = list(row)
    assert len(lst2) == 1, (row, lst2)
    return lst2[0]


@overload
def first_series(df: DataFrame) -> Series:
    ...


@overload
def first_series(df: pd.DataFrame) -> pd.Series:
    ...


def first_series(df: Union[DataFrame, pd.DataFrame]) -> Union[Series, pd.Series]:
    """
    Takes a DataFrame and returns the first column of the DataFrame as a Series
    """
    assert isinstance(df, (DataFrame, pd.DataFrame)), type(df)
    if isinstance(df, DataFrame):
        return df._psser_for(df._internal.column_labels[0])
    else:
        return df[df.columns[0]]


def _test() -> None:
    import os
    import doctest
    import sys
    from pyspark.sql import SparkSession
    import pyspark.pandas.series

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.pandas.series.__dict__.copy()
    globs["ps"] = pyspark.pandas
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("pyspark.pandas.series tests")
        .getOrCreate()
    )
    (failure_count, test_count) = doctest.testmod(
        pyspark.pandas.series,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()