Load Local Data
Stream Load loads local files into Palo.
Unlike other command submission methods, Stream Load connects to and interacts with Palo over the HTTP protocol.
The HOST:PORT used by this method must therefore be an HTTP protocol port.
- Public cloud users must use the HTTP protocol port of a Compute Node (BE), which is 8040 by default.
- Private deployment users can use the HTTP protocol port of the Leader Node (FE), which is 8030 by default. In that case, make sure the client machine can also reach the machines hosting the Compute Nodes over the network.
This document uses the curl command to demonstrate how to load data.
A Java code example for loading data is given at the end of the document.
Load data
A Stream Load request has the following form:
PUT /api/{db}/{table}/_stream_load
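The request path above can be assembled from four values: the node host, its HTTP port, the database, and the table. The following is a minimal sketch of that assembly; the class name StreamLoadUrlSketch and the sample values are illustrative assumptions, not part of Palo.

```java
import java.net.URI;

// Hypothetical helper: builds the Stream Load endpoint described above.
class StreamLoadUrlSketch {

    // host/port must point at an HTTP protocol port (for example 8040 on a Compute Node).
    static URI build(String host, int port, String db, String table) {
        return URI.create(String.format("http://%s:%d/api/%s/%s/_stream_load", host, port, db, table));
    }

    public static void main(String[] args) {
        // Sample values only; replace them with your own cluster, database, and table.
        System.out.println(build("127.0.0.1", 8040, "example_db", "load_test"));
        // prints http://127.0.0.1:8040/api/example_db/load_test/_stream_load
    }
}
```

The resulting URI is the target of the HTTP PUT request.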
- Create a table
Use the CREATE TABLE command to create a table that will store the imported data. Refer to the CREATE TABLE command manual for the detailed syntax. For example:
CREATE TABLE IF NOT EXISTS load_test
(
    id INT,
    name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;
- Load data
Run the following curl command to load a local file:
curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
- user:passwd is the name and password of a user created in Palo. The initial user is admin, and its password is set when the Palo cluster is created.
- host:port is the host and HTTP protocol port of a Compute Node. The port is 8040 by default, and you can view it on the details page of the Intelligent Cloud Palo cluster.
- label can be specified in the header to uniquely identify the load task.
Refer to the Stream Load command documentation for more advanced usage of the Stream Load command.
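The -u and -H options of the curl command above correspond to two HTTP headers: a Basic Authorization header and a label header. The sketch below shows one way to build both in Java. The class and method names are hypothetical, and using a random UUID is only one assumed way to keep labels unique; stripping the hyphens is a cautious choice, not a documented requirement.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper: builds the header values that curl derives from -u and -H "label:...".
class StreamLoadHeadersSketch {

    // Value of the Authorization header for HTTP Basic authentication.
    static String basicAuth(String user, String passwd) {
        String raw = user + ":" + passwd;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Assumption: a prefix plus a random UUID is unique enough to identify one load task.
    static String uniqueLabel(String prefix) {
        return prefix + "_" + UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println("Authorization: " + basicAuth("user", "passwd"));
        System.out.println("label: " + uniqueLabel("example_label"));
    }
}
```

Generating a fresh label for every request keeps each load task individually identifiable.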
- Wait for the load result
Stream Load is a synchronous command: a successful response means that the load has succeeded. If the amount of data is large, the response may take a long time to arrive. An example response is as follows:
{
    "TxnId": 1003,
    "Label": "example_label_1",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
- If the Status field is Success, the load was successful.
- Refer to the Stream Load command documentation for detailed descriptions of the other fields.
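The Status check can be sketched in a few lines of Java. To stay dependency-free, this sketch extracts the field with a regular expression, which is an assumption made for brevity; production code would more reasonably use a JSON parser, as the Java example at the end of this document does with Gson. The class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decides whether a Stream Load response reports success.
class StreamLoadStatusSketch {

    // Matches the top-level "Status" key; the leading quote keeps "ExistingJobStatus" from matching.
    private static final Pattern STATUS = Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the Status value, or null if the field is absent.
    static String status(String responseJson) {
        Matcher m = STATUS.matcher(responseJson);
        return m.find() ? m.group(1) : null;
    }

    static boolean isSuccess(String responseJson) {
        return "Success".equals(status(responseJson));
    }

    public static void main(String[] args) {
        String response = "{ \"TxnId\": 1003, \"Label\": \"example_label_1\", \"Status\": \"Success\", \"Message\": \"OK\" }";
        System.out.println(isSuccess(response)); // prints true
    }
}
```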
Suggestions for use
- Stream Load can only load local files.
- Keep the data volume of a single load request within 1 GB. If there are many local files, submit them concurrently in batches.
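Concurrent submission could be sketched as follows with a fixed-size thread pool. The loadOne predicate stands in for whatever code actually sends one file through Stream Load and reports whether it succeeded; it, the class name, and the file names are hypothetical. The sketch also assumes that each file is already within the 1 GB guideline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical helper: submits one load request per file, a few at a time.
class ConcurrentLoadSketch {

    // Runs loadOne for every file with at most `parallelism` requests in flight.
    // Returns the files whose load reported failure or threw an exception.
    static List<String> loadAll(List<String> files, int parallelism, Predicate<String> loadOne) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        List<String> failed = new ArrayList<>();
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (String file : files) {
                Callable<Boolean> task = () -> loadOne.test(file);
                results.add(pool.submit(task));
            }
            for (int i = 0; i < files.size(); i++) {
                try {
                    if (!results.get(i).get()) {
                        failed.add(files.get(i));
                    }
                } catch (ExecutionException e) {
                    failed.add(files.get(i));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failed.add(files.get(i));
                }
            }
        } finally {
            pool.shutdown();
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("part-1.txt", "part-2.txt", "part-3.txt");
        // Stand-in loader that pretends part-2.txt fails; a real one would issue the HTTP PUT.
        List<String> failed = loadAll(files, 2, file -> !file.equals("part-2.txt"));
        System.out.println("failed: " + failed); // prints failed: [part-2.txt]
    }
}
```

Returning the failed files lets the caller retry only those requests.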
Java code examples
Here is a simple Java example that executes Stream Load:
package demo.palo;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PaloStreamLoadDemo {
    private final static String HOST = "127.0.0.1"; // Compute Node host
    private final static int PORT = 8040; // Compute Node HTTP port
    private static final String STREAM_LOAD_URL_PATTERN = "http://%s:%d/api/%s/%s/_stream_load";
    private final static String DB = "example_db";
    private final static String TABLE = "example_tbl";
    private final static String USER = "user";
    private final static String PASSWD = "passwd";
    // local file to be loaded
    private final static String LOAD_FILE = "./data.txt";

    public static void main(String[] args) throws Exception {
        streamLoad();
    }

    private static void streamLoad() throws Exception {
        String loadUrlStr = String.format(STREAM_LOAD_URL_PATTERN, HOST, PORT, DB, TABLE);
        URL loadUrl = new URL(loadUrlStr);
        HttpURLConnection conn = (HttpURLConnection) loadUrl.openConnection();
        conn.setRequestMethod("PUT");

        // HTTP Basic authentication with the Palo user and password
        String auth = String.format("%s:%s", USER, PASSWD);
        String authEncoding = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");

        // set Stream Load headers.
        // you can add any other headers here.
        conn.addRequestProperty("column_separator", ",");
        conn.addRequestProperty("label", "example_label");

        conn.setDoOutput(true);
        conn.setDoInput(true);

        // read and send file content
        File loadFile = new File(LOAD_FILE);
        try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream());
             BufferedInputStream bis = new BufferedInputStream(new FileInputStream(loadFile))) {
            int i;
            // read() returns -1 at end of stream; a byte value of 0 is valid data
            while ((i = bis.read()) != -1) {
                bos.write(i);
            }
        }

        // get response
        int status = conn.getResponseCode();
        String respMsg = conn.getResponseMessage();
        System.out.println("get status: " + status + ", response msg: " + respMsg);

        // read the response body
        StringBuilder sb = new StringBuilder();
        try (InputStream stream = conn.getInputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
        }

        // parse the response json
        Type type = new TypeToken<SubmitResult>() {
        }.getType();
        SubmitResult result = new Gson().fromJson(sb.toString(), type);
        System.out.println("Get result status: " + result.Status);
    }

    // The response json class
    public static class SubmitResult {
        public String TxnId;
        public String Label;
        public String Status;
        public String ExistingJobStatus;
        public String Message;
        public String NumberTotalRows;
        public String NumberLoadedRows;
        public String NumberFilteredRows;
        public String NumberUnselectedRows;
        public String LoadBytes;
        public String LoadTimeMs;
        public String BeginTxnTimeMs;
        public String StreamLoadPutTimeMs;
        public String ReadDataTimeMs;
        public String WriteDataTimeMs;
        public String CommitAndPublishTimeMs;
        public String ErrorURL;
    }
}