Machines don't break down out of nowhere; there are always signs. The problem? Humans aren't always great at noticing them. That's where Machine Predictive Maintenance comes in! This guide will take you through the exciting world of Machine Predictive Maintenance, using AWS and MLOps to ensure your equipment stays predictably reliable.
Learning Objectives
- Understand how to design and implement an end-to-end MLOps pipeline for predictive maintenance, covering data ingestion, model training, and deployment.
- Learn to integrate essential tools like Docker, FastAPI, and AWS services to build a robust, production-ready machine learning application.
- Explore the use of GitHub Actions for automating CI/CD workflows, ensuring smooth and reliable code integration and deployment.
- Set up best practices for monitoring, performance tracking, and continuous improvement to keep your machine learning models efficient and maintainable.
This article was published as a part of the Data Science Blogathon.
Problem: Unplanned Downtime & Maintenance Costs
Unexpected equipment failures in industrial settings cause downtime and financial losses. In our project, we use MLOps best practices and machine learning to detect issues early, enabling timely repairs and reducing disruptions.
Before diving into implementation, let's take a closer look at the project architecture.

Necessary Prerequisites
Below, we first look at the prerequisites required:
Clone the repository:
git clone "https://github.com/karthikponna/Predictive_Maintenance_MLOps.git"
cd Predictive_Maintenance_MLOps
Create and activate the virtual environment:
# For macOS and Linux:
python3 -m venv venv
source venv/bin/activate
# For Windows:
python -m venv venv
.\venv\Scripts\activate
Install Required Dependencies:
pip install -r requirements.txt
Set Up Environment Variables:
# Create a `.env` file and add your MongoDB connection string:
MONGO_URI=your_mongodb_connection_string
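As a minimal sketch of how the connection string might be read at runtime, the snippet below looks up `MONGO_URI` in a mapping of environment variables and fails early if it is missing. The helper name `get_mongo_uri` is hypothetical and not part of the repository, and the project may load the `.env` file differently (for example with a dotenv loader).

```python
import os
from typing import Mapping, Optional


def get_mongo_uri(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the MongoDB connection string, or raise if it is not set."""
    # Fall back to the real process environment when no mapping is passed.
    env = os.environ if env is None else env
    uri = env.get("MONGO_URI", "").strip()
    if not uri:
        raise RuntimeError("MONGO_URI is not set; add it to your .env file")
    return uri


# Usage with an explicit mapping (placeholder value, not a real connection string)
print(get_mongo_uri({"MONGO_URI": "mongodb://localhost:27017"}))
```

Failing early like this tends to surface a missing variable at startup rather than at the first database call.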
Project Structure
The project structure outlines the key components and organization of the project, ensuring clarity and maintainability. It helps in understanding how different modules interact and how the overall system is designed. A well-defined structure simplifies development, debugging, and scaling.
project_root/
│
├── .github/
│   └── workflows/
│       └── main.yml
│
├── data_schema/
│   └── schema.yaml
│
├── final_model/
│   ├── model.pkl
│   └── preprocessor.pkl
│
├── Machine_Predictive_Data/
│   └── predictive_maintenance.csv
│
├── machine_predictive_maintenance/
│   ├── cloud/
│   ├── components/
│   ├── constant/
│   ├── entity/
│   ├── exception/
│   ├── logging/
│   ├── pipeline/
│   ├── utils/
│   └── __init__.py
│
├── my_venv/
│
├── notebooks/
│   ├── EDA.ipynb
│   └── prediction_output/
│
├── templates/
│   └── table.html
│
├── valid_data/
│   └── test.csv
│
├── .env
├── .gitignore
├── app.py
├── Dockerfile
├── main.py
├── push_data.py
├── README.md
├── requirements.txt
├── setup.py
└── test_mongodb.py
Data Ingestion
In this project, we take a machine predictive maintenance CSV file, convert it into JSON records, and insert them into a MongoDB collection.
Dataset Link: https://www.kaggle.com/datasets/shivamb/machine-predictive-maintenance-classification
Here is the code snippet to convert CSV -> JSON records -> MongoDB:
import json
import sys

import pandas as pd
import pymongo

# MachinePredictiveMaintenanceException and MONGO_DB_URL come from the project package.


class PredictiveDataExtract:
    def __init__(self):
        """
        Initializes the PredictiveDataExtract class.
        """
        pass

    def csv_to_json_convertor(self, file_path):
        """Read a CSV file and return its rows as a list of JSON-style dicts."""
        try:
            data = pd.read_csv(file_path)
            data.reset_index(drop=True, inplace=True)
            # Transpose so that each row becomes one JSON object.
            records = list(json.loads(data.T.to_json()).values())
            return records
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def insert_data_mongodb(self, records, database, collection):
        """Insert the records into MongoDB and return how many were inserted."""
        try:
            self.records = records
            self.database = database
            self.collection = collection
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            self.database = self.mongo_client[self.database]
            self.collection = self.database[self.collection]
            self.collection.insert_many(self.records)
            return len(self.records)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)


if __name__ == "__main__":
    FILE_PATH = "Machine_Predictive_Data/predictive_maintenance.csv"
    DATABASE = "Predictive_Maintenance_MLOps"
    collection = "Machine_Predictive_Data"
    predictive_data_obj = PredictiveDataExtract()
    records = predictive_data_obj.csv_to_json_convertor(FILE_PATH)
    no_of_records = predictive_data_obj.insert_data_mongodb(records, DATABASE, collection)
    print(no_of_records)
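To make the CSV to records step concrete, here is a standard-library sketch of the same idea: each CSV row becomes one dictionary keyed by column name, which is the shape `insert_many` expects. The function name `csv_text_to_records` and the sample rows are illustrative assumptions; unlike the pandas version, this sketch keeps every value as a string.

```python
import csv
import io
import json


def csv_text_to_records(csv_text: str) -> list:
    """Turn CSV text into a list of dicts, one per row, keyed by column name."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [dict(row) for row in reader]


# Illustrative sample, not taken from the real dataset
sample = "UDI,Type,Torque [Nm]\n1,M,42.8\n2,L,46.3\n"
records = csv_text_to_records(sample)
print(json.dumps(records[0]))
```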
Here is the code snippet to fetch data from MongoDB, split the data into train and test CSV files, and store them as a Data Ingestion artifact.
import os
import sys

import numpy as np
import pandas as pd
import pymongo
from sklearn.model_selection import train_test_split

# DataIngestionConfig, DataIngestionArtifact, MachinePredictiveMaintenanceException,
# logging and MONGO_DB_URL come from the project package.


class DataIngestion:
    def __init__(self, data_ingestion_config: DataIngestionConfig):
        """
        Initializes the DataIngestion class with the provided configuration.
        Parameters:
            data_ingestion_config: DataIngestionConfig
                Configuration object containing details for data ingestion.
        """
        try:
            self.data_ingestion_config = data_ingestion_config
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def export_collection_as_dataframe(self):
        """Read the MongoDB collection into a DataFrame."""
        try:
            database_name = self.data_ingestion_config.database_name
            collection_name = self.data_ingestion_config.collection_name
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            collection = self.mongo_client[database_name][collection_name]
            df = pd.DataFrame(list(collection.find()))
            # Drop MongoDB's internal id column if present.
            if "_id" in df.columns.to_list():
                df = df.drop(columns=["_id"])
            df.replace({"na": np.nan}, inplace=True)
            return df
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def export_data_into_feature_store(self, dataframe: pd.DataFrame):
        """Save the raw DataFrame to the feature store as a CSV file."""
        try:
            feature_store_file_path = self.data_ingestion_config.feature_store_file_path
            # Create the folder if it does not exist.
            dir_path = os.path.dirname(feature_store_file_path)
            os.makedirs(dir_path, exist_ok=True)
            dataframe.to_csv(feature_store_file_path, index=False, header=True)
            return dataframe
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def split_data_as_train_test(self, dataframe: pd.DataFrame):
        """Split the DataFrame and write the train and test CSV files."""
        try:
            train_set, test_set = train_test_split(
                dataframe, test_size=self.data_ingestion_config.train_test_split_ratio
            )
            logging.info("Performed train test split on the dataframe")
            dir_path = os.path.dirname(self.data_ingestion_config.training_file_path)
            os.makedirs(dir_path, exist_ok=True)
            logging.info("Exporting train and test files.")
            train_set.to_csv(
                self.data_ingestion_config.training_file_path, index=False, header=True
            )
            test_set.to_csv(
                self.data_ingestion_config.testing_file_path, index=False, header=True
            )
            logging.info("Exported train and test files.")
            logging.info(
                "Exited split_data_as_train_test method of DataIngestion class"
            )
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_ingestion(self):
        """Run the ingestion steps and return the resulting artifact."""
        try:
            dataframe = self.export_collection_as_dataframe()
            dataframe = self.export_data_into_feature_store(dataframe)
            self.split_data_as_train_test(dataframe)
            data_ingestion_artifact = DataIngestionArtifact(
                trained_file_path=self.data_ingestion_config.training_file_path,
                test_file_path=self.data_ingestion_config.testing_file_path,
            )
            return data_ingestion_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
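The splitting step above can be pictured with a small standard-library sketch: shuffle the rows, then cut off a fraction for testing. The function `split_rows` and the fixed seed are assumptions for illustration; the project itself relies on scikit-learn's `train_test_split`, whose rounding and shuffling details differ.

```python
import random


def split_rows(rows, test_ratio, seed=42):
    """Shuffle rows reproducibly and split them into (train, test) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * test_ratio)
    # The first n_test shuffled rows form the test set, the rest the train set.
    return shuffled[n_test:], shuffled[:n_test]


train, test = split_rows(range(10), test_ratio=0.2)
print(len(train), len(test))
```

Passing a seed (or `random_state` in scikit-learn) makes the split reproducible between pipeline runs, which can help when comparing drift reports.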
Data Validation
In this step, we check whether the ingested data meets the expected format by verifying that all required columns are present according to a predefined schema, and by comparing the training and testing data for any differences. Then we save the clean data and create a drift report, so only quality data is used for the next step, which is transforming the data.
Data Validation code snippet:
import os
import sys

import pandas as pd
from scipy.stats import ks_2samp

# DataIngestionArtifact, DataValidationConfig, DataValidationArtifact, SCHEMA_FILE_PATH,
# read_yaml_file, logging and the exception class come from the project package.


class DataValidation:
    def __init__(self, data_ingestion_artifact: DataIngestionArtifact,
                 data_validation_config: DataValidationConfig):
        try:
            self.data_ingestion_artifact = data_ingestion_artifact
            self.data_validation_config = data_validation_config
            self._schema_config = read_yaml_file(SCHEMA_FILE_PATH)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def validate_number_of_columns(self, dataframe: pd.DataFrame) -> bool:
        try:
            number_of_columns = len(self._schema_config)
            logging.info(f"Required number of columns: {number_of_columns}")
            logging.info(f"Data frame has columns: {len(dataframe.columns)}")
            return len(dataframe.columns) == number_of_columns
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def is_columns_exist(self, df: pd.DataFrame) -> bool:
        try:
            dataframe_columns = df.columns
            missing_numerical_columns = []
            missing_categorical_columns = []
            for column in self._schema_config["numerical_columns"]:
                if column not in dataframe_columns:
                    missing_numerical_columns.append(column)
            if len(missing_numerical_columns) > 0:
                logging.info(f"Missing numerical column: {missing_numerical_columns}")
            for column in self._schema_config["categorical_columns"]:
                if column not in dataframe_columns:
                    missing_categorical_columns.append(column)
            if len(missing_categorical_columns) > 0:
                logging.info(f"Missing categorical column: {missing_categorical_columns}")
            return not (missing_categorical_columns or missing_numerical_columns)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def detect_dataset_drift(self, base_df, current_df, threshold=0.05) -> bool:
        try:
            status = True
            report = {}
            for column in base_df.columns:
                d1 = base_df[column]
                d2 = current_df[column]
                is_same_dist = ks_2samp(d1, d2)
                # NOTE: the remainder of this method was lost when the article was
                # extracted. The lines below are a reconstruction inferred from the
                # drift report format, not the author's verbatim code.
                drift_found = bool(is_same_dist.pvalue < threshold)
                if drift_found:
                    status = False
                report[column] = {
                    "p_value": float(is_same_dist.pvalue),
                    "drift_status": drift_found,
                }
            drift_report_file_path = self.data_validation_config.drift_report_file_path
            os.makedirs(os.path.dirname(drift_report_file_path), exist_ok=True)
            # write_yaml_file is an assumed project helper (counterpart of read_yaml_file).
            write_yaml_file(file_path=drift_report_file_path, content=report)
            return status
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_validation(self) -> DataValidationArtifact:
        try:
            validation_error_msg = ""
            logging.info("Starting data validation")
            train_file_path = self.data_ingestion_artifact.trained_file_path
            test_file_path = self.data_ingestion_artifact.test_file_path
            # Read the data from the train and test files.
            train_dataframe = DataValidation.read_data(train_file_path)
            test_dataframe = DataValidation.read_data(test_file_path)
            # Validate the number of columns.
            status = self.validate_number_of_columns(dataframe=train_dataframe)
            logging.info(f"All required columns present in training dataframe: {status}")
            if not status:
                validation_error_msg += "Train dataframe does not contain all columns.\n"
            status = self.validate_number_of_columns(dataframe=test_dataframe)
            if not status:
                validation_error_msg += "Test dataframe does not contain all columns.\n"
            status = self.is_columns_exist(df=train_dataframe)
            if not status:
                validation_error_msg += "Columns are missing in training dataframe."
            status = self.is_columns_exist(df=test_dataframe)
            if not status:
                validation_error_msg += "Columns are missing in test dataframe."
            # Check for data drift between train and test.
            status = self.detect_dataset_drift(base_df=train_dataframe, current_df=test_dataframe)
            dir_path = os.path.dirname(self.data_validation_config.valid_train_file_path)
            os.makedirs(dir_path, exist_ok=True)
            train_dataframe.to_csv(
                self.data_validation_config.valid_train_file_path, index=False, header=True
            )
            test_dataframe.to_csv(
                self.data_validation_config.valid_test_file_path, index=False, header=True
            )
            data_validation_artifact = DataValidationArtifact(
                validation_status=status,
                valid_train_file_path=self.data_validation_config.valid_train_file_path,
                valid_test_file_path=self.data_validation_config.valid_test_file_path,
                invalid_train_file_path=None,
                invalid_test_file_path=None,
                drift_report_file_path=self.data_validation_config.drift_report_file_path,
            )
            return data_validation_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
Below is the drift report generated by the Data Validation step.
Air temperature [K]:
  drift_status: true
  p_value: 0.016622943467175914
Failure Type:
  drift_status: false
  p_value: 1.0
Process temperature [K]:
  drift_status: false
  p_value: 0.052940072765804994
Product ID:
  drift_status: false
  p_value: 0.09120557172716418
Rotational speed [rpm]:
  drift_status: false
  p_value: 0.2673520066245566
Target:
  drift_status: false
  p_value: 0.999999998717466
Tool wear [min]:
  drift_status: false
  p_value: 0.13090856779628832
Torque [Nm]:
  drift_status: false
  p_value: 0.5001773464540389
Type:
  drift_status: false
  p_value: 1.0
UDI:
  drift_status: true
  p_value: 0.022542489133976953
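The report can be read as a simple rule: a column is flagged as drifted when its Kolmogorov-Smirnov p-value falls below the 0.05 threshold. The sketch below applies that rule to three p-values copied from the report; the helper name `drift_flags` is hypothetical.

```python
def drift_flags(p_values, threshold=0.05):
    """Map each column to True when its p-value falls below the threshold."""
    return {column: p < threshold for column, p in p_values.items()}


# p-values copied from the drift report above
report = {
    "Air temperature [K]": 0.016622943467175914,
    "Process temperature [K]": 0.052940072765804994,
    "UDI": 0.022542489133976953,
}
flags = drift_flags(report)
print(flags)
```

Note that UDI looks like a row identifier, so a drift flag on it probably says more about how the rows were split than about the machines themselves.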
Data Transformation
Here, we clean and transform the validated data by converting temperature features from Kelvin to Celsius, dropping unnecessary columns, applying a transformation pipeline that uses ordinal encoding and Min-Max scaling, and handling data imbalance using SMOTEENN.
The transformed training and test datasets are saved as .npy files along with a serialized preprocessing object (the ordinal encoder and Min-Max scaler, saved as preprocessor.pkl), all encapsulated as an artifact for model training.
import sys

import numpy as np
import pandas as pd
from imblearn.combine import SMOTEENN
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder

# Config/artifact classes, SCHEMA_FILE_PATH, TARGET_COLUMN, read_yaml_file, drop_columns,
# save_numpy_array_data, save_object, logging and the exception class come from the project.


class DataTransformation:
    def __init__(self, data_validation_artifact: DataValidationArtifact,
                 data_transformation_config: DataTransformationConfig):
        try:
            self.data_validation_artifact: DataValidationArtifact = data_validation_artifact
            self.data_transformation_config: DataTransformationConfig = data_transformation_config
            self._schema_config = read_yaml_file(file_path=SCHEMA_FILE_PATH)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def get_data_transformer_object(self):
        """Build the ColumnTransformer with ordinal encoding and Min-Max scaling."""
        try:
            scaler = MinMaxScaler()
            # Fetch the categories for OrdinalEncoder from the schema config.
            ordinal_categories = self._schema_config.get('ordinal_categories', [])
            ordinal_encoder = OrdinalEncoder(categories=ordinal_categories)
            logging.info("Initialized MinMaxScaler, OrdinalEncoder with categories")
            ordinal_columns = self._schema_config['ordinal_columns']
            scaling_features = self._schema_config['scaling_features']
            preprocessor = ColumnTransformer(
                [
                    ("Ordinal_Encoder", ordinal_encoder, ordinal_columns),
                    ("MinMaxScaling", scaler, scaling_features),
                ]
            )
            logging.info("Created preprocessor object from ColumnTransformer")
            return preprocessor
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_transformation(self) -> DataTransformationArtifact:
        try:
            logging.info("Starting data transformation")
            preprocessor = self.get_data_transformer_object()
            train_df = DataTransformation.read_data(self.data_validation_artifact.valid_train_file_path)
            test_df = DataTransformation.read_data(self.data_validation_artifact.valid_test_file_path)
            drop_cols = self._schema_config['drop_columns']

            # Training set: separate the target, convert Kelvin to Celsius, drop unused columns.
            input_feature_train_df = train_df.drop(columns=[TARGET_COLUMN])
            target_feature_train_df = train_df[TARGET_COLUMN]
            input_feature_train_df['Air temperature [c]'] = input_feature_train_df['Air temperature [K]'] - 273.15
            input_feature_train_df['Process temperature [c]'] = input_feature_train_df['Process temperature [K]'] - 273.15
            input_feature_train_df = drop_columns(df=input_feature_train_df, cols=drop_cols)
            logging.info("Completed dropping the columns for Training dataset")

            # Test set: the same steps.
            input_feature_test_df = test_df.drop(columns=[TARGET_COLUMN])
            target_feature_test_df = test_df[TARGET_COLUMN]
            input_feature_test_df['Air temperature [c]'] = input_feature_test_df['Air temperature [K]'] - 273.15
            input_feature_test_df['Process temperature [c]'] = input_feature_test_df['Process temperature [K]'] - 273.15
            input_feature_test_df = drop_columns(df=input_feature_test_df, cols=drop_cols)
            logging.info("Completed dropping the columns for Testing dataset")

            # Fit on the training data only, then apply the same transform to the test data.
            input_feature_train_arr = preprocessor.fit_transform(input_feature_train_df)
            input_feature_test_arr = preprocessor.transform(input_feature_test_df)

            smt = SMOTEENN(sampling_strategy="minority")
            input_feature_train_final, target_feature_train_final = smt.fit_resample(
                input_feature_train_arr, target_feature_train_df
            )
            logging.info("Applied SMOTEENN on training dataset")
            # Caution: resampling the test set, as done here, means test metrics no longer
            # reflect the real class balance. Consider leaving the test set untouched.
            input_feature_test_final, target_feature_test_final = smt.fit_resample(
                input_feature_test_arr, target_feature_test_df
            )

            train_arr = np.c_[input_feature_train_final, np.array(target_feature_train_final)]
            test_arr = np.c_[input_feature_test_final, np.array(target_feature_test_final)]

            save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr)
            save_numpy_array_data(self.data_transformation_config.transformed_test_file_path, array=test_arr)
            save_object(self.data_transformation_config.transformed_object_file_path, preprocessor)
            save_object("final_model/preprocessor.pkl", preprocessor)

            data_transformation_artifact = DataTransformationArtifact(
                transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
                transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
                transformed_test_file_path=self.data_transformation_config.transformed_test_file_path,
            )
            return data_transformation_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
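Two of the transformations above are simple enough to sketch in plain Python: the Kelvin to Celsius conversion and Min-Max scaling, where the minimum and maximum are learned from the training values and then reused for the test values. The function names and sample numbers are illustrative assumptions; the project uses scikit-learn's `MinMaxScaler` inside a `ColumnTransformer`.

```python
def kelvin_to_celsius(kelvin):
    """Convert a temperature from Kelvin to Celsius."""
    return kelvin - 273.15


def fit_min_max(values):
    """Learn the (min, max) pair from the training values."""
    return min(values), max(values)


def apply_min_max(values, lo, hi):
    """Scale values with a previously learned (min, max) pair."""
    span = hi - lo
    # A constant feature has zero span; map it to 0.0 to avoid dividing by zero.
    return [(v - lo) / span if span else 0.0 for v in values]


train = [300.0, 310.0, 320.0]  # illustrative values
lo, hi = fit_min_max(train)
print(apply_min_max(train, lo, hi))
print(apply_min_max([330.0], lo, hi))
```

A test value outside the training range scales to a number outside 0 to 1, which is expected when the scaler is fitted on training data only.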
Training and Evaluation
Now we train several classification models on the transformed training data (Decision Tree, Random Forest, Gradient Boosting, Logistic Regression, and AdaBoost), evaluate their performance using metrics like F1-score, precision, and recall, and then log these details with MLflow.
After selecting the best model, the trainer saves both the model and the transformation object locally as artifacts, together with the evaluation metrics, for use in deployment.
import os
import sys

import mlflow
import mlflow.sklearn
from sklearn.ensemble import (
    AdaBoostClassifier,
    GradientBoostingClassifier,
    RandomForestClassifier,
)
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

# Config/artifact classes, evaluate_models, get_classification_score, load_object, save_object,
# load_numpy_array_data, MachinePredictiveModel, logging and the exception class come from the project.


class ModelTrainer:
    def __init__(self, data_transformation_artifact: DataTransformationArtifact,
                 model_trainer_config: ModelTrainerConfig):
        try:
            self.data_transformation_artifact = data_transformation_artifact
            self.model_trainer_config = model_trainer_config
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def track_mlflow(self, best_model, classification_metric, input_example):
        """
        Log model and metrics to MLflow.
        Args:
            best_model: The trained model object.
            classification_metric: The classification metrics (f1, precision, recall).
            input_example: An example input data sample for the model.
        """
        try:
            with mlflow.start_run():
                mlflow.log_metric("f1_score", classification_metric.f1_score)
                mlflow.log_metric("precision", classification_metric.precision_score)
                mlflow.log_metric("recall_score", classification_metric.recall_score)
                mlflow.sklearn.log_model(best_model, "model", input_example=input_example)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def train_model(self, X_train, y_train, X_test, y_test):
        """
        Train multiple models and select the best-performing one based on evaluation metrics.
        Args:
            X_train: Training features.
            y_train: Training labels.
            X_test: Testing features.
            y_test: Testing labels.
        Returns:
            ModelTrainerArtifact: An artifact containing details about the trained model and its metrics.
        """
        models = {
            "Random Forest": RandomForestClassifier(verbose=1),
            "Decision Tree": DecisionTreeClassifier(),
            "Gradient Boosting": GradientBoostingClassifier(verbose=1),
            "Logistic Regression": LogisticRegression(verbose=1),
            "AdaBoost": AdaBoostClassifier(),
        }
        params = {
            "Decision Tree": {
                'criterion': ['gini', 'entropy', 'log_loss'],
                # 'splitter':['best','random'],
                # 'max_features':['sqrt','log2'],
            },
            "Random Forest": {
                # 'criterion':['gini', 'entropy', 'log_loss'],
                # 'max_features':['sqrt','log2',None],
                'n_estimators': [8, 16, 32, 128, 256]
            },
            "Gradient Boosting": {
                # 'loss':['log_loss', 'exponential'],
                'learning_rate': [.1, .01, .05, .001],
                'subsample': [0.6, 0.7, 0.75, 0.85, 0.9],
                # 'criterion':['squared_error', 'friedman_mse'],
                # 'max_features':['auto','sqrt','log2'],
                'n_estimators': [8, 16, 32, 64, 128, 256]
            },
            "Logistic Regression": {},
            "AdaBoost": {
                'learning_rate': [.1, .01, .001],
                'n_estimators': [8, 16, 32, 64, 128, 256]
            }
        }
        model_report: dict = evaluate_models(X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
                                             models=models, param=params)
        # Pick the model with the highest score in the report.
        best_model_name = max(model_report, key=model_report.get)
        best_model_score = model_report[best_model_name]
        logging.info(f"Best Model Score: {best_model_score}")
        logging.info(f"Best Model Name: {best_model_name}")
        best_model = models[best_model_name]

        y_train_pred = best_model.predict(X_train)
        classification_train_metric = get_classification_score(y_true=y_train, y_pred=y_train_pred)
        input_example = X_train[:1]
        # Track the training metrics with MLflow.
        self.track_mlflow(best_model, classification_train_metric, input_example)

        y_test_pred = best_model.predict(X_test)
        classification_test_metric = get_classification_score(y_true=y_test, y_pred=y_test_pred)
        # Track the test metrics with MLflow.
        self.track_mlflow(best_model, classification_test_metric, input_example)

        preprocessor = load_object(file_path=self.data_transformation_artifact.transformed_object_file_path)
        model_dir_path = os.path.dirname(self.model_trainer_config.trained_model_file_path)
        os.makedirs(model_dir_path, exist_ok=True)
        machine_predictive_model = MachinePredictiveModel(model=best_model)
        # Save the wrapper instance (the original passed the class itself by mistake).
        save_object(self.model_trainer_config.trained_model_file_path, obj=machine_predictive_model)
        save_object("final_model/model.pkl", best_model)

        # Model Trainer Artifact
        model_trainer_artifact = ModelTrainerArtifact(
            trained_model_file_path=self.model_trainer_config.trained_model_file_path,
            train_metric_artifact=classification_train_metric,
            test_metric_artifact=classification_test_metric,
        )
        logging.info(f"Model trainer artifact: {model_trainer_artifact}")
        return model_trainer_artifact

    def initiate_model_trainer(self) -> ModelTrainerArtifact:
        try:
            train_file_path = self.data_transformation_artifact.transformed_train_file_path
            test_file_path = self.data_transformation_artifact.transformed_test_file_path
            # Load the training and testing arrays; the last column is the target.
            train_arr = load_numpy_array_data(train_file_path)
            test_arr = load_numpy_array_data(test_file_path)
            X_train, y_train, X_test, y_test = (
                train_arr[:, :-1],
                train_arr[:, -1],
                test_arr[:, :-1],
                test_arr[:, -1],
            )
            model_trainer_artifact = self.train_model(X_train, y_train, X_test, y_test)
            return model_trainer_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
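The model selection step boils down to picking the entry with the highest score from a name-to-score report. Here is a minimal sketch of that idea; `pick_best` is a hypothetical helper and the scores are made up for illustration, not results from this project.

```python
def pick_best(model_report):
    """Return (name, score) for the highest-scoring model; ties go to the first entry."""
    best_name = max(model_report, key=model_report.get)
    return best_name, model_report[best_name]


# Illustrative scores only
scores = {"Random Forest": 0.91, "Decision Tree": 0.88, "AdaBoost": 0.91}
print(pick_best(scores))
```

Using `max` with a key avoids the sort-then-index lookup and makes the tie-breaking rule explicit.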
Now, let's create a training_pipeline.py file where we sequentially integrate all the steps of data ingestion, validation, transformation, and model training into a complete pipeline.
def run_pipeline(self):
    """
    Executes the entire training pipeline.
    Returns:
        ModelTrainerArtifact: Contains metadata about the trained model.
    """
    try:
        data_ingestion_artifact = self.data_ingestion()
        data_validation_artifact = self.data_validation(data_ingestion_artifact=data_ingestion_artifact)
        data_transformation_artifact = self.data_transformation(data_validation_artifact=data_validation_artifact)
        model_trainer_artifact = self.model_trainer(data_transformation_artifact=data_transformation_artifact)
        # Upload the artifacts and the final model to S3.
        self.sync_artifact_dir_to_s3()
        self.sync_saved_model_dir_to_s3()
        return model_trainer_artifact
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)
You can visually see how we have built the training_pipeline.

Now that we've completed creating the pipeline, run the training_pipeline.py file to view the artifacts generated in the previous steps.
python training_pipeline.py
Run the following command to view the MLflow dashboard.
mlflow ui # Launch the MLflow dashboard to monitor experiments.
As you can see, we have successfully logged model metrics like recall, precision, and F1-score in MLflow.

AWS Integration
After running the training_pipeline.py file, we generated artifacts and stored them locally. Now, we will store these artifacts in an AWS S3 bucket to enable cloud-based storage and accessibility.
We will build a Docker image, push it to AWS ECR via GitHub Actions, and then deploy it to production using AWS EC2. We'll discuss this process in more detail in the upcoming sections.

AWS S3
Follow these steps to create an AWS S3 bucket:
Step 1: Download AWS CLI
- Download the AWS CLI for Windows, Linux, or macOS.
Step 2: Log in as an AWS IAM User
- Sign in to the Console with your IAM user credentials and select the appropriate AWS region (e.g., us-east-1).
- Once logged in, search for 'IAM' in the AWS search bar. Navigate to 'Users', select your username, and go to the 'Security credentials' tab. Under 'Access keys', click 'Create access key', choose 'CLI', and then confirm by clicking 'Create access key'.
- Now, open your terminal and type aws configure. Enter your AWS Access Key and Secret Access Key when prompted, then press Enter.
Your IAM user is now connected to your project.
Step 3: Navigate to the S3 Service
- Once logged in, search for "S3" in the AWS search bar.
- Click on the Amazon S3 service and select the "Create bucket" button. Next, type in a unique bucket name (e.g., machinepredictive).
- Review the configuration settings, and then click on "Create bucket."
Your bucket is now created, and you can start uploading artifacts. Now, add the following code to your training_pipeline.py file and run it again to see the artifacts in your AWS S3 bucket.
def sync_artifact_dir_to_s3(self):
    """
    Syncs the artifact directory to S3.
    """
    try:
        aws_bucket_url = f"s3://{TRAINING_BUCKET_NAME}/artifact/{self.training_pipeline_config.timestamp}"
        self.s3_sync.sync_folder_to_s3(folder=self.training_pipeline_config.artifact_dir, aws_bucket_url=aws_bucket_url)
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)

def sync_saved_model_dir_to_s3(self):
    """
    Syncs the saved model directory to S3.
    """
    try:
        aws_bucket_url = f"s3://{TRAINING_BUCKET_NAME}/final_model/{self.training_pipeline_config.timestamp}"
        self.s3_sync.sync_folder_to_s3(folder=self.training_pipeline_config.model_dir, aws_bucket_url=aws_bucket_url)
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)
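The article does not show how `sync_folder_to_s3` is implemented. One plausible approach, sketched here purely as an assumption, is to build an `aws s3 sync` command for the AWS CLI configured earlier. The helper name, folder name, and timestamp below are hypothetical; the bucket name reuses the example from the S3 steps.

```python
def build_s3_sync_command(folder, bucket, prefix, timestamp):
    """Build the argument list for `aws s3 sync <folder> s3://<bucket>/<prefix>/<timestamp>`."""
    aws_bucket_url = f"s3://{bucket}/{prefix}/{timestamp}"
    return ["aws", "s3", "sync", folder, aws_bucket_url]


# Hypothetical folder and timestamp, for illustration only
cmd = build_s3_sync_command("Artifacts", "machinepredictive", "artifact", "01_01_2025_00_00_00")
print(" ".join(cmd))
```

Such a list could be handed to `subprocess.run(cmd, check=True)`; keeping it as a list rather than a shell string avoids quoting problems with unusual folder names.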

Amazon Elastic Container Registry (ECR)
Follow these steps to create an ECR repository.
- Search for 'ECR' in the AWS search bar.
- Click Create Repository, and select Private Repository.
- Provide a repository name (e.g., machinepredictive) and click Create.

Copy the URI from your ECR repository, which should look something like "788614365622.dkr.ecr.ap-southeast-2.amazonaws.com", and save it somewhere. We'll need it later to paste into GitHub Secrets.
Docker Integration for Deployment
Dockerizing our project ensures it runs smoothly in any environment without dependency issues. It's a must-have tool for packaging and sharing applications effortlessly.
Keeping the Docker image as small as possible is important for efficiency. We can minimize its size by using techniques like multi-stage builds and choosing a lightweight Python base image.
FROM python:3.10-slim-buster
WORKDIR /app
COPY . /app
RUN apt update -y && apt install awscli -y
RUN apt-get update && pip install -r requirements.txt
CMD ["python3", "app.py"]
Set Up Actions Secrets and Variables
To securely store sensitive information like AWS credentials and repository URIs, we need to set up GitHub Actions secrets in our repository. Follow these steps:
Step 1: Open Your GitHub Repository
Navigate to your repository on GitHub.
Step 2: Go to Settings
At the top of your repository page, locate and click on the "Settings" tab.
Step 3: Access Secrets and Variables
In the left sidebar, scroll down to Secrets and Variables → Select Actions.
Step 4: Create a New Secret
Click the New repository secret button.
Step 5: Add AWS Credentials
- Create a secret named "AWS_ACCESS_KEY_ID" and paste your AWS Access Key.
- Create another secret named "AWS_SECRET_ACCESS_KEY" and paste your AWS Secret Access Key.
- Create a secret named "AWS_REGION" and paste your chosen region (e.g., us-east-1).
Step 6: Add AWS ECR Repository URI
- Copy your ECR repository URI (e.g., 788614365622.dkr.ecr.ap-southeast-2.amazonaws.com).
- Create a new secret named "AWS_ECR_LOGIN_URI" and paste the copied URI.
- Create a new secret named "ECR_REPOSITORY_NAME" and paste the name of the ECR repository (e.g., machinepredictive).
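To see how the last two secrets fit together, the sketch below combines the registry URI and the repository name into a full image reference of the kind a workflow would build and push. The helper `image_uri` and the `latest` tag are assumptions; the actual workflow file may tag images differently.

```python
def image_uri(login_uri, repository, tag="latest"):
    """Combine the ECR registry URI, repository name, and tag into an image reference."""
    return f"{login_uri.rstrip('/')}/{repository}:{tag}"


# Example values taken from the steps above
uri = image_uri("788614365622.dkr.ecr.ap-southeast-2.amazonaws.com", "machinepredictive")
print(uri)
```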

AWS EC2
Now let's see how to create an instance with AWS EC2. Follow these steps.
- Navigate to the EC2 service in the AWS Management Console and click Launch an instance.
- Name your instance (e.g., machinepredictive) and select Ubuntu as your operating system.
- Choose the instance type t2.large.
- Select your default key pair.
- Under network security, choose the default VPC and configure the security groups as needed.
- Finally, click Launch Instance.
AWS EC2 CLI
After creating the instance named machinepredictive, select its Instance ID and click Connect. Then, under EC2 Instance Connect, click Connect again. This opens a command-line session on the instance where you can run the required commands.
Now, enter these commands in the CLI one by one to set up Docker on your EC2 instance:
sudo apt-get update -y
sudo apt-get upgrade
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker ubuntu
newgrp docker
Keep your CLI session open. Next, navigate to your GitHub repository, click on Settings, then go to the Actions section, and under Runners, click New self-hosted runner. Choose the Linux runner image. Now, run the following commands one by one in your CLI to download and configure your self-hosted runner:
# Create a folder and navigate into it
mkdir actions-runner && cd actions-runner
# Download the latest runner package
curl -o actions-runner-linux-x64-2.322.0.tar.gz -L https://github.com/actions/runner/releases/download/v2.322.0/actions-runner-linux-x64-2.322.0.tar.gz
# Optional: Validate the hash
echo "b13b784808359f31bc79b08a191f5f83757852957dd8fe3dbfcc38202ccf5768 actions-runner-linux-x64-2.322.0.tar.gz" | shasum -a 256 -c
# Extract the installer
tar xzf ./actions-runner-linux-x64-2.322.0.tar.gz
# Configure the runner
./config.sh --url https://github.com/karthikponna/Predictive_Maintenance_MLOps --token "Paste your token here"
# Run the runner
./run.sh
This will download, configure, and start your self-hosted GitHub Actions runner on your EC2 instance.
While configuring the self-hosted GitHub Actions runner, the CLI will prompt you to enter a name for the runner. Type self-hosted and press Enter.
CI/CD with GitHub Actions
You can check out the .github/workflows/main.yml file.
Now, let's dive into what each section of this main.yml file does.
- Continuous Integration Job: This job runs on an Ubuntu runner to check out the code, lint it, and execute unit tests.
- Continuous Delivery Job: Triggered after CI, this job installs utilities, configures AWS credentials, logs into Amazon ECR, and builds, tags, and pushes your Docker image to ECR.
- Continuous Deployment Job: Running on a self-hosted runner, this job pulls the latest Docker image from ECR, runs it as a container to serve users, and cleans up any previous images or containers.

Once the Continuous Deployment Job completes successfully, you'll see an output like this in the AWS CLI.

Open your AWS EC2 instance by clicking its Instance ID. Verify that the instance state is Running and locate the Public IPv4 DNS. Click on the "Open address" option to automatically launch your FastAPI application in your browser. Now, let's dive into FastAPI.
FastAPI
You can check out the app.py file.
In this FastAPI application, I've created two main routes: one for training the model and another for generating predictions. Let's explore each route in detail.

Train route
The /train endpoint starts the model training process using a predefined dataset, making it easy to update and improve the model with new data. It's especially useful for retraining the model to improve accuracy or incorporate new data, so the model stays up to date.

Predict route
The /predict endpoint accepts a CSV file via POST. This endpoint handles incoming data, uses the trained model to generate predictions, and then returns the results formatted as JSON. This route is well suited to applying the model to new datasets in bulk.
For the /predict route, a sample test.csv file is included in the repository's valid_data folder.
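The flow of the predict route (CSV in, predictions out as JSON) can be sketched without any web framework. This is only an illustration under stated assumptions: `predict_from_csv` is a hypothetical helper, and `toy_model` is an arbitrary stand-in rule, not the trained model or the actual app.py logic.

```python
import csv
import io
import json


def predict_from_csv(csv_bytes, predict_row):
    """Parse uploaded CSV bytes, add a prediction to each row, and return JSON text."""
    rows = list(csv.DictReader(io.StringIO(csv_bytes.decode("utf-8"))))
    for row in rows:
        row["prediction"] = predict_row(row)
    return json.dumps(rows)


def toy_model(row):
    """Stand-in for the trained model: an arbitrary threshold rule for illustration."""
    return int(float(row["Tool wear [min]"]) > 200)


upload = b"Tool wear [min]\n50\n250\n"
print(predict_from_csv(upload, toy_model))
```

In the real application the callable would wrap the saved preprocessor and model, and the JSON string would be returned as the HTTP response body.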

Conclusion
Together, we built a full production-ready Predictive Maintenance MLOps project, from gathering and preprocessing data to training, evaluating, and deploying our model using Docker, AWS, and FastAPI. This project shows how MLOps can bridge the gap between development and production, making it easier to build robust, scalable solutions.
Remember, this guide is all about learning and applying these techniques to your data science projects. Don't hesitate to experiment, innovate, and explore new improvements as you progress. Thanks for sticking with me until the end. Keep learning, keep doing, and keep growing!
GitHub repo: https://github.com/karthikponna/Predictive_Maintenance_MLOps
Key Takeaways
- We built an end-to-end MLOps pipeline that covers data ingestion, model training, and deployment.
- Docker, AWS, and FastAPI work together seamlessly to move from development to production.
- Dockerizing our ML project is important: it ensures the project runs smoothly in any environment without dependency headaches.
- Continuous deployment ensures the model stays efficient and up to date in real-world applications.
Frequently Asked Questions
A. Docker ensures our ML project runs smoothly in any environment by eliminating dependency issues.
A. AWS services like EC2, S3, and ECR enable seamless deployment, storage, and scaling of our application.
A. MLflow makes machine learning development easier by offering tools for tracking experiments, versioning models, and deploying them.
A. GitHub Actions automates the CI/CD process (running tests, building Docker images, and deploying updates), ensuring a smooth transition from development to production.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author's discretion.