#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Standard library
import warnings
from typing import Any, Callable, List, Optional, TypeVar

# Third-party: Py4J bridge to the JVM
from py4j.java_gateway import java_import, is_instance_of, JavaObject

# PySpark
from pyspark import RDD, SparkConf
from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
from pyspark.streaming.listener import StreamingListener
from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer

# Public API of this module.
__all__ = ["StreamingContext"]

# Element type carried by the generic RDD[T] / DStream[T] signatures below.
T = TypeVar("T")
class StreamingContext:
    """
    Main entry point for Spark Streaming functionality. A StreamingContext
    represents the connection to a Spark cluster, and can be used to create
    :class:`DStream` from various input sources. It can be created from an
    existing :class:`SparkContext`.
    After creating and transforming DStreams, the streaming computation can
    be started and stopped using `context.start()` and `context.stop()`,
    respectively. `context.awaitTermination()` allows the current thread
    to wait for the termination of the context by `stop()` or by an exception.

    .. deprecated:: Spark 3.4.0
       This is deprecated as of Spark 3.4.0.
       There are no longer updates to DStream and it's a legacy project.
       There is a newer and easier to use streaming engine in Spark called Structured Streaming.
       You should use Spark Structured Streaming for your streaming applications.

    Parameters
    ----------
    sparkContext : :class:`SparkContext`
        SparkContext object.
    batchDuration : int, optional
        the time interval (in seconds) at which streaming
        data will be divided into batches
    """

    # Serializer for TransformFunction objects; set by `_ensure_initialized`.
    _transformerSerializer: Optional[TransformFunctionSerializer] = None

    # Reference to a currently active StreamingContext
    _activeContext: Optional["StreamingContext"] = None

    def __init__(
        self,
        sparkContext: SparkContext,
        batchDuration: Optional[int] = None,
        jssc: Optional[JavaObject] = None,
    ) -> None:
        # Every construction emits the deprecation notice.
        warnings.warn(
            "DStream is deprecated as of Spark 3.4.0. Migrate to Structured Streaming.",
            FutureWarning,
        )
        self._sc = sparkContext
        self._jvm = self._sc._jvm
        # Wrap the supplied Java context if there is one; otherwise build a
        # fresh one, which requires `batchDuration`.
        self._jssc = jssc or self._initialize_context(self._sc, batchDuration)

    def _initialize_context(self, sc: SparkContext, duration: Optional[int]) -> JavaObject:
        """
        Create a new JavaStreamingContext for `sc` with the given batch
        duration (in seconds).
        """
        self._ensure_initialized()
        assert self._jvm is not None
        assert duration is not None, (
            "batchDuration is required when no existing JavaStreamingContext is given"
        )
        return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))

    def _jduration(self, seconds: int) -> JavaObject:
        """
        Create Duration object given number of seconds
        """
        assert self._jvm is not None
        # The value is converted from seconds to whole milliseconds.
        return self._jvm.Duration(int(seconds * 1000))

    @classmethod
    def _ensure_initialized(cls) -> None:
        """
        Make sure the gateway is ready for streaming: import the streaming
        packages into the JVM view, start the callback server and register
        the TransformFunction serializer.
        """
        SparkContext._ensure_initialized()
        gw = SparkContext._gateway
        assert gw is not None

        java_import(gw.jvm, "org.apache.spark.streaming.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")

        from pyspark.java_gateway import ensure_callback_server_started

        ensure_callback_server_started(gw)

        # register serializer for TransformFunction
        # it happens before creating SparkContext when loading from checkpointing
        cls._transformerSerializer = TransformFunctionSerializer(
            SparkContext._active_spark_context,
            CloudPickleSerializer(),
            gw,
        )

    @classmethod
    def getOrCreate(
        cls, checkpointPath: str, setupFunc: Callable[[], "StreamingContext"]
    ) -> "StreamingContext":
        """
        Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
        If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
        recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
        will be used to create a new context.

        Parameters
        ----------
        checkpointPath : str
            Checkpoint directory used in an earlier streaming program
        setupFunc : function
            Function to create a new context and setup DStreams
        """
        cls._ensure_initialized()
        gw = SparkContext._gateway
        assert gw is not None

        # Check whether valid checkpoint information exists in the given path
        ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
        if ssc_option.isEmpty():
            # Nothing to recover: build a new context and enable checkpointing on it.
            ssc = setupFunc()
            ssc.checkpoint(checkpointPath)
            return ssc

        jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

        # If there is already an active instance of Python SparkContext use it, or create a new one
        if not SparkContext._active_spark_context:
            jsc = jssc.sparkContext()
            conf = SparkConf(_jconf=jsc.getConf())
            SparkContext(conf=conf, gateway=gw, jsc=jsc)

        sc = SparkContext._active_spark_context
        assert sc is not None

        # update ctx in serializer
        assert cls._transformerSerializer is not None
        cls._transformerSerializer.ctx = sc
        return StreamingContext(sc, None, jssc)

    @classmethod
    def getActive(cls) -> Optional["StreamingContext"]:
        """
        Return either the currently active StreamingContext (i.e., if there is a context started
        but not stopped) or None.

        Raises
        ------
        RuntimeError
            If the JVM reports an active context that is not the one backing
            the Python context recorded as active.
        """
        activePythonContext = cls._activeContext
        if activePythonContext is not None:
            # Verify that the current running Java StreamingContext is active and is the same one
            # backing the supposedly active Python context
            activePythonContextJavaId = activePythonContext._jssc.ssc().hashCode()
            activeJvmContextOption = activePythonContext._jvm.StreamingContext.getActive()

            if activeJvmContextOption.isEmpty():
                cls._activeContext = None
            elif activeJvmContextOption.get().hashCode() != activePythonContextJavaId:
                cls._activeContext = None
                raise RuntimeError(
                    "JVM's active JavaStreamingContext is not the JavaStreamingContext "
                    "backing the active Python StreamingContext. This is unexpected."
                )
        return cls._activeContext

    @classmethod
    def getActiveOrCreate(
        cls, checkpointPath: Optional[str], setupFunc: Callable[[], "StreamingContext"]
    ) -> "StreamingContext":
        """
        Either return the active StreamingContext (i.e. currently started but not stopped),
        or recreate a StreamingContext from checkpoint data or create a new StreamingContext
        using the provided setupFunc function. If the checkpointPath is None or does not contain
        valid checkpoint data, then setupFunc will be called to create a new context and setup
        DStreams.

        Parameters
        ----------
        checkpointPath : str
            Checkpoint directory used in an earlier streaming program. Can be
            None if the intention is to always create a new context when there
            is no active context.
        setupFunc : function
            Function to create a new JavaStreamingContext and setup DStreams

        Raises
        ------
        TypeError
            If `setupFunc` is not callable.
        """
        if not callable(setupFunc):
            raise TypeError("setupFunc should be callable.")

        activeContext = cls.getActive()
        if activeContext is not None:
            return activeContext
        if checkpointPath is not None:
            return cls.getOrCreate(checkpointPath, setupFunc)
        return setupFunc()

    @property
    def sparkContext(self) -> SparkContext:
        """
        Return SparkContext which is associated with this StreamingContext.
        """
        return self._sc

    def start(self) -> None:
        """
        Start the execution of the streams.
        """
        self._jssc.start()
        # Record this instance as the active context for `getActive`.
        StreamingContext._activeContext = self

    def awaitTermination(self, timeout: Optional[int] = None) -> None:
        """
        Wait for the execution to stop.

        Parameters
        ----------
        timeout : int, optional
            time to wait in seconds
        """
        if timeout is None:
            self._jssc.awaitTermination()
        else:
            # The JVM call takes milliseconds.
            self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

    def awaitTerminationOrTimeout(self, timeout: int) -> bool:
        """
        Wait for the execution to stop. Return `true` if it's stopped; or
        throw the reported error during the execution; or `false` if the
        waiting time elapsed before returning from the method.

        Parameters
        ----------
        timeout : int
            time to wait in seconds
        """
        return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

    def stop(self, stopSparkContext: bool = True, stopGraceFully: bool = False) -> None:
        """
        Stop the execution of the streams, with option of ensuring all
        received data has been processed.

        Parameters
        ----------
        stopSparkContext : bool, optional
            Stop the associated SparkContext or not
        stopGraceFully : bool, optional
            Stop gracefully by waiting for the processing of all received
            data to be completed
        """
        self._jssc.stop(stopSparkContext, stopGraceFully)
        StreamingContext._activeContext = None
        if stopSparkContext:
            self._sc.stop()

    def remember(self, duration: int) -> None:
        """
        Set each DStream in this context to remember the RDDs it generated
        in the last given duration. DStreams remember RDDs only for a
        limited duration of time and release them for garbage collection.
        This method allows the developer to specify how long to remember
        the RDDs (if the developer wishes to query old data outside the
        DStream computation).

        Parameters
        ----------
        duration : int
            Minimum duration (in seconds) that each DStream should remember its RDDs
        """
        self._jssc.remember(self._jduration(duration))

    def checkpoint(self, directory: str) -> None:
        """
        Sets the context to periodically checkpoint the DStream operations for master
        fault-tolerance. The graph will be checkpointed every batch interval.

        Parameters
        ----------
        directory : str
            HDFS-compatible directory where the checkpoint data will be reliably stored
        """
        self._jssc.checkpoint(directory)

    def socketTextStream(
        self, hostname: str, port: int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
    ) -> "DStream[str]":
        """
        Create an input from TCP source hostname:port. Data is received using
        a TCP socket and the received bytes are interpreted as UTF8 encoded ``\\n`` delimited
        lines.

        Parameters
        ----------
        hostname : str
            Hostname to connect to for receiving data
        port : int
            Port to connect to for receiving data
        storageLevel : :class:`pyspark.StorageLevel`, optional
            Storage level to use for storing the received objects
        """
        jlevel = self._sc._getJavaStorageLevel(storageLevel)
        return DStream(
            self._jssc.socketTextStream(hostname, port, jlevel), self, UTF8Deserializer()
        )

    def textFileStream(self, directory: str) -> "DStream[str]":
        """
        Create an input stream that monitors a Hadoop-compatible file system
        for new files and reads them as text files. Files must be written to the
        monitored directory by "moving" them from another location within the same
        file system. File names starting with . are ignored.
        The text files must be encoded as UTF-8.
        """
        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

    def binaryRecordsStream(self, directory: str, recordLength: int) -> "DStream[bytes]":
        """
        Create an input stream that monitors a Hadoop-compatible file system
        for new files and reads them as flat binary files with records of
        fixed length. Files must be written to the monitored directory by "moving"
        them from another location within the same file system.
        File names starting with . are ignored.

        Parameters
        ----------
        directory : str
            Directory to load data from
        recordLength : int
            Length of each record in bytes
        """
        return DStream(
            self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()
        )

    def _check_serializers(self, rdds: List[RDD[T]]) -> None:
        """
        Make sure all RDDs in `rdds` share one deserializer; if they do not,
        every entry of the list is replaced in place by its reserialized form.
        """
        # make sure they have same serializer
        if len({rdd._jrdd_deserializer for rdd in rdds}) > 1:
            for i, rdd in enumerate(rdds):
                # reset them to sc.serializer
                rdds[i] = rdd._reserialize()

    def queueStream(
        self,
        rdds: List[RDD[T]],
        oneAtATime: bool = True,
        default: Optional[RDD[T]] = None,
    ) -> "DStream[T]":
        """
        Create an input stream from a queue of RDDs or list. In each batch,
        it will process either one or all of the RDDs returned by the queue.

        Parameters
        ----------
        rdds : list
            Queue of RDDs
        oneAtATime : bool, optional
            pick one rdd each time or pick all of them once.
        default : :class:`pyspark.RDD`, optional
            The default rdd if no more in rdds

        Notes
        -----
        Changes to the queue after the stream is created will not be recognized.
        """
        # A non-RDD default is turned into an RDD first.
        if default and not isinstance(default, RDD):
            default = self._sc.parallelize(default)  # type: ignore[arg-type]

        # With a default but no queued data, wrap the empty input so that one
        # RDD still gets built below and `rdds[0]` is available.
        if not rdds and default:
            rdds = [rdds]  # type: ignore[list-item]

        # Plain Python collections are parallelized into RDDs.
        if rdds and not isinstance(rdds[0], RDD):
            rdds = [self._sc.parallelize(data) for data in rdds]  # type: ignore[arg-type]
        self._check_serializers(rdds)

        assert self._jvm is not None
        queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
        if default:
            # Give the default the same serializer as the queued RDDs.
            default = default._reserialize(rdds[0]._jrdd_deserializer)
            assert default is not None
            jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
        else:
            jdstream = self._jssc.queueStream(queue, oneAtATime)
        return DStream(jdstream, self, rdds[0]._jrdd_deserializer)

    def transform(
        self, dstreams: List["DStream[Any]"], transformFunc: Callable[..., RDD[T]]
    ) -> "DStream[T]":
        """
        Create a new DStream in which each RDD is generated by applying
        a function on RDDs of the DStreams. The order of the JavaRDDs in
        the transform function parameter will be the same as the order
        of corresponding DStreams in the list.
        """
        jdstreams = [d._jdstream for d in dstreams]
        # change the final serializer to sc.serializer
        # `transformFunc` is called with the RDDs only; the first argument
        # `t` of the wrapper is dropped.
        func = TransformFunction(
            self._sc,
            lambda t, *rdds: transformFunc(rdds),
            *[d._jrdd_deserializer for d in dstreams],
        )

        assert self._jvm is not None
        jfunc = self._jvm.TransformFunction(func)
        jdstream = self._jssc.transform(jdstreams, jfunc)
        return DStream(jdstream, self, self._sc.serializer)

    def union(self, *dstreams: "DStream[T]") -> "DStream[T]":
        """
        Create a unified DStream from multiple DStreams of the same
        type and same slide duration.

        Raises
        ------
        ValueError
            If no DStream is given, or the DStreams differ in serializer or
            slide duration.
        TypeError
            If the first DStream is backed by an unsupported Java class.
        """
        if not dstreams:
            raise ValueError("should have at least one DStream to union")
        if len(dstreams) == 1:
            return dstreams[0]
        if len({s._jrdd_deserializer for s in dstreams}) > 1:
            raise ValueError("All DStreams should have same serializer")
        if len({s._slideDuration for s in dstreams}) > 1:
            raise ValueError("All DStreams should have same slide duration")

        assert SparkContext._jvm is not None
        gw = SparkContext._gateway
        # Check the gateway before its first use below.
        assert gw is not None

        jdstream_cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream
        jpair_dstream_cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaPairDStream

        # The Java array element class is taken from the first DStream.
        first_jdstream = dstreams[0]._jdstream
        if is_instance_of(gw, first_jdstream, jdstream_cls):
            jcls = jdstream_cls
        elif is_instance_of(gw, first_jdstream, jpair_dstream_cls):
            jcls = jpair_dstream_cls
        else:
            cls_name = first_jdstream.getClass().getCanonicalName()
            raise TypeError("Unsupported Java DStream class %s" % cls_name)

        jdstreams = gw.new_array(jcls, len(dstreams))
        for i, dstream in enumerate(dstreams):
            jdstreams[i] = dstream._jdstream
        return DStream(
            self._jssc.union(jdstreams),
            self,
            dstreams[0]._jrdd_deserializer,
        )

    def addStreamingListener(self, streamingListener: StreamingListener) -> None:
        """
        Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
        receiving system events related to streaming.
        """
        assert self._jvm is not None
        # The Python listener is wrapped twice before being handed to the JVM.
        self._jssc.addStreamingListener(
            self._jvm.JavaStreamingListenerWrapper(
                self._jvm.PythonStreamingListenerWrapper(streamingListener)
            )
        )