Skip to content Skip to sidebar Skip to footer

How To Load My Pickeled Ml Model From Gcs To Dataflow/apache Beam

I've developed an apache beam pipeline locally where I run predictions on a sample file. Locally on my computer I can load the model like this: with open('gs://newbucket322/my_dump

Solution 1:

You can define a ParDo as below

class PerdictOutcome(beam.DoFn):
    """ Format the input to the desired shape"""

    def __init__(self, project=None, bucket_name=None, model_path=None, destination_name=None):
        self._model = Noneself._project = project
        self._bucket_name = bucket_name
        self._model_path = model_path
        self._destination_name = destination_name

    def download_blob(bucket_name=None, source_blob_name=None, project=None, destination_file_name=None):
        """Downloads a blob from the bucket."""
        destination_file_name = source_blob_name
        storage_client = storage.Client(<gs://path">)
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(source_blob_name)

        blob.download_to_filename(destination_file_name)
    # Load once or very few times
    def setup(self):
        logging.info(
            "Model Initialization {}".format(self._model_path))
        download_blob(bucket_name=self._bucket_name, source_blob_name=self._model_path,
                      project=self._project, destination_file_name=self._destination_name)
        # unpickle model model
        self._model = pickle.load(open(self._destination_name, 'rb'))

    def process(self, element):
        element["prediction"] = self._model.predict(element["data"])
        return [element]

Then you can invoke this ParDo in your pipeline as below:-

    model = (p
         | "Read Files" >> TextIO...
         | "Run Predictions" >> beam.ParDo(PredictSklearn(project=known_args.bucket_project_id, bucket_name=known_args.bucket_name, model_path=known_args.model_path, destination_name=known_args.destination_name)
      )

Post a Comment for "How To Load My Pickeled Ml Model From Gcs To Dataflow/apache Beam"