Import Data with an SDK
How to import documents into Couchbase with an SDK.
Introduction
Importing data can be done from the Couchbase Server UI, via the cbimport command-line tool shipped with Couchbase Server, or using the SDK to script the process.
Data load essentially consists of the following steps:
- Prepare data in some well known format such as Comma Separated Values (.csv) or JSON documents.
- Parse this data, and iterate over each document.
- Connect to your Couchbase instance.
- Connect to the appropriate bucket, scope, and collection.
- Decide on the key for this document (could be an ID, a sequence number, or some combination of fields).
- Do any additional processing if required.
- Insert the document.
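As a preview, here is a minimal sketch of those steps using the Python SDK, assuming a local cluster with the travel-sample bucket and the import.csv file shown later in this guide:
import csv
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster

# connect to the cluster and to the target bucket, scope, and collection
cluster = Cluster(
    "couchbase://localhost",
    authenticator=PasswordAuthenticator("Administrator", "password"))
collection = cluster.bucket("travel-sample").scope("inventory").collection("airline")

# parse the prepared data and iterate over each record
with open("import.csv", newline='') as csvfile:
    for row in csv.DictReader(csvfile):
        key = "{type}_{id}".format(**row)  # decide on the key for this document
        row["importer"] = "Python SDK"     # any additional processing
        collection.upsert(key, row)        # store the document
Each of these steps is covered in more detail in the rest of this guide.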
Couchbase Clients
Clients access data by connecting to a Couchbase cluster over the network. The most common type of client is a Couchbase SDK, which is a full programmatic API that enables applications to take the best advantage of Couchbase. This developer guide focuses on the most commonly-used SDKs, but full explanations and reference documentation for all SDKs are available.
The command line clients also provide a quick and streamlined interface for simple access and are suitable if you just want to access an item without writing any code.
For this guide, we are especially interested in the cbimport tool.
With some editions, the command line clients are provided as part of the installation of Couchbase Server. Assuming a default installation, you can find them in the bin directory of the Couchbase Server installation for your operating system.
The Couchbase Server UI also offers a graphical interface to cbimport.
Read the following for further information about the clients available for importing data:
Preparing the Data
To prepare the data, extract or generate your data in an appropriate data format.
The following formats are well supported, both for export and by cbimport and the module ecosystems of all Couchbase SDKs.
- CSV
- TSV
- JSON
- JSONL
Comma Separated Values (.csv) are easily exported from many spreadsheet and database applications.
Ensure that the first row is a header row containing the names of the columns within the document.
id,type,name
20001,airline,CSV-air-1
20002,airline,CSV-air-2
Tab Separated Values (.tsv) are a common variant of CSV files.
id type name
20011 airline TSV-air-1
20012 airline TSV-air-2
JSON (.json) files are especially well suited to import into Couchbase, as JSON is Couchbase's native datatype.
A .json file contains only a single value, so to allow importing one or many values, format it as an array of the values you want to store.
[
{"id":20021, "type":"airline", "name":"JSON-air-1"},
{"id":20022, "type":"airline", "name":"JSON-air-2"},
{"id":20023, "type":"airline", "name":"JSON-air-3"}
]
JSON Lines (.jsonl), also known as NDJSON, is a common format for streaming JSON, with one JSON object per line of text.
{"id":20031, "type":"airline", "name":"JSONL-air-1"}
{"id":20032, "type":"airline", "name":"JSONL-air-2"}
{"id":20033, "type":"airline", "name":"JSONL-air-3"}
Using cbimport
Using cbimport is straightforward. Ensure that the directory containing the Couchbase Server command line clients is in your path.
You can import all of the data formats described above.
- CSV
- TSV
- JSON
- JSONL
To import a CSV file using cbimport csv:
- Use the --dataset argument to specify the CSV file.
- Use the --cluster, --username, and --password arguments to specify your connection details.
- Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.
- Use the --generate-key argument to specify an ID for the imported documents.
The following example imports a local CSV file, generating IDs such as airline_1234 (the %type% and %id% placeholders in the key expression are replaced by each record's field values).
cbimport csv \
--dataset file://./import.csv \
--cluster localhost --username Administrator --password password \
--bucket travel-sample --scope-collection-exp inventory.airline \
--generate-key %type%_%id%
To import a TSV file using cbimport csv:
- Use the --dataset argument to specify the TSV file.
- Use the --field-separator argument to specify the field separation character, such as "\t" for tab.
- Use the --cluster, --username, and --password arguments to specify your connection details.
- Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.
- Use the --generate-key argument to specify an ID for the imported documents.
The following example imports a local TSV file, generating IDs such as airline_1234.
cbimport csv \
--dataset file://./import.tsv --field-separator "\t" \
--cluster localhost --username Administrator --password password \
--bucket travel-sample --scope-collection-exp inventory.airline \
--generate-key %type%_%id%
To import a JSON file using cbimport json:
- Use the --dataset argument to specify the JSON file.
- Set the --format argument to list.
- Use the --cluster, --username, and --password arguments to specify your connection details.
- Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.
- Use the --generate-key argument to specify an ID for the imported documents.
The following example imports a local JSON file, generating IDs such as airline_1234.
cbimport json \
--dataset file://./import.json --format list \
--cluster localhost --username Administrator --password password \
--bucket travel-sample --scope-collection-exp inventory.airline \
--generate-key %type%_%id%
To import a JSONL file using cbimport json:
- Use the --dataset argument to specify the JSONL file.
- Set the --format argument to lines.
- Use the --cluster, --username, and --password arguments to specify your connection details.
- Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.
- Use the --generate-key argument to specify an ID for the imported documents.
The following example imports a local JSONL file, generating IDs such as airline_1234.
cbimport json \
--dataset file://./import.jsonl --format lines \
--cluster localhost --username Administrator --password password \
--bucket travel-sample --scope-collection-exp inventory.airline \
--generate-key %type%_%id%
Importing Using an SDK
While cbimport accomplishes all the necessary steps in a single command, as shown above, using an SDK gives you more flexibility and control. However, all the same considerations apply, so let us look at each of them in turn.
Parsing the Import into an Array or Stream of Records
The details of how to parse the import data vary depending on the chosen input format, and the most appropriate library for your SDK.
Parsing CSV and TSV Data
- .NET
- Java
- Node.js
- Python
To parse CSV and TSV data, use the CsvHelper library.
using CsvHelper;
using CsvHelper.Configuration;
using System.Globalization;
public async Task importCSV(string filename)
{
using (var reader = new StreamReader(filename))
using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
{
var records = csv.GetRecords<dynamic>();
foreach (dynamic record in records) {
await upsertDocument(record);
}
}
}
public async Task importTSV(string filename)
{
using (var reader = new StreamReader("import.tsv"))
using (var tsv = new CsvReader(reader,
new CsvConfiguration(CultureInfo.InvariantCulture)
{ Delimiter = "\t" }))
{
var records = tsv.GetRecords<dynamic>();
foreach (dynamic record in records) {
await upsertDocument(record);
}
}
}
To parse CSV and TSV data, use the opencsv library.
import com.opencsv.*;
public void importCSV() {
try (CSVReaderHeaderAware csv = new CSVReaderHeaderAware(
new FileReader("modules/howtos/examples/import.csv"))) {
Map<String, String> row;
while ((row = csv.readMap()) != null) {
upsertRow(row);
}
}
catch (java.io.FileNotFoundException e) {
System.out.println("handle FileNotFoundException...");
}
catch (java.io.IOException e) {
System.out.println("handle IOException...");
}
catch (com.opencsv.exceptions.CsvValidationException e) {
System.out.println("handle CsvValidationException...");
}
}
public void importTSV() {
CSVParser parser =
new CSVParserBuilder()
.withSeparator('\t')
.withIgnoreQuotations(true)
.build();
try (CSVReaderHeaderAware tsv =
new CSVReaderHeaderAwareBuilder(
new FileReader("modules/howtos/examples/import.tsv"))
.withCSVParser(parser)
.build()) {
Map<String, String> row;
while ((row = tsv.readMap()) != null) {
upsertRow(row);
}
}
// ...
}
If you are using the Reactor API then, as OpenCSV doesn’t have a built-in converter, use the Flux::generate method to convert the CSV or TSV file into a stream:
public void importCSV_batch() {
Flux<Map<String,String>> rows = Flux.generate(
() -> new CSVReaderHeaderAware(
new FileReader("modules/howtos/examples/import.csv")),
(state, sink) -> {
try {
Map<String,String> row = state.readMap();
if (row == null) { sink.complete(); }
else { sink.next(row); }
return state;
}
catch (Exception e) { throw new RuntimeException(e); }
},
state -> {
try { state.close(); }
catch (Exception e) { throw new RuntimeException(e); }
}
);
Flux<MutationResult> results =
rows
.map(row -> preprocess(row))
.flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
.doOnNext(System.out::println);
results.blockLast(Duration.ofSeconds(60));
}
public void importTSV_batch() {
Flux<Map<String,String>> rows = Flux.generate(
() -> {
CSVParser parser =
new CSVParserBuilder()
.withSeparator('\t')
.withIgnoreQuotations(true)
.build();
return
new CSVReaderHeaderAwareBuilder(
new FileReader("modules/howtos/examples/import.tsv"))
.withCSVParser(parser)
.build();
},
// ...
}
To parse CSV and TSV data, use the csv-parse library.
const fs = require('fs');
const { parse: csvParser } = require('csv-parse');
const csvStream = (filename) =>
fs.createReadStream(filename)
.pipe(
csvParser({columns: true}))
const tsvStream = (filename) =>
fs.createReadStream(filename)
.pipe(
csvParser({
columns: true,
delimiter: '\t'}))
To parse CSV and TSV data, use the csv library.
import csv
def csv_import(filename):
with open(filename, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
upsert(row)
def tsv_import(filename):
with open(filename, newline='') as tsvfile:
reader = csv.DictReader(tsvfile, delimiter='\t')
for row in reader:
upsert(row)
Parsing JSON and JSONL Data
- .NET
- Java
- Node.js
- Python
To parse JSON and JSONL data, use Newtonsoft.
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public async Task importJSON(string filename)
{
using (var reader = new StreamReader("import.json"))
{
var jsonReader = new JsonTextReader(reader);
JArray arr = (JArray)JToken.ReadFrom(jsonReader);
foreach (JObject record in arr)
{
await upsertDocument(record);
}
}
}
public async Task importJSONL(string filename)
{
using (var reader = new StreamReader("import.jsonl"))
{
var jsonlReader = new JsonTextReader(reader)
{
SupportMultipleContent = true
};
while (jsonlReader.Read())
{
var record = (JObject)JToken.ReadFrom(jsonlReader);
await upsertDocument(record);
}
}
}
To parse JSON data, read the file as a string, then use the built-in Couchbase JSON handling to parse the result into an array of JSON objects.
import com.couchbase.client.java.json.*;
public void importJSON() {
try {
String content = new String(
Files.readAllBytes( // read whole document into memory
Paths.get("modules/howtos/examples/import.json")),
StandardCharsets.UTF_8);
for (Object row: JsonArray.fromJson(content)) {
JsonObject json = ((JsonObject) row);
upsertRow(json.toMap());
}
}
catch (java.io.FileNotFoundException e) {
System.out.println("handle FileNotFoundException...");
}
catch (java.io.IOException e) {
System.out.println("handle IOException...");
}
}
If you are using the Reactor API then, once you’ve read the JSON array, use the Flux::fromIterable method to convert it into a stream:
public void importJSON_batch() {
try {
String content = new String(
Files.readAllBytes( // read whole document into memory
Paths.get("modules/howtos/examples/import.json")),
StandardCharsets.UTF_8);
Flux<MutationResult> results =
Flux.fromIterable(JsonArray.fromJson(content))
.map(row -> ((JsonObject) row).toMap())
.map(map -> preprocess(map))
.flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
.doOnNext(System.out::println);
results.blockLast(Duration.ofSeconds(60));
}
// ...
}
To parse JSONL data, do the same, but read the file line by line.
public void importJSONL() {
try (BufferedReader br =
new BufferedReader(
new FileReader("modules/howtos/examples/import.jsonl"))) {
String line;
while ((line = br.readLine()) != null) {
Map<String,Object> row =
JsonObject.fromJson(line).toMap();
upsertRow(row);
}
}
// ...
}
If you are using the Reactor API, then open the JSONL file as a stream using the Flux::using method.
public void importJSONL_batch() {
Flux<String> lines = Flux.using(
() -> Files.lines(Paths.get("modules/howtos/examples/import.jsonl")),
Flux::fromStream,
BaseStream::close);
Flux<MutationResult> results =
lines
.map(line -> JsonObject.fromJson(line).toMap())
.map(map -> preprocess(map))
.flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
.doOnNext(System.out::println);
results.blockLast(Duration.ofSeconds(60));
}
To parse JSON and JSONL data, use the stream-json library.
stream-json wraps its output in a { key: …, value: … } object, so we need to map the stream into the expected format.
const fs = require('fs');
const stream = require('stream');
// for JSON
const StreamArray = require('stream-json/streamers/StreamArray')
// for JsonL
const {parser: jsonlParser} = require('stream-json/jsonl/Parser');
const map = (f) =>
new stream.Transform({
objectMode: true,
transform: (obj, _, next) => next(null, f(obj))
})
const jsonStream = (filename) =>
fs.createReadStream(filename)
.pipe(StreamArray.withParser())
.pipe(map(obj => obj.value))
const jsonlStream = (filename) =>
fs.createReadStream(filename)
.pipe(jsonlParser())
.pipe(map(obj => obj.value))
To parse JSON and JSONL data, use the json library:
import json
def json_import(filename):
with open(filename) as jsonfile:
data = json.load(jsonfile)
for row in data:
upsert(row)
def jsonl_import(filename):
with open(filename) as jsonlfile:
for line in jsonlfile:
row = json.loads(line)
upsert(row)
Connecting to the Couchbase Server
First, you need the connection details for the Couchbase server.
Now decide which bucket, scope, and collection you want to import into, and create them if they don't already exist (see the sketch at the end of this section).
- .NET
- Java
- Node.js
- Python
var cluster = await Cluster.ConnectAsync(
"couchbase://your-ip",
"Administrator", "password");
var bucket = await cluster.BucketAsync("travel-sample");
var scope = await bucket.ScopeAsync("inventory");
var _collection = await scope.CollectionAsync("airline");
For more information, refer to Managing Connections.
Cluster cluster = Cluster.connect(
connectionString,
ClusterOptions
.clusterOptions(username, password));
Bucket bucket = cluster.bucket(bucketName);
Scope scope = bucket.scope("inventory");
collection = scope.collection("airline");
If you are using the Reactive API, then use the reactive collection instead:
reactiveCollection = collection.reactive();
For more information, refer to Managing Connections.
const cluster = await couchbase.connect('couchbase://localhost', {
username: 'Administrator',
password: 'password',
})
const bucket = cluster.bucket('travel-sample')
const collection = bucket.scope('inventory').collection('airline')
For more information, refer to Managing Connections.
cluster = Cluster(
"couchbase://localhost",
authenticator=PasswordAuthenticator(
"Administrator", "password"))
bucket = cluster.bucket("travel-sample")
collection = bucket.scope("inventory").collection("airline")
For more information, refer to Managing Connections.
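The travel-sample bucket already includes the inventory scope and airline collection. If you are importing into a scope or collection that doesn't exist yet, you can create it first with the SDK's collection management API. The following is a minimal Python sketch of one way to do this, catching a broad CouchbaseException in case the scope or collection already exists; adjust it for your SDK and version:
from couchbase.exceptions import CouchbaseException
from couchbase.management.collections import CollectionSpec

# create the scope and collection if they don't already exist
coll_manager = bucket.collections()
try:
    coll_manager.create_scope("inventory")
except CouchbaseException:
    pass  # most likely the scope already exists
try:
    coll_manager.create_collection(CollectionSpec("airline", scope_name="inventory"))
except CouchbaseException:
    pass  # most likely the collection already exists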
Inserting the Documents
Having processed each imported document, you can insert it into the keyspace. Couchbase is a key-value store, and the document is the value, so before you can insert the document, you need to determine the key.
To insert an imported document into the keyspace:
- Specify the key. This could be as simple as extracting the id field from the document, or using an incrementing sequence number.
- Do any additional processing, for example calculating fields, or adding metadata about the importer.
- Finally, use an upsert operation to store the document.
Use upsert rather than insert so that the document is stored even if the target key already has a value.
This means that if anything goes wrong during a run, you can make any required tweaks to the import file and simply re-run the whole import.
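To illustrate the difference, here is a small sketch with the Python SDK, using the collection handle from the previous section: insert raises an error if the key already exists, while upsert overwrites the existing value, so an import built on upsert can safely be re-run.
from couchbase.exceptions import DocumentExistsException

doc = {"id": 20021, "type": "airline", "name": "JSON-air-1"}

collection.upsert("airline_20021", doc)      # creates or replaces the document
try:
    collection.insert("airline_20021", doc)  # raises: the key already exists
except DocumentExistsException:
    print("insert refused to overwrite airline_20021")
collection.upsert("airline_20021", doc)      # succeeds again: safe to re-run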
- .NET
- Java
- Node.js
- Python
To store the data, hook the prepared data into an upsert routine.
As CsvHelper and Newtonsoft generate different outputs, we've provided some overloaded methods that work with either.
// CsvHelper emits `dynamic` records
public async Task upsertDocument(dynamic record) {
// define the key
string key = record.type + "_" + record.id;
// do any additional processing
record.importer = ".NET SDK";
// upsert the document
await _collection.UpsertAsync(key, record);
// any required logging
Console.WriteLine(key);
}
// Newtonsoft.Json.Linq emits `JObjects`
public async Task upsertDocument(JObject record) {
// define the key
string key = record["type"] + "_" + record["id"];
// do any additional processing
record["importer"] = ".NET SDK";
// upsert the document
await _collection.UpsertAsync(key, record);
// any required logging
Console.WriteLine(key);
}
For more information, refer to Data Operations.
To store the data, hook the prepared data into an upsert routine.
For the blocking API, use the method below.
public void upsertRow(Map row) {
JsonDocument doc = preprocess(row);
String key = doc.getId();
Object value = doc.getContent();
// upsert the document
collection.upsert(key, value);
// any required logging
System.out.println(key);
System.out.println(value);
}
The Reactive API examples above already include a call to ReactiveCollection::upsert.
In both cases, you must provide a preprocess routine which returns a key-value tuple object:
class JsonDocument {
private final String id;
private final JsonObject content;
public JsonDocument(String id, JsonObject content) {
this.id = id;
this.content = content;
}
public String getId() {
return id;
}
public JsonObject getContent() {
return content;
}
@Override
public String toString() {
return "JsonDocument{id='" + id + "', content=" + content + "}";
}
}
public JsonDocument preprocess(Map row) {
Map value = new HashMap(row);
// define the KEY
String key = value.get("type") + "_" + value.get("id");
// do any additional processing
value.put("importer", "Java SDK");
return new JsonDocument(key, JsonObject.from(value));
}
For more information, refer to Data Operations.
To iterate the prepared data stream, use a for await…of loop, just as you would with an array.
const importStream = async (stream) => {
for await (const doc of stream) {
await upsertDocument(doc)
}
}
Hook the prepared stream into an upsertDocument routine:
const upsertDocument = async (doc) => {
try {
// Build the key
const key = `${doc.type}_${doc.id}`
// Do any processing, logging etc.
doc.importer = "import.js"
console.log(key, doc)
// Upsert the document
await collection.upsert(key, doc)
}
catch (error) {
// Error handling, retry, logging etc.
console.error(error)
}
}
For more information, refer to Data Operations.
To store the data, define functions to determine the key, and process the value.
def key(row):
return "{type}_{id}".format(**row)
def process(row):
row["importer"] = "Python SDK"
return row
Hook the prepared data into an upsert routine which uses these functions.
def upsert(row):
k = key(row)
v = process(row)
print(k, v)
collection.upsert(k, v)
For more information, refer to Data Operations.
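Putting the Python pieces together, a short driver can reuse the import functions defined above, assuming the sample files from the cbimport examples are in the current directory:
# run each of the importers defined earlier in this guide
csv_import("import.csv")
tsv_import("import.tsv")
json_import("import.json")
jsonl_import("import.jsonl")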
Note that the Python SDK offers a set of batch operations (marked as volatile as of SDK 3.2.3) which may be more efficient. Here's a brief example for CSV:
# multi operations volatile as of SDK 3.2.3
def csv_import_multi(filename):
with open(filename, newline='') as csvfile:
reader = csv.DictReader(csvfile)
data = { key(row): process(row) for row in reader }
print(data)
collection.upsert_multi(data)
Related Links
Reference and information:
- The Couchbase Server UI offers a graphical view of documents, to check your imports interactively.
How-to guides:
- The Work with Documents guide shows how to read and update the data you have imported.
Key-Value Operations with SDKs: