Connect to ZettaBlock's Data Lake via Python

Connect to ZettaBlock’s Data Lake from the comfort of your Jupyter Notebooks (in Python).

Steps

  1. Paste the below code into one cell of your jupyter notebook. If any of the libraries being imported in this code are not available, please install those libraries using “pip install libraryname”.
import requests
import json
import pandas as pd
import logging
import time

user = <insert username> #example “myuser”
password = <insert password>
api_key = <insert your api key> 

def get_catalog():
        url = "https://api.zettablock.com/api/v1/databases"

        headers = {
            "accept": "application/json",
            "X-API-KEY": api_key
        }

        response = requests.get(url, headers=headers)
        res = response.json()
        return res["items"][0]["id"]
    
class ZettablockClient:
    
    sql_api_url = "https://api.zettablock.com/qugate/v1/databases/{catalog}/queries"
    query_queue_url = "https://api.zettablock.com/qugate/v1/queries/{query_id}/trigger"
    query_runs_url = "https://api.zettablock.com/qugate/v1/queryruns/{query_run_id}/status"
    query_run_stream_url = "https://api.zettablock.com/qugate/v1/stream/queryruns/{query_run_id}/result"

    
    @classmethod
    def sql_to_df(cls, query: str) -> pd.DataFrame:

        catalog = get_catalog()

        
        params = {'includeColumnName': 'true'}
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        data = {'query': query, 'resultCacheExpireMillis': 86400000}
        
        queue_response = (requests.post(cls.sql_api_url.format(catalog=catalog), data=json.dumps(data),
                                 params=params, headers=headers, auth=(user, password))).json()
        query_id = queue_response.get('id')
        trigger_response = (requests.post(cls.query_queue_url.format(query_id=query_id), data=json.dumps({}), headers=headers, auth=(user, password))).json()
        
        if trigger_response.get('message'):
            if trigger_response.get('message')[0:22] == "Failed to submit query":
                warning_message = queue_response.get('analyzeResult').get('warnings')
                trigger_error = trigger_response.get('message')
                logging.warning(warning_message)
                logging.error(trigger_error)
                raise SyntaxError("Failed to submit query. Check SQL syntax.")
        
        
        query_run_id = trigger_response.get("queryrunId")
        logging.info(f"Executed query: {query_run_id}")
        state_url = cls.query_runs_url.format(query_run_id=query_run_id)
        logging.info(f"Track status here: {state_url}")
        
        def get_state():
            return ((requests.get(state_url, auth=(user,password))).json()).get('state')
        
        state = get_state()
        state_timer = 1
        while state != "SUCCEEDED":
            if state == 'FAILED':
                response = requests.get(cls.query_run_stream_url.format(query_run_id=query_run_id), headers=headers, params=params, auth=(user, password))
                print(f"Request failed with response: {response.text}")
                raise ConnectionError("The response from the database was invalid.")
            
            if state_timer > 3000:
                raise TimeoutError("Query took more than 5 minutes.")
            elif state_timer % 100 == 0:
                print(state)
                response = requests.get(cls.query_run_stream_url.format(query_run_id=query_run_id), headers=headers, params=params, auth=(user, password))
                print(f"Request failed with response: {response.text}")
                print(f"Elapsed query execution time: {int(state_timer/10)}s")
            state = get_state()
            time.sleep(0.1)
            state_timer += 1
            
        logging.info(f"Query run: {query_run_id} has succeeded! Link to status: {state_url}")
        response = requests.get(cls.query_run_stream_url.format(query_run_id=query_run_id), headers=headers, params=params, auth=(user, password))
       
        try:
            if response.status_code == 200:
                response.encoding = "utf-8"
                lines = [line.split(',') for line in response.iter_lines(decode_unicode=True)]
                return lines
            else:
                logging.error(f"Request failed with status: {response.status_code}")
                logging.error(f"Request failed with response: {response.text}")
                print(f"Request failed with response: {response.text}")
                raise ConnectionError("The response from the database was invalid.")
        except TypeError as e:
            logging.error(e)


zb = ZettablockClient()
  1. Add the values of below parameters in the script and run the cell.
    • Username: Your ZettaBlock login username
    • Password: Your ZettaBlock login password
    • API Key: You can get the API key by logging to Zettablock's portal -> usage. Any API key in your account will work fine.
  2. From the next cell onwards, use the ZettaBlock client created in the script to query the data like below:
df = pd.DataFrame( zb.sql_to_df("select max(data_creation_date) from ethereum_mainnet.blocks")