Skip to content

api

Public API for Zbrush

CommunicationWrapper

Source code in client/ayon_zbrush/api/communication_server.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class CommunicationWrapper:
    """Class-level access point to the communicator shared by the integration.

    The communicator is stored on the class itself, so every method here is
    a classmethod and no instance is needed. Once a truthy communicator is
    stored, later attempts to set another one are ignored with a warning.
    """
    # TODO add logs and exceptions
    communicator = None

    log = logging.getLogger("CommunicationWrapper")

    @classmethod
    def create_qt_communicator(cls, *args, **kwargs):
        """Create communicator for Artist usage.

        All arguments are forwarded to ``QtCommunicator``. The created
        object is passed to ``set_communicator`` and returned to the caller.
        """
        qt_communicator = QtCommunicator(*args, **kwargs)
        cls.set_communicator(qt_communicator)
        return qt_communicator

    @classmethod
    def set_communicator(cls, communicator):
        """Store ``communicator`` unless one is already stored."""
        if cls.communicator:
            # Keep the first communicator, only report the repeated call.
            cls.log.warning("Communicator was set multiple times.")
            return
        cls.communicator = communicator

    @classmethod
    def client(cls):
        """Return the stored communicator's client, or None without one."""
        active = cls.communicator
        return active.client() if active else None

    @classmethod
    def execute_zscript(cls, zscript):
        """Execute passed zscript in Zbrush.

        Returns the communicator's result, or None when no communicator
        is stored.
        """
        active = cls.communicator
        if active:
            return active.execute_zscript(zscript)
        return None

create_qt_communicator(*args, **kwargs) classmethod

Create communicator for Artist usage.

Source code in client/ayon_zbrush/api/communication_server.py
45
46
47
48
49
50
@classmethod
def create_qt_communicator(cls, *args, **kwargs):
    """Create communicator for Artist usage.

    Arguments are forwarded to ``QtCommunicator``; the new object is handed
    to ``cls.set_communicator`` and then returned.
    """
    qt_communicator = QtCommunicator(*args, **kwargs)
    cls.set_communicator(qt_communicator)
    return qt_communicator

execute_zscript(zscript) classmethod

Execute passed zscript in Zbrush.

Source code in client/ayon_zbrush/api/communication_server.py
65
66
67
68
69
70
@classmethod
def execute_zscript(cls, zscript):
    """Execute passed zscript in Zbrush.

    Returns the communicator's result, or None when no communicator is set.
    """
    active = cls.communicator
    if active:
        return active.execute_zscript(zscript)
    return None

ZbrushHost

Bases: HostBase, IWorkfileHost, ILoadHost, IPublishHost

Source code in client/ayon_zbrush/api/pipeline.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class ZbrushHost(HostBase, IWorkfileHost, ILoadHost, IPublishHost):
    """Host implementation for Zbrush.

    Covers plugin registration, context lookup, workfile open/save and
    storage of instances/containers/context data through the module-level
    helper functions used below.
    """
    name = "zbrush"

    def install(self):
        """Prepare the work directory and register plugins and callbacks."""
        # Create workdir folder if does not exist yet.
        # Skip when the variable is unset instead of failing on ``None``;
        # ``exist_ok`` covers the folder appearing between check and create.
        workdir = os.getenv("AYON_WORKDIR")
        if workdir and not os.path.exists(workdir):
            os.makedirs(workdir, exist_ok=True)

        plugins_dir = os.path.join(ZBRUSH_HOST_DIR, "plugins")
        publish_dir = os.path.join(plugins_dir, "publish")
        load_dir = os.path.join(plugins_dir, "load")
        create_dir = os.path.join(plugins_dir, "create")

        pyblish.api.register_host("zbrush")
        pyblish.api.register_plugin_path(publish_dir)
        register_loader_plugin_path(load_dir)
        register_creator_plugin_path(create_dir)

        register_event_callback("application.launched", self.initial_app_launch)
        register_event_callback("application.exit", self.application_exit)

    def get_current_project_name(self):
        """
        Returns:
            Union[str, None]: Current project name.
        """

        return self.get_current_context().get("project_name")

    def get_current_folder_path(self):
        """
        Returns:
            Union[str, None]: Current folder path.
        """

        return self.get_current_context().get("folder_path")

    def get_current_task_name(self):
        """
        Returns:
            Union[str, None]: Current task name.
        """

        return self.get_current_context().get("task_name")

    def get_current_context(self):
        """Return the current context.

        The context stored with the workfile is preferred; when it is empty
        the global context is returned instead.

        Returns:
            dict: Context with "project_name", "folder_path" and
                "task_name" keys when taken from the workfile.
        """
        context = get_current_workfile_context()
        if not context:
            return get_global_context()
        if "project_name" in context:
            return context
        # This is legacy way how context was stored
        # NOTE(review): this branch is only reached when "project_name" is
        #   missing, so "project_name" below is always None. The legacy keys
        #   were presumably named differently - confirm and adjust.
        return {
            "project_name": context.get("project_name"),
            "folder_path": context.get("folder_path"),
            "task_name": context.get("task_name")
        }

    # --- Workfile ---
    def open_workfile(self, filepath):
        """Open ``filepath`` in Zbrush and record it as the current file.

        Args:
            filepath (str): Path to the workfile; backslashes are converted
                to forward slashes before use.

        Returns:
            str: The normalized path.
        """
        filepath = filepath.replace("\\", "/")
        # NOTE(review): the script has one more "]" than "[" - confirm
        #   that Zbrush tolerates the trailing bracket.
        execute_zscript(f"""
[IFreeze,
    [FileNameSetNext, "{filepath}"]
    [IKeyPress, 13, [IPress, File:Open:Open]]]
]
    """)
        set_current_file(filepath=filepath)
        return filepath

    def save_workfile(self, filepath=None):
        """Save the workfile to ``filepath``.

        Args:
            filepath (Optional[str]): Target path. Falls back to the path
                returned by ``get_current_workfile`` when not passed.

        Returns:
            str: The normalized path that was saved.
        """
        if not filepath:
            filepath = self.get_current_workfile()
        filepath = filepath.replace("\\", "/")
        # # move the json data to the files
        # # shutil.copy
        copy_ayon_data(filepath)
        set_current_file(filepath=filepath)
        # NOTE(review): the script has one more "]" than "[" - confirm
        #   that Zbrush tolerates the trailing bracket.
        execute_zscript(f"""
[IFreeze,
    [FileNameSetNext, "{filepath}"]
    [IKeyPress, 13, [IPress, File:SaveAs:SaveAs]]]
]
""")
        return filepath

    def get_current_workfile(self):
        """Read the current workfile path from the metadata text file.

        The path is read from ".zbrush_metadata/current_file.txt" inside the
        directory returned by ``get_workdir``.

        Returns:
            str: File content with trailing NUL characters stripped.
        """
        work_dir = get_workdir()
        txt_dir = os.path.join(
            work_dir, ".zbrush_metadata").replace(
                "\\", "/"
        )
        # The context manager closes the file, no explicit close is needed.
        with open(f"{txt_dir}/current_file.txt", "r") as current_file:
            content = current_file.read()
        return content.rstrip('\x00')

    def workfile_has_unsaved_changes(self):
        """Always report unsaved changes."""
        # Pop-up dialog would be located to ask if users
        # save scene if it has unsaved changes
        return True

    def get_workfile_extensions(self):
        """Return the list of supported workfile extensions."""
        return [".zpr"]

    def list_instances(self):
        """Get all AYON instances."""
        # Figure out how to deal with this
        return get_instance_workfile_metadata()

    def write_instances(self, data):
        """Write all AYON instances"""
        return write_workfile_metadata(ZBRUSH_SECTION_NAME_INSTANCES, data)

    def get_containers(self):
        """Get the data of the containers

        Returns:
            list: the list which stores the data of the containers
        """
        # Resolves to the module-level ``get_containers``, not this method.
        return get_containers()

    def initial_app_launch(self):
        """Triggers on launch of the communication server for Zbrush.

        Usually this aligns roughly with the start of Zbrush.
        """
        # TODO: figure out how to deal with the last workfile issue
        set_current_file()
        context = get_global_context()
        save_current_workfile_context(context)

    def application_exit(self):
        """Logic related to TimerManager.

        Calls ``remove_tmp_data`` and, when the project setting
        "stop_timer_on_application_exit" is enabled, posts a request to the
        timers manager "stop_timer" endpoint.

        Todo:
            This should be handled out of Zbrush integration logic.
        """
        remove_tmp_data()
        data = get_current_project_settings()
        stop_timer = data["zbrush"]["stop_timer_on_application_exit"]

        if not stop_timer:
            return

        # Stop application timer.
        webserver_url = os.environ.get("AYON_WEBSERVER_URL")
        rest_api_url = "{}/timers_manager/stop_timer".format(webserver_url)
        requests.post(rest_api_url)

    def update_context_data(self, data, changes):
        """Store create-context data; ``changes`` is not used here."""
        return write_workfile_metadata(ZBRUSH_METADATA_CREATE_CONTEXT, data)

    def get_context_data(self):
        """Return the stored create-context data."""
        # The loaded metadata must be returned; without the ``return`` the
        # method always yielded None and the stored data was never seen.
        return get_load_workfile_metadata(ZBRUSH_METADATA_CREATE_CONTEXT)

application_exit()

Logic related to TimerManager.

Todo

This should be handled outside of the Zbrush integration logic.

Source code in client/ayon_zbrush/api/pipeline.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def application_exit(self):
    """Logic related to TimerManager.

    Calls ``remove_tmp_data`` first. When the project setting
    "stop_timer_on_application_exit" is enabled, a POST request is sent to
    the timers manager "stop_timer" endpoint.

    Todo:
        This should be handled out of Zbrush integration logic.
    """
    remove_tmp_data()
    zbrush_settings = get_current_project_settings()["zbrush"]
    if zbrush_settings["stop_timer_on_application_exit"]:
        # Stop application timer.
        stop_url = "{}/timers_manager/stop_timer".format(
            os.environ.get("AYON_WEBSERVER_URL")
        )
        requests.post(stop_url)

get_containers()

Get the data of the containers

Returns:

Type Description

list: The list that stores the data of the containers.

Source code in client/ayon_zbrush/api/pipeline.py
150
151
152
153
154
155
156
def get_containers(self):
    """Return the container data.

    Returns:
        list: the list which stores the data of the containers
    """
    # As a method of the host class, the name below resolves to the
    # module-level ``get_containers`` function rather than to this method.
    containers = get_containers()
    return containers

get_current_folder_path()

Returns:

Type Description

Union[str, None]: Current folder path.

Source code in client/ayon_zbrush/api/pipeline.py
64
65
66
67
68
69
70
def get_current_folder_path(self):
    """Look up "folder_path" in the current context.

    Returns:
        Union[str, None]: Current folder path.
    """
    current_context = self.get_current_context()
    return current_context.get("folder_path")

get_current_project_name()

Returns:

Type Description

Union[str, None]: Current project name.

Source code in client/ayon_zbrush/api/pipeline.py
56
57
58
59
60
61
62
def get_current_project_name(self):
    """Look up "project_name" in the current context.

    Returns:
        Union[str, None]: Current project name.
    """
    current_context = self.get_current_context()
    return current_context.get("project_name")

get_current_task_name()

Returns:

Type Description

Union[str, None]: Current task name.

Source code in client/ayon_zbrush/api/pipeline.py
72
73
74
75
76
77
78
def get_current_task_name(self):
    """Look up "task_name" in the current context.

    Returns:
        Union[str, None]: Current task name.
    """
    current_context = self.get_current_context()
    return current_context.get("task_name")

initial_app_launch()

Triggers on launch of the communication server for Zbrush.

Usually this aligns roughly with the start of Zbrush.

Source code in client/ayon_zbrush/api/pipeline.py
158
159
160
161
162
163
164
165
166
def initial_app_launch(self):
    """Triggers on launch of the communication server for Zbrush.

    Usually this aligns roughly with the start of Zbrush. Calls
    ``set_current_file`` without arguments and then saves the global
    context as the current workfile context.
    """
    # TODO: figure out how to deal with the last workfile issue
    set_current_file()
    save_current_workfile_context(get_global_context())

list_instances()

Get all AYON instances.

Source code in client/ayon_zbrush/api/pipeline.py
141
142
143
144
def list_instances(self):
    """Return all AYON instances from ``get_instance_workfile_metadata``."""
    # Figure out how to deal with this
    instances = get_instance_workfile_metadata()
    return instances

write_instances(data)

Write all AYON instances

Source code in client/ayon_zbrush/api/pipeline.py
146
147
148
def write_instances(self, data):
    """Write all AYON instances.

    ``data`` is written under ``ZBRUSH_SECTION_NAME_INSTANCES`` and the
    result of ``write_workfile_metadata`` is returned.
    """
    section = ZBRUSH_SECTION_NAME_INSTANCES
    return write_workfile_metadata(section, data)