diff --git a/example/fm/Dockerfile b/example/fm/Dockerfile
new file mode 100644
index 0000000..2c81a61
--- /dev/null
+++ b/example/fm/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.10
+WORKDIR /aigear/fm/
+COPY . /aigear/fm/
+COPY requirements.txt /aigear/fm/requirements.txt
+
+RUN python -m pip install --upgrade pip
+RUN python -m pip install -r /aigear/fm/requirements.txt
+RUN python -m pip install aigear-0.0.1-py3-none-any.whl
+
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
diff --git a/example/fm/aigear-0.0.1-py3-none-any.whl b/example/fm/aigear-0.0.1-py3-none-any.whl
new file mode 100644
index 0000000..4040f36
Binary files /dev/null and b/example/fm/aigear-0.0.1-py3-none-any.whl differ
diff --git a/example/fm/pipeline.py b/example/fm/pipeline.py
index acf4b8b..910a018 100644
--- a/example/fm/pipeline.py
+++ b/example/fm/pipeline.py
@@ -369,7 +369,7 @@ def fm_pipeline():
 
 
 if __name__ == '__main__':
-    # import os
+    import os
     # my_pipeline()
     fm_pipeline.run_in_executor()
 
@@ -388,9 +388,10 @@ def fm_pipeline():
 #
 #     fm_pipeline.deploy(
 #         volumes=volumes,
-#         skip_build_image=True,
+#         skip_build_image=False,
 #         cpu_count=cpu_count_train,
 #         mem_limit=mem_limit_train
+#     )
 #     ).to_service(
 #         hostname=hostname,
 #         ports=ports,
diff --git a/example/fm/requirements.txt b/example/fm/requirements.txt
index 28def66..3635f3f 100644
--- a/example/fm/requirements.txt
+++ b/example/fm/requirements.txt
@@ -1,3 +1,4 @@
-dask>=2024.7.0
-rankfm>=0.2.5
-scikit-learn>=1.5.0
+dask>=2022.10.1
+scikit-learn>=1.2.0
+git+https://github.com/ErraticO/rankfm.git
+haversine>=2.0.0
diff --git a/example/pipeline/Dockerfile b/example/pipeline/Dockerfile
index d6a2642..ebff3e3 100644
--- a/example/pipeline/Dockerfile
+++ b/example/pipeline/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.9
+FROM python:3.10
 WORKDIR /pipeline
 COPY . .
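For reference, the example images above can be exercised with the Docker SDK that aigear already depends on. A minimal sketch, assuming a local Docker daemon and the tag `aigear-fm` (both assumptions, not part of this diff):

```python
# Hedged sketch: build and run the example/fm image with docker-py.
# "aigear-fm" and the run command are illustrative choices.
import docker

client = docker.from_env()
image, _ = client.images.build(path="example/fm", tag="aigear-fm")
logs = client.containers.run(image.id, command="python pipeline.py", remove=True)
print(logs.decode())
```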
diff --git a/example/pipeline/iris_pipeline.py b/example/pipeline/iris_pipeline.py
index ce3452c..99257e2 100644
--- a/example/pipeline/iris_pipeline.py
+++ b/example/pipeline/iris_pipeline.py
@@ -3,6 +3,7 @@
 from sklearn.linear_model import LogisticRegression
 from sklearn.metrics import accuracy_score
 from aigear.pipeline import workflow, task
+from pathlib import Path
 import pickle
 import json
 import os
@@ -44,7 +45,9 @@ def get_env_variables():
 
 @task
 def save_model(model, model_path):
-    with open(model_path, "wb") as md:
+    path = Path(model_path)
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open('wb') as md:
         pickle.dump(model, md)
 
 
@@ -82,11 +85,12 @@ def my_pipeline():
         skip_build_image=True,
         cpu_count=cpu_count_train,
         mem_limit=mem_limit_train
-    ).to_service(
-        hostname=hostname,
-        ports=ports,
-        volumes=volumes,
-        tag=service_dir,
-        cpu_count=cpu_count_service,
-        mem_limit=mem_limit_service
     )
+    # ).to_service(
+    #     hostname=hostname,
+    #     ports=ports,
+    #     volumes=volumes,
+    #     tag=service_dir,
+    #     cpu_count=cpu_count_service,
+    #     mem_limit=mem_limit_service
+    # )
diff --git a/pyproject.toml b/pyproject.toml
index ea2f2f1..2b0073a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,8 @@ Issues = "https://github.com/retail-ai-inc/aigear/issues"
 common = [
     "tabulate >= 0.9",
     "cloudpickle >= 2.0.0",
+    "astor >= 0.8.1",
+    "sqlalchemy >= 1.1.13"
 ]
 docker = [
     "docker >= 6.13",
@@ -51,6 +53,11 @@ msgrpc = [
     "grpcio-health-checking >= 1.56.0",
     "sentry-sdk >= 1.29.2",
 ]
+gcp = [
+    "google-cloud-compute",
+    "google-cloud-pubsub",
+    "google-cloud-functions",
+]
 
 [project.scripts]
 aigear-msgrpc = "aigear.microservices.grpc.service:main"
diff --git a/src/aigear/common/logger.py b/src/aigear/common/logger.py
index fdfc97a..d43dda0 100644
--- a/src/aigear/common/logger.py
+++ b/src/aigear/common/logger.py
@@ -19,7 +19,7 @@ def init_logger():
     logger_instance = logging.getLogger(__name__)
     handler = logging.StreamHandler(sys.stdout)
     # formatter = JsonFormatter()
-    formatter = logging.Formatter('%(process)d - %(asctime)s - %(levelname)s - %(message)s')
+    formatter = logging.Formatter('aigear-%(process)d - %(asctime)s - %(levelname)s - %(message)s')
     handler.setFormatter(formatter)
     logger_instance.addHandler(handler)
     logger_instance.setLevel(logging.INFO)
diff --git a/src/aigear/common/sh.py b/src/aigear/common/sh.py
new file mode 100644
index 0000000..2ef2c4f
--- /dev/null
+++ b/src/aigear/common/sh.py
@@ -0,0 +1,22 @@
+import subprocess
+
+
+def run_sh(
+    command: list,
+    inputs: str = None,
+):
+    # Join the argv list into a single string: passing a list together with
+    # shell=True would only execute its first element on POSIX systems, and
+    # some callers rely on shell expansions such as $(...).
+    result = subprocess.run(
+        " ".join(command),
+        input=inputs,
+        text=True,
+        capture_output=True,
+        shell=True,
+    )
+    stderr = result.stderr
+    if stderr:
+        return stderr
+    else:
+        return result.stdout
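`run_sh` returns stderr verbatim when a command fails and stdout otherwise, which is why the gcloud wrappers below match on substrings such as "ERROR" or "NOT_FOUND" rather than exit codes. A minimal usage sketch (the topic name is illustrative):

```python
from aigear.common.sh import run_sh

# The caller decides what counts as an error by inspecting the returned text.
event = run_sh(["gcloud", "pubsub", "topics", "describe", "my-topic"])
if "NOT_FOUND" in event:
    print("topic does not exist yet")
```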
diff --git a/src/aigear/deploy/docker/builder.py b/src/aigear/deploy/docker/builder.py
index 516c9bd..777ebf9 100644
--- a/src/aigear/deploy/docker/builder.py
+++ b/src/aigear/deploy/docker/builder.py
@@ -55,12 +55,12 @@ def push(self):
 
     @staticmethod
     def get_image_id(tag: str):
-        try:
-            with docker_client() as client:
+        with docker_client() as client:
+            try:
                 image = client.images.get(tag)
-            return image.id
-        except ImageNotFound:
-            logger.info('Image not found.')
+                return image.id
+            except ImageNotFound:
+                logger.info(f'Image not found: {tag}.')
 
 
 def build_image(
@@ -202,15 +202,21 @@ def default_dockerfile(
         package_source = " -i " + package_source
 
     lines.append(f"COPY requirements.txt {workdir}/requirements.txt")
+    lines.append(
+        ""
+    )
     lines.append(f"RUN python -m pip install --upgrade pip{package_source}")
     lines.append(
         f"RUN python -m pip install -r {workdir}/requirements.txt{package_source}"
    )
     lines.append(
-        f"""
-        ENV PYTHONDONTWRITEBYTECODE=1 \
-        PYTHONBUFFERED=1
-        """
+        ""
+    )
+    lines.append(
+        "ENV PYTHONDONTWRITEBYTECODE=1"
+    )
+    lines.append(
+        "ENV PYTHONUNBUFFERED=1"
     )
 
     with Path("Dockerfile").open("w") as f:
diff --git a/src/aigear/deploy/docker/define_dockerfile.py b/src/aigear/deploy/docker/define_dockerfile.py
index 6a35df5..f2a90cb 100644
--- a/src/aigear/deploy/docker/define_dockerfile.py
+++ b/src/aigear/deploy/docker/define_dockerfile.py
@@ -15,4 +15,4 @@
 with open('Dockerfile', 'w') as f:
     f.write(dockerfile_content)
 
-print("Dockerfile 创建完成。")
+print("Dockerfile creation completed.")
diff --git a/src/aigear/deploy/gcp/function.py b/src/aigear/deploy/gcp/function.py
new file mode 100644
index 0000000..ea9f1b2
--- /dev/null
+++ b/src/aigear/deploy/gcp/function.py
@@ -0,0 +1,82 @@
+from pathlib import Path
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+class CloudFunction:
+    def __init__(
+        self,
+        function_name,
+        region,
+        entry_point,
+        topic_name,
+    ):
+        self.function_name = function_name
+        self.region = region
+        self.entry_point = entry_point
+        self.topic_name = topic_name
+        self.source_path = Path(__file__).resolve().parent / "function_test"
+
+    def deploy(self):
+        command = [
+            "gcloud", "functions", "deploy",
+            self.function_name,
+            "--gen2",
+            "--runtime=nodejs20",
+            f"--region={self.region}",
+            f"--entry-point={self.entry_point}",
+            f"--trigger-topic={self.topic_name}",
+            f"--source={self.source_path}",
+        ]
+        event = run_sh(command)
+        logger.info(event)
+        if "ERROR" in event:
+            logger.info("Error occurred while creating cloud function.")
+
+    def logs(self, limit=5):
+        command = [
+            "gcloud", "functions", "logs", "read",
+            "--gen2",
+            f"--region={self.region}",
+            f"--limit={limit}",
+            self.function_name,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def describe(self):
+        is_exist = False
+        command = [
+            "gcloud", "functions", "describe",
+            self.function_name,
+            f"--region={self.region}",
+        ]
+        event = run_sh(command)
+        if "ACTIVE" in event:
+            is_exist = True
+            logger.info(f"Find resources: {event}")
+        elif "ERROR" in event and "not found" in event:
+            logger.info(f"NOT_FOUND: Resource not found: {event}")
+        else:
+            logger.info(event)
+        return is_exist
+
+    def list(self):
+        command = [
+            "gcloud", "functions", "list",
+            f"--regions={self.region}",
+            "--v2",
+            f"--filter={self.function_name}",
+        ]
+        event = run_sh(command)
+        logger.info(f"\n{event}")
+
+    def delete(self):
+        command = [
+            "gcloud", "functions", "delete",
+            self.function_name,
+            "--gen2",
+            f"--region={self.region}",
+        ]
+        event = run_sh(command, "yes\n")
+        logger.info(f"\n{event}")
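A hedged usage sketch for `CloudFunction`; all names are illustrative, and `deploy` assumes an authenticated gcloud CLI. Note that the entry point must match the handler registered in the Node.js source that `--source` points at:

```python
from aigear.deploy.gcp.function import CloudFunction

cf = CloudFunction(
    function_name="aigear-trainer",          # illustrative
    region="asia-northeast1",
    entry_point="cronjobProcessPubSub",      # must match the registered handler
    topic_name="aigear-train-topic",
)
if not cf.describe():  # describe() returns True only when the function is ACTIVE
    cf.deploy()
cf.logs(limit=10)
```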
diff --git a/src/aigear/deploy/gcp/function/index.js b/src/aigear/deploy/gcp/function/index.js
new file mode 100644
index 0000000..2bdec9f
--- /dev/null
+++ b/src/aigear/deploy/gcp/function/index.js
@@ -0,0 +1,158 @@
+
+/**
+ * Triggered from a message on a Cloud Pub/Sub topic.
+ *
+ * @param {!Object} event Event payload.
+ * @param {!Object} context Metadata for the event.
+ */
+const Buffer = require('safe-buffer').Buffer;
+const Compute = require('@google-cloud/compute');
+const compute = new Compute();
+// Change this const value to your project
+const projectId = 'ssc-ape-staging';
+const zone = 'asia-northeast1-a';
+
+// Build environment and clean up
+const commonCommand = 'cd /var\ngcloud auth configure-docker asia-northeast1-docker.pkg.dev --quiet\nsudo docker pull ${dockerImage}\nsudo docker run ${dockerImage}\ndocker_exit_code=$?\n[ ${docker_exit_code} -eq "0" ] && gcloud pubsub topics publish trial_rankfm_ape_pubsub --message \'createVMDate\' || gcloud pubsub topics publish trial_rankfm_ape_pubsub --message "Exit code: ${docker_exit_code}"\ngcp_zone=$(curl -H Metadata-Flavor:Google http://metadata.google.internal/computeMetadata/v1/instance/zone -s | cut -d/ -f4)\nsleep 300\nhostname_result=$(hostname)\nextracted_name=$(echo ${hostname_result} | cut -d. -f1)\ngcloud compute instances delete ${extracted_name} --zone ${gcp_zone}';
+
+
+const vmConfig = {
+    kind: 'compute#instance',
+    name: 'aigear-vm',
+    zone: `projects/${projectId}/zones/${zone}`,
+    machineType: `projects/${projectId}/zones/${zone}/machineTypes/`,
+    displayDevice: {
+        enableDisplay: false
+    },
+    metadata: {
+        kind: 'compute#metadata',
+        items: [
+            {
+                key: 'startup-script',
+                value: commonCommand
+            }
+        ]
+    },
+    tags: {
+        items: []
+    },
+    disks: [
+        {
+            kind: 'compute#attachedDisk',
+            type: 'PERSISTENT',
+            boot: true,
+            mode: 'READ_WRITE',
+            autoDelete: true,
+            deviceName: 'aigear-vm',
+            initializeParams: {
+                sourceImage: `projects/${projectId}/global/images/ml-model-training-cloud-function-image`,
+                diskType: `projects/${projectId}/zones/${zone}/diskTypes/pd-standard`,
+                diskSizeGb: '20'
+            },
+            diskEncryptionKey: {}
+        }
+    ],
+    canIpForward: false,
+    networkInterfaces: [
+        {
+            kind: 'compute#networkInterface',
+            subnetwork: `projects/${projectId}/regions/asia-northeast1/subnetworks/default`,
+            accessConfigs: [
+                {
+                    kind: 'compute#accessConfig',
+                    name: 'External NAT',
+                    type: 'ONE_TO_ONE_NAT',
+                    networkTier: 'PREMIUM'
+                }
+            ],
+            aliasIpRanges: []
+        }
+    ],
+    description: '',
+    labels: {},
+    scheduling: {
+        preemptible: false,
+        onHostMaintenance: 'MIGRATE',
+        automaticRestart: true,
+        nodeAffinities: []
+    },
+    deletionProtection: false,
+    reservationAffinity: {
+        consumeReservationType: 'ANY_RESERVATION'
+    },
+    serviceAccounts: [
+        {
+            email: `200251827214-compute@developer.gserviceaccount.com`,
+            scopes: [
+                'https://www.googleapis.com/auth/cloud-platform'
+            ]
+        }
+    ],
+    shieldedInstanceConfig: {
+        enableSecureBoot: false,
+        enableVtpm: true,
+        enableIntegrityMonitoring: true
+    },
+    confidentialInstanceConfig: {
+        enableConfidentialCompute: false
+    }
+}
+const functions = require('@google-cloud/functions-framework');
+
+// Register a CloudEvent callback with the Functions Framework that will
+// be executed when the Pub/Sub trigger topic receives a message.
+functions.cloudEvent('cronjobProcessPubSub', cloudEvent => {
+    // The Pub/Sub message is passed as the CloudEvent's data payload.
+    const message = Buffer.from(cloudEvent.data.message.data, 'base64').toString();
+    const cronjobInfo = JSON.parse(message);
+    if (cronjobInfo.length == 0) {
+        return;
+    }
+    console.log(`vmName is ${cronjobInfo[0].vmName}`);
+    console.log(`command is ${cronjobInfo[0].command}`);
+    console.log(`cronjobInfo.spec is ${cronjobInfo[0].spec}`);
+
+    // set vm spec
+    const spec = cronjobInfo[0].spec ? cronjobInfo[0].spec : 'e2-medium';
+    const machineTypeSpec = `projects/${projectId}/zones/${zone}/machineTypes/` + spec;
+    console.log(`machineTypeSpec is ${machineTypeSpec}`);
+
+    vmConfig.machineType = machineTypeSpec;
+
+    // VM and hard disk name
+    vmConfig.name = cronjobInfo[0].vmName;
+    vmConfig.disks[0].deviceName = cronjobInfo[0].vmName;
+    const diskSizeGb = cronjobInfo[0].diskSizeGb ? cronjobInfo[0].diskSizeGb : '20';
+    vmConfig.disks[0].initializeParams.diskSizeGb = diskSizeGb;
+    vmConfig.scheduling.onHostMaintenance = cronjobInfo[0].onHostMaintenance;
+    const vmName = cronjobInfo[0].vmName + Date.now();
+    const dockerImage = cronjobInfo[0].dockerImage;
+    console.log(JSON.stringify(cronjobInfo));
+    cronjobInfo.shift();
+
+    vmConfig.metadata.items[0].value = commonCommand.replace(/\${dockerImage}/g, dockerImage).replace('createVMDate', JSON.stringify(cronjobInfo));
+
+    try {
+        compute.zone(zone)
+            .createVM(vmName, vmConfig)
+            .then(data => {
+                // Operation pending.
+                const vm = data[0];
+                const operation = data[1];
+                console.log(`VM being created: ${vm.id}`);
+                console.log(`Operation info: ${operation.id}`);
+                return operation.promise();
+            })
+            .then(() => {
+                const message = 'VM created with success, Cloud Function finished execution.';
+                console.log(message);
+            })
+            .catch(err => {
+                console.log(err);
+            });
+    } catch (err) {
+        console.log(err);
+    }
+});
diff --git a/src/aigear/deploy/gcp/function/package.json b/src/aigear/deploy/gcp/function/package.json
new file mode 100644
index 0000000..5d9464d
--- /dev/null
+++ b/src/aigear/deploy/gcp/function/package.json
@@ -0,0 +1,9 @@
+{
+    "dependencies": {
+        "@google-cloud/functions-framework": "^3.0.0",
+        "@google-cloud/compute": "2.4.1",
+        "@google-cloud/pubsub": "^0.18.0",
+        "ssh2": "^0.6.0",
+        "safe-buffer": "^5.1.2"
+    }
+}
\ No newline at end of file
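The function above expects the Pub/Sub message to decode to a JSON array whose first element describes the VM to create; the rest of the array is republished so the next run can continue. A hedged sketch of publishing a matching payload with the `Publish` wrapper added later in this diff (all values illustrative; only keys read by index.js are shown):

```python
import json
from aigear.deploy.gcp.pub_sub import Publish

# Keys consumed by index.js: vmName, command (logged only), spec,
# diskSizeGb, onHostMaintenance, dockerImage.
payload = [{
    "vmName": "aigear-train-vm",
    "spec": "e2-highmem-4",
    "diskSizeGb": "50",
    "onHostMaintenance": "MIGRATE",
    "dockerImage": "asia-northeast1-docker.pkg.dev/my-project/aigear/fm:latest",
}]
Publish("aigear-train-topic").publish(json.dumps(payload))
```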
diff --git a/src/aigear/deploy/gcp/iam.py b/src/aigear/deploy/gcp/iam.py
new file mode 100644
index 0000000..0e1b900
--- /dev/null
+++ b/src/aigear/deploy/gcp/iam.py
@@ -0,0 +1,21 @@
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+def check_iam(project_id: str):
+    is_owner = False
+    command = [
+        "gcloud", "projects", "get-iam-policy",
+        project_id,
+        "--flatten=bindings[].members",
+        "--format=table(bindings.role)",
+        "--filter=bindings.members:$(gcloud config get-value account)",
+    ]
+    event = run_sh(command)
+    if "roles/owner" in event:
+        is_owner = True
+    elif event == "":
+        logger.info("The currently logged-in GCP account does not have owner privileges.")
+    else:
+        logger.info(event)
+    return is_owner
diff --git a/src/aigear/deploy/gcp/images.py b/src/aigear/deploy/gcp/images.py
new file mode 100644
index 0000000..62fd14c
--- /dev/null
+++ b/src/aigear/deploy/gcp/images.py
@@ -0,0 +1,131 @@
+from ..docker.client import docker_client, ImageNotFound
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+class ArtifactRegistry:
+    def __init__(self):
+        self.rep_exists = True
+
+    def create(
+        self,
+        repository: str,
+        location: str,
+        description: str,
+    ):
+        command = [
+            "gcloud", "artifacts", "repositories", "create", repository,
+            "--repository-format=docker",
+            f"--location={location}",
+            f"--description={description}"
+        ]
+        self._run_sh(
+            command=command,
+        )
+
+    def describe(
+        self,
+        repository: str,
+        location: str,
+    ):
+        command = [
+            "gcloud", "artifacts", "repositories", "describe", repository,
+            f"--location={location}",
+        ]
+        self._run_sh(
+            command=command,
+        )
+
+    def docker_auth(
+        self,
+        location,
+    ):
+        self._run_sh(
+            command=["gcloud", "auth", "configure-docker", f"{location}-docker.pkg.dev"],
+            inputs="yes\n",
+        )
+
+    def _run_sh(
+        self,
+        command: list,
+        inputs: str = None,
+    ):
+        event = run_sh(command, inputs)
+        if "ALREADY_EXISTS" in event:
+            logger.info("The repository already exists.")
+        elif "NOT_FOUND" in event:
+            self.rep_exists = False
+            logger.error("Repository not found.")
+        elif "registered correctly" in event:
+            logger.info("gcloud credential helpers already registered correctly.")
+        elif "Registry URL" in event:
+            logger.info("The repository already exists.")
+        else:
+            logger.info(event)
+
+
+class ToGCPImage(ArtifactRegistry):
+    def __init__(
+        self,
+        project_id: str,
+        location: str,
+        repository: str,
+        description: str = "",
+    ):
+        super().__init__()
+        self.project_id = project_id
+        self.location = location
+        self.repository = repository
+        self.description = description
+        self.gcp_image = None
+        self.docker_auth(location)
+        self.describe(
+            repository=repository,
+            location=location,
+        )
+
+    def tag(
+        self,
+        source_image,
+        gcp_image,
+        tag=None,
+    ):
+        with docker_client() as client:
+            try:
+                local_image = client.images.get(source_image)
+            except ImageNotFound:
+                logger.info(f'Image not found: {source_image}.')
+                return self
+
+        if tag is None:
+            self.gcp_image = f"{self.location}-docker.pkg.dev/{self.project_id}/{self.repository}/{gcp_image}"
+        else:
+            self.gcp_image = f"{self.location}-docker.pkg.dev/{self.project_id}/{self.repository}/{gcp_image}:{tag}"
+
+        local_image.tag(self.gcp_image)
+        return self
+
+    def push(self):
+        if self.gcp_image is None:
+            logger.info("The local image is not tagged in artifact registry format.")
+            return None
+        if not self.rep_exists:
+            self.create(
+                self.repository,
+                self.location,
+                self.description,
+            )
+
+        with docker_client() as client:
+            for event in client.images.push(self.gcp_image, stream=True, decode=True):
+                status = event.get("status")
+                if "error" in event:
+                    logger.info(event["error"])
+                elif "aux" in event:
+                    logger.info(event["aux"])
+                elif status in ["Preparing", "Waiting", "Pushing"]:
+                    continue
+                elif status == "Pushed":
+                    logger.info(event)
+                else:
+                    logger.info(event)
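A usage sketch for `ToGCPImage` (project, location, and image names are illustrative):

```python
from aigear.deploy.gcp.images import ToGCPImage

# Tags a local image into Artifact Registry format and pushes it; push()
# creates the repository first if describe() reported NOT_FOUND.
pusher = ToGCPImage(
    project_id="my-project",
    location="asia-northeast1",
    repository="aigear",
    description="aigear training images",
)
pusher.tag(source_image="aigear-fm", gcp_image="fm", tag="latest").push()
```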
diff --git a/src/aigear/deploy/gcp/infra.py b/src/aigear/deploy/gcp/infra.py
new file mode 100644
index 0000000..e9c055a
--- /dev/null
+++ b/src/aigear/deploy/gcp/infra.py
@@ -0,0 +1,59 @@
+from .function import CloudFunction
+from .pub_sub import Publish, Subscriptions
+from ...common.logger import logger
+from .iam import check_iam
+from .project import get_project_id
+
+
+class Infra:
+    def __init__(
+        self,
+        topic_name,
+        function_name,
+        region,
+    ):
+        sub_name = f"{topic_name}_sub"
+        _entry_point = "helloPubSub"
+
+        self.pub = Publish(topic_name)
+        self.sub = Subscriptions(sub_name, topic_name)
+        self.cf = CloudFunction(
+            function_name,
+            region,
+            _entry_point,
+            topic_name,
+        )
+
+    def create(self):
+        pub_exist, sub_exist, cf_exist = self._check_infra(self.pub, self.sub, self.cf)
+        if not pub_exist:
+            self.pub.create()
+
+        if not sub_exist:
+            self.sub.create()
+
+        if not cf_exist:
+            self.cf.deploy()
+
+    def clear(self):
+        project_id = get_project_id()
+        owner_pm = check_iam(project_id)
+        if owner_pm:
+            self.pub.delete()
+            self.sub.delete()
+            self.cf.delete()
+        else:
+            logger.info("You are not the owner and cannot delete aigear infrastructure.")
+
+    @staticmethod
+    def _check_infra(pub, sub, cf):
+        pub_exist = pub.describe()
+        if not pub_exist:
+            logger.info("Pub/Sub topic not created.")
+        sub_exist = sub.describe()
+        if not sub_exist:
+            logger.info("Pub/Sub subscription not created.")
+        cf_exist = cf.describe()
+        if not cf_exist:
+            logger.info("Cloud function not created.")
+        return pub_exist, sub_exist, cf_exist
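`Infra` bundles the topic, the subscription, and the Cloud Function into one idempotent setup step; a sketch with illustrative names:

```python
from aigear.deploy.gcp.infra import Infra

infra = Infra(
    topic_name="aigear-train-topic",
    function_name="aigear-trainer",
    region="asia-northeast1",
)
infra.create()   # only creates the pieces that _check_infra could not find
# infra.clear()  # tears everything down, but only for project owners
```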
diff --git a/src/aigear/deploy/gcp/project.py b/src/aigear/deploy/gcp/project.py
new file mode 100644
index 0000000..ca12ad0
--- /dev/null
+++ b/src/aigear/deploy/gcp/project.py
@@ -0,0 +1,32 @@
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+def get_project_id():
+    project_id = None
+    command = [
+        "gcloud", "config", "get-value", "project"
+    ]
+    event = run_sh(command)
+    if "unset" in event:
+        logger.info("No project id set.")
+    elif "ERROR" in event:
+        logger.info(event)
+    else:
+        project_id = event.strip()
+    return project_id
+
+
+def get_region():
+    region = None
+    command = [
+        "gcloud", "config", "get-value", "compute/region"
+    ]
+    event = run_sh(command)
+    if "unset" in event:
+        logger.info("No region set.")
+    elif "ERROR" in event:
+        logger.info(event)
+    else:
+        region = event.strip()
+    return region
diff --git a/src/aigear/deploy/gcp/pub_sub.py b/src/aigear/deploy/gcp/pub_sub.py
new file mode 100644
index 0000000..c82d303
--- /dev/null
+++ b/src/aigear/deploy/gcp/pub_sub.py
@@ -0,0 +1,104 @@
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+class Publish:
+    def __init__(self, topic_name: str):
+        self.topic_name = topic_name
+
+    def create(self):
+        command = [
+            "gcloud", "pubsub", "topics", "create", self.topic_name
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def describe(self):
+        is_exist = False
+        command = [
+            "gcloud", "pubsub", "topics", "describe", self.topic_name
+        ]
+        event = run_sh(command)
+        if "name: projects" in event:
+            is_exist = True
+            logger.info(f"Find resources: {event}")
+        elif "NOT_FOUND" in event:
+            logger.info(f"NOT_FOUND: Resource not found (resource={self.topic_name})")
+        else:
+            logger.info(event)
+        return is_exist
+
+    def delete(self):
+        command = [
+            "gcloud", "pubsub", "topics", "delete", self.topic_name
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def list(self):
+        command = [
+            "gcloud", "pubsub", "topics", "list", f"--filter=name.scope(topic):{self.topic_name}"
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def publish(self, message):
+        command = [
+            "gcloud", "pubsub", "topics", "publish", self.topic_name,
+            f"--message={message}",
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+
+class Subscriptions:
+    def __init__(self, sub_name: str, topic_name: str):
+        self.sub_name = sub_name
+        self.topic_name = topic_name
+
+    def create(self):
+        command = [
+            "gcloud", "pubsub", "subscriptions", "create", self.sub_name,
+            f"--topic={self.topic_name}",
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def describe(self):
+        is_exist = False
+        command = [
+            "gcloud", "pubsub", "subscriptions", "describe", self.sub_name
+        ]
+        event = run_sh(command)
+        if "name: projects" in event:
+            is_exist = True
+            logger.info(f"Find resources: {event}")
+        elif "NOT_FOUND" in event:
+            logger.info(f"NOT_FOUND: Resource not found (resource={self.sub_name})")
+        else:
+            logger.info(event)
+        return is_exist
+
+    def delete(self):
+        command = [
+            "gcloud", "pubsub", "subscriptions", "delete", self.sub_name
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    @staticmethod
+    def list():
+        command = [
+            "gcloud", "pubsub", "subscriptions", "list"
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def pull(self):
+        command = [
+            "gcloud", "pubsub", "subscriptions", "pull", self.sub_name,
+            "--format=json(ackId,message.attributes,message.data.decode(\"base64\").decode(\"utf-8\"),"
+            "message.messageId,message.publishTime)"
+        ]
+        event = run_sh(command)
+        logger.info(event)
diff --git a/src/aigear/deploy/gcp/scheduler.py b/src/aigear/deploy/gcp/scheduler.py
new file mode 100644
index 0000000..f51c675
--- /dev/null
+++ b/src/aigear/deploy/gcp/scheduler.py
@@ -0,0 +1,124 @@
+import json
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+class Scheduler:
+    def __init__(
+        self,
+        name: str,
+        location: str,
+        schedule: str,
+        topic_name: str,
+        message: any,
+        time_zone: str = "Etc/UTC",
+    ):
+        self.name = name
+        self.location = location
+        self.schedule = schedule
+        self.topic_name = topic_name
+        self.message = message
+        self.time_zone = time_zone
+
+    def create(self):
+        is_exist = self.describe()
+        if not is_exist:
+            message_body = json.dumps(self.message)
+            command = [
+                "gcloud", "scheduler", "jobs", "create", "pubsub",
+                self.name,
+                "--location", self.location,
+                "--schedule", self.schedule,
+                "--topic", self.topic_name,
+                "--message-body", message_body,
+                "--time-zone", self.time_zone,
+            ]
+            event = run_sh(command)
+            logger.info(event)
+            if "ERROR" in event:
+                logger.info("Error occurred while creating cloud scheduler job.")
+        else:
+            logger.info(f"The cloud scheduler ({self.name}) already exists.")
+
+    def delete(self):
+        command = [
+            "gcloud", "scheduler", "jobs", "delete",
+            self.name,
+            "--location", self.location,
+        ]
+        event = run_sh(command, "yes\n")
+        logger.info(event)
+
+    def describe(self):
+        is_exist = False
+        command = [
+            "gcloud", "scheduler", "jobs", "describe",
+            self.name,
+            "--location", self.location,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+        if "ENABLED" in event:
+            is_exist = True
+        return is_exist
+
+    def list(self):
+        command = [
+            "gcloud", "scheduler", "jobs", "list",
+            "--location", self.location,
+            f"--filter={self.name}",
+        ]
+        event = run_sh(command)
+        logger.info(f"\n{event}")
+
+    def run(self):
+        command = [
+            "gcloud", "scheduler", "jobs", "run",
+            self.name,
+            "--location", self.location,
+        ]
+        event = run_sh(command)
+        if event:
+            logger.info(event)
+        else:
+            logger.info("Running successfully, executing job.")
+
+    def pause(self):
+        command = [
+            "gcloud", "scheduler", "jobs", "pause",
+            self.name,
+            "--location", self.location,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def resume(self):
+        command = [
+            "gcloud", "scheduler", "jobs", "resume",
+            self.name,
+            "--location", self.location,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    @staticmethod
+    def update(
+        name,
+        location,
+        schedule,
+        topic_name,
+        message,
+    ):
+        message_body = json.dumps(message)
+        command = [
+            "gcloud", "scheduler", "jobs", "update", "pubsub",
+            name,
+            "--location", location,
+            "--schedule", schedule,
+            "--topic", topic_name,
+            "--message-body", message_body,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+        if "ERROR" in event:
+            logger.info("Error occurred while updating cloud scheduler job.")
diff --git a/src/aigear/deploy/gcp/storage.py b/src/aigear/deploy/gcp/storage.py
new file mode 100644
index 0000000..a2abe30
--- /dev/null
+++ b/src/aigear/deploy/gcp/storage.py
@@ -0,0 +1,95 @@
+from ...common.logger import logger
+from ...common.sh import run_sh
+
+
+class Bucket:
+    def __init__(
+        self,
+        bucket_name: str,
+        project_id: str,
+        location: str,
+    ):
+        self.bucket = f"gs://{bucket_name}-{project_id}"
+        self.location = location
+
+    def create(self):
+        command = [
+            "gcloud", "storage", "buckets", "create",
+            self.bucket,
+            f"--location={self.location}",
+            "--uniform-bucket-level-access",
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def describe(self):
+        is_exist = False
+        command = [
+            "gcloud", "storage", "buckets", "describe",
+            self.bucket,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+        if self.bucket in event and "ERROR" not in event:
+            is_exist = True
+        return is_exist
+
+    def list(self):
+        command = [
+            "gcloud", "storage", "buckets", "list",
+            self.bucket,
+        ]
+        event = run_sh(command)
+        logger.info(f"\n{event}")
+
+    def delete(self):
+        command = [
+            "gcloud", "storage", "rm", "-r",
+            self.bucket,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+
+class ManagedFolders:
+    def __init__(self, bucket_name, project_id):
+        self.bucket = f"gs://{bucket_name}-{project_id}"
+
+    def create(self, folder_name):
+        folder = f"{self.bucket}/{folder_name}"
+        command = [
+            "gcloud", "storage", "managed-folders", "create",
+            folder,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+
+    def describe(self, folder_name):
+        is_exist = False
+        folder = f"{self.bucket}/{folder_name}"
+        command = [
+            "gcloud", "storage", "managed-folders", "describe",
+            folder,
+        ]
+        event = run_sh(command)
+        logger.info(event)
+        if folder in event and "ERROR" not in event:
+            is_exist = True
+        return is_exist
+
+    def list(self):
+        command = [
+            "gcloud", "storage", "managed-folders", "list",
+            self.bucket,
+        ]
+        event = run_sh(command)
+        logger.info(f"\n{event}")
+
+    def delete(self, folder_name):
+        folder = f"{self.bucket}/{folder_name}"
+        command = [
+            "gcloud", "storage", "managed-folders", "delete",
+            folder,
+        ]
+        event = run_sh(command)
+        logger.info(event)
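A usage sketch for the storage helpers; names are illustrative, and note that `Bucket` derives the real bucket name as `gs://<bucket_name>-<project_id>`:

```python
from aigear.deploy.gcp.storage import Bucket, ManagedFolders

bucket = Bucket(
    bucket_name="aigear-models",
    project_id="my-project",
    location="asia-northeast1",
)
if not bucket.describe():  # False when gcloud reports an ERROR
    bucket.create()
ManagedFolders("aigear-models", "my-project").create("checkpoints")
```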
diff --git a/src/aigear/manage/local/db_models.py b/src/aigear/manage/local/db_models.py
index 63835df..8c06058 100644
--- a/src/aigear/manage/local/db_models.py
+++ b/src/aigear/manage/local/db_models.py
@@ -1,10 +1,11 @@
-from sqlalchemy import Column, Integer, String, DateTime, JSON
+from sqlalchemy import Column, Integer, String, DateTime, JSON, ForeignKey
+from sqlalchemy.orm import relationship
 from datetime import datetime
 from .init_db import Base
 
 
 class ModelMeta(Base):
-    __tablename__ = "model_meta"
+    __tablename__ = "models"
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     author = Column(String)
@@ -16,9 +17,20 @@ class ModelMeta(Base):
     created_at = Column(DateTime, default=datetime.now)
     updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
 
+    # pipeline_id = Column(Integer, ForeignKey('pipelines.id'))
+    # pipelines = relationship("Pipeline", back_populates="models")
+
+# # The relationship between the pipeline and the model
+# class PipelineModels:
+#     __tablename__ = "pipeline_models"
+
+#     id = Column(Integer, primary_key=True, autoincrement=True)
+#     pipeline_id = Column(Integer)
+#     model_id = Column(Integer)
+
 
 class PipelineMeta(Base):
-    __tablename__ = "pipeline_meta"
+    __tablename__ = "pipelines"
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     author = Column(String)
@@ -28,3 +40,5 @@ class PipelineMeta(Base):
     tags = Column(JSON)
     created_at = Column(DateTime, default=datetime.now)
     updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
+
+    # models = relationship("Models", back_populates="model")
diff --git a/src/aigear/microservices/grpc/service.py b/src/aigear/microservices/grpc/service.py
index 796de45..784a990 100644
--- a/src/aigear/microservices/grpc/service.py
+++ b/src/aigear/microservices/grpc/service.py
@@ -84,7 +84,7 @@ def get_env_variables(tag: str):
 def check_env_variables(env_variables: dict):
     run_or_not = True
     for env_variable_name in env_variables:
-        if env_variables[env_variable_name] == None:
+        if env_variables[env_variable_name] is None:
             logger.error(
                 f"{env_variable_name} not found in the env variables!")
             run_or_not = False
diff --git a/src/aigear/pipeline/pipeline.py b/src/aigear/pipeline/pipeline.py
index f0a4a5d..e9ab2d9 100644
--- a/src/aigear/pipeline/pipeline.py
+++ b/src/aigear/pipeline/pipeline.py
@@ -13,7 +13,6 @@
     state,
 )
 from ..common.callable import get_call_parameters
-from ..common.hashing import file_hash, stable_hash
 from .executor import TaskRunner
 from ..deploy.docker.builder import ImageBuilder
 from ..deploy.docker.container import run_or_restart_container
@@ -61,18 +60,23 @@ def deploy(
             image_id = image_builder.get_image_id(tag=self.name)
         else:
             image_id = image_builder.build(tag=self.name)
-        flow_path = flow_path_in_workdir(self._flow_file)
-        command = f"aigear-workflow --script_path {flow_path} --function_name {self.fn.__name__}"
-        run_or_restart_container(
-            container_name=self.name,
-            image_id=image_id,
-            command=command,
-            volumes=volumes,
-            ports=ports,
-            hostname=hostname,
-            is_stream_logs=is_stream_logs,
-            **kwargs
-        )
+
+        if image_id:
+            flow_path = flow_path_in_workdir(self._flow_file)
+            command = f"aigear-workflow --script_path {flow_path} --function_name {self.fn.__name__}"
+            # TODO: Next optimization, the container name should include the version (self.version)
+            run_or_restart_container(
+                container_name=self.name,
+                image_id=image_id,
+                command=command,
+                volumes=volumes,
+                ports=ports,
+                hostname=hostname,
+                is_stream_logs=is_stream_logs,
+                **kwargs
+            )
+        else:
+            logger.info('Image build was skipped and no existing image was found.')
         return self
 
     def to_service(
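With the new guard in `deploy`, skipping the image build is only useful when an image tagged with the pipeline name already exists; otherwise the deploy is aborted with a log message instead of failing inside the container runtime. An illustrative calling pattern (the pipeline object and volume mapping are assumptions, as in the examples above):

```python
# Hedged sketch: my_pipeline is a @workflow-decorated pipeline as in the
# iris example; the volume mapping is illustrative.
my_pipeline.deploy(
    volumes={"/data/models": {"bind": "/models", "mode": "rw"}},
    skip_build_image=True,  # reuses the image found by get_image_id, if any
)
```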