Azure Machine Learning Pipeline Issue
Hello Team,
Currently, we are training a large ML recommendation model on an Azure Compute Cluster, and a single training run takes more than 5 days to complete.
How can we efficiently train on a large dataset (for example, around 5 million records) in the Azure Compute Cluster?
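For context, the training script below is submitted to the compute cluster roughly as follows (a minimal sketch; the environment, cluster, and experiment names are placeholders for our actual setup):

from azureml.core import Workspace, Experiment, Environment, ScriptRunConfig

ws = Workspace.from_config()
env = Environment.get(ws, name="lightfm-env")  # placeholder environment name
src = ScriptRunConfig(source_directory=".",
                      script="model_training.py",
                      arguments=["--interaction", "views"],
                      compute_target="cpu-cluster",  # placeholder compute cluster name
                      environment=env)
run = Experiment(ws, "lightfm-recommendation").submit(src)
run.wait_for_completion(show_output=True)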
Please find the sample code below:
import os
import pickle
import argparse
import tempfile
import json
import numpy as np
import pandas as pd
from azureml.core import Workspace, Datastore, Run, Model
from azureml.data.dataset_factory import TabularDatasetFactory
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Parse arguments
parser = argparse.ArgumentParser("model_training")
parser.add_argument("--model_training", type=str, help="Model training data path")
parser.add_argument("--interaction", type=str, help="Interaction type")
args = parser.parse_args()
# Workspace setup
workspace = Workspace(subscription_id=os.environ.get("SUBSCRIPTION_ID"),
                      resource_group=os.environ.get("RESOURCE_GROUP"),
                      workspace_name=os.environ.get("WORKSPACE_NAME"))
print('Workspace:', workspace)
# Get the datastore from the Azure ML workspace
datastore = Datastore.get(workspace, datastore_name='data_factory')
print('Datastore:', datastore)
# Define the path to your Parquet files in the datastore
datastore_path = [(datastore, 'sampless_silver/')]
# Create a TabularDataset from the Parquet files in the datastore
dataset = TabularDatasetFactory.from_parquet_files(path=datastore_path)
print('Dataset:', dataset)
# Convert the TabularDataset to a Pandas DataFrame
training_dataset = dataset.to_pandas_dataframe()
print('Training Dataset:', training_dataset)
# Sample data
training_dataset = training_dataset.head(25000000)
training_dataset = training_dataset.sample(frac=1).reset_index(drop=True)
training_dataset["views"] = pd.to_numeric(training_dataset['views'], errors='coerce')
df_selected = training_dataset.rename(columns={'clientId': 'userID', 'offerId': 'itemID'})
df_selected = df_selected[['userID', 'itemID', 'views']]
print('Selected Data:', df_selected)
# Create and fit model
from lightfm import LightFM
from lightfm import cross_validation
from lightfm.data import Dataset

dataset = Dataset()
dataset.fit(users=df_selected['userID'], items=df_selected['itemID'])
(interactions, weights) = dataset.build_interactions(df_selected.iloc[:, 0:3].values)
user_dict_label = dataset.mapping()[0]
item_dict_label = dataset.mapping()[2]
train_interactions, test_interactions = cross_validation.random_train_test_split(
    interactions, test_percentage=0.25, random_state=np.random.RandomState(2016))
model = LightFM(loss='warp', no_components=1300, learning_rate=0.000001,
                random_state=np.random.RandomState(2016), user_alpha=0.000005, max_sampled=100, k=100,
                learning_schedule='adadelta', item_alpha=0.000005)
print('Model:', model)
model.fit(interactions=train_interactions, epochs=2, verbose=True, num_threads=8)
user_dict_label = {str(key): value for key, value in user_dict_label.items()}
item_dict_label = {str(key): value for key, value in item_dict_label.items()}
# Save and upload model
with tempfile.TemporaryDirectory() as tmpdirname:
    recommendation_model_offer = os.path.join(tmpdirname, "sample_recommendation_model.pkl")
    with open(recommendation_model_offer, 'wb') as f:
        pickle.dump(model, f)
    model_intersection = os.path.join(tmpdirname, "sample_training_intersection.pkl")
    with open(model_intersection, 'wb') as f:
        pickle.dump(interactions, f)
    model_user_dict = os.path.join(tmpdirname, "users_dict_label.json")
    with open(model_user_dict, 'w') as f:
        json.dump(user_dict_label, f)
    model_item_dict = os.path.join(tmpdirname, "items_dict_label.json")
    with open(model_item_dict, 'w') as f:
        json.dump(item_dict_label, f)
    datastore.upload_files(
        files=[recommendation_model_offer, model_intersection, model_user_dict, model_item_dict],
        target_path='SAMPLE_MODEL_TRAINING/',
        overwrite=True
    )
    print('Files uploaded to datastore')
    # Register the model
    register_name = f"{args.interaction}_light_fm_recommendation_model"
    Model.register(workspace=workspace, model_path=tmpdirname, model_name=register_name,
                   tags={'affinity': args.interaction, 'sample': 'recommendation'})
    print('Model registered')
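Would restricting the TabularDataset to just the three columns the model needs, before converting it to pandas, be a reasonable way to cut down the load time? A minimal sketch of what we mean (keep_columns is the TabularDataset method; column names match the ones used above):

# Sketch only: restrict the TabularDataset (created from the Parquet files above)
# to the columns the model actually uses before converting it to pandas.
slim_dataset = dataset.keep_columns(['clientId', 'offerId', 'views'])
training_dataset = slim_dataset.to_pandas_dataframe()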
Please share your feedback. Thanks!