Handling Kinesis Stream Event Batching with Typesense
TL;DR: Dui asked how to handle Kinesis stream events with Typesense. Kishore Nallan suggested using upsert mode for creation/update and handling deletes with logical deletion. After further discussion, including identifying and resolving a bug, they agreed to introduce an emplace action in Typesense v0.23.


Nov 25, 2021 (22 months ago)
Dui
09:56 AM
So I have a Kinesis stream with a really large number of records that should be created/updated/deleted.
How would I go about batching events so that I can be agnostic about the action? From the documentation, I've seen that one can batch an array of objects with one action:
client.collections('companies').documents().import(documents, {action: 'create'})
My main question is: would it be possible to make a batch with different actions, or will I be forced to make different batches depending on the actions of the data that I receive from the Kinesis stream?
For example, Algolia has this:
client.multipleBatch([
  { action: 'addObject', indexName: 'index1', body: { firstname: 'Jimmie', lastname: 'Barninger' } },
  { action: 'deleteObject', indexName: 'index2', body: { objectID: 'myID5' } }
])
Kishore Nallan
09:57 AM
Deletes have to be sent separately. But we don't have a batched delete endpoint for that.
We're aware of this limitation and hope to address it in the future.
Kishore Nallan
10:00 AM
You can set an is_deleted: true flag for logical deletion and then periodically do batch deletes using the delete-by-query endpoint. Your searches also need to filter on is_deleted: false so that these documents are not returned in search results.
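As a sketch of the query side of this approach, a search can pass a filter_by clause that hides logically-deleted documents. The collection name 'companies' and the 'name' field here are illustrative assumptions, not from the thread:

```javascript
// Sketch: exclude logically-deleted documents at search time.
// 'companies' and 'name' are assumed names for illustration.
const searchParameters = {
  q: 'stark',
  query_by: 'name',
  filter_by: 'is_deleted:false' // hide tombstoned documents
}

// client.collections('companies').documents().search(searchParameters)
```

The actual search call is commented out since it needs a running Typesense server.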
Dui
10:18 AM
Okay, so I'm gonna have to make a few different batches depending on the action then?
You wrote: "We're aware of this limitation and hope to address it in the future." Do you mean a universal actions endpoint or a batched delete endpoint?
Kishore Nallan
10:19 AM
You can have a single Kinesis consumer that can handle insert, update and delete.
Do you see any problem with using logical deletes?
Dui
10:24 AM
So you would recommend that my Kinesis consumer differentiate between the actions and run an import(action) for each?
Logical deletion seems fine for now. I guess the only downside is that I'd have to massage the data a little bit to know which ones to delete, but that's basically how we are working today anyway 🙂
Kishore Nallan
10:28 AM
Use action=upsert regardless of whether the record is an insert, update or delete, and send them all in the same batch, in a single API call. The only additional processing you need is for the deletion case: you can send a simple
{"id": "<id>", "is_deleted": true}
document, or modify the actual document (if that's available to the consumer) to set the is_deleted field.
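This suggestion can be sketched as a single consumer step that maps every stream record onto a document for one action=upsert import. The event shape (type, id, document) is an assumption for illustration:

```javascript
// Sketch: map change events from a stream onto a single action=upsert batch,
// turning deletes into logical-delete tombstones. Event shape is assumed.
function toUpsertDoc(event) {
  if (event.type === 'DELETE') {
    // Minimal tombstone; a periodic delete-by-query can purge these later.
    return { id: event.id, is_deleted: true }
  }
  // Creates and updates carry the whole document.
  return { ...event.document, id: event.id, is_deleted: false }
}

const events = [
  { type: 'CREATE', id: '124', document: { company_name: 'Stark Industries' } },
  { type: 'DELETE', id: '125' }
]

const docs = events.map(toUpsertDoc)
// client.collections('companies').documents().import(docs, { action: 'upsert' })
```

Everything lands in one import call; only the delete path needs special handling, as described above.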
Dui
03:03 PM
When I run an import with two batched updates (both targeting the same document, where one is upserting and one is updating), there are fields missing in the result.
One update runs first; it updates 3 out of 20 fields.
Then an upsert runs, which is supposed to update 17 out of 20 fields.
The result is that I only get 17 out of 20 fields defined. It's like the update gets overwritten by the upsert's missing fields.
Kishore Nallan
03:09 PM
That's correct, unless you always had the whole document, in which case it's just upsert.
Dui
03:46 PM
Do you need anything from me?
Nov 26, 2021 (22 months ago)
Dui
09:26 AM
I make two updates in one import:
const body = [
  { id: 'test_id', lastChance: false },
  {
    id: 'test_id',
    'price_SE.amount': 5000,
    'price_SE.currency': 'SEK'
  }
]
typesenseClient
  .collections('test_index')
  .documents()
  .import(body, { action: 'update', batch_size: 100 })
When I look at the data in Typesense, it seems like the lastChance field from the first object is undefined. And if I reverse the order of these two, price_SE.amount gets undefined instead. Do you know what could be the cause? It's like it is updating the fields that are missing from each of the objects with undefined instead of ignoring them.
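For reference, the merge behavior Dui expected here (partial documents sharing an id accumulating, rather than a later one blanking out fields an earlier one set) can be simulated in plain JavaScript. This is a sketch of the expected semantics, not Typesense internals:

```javascript
// Simulate merging several partial update documents that share an id:
// later fields win, but fields absent from a later partial are kept.
function mergeUpdatesById(partials) {
  const byId = new Map()
  for (const partial of partials) {
    byId.set(partial.id, { ...(byId.get(partial.id) || {}), ...partial })
  }
  return [...byId.values()]
}

const merged = mergeUpdatesById([
  { id: 'test_id', lastChance: false },
  { id: 'test_id', 'price_SE.amount': 5000, 'price_SE.currency': 'SEK' }
])
// merged[0] keeps lastChance alongside both price_SE fields
```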
Kishore Nallan
09:28 AM
Did the test_id document exist before the update was called?
Kishore Nallan
09:54 AM
The update works fine for me. If you can provide a similar reproducible example, I will be happy to take a look!
Kishore Nallan
10:40 AM
You said the lastChance field from the first object is undefined. Does the field not exist in the returned document, or is the value of the field undefined? I won't rule out a bug lurking here, but need more information to ascertain that.
Kishore Nallan
10:43 AM
So the document previously had a lastChance field, you then ran the import above, and now when you query the document, lastChance is not found? Sorry for being a bit slow here 🙂
Dui
10:55 AM
First I ran an upsert that did not contain lastChance. After that, I ran two update imports where one of them contains lastChance. Then the lastChance is not found.
Dui
11:00 AM
const batchImport = async ({ index, action, body }) => {
  if (!body) return
  typesenseClient
    .collections(index)
    .documents()
    .import(body, { action, batch_size: 100 })
    .then((res) => console.log(JSON.stringify(res)))
    .catch((e) => console.error(JSON.stringify(e)))
}
await batchImport({ index, action: 'upsert', body: groupedData['upsert'] })
await batchImport({ index, action: 'update', body: groupedData['update'] })
This is the data:
groupedData {
  upsert: [
    {
      id: 'CYgQsv4oZi',
      'metadata.material': [Array],
      'metadata.brand': 'Nike',
      'categories.lvl0': [Array],
      'categories.lvl1': [Array],
      'categories.lvl2': [Array],
      'categories.lvl3': [Array]
    }
  ],
  update: [
    { id: 'CYgQsv4oZi', lastChance: false },
    {
      id: 'CYgQsv4oZi',
      'price_SE.amount': 5000,
      'price_SE.currency': 'SEK'
    }
  ]
}
Hope you can make that out 🙂!
The reason for the batch is that once this works properly, I'm gonna start importing them by the thousands 😛
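The groupedData shape above could come from a small grouping step over the incoming records. This is a hypothetical helper (the action field on each record is an assumption, not from the thread):

```javascript
// Sketch: bucket records by the import action they need, yielding the
// { upsert: [...], update: [...] } shape passed to batchImport above.
function groupByAction(records) {
  const grouped = {}
  for (const { action, ...doc } of records) {
    // Create the bucket on first use, then append the document (minus action).
    (grouped[action] = grouped[action] || []).push(doc)
  }
  return grouped
}

const grouped = groupByAction([
  { action: 'upsert', id: 'CYgQsv4oZi', 'metadata.brand': 'Nike' },
  { action: 'update', id: 'CYgQsv4oZi', lastChance: false }
])
```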
Kishore Nallan
12:33 PM
We balance that by making release candidate builds available to customers that we work closely with on new features, so that they help both in validating the feature and in overall maturity (since they will also be progressing from dev to prod for a given new feature that we add).
Nov 27, 2021 (22 months ago)
Nov 29, 2021 (22 months ago)
Dui
01:27 PM
One related question though: if I run three upserts in a sequence targeting the same document (same id), it doesn't get updated for each upsert. Any ideas?
Dui
01:42 PM
[
  {
    "id": "CYgQsv4oZi",
    "createdAt": 1637160199,
    "updatedAt": 1637576744,
    "metadata.brand": "Nike"
  },
  {
    "id": "CYgQsv4oZi",
    "lastChance": false
  },
  {
    "id": "CYgQsv4oZi",
    "pricing.amount": 50,
    "pricing.currency": "SEK"
  }
]
The end-result is that only the last object in the array gets written to the document.
Kishore Nallan
01:43 PM
Does CYgQsv4oZi already exist in the collection before this payload is sent?
Kishore Nallan
01:49 PM
insert: the document does not exist and you want to insert the whole document.
upsert: the document might or might not exist and you want to insert/replace the whole document.
update: the document certainly exists and you want to update part or all of the document.
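These three actions can be simulated against a plain Map standing in for a collection. This is a sketch of the semantics as just described, not Typesense internals:

```javascript
// Simulate insert/upsert/update semantics against an in-memory store.
function applyAction(store, action, doc) {
  const exists = store.has(doc.id)
  if (action === 'insert') {
    if (exists) throw new Error('insert: document already exists')
    store.set(doc.id, doc)
  } else if (action === 'upsert') {
    store.set(doc.id, doc) // whole-document replace, no merge
  } else if (action === 'update') {
    if (!exists) throw new Error('update: document does not exist')
    store.set(doc.id, { ...store.get(doc.id), ...doc }) // partial merge
  }
  return store
}

const store = new Map()
applyAction(store, 'insert', { id: '1', a: 1, b: 2 })
applyAction(store, 'update', { id: '1', b: 3 }) // now { id: '1', a: 1, b: 3 }
applyAction(store, 'upsert', { id: '1', c: 4 }) // replaced: now { id: '1', c: 4 }
```

Note how the final upsert drops fields a and b: upsert replaces the whole document, which is exactly the behavior Dui was running into.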
Dui
01:51 PM
Something like partialUpdate with createIfNotExists. Does something like that exist?
Kishore Nallan
01:52 PM
It was effectively upsert for people familiar with other DBs, so we had to switch to that.
Kishore Nallan
01:54 PM
An emplace action could be introduced that does an upsert if the document is not available, or an update if it already exists, and to which you can pass either a whole or a partial document.
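The proposed emplace can be sketched the same way: merge when the document already exists, insert it as given otherwise. Under these semantics, Dui's three-document payload above would accumulate fields instead of the last object winning. This is a simulation of the proposal, not the eventual implementation:

```javascript
// Simulate the proposed emplace: update (partial merge) if the document
// exists, otherwise insert the given (whole or partial) document.
function emplace(store, doc) {
  const existing = store.get(doc.id)
  store.set(doc.id, existing ? { ...existing, ...doc } : { ...doc })
  return store
}

const store = new Map()
const payload = [
  { id: 'CYgQsv4oZi', 'metadata.brand': 'Nike' },
  { id: 'CYgQsv4oZi', lastChance: false },
  { id: 'CYgQsv4oZi', 'pricing.amount': 50, 'pricing.currency': 'SEK' }
]
payload.forEach((doc) => emplace(store, doc))
// store.get('CYgQsv4oZi') now holds all four fields plus id
```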
Dec 07, 2021 (21 months ago)
Dui
01:05 PM
I'm discussing with my team, so we will reach out soon 🙂
Dec 08, 2021 (21 months ago)
Dec 13, 2021 (21 months ago)
Dui
02:22 PM
https://github.com/typesense/typesense/issues/447
Dui
03:11 PM
I'll reach out soon and see if we can book a meeting with some more people from my team; we are still investigating whether we should move from Algolia.
Dec 31, 2021 (21 months ago)
Kishore Nallan
02:26 AM
The emplace action has been implemented. Do you want to do an initial test using a Docker build for initial verification before we upgrade your Cloud cluster?
Jan 03, 2022 (20 months ago)